Skip to content

Commit

Permalink
Immutable commit timestamp and delayed writes.
Browse files Browse the repository at this point in the history
  • Loading branch information
jkozlowski committed Aug 6, 2024
1 parent 47cc06a commit 897d00a
Show file tree
Hide file tree
Showing 45 changed files with 628 additions and 139 deletions.
38 changes: 38 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,36 @@ acceptedBreaks:
- code: "java.method.addedToInterface"
new: "method boolean com.palantir.atlasdb.cell.api.TransactionKeyValueService::isValid(long)"
justification: "internal API"
"0.1062.0":
com.palantir.atlasdb:atlasdb-api:
- code: "java.annotation.removed"
old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===com.palantir.lock.v2.LockToken===)"
new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===java.util.Set<com.palantir.lock.v2.LockToken>===)"
justification: "Prototyping"
- code: "java.annotation.removed"
old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===com.palantir.lock.v2.LockToken===,\
\ long)"
new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===java.util.Set<com.palantir.lock.v2.LockToken>===,\
\ long)"
justification: "Prototyping"
- code: "java.method.abstractMethodAdded"
new: "method void com.palantir.atlasdb.keyvalue.api.watch.LockWatchManager::logState()"
justification: "Prototype"
- code: "java.method.addedToInterface"
new: "method void com.palantir.atlasdb.transaction.api.Transaction::putDelayed(com.palantir.atlasdb.keyvalue.api.TableReference,\
\ java.util.List<java.util.Map.Entry<com.palantir.atlasdb.transaction.api.DelayedWrite,\
\ byte[]>>)"
justification: "Prototype"
- code: "java.method.parameterTypeChanged"
old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===com.palantir.lock.v2.LockToken===)"
new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===java.util.Set<com.palantir.lock.v2.LockToken>===)"
justification: "Prototyping"
- code: "java.method.parameterTypeChanged"
old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===com.palantir.lock.v2.LockToken===,\
\ long)"
new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===java.util.Set<com.palantir.lock.v2.LockToken>===,\
\ long)"
justification: "Prototyping"
"0.1073.0":
com.palantir.atlasdb:atlasdb-api:
- code: "java.method.parameterTypeChanged"
Expand Down Expand Up @@ -151,6 +181,14 @@ acceptedBreaks:
new: "method com.palantir.lock.LockState com.palantir.atlasdb.factory.timelock.TimeoutSensitiveLockRpcClient::getLockState(java.lang.String,\
\ com.palantir.lock.LockDescriptor)"
justification: "LockRpcClient#getLockState is broken"
"0.1129.0":
com.palantir.atlasdb:atlasdb-api:
- code: "java.method.addedToInterface"
new: "method long com.palantir.atlasdb.transaction.api.TransactionManager::getCommitImmutableTimestamp()"
justification: "Prototype"
- code: "java.method.addedToInterface"
new: "method void com.palantir.atlasdb.transaction.api.Transaction::putDelayed(java.util.List<com.palantir.atlasdb.transaction.api.DelayedWrite>)"
justification: "Prototype"
"0.770.0":
com.palantir.atlasdb:atlasdb-api:
- code: "java.class.removed"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.transaction.api;

import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import java.util.Map;
import java.util.function.LongFunction;

/**
* Curious creature, but it allows the user to delay deciding the particular cell to write to until the actual commit,
* at which point the user will be provided with a unique long value that can be used to generate the cell.
*
* This is a very specific feature that not many people probably require, or should try to use.
*
* Right now the interface allows you to do TOO MUCH, you don't even have to specify the tables you write to.
* This can be improved and locked down, but this allows for maximum experimentation.
*/
@FunctionalInterface
public interface DelayedWrite extends LongFunction<Map<TableReference, Map<Cell, byte[]>>> {}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.palantir.lock.watch.ChangeMetadata;
import com.palantir.util.result.Result;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
Expand Down Expand Up @@ -323,6 +324,9 @@ Stream<BatchingVisitable<RowResult<byte[]>>> getRangesLazy(
@Idempotent
void putWithMetadata(TableReference tableRef, Map<Cell, ValueAndChangeMetadata> valuesAndMetadata);

@Idempotent
void putDelayed(List<DelayedWrite> values);

/**
* Deletes values from the key-value store.
* @param tableRef the table from which to delete the values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ <T, C extends PreCommitCondition, E extends Exception> T runTaskWithConditionRea
*/
long getImmutableTimestamp();

long getCommitImmutableTimestamp();

/**
* Returns the lock service used by this transaction manager.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.palantir.atlasdb.transaction.api.TransactionFailedException;
import com.palantir.lock.v2.LockToken;
import java.util.Map;
import javax.annotation.Nullable;
import java.util.Set;

public interface PreCommitRequirementValidator {
/**
Expand All @@ -41,10 +41,10 @@ void throwIfPreCommitConditionInvalidAtCommitOnWriteTransaction(
* user pre-commit condition is no longer valid, or possibly because of other internal state such as commit
* locks having expired.
*/
void throwIfPreCommitRequirementsNotMet(@Nullable LockToken commitLocksToken, long timestamp);
void throwIfPreCommitRequirementsNotMet(Set<LockToken> commitLocksToken, long timestamp);

/**
* Throws a {@link TransactionFailedException} if the immutable timestamp lock or commit locks have expired.
*/
void throwIfImmutableTsOrCommitLocksExpired(@Nullable LockToken commitLocksToken);
void throwIfImmutableTsOrCommitLocksExpired(Set<LockToken> commitLocksToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public void testCfEqualityChecker() throws TException {
.collect(Collectors.toList()));

assertThat(ColumnFamilyDefinitions.isMatchingCf(
kvs.getCfForTable(NEVER_SEEN, getMetadata(), FOUR_DAYS_IN_SECONDS), clusterSideCf))
kvs.getCfForTable(NEVER_SEEN, getMetadata(), FOUR_DAYS_IN_SECONDS), clusterSideCf))
.as("After serialization and deserialization to database, Cf metadata did not match.")
.isTrue();
}
Expand All @@ -256,8 +256,10 @@ private static String getInternalTestTableName() {
@SuppressWarnings("CompileTimeConstant")
public void shouldNotErrorForTimestampTableWhenCreatingCassandraKvs() {
verify(logger, atLeast(0))
.error(startsWith("Found a table {} that did not have persisted"),
assertArg((Arg<String> arg) -> assertThat(arg.getValue()).doesNotContain("timestamp")));
.error(
startsWith("Found a table {} that did not have persisted"),
assertArg(
(Arg<String> arg) -> assertThat(arg.getValue()).doesNotContain("timestamp")));
}

@Test
Expand Down Expand Up @@ -492,20 +494,20 @@ public void setOnceTest() {

keyValueService.putUnlessExists(userTable, ImmutableMap.of(CELL, sad));
assertThat(keyValueService
.get(userTable, ImmutableMap.of(CELL, Long.MAX_VALUE))
.get(CELL)
.getContents())
.get(userTable, ImmutableMap.of(CELL, Long.MAX_VALUE))
.get(CELL)
.getContents())
.containsExactly(sad);

keyValueService.setOnce(userTable, ImmutableMap.of(CELL, happy));
assertThat(keyValueService
.get(userTable, ImmutableMap.of(CELL, Long.MAX_VALUE))
.get(CELL)
.getContents())
.get(userTable, ImmutableMap.of(CELL, Long.MAX_VALUE))
.get(CELL)
.getContents())
.containsExactly(happy);
assertThat(keyValueService
.getAllTimestamps(userTable, ImmutableSet.of(CELL), Long.MAX_VALUE)
.size())
.getAllTimestamps(userTable, ImmutableSet.of(CELL), Long.MAX_VALUE)
.size())
.isEqualTo(1);
keyValueService.truncateTable(userTable);
}
Expand Down Expand Up @@ -665,9 +667,9 @@ public void testMultiCheckAndSetCannotUpdateAcrossMultipleRows() {
Cell nextTestCell = Cell.create(row(1), column(1));

assertThatThrownBy(() -> keyValueService.multiCheckAndSet(MultiCheckAndSetRequest.newCells(
TEST_TABLE,
firstTestCell.getRowName(),
ImmutableMap.of(firstTestCell, val(0, 0), nextTestCell, val(0, 1)))))
TEST_TABLE,
firstTestCell.getRowName(),
ImmutableMap.of(firstTestCell, val(0, 0), nextTestCell, val(0, 1)))))
.isInstanceOf(SafeIllegalStateException.class)
.hasMessageContaining("Can only update cells in one row.");
}
Expand All @@ -683,7 +685,7 @@ public void testMultiCheckAndSetIndependentlyFails() {
TEST_TABLE, nextTestCell.getRowName(), ImmutableMap.of(nextTestCell, val(0, 1))));

assertThatThrownBy(() -> keyValueService.multiCheckAndSet(MultiCheckAndSetRequest.newCells(
TEST_TABLE, nextTestCell.getRowName(), ImmutableMap.of(nextTestCell, val(0, 2)))))
TEST_TABLE, nextTestCell.getRowName(), ImmutableMap.of(nextTestCell, val(0, 2)))))
.isInstanceOf(MultiCheckAndSetException.class);

MultiCheckAndSetException ex = catchThrowableOfType(
Expand Down Expand Up @@ -808,7 +810,7 @@ private CfDef createDefaultCfDef(String namespace, String tableName) {
.setComment("")
.setColumn_metadata(new ArrayList<>())
.setTriggers(new ArrayList<>())
.setKey_alias(new byte[]{0x6B, 0x65, 0x79})
.setKey_alias(new byte[] {0x6B, 0x65, 0x79})
.setComparator_type("org.apache.cassandra.db.marshal.CompositeType"
+ "(org.apache.cassandra.db.marshal.BytesType,org.apache.cassandra.db.marshal.LongType)")
.setCompaction_strategy_options(new HashMap<>())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.palantir.atlasdb.keyvalue.api.RowResult;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.transaction.api.ConstraintCheckable;
import com.palantir.atlasdb.transaction.api.DelayedWrite;
import com.palantir.atlasdb.transaction.api.GetRangesQuery;
import com.palantir.atlasdb.transaction.api.Transaction;
import com.palantir.atlasdb.transaction.api.TransactionFailedException;
Expand All @@ -37,6 +38,7 @@
import com.palantir.lock.watch.ChangeMetadata;
import com.palantir.util.result.Result;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
Expand Down Expand Up @@ -144,6 +146,11 @@ public void putWithMetadata(TableReference tableRef, Map<Cell, ValueAndChangeMet
delegate().putWithMetadata(tableRef, valuesAndMetadata);
}

@Override
public void putDelayed(List<DelayedWrite> values) {
delegate().putDelayed(values);
}

@Override
public void delete(TableReference tableRef, Set<Cell> keys) {
delegate().delete(tableRef, keys);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@ public ImmutableTimestampLockManager(
this.lockValidityChecker = lockValidityChecker;
}

public Optional<ExpiredLocks> getExpiredImmutableTimestampAndCommitLocks(Optional<LockToken> commitLocksToken) {
public Optional<ExpiredLocks> getExpiredImmutableTimestampAndCommitLocks(Set<LockToken> commitLocksToken) {
Set<LockToken> toRefresh = new HashSet<>();
commitLocksToken.ifPresent(toRefresh::add);

// TODO(jakubk): Handle this upstream
if (commitLocksToken != null) {
toRefresh.addAll(commitLocksToken);
}
immutableTimestampLock.ifPresent(toRefresh::add);

if (toRefresh.isEmpty()) {
Expand All @@ -55,21 +59,21 @@ public Optional<ExpiredLocks> getExpiredImmutableTimestampAndCommitLocks(Optiona
}

public SummarizedLockCheckResult getExpiredImmutableTimestampAndCommitLocksWithFullSummary(
LockToken commitLocksToken) {
Set<LockToken> commitLocksToken) {
Preconditions.checkNotNull(
commitLocksToken,
"commitLocksToken was null, not expected to be in a call to"
+ " getExpiredImmutableTimestampAndCommitLocksWithFullSummary",
SafeArg.of("immutableTimestampLock", immutableTimestampLock));
Optional<ExpiredLocks> expiredLocks = getExpiredImmutableTimestampAndCommitLocks(Optional.of(commitLocksToken));
Optional<ExpiredLocks> expiredLocks = getExpiredImmutableTimestampAndCommitLocks(commitLocksToken);
return SummarizedLockCheckResult.builder()
.expiredLocks(expiredLocks)
.immutableTimestampLock(immutableTimestampLock)
.userProvidedLock(commitLocksToken)
.build();
}

private String getExpiredLocksErrorString(Optional<LockToken> commitLocksToken, Set<LockToken> expiredLocks) {
private String getExpiredLocksErrorString(Set<LockToken> commitLocksToken, Set<LockToken> expiredLocks) {
return "The following immutable timestamp lock was required: " + immutableTimestampLock
+ "; the following commit locks were required: " + commitLocksToken
+ "; the following locks are no longer valid: " + expiredLocks;
Expand All @@ -95,7 +99,7 @@ public interface SummarizedLockCheckResult {

Optional<LockToken> immutableTimestampLock();

LockToken userProvidedLock();
Set<LockToken> userProvidedLock();

static ImmutableSummarizedLockCheckResult.Builder builder() {
return ImmutableSummarizedLockCheckResult.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.palantir.atlasdb.AtlasDbMetricNames;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.lock.v2.ClientLockingOptions;
import com.palantir.lock.v2.GetCommitTimestampResponse;
import com.palantir.lock.v2.LockImmutableTimestampResponse;
import com.palantir.lock.v2.LockRequest;
import com.palantir.lock.v2.LockResponse;
Expand Down Expand Up @@ -69,7 +70,7 @@ public long getFreshTimestamp() {
}

@Override
public long getCommitTimestamp(long startTs, LockToken commitLocksToken) {
public GetCommitTimestampResponse getCommitTimestamp(long startTs, LockToken commitLocksToken) {
return executeWithRecord(() -> timelockService.getCommitTimestamp(startTs, commitLocksToken));
}

Expand All @@ -93,6 +94,11 @@ public long getImmutableTimestamp() {
return executeWithRecord(timelockService::getImmutableTimestamp);
}

@Override
public long getCommitImmutableTimestamp() {
return executeWithRecord(timelockService::getCommitImmutableTimestamp);
}

@Override
public LockResponse lock(LockRequest request) {
return executeWithRecord(() -> timelockService.lock(request));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.keyvalue.impl.Cells;
import com.palantir.atlasdb.logging.LoggingArgs;
import com.palantir.atlasdb.transaction.api.DelayedWrite;
import com.palantir.lock.watch.ChangeMetadata;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;
import com.palantir.logsafe.exceptions.SafeIllegalStateException;
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -44,6 +47,7 @@ class LocalWriteBuffer {

private final ConcurrentMap<TableReference, ConcurrentNavigableMap<Cell, byte[]>> writesByTable =
new ConcurrentHashMap<>();
private final List<DelayedWrite> delayedWritesByTable = Collections.synchronizedList(new ArrayList<>());
private final ConcurrentMap<TableReference, Map<Cell, ChangeMetadata>> metadataByTable = new ConcurrentHashMap<>();
private final ConcurrentMap<TableReference, Object> locksByTable = new ConcurrentHashMap<>();
private final AtomicLong valuesByteCount = new AtomicLong();
Expand Down Expand Up @@ -92,13 +96,21 @@ public void putLocalWritesAndMetadata(
}
}

public void putDelayed(List<DelayedWrite> values) {
delayedWritesByTable.addAll(values);
}

/**
* Returns all local writes that have been buffered.
*/
public ConcurrentMap<TableReference, ConcurrentNavigableMap<Cell, byte[]>> getLocalWrites() {
return writesByTable;
}

public List<DelayedWrite> getDelayedWrites() {
return delayedWritesByTable;
}

/**
* Returns the local writes for cells of the given table.
*/
Expand Down
Loading

0 comments on commit 897d00a

Please sign in to comment.