diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 587f5e66a6..89af1d8864 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -79,6 +79,36 @@ acceptedBreaks: - code: "java.method.addedToInterface" new: "method boolean com.palantir.atlasdb.cell.api.TransactionKeyValueService::isValid(long)" justification: "internal API" + "0.1062.0": + com.palantir.atlasdb:atlasdb-api: + - code: "java.annotation.removed" + old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===com.palantir.lock.v2.LockToken===)" + new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===java.util.Set===)" + justification: "Prototyping" + - code: "java.annotation.removed" + old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===com.palantir.lock.v2.LockToken===,\ + \ long)" + new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===java.util.Set===,\ + \ long)" + justification: "Prototyping" + - code: "java.method.abstractMethodAdded" + new: "method void com.palantir.atlasdb.keyvalue.api.watch.LockWatchManager::logState()" + justification: "Prototype" + - code: "java.method.addedToInterface" + new: "method void com.palantir.atlasdb.transaction.api.Transaction::putDelayed(com.palantir.atlasdb.keyvalue.api.TableReference,\ + \ java.util.List>)" + justification: "Prototype" + - code: "java.method.parameterTypeChanged" + old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===com.palantir.lock.v2.LockToken===)" + new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===java.util.Set===)" + justification: "Prototyping" + - code: "java.method.parameterTypeChanged" + old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===com.palantir.lock.v2.LockToken===,\ + \ long)" + new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===java.util.Set===,\ + \ long)" + justification: "Prototyping" "0.1073.0": com.palantir.atlasdb:atlasdb-api: - code: "java.method.parameterTypeChanged" @@ -151,6 +181,14 @@ acceptedBreaks: new: "method com.palantir.lock.LockState com.palantir.atlasdb.factory.timelock.TimeoutSensitiveLockRpcClient::getLockState(java.lang.String,\ \ com.palantir.lock.LockDescriptor)" justification: "LockRpcClient#getLockState is broken" + "0.1129.0": + com.palantir.atlasdb:atlasdb-api: + - code: "java.method.addedToInterface" + new: "method long com.palantir.atlasdb.transaction.api.TransactionManager::getCommitImmutableTimestamp()" + justification: "Prototype" + - code: "java.method.addedToInterface" + new: "method void com.palantir.atlasdb.transaction.api.Transaction::putDelayed(java.util.List)" + justification: "Prototype" "0.770.0": com.palantir.atlasdb:atlasdb-api: - code: "java.class.removed" diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/DelayedWrite.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/DelayedWrite.java new file mode 100644 index 0000000000..74700e990f --- /dev/null +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/DelayedWrite.java @@ -0,0 +1,34 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.atlasdb.transaction.api; + +import com.palantir.atlasdb.keyvalue.api.Cell; +import com.palantir.atlasdb.keyvalue.api.TableReference; +import java.util.Map; +import java.util.function.LongFunction; + +/** + * Curious creature, but it allows the user to delay deciding the particular cell to write to until the actual commit, + * at which point the user will be provided with a unique long value that can be used to generate the cell. + * + * This is a very specific feature that not many people probably require, or should try to use. + * + * Right now the interface allows you to do TOO MUCH, you don't even have to specify the tables you write to. + * This can be improved and locked down, but this allows for maximum experimentation. + */ +@FunctionalInterface +public interface DelayedWrite extends LongFunction>> {} diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/Transaction.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/Transaction.java index 4285083b3a..abf7a2136f 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/Transaction.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/Transaction.java @@ -33,6 +33,7 @@ import com.palantir.lock.watch.ChangeMetadata; import com.palantir.util.result.Result; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Set; @@ -323,6 +324,9 @@ Stream>> getRangesLazy( @Idempotent void putWithMetadata(TableReference tableRef, Map valuesAndMetadata); + @Idempotent + void putDelayed(List values); + /** * Deletes values from the key-value store. * @param tableRef the table from which to delete the values diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/TransactionManager.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/TransactionManager.java index e407ac4682..fd420044ae 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/TransactionManager.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/TransactionManager.java @@ -309,6 +309,8 @@ T runTaskWithConditionRea */ long getImmutableTimestamp(); + long getCommitImmutableTimestamp(); + /** * Returns the lock service used by this transaction manager. * diff --git a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/precommit/PreCommitRequirementValidator.java b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/precommit/PreCommitRequirementValidator.java index 7b6ff0a459..66e034b5d3 100644 --- a/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/precommit/PreCommitRequirementValidator.java +++ b/atlasdb-api/src/main/java/com/palantir/atlasdb/transaction/api/precommit/PreCommitRequirementValidator.java @@ -21,7 +21,7 @@ import com.palantir.atlasdb.transaction.api.TransactionFailedException; import com.palantir.lock.v2.LockToken; import java.util.Map; -import javax.annotation.Nullable; +import java.util.Set; public interface PreCommitRequirementValidator { /** @@ -41,10 +41,10 @@ void throwIfPreCommitConditionInvalidAtCommitOnWriteTransaction( * user pre-commit condition is no longer valid, or possibly because of other internal state such as commit * locks having expired. */ - void throwIfPreCommitRequirementsNotMet(@Nullable LockToken commitLocksToken, long timestamp); + void throwIfPreCommitRequirementsNotMet(Set commitLocksToken, long timestamp); /** * Throws a {@link TransactionFailedException} if the immutable timestamp lock or commit locks have expired. */ - void throwIfImmutableTsOrCommitLocksExpired(@Nullable LockToken commitLocksToken); + void throwIfImmutableTsOrCommitLocksExpired(Set commitLocksToken); } diff --git a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/AbstractCassandraKeyValueServiceIntegrationTest.java b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/AbstractCassandraKeyValueServiceIntegrationTest.java index 6407f46d3c..8801390f3e 100644 --- a/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/AbstractCassandraKeyValueServiceIntegrationTest.java +++ b/atlasdb-cassandra-integration-tests/src/test/java/com/palantir/atlasdb/keyvalue/cassandra/AbstractCassandraKeyValueServiceIntegrationTest.java @@ -231,7 +231,7 @@ public void testCfEqualityChecker() throws TException { .collect(Collectors.toList())); assertThat(ColumnFamilyDefinitions.isMatchingCf( - kvs.getCfForTable(NEVER_SEEN, getMetadata(), FOUR_DAYS_IN_SECONDS), clusterSideCf)) + kvs.getCfForTable(NEVER_SEEN, getMetadata(), FOUR_DAYS_IN_SECONDS), clusterSideCf)) .as("After serialization and deserialization to database, Cf metadata did not match.") .isTrue(); } @@ -256,8 +256,10 @@ private static String getInternalTestTableName() { @SuppressWarnings("CompileTimeConstant") public void shouldNotErrorForTimestampTableWhenCreatingCassandraKvs() { verify(logger, atLeast(0)) - .error(startsWith("Found a table {} that did not have persisted"), - assertArg((Arg arg) -> assertThat(arg.getValue()).doesNotContain("timestamp"))); + .error( + startsWith("Found a table {} that did not have persisted"), + assertArg( + (Arg arg) -> assertThat(arg.getValue()).doesNotContain("timestamp"))); } @Test @@ -492,20 +494,20 @@ public void setOnceTest() { keyValueService.putUnlessExists(userTable, ImmutableMap.of(CELL, sad)); assertThat(keyValueService - .get(userTable, ImmutableMap.of(CELL, Long.MAX_VALUE)) - .get(CELL) - .getContents()) + .get(userTable, ImmutableMap.of(CELL, Long.MAX_VALUE)) + .get(CELL) + .getContents()) .containsExactly(sad); keyValueService.setOnce(userTable, ImmutableMap.of(CELL, happy)); assertThat(keyValueService - .get(userTable, ImmutableMap.of(CELL, Long.MAX_VALUE)) - .get(CELL) - .getContents()) + .get(userTable, ImmutableMap.of(CELL, Long.MAX_VALUE)) + .get(CELL) + .getContents()) .containsExactly(happy); assertThat(keyValueService - .getAllTimestamps(userTable, ImmutableSet.of(CELL), Long.MAX_VALUE) - .size()) + .getAllTimestamps(userTable, ImmutableSet.of(CELL), Long.MAX_VALUE) + .size()) .isEqualTo(1); keyValueService.truncateTable(userTable); } @@ -665,9 +667,9 @@ public void testMultiCheckAndSetCannotUpdateAcrossMultipleRows() { Cell nextTestCell = Cell.create(row(1), column(1)); assertThatThrownBy(() -> keyValueService.multiCheckAndSet(MultiCheckAndSetRequest.newCells( - TEST_TABLE, - firstTestCell.getRowName(), - ImmutableMap.of(firstTestCell, val(0, 0), nextTestCell, val(0, 1))))) + TEST_TABLE, + firstTestCell.getRowName(), + ImmutableMap.of(firstTestCell, val(0, 0), nextTestCell, val(0, 1))))) .isInstanceOf(SafeIllegalStateException.class) .hasMessageContaining("Can only update cells in one row."); } @@ -683,7 +685,7 @@ public void testMultiCheckAndSetIndependentlyFails() { TEST_TABLE, nextTestCell.getRowName(), ImmutableMap.of(nextTestCell, val(0, 1)))); assertThatThrownBy(() -> keyValueService.multiCheckAndSet(MultiCheckAndSetRequest.newCells( - TEST_TABLE, nextTestCell.getRowName(), ImmutableMap.of(nextTestCell, val(0, 2))))) + TEST_TABLE, nextTestCell.getRowName(), ImmutableMap.of(nextTestCell, val(0, 2))))) .isInstanceOf(MultiCheckAndSetException.class); MultiCheckAndSetException ex = catchThrowableOfType( @@ -808,7 +810,7 @@ private CfDef createDefaultCfDef(String namespace, String tableName) { .setComment("") .setColumn_metadata(new ArrayList<>()) .setTriggers(new ArrayList<>()) - .setKey_alias(new byte[]{0x6B, 0x65, 0x79}) + .setKey_alias(new byte[] {0x6B, 0x65, 0x79}) .setComparator_type("org.apache.cassandra.db.marshal.CompositeType" + "(org.apache.cassandra.db.marshal.BytesType,org.apache.cassandra.db.marshal.LongType)") .setCompaction_strategy_options(new HashMap<>()) diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/transaction/impl/ForwardingTransaction.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/transaction/impl/ForwardingTransaction.java index 6caba9c9fc..04cdc2eb4d 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/transaction/impl/ForwardingTransaction.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/transaction/impl/ForwardingTransaction.java @@ -25,6 +25,7 @@ import com.palantir.atlasdb.keyvalue.api.RowResult; import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.transaction.api.ConstraintCheckable; +import com.palantir.atlasdb.transaction.api.DelayedWrite; import com.palantir.atlasdb.transaction.api.GetRangesQuery; import com.palantir.atlasdb.transaction.api.Transaction; import com.palantir.atlasdb.transaction.api.TransactionFailedException; @@ -37,6 +38,7 @@ import com.palantir.lock.watch.ChangeMetadata; import com.palantir.util.result.Result; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Set; @@ -144,6 +146,11 @@ public void putWithMetadata(TableReference tableRef, Map values) { + delegate().putDelayed(values); + } + @Override public void delete(TableReference tableRef, Set keys) { delegate().delete(tableRef, keys); diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ImmutableTimestampLockManager.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ImmutableTimestampLockManager.java index 0052e555e7..d14c555a45 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ImmutableTimestampLockManager.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/ImmutableTimestampLockManager.java @@ -36,9 +36,13 @@ public ImmutableTimestampLockManager( this.lockValidityChecker = lockValidityChecker; } - public Optional getExpiredImmutableTimestampAndCommitLocks(Optional commitLocksToken) { + public Optional getExpiredImmutableTimestampAndCommitLocks(Set commitLocksToken) { Set toRefresh = new HashSet<>(); - commitLocksToken.ifPresent(toRefresh::add); + + // TODO(jakubk): Handle this upstream + if (commitLocksToken != null) { + toRefresh.addAll(commitLocksToken); + } immutableTimestampLock.ifPresent(toRefresh::add); if (toRefresh.isEmpty()) { @@ -55,13 +59,13 @@ public Optional getExpiredImmutableTimestampAndCommitLocks(Optiona } public SummarizedLockCheckResult getExpiredImmutableTimestampAndCommitLocksWithFullSummary( - LockToken commitLocksToken) { + Set commitLocksToken) { Preconditions.checkNotNull( commitLocksToken, "commitLocksToken was null, not expected to be in a call to" + " getExpiredImmutableTimestampAndCommitLocksWithFullSummary", SafeArg.of("immutableTimestampLock", immutableTimestampLock)); - Optional expiredLocks = getExpiredImmutableTimestampAndCommitLocks(Optional.of(commitLocksToken)); + Optional expiredLocks = getExpiredImmutableTimestampAndCommitLocks(commitLocksToken); return SummarizedLockCheckResult.builder() .expiredLocks(expiredLocks) .immutableTimestampLock(immutableTimestampLock) @@ -69,7 +73,7 @@ public SummarizedLockCheckResult getExpiredImmutableTimestampAndCommitLocksWithF .build(); } - private String getExpiredLocksErrorString(Optional commitLocksToken, Set expiredLocks) { + private String getExpiredLocksErrorString(Set commitLocksToken, Set expiredLocks) { return "The following immutable timestamp lock was required: " + immutableTimestampLock + "; the following commit locks were required: " + commitLocksToken + "; the following locks are no longer valid: " + expiredLocks; @@ -95,7 +99,7 @@ public interface SummarizedLockCheckResult { Optional immutableTimestampLock(); - LockToken userProvidedLock(); + Set userProvidedLock(); static ImmutableSummarizedLockCheckResult.Builder builder() { return ImmutableSummarizedLockCheckResult.builder(); diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/InstrumentedTimelockService.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/InstrumentedTimelockService.java index 352c7d0b1f..a7cd9940f6 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/InstrumentedTimelockService.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/InstrumentedTimelockService.java @@ -19,6 +19,7 @@ import com.palantir.atlasdb.AtlasDbMetricNames; import com.palantir.atlasdb.util.MetricsManager; import com.palantir.lock.v2.ClientLockingOptions; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockRequest; import com.palantir.lock.v2.LockResponse; @@ -69,7 +70,7 @@ public long getFreshTimestamp() { } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { return executeWithRecord(() -> timelockService.getCommitTimestamp(startTs, commitLocksToken)); } @@ -93,6 +94,11 @@ public long getImmutableTimestamp() { return executeWithRecord(timelockService::getImmutableTimestamp); } + @Override + public long getCommitImmutableTimestamp() { + return executeWithRecord(timelockService::getCommitImmutableTimestamp); + } + @Override public LockResponse lock(LockRequest request) { return executeWithRecord(() -> timelockService.lock(request)); diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/LocalWriteBuffer.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/LocalWriteBuffer.java index 103bfaf35b..d586195626 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/LocalWriteBuffer.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/LocalWriteBuffer.java @@ -23,14 +23,17 @@ import com.palantir.atlasdb.keyvalue.api.TableReference; import com.palantir.atlasdb.keyvalue.impl.Cells; import com.palantir.atlasdb.logging.LoggingArgs; +import com.palantir.atlasdb.transaction.api.DelayedWrite; import com.palantir.lock.watch.ChangeMetadata; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.UnsafeArg; import com.palantir.logsafe.exceptions.SafeIllegalStateException; import com.palantir.logsafe.logger.SafeLogger; import com.palantir.logsafe.logger.SafeLoggerFactory; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -44,6 +47,7 @@ class LocalWriteBuffer { private final ConcurrentMap> writesByTable = new ConcurrentHashMap<>(); + private final List delayedWritesByTable = Collections.synchronizedList(new ArrayList<>()); private final ConcurrentMap> metadataByTable = new ConcurrentHashMap<>(); private final ConcurrentMap locksByTable = new ConcurrentHashMap<>(); private final AtomicLong valuesByteCount = new AtomicLong(); @@ -92,6 +96,10 @@ public void putLocalWritesAndMetadata( } } + public void putDelayed(List values) { + delayedWritesByTable.addAll(values); + } + /** * Returns all local writes that have been buffered. */ @@ -99,6 +107,10 @@ public ConcurrentMap> getLo return writesByTable; } + public List getDelayedWrites() { + return delayedWritesByTable; + } + /** * Returns the local writes for cells of the given table. */ diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java index a7ba1c983f..24c6da63da 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransaction.java @@ -82,6 +82,7 @@ import com.palantir.atlasdb.transaction.api.ConflictHandler; import com.palantir.atlasdb.transaction.api.ConstraintCheckable; import com.palantir.atlasdb.transaction.api.ConstraintCheckingTransaction; +import com.palantir.atlasdb.transaction.api.DelayedWrite; import com.palantir.atlasdb.transaction.api.DeleteExecutor; import com.palantir.atlasdb.transaction.api.GetRangesQuery; import com.palantir.atlasdb.transaction.api.ImmutableGetRangesQuery; @@ -147,6 +148,7 @@ import com.palantir.lock.AtlasRowLockDescriptor; import com.palantir.lock.LockDescriptor; import com.palantir.lock.v2.ClientLockingOptions; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockRequest; import com.palantir.lock.v2.LockResponse; import com.palantir.lock.v2.LockToken; @@ -162,6 +164,7 @@ import com.palantir.logsafe.exceptions.SafeRuntimeException; import com.palantir.logsafe.logger.SafeLogger; import com.palantir.logsafe.logger.SafeLoggerFactory; +import com.palantir.timestamp.TimestampRange; import com.palantir.tracing.CloseableTracer; import com.palantir.util.AssertUtils; import com.palantir.util.RateLimitedLogger; @@ -200,6 +203,7 @@ import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; import javax.annotation.Nullable; @@ -1750,6 +1754,26 @@ public void putWithMetadata(TableReference tableRef, Map values) { + // Preconditions.checkArgument(!AtlasDbConstants.HIDDEN_TABLES.contains(tableRef)); + // markTableAsInvolvedInThisTransaction(tableRef); + + if (values.isEmpty()) { + return; + } + + numWriters.incrementAndGet(); + try { + // We need to check the status after incrementing writers to ensure that we fail if we are committing. + ensureUncommitted(); + + localWriteBuffer.putDelayed(values); + } finally { + numWriters.decrementAndGet(); + } + } + private void putWithMetadataInternal( TableReference tableRef, Map values, Map metadata) { ensureNoEmptyValues(values); @@ -1991,7 +2015,13 @@ private void commitWrites(TransactionService transactionService) { // This must happen before conflict checking, otherwise we could complete the checks and then have someone // else write underneath us before we proceed (thus missing a write/write conflict). // Timing still useful to distinguish bad lock percentiles from user-generated lock requests. - LockToken commitLocksToken = timedAndTraced("commitAcquireLocks", this::acquireLocksForCommit); + Set commitLocksToken = new HashSet<>(); + commitLocksToken.add(timedAndTraced("commitAcquireLocks", this::acquireLocksForCommit)); + + // We could technically grab the commit timestamp as part of grabbing the locks and we would probably + // call it something slightly different then. + // The idea is the same though: have an idea of "beyond this point, writes are unsettled and therefore + // should not be read" try { // Conflict checking. We can actually do this later without compromising correctness, but there is no // reason to postpone this check - we waste resources writing unnecessarily if these are going to fail. @@ -2030,9 +2060,14 @@ private void commitWrites(TransactionService transactionService) { // We must do this before we check that our locks are still valid to ensure that other transactions that // will hold these locks are sure to have start timestamps after our commit timestamp. // Timing is still useful, as this may perform operations pertaining to lock watches. - long commitTimestamp = timedAndTraced( + // TODO(jakubk): Lalala, trying to code this fast. We know commitLocksToken is empty at this point! + GetCommitTimestampResponse commitTimestampResponse = timedAndTraced( "getCommitTimestamp", - () -> timelockService.getCommitTimestamp(getStartTimestamp(), commitLocksToken)); + () -> timelockService.getCommitTimestamp( + getStartTimestamp(), Iterables.getOnlyElement(commitLocksToken))); + commitLocksToken.add( + commitTimestampResponse.immutableTimestamp().getLock()); + long commitTimestamp = commitTimestampResponse.timestamp(); commitTsForScrubbing = commitTimestamp; // Punch on commit so that if hard delete is the only thing happening on a system, @@ -2052,7 +2087,11 @@ private void commitWrites(TransactionService transactionService) { "transactionKvsValidityCheck", () -> throwIfTransactionKeyValueServiceNoLongerValid(commitTimestamp)); - // Serializable transactions need to check their reads haven't changed, by reading again at + // TODO(jakubk): Add the special writes here. This can probably run in parallel with everything else. + // TODO(jakubk): Eventually we'd fold this into some other RPC, but for now just grab some fresh + // timestamps. + timedAndTraced("writeDelayedWrites", this::writeDelayedWrites); + // commitTs + 1. This must happen before the lock check for thorough tables, because the lock check // verifies the immutable timestamp hasn't moved forward - thorough sweep might sweep a conflict out // from underneath us. @@ -2088,11 +2127,44 @@ private void commitWrites(TransactionService transactionService) { .update(localWriteBuffer.getValuesByteCount()); } finally { // Not timed because tryUnlock() is an asynchronous operation. - traced("postCommitUnlock", () -> timelockService.tryUnlock(ImmutableSet.of(commitLocksToken))); + traced("postCommitUnlock", () -> timelockService.tryUnlock(commitLocksToken)); } }); } + private void writeDelayedWrites() { + List delayedWrites = localWriteBuffer.getDelayedWrites(); + if (delayedWrites.isEmpty()) { + return; + } + TimestampRange freshTimestamps = timelockService.getFreshTimestamps(delayedWrites.size()); + AtomicLong curFreshTimestamp = new AtomicLong(freshTimestamps.getLowerBound()); + LongSupplier freshTimestampsSupplier = () -> { + long freshTimestamp = curFreshTimestamp.getAndIncrement(); + Preconditions.checkState(freshTimestamp <= freshTimestamps.getUpperBound(), "Oops"); + return freshTimestamp; + }; + Map> materializedWrites = new HashMap<>(); + for (DelayedWrite writeAction : delayedWrites) { + long freshTimestamp = freshTimestampsSupplier.getAsLong(); + writeAction.apply(freshTimestamp).forEach((tableReference, writes) -> { + Map writesForTable = + materializedWrites.computeIfAbsent(tableReference, k -> new HashMap<>()); + writesForTable.putAll(writes); + }); + } + + // This is likely not 100% reliable and will require work to actually prove out/figure out + // safe way to not drop these writes on failed transactions. + // However, this is the art of the possible and I'm sure we can figure something out. + timedAndTraced( + "writeDelayedWritesWritingToSweepQueue", + () -> sweepQueue.enqueue(materializedWrites, getStartTimestamp())); + timedAndTraced( + "writeDelayedWritesCommitWrite", + () -> transactionKeyValueService.multiPut(materializedWrites, getStartTimestamp())); + } + private void throwIfTransactionKeyValueServiceNoLongerValid(long commitTimestamp) { if (!transactionKeyValueService.isValid(commitTimestamp)) { throw new SafeTransactionFailedRetriableException( @@ -2149,7 +2221,7 @@ protected ConflictHandler getConflictHandlerForTable(TableReference tableRef) { /** * Make sure we have all the rows we are checking already locked before calling this. */ - protected void throwIfConflictOnCommit(LockToken commitLocksToken, TransactionService transactionService) + protected void throwIfConflictOnCommit(Set commitLocksToken, TransactionService transactionService) throws TransactionConflictException { for (Map.Entry> write : localWriteBuffer.getLocalWrites().entrySet()) { @@ -2163,7 +2235,7 @@ protected void throwIfWriteAlreadyCommitted( TableReference tableRef, Map writes, ConflictHandler conflictHandler, - LockToken commitLocksToken, + Set commitLocksToken, TransactionService transactionService) throws TransactionConflictException { if (writes.isEmpty() || !conflictHandler.checkWriteWriteConflicts()) { @@ -2203,7 +2275,7 @@ private void throwIfValueChangedConflict( Map writes, Set spanningWrites, Set dominatingWrites, - LockToken commitLocksToken) { + Set commitLocksToken) { Map cellToConflict = new HashMap<>(); Map cellToTs = new HashMap<>(); for (CellConflict c : Sets.union(spanningWrites, dominatingWrites)) { @@ -2548,7 +2620,8 @@ private LongSet getStartTimestampsForValues(Iterable values) { * @throws TransactionLockTimeoutException If our locks timed out while trying to commit. * @throws TransactionCommitFailedException failed when committing in a way that isn't retriable */ - private void putCommitTimestamp(long commitTimestamp, LockToken locksToken, TransactionService transactionService) + private void putCommitTimestamp( + long commitTimestamp, Set locksToken, TransactionService transactionService) throws TransactionFailedException { Preconditions.checkArgument(commitTimestamp > getStartTimestamp(), "commitTs must be greater than startTs"); try { @@ -2567,7 +2640,7 @@ private void putCommitTimestamp(long commitTimestamp, LockToken locksToken, Tran } private void handleKeyAlreadyExistsException( - long commitTs, KeyAlreadyExistsException ex, LockToken commitLocksToken) { + long commitTs, KeyAlreadyExistsException ex, Set commitLocksToken) { try { if (wasCommitSuccessful(commitTs)) { // We did actually commit successfully. This case could happen if the impl @@ -2593,7 +2666,11 @@ private void handleKeyAlreadyExistsException( .immutableTimestampLock() .map(token -> token.toSafeArg("immutableTimestampLock")) .orElseGet(() -> SafeArg.of("immutableTimestampLock", null)), - lockCheckResult.userProvidedLock().toSafeArg("commitLocksToken")); + SafeArg.of( + "commitLocksToken", + lockCheckResult.userProvidedLock().stream() + .map(LockToken::getRequestId) + .collect(Collectors.toSet()))); } } catch (TransactionFailedException e1) { throw e1; diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java index b32cda504f..820e9d11e7 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/SnapshotTransactionManager.java @@ -512,6 +512,11 @@ public long getImmutableTimestamp() { return immutableTs; } + @Override + public long getCommitImmutableTimestamp() { + return timelockService.getCommitImmutableTimestamp(); + } + private void recordImmutableTimestamp(long immutableTs) { recentImmutableTs.updateAndGet(current -> Math.max(current, immutableTs)); } diff --git a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/precommit/DefaultPreCommitRequirementValidator.java b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/precommit/DefaultPreCommitRequirementValidator.java index 5bca6ed977..50bdc1774b 100644 --- a/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/precommit/DefaultPreCommitRequirementValidator.java +++ b/atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/transaction/impl/precommit/DefaultPreCommitRequirementValidator.java @@ -31,6 +31,7 @@ import com.palantir.logsafe.logger.SafeLoggerFactory; import java.util.Map; import java.util.Optional; +import java.util.Set; public final class DefaultPreCommitRequirementValidator implements PreCommitRequirementValidator { private static final SafeLogger log = SafeLoggerFactory.get(DefaultPreCommitRequirementValidator.class); @@ -70,15 +71,15 @@ public void throwIfPreCommitConditionInvalidAtCommitOnWriteTransaction( } @Override - public void throwIfPreCommitRequirementsNotMet(LockToken commitLocksToken, long timestamp) { + public void throwIfPreCommitRequirementsNotMet(Set commitLocksToken, long timestamp) { throwIfImmutableTsOrCommitLocksExpired(commitLocksToken); throwIfPreCommitConditionInvalid(timestamp); } @Override - public void throwIfImmutableTsOrCommitLocksExpired(LockToken commitLocksToken) { - Optional expiredLocks = immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks( - Optional.ofNullable(commitLocksToken)); + public void throwIfImmutableTsOrCommitLocksExpired(Set commitLocksToken) { + Optional expiredLocks = + immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks(commitLocksToken); if (expiredLocks.isPresent()) { throw createDefaultTransactionLockTimeoutException(expiredLocks.get()); } diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/ImmutableTimestampLockManagerTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/ImmutableTimestampLockManagerTest.java index d47843ef38..9840112969 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/ImmutableTimestampLockManagerTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/ImmutableTimestampLockManagerTest.java @@ -27,7 +27,9 @@ import com.palantir.atlasdb.transaction.impl.precommit.LockValidityChecker; import com.palantir.lock.v2.LockToken; import java.util.Optional; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; @@ -51,7 +53,8 @@ public void returnsNothingExpiredWhenAllLocksStillValid( when(validityChecker.getStillValidLockTokens(anySet())) .thenAnswer(invocation -> invocation.getArguments()[0]); - assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks(userCommitLock)) + assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks( + userCommitLock.stream().collect(Collectors.toSet()))) .isEmpty(); } @@ -67,12 +70,12 @@ public void returnsLocksThatWereCheckedWhenGettingFullSummaryAndStillValid( .thenAnswer(invocation -> invocation.getArguments()[0]); assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocksWithFullSummary( - DEFAULT_COMMIT_LOCK_TOKEN)) + Set.of(DEFAULT_COMMIT_LOCK_TOKEN))) .satisfies(summarizedLockCheckResult -> { assertThat(summarizedLockCheckResult.expiredLocks()).isEmpty(); assertThat(summarizedLockCheckResult.immutableTimestampLock()) .isEqualTo(immutableTimestampLock); - assertThat(summarizedLockCheckResult.userProvidedLock()).isEqualTo(DEFAULT_COMMIT_LOCK_TOKEN); + assertThat(summarizedLockCheckResult.userProvidedLock()).containsExactly(DEFAULT_COMMIT_LOCK_TOKEN); }); } @@ -90,10 +93,12 @@ public void returnsExpiredLocksWhenLocksNoLongerValid( when(validityChecker.getStillValidLockTokens(anySet())).thenReturn(ImmutableSet.of()); - assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks(userCommitLock)) + assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks( + userCommitLock.stream().collect(Collectors.toSet()))) .hasValueSatisfying(expiredLocks -> { assertThat(expiredLocks.errorDescription()).contains(immutableTimestampLock.toString()); - assertThat(expiredLocks.errorDescription()).contains(userCommitLock.toString()); + userCommitLock.ifPresent(lockToken -> + assertThat(expiredLocks.errorDescription()).contains(lockToken.toString())); }); } @@ -108,12 +113,12 @@ public void returnsLocksThatWereCheckedWhenGettingFullSummaryAndExpired( when(validityChecker.getStillValidLockTokens(anySet())).thenReturn(ImmutableSet.of()); assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocksWithFullSummary( - DEFAULT_COMMIT_LOCK_TOKEN)) + Set.of(DEFAULT_COMMIT_LOCK_TOKEN))) .satisfies(summarizedLockCheckResult -> { assertThat(summarizedLockCheckResult.expiredLocks()).isPresent(); assertThat(summarizedLockCheckResult.immutableTimestampLock()) .isEqualTo(immutableTimestampLock); - assertThat(summarizedLockCheckResult.userProvidedLock()).isEqualTo(DEFAULT_COMMIT_LOCK_TOKEN); + assertThat(summarizedLockCheckResult.userProvidedLock()).containsExactly(DEFAULT_COMMIT_LOCK_TOKEN); }); } @@ -127,7 +132,7 @@ public void throwsIfOnlyCommitLockExpiredWhenCheckingBoth() { .thenReturn(ImmutableSet.of(DEFAULT_IMMUTABLE_TIMESTAMP_LOCK_TOKEN)); assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks( - Optional.of(DEFAULT_COMMIT_LOCK_TOKEN))) + Set.of(DEFAULT_COMMIT_LOCK_TOKEN))) .hasValueSatisfying(expiredLocks -> { // This is a bit fragile, but emphasising readability here assertThat(expiredLocks.errorDescription()) @@ -144,7 +149,7 @@ public void throwsIfOnlyImmutableTimestampLockExpiredWhenCheckingBoth() { when(validityChecker.getStillValidLockTokens(anySet())).thenReturn(ImmutableSet.of(DEFAULT_COMMIT_LOCK_TOKEN)); assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks( - Optional.of(DEFAULT_COMMIT_LOCK_TOKEN))) + Set.of(DEFAULT_COMMIT_LOCK_TOKEN))) .hasValueSatisfying(expiredLocks -> { // This is a bit fragile, but emphasising readability here assertThat(expiredLocks.errorDescription()) @@ -159,7 +164,7 @@ public void doesNotCallLockRefresherIfNothingToCheck() { ImmutableTimestampLockManager immutableTimestampLockManager = new ImmutableTimestampLockManager(Optional.empty(), validityChecker); - assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks(Optional.empty())) + assertThat(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks(Set.of())) .isEmpty(); verify(validityChecker, never()).getStillValidLockTokens(anySet()); } diff --git a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/precommit/DefaultPreCommitRequirementValidatorTest.java b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/precommit/DefaultPreCommitRequirementValidatorTest.java index cedebaf7ac..0f7fd697aa 100644 --- a/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/precommit/DefaultPreCommitRequirementValidatorTest.java +++ b/atlasdb-impl-shared/src/test/java/com/palantir/atlasdb/transaction/impl/precommit/DefaultPreCommitRequirementValidatorTest.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -37,6 +38,7 @@ import com.palantir.lock.v2.LockToken; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -57,7 +59,7 @@ public final class DefaultPreCommitRequirementValidatorTest { PtBytes.toBytes("two"), Cell.create(PtBytes.toBytes("suggestion"), PtBytes.toBytes("moot")), PtBytes.toBytes("three"))); - private static final LockToken LOCK_TOKEN = LockToken.of(UUID.randomUUID()); + private static final Set LOCK_TOKEN = Set.of(LockToken.of(UUID.randomUUID())); @Mock private PreCommitCondition userPreCommitCondition; @@ -105,10 +107,10 @@ public void throwIfPreCommitConditionInvalidAtCommitOnWriteTransactionPropagates @Test public void throwIfImmutableTsOrCommitLocksExpiredDelegatesEmptyRequestToLockManager() { - when(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks(any())) + when(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks(nullable(Set.class))) .thenReturn(Optional.empty()); validator.throwIfImmutableTsOrCommitLocksExpired(null); - verify(immutableTimestampLockManager).getExpiredImmutableTimestampAndCommitLocks(Optional.empty()); + verify(immutableTimestampLockManager).getExpiredImmutableTimestampAndCommitLocks(null); } @Test @@ -116,7 +118,7 @@ public void throwIfImmutableTsOrCommitLocksExpiredDelegatesRequestWithLockTokenT when(immutableTimestampLockManager.getExpiredImmutableTimestampAndCommitLocks(any())) .thenReturn(Optional.empty()); validator.throwIfImmutableTsOrCommitLocksExpired(LOCK_TOKEN); - verify(immutableTimestampLockManager).getExpiredImmutableTimestampAndCommitLocks(Optional.of(LOCK_TOKEN)); + verify(immutableTimestampLockManager).getExpiredImmutableTimestampAndCommitLocks(LOCK_TOKEN); } @Test @@ -150,6 +152,6 @@ public void throwIfPreCommitRequirementsNotMetDoesNotThrowIfNoLocksExpiredAndPre assertThatCode(() -> validator.throwIfPreCommitRequirementsNotMet(LOCK_TOKEN, TIMESTAMP)) .doesNotThrowAnyException(); verify(userPreCommitCondition).throwIfConditionInvalid(TIMESTAMP); - verify(immutableTimestampLockManager).getExpiredImmutableTimestampAndCommitLocks(Optional.of(LOCK_TOKEN)); + verify(immutableTimestampLockManager).getExpiredImmutableTimestampAndCommitLocks(LOCK_TOKEN); } } diff --git a/atlasdb-tests-shared/src/main/java/com/palantir/timelock/paxos/InMemoryNamespacedTimelockRpcClient.java b/atlasdb-tests-shared/src/main/java/com/palantir/timelock/paxos/InMemoryNamespacedTimelockRpcClient.java index fa78fd2f40..92d63b0090 100644 --- a/atlasdb-tests-shared/src/main/java/com/palantir/timelock/paxos/InMemoryNamespacedTimelockRpcClient.java +++ b/atlasdb-tests-shared/src/main/java/com/palantir/timelock/paxos/InMemoryNamespacedTimelockRpcClient.java @@ -32,6 +32,11 @@ public long getImmutableTimestamp() { return delegate.getImmutableTimestamp(); } + @Override + public long getCommitImmutableTimestamp() { + return delegate.getCommitImmutableTimestamp(); + } + @Override public long currentTimeMillis() { return delegate.currentTimeMillis(); diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/AbstractSnapshotTransactionTest.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/AbstractSnapshotTransactionTest.java index ed53c13830..71bef66d9c 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/AbstractSnapshotTransactionTest.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/transaction/impl/AbstractSnapshotTransactionTest.java @@ -140,6 +140,7 @@ import com.palantir.lock.LockService; import com.palantir.lock.SimpleTimeDuration; import com.palantir.lock.TimeDuration; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockResponse; import com.palantir.lock.v2.LockToken; @@ -1612,7 +1613,9 @@ public void commitDoesNotThrowIfAlreadySuccessfullyCommitted() { getSnapshotTransactionWith(spiedTimeLockService, () -> transactionTs, res, PreCommitConditions.NO_OP); when(spiedTimeLockService.getFreshTimestamp()).thenReturn(transactionTs + 1); - doReturn(transactionTs + 1).when(spiedTimeLockService).getCommitTimestamp(anyLong(), any()); + doReturn(GetCommitTimestampResponse.of(res, transactionTs + 1)) + .when(spiedTimeLockService) + .getCommitTimestamp(anyLong(), any()); // forcing to try to commit a transaction that is already committed transactionService.putUnlessExists(transactionTs, spiedTimeLockService.getFreshTimestamp()); diff --git a/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/store/UnreliableTimestampManager.java b/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/store/UnreliableTimestampManager.java index a9fb97891d..70733ed6e1 100644 --- a/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/store/UnreliableTimestampManager.java +++ b/atlasdb-workload-server/src/main/java/com/palantir/atlasdb/workload/store/UnreliableTimestampManager.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import com.palantir.atlasdb.buggify.impl.DefaultNativeSamplingSecureRandomFactory; import com.palantir.lock.client.RandomizedTimestampManager; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockToken; import com.palantir.lock.v2.TimelockService; import com.palantir.logsafe.SafeArg; @@ -96,7 +97,7 @@ public long getFreshTimestamp() { } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { return runWithReadLock(() -> delegate.getCommitTimestamp(startTs, commitLocksToken)); } diff --git a/lock-api-objects/src/main/java/com/palantir/lock/v2/GetCommitTimestampResponse.java b/lock-api-objects/src/main/java/com/palantir/lock/v2/GetCommitTimestampResponse.java new file mode 100644 index 0000000000..1e3a2d1ae9 --- /dev/null +++ b/lock-api-objects/src/main/java/com/palantir/lock/v2/GetCommitTimestampResponse.java @@ -0,0 +1,32 @@ +/* + * (c) Copyright 2024 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.lock.v2; + +import org.immutables.value.Value; + +@Value.Immutable +public interface GetCommitTimestampResponse { + @Value.Parameter + LockImmutableTimestampResponse immutableTimestamp(); + + @Value.Parameter + long timestamp(); + + static GetCommitTimestampResponse of(LockImmutableTimestampResponse immutableTimestamp, long timestamp) { + return ImmutableGetCommitTimestampResponse.of(immutableTimestamp, timestamp); + } +} diff --git a/lock-api-objects/src/main/java/com/palantir/lock/v2/TimelockService.java b/lock-api-objects/src/main/java/com/palantir/lock/v2/TimelockService.java index 84d9529103..c761d26973 100644 --- a/lock-api-objects/src/main/java/com/palantir/lock/v2/TimelockService.java +++ b/lock-api-objects/src/main/java/com/palantir/lock/v2/TimelockService.java @@ -42,7 +42,7 @@ default boolean isInitialized() { long getFreshTimestamp(); - long getCommitTimestamp(long startTs, LockToken commitLocksToken); + GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken); TimestampRange getFreshTimestamps(@Safe @QueryParam("number") int numTimestampsRequested); @@ -76,6 +76,8 @@ default List startIdentifiedAtlasDbTr long getImmutableTimestamp(); + long getCommitImmutableTimestamp(); + LockResponse lock(LockRequest request); /** diff --git a/lock-api/src/main/java/com/palantir/lock/client/BatchingCommitTimestampGetter.java b/lock-api/src/main/java/com/palantir/lock/client/BatchingCommitTimestampGetter.java index 356b02e92d..dc85d73600 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/BatchingCommitTimestampGetter.java +++ b/lock-api/src/main/java/com/palantir/lock/client/BatchingCommitTimestampGetter.java @@ -18,11 +18,15 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Streams; +import com.google.common.primitives.Ints; import com.palantir.atlasdb.autobatch.Autobatchers; import com.palantir.atlasdb.autobatch.BatchElement; import com.palantir.atlasdb.autobatch.DisruptorAutobatcher; import com.palantir.atlasdb.futures.AtlasFutures; import com.palantir.atlasdb.timelock.api.GetCommitTimestampsResponse; +import com.palantir.common.base.Throwables; +import com.palantir.lock.v2.GetCommitTimestampResponse; +import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockToken; import com.palantir.lock.watch.LockWatchCache; import com.palantir.lock.watch.LockWatchVersion; @@ -34,20 +38,22 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.LongStream; +import java.util.stream.Stream; import org.immutables.value.Value; /** * This class batches getCommitTimestamps requests to TimeLock server for a single client/namespace. - * */ + */ final class BatchingCommitTimestampGetter implements CommitTimestampGetter { - private final DisruptorAutobatcher autobatcher; + private final DisruptorAutobatcher autobatcher; - private BatchingCommitTimestampGetter(DisruptorAutobatcher autobatcher) { + private BatchingCommitTimestampGetter(DisruptorAutobatcher autobatcher) { this.autobatcher = autobatcher; } public static BatchingCommitTimestampGetter create(LockLeaseService leaseService, LockWatchCache cache) { - DisruptorAutobatcher autobatcher = Autobatchers.independent(consumer(leaseService, cache)) + DisruptorAutobatcher autobatcher = Autobatchers.independent( + consumer(leaseService, cache)) .safeLoggablePurpose("get-commit-timestamp") .batchFunctionTimeout(Duration.ofSeconds(30)) .build(); @@ -55,7 +61,7 @@ public static BatchingCommitTimestampGetter create(LockLeaseService leaseService } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { return AtlasFutures.getUnchecked(autobatcher.apply(ImmutableRequest.builder() .startTs(startTs) .commitLocksToken(commitLocksToken) @@ -63,10 +69,24 @@ public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { } @VisibleForTesting - static Consumer>> consumer(LockLeaseService leaseService, LockWatchCache cache) { + static Consumer>> consumer( + LockLeaseService leaseService, LockWatchCache cache) { return batch -> { int count = batch.size(); - List commitTimestamps = new ArrayList<>(); + List commitTimestamps = getResponses(leaseService, cache, batch, count); + for (int i = 0; i < count; i++) { + batch.get(i).result().set(commitTimestamps.get(i)); + } + }; + } + + private static List getResponses( + LockLeaseService leaseService, + LockWatchCache cache, + List> batch, + int count) { + List commitTimestamps = new ArrayList<>(); + try { while (commitTimestamps.size() < count) { Optional requestedVersion = cache.getEventCache().lastKnownVersion(); @@ -74,15 +94,31 @@ static Consumer>> consumer(LockLeaseService lea leaseService.getCommitTimestamps(requestedVersion, count - commitTimestamps.size()); commitTimestamps.addAll(process(batch.subList(commitTimestamps.size(), count), response, cache)); } - - for (int i = 0; i < count; i++) { - batch.get(i).result().set(commitTimestamps.get(i)); - } - }; + return commitTimestamps; + } catch (Throwable t) { + // Something else should cleanup the caches, that's fine. The joys of underhanded design. + // But we need to unlock the commit immutable timestamp locks. + // This method call is weird, but this is what BatchingIdentifiedAtlasDbTransactionStarter does, + // so I'll allow it. + TransactionStarterHelper.unlock( + commitTimestamps.stream() + .map(response -> response.immutableTimestamp().getLock()) + .collect(Collectors.toSet()), + leaseService); + throw Throwables.throwUncheckedException(t); + } } - private static List process( - List> requests, GetCommitTimestampsResponse response, LockWatchCache cache) { + private static List process( + List> requests, + GetCommitTimestampsResponse response, + LockWatchCache cache) { + LockToken immutableTsLock = response.getCommitImmutableTimestamp().getLock(); + long commitImmutableTs = response.getCommitImmutableTimestamp().getImmutableTimestamp(); + Stream immutableTsAndLocks = LockTokenShare.share( + immutableTsLock, + Ints.checkedCast(response.getInclusiveUpper() - response.getInclusiveLower() + 1)) + .map(tokenShare -> LockImmutableTimestampResponse.of(commitImmutableTs, tokenShare)); List timestamps = LongStream.rangeClosed(response.getInclusiveLower(), response.getInclusiveUpper()) .boxed() .collect(Collectors.toList()); @@ -94,7 +130,8 @@ private static List process( .build()) .collect(Collectors.toList()); cache.processCommitTimestampsUpdate(transactionUpdates, response.getLockWatchUpdate()); - return timestamps; + return Streams.zip(immutableTsAndLocks, timestamps.stream(), GetCommitTimestampResponse::of) + .collect(Collectors.toList()); } @Override diff --git a/lock-api/src/main/java/com/palantir/lock/client/CommitTimestampGetter.java b/lock-api/src/main/java/com/palantir/lock/client/CommitTimestampGetter.java index 890e34f51c..ddcaec21e1 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/CommitTimestampGetter.java +++ b/lock-api/src/main/java/com/palantir/lock/client/CommitTimestampGetter.java @@ -16,10 +16,11 @@ package com.palantir.lock.client; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockToken; public interface CommitTimestampGetter extends AutoCloseable { - long getCommitTimestamp(long startTs, LockToken commitLocksToken); + GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken); @Override void close(); diff --git a/lock-api/src/main/java/com/palantir/lock/client/LockLeaseService.java b/lock-api/src/main/java/com/palantir/lock/client/LockLeaseService.java index c732c2e253..fd09bf2298 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/LockLeaseService.java +++ b/lock-api/src/main/java/com/palantir/lock/client/LockLeaseService.java @@ -126,7 +126,21 @@ GetCommitTimestampsResponse getCommitTimestamps(Optional maybe .numTimestamps(batchSize) .lastKnownVersion(ConjureLockRequests.toConjure(maybeVersion)) .build(); - return delegate.getCommitTimestamps(request); + return assignLeasedLockTokenToImmutableCommitTimestampLock(delegate.getCommitTimestamps(request)); + } + + private static GetCommitTimestampsResponse assignLeasedLockTokenToImmutableCommitTimestampLock( + GetCommitTimestampsResponse response) { + Lease lease = response.getLease(); + LeasedLockToken leasedLockToken = LeasedLockToken.of( + ConjureLockToken.of( + response.getCommitImmutableTimestamp().getLock().getRequestId()), + lease); + long immutableTs = response.getCommitImmutableTimestamp().getImmutableTimestamp(); + return GetCommitTimestampsResponse.builder() + .from(response) + .commitImmutableTimestamp(LockImmutableTimestampResponse.of(immutableTs, leasedLockToken)) + .build(); } LockResponse lock(LockRequest request) { diff --git a/lock-api/src/main/java/com/palantir/lock/client/MultiClientCommitTimestampGetter.java b/lock-api/src/main/java/com/palantir/lock/client/MultiClientCommitTimestampGetter.java index 1e4928458a..6a163ab18a 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/MultiClientCommitTimestampGetter.java +++ b/lock-api/src/main/java/com/palantir/lock/client/MultiClientCommitTimestampGetter.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Streams; +import com.google.common.primitives.Ints; import com.palantir.atlasdb.autobatch.Autobatchers; import com.palantir.atlasdb.autobatch.BatchElement; import com.palantir.atlasdb.autobatch.DisruptorAutobatcher; @@ -28,6 +29,8 @@ import com.palantir.atlasdb.timelock.api.GetCommitTimestampsResponse; import com.palantir.atlasdb.timelock.api.Namespace; import com.palantir.common.streams.KeyedStream; +import com.palantir.lock.v2.GetCommitTimestampResponse; +import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockToken; import com.palantir.lock.watch.LockWatchCache; import com.palantir.lock.watch.LockWatchStateUpdate; @@ -43,24 +46,27 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.LongStream; +import java.util.stream.Stream; import org.immutables.value.Value; public final class MultiClientCommitTimestampGetter implements AutoCloseable { - private final DisruptorAutobatcher autobatcher; + private final DisruptorAutobatcher autobatcher; - private MultiClientCommitTimestampGetter(DisruptorAutobatcher autobatcher) { + private MultiClientCommitTimestampGetter( + DisruptorAutobatcher autobatcher) { this.autobatcher = autobatcher; } public static MultiClientCommitTimestampGetter create(InternalMultiClientConjureTimelockService delegate) { - DisruptorAutobatcher autobatcher = Autobatchers.independent(consumer(delegate)) + DisruptorAutobatcher autobatcher = Autobatchers.independent( + consumer(delegate)) .safeLoggablePurpose("multi-client-commit-timestamp-getter") .batchFunctionTimeout(Duration.ofSeconds(30)) .build(); return new MultiClientCommitTimestampGetter(autobatcher); } - public long getCommitTimestamp( + public GetCommitTimestampResponse getCommitTimestamp( Namespace namespace, long startTs, LockToken commitLocksToken, LockWatchCache cache) { return AtlasFutures.getUnchecked(autobatcher.apply(ImmutableNamespacedRequest.builder() .namespace(namespace) @@ -71,7 +77,7 @@ public long getCommitTimestamp( } @VisibleForTesting - static Consumer>> consumer( + static Consumer>> consumer( InternalMultiClientConjureTimelockService delegate) { return batch -> { BatchStateManager batchStateManager = BatchStateManager.createFromRequestBatch(batch); @@ -88,10 +94,11 @@ private BatchStateManager(Map requestMap this.requestMap = requestMap; } - static BatchStateManager createFromRequestBatch(List> batch) { + static BatchStateManager createFromRequestBatch( + List> batch) { Map requestMap = new HashMap<>(); - for (BatchElement elem : batch) { + for (BatchElement elem : batch) { NamespacedRequest argument = elem.argument(); Namespace namespace = argument.namespace(); NamespacedBatchStateManager namespacedBatchStateManager = requestMap.computeIfAbsent( @@ -120,7 +127,7 @@ private void processResponse(Map respons } private static final class NamespacedBatchStateManager { - private final Queue> pendingRequestQueue; + private final Queue> pendingRequestQueue; private final LockWatchCache cache; private Optional lastKnownVersion; @@ -134,7 +141,7 @@ private boolean hasPendingRequests() { return !pendingRequestQueue.isEmpty(); } - private void addRequest(BatchElement elem) { + private void addRequest(BatchElement elem) { pendingRequestQueue.add(elem); } @@ -151,22 +158,35 @@ private Optional updateAndGetLastKnownVersion() { } private void serviceRequests(GetCommitTimestampsResponse commitTimestampsResponse) { - List commitTimestamps = getCommitTimestampValues(commitTimestampsResponse); + LockToken immutableTsLock = + commitTimestampsResponse.getCommitImmutableTimestamp().getLock(); + long immutableTs = + commitTimestampsResponse.getCommitImmutableTimestamp().getImmutableTimestamp(); + Stream immutableTsAndLocks = LockTokenShare.share( + immutableTsLock, + Ints.checkedCast(commitTimestampsResponse.getInclusiveUpper() + - commitTimestampsResponse.getInclusiveLower() + + 1)) + .map(tokenShare -> LockImmutableTimestampResponse.of(immutableTs, tokenShare)); + List timestamps = LongStream.rangeClosed( + commitTimestampsResponse.getInclusiveLower(), commitTimestampsResponse.getInclusiveUpper()) + .boxed() + .collect(Collectors.toList()); + List commitTimestamps = Streams.zip( + immutableTsAndLocks, timestamps.stream(), GetCommitTimestampResponse::of) + .collect(Collectors.toList()); - processLockWatchUpdate(commitTimestamps, commitTimestampsResponse.getLockWatchUpdate()); + processLockWatchUpdate( + commitTimestamps.stream() + .map(GetCommitTimestampResponse::timestamp) + .collect(Collectors.toList()), + commitTimestampsResponse.getLockWatchUpdate()); - for (Long commitTimestamp : commitTimestamps) { + for (GetCommitTimestampResponse commitTimestamp : commitTimestamps) { pendingRequestQueue.poll().result().set(commitTimestamp); } } - private List getCommitTimestampValues(GetCommitTimestampsResponse commitTimestampsResponse) { - return LongStream.rangeClosed( - commitTimestampsResponse.getInclusiveLower(), commitTimestampsResponse.getInclusiveUpper()) - .boxed() - .collect(Collectors.toList()); - } - private void processLockWatchUpdate(List timestamps, LockWatchStateUpdate lockWatchUpdate) { List transactionUpdates = Streams.zip( timestamps.stream(), diff --git a/lock-api/src/main/java/com/palantir/lock/client/NamespacedCommitTimestampGetter.java b/lock-api/src/main/java/com/palantir/lock/client/NamespacedCommitTimestampGetter.java index 08a7f4b3ec..76d14dba5e 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/NamespacedCommitTimestampGetter.java +++ b/lock-api/src/main/java/com/palantir/lock/client/NamespacedCommitTimestampGetter.java @@ -17,6 +17,7 @@ package com.palantir.lock.client; import com.palantir.atlasdb.timelock.api.Namespace; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockToken; import com.palantir.lock.watch.LockWatchCache; @@ -35,7 +36,7 @@ public NamespacedCommitTimestampGetter( } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { return referenceTrackingBatcher.getDelegate().getCommitTimestamp(namespace, startTs, commitLocksToken, cache); } diff --git a/lock-api/src/main/java/com/palantir/lock/client/ProfilingTimelockService.java b/lock-api/src/main/java/com/palantir/lock/client/ProfilingTimelockService.java index bfa07884c7..ad1c62c0a8 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/ProfilingTimelockService.java +++ b/lock-api/src/main/java/com/palantir/lock/client/ProfilingTimelockService.java @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.RateLimiter; import com.palantir.common.base.Throwables; import com.palantir.lock.v2.ClientLockingOptions; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockRequest; import com.palantir.lock.v2.LockResponse; @@ -50,11 +51,11 @@ * operation every {@link ProfilingTimelockService#LOGGING_TIME_WINDOW}. It will log what the longest operation that * completed in the past {@link ProfilingTimelockService#LOGGING_TIME_WINDOW} window was, and how long it took. If no * operation took longer than {@link ProfilingTimelockService#SLOW_THRESHOLD} it will not log. - * + *

* The {@link ProfilingTimelockService} does not cover specific operations which are at risk for taking long * times owing to contention - in particular, this includes {@link #lock(LockRequest)} and * {@link #waitForLocks(WaitForLocksRequest)}. - * + *

* Profiling is done explicitly at this level (and not at {@link com.palantir.lock.v2.TimelockRpcClient} level) * to reflect the impact of timelock operations and cluster state changes (e.g. leader elections, rolling bounces) on * clients. @@ -102,7 +103,7 @@ public long getFreshTimestamp() { } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { return runTaskTimed("getCommitTimestamp", () -> delegate.getCommitTimestamp(startTs, commitLocksToken)); } @@ -128,6 +129,11 @@ public long getImmutableTimestamp() { return runTaskTimed("getImmutableTimestamp", delegate::getImmutableTimestamp); } + @Override + public long getCommitImmutableTimestamp() { + return runTaskTimed("getCommitImmutableTimestamp", delegate::getCommitImmutableTimestamp); + } + @Override public LockResponse lock(LockRequest request) { // Don't profile this, as it may be skewed by user contention on locks. diff --git a/lock-api/src/main/java/com/palantir/lock/client/RandomizedTimestampManager.java b/lock-api/src/main/java/com/palantir/lock/client/RandomizedTimestampManager.java index 0963801801..8d30b6560d 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/RandomizedTimestampManager.java +++ b/lock-api/src/main/java/com/palantir/lock/client/RandomizedTimestampManager.java @@ -16,6 +16,7 @@ package com.palantir.lock.client; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockToken; import com.palantir.timestamp.TimestampRange; @@ -28,7 +29,7 @@ public interface RandomizedTimestampManager { long getFreshTimestamp(); - long getCommitTimestamp(long startTs, LockToken commitLocksToken); + GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken); TimestampRange getFreshTimestamps(int numTimestampsRequested); } diff --git a/lock-api/src/main/java/com/palantir/lock/client/RemoteTimelockServiceAdapter.java b/lock-api/src/main/java/com/palantir/lock/client/RemoteTimelockServiceAdapter.java index 1db5f4894a..91fd3dba25 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/RemoteTimelockServiceAdapter.java +++ b/lock-api/src/main/java/com/palantir/lock/client/RemoteTimelockServiceAdapter.java @@ -21,6 +21,7 @@ import com.palantir.atlasdb.timelock.api.ConjureTimestampRange; import com.palantir.atlasdb.timelock.api.Namespace; import com.palantir.lock.v2.ClientLockingOptions; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockRequest; import com.palantir.lock.v2.LockResponse; @@ -102,7 +103,7 @@ public long getFreshTimestamp() { } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { return commitTimestampGetter.getCommitTimestamp(startTs, commitLocksToken); } @@ -119,6 +120,11 @@ public long getImmutableTimestamp() { return rpcClient.getImmutableTimestamp(); } + @Override + public long getCommitImmutableTimestamp() { + return rpcClient.getCommitImmutableTimestamp(); + } + @Override public WaitForLocksResponse waitForLocks(WaitForLocksRequest request) { return lockLeaseService.waitForLocks(request); diff --git a/lock-api/src/main/java/com/palantir/lock/client/TimeLockClient.java b/lock-api/src/main/java/com/palantir/lock/client/TimeLockClient.java index fd5b31c1b0..3c2ea9433b 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/TimeLockClient.java +++ b/lock-api/src/main/java/com/palantir/lock/client/TimeLockClient.java @@ -22,6 +22,7 @@ import com.palantir.common.concurrent.PTExecutors; import com.palantir.leader.NotCurrentLeaderException; import com.palantir.lock.v2.ClientLockingOptions; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockLeaseRefresher; import com.palantir.lock.v2.LockRequest; @@ -102,7 +103,7 @@ public long getFreshTimestamp() { } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { return delegate.getCommitTimestamp(startTs, commitLocksToken); } @@ -139,6 +140,11 @@ public long getImmutableTimestamp() { return executeOnTimeLock(delegate::getImmutableTimestamp); } + @Override + public long getCommitImmutableTimestamp() { + return executeOnTimeLock(delegate::getCommitImmutableTimestamp); + } + @Override public LockResponse lock(LockRequest request) { return lock(request, ClientLockingOptions.getDefault()); diff --git a/lock-api/src/main/java/com/palantir/lock/client/UnreliableTimeLockService.java b/lock-api/src/main/java/com/palantir/lock/client/UnreliableTimeLockService.java index e45252af68..4107202417 100644 --- a/lock-api/src/main/java/com/palantir/lock/client/UnreliableTimeLockService.java +++ b/lock-api/src/main/java/com/palantir/lock/client/UnreliableTimeLockService.java @@ -20,6 +20,7 @@ import com.palantir.atlasdb.buggify.api.BuggifyFactory; import com.palantir.atlasdb.buggify.impl.DefaultBuggifyFactory; import com.palantir.lock.v2.ClientLockingOptions; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockRequest; import com.palantir.lock.v2.LockResponse; @@ -76,7 +77,7 @@ public long getFreshTimestamp() { } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { maybeRandomlyIncreaseTimestamp(); return timestampManager.getCommitTimestamp(startTs, commitLocksToken); } @@ -102,6 +103,12 @@ public long getImmutableTimestamp() { return delegate.getImmutableTimestamp(); } + @Override + public long getCommitImmutableTimestamp() { + // Feature not active + return Long.MAX_VALUE; + } + @Override public LockResponse lock(LockRequest request) { LockResponse response = delegate.lock(request); diff --git a/lock-api/src/main/java/com/palantir/lock/v2/DefaultNamespacedTimelockRpcClient.java b/lock-api/src/main/java/com/palantir/lock/v2/DefaultNamespacedTimelockRpcClient.java index 3335a2d354..f7f0e41517 100644 --- a/lock-api/src/main/java/com/palantir/lock/v2/DefaultNamespacedTimelockRpcClient.java +++ b/lock-api/src/main/java/com/palantir/lock/v2/DefaultNamespacedTimelockRpcClient.java @@ -30,6 +30,11 @@ public long getImmutableTimestamp() { return timelockRpcClient.getImmutableTimestamp(namespace); } + @Override + public long getCommitImmutableTimestamp() { + return timelockRpcClient.getCommitImmutableTimestamp(namespace); + } + @Override public long currentTimeMillis() { return timelockRpcClient.currentTimeMillis(namespace); diff --git a/lock-api/src/main/java/com/palantir/lock/v2/NamespacedTimelockRpcClient.java b/lock-api/src/main/java/com/palantir/lock/v2/NamespacedTimelockRpcClient.java index ac8d4c5f15..6f8622fd0e 100644 --- a/lock-api/src/main/java/com/palantir/lock/v2/NamespacedTimelockRpcClient.java +++ b/lock-api/src/main/java/com/palantir/lock/v2/NamespacedTimelockRpcClient.java @@ -23,5 +23,7 @@ public interface NamespacedTimelockRpcClient { long getImmutableTimestamp(); + long getCommitImmutableTimestamp(); + long currentTimeMillis(); } diff --git a/lock-api/src/main/java/com/palantir/lock/v2/TimelockRpcClient.java b/lock-api/src/main/java/com/palantir/lock/v2/TimelockRpcClient.java index c242825cc7..d1f39e491a 100644 --- a/lock-api/src/main/java/com/palantir/lock/v2/TimelockRpcClient.java +++ b/lock-api/src/main/java/com/palantir/lock/v2/TimelockRpcClient.java @@ -40,6 +40,10 @@ public interface TimelockRpcClient { @Path("immutable-timestamp") long getImmutableTimestamp(@PathParam("namespace") String namespace); + @POST + @Path("commit-immutable-timestamp") + long getCommitImmutableTimestamp(@PathParam("namespace") String namespace); + @POST @Path("current-time-millis") long currentTimeMillis(@PathParam("namespace") String namespace); diff --git a/lock-api/src/test/java/com/palantir/lock/client/BatchingCommitTimestampGetterTest.java b/lock-api/src/test/java/com/palantir/lock/client/BatchingCommitTimestampGetterTest.java index 7c787703f7..039bb9d46d 100644 --- a/lock-api/src/test/java/com/palantir/lock/client/BatchingCommitTimestampGetterTest.java +++ b/lock-api/src/test/java/com/palantir/lock/client/BatchingCommitTimestampGetterTest.java @@ -31,7 +31,13 @@ import com.palantir.atlasdb.autobatch.BatchElement; import com.palantir.atlasdb.autobatch.DisruptorAutobatcher; import com.palantir.atlasdb.timelock.api.GetCommitTimestampsResponse; +import com.palantir.common.time.NanoTime; import com.palantir.lock.StringLockDescriptor; +import com.palantir.lock.v2.GetCommitTimestampResponse; +import com.palantir.lock.v2.LeaderTime; +import com.palantir.lock.v2.LeadershipId; +import com.palantir.lock.v2.Lease; +import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockToken; import com.palantir.lock.watch.LockEvent; import com.palantir.lock.watch.LockWatchCache; @@ -42,11 +48,13 @@ import com.palantir.lock.watch.LockWatchVersion; import com.palantir.lock.watch.TransactionUpdate; import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.mockito.InOrder; import org.mockito.Mockito; @@ -68,13 +76,19 @@ public final class BatchingCommitTimestampGetterTest { private static final Optional IDENTIFIED_VERSION_1 = Optional.empty(); private static final Optional IDENTIFIED_VERSION_2 = Optional.of(LockWatchVersion.of(UUID.randomUUID(), -1)); + private static final LeadershipId LEADERSHIP_ID = LeadershipId.random(); + private static final LeaderTime LEADER_TIME = LeaderTime.of(LEADERSHIP_ID, NanoTime.createForTests(1L)); + private static final Lease LEASE = Lease.of(LEADER_TIME, Duration.ofSeconds(977)); + private static final LockToken COMMIT_TS_LOCK_TOKEN = mock(LockToken.class); + private static final LockImmutableTimestampResponse COMMIT_TS_RESPONSE = + LockImmutableTimestampResponse.of(1L, COMMIT_TS_LOCK_TOKEN); private final LockLeaseService lockLeaseService = mock(LockLeaseService.class); private final LockWatchEventCache eventCache = mock(LockWatchEventCache.class); private final LockWatchValueCache valueCache = mock(LockWatchValueCache.class); private final LockWatchCache cache = spy(new LockWatchCacheImpl(eventCache, valueCache)); - private final Consumer>> batchProcessor = - BatchingCommitTimestampGetter.consumer(lockLeaseService, cache); + private final Consumer>> + batchProcessor = BatchingCommitTimestampGetter.consumer(lockLeaseService, cache); @Test public void consumerFillsTheWholeBatch() { @@ -87,7 +101,10 @@ public void consumerFillsTheWholeBatch() { whenGetCommitTimestamps(IDENTIFIED_VERSION_1, 4, 5, 6, UPDATE_1); whenGetCommitTimestamps(IDENTIFIED_VERSION_2, 2, 7, 8, UPDATE_2); - assertThat(processBatch(request1, request2, request3, request4)).containsExactly(5L, 6L, 7L, 8L); + assertThat(processBatch(request1, request2, request3, request4).stream() + .map(GetCommitTimestampResponse::timestamp) + .collect(Collectors.toList())) + .containsExactly(5L, 6L, 7L, 8L); InOrder inOrder = Mockito.inOrder(lockLeaseService, cache); inOrder.verify(lockLeaseService).getCommitTimestamps(IDENTIFIED_VERSION_1, 4); @@ -109,16 +126,22 @@ private void whenGetCommitTimestamps( .inclusiveLower(start) .inclusiveUpper(end) .lockWatchUpdate(update) + .commitImmutableTimestamp(COMMIT_TS_RESPONSE) + .lease(LEASE) .build()); } - private List processBatch(BatchingCommitTimestampGetter.Request... requests) { - List> elements = Arrays.stream(requests) - .map(request -> ImmutableTestBatchElement.builder() - .argument(request) - .result(new DisruptorAutobatcher.DisruptorFuture<>( - AutobatcherTelemetryComponents.create("test", new DefaultTaggedMetricRegistry()))) - .build()) + private List processBatch(BatchingCommitTimestampGetter.Request... requests) { + List> elements = Arrays.stream( + requests) + .map(request -> + ImmutableTestBatchElement + .builder() + .argument(request) + .result(new DisruptorAutobatcher.DisruptorFuture<>( + AutobatcherTelemetryComponents.create( + "test", new DefaultTaggedMetricRegistry()))) + .build()) .collect(toList()); batchProcessor.accept(elements); return Futures.getUnchecked(Futures.allAsList(Lists.transform(elements, BatchElement::result))); diff --git a/lock-api/src/test/java/com/palantir/lock/client/MultiClientCommitTimestampGetterTest.java b/lock-api/src/test/java/com/palantir/lock/client/MultiClientCommitTimestampGetterTest.java index 4f425f8cfe..f34724cb5a 100644 --- a/lock-api/src/test/java/com/palantir/lock/client/MultiClientCommitTimestampGetterTest.java +++ b/lock-api/src/test/java/com/palantir/lock/client/MultiClientCommitTimestampGetterTest.java @@ -40,13 +40,20 @@ import com.palantir.atlasdb.timelock.api.GetCommitTimestampsResponse; import com.palantir.atlasdb.timelock.api.Namespace; import com.palantir.common.streams.KeyedStream; +import com.palantir.common.time.NanoTime; import com.palantir.lock.client.MultiClientCommitTimestampGetter.NamespacedRequest; +import com.palantir.lock.v2.GetCommitTimestampResponse; +import com.palantir.lock.v2.LeaderTime; +import com.palantir.lock.v2.LeadershipId; +import com.palantir.lock.v2.Lease; +import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockToken; import com.palantir.lock.watch.LockWatchCache; import com.palantir.lock.watch.LockWatchCacheImpl; import com.palantir.lock.watch.LockWatchStateUpdate; import com.palantir.logsafe.exceptions.SafeIllegalStateException; import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -61,6 +68,12 @@ public class MultiClientCommitTimestampGetterTest { private static final int COMMIT_TS_LIMIT_PER_REQUEST = 5; private static final SafeIllegalStateException EXCEPTION = new SafeIllegalStateException("Something went wrong!"); + private static final LeadershipId LEADERSHIP_ID = LeadershipId.random(); + private static final LeaderTime LEADER_TIME = LeaderTime.of(LEADERSHIP_ID, NanoTime.createForTests(1L)); + private static final Lease LEASE = Lease.of(LEADER_TIME, Duration.ofSeconds(977)); + private static final LockToken COMMIT_TS_LOCK_TOKEN = mock(LockToken.class); + private static final LockImmutableTimestampResponse COMMIT_TS_RESPONSE = + LockImmutableTimestampResponse.of(1L, COMMIT_TS_LOCK_TOKEN); private final Map lowestStartTsMap = new HashMap<>(); private final Map lockWatchCacheMap = new HashMap<>(); @@ -69,7 +82,7 @@ public class MultiClientCommitTimestampGetterTest { private final InternalMultiClientConjureTimelockService timelockService = mock(InternalMultiClientConjureTimelockService.class); - private final Consumer>> consumer = + private final Consumer>> consumer = MultiClientCommitTimestampGetter.consumer(timelockService); private final LockWatchStateUpdate lockWatchStateUpdate = @@ -102,7 +115,8 @@ public void canServiceMultipleClientsWithMultipleServerCalls() { @Test public void updatesCacheWhileProcessingResponse() { Namespace client = Namespace.of("Kitty"); - List> batchElements = IntStream.range(0, COMMIT_TS_LIMIT_PER_REQUEST * 2) + List> batchElements = IntStream.range( + 0, COMMIT_TS_LIMIT_PER_REQUEST * 2) .mapToObj(ind -> batchElementForNamespace(client)) .collect(toList()); setupServiceAndAssertSanityOfResponse(batchElements); @@ -116,11 +130,13 @@ public void doesNotUpdateCacheIfClientNotServed() { Namespace alpha = Namespace.of("alpha" + UUID.randomUUID()); Namespace beta = Namespace.of("beta" + UUID.randomUUID()); - BatchElement requestForAlpha = batchElementForNamespace(alpha); - BatchElement requestForBeta = batchElementForNamespace(beta); + BatchElement requestForAlpha = batchElementForNamespace(alpha); + BatchElement requestForBeta = batchElementForNamespace(beta); - List> allRequests = ImmutableList.of(requestForAlpha, requestForBeta); - List> alphaRequestList = ImmutableList.of(requestForAlpha); + List> allRequests = + ImmutableList.of(requestForAlpha, requestForBeta); + List> alphaRequestList = + ImmutableList.of(requestForAlpha); Map responseMap = getCommitTimestamps(alphaRequestList); when(timelockService.getCommitTimestamps(any())).thenReturn(responseMap).thenThrow(EXCEPTION); @@ -141,7 +157,8 @@ public void doesNotUpdateCacheIfClientNotServed() { verify(betaCache, never()).processCommitTimestampsUpdate(any(), any()); } - private void setupServiceAndAssertSanityOfResponse(List> batch) { + private void setupServiceAndAssertSanityOfResponse( + List> batch) { Map> expectedResponseMap = new HashMap<>(); when(timelockService.getCommitTimestamps(any())).thenAnswer(invocation -> { @@ -160,13 +177,13 @@ private void setupServiceAndAssertSanityOfResponse(List> batch, + List> batch, Map> expectedResponseMap) { assertThat(batch.stream().filter(elem -> !elem.result().isDone()).collect(Collectors.toSet())) .as("All requests must be served") .isEmpty(); - Map> partitionedResponseMap = batch.stream() + Map> partitionedResponseMap = batch.stream() .collect(groupingBy( elem -> elem.argument().namespace(), Collectors.mapping(elem -> Futures.getUnchecked(elem.result()), toList()))); @@ -177,14 +194,18 @@ private void assertSanityOfResponse( private static void assertCorrectnessOfCompletedRequests( Map> expectedResponseMap, - Map> partitionedResponseMap) { + Map> partitionedResponseMap) { KeyedStream.stream(partitionedResponseMap) .forEach((namespace, commitTsList) -> assertCorrectnessOfServedTimestamps(expectedResponseMap.get(namespace), commitTsList)); } private static void assertCorrectnessOfServedTimestamps( - List expectedCommitTimestampsResponses, List commitTsList) { + List expectedCommitTimestampsResponses, + List responsesList) { + List commitTsList = responsesList.stream() + .map(GetCommitTimestampResponse::timestamp) + .collect(Collectors.toList()); long requestedCommitTsCount = expectedCommitTimestampsResponses.stream() .mapToLong(resp -> resp.getInclusiveUpper() - resp.getInclusiveLower() + 1) .sum(); @@ -202,8 +223,8 @@ private static void assertCorrectnessOfServedTimestamps( } private Map getCommitTimestamps( - List> batch) { - Map>> partitionedRequests = + List> batch) { + Map>> partitionedRequests = batch.stream().collect(groupingBy(elem -> elem.argument().namespace(), toList())); return getCommitTimestampResponse(KeyedStream.stream(partitionedRequests) .map(requestList -> GetCommitTimestampsRequest.builder() @@ -226,6 +247,8 @@ private Map getCommitTimestampResponse( .inclusiveLower(inclusiveLower) .inclusiveUpper(exclusiveUpper - 1) .lockWatchUpdate(lockWatchStateUpdate) + .commitImmutableTimestamp(COMMIT_TS_RESPONSE) + .lease(LEASE) .build()); }) .collectToMap(); @@ -239,15 +262,15 @@ private void updateLowerBound(Namespace namespace, long numTimestamps) { lowestStartTsMap.put(namespace, lowestStartTsMap.getOrDefault(namespace, 1L) + numTimestamps); } - private List> getCommitTimestampRequestsForClients( + private List> getCommitTimestampRequestsForClients( int clientCount, int requestCount) { - List> test = IntStream.range(0, requestCount) + List> test = IntStream.range(0, requestCount) .mapToObj(ind -> batchElementForNamespace(Namespace.of("Test_" + (ind % clientCount)))) .collect(Collectors.toList()); return test; } - private BatchElement batchElementForNamespace(Namespace namespace) { + private BatchElement batchElementForNamespace(Namespace namespace) { return BatchElement.of( ImmutableNamespacedRequest.builder() .namespace(namespace) @@ -255,7 +278,7 @@ private BatchElement batchElementForNamespace(Namespace .cache(lockWatchCacheMap.computeIfAbsent(namespace, _unused -> spy(LockWatchCacheImpl.noOp()))) .commitLocksToken(lockToken) .build(), - new DisruptorFuture( + new DisruptorFuture<>( AutobatcherTelemetryComponents.create("test", new DefaultTaggedMetricRegistry()))); } } diff --git a/lock-api/src/test/java/com/palantir/lock/client/TimestampCorroboratingTimelockServiceTest.java b/lock-api/src/test/java/com/palantir/lock/client/TimestampCorroboratingTimelockServiceTest.java index 036b4a7691..3e6575218c 100644 --- a/lock-api/src/test/java/com/palantir/lock/client/TimestampCorroboratingTimelockServiceTest.java +++ b/lock-api/src/test/java/com/palantir/lock/client/TimestampCorroboratingTimelockServiceTest.java @@ -168,7 +168,13 @@ public void startTransactionsThrowsIfSpanningBound() { @Test public void getCommitTimestampsShouldFail() { when(rawTimelockService.getCommitTimestamps(any())) - .thenReturn(GetCommitTimestampsResponse.of(1L, 3L, LOCK_WATCH_UPDATE)); + .thenReturn(GetCommitTimestampsResponse.builder() + .inclusiveLower(1L) + .inclusiveUpper(3L) + .lockWatchUpdate(LOCK_WATCH_UPDATE) + .commitImmutableTimestamp(LOCK_IMMUTABLE_TIMESTAMP_RESPONSE) + .lease(Lease.of(LeaderTime.of(LeadershipId.random(), NanoTime.now()), Duration.ZERO)) + .build()); assertThrowsOnSecondCall(() -> timelockService.getCommitTimestamps( GetCommitTimestampsRequest.of(3, ConjureIdentifiedVersion.of(UUID.randomUUID(), 3L)))); assertThat(timelockService diff --git a/lock-impl/src/main/java/com/palantir/lock/impl/LegacyTimelockService.java b/lock-impl/src/main/java/com/palantir/lock/impl/LegacyTimelockService.java index 0ff9978a39..69c54c1a00 100644 --- a/lock-impl/src/main/java/com/palantir/lock/impl/LegacyTimelockService.java +++ b/lock-impl/src/main/java/com/palantir/lock/impl/LegacyTimelockService.java @@ -25,6 +25,7 @@ import com.palantir.lock.LockService; import com.palantir.lock.SimpleTimeDuration; import com.palantir.lock.v2.ClientLockingOptions; +import com.palantir.lock.v2.GetCommitTimestampResponse; import com.palantir.lock.v2.LockImmutableTimestampResponse; import com.palantir.lock.v2.LockRequest; import com.palantir.lock.v2.LockResponse; @@ -68,8 +69,11 @@ public long getFreshTimestamp() { } @Override - public long getCommitTimestamp(long startTs, LockToken commitLocksToken) { - return getFreshTimestamp(); + public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) { + // TODO(jakubk): Fingers crossed this passes tests and I can ignore this + long freshTimestamp = getFreshTimestamp(); + return GetCommitTimestampResponse.of( + LockImmutableTimestampResponse.of(freshTimestamp, commitLocksToken), freshTimestamp); } @Override @@ -114,6 +118,11 @@ public long getImmutableTimestamp() { return getImmutableTimestampInternal(ts); } + @Override + public long getCommitImmutableTimestamp() { + return Long.MAX_VALUE; + } + @Override public LockResponse lock(LockRequest request) { LockRefreshToken legacyToken = lockAnonymous(toLegacyLockRequest(request)); diff --git a/timelock-api/src/main/conjure/timelock-api.yml b/timelock-api/src/main/conjure/timelock-api.yml index bb3e8d6fbb..77de1f936d 100644 --- a/timelock-api/src/main/conjure/timelock-api.yml +++ b/timelock-api/src/main/conjure/timelock-api.yml @@ -204,6 +204,8 @@ types: inclusiveLower: Long inclusiveUpper: Long lockWatchUpdate: LockWatchStateUpdate + commitImmutableTimestamp: LockImmutableTimestampResponse + lease: Lease GetCommitTimestampRequest: fields: lastKnownVersion: optional diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockService.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockService.java index ccd79d4735..d760425c9f 100644 --- a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockService.java +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockService.java @@ -57,6 +57,8 @@ public interface AsyncTimelockService long getImmutableTimestamp(); + long getCommitImmutableTimestamp(); + StartAtlasDbTransactionResponse deprecatedStartTransaction(IdentifiedTimeLockRequest request); StartAtlasDbTransactionResponseV3 startTransaction(StartIdentifiedAtlasDbTransactionRequest request); diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockServiceImpl.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockServiceImpl.java index ada32c9f8a..f387265980 100644 --- a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockServiceImpl.java +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/AsyncTimelockServiceImpl.java @@ -55,6 +55,7 @@ import com.palantir.lock.watch.LockWatchVersion; import com.palantir.timestamp.ManagedTimestampService; import com.palantir.timestamp.TimestampRange; +import com.palantir.tritium.ids.UniqueIds; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -106,6 +107,12 @@ public long getImmutableTimestamp() { return lockService.getImmutableTimestamp().orElse(timestamp); } + @Override + public long getCommitImmutableTimestamp() { + long timestamp = timestampService.getFreshTimestamp(); + return lockService.getCommitImmutableTimestamp().orElse(timestamp); + } + @Override public ListenableFuture lock(IdentifiedLockRequest request) { AsyncResult> result = lockService.lock( @@ -206,6 +213,18 @@ private Leased lockImmutableTimestampWithLease(U return Leased.of(lockImmutableTimestampResponse, leasedLock.lease()); } + private Leased lockCommitImmutableTimestampWithLease(long timestamp) { + Leased leasedLock = lockService + .lockCommitImmutableTimestamp(UniqueIds.pseudoRandomUuidV4(), timestamp) + .get(); + long immutableTs = lockService.getCommitImmutableTimestamp().orElse(timestamp); + + LockImmutableTimestampResponse lockImmutableTimestampResponse = + LockImmutableTimestampResponse.of(immutableTs, leasedLock.value()); + + return Leased.of(lockImmutableTimestampResponse, leasedLock.lease()); + } + @Override public ListenableFuture startTransactionsWithWatches( ConjureStartTransactionsRequest request) { @@ -234,11 +253,19 @@ private ConjureStartTransactionsResponse startTransactionsWithWatchesSync(Conjur @Override public ListenableFuture getCommitTimestamps( int numTimestamps, Optional lastKnownVersion) { + // TODO(jakubk): This is a pretty obvious race between grabbing the timestamps + // and locking the immutable timestamp, but this exists in the case of the normal immutable timestamp, + // so let's not worry about it for now. TimestampRange freshTimestamps = getFreshTimestamps(numTimestamps); - return Futures.immediateFuture(GetCommitTimestampsResponse.of( - freshTimestamps.getLowerBound(), - freshTimestamps.getUpperBound(), - getWatchStateUpdate(lastKnownVersion))); + Leased leasedLockImmutableCommitTimestampResponse = + lockCommitImmutableTimestampWithLease(freshTimestamps.getLowerBound()); + return Futures.immediateFuture(GetCommitTimestampsResponse.builder() + .inclusiveLower(freshTimestamps.getLowerBound()) + .inclusiveUpper(freshTimestamps.getUpperBound()) + .lockWatchUpdate(getWatchStateUpdate(lastKnownVersion)) + .commitImmutableTimestamp(leasedLockImmutableCommitTimestampResponse.value()) + .lease(leasedLockImmutableCommitTimestampResponse.lease()) + .build()); } @Override diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/UndertowAsyncTimelockResource.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/UndertowAsyncTimelockResource.java index a207eb721f..0429cb34fd 100644 --- a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/UndertowAsyncTimelockResource.java +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/UndertowAsyncTimelockResource.java @@ -138,6 +138,16 @@ public long getImmutableTimestamp( return getAsyncTimelockService(namespace, userAgent).getImmutableTimestamp(); } + @Handle(method = HttpMethod.POST, path = "/{namespace}/timelock/commit-immutable-timestamp") + public long getCommitImmutableTimestamp( + @Safe @Handle.PathParam String namespace, + @Safe + @HeaderParam(TimelockNamespaces.USER_AGENT_HEADER) + @Handle.Header(TimelockNamespaces.USER_AGENT_HEADER) + Optional userAgent) { + return getAsyncTimelockService(namespace, userAgent).getCommitImmutableTimestamp(); + } + @Handle(method = HttpMethod.POST, path = "/{namespace}/timelock/lock") public ListenableFuture deprecatedLock( @Safe @Handle.PathParam String namespace, diff --git a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/AsyncLockService.java b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/AsyncLockService.java index d8fdf17d28..74ff8336e8 100644 --- a/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/AsyncLockService.java +++ b/timelock-impl/src/main/java/com/palantir/atlasdb/timelock/lock/AsyncLockService.java @@ -44,6 +44,7 @@ public class AsyncLockService implements Closeable { private final HeldLocksCollection heldLocks; private final AwaitedLocksCollection awaitedLocks; private final ImmutableTimestampTracker immutableTsTracker; + private final ImmutableTimestampTracker immutableCommitTsTracker; private final LeaderClock leaderClock; private final LockLog lockLog; private final LockWatchingService lockWatchingService; @@ -73,6 +74,7 @@ public static AsyncLockService createDefault( return new AsyncLockService( new LockCollection(), new ImmutableTimestampTracker(), + new ImmutableTimestampTracker(), lockAcquirer, heldLocks, new AwaitedLocksCollection(), @@ -86,6 +88,7 @@ public static AsyncLockService createDefault( AsyncLockService( LockCollection locks, ImmutableTimestampTracker immutableTimestampTracker, + ImmutableTimestampTracker immutableCommitTimestampTracker, LockAcquirer acquirer, HeldLocksCollection heldLocks, AwaitedLocksCollection awaitedLocks, @@ -96,6 +99,7 @@ public static AsyncLockService createDefault( LockLog lockLog) { this.locks = locks; this.immutableTsTracker = immutableTimestampTracker; + this.immutableCommitTsTracker = immutableCommitTimestampTracker; this.heldLocks = heldLocks; this.awaitedLocks = awaitedLocks; this.reaperExecutor = reaperExecutor; @@ -142,6 +146,14 @@ public AsyncResult> lockImmutableTimestamp(UUID requestId, lon return immutableTimestampLockResult; } + public AsyncResult> lockCommitImmutableTimestamp(UUID requestId, long timestamp) { + AsyncResult> immutableTimestampLockResult = heldLocks.getExistingOrAcquire( + requestId, () -> acquireCommitImmutableTimestampLock(requestId, timestamp)); + // TODO(fdesouza): Remove this once PDS-95791 is resolved. + // lockLog.registerLockImmutableTimestampRequest(requestId, timestamp, immutableTimestampLockResult); + return immutableTimestampLockResult; + } + public AsyncResult waitForLocks(UUID requestId, Set lockDescriptors, TimeLimit timeout) { return awaitedLocks.getExistingOrAwait(requestId, () -> awaitLocks(requestId, lockDescriptors, timeout)); } @@ -150,6 +162,10 @@ public Optional getImmutableTimestamp() { return immutableTsTracker.getImmutableTimestamp(); } + public Optional getCommitImmutableTimestamp() { + return immutableCommitTsTracker.getImmutableTimestamp(); + } + private AsyncResult acquireLocks( UUID requestId, Set lockDescriptors, @@ -169,6 +185,11 @@ private AsyncResult acquireImmutableTimestampLock(UUID requestId, lon return lockAcquirer.acquireLocks(requestId, OrderedLocks.fromSingleLock(immutableTsLock), TimeLimit.zero()); } + private AsyncResult acquireCommitImmutableTimestampLock(UUID requestId, long timestamp) { + AsyncLock immutableTsLock = immutableCommitTsTracker.getLockFor(timestamp); + return lockAcquirer.acquireLocks(requestId, OrderedLocks.fromSingleLock(immutableTsLock), TimeLimit.zero()); + } + public boolean unlock(LockToken token) { return unlock(ImmutableSet.of(token)).contains(token); } diff --git a/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/batch/MultiClientConjureTimelockResourceTest.java b/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/batch/MultiClientConjureTimelockResourceTest.java index 46ffc5805b..a915a9713c 100644 --- a/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/batch/MultiClientConjureTimelockResourceTest.java +++ b/timelock-impl/src/test/java/com/palantir/atlasdb/timelock/batch/MultiClientConjureTimelockResourceTest.java @@ -72,6 +72,12 @@ public class MultiClientConjureTimelockResourceTest { private static final RedirectRetryTargeter TARGETER = RedirectRetryTargeter.create(LOCAL, ImmutableList.of(LOCAL, REMOTE)); private static final int DUMMY_COMMIT_TS_COUNT = 5; + private static final LeadershipId LEADERSHIP_ID = LeadershipId.random(); + private static final LeaderTime LEADER_TIME = LeaderTime.of(LEADERSHIP_ID, NanoTime.createForTests(1L)); + private static final Lease LEASE = Lease.of(LEADER_TIME, Duration.ofSeconds(977)); + private static final LockToken COMMIT_TS_LOCK_TOKEN = mock(LockToken.class); + private static final LockImmutableTimestampResponse COMMIT_TS_RESPONSE = + LockImmutableTimestampResponse.of(1L, COMMIT_TS_LOCK_TOKEN); private Map namespaces = new HashMap<>(); private Map namespaceToLeaderMap = new HashMap<>(); @@ -234,8 +240,13 @@ private AsyncTimelockService createAsyncTimeLockServiceForClient(String client) private GetCommitTimestampsResponse getCommitTimestampResponse(String namespace) { int inclusiveLower = getInclusiveLowerCommitTs(namespace); - return GetCommitTimestampsResponse.of( - inclusiveLower, inclusiveLower + DUMMY_COMMIT_TS_COUNT, lockWatchStateUpdate); + return GetCommitTimestampsResponse.builder() + .inclusiveLower(inclusiveLower) + .inclusiveUpper(inclusiveLower + DUMMY_COMMIT_TS_COUNT) + .lockWatchUpdate(lockWatchStateUpdate) + .commitImmutableTimestamp(COMMIT_TS_RESPONSE) + .lease(LEASE) + .build(); } private Integer getInclusiveLowerCommitTs(String namespace) { diff --git a/timelock-server/src/test/java/com/palantir/atlasdb/timelock/lock/AsyncLockServiceEteTest.java b/timelock-server/src/test/java/com/palantir/atlasdb/timelock/lock/AsyncLockServiceEteTest.java index 75295e575f..2f4f2b7204 100644 --- a/timelock-server/src/test/java/com/palantir/atlasdb/timelock/lock/AsyncLockServiceEteTest.java +++ b/timelock-server/src/test/java/com/palantir/atlasdb/timelock/lock/AsyncLockServiceEteTest.java @@ -69,6 +69,7 @@ public class AsyncLockServiceEteTest { private final AsyncLockService service = new AsyncLockService( new LockCollection(), new ImmutableTimestampTracker(), + new ImmutableTimestampTracker(), new LockAcquirer( new LockLog(new MetricRegistry(), () -> 2L), Executors.newSingleThreadScheduledExecutor(), diff --git a/timelock-server/src/test/java/com/palantir/atlasdb/timelock/lock/AsyncLockServiceTest.java b/timelock-server/src/test/java/com/palantir/atlasdb/timelock/lock/AsyncLockServiceTest.java index dd82982f47..0c9c7ab98c 100644 --- a/timelock-server/src/test/java/com/palantir/atlasdb/timelock/lock/AsyncLockServiceTest.java +++ b/timelock-server/src/test/java/com/palantir/atlasdb/timelock/lock/AsyncLockServiceTest.java @@ -58,10 +58,12 @@ public class AsyncLockServiceTest { private final HeldLocksCollection heldLocks = spy(HeldLocksCollection.create(leaderClock)); private final AwaitedLocksCollection awaitedLocks = spy(new AwaitedLocksCollection()); private final ImmutableTimestampTracker immutableTimestampTracker = mock(ImmutableTimestampTracker.class); + private final ImmutableTimestampTracker commitImmutableTimestampTracker = mock(ImmutableTimestampTracker.class); private final DeterministicScheduler reaperExecutor = new DeterministicScheduler(); private final AsyncLockService lockService = new AsyncLockService( locks, immutableTimestampTracker, + commitImmutableTimestampTracker, acquirer, heldLocks, awaitedLocks,