Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RC] sweepID tables #7297

Open
wants to merge 8 commits into
base: release/0.981.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,7 @@ private static TargetedSweeper uninitializedTargetedSweeper(
CoordinationAwareKnownAbandonedTransactionsStore abandonedTxnStore =
new CoordinationAwareKnownAbandonedTransactionsStore(
coordinationService, new AbandonedTimestampStoreImpl(kvs));
log.info("[PDS-586351] Creating an uninitialized targeted sweeper...");
return TargetedSweeper.createUninitialized(
metricsManager,
runtime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.palantir.atlasdb.memory.InMemoryAtlasDbConfig;
import com.palantir.atlasdb.sweep.queue.config.TargetedSweepInstallConfig.SweepIndexResetProgressStage;
import com.palantir.conjure.java.api.config.ssl.SslConfiguration;
import java.io.File;
import java.io.IOException;
Expand All @@ -42,6 +43,9 @@ public void canDeserializeAtlasDbConfig() throws IOException {
assertTimeLockConfigDeserializedCorrectly(config.timelock().get());

assertThat(config.leader()).isNotPresent();

assertThat(config.targetedSweep().sweepIndexResetProgressStage())
.isEqualTo(SweepIndexResetProgressStage.WRITE_IMMEDIATE_FORMAT_AND_SKIP_UNKNOWNS);
}

@Test
Expand Down
2 changes: 2 additions & 0 deletions atlasdb-config/src/test/resources/test-config.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
atlasdb:
namespace: brian
targetedSweep:
sweepIndexResetProgressStage: WRITE_IMMEDIATE_FORMAT_AND_SKIP_UNKNOWNS

keyValueService:
type: memory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,49 @@
import com.google.common.primitives.Ints;
import com.palantir.atlasdb.encoding.PtBytes;
import com.palantir.atlasdb.ptobject.EncodingUtils;
import com.palantir.atlasdb.sweep.queue.config.TargetedSweepInstallConfig;
import com.palantir.atlasdb.sweep.queue.id.SweepTableIndices;
import com.palantir.conjure.java.jackson.optimizations.ObjectMapperOptimizations;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.exceptions.SafeRuntimeException;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.Optional;

public final class WriteReferencePersister {
private static final byte[] writePrefix = {1};
private static final byte[] ZERO_BYTE = {0};
private static final byte[] ONE_BYTE = {1};

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.registerModule(new Jdk8Module())
.registerModules(ObjectMapperOptimizations.createModules());
private static final StoredWriteReference DUMMY = ImmutableStoredWriteReference.of(PtBytes.EMPTY_BYTE_ARRAY);

private final SweepTableIndices tableIndices;
private final WriteMethod writeMethod;
private final UnknownIdentifierHandlingMethod unknownIdentifierHandlingMethod;

public WriteReferencePersister(SweepTableIndices tableIndices) {
WriteReferencePersister(
SweepTableIndices tableIndices,
WriteMethod writeMethod,
UnknownIdentifierHandlingMethod unknownIdentifierHandlingMethod) {
this.tableIndices = tableIndices;
this.writeMethod = writeMethod;
this.unknownIdentifierHandlingMethod = unknownIdentifierHandlingMethod;
}

public static WriteReferencePersister create(
SweepTableIndices sweepTableIndices,
TargetedSweepInstallConfig.SweepIndexResetProgressStage resetProgressStage) {
return new WriteReferencePersister(
sweepTableIndices,
resetProgressStage.shouldWriteImmediateFormat()
? WriteMethod.TABLE_NAME_AS_STRING_BINARY
: WriteMethod.TABLE_ID_BINARY,
resetProgressStage.shouldSkipUnknowns()
? UnknownIdentifierHandlingMethod.IGNORE
: UnknownIdentifierHandlingMethod.THROW);
}

public Optional<WriteReference> unpersist(StoredWriteReference writeReference) {
Expand Down Expand Up @@ -72,20 +98,38 @@ public Optional<WriteReference> visitTableNameAsStringBinary(byte[] ref) {
public Optional<WriteReference> visitTableIdBinary(byte[] ref) {
int offset = 1;
int tableId = Ints.checkedCast(EncodingUtils.decodeUnsignedVarLong(ref, offset));
TableReference tableReference = tableIndices.getTableReference(tableId);
Optional<TableReference> maybeTableReference = safeGetTableReference(tableId);
if (maybeTableReference.isEmpty()) {
return Optional.empty();
}
offset += EncodingUtils.sizeOfUnsignedVarLong(tableId);
byte[] row = EncodingUtils.decodeSizedBytes(ref, offset);
offset += EncodingUtils.sizeOfSizedBytes(row);
byte[] column = EncodingUtils.decodeSizedBytes(ref, offset);
offset += EncodingUtils.sizeOfSizedBytes(column);
long isTombstone = EncodingUtils.decodeUnsignedVarLong(ref, offset);
return Optional.of(ImmutableWriteReference.builder()
.tableRef(tableReference)
.tableRef(maybeTableReference.get())
.cell(Cell.create(row, column))
.isTombstone(isTombstone == 1)
.build());
}

private Optional<TableReference> safeGetTableReference(int tableId) {
try {
return Optional.of(tableIndices.getTableReference(tableId));
} catch (NoSuchElementException e) {
switch (unknownIdentifierHandlingMethod) {
case IGNORE:
return Optional.empty();
case THROW:
throw e;
default:
throw new SafeIllegalStateException("Unexpected unknown identifier handling method", e);
}
}
}

@Override
public Optional<WriteReference> visitDummy() {
return Optional.empty();
Expand All @@ -98,10 +142,43 @@ public StoredWriteReference persist(Optional<WriteReference> writeReference) {
return DUMMY;
}
WriteReference writeRef = writeReference.get();
byte[] tableId = EncodingUtils.encodeUnsignedVarLong(tableIndices.getTableId(writeRef.tableRef()));
byte[] tableIdentifier = getTableIdentifier(writeRef.tableRef());
byte[] row = EncodingUtils.encodeSizedBytes(writeRef.cell().getRowName());
byte[] column = EncodingUtils.encodeSizedBytes(writeRef.cell().getColumnName());
byte[] isTombstone = EncodingUtils.encodeUnsignedVarLong(writeRef.isTombstone() ? 1 : 0);
return ImmutableStoredWriteReference.of(EncodingUtils.add(writePrefix, tableId, row, column, isTombstone));
return ImmutableStoredWriteReference.of(
EncodingUtils.add(writeMethod.getBytePrefix(), tableIdentifier, row, column, isTombstone));
}

private byte[] getTableIdentifier(TableReference tableReference) {
switch (writeMethod) {
case TABLE_ID_BINARY:
return EncodingUtils.encodeUnsignedVarLong(tableIndices.getTableId(tableReference));
case TABLE_NAME_AS_STRING_BINARY:
return EncodingUtils.encodeVarString(tableReference.toString());
default:
throw new SafeIllegalStateException("Unhandled write method", SafeArg.of("writeMethod", writeMethod));
}
}

@SuppressWarnings("ImmutableEnumChecker") // Overhead of needless wrapping is probably undesirable.
enum WriteMethod {
TABLE_NAME_AS_STRING_BINARY(ZERO_BYTE),
TABLE_ID_BINARY(ONE_BYTE);

private final byte[] bytePrefix;

WriteMethod(byte[] bytePrefix) {
this.bytePrefix = bytePrefix;
}

byte[] getBytePrefix() {
return bytePrefix;
}
}

enum UnknownIdentifierHandlingMethod {
THROW,
IGNORE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
*/
public class TargetedSweepMetricPublicationFilter implements MetricPublicationFilter {
@VisibleForTesting
static final long MINIMUM_READS_WRITES_TO_BE_CONSIDERED_ACTIVE = 1_000;
static final long MINIMUM_READS_WRITES_TO_BE_CONSIDERED_ACTIVE = 0;

@VisibleForTesting
static final Duration MINIMUM_STALE_DURATION = Duration.ofHours(4);
static final Duration MINIMUM_STALE_DURATION = Duration.ofMillis(1);

private final AtomicBoolean publicationLatch;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
package com.palantir.atlasdb.sweep.queue;

import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSet;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.keyvalue.api.KeyValueService;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.protos.generated.TableMetadataPersistence.LogSafety;
import com.palantir.atlasdb.schema.TargetedSweepSchema;
import com.palantir.atlasdb.schema.generated.TargetedSweepTableFactory;
import com.palantir.atlasdb.sweep.Sweeper;
import com.palantir.atlasdb.sweep.metrics.SweepOutcome;
import com.palantir.atlasdb.sweep.metrics.TargetedSweepMetrics;
import com.palantir.atlasdb.sweep.queue.SweepQueueReader.ReadBatchingRuntimeContext;
import com.palantir.atlasdb.sweep.queue.clear.DefaultTableClearer;
import com.palantir.atlasdb.sweep.queue.config.TargetedSweepInstallConfig.SweepIndexResetProgressStage;
import com.palantir.atlasdb.table.description.Schemas;
import com.palantir.atlasdb.table.description.SweeperStrategy;
import com.palantir.atlasdb.transaction.impl.TimelockTimestampServiceAdapter;
Expand All @@ -37,8 +41,10 @@
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

public final class SweepQueue implements MultiTableSweepQueueWriter {
Expand Down Expand Up @@ -74,9 +80,18 @@ public static SweepQueue create(
TransactionService transaction,
AbandonedTransactionConsumer abortedTransactionConsumer,
TargetedSweepFollower follower,
ReadBatchingRuntimeContext readBatchingRuntimeContext) {
SweepQueueFactory factory =
SweepQueueFactory.create(metrics, kvs, timelock, shardsConfig, transaction, readBatchingRuntimeContext);
ReadBatchingRuntimeContext readBatchingRuntimeContext,
Function<TableReference, Optional<LogSafety>> tablesToTrackDeletions,
SweepIndexResetProgressStage resetProgressStage) {
SweepQueueFactory factory = SweepQueueFactory.create(
metrics,
kvs,
timelock,
shardsConfig,
transaction,
readBatchingRuntimeContext,
tablesToTrackDeletions,
resetProgressStage);
return new SweepQueue(factory, follower, abortedTransactionConsumer);
}

Expand All @@ -89,7 +104,13 @@ public static MultiTableSweepQueueWriter createWriter(
TimelockService timelock,
Supplier<Integer> shardsConfig,
ReadBatchingRuntimeContext readBatchingRuntimeContext) {
return SweepQueueFactory.create(metrics, kvs, timelock, shardsConfig, readBatchingRuntimeContext)
return SweepQueueFactory.create(
metrics,
kvs,
timelock,
shardsConfig,
readBatchingRuntimeContext,
SweepIndexResetProgressStage.NO_ACTIVE_RESET)
.createWriter();
}

Expand Down Expand Up @@ -232,6 +253,7 @@ public static final class SweepQueueFactory {
private final KeyValueService kvs;
private final TimelockService timelock;
private final ReadBatchingRuntimeContext readBatchingRuntimeContext;
private final Function<TableReference, Optional<LogSafety>> tablesToTrackDeletions;

private SweepQueueFactory(
ShardProgress progress,
Expand All @@ -242,7 +264,8 @@ private SweepQueueFactory(
TargetedSweepMetrics metrics,
KeyValueService kvs,
TimelockService timelock,
ReadBatchingRuntimeContext readBatchingRuntimeContext) {
ReadBatchingRuntimeContext readBatchingRuntimeContext,
Function<TableReference, Optional<LogSafety>> tablesToTrackDeletions) {
this.progress = progress;
this.numShards = numShards;
this.cells = cells;
Expand All @@ -252,19 +275,29 @@ private SweepQueueFactory(
this.kvs = kvs;
this.timelock = timelock;
this.readBatchingRuntimeContext = readBatchingRuntimeContext;
this.tablesToTrackDeletions = tablesToTrackDeletions;
}

static SweepQueueFactory create(
TargetedSweepMetrics metrics,
KeyValueService kvs,
TimelockService timelock,
Supplier<Integer> shardsConfig,
ReadBatchingRuntimeContext readBatchingRuntimeContext) {
ReadBatchingRuntimeContext readBatchingRuntimeContext,
SweepIndexResetProgressStage resetProgressStage) {
// It is OK that the transaction service is different from the one used by the transaction manager,
// as transaction services must not hold any local state in them that would affect correctness.
TransactionService transaction =
TransactionServices.createRaw(kvs, new TimelockTimestampServiceAdapter(timelock), false);
return create(metrics, kvs, timelock, shardsConfig, transaction, readBatchingRuntimeContext);
return create(
metrics,
kvs,
timelock,
shardsConfig,
transaction,
readBatchingRuntimeContext,
_unused -> Optional.empty(),
resetProgressStage);
}

static SweepQueueFactory create(
Expand All @@ -273,13 +306,42 @@ static SweepQueueFactory create(
TimelockService timelock,
Supplier<Integer> shardsConfig,
TransactionService transaction,
ReadBatchingRuntimeContext readBatchingRuntimeContext) {
ReadBatchingRuntimeContext readBatchingRuntimeContext,
Function<TableReference, Optional<LogSafety>> tablesToTrackDeletions,
SweepIndexResetProgressStage resetProgressStage) {
Schemas.createTablesAndIndexes(TargetedSweepSchema.INSTANCE.getLatestSchema(), kvs);
log.info("[PDS-586351] Creating a sweep queue factory...");
if (resetProgressStage.shouldInvalidateOldMappings()) {
log.info("Invalidating old sweep mappings... now truncating sweep identifier tables.");

TargetedSweepTableFactory tableFactory = TargetedSweepTableFactory.of();
try {
kvs.truncateTables(ImmutableSet.of(
tableFactory.getSweepIdToNameTable(null).getTableRef(),
tableFactory.getSweepNameToIdTable(null).getTableRef()));
log.info("Successfully truncated the sweep identifier tables.");
} catch (Exception e) {
log.warn(
"A failure was observed when truncating the sweep identifier tables. If you are running"
+ " this as part of a broader clearance task, you MUST make sure that the success"
+ " message is logged BEFORE considering the reset to have been performed. Seeing this"
+ " message is neither an indication that the operation was success, nor is it an"
+ " indication that the operation was not a success.",
e);
throw e;
}
} else {
log.info(
"Not invalidating old sweep mappings, because we don't believe we've been configured to do"
+ " this.",
SafeArg.of("resetProgressStage", resetProgressStage));
}

ShardProgress shardProgress = new ShardProgress(kvs);
Supplier<Integer> shards =
createProgressUpdatingSupplier(shardsConfig, shardProgress, SweepQueueUtils.REFRESH_TIME);
WriteInfoPartitioner partitioner = new WriteInfoPartitioner(kvs, shards);
SweepableCells cells = new SweepableCells(kvs, partitioner, metrics, transaction);
SweepableCells cells = new SweepableCells(kvs, partitioner, metrics, transaction, resetProgressStage);
SweepableTimestamps timestamps = new SweepableTimestamps(kvs, partitioner);
return new SweepQueueFactory(
shardProgress,
Expand All @@ -290,7 +352,8 @@ static SweepQueueFactory create(
metrics,
kvs,
timelock,
readBatchingRuntimeContext);
readBatchingRuntimeContext,
tablesToTrackDeletions);
}

private SweepQueueWriter createWriter() {
Expand All @@ -302,7 +365,11 @@ private SweepQueueReader createReader() {
}

private SweepQueueDeleter createDeleter(TargetedSweepFollower follower) {
return new SweepQueueDeleter(kvs, follower, new DefaultTableClearer(kvs, timelock::getImmutableTimestamp));
return new SweepQueueDeleter(
kvs,
follower,
new DefaultTableClearer(kvs, timelock::getImmutableTimestamp),
tablesToTrackDeletions);
}

private SweepQueueCleaner createCleaner() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.palantir.atlasdb.schema.generated.TargetedSweepTableFactory;
import com.palantir.atlasdb.sweep.CommitTsCache;
import com.palantir.atlasdb.sweep.metrics.TargetedSweepMetrics;
import com.palantir.atlasdb.sweep.queue.config.TargetedSweepInstallConfig.SweepIndexResetProgressStage;
import com.palantir.atlasdb.sweep.queue.id.SweepTableIndices;
import com.palantir.atlasdb.transaction.impl.TransactionConstants;
import com.palantir.atlasdb.transaction.service.TransactionService;
Expand Down Expand Up @@ -75,10 +76,11 @@ public SweepableCells(
KeyValueService kvs,
WriteInfoPartitioner partitioner,
TargetedSweepMetrics metrics,
TransactionService transactionService) {
TransactionService transactionService,
SweepIndexResetProgressStage resetProgressStage) {
super(kvs, TargetedSweepTableFactory.of().getSweepableCellsTable(null).getTableRef(), partitioner, metrics);
this.commitTsCache = CommitTsCache.create(transactionService);
this.writeReferencePersister = new WriteReferencePersister(new SweepTableIndices(kvs));
this.writeReferencePersister = WriteReferencePersister.create(new SweepTableIndices(kvs), resetProgressStage);
}

@Override
Expand Down
Loading