Skip to content

Commit

Permalink
Remove moderated package and version assets after 3 years. (#8055)
Browse files Browse the repository at this point in the history
  • Loading branch information
isoos committed Sep 16, 2024
1 parent f24755c commit 1e3830b
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 26 deletions.
185 changes: 161 additions & 24 deletions app/lib/admin/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import 'package:clock/clock.dart';
import 'package:collection/collection.dart';
import 'package:convert/convert.dart';
import 'package:gcloud/service_scope.dart' as ss;
import 'package:gcloud/storage.dart';
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
import 'package:pool/pool.dart';

import '../account/backend.dart';
Expand All @@ -22,14 +24,19 @@ import '../account/models.dart';
import '../admin/models.dart';
import '../audit/models.dart';
import '../package/backend.dart'
show checkPackageVersionParams, packageBackend, purgePackageCache;
show
checkPackageVersionParams,
packageBackend,
purgePackageCache,
tarballObjectName;
import '../package/models.dart';
import '../publisher/models.dart';
import '../scorecard/backend.dart';
import '../shared/configuration.dart';
import '../shared/datastore.dart';
import '../shared/email.dart';
import '../shared/exceptions.dart';
import '../shared/storage.dart';
import '../shared/versions.dart';
import '../task/backend.dart';
import 'actions/actions.dart' show AdminAction;
Expand Down Expand Up @@ -297,13 +304,27 @@ class AdminBackend {
/// Creates a [ModeratedPackage] instance (if not already present) in
/// Datastore representing the removed package. No new package with the same
/// name can be published.
///
/// Verifies the current authenticated user for admin permissions.
Future<void> removePackage(String packageName) async {
final caller =
await requireAuthenticatedAdmin(AdminPermission.removePackage);

_logger.info('${caller.displayId}) initiated the delete '
'of package $packageName');
await _removePackage(packageName);
}

/// Removes the package from the Datastore and updates other related
/// entities. It is safe to call [removePackage] on an already removed
/// package, as the call is idempotent.
///
/// Creates a [ModeratedPackage] instance (if not already present) in
/// Datastore representing the removed package. No new package with the same
/// name can be published.
Future<void> _removePackage(
String packageName, {
DateTime? moderated,
}) async {
final packageKey = _db.emptyKey.append(Package, id: packageName);
final versions = (await _db
.query<PackageVersion>(ancestorKey: packageKey)
Expand All @@ -312,6 +333,49 @@ class AdminBackend {
.toList())
.toSet();

final pool = Pool(10);
final futures = <Future>[];
for (final v in versions) {
// Deleting public and canonical archives, 404 errors are ignored.
futures.add(pool.withResource(
() => packageBackend.removePackageTarball(packageName, v)));
}
await Future.wait(futures);
await pool.close();

_logger.info('Removing package from Package.replacedBy...');
final replacedByQuery = _db.query<Package>()
..filter('replacedBy =', packageName);
await for (final pkg in replacedByQuery.run()) {
await withRetryTransaction(_db, (tx) async {
final p = await tx.lookupOrNull<Package>(pkg.key);
if (p == null) {
return;
}
if (p.replacedBy == packageName) {
p.replacedBy = null;
tx.insert(p);
}
});
}

_logger.info('Removing package from PackageVersionInfo ...');
await _db.deleteWithQuery(
_db.query<PackageVersionInfo>()..filter('package =', packageName));

_logger.info('Removing package from PackageVersionAsset ...');
await _db.deleteWithQuery(
_db.query<PackageVersionAsset>()..filter('package =', packageName));

_logger.info('Removing package from Like ...');
await _db.deleteWithQuery(
_db.query<Like>()..filter('packageName =', packageName));

_logger.info('Removing package from AuditLogRecord...');
await _db.deleteWithQuery(
_db.query<AuditLogRecord>()..filter('packages =', packageName));

_logger.info('Removing Package from Datastore...');
await withRetryTransaction(_db, (tx) async {
final package = await tx.lookupOrNull<Package>(packageKey);
if (package == null) {
Expand All @@ -335,11 +399,13 @@ class AdminBackend {
.map((pv) => pv.version!)
.toList());

versions.addAll(package.deletedVersions ?? const <String>[]);

tx.insert(ModeratedPackage()
..parentKey = _db.emptyKey
..id = packageName
..name = packageName
..moderated = clock.now().toUtc()
..moderated = moderated ?? clock.now().toUtc()
..versions = versions.toList()
..publisherId = package.publisherId
..uploaders = package.uploaders);
Expand All @@ -348,31 +414,10 @@ class AdminBackend {
}
});

final pool = Pool(10);
final futures = <Future>[];
versions.forEach((final v) {
futures.add(pool.withResource(
() => packageBackend.removePackageTarball(packageName, v)));
});
await Future.wait(futures);
await pool.close();

_logger.info('Removing package from PackageVersion ...');
await _db
.deleteWithQuery(_db.query<PackageVersion>(ancestorKey: packageKey));

_logger.info('Removing package from PackageVersionInfo ...');
await _db.deleteWithQuery(
_db.query<PackageVersionInfo>()..filter('package =', packageName));

_logger.info('Removing package from PackageVersionAsset ...');
await _db.deleteWithQuery(
_db.query<PackageVersionAsset>()..filter('package =', packageName));

_logger.info('Removing package from Like ...');
await _db.deleteWithQuery(
_db.query<Like>()..filter('packageName =', packageName));

_logger.info('Package "$packageName" got successfully removed.');
_logger.info(
'NOTICE: Redis caches referencing the package will expire given time.');
Expand Down Expand Up @@ -740,4 +785,96 @@ class AdminBackend {
}
return refCase;
}

/// Scans datastore and deletes moderated subjects where the last action
/// was more than 3 years ago.
Future<void> deleteModeratedSubjects({
@visibleForTesting DateTime? before,
}) async {
before ??= clock.ago(days: 3 * 366).toUtc(); // extra buffer days
final canonicalBucket =
storageService.bucket(activeConfiguration.canonicalPackagesBucketName!);

// delete packages
final pQuery = _db.query<Package>()
..filter('moderatedAt <', before)
..order('moderatedAt');
await for (final package in pQuery.run()) {
// sanity check
if (!package.isModerated) {
continue;
}

_logger.info('Deleting moderated package: ${package.name}');
await _removePackage(
package.name!,
moderated: package.moderatedAt,
);
_logger.info('Deleted moderated package: ${package.name}');
}

// delete package versions
final pvQuery = _db.query<PackageVersion>()
..filter('moderatedAt <', before)
..order('moderatedAt');
await for (final version in pvQuery.run()) {
// sanity check
if (!version.isModerated) {
continue;
}

_logger.info(
'Deleting moderated package version: ${version.qualifiedVersionKey}');

// deleting from canonical bucket
final objectName = tarballObjectName(version.package, version.version!);
final info = await canonicalBucket.tryInfo(objectName);
if (info != null) {
await canonicalBucket.delete(objectName);
}

// deleting from datastore
await withRetryTransaction(_db, (tx) async {
final pv = await tx.lookupOrNull<PackageVersion>(version.key);
if (pv == null) {
return null;
}
final p = await tx.lookupOrNull<Package>(version.packageKey!);
if (p == null) {
return;
}
final pvi = await tx.lookupOrNull<PackageVersionInfo>(_db.emptyKey
.append(PackageVersionInfo,
id: version.qualifiedVersionKey.qualifiedVersion));

p.deletedVersions ??= [];
p.deletedVersions!.add(version.version!);
p.deletedVersions!.sort();
p.updated = clock.now().toUtc();
tx.insert(p);

// delete version + info + assets
tx.delete(pv.key);
if (pvi != null) {
tx.delete(pvi.key);

for (final assetKind in pvi.assets) {
tx.delete(
_db.emptyKey.append(PackageVersionAsset,
id: Uri(pathSegments: [
version.package,
version.version!,
assetKind
]).path),
);
}
}
});
_logger.info(
'Deleted moderated package version: ${version.qualifiedVersionKey}');
}

// TODO: delete publisher instances
// TODO: mark user instances deleted
}
}
12 changes: 10 additions & 2 deletions app/lib/tool/neat_task/pub_dev_tasks.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import 'dart:io';
import 'package:gcloud/service_scope.dart' as ss;
import 'package:logging/logging.dart';
import 'package:neat_periodic_task/neat_periodic_task.dart';
import 'package:pub_dev/package/export_api_to_bucket.dart';
import 'package:pub_dev/service/download_counts/sync_download_counts.dart';

import '../../account/backend.dart';
import '../../account/consent_backend.dart';
import '../../admin/backend.dart';
import '../../audit/backend.dart';
import '../../package/backend.dart';
import '../../package/export_api_to_bucket.dart';
import '../../search/backend.dart';
import '../../service/download_counts/sync_download_counts.dart';
import '../../service/email/backend.dart';
import '../../service/security_advisories/sync_security_advisories.dart';
import '../../service/topics/count_topics.dart';
Expand Down Expand Up @@ -122,6 +123,13 @@ void _setupGenericPeriodicTasks() {
task: () async => await apiExporter?.uploadPkgNameCompletionData(),
);

// Deletes moderated packages, versions, publishers and users.
_weekly(
name: 'delete-moderated-subjects',
isRuntimeVersioned: false,
task: () async => adminBackend.deleteModeratedSubjects(),
);

// Deletes task status entities where the status hasn't been updated
// for more than a month.
_weekly(
Expand Down
42 changes: 42 additions & 0 deletions app/test/admin/moderate_package_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import 'package:_pub_shared/data/account_api.dart';
import 'package:_pub_shared/data/admin_api.dart';
import 'package:_pub_shared/data/package_api.dart';
import 'package:clock/clock.dart';
import 'package:gcloud/storage.dart';
import 'package:http/http.dart' as http;
import 'package:pub_dev/account/backend.dart';
import 'package:pub_dev/admin/actions/actions.dart';
import 'package:pub_dev/admin/backend.dart';
import 'package:pub_dev/admin/models.dart';
import 'package:pub_dev/fake/backend/fake_auth_provider.dart';
import 'package:pub_dev/fake/backend/fake_pub_worker.dart';
Expand All @@ -18,6 +21,7 @@ import 'package:pub_dev/scorecard/backend.dart';
import 'package:pub_dev/search/backend.dart';
import 'package:pub_dev/shared/configuration.dart';
import 'package:pub_dev/shared/datastore.dart';
import 'package:pub_dev/shared/storage.dart';
import 'package:pub_dev/tool/maintenance/update_public_bucket.dart';
import 'package:test/test.dart';

Expand Down Expand Up @@ -369,5 +373,43 @@ void main() {
message: 'ModerationCase.status ("no-action") != "pending".',
);
});

testWithProfile(
'cleanup deletes datastore entities and canonical archive file',
fn: () async {
// delete old version
await accountBackend.withBearerToken(siteAdminToken, () async {
await adminBackend.removePackageVersion('oxygen', '1.0.0');
});

// canonical file is present
final bucket = storageService
.bucket(activeConfiguration.canonicalPackagesBucketName!);
expect(
await bucket.tryInfo(tarballObjectName('oxygen', '1.2.0')),
isNotNull,
);

// moderate and cleanup
await _moderate('oxygen', state: true, caseId: 'none');
await adminBackend.deleteModeratedSubjects(before: clock.now().toUtc());

// no package, version or canonical file
expect(await packageBackend.lookupPackage('oxygen'), isNull);
expect(
await packageBackend.lookupPackageVersion('oxygen', '1.2.0'),
isNull,
);
expect(
await bucket.tryInfo(tarballObjectName('oxygen', '1.2.0')),
isNull,
);

// ModeratedPackage entity contains both previously deleted and current versions
final mp = await packageBackend.lookupModeratedPackage('oxygen');
expect(mp, isNotNull);
expect(mp!.versions, contains('1.0.0'));
expect(mp.versions, contains('1.2.0'));
});
});
}
Loading

0 comments on commit 1e3830b

Please sign in to comment.