Skip to content

Commit

Permalink
Download counts: handle several data files (#8011)
Browse files Browse the repository at this point in the history
  • Loading branch information
szakarias committed Sep 3, 2024
1 parent fe31b83 commit c7a0f7b
Show file tree
Hide file tree
Showing 3 changed files with 398 additions and 145 deletions.
263 changes: 143 additions & 120 deletions app/lib/service/download_counts/sync_download_counts.dart
Original file line number Diff line number Diff line change
Expand Up @@ -56,120 +56,152 @@ Map<String, int> _extractDayCounts(Map<String, dynamic> json) {
return dayCounts;
}

Future<bool> processDownloadCounts(
String downloadCountsFileName, DateTime date) async {
final downloadCountsBucket =
Future<Set<String>> processDownloadCounts(DateTime date) async {
final fileNamePrefix =
['daily_download_counts', formatDateForFileName(date), 'data-'].join('/');
final bucket =
storageService.bucket(activeConfiguration.downloadCountsBucketName!);
Uint8List bytes;
try {
bytes = await downloadCountsBucket.readAsBytes(downloadCountsFileName);
} on Exception catch (e) {
_logger.info('Failed to read "$downloadCountsFileName"./n'
'$e');
return false;
}
if (bytes.isEmpty) {
_logger.severe('$downloadCountsFileName is empty.');
return false;
}

bool hasFailedLines = false;
// Before '2024-05-03' the query generating the download count data had a bug
// in the reg exp causing incorrect versions to be stored.
final regExpQueryFixDate = DateTime.parse('2024-05-03');
final processedPackages = <String>{};
var lines = <String>[];

try {
lines = utf8.decode(bytes).split('\n');
} on FormatException catch (e) {
_logger.severe('Failed to utf8 decode bytes of $downloadCountsFileName/n'
'$e');
return false;
final failedFiles = <String>{};

final bucketEntries = await bucket.list(prefix: fileNamePrefix).toList();

if (bucketEntries.isEmpty) {
_logger.info('Failed to read any files with prefix "$fileNamePrefix"./n');
failedFiles.add(fileNamePrefix);
}

final processedPackages = <String>{};
final pool = Pool(10);
await Future.wait(lines.map((line) async {
return await pool.withResource(() async {
if (line.isBlank) {
return;
}
final String package;
final Map<String, int> dayCounts;
try {
final data = json.decode(line);
if (data is! Map<String, dynamic>) {
throw FormatException('Download counts data is not valid json');
}
for (final f in bucketEntries) {
final fileName = f.name;
if (f.isDirectory) {
_logger.severe(
'Cannot process $f, since $f is a directory. Expected a "jsonl" file.');
failedFiles.add(fileName);
continue;
}
if (!f.name.endsWith('jsonl')) {
_logger.severe('Cannot process $f. Expected a "jsonl" file.');
failedFiles.add(fileName);
continue;
}

Uint8List bytes;
try {
bytes = await bucket.readAsBytes(fileName);
} on Exception catch (e) {
_logger.info('Failed to read "$fileName"./n'
'$e');
failedFiles.add(fileName);
continue;
}

if (bytes.isEmpty) {
_logger.severe('$fileName is empty.');
failedFiles.add(fileName);
continue;
}
// Before '2024-05-03' the query generating the download count data had a bug
// in the reg exp causing incorrect versions to be stored.
final regExpQueryFixDate = DateTime.parse('2024-05-03');
List<String> lines;

try {
lines = utf8.decode(bytes).split('\n');
} on FormatException catch (e) {
_logger.severe('Failed to utf8 decode bytes of $fileName/n'
'$e');
failedFiles.add(fileName);
continue;
}

if (data['package'] is! String) {
throw FormatException('"package" must be a String');
await Future.wait(lines.map((line) async {
return await pool.withResource(() async {
if (line.isBlank) {
return;
}
package = data['package'] as String;
processedPackages.add(package);

dayCounts = _extractDayCounts(data);
} on FormatException catch (e) {
_logger.severe(
'Failed to proccess line $line of file $downloadCountsFileName \n'
'$e');
hasFailedLines = true;
return;
}

List<String> versions;
try {
// Validate that the package and version exist and ignore the
// non-existing packages and versions.
// First do it via the cached data, fall back to query for invisible
// and moderated packages.
versions = (await packageBackend.listVersionsCached(package))
.versions
.map((e) => e.version)
.toList();

final nonExistingVersions = <String>[];
dayCounts.keys.forEach((version) {
if (!versions.contains(version)) {
nonExistingVersions.add(version);
if (date.isBefore(regExpQueryFixDate)) {
// If the data is generated before the fix of the query, we
// ignore versions that do not exist.
_logger.warning(
'$package-$version appeared in download counts data but does'
' not exist');
} else {
_logger.severe(
'$package-$version appeared in download counts data but does'
' not exist');
}
final String package;
final Map<String, int> dayCounts;
try {
final data = json.decode(line);
if (data is! Map<String, dynamic>) {
throw FormatException('Download counts data is not valid json');
}
});

nonExistingVersions.forEach((v) => dayCounts.remove(v));
} on NotFoundException catch (e) {
final pkg = await packageBackend.lookupPackage(package);
// The package is neither invisible nor tombstoned, hence there is
// probably an error in the generated data.
if (pkg == null &&
(await packageBackend.lookupModeratedPackage(package)) == null) {
_logger.severe(
'Package $package appeared in download counts data for file '
'$downloadCountsFileName but does not exist.\n'
'Error: $e');

if (data['package'] is! String) {
throw FormatException('"package" must be a String');
}
package = data['package'] as String;
processedPackages.add(package);

dayCounts = _extractDayCounts(data);
} on FormatException catch (e) {
_logger.severe('Failed to proccess line $line of file $fileName \n'
'$e');
failedFiles.add(fileName);
return;
} // else {
// The package is either invisible, tombstoned or has no versions.
// }
}

await downloadCountsBackend.updateDownloadCounts(
package,
dayCounts,
date,
);
});
}));
}

List<String> versions;
try {
// Validate that the package and version exist and ignore the
// non-existing packages and versions.
// First do it via the cached data, fall back to query for invisible
// and moderated packages.
versions = (await packageBackend.listVersionsCached(package))
.versions
.map((e) => e.version)
.toList();

final nonExistingVersions = <String>[];
dayCounts.keys.forEach((version) {
if (!versions.contains(version)) {
nonExistingVersions.add(version);
if (date.isBefore(regExpQueryFixDate)) {
// If the data is generated before the fix of the query, we
// ignore versions that do not exist.
_logger.warning(
'$package-$version appeared in download counts data but does'
' not exist');
} else {
_logger.severe(
'$package-$version appeared in download counts data but does'
' not exist');
}
}
});

nonExistingVersions.forEach((v) => dayCounts.remove(v));
} on NotFoundException catch (e) {
final pkg = await packageBackend.lookupPackage(package);
// The package is neither invisible nor tombstoned, hence there is
// probably an error in the generated data.
if (pkg == null &&
(await packageBackend.lookupModeratedPackage(package)) == null) {
_logger.severe(
'Package $package appeared in download counts data for file '
'$fileName but does not exist.\n'
'Error: $e');
return;
} // else {
// The package is either invisible, tombstoned or has no versions.
// }
}
await downloadCountsBackend.updateDownloadCounts(
package,
dayCounts,
date,
);
});
}));
}

if (processedPackages.isEmpty) {
// We didn't successfully process any package. Either the bucket had no
// files or all files were invalid, hence we return early.
return failedFiles;
}

// Record zero downloads for this date for packages not mentioned in the
// query output.
Expand All @@ -182,7 +214,8 @@ Future<bool> processDownloadCounts(
await downloadCountsBackend.updateDownloadCounts(package, {}, date);
});
}));
return !hasFailedLines;

return failedFiles;
}

const defaultNumberOfSyncDays = 5;
Expand Down Expand Up @@ -212,29 +245,19 @@ Future<void> syncDownloadCounts(

for (int i = 0; i < numberOfSyncDays; i++) {
final syncDate = startingDate.addCalendarDays(-i);
// TODO(zarah): Handle the case where there is more than one file per day.
final fileName = [
'daily_download_counts',
formatDateForFileName(syncDate),
'data-000000000000.jsonl',
].join('/');
final success = await processDownloadCounts(fileName, syncDate);
if (!success) {
failedFiles.add(fileName);
}
failedFiles.addAll(await processDownloadCounts(syncDate));
}
final yesterdayFileName = [
final yesterdayFileNamePrefix = [
'daily_download_counts',
formatDateForFileName(yesterday),
'data-000000000000.jsonl',
].join('/');

if (failedFiles.isNotEmpty) {
_logger
.shout('Download counts sync was partial. The following files failed:\n'
'$failedFiles');

if (failedFiles.length > 1 || failedFiles.first != yesterdayFileName) {
if (!(failedFiles
.every((fileName) => fileName.contains(yesterdayFileNamePrefix)))) {
// We only disregard failures of yesterday's files. Otherwise we
// consider the sync to be broken.
throw Exception(
Expand Down
Loading

0 comments on commit c7a0f7b

Please sign in to comment.