Skip to content

Commit

Permalink
Simplify download counts processing (#7977)
Browse files Browse the repository at this point in the history
  • Loading branch information
szakarias committed Aug 26, 2024
1 parent 2764d3e commit e6631d1
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 75 deletions.
150 changes: 76 additions & 74 deletions app/lib/service/download_counts/sync_download_counts.dart
Original file line number Diff line number Diff line change
Expand Up @@ -74,113 +74,115 @@ Future<bool> processDownloadCounts(
}

bool hasFailedLines = false;
bool hasPartiallyFailedLines = false;
// Before '2024-05-03' the query generating the download count data had a bug
// in the reg exp causing incorrect versions to be stored.
final regExpQueryFixDate = DateTime.parse('2024-05-03');
final processedPackages = <String>{};
var lines = <String>[];

try {
lines = utf8.decode(bytes).split('\n');
} on FormatException catch (e) {
_logger.severe('Failed to utf8 decode bytes of $downloadCountsFileName/n'
'$e');
return false;
}

final pool = Pool(10);
await Future.wait(lines.map((line) async {
return await pool.withResource(() async {
if (line.isBlank) {
return;
}
String package;
final String package;
final Map<String, int> dayCounts;
try {
final data = json.decode(line) as Map<String, dynamic>;
final dayCounts = _extractDayCounts(data);
final data = json.decode(line);
if (data is! Map<String, dynamic>) {
throw FormatException('Download counts data is not valid json');
}

if (data['package'] is! String) {
throw FormatException('"package" must be a String');
}
package = data['package'] as String;
List<String> versions;
try {
// Validate that the package and version exist.
// First do it via the cached data, fall back to query for invisible
// and moderated packages.
versions = (await packageBackend.listVersionsCached(package))
.versions
.map((e) => e.version)
.toList();

final nonExistingVersions = <String>[];
dayCounts.keys.forEach((version) {
if (!versions.contains(version)) {
nonExistingVersions.add(version);
if (date.isBefore(regExpQueryFixDate)) {
// If the data is generated before the fix of the query, we
// ignore versions that do not exist.
_logger.warning(
'$package-$version appeared in download counts data but does'
' not exist');
} else {
hasPartiallyFailedLines = true;
_logger.severe(
'$package-$version appeared in download counts data but does'
' not exist');
}
}
});

nonExistingVersions.forEach((v) => dayCounts.remove(v));
} on NotFoundException catch (e) {
final pkg = await packageBackend.lookupPackage(package);
// The package is neither invisible or tombstoned, hence there is
// probably an error in the generated data.
if (pkg == null &&
(await packageBackend.lookupModeratedPackage(package)) == null) {
_logger.severe(
'Package $package appeared in download counts data for file '
'$downloadCountsFileName but does not exist.\n'
'Error: $e');
hasPartiallyFailedLines = true;
return;
} // else {
// The package is either invisible, tombstoned or has no versions.
// }
}
await downloadCountsBackend.updateDownloadCounts(
package,
dayCounts,
date,
);
processedPackages.add(package);

dayCounts = _extractDayCounts(data);
} on FormatException catch (e) {
_logger.severe(
'Failed to proccess line $line of file $downloadCountsFileName \n'
'$e');
hasFailedLines = true;
return;
}

List<String> versions;
try {
// Validate that the package and version exist and ignore the
// non-existing packages and versions.
// First do it via the cached data, fall back to query for invisible
// and moderated packages.
versions = (await packageBackend.listVersionsCached(package))
.versions
.map((e) => e.version)
.toList();

final nonExistingVersions = <String>[];
dayCounts.keys.forEach((version) {
if (!versions.contains(version)) {
nonExistingVersions.add(version);
if (date.isBefore(regExpQueryFixDate)) {
// If the data is generated before the fix of the query, we
// ignore versions that do not exist.
_logger.warning(
'$package-$version appeared in download counts data but does'
' not exist');
} else {
_logger.severe(
'$package-$version appeared in download counts data but does'
' not exist');
}
}
});

nonExistingVersions.forEach((v) => dayCounts.remove(v));
} on NotFoundException catch (e) {
final pkg = await packageBackend.lookupPackage(package);
// The package is neither invisible or tombstoned, hence there is
// probably an error in the generated data.
if (pkg == null &&
(await packageBackend.lookupModeratedPackage(package)) == null) {
_logger.severe(
'Package $package appeared in download counts data for file '
'$downloadCountsFileName but does not exist.\n'
'Error: $e');
return;
} // else {
// The package is either invisible, tombstoned or has no versions.
// }
}

await downloadCountsBackend.updateDownloadCounts(
package,
dayCounts,
date,
);
});
}));

if (hasFailedLines) {
return false;
} else {
// We only add '0's for unmentioned packages if all lines have been
// succesfully processed. Otherwise we risk adding '0' for a package that
// has no data due to some error during processing.
final allPackageNames = await packageBackend.allPackageNames().toSet();
final missingPackages =
allPackageNames.difference(processedPackages.toSet());

await Future.wait(missingPackages.map((package) async {
return await pool.withResource(() async {
// Calling 'updateDownloadCounts' for 'package' with an empty dataset
// causes '0' to be added for all versions, hereby indicating 0 downloads.
await downloadCountsBackend.updateDownloadCounts(package, {}, date);
});
}));
return !hasPartiallyFailedLines;
}
// Record zero downloads for this date for packages not mentioned in the
// query output.
final allPackageNames = await packageBackend.allPackageNames().toSet();
final missingPackages = allPackageNames.difference(processedPackages.toSet());
await Future.wait(missingPackages.map((package) async {
return await pool.withResource(() async {
// Calling 'updateDownloadCounts' for 'package' with an empty dataset
// causes '0' to be added for all versions, hereby indicating 0 downloads.
await downloadCountsBackend.updateDownloadCounts(package, {}, date);
});
}));
return !hasFailedLines;
}

const defaultNumberOfSyncDays = 5;
Expand Down
2 changes: 1 addition & 1 deletion app/test/service/download_counts/download_counts_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ void main() {
} finally {
await subscription.cancel();
}
expect(succeeded, false);
expect(succeeded, true);
expect(messages.first, contains('Could not find `package "hest"`.'));
// We still process the lines that are possible
final countData =
Expand Down

0 comments on commit e6631d1

Please sign in to comment.