Skip to content

Commit

Permalink
Immutable commit timestamp and delayed writes.
Browse files Browse the repository at this point in the history
  • Loading branch information
jkozlowski committed Sep 9, 2024
1 parent bd78b57 commit b97b416
Show file tree
Hide file tree
Showing 74 changed files with 1,548 additions and 450 deletions.
28 changes: 28 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,34 @@ acceptedBreaks:
- code: "java.method.removed"
old: "method void com.palantir.atlasdb.keyvalue.api.watch.NoOpLockWatchManager::removeTransactionStateFromCache(long)"
justification: "Internal API"
"0.1142.0":
com.palantir.atlasdb:atlasdb-api:
- code: "java.annotation.removed"
old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===com.palantir.lock.v2.LockToken===)"
new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===java.util.Set<com.palantir.lock.v2.LockToken>===)"
justification: "Prototyping"
- code: "java.annotation.removed"
old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===com.palantir.lock.v2.LockToken===,\
\ long)"
new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===java.util.Set<com.palantir.lock.v2.LockToken>===,\
\ long)"
justification: "Prototyping"
- code: "java.method.addedToInterface"
new: "method long com.palantir.atlasdb.transaction.api.TransactionManager::getCommitImmutableTimestamp()"
justification: "Prototyping"
- code: "java.method.addedToInterface"
new: "method void com.palantir.atlasdb.transaction.api.Transaction::putDelayed(java.util.List<com.palantir.atlasdb.transaction.api.DelayedWrite>)"
justification: "Prototyping"
- code: "java.method.parameterTypeChanged"
old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===com.palantir.lock.v2.LockToken===)"
new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfImmutableTsOrCommitLocksExpired(===java.util.Set<com.palantir.lock.v2.LockToken>===)"
justification: "Prototyping"
- code: "java.method.parameterTypeChanged"
old: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===com.palantir.lock.v2.LockToken===,\
\ long)"
new: "parameter void com.palantir.atlasdb.transaction.api.precommit.PreCommitRequirementValidator::throwIfPreCommitRequirementsNotMet(===java.util.Set<com.palantir.lock.v2.LockToken>===,\
\ long)"
justification: "Prototyping"
"0.770.0":
com.palantir.atlasdb:atlasdb-api:
- code: "java.class.removed"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.transaction.api;

import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import java.util.Map;
import java.util.function.LongFunction;

/**
 * A write whose target cells are chosen at commit time rather than when the write is registered.
 *
 * <p>At the actual commit the caller is handed a unique {@code long} value; this function turns that value into
 * the values to write, keyed first by table and then by cell. This lets the caller derive the cell itself from
 * the supplied value.
 *
 * <p>This is a highly specialised feature: few users are expected to need it, and most should not reach for it.
 *
 * <p>NOTE: the interface is intentionally over-permissive for now. Implementations are not even required to
 * declare up front which tables they will write to. That can be tightened later; the current shape exists to
 * leave maximum room for experimentation.
 */
@FunctionalInterface
public interface DelayedWrite
        extends LongFunction<Map<TableReference, Map<Cell, byte[]>>> {}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.palantir.lock.watch.ChangeMetadata;
import com.palantir.util.result.Result;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
Expand Down Expand Up @@ -323,6 +324,9 @@ Stream<BatchingVisitable<RowResult<byte[]>>> getRangesLazy(
@Idempotent
void putWithMetadata(TableReference tableRef, Map<Cell, ValueAndChangeMetadata> valuesAndMetadata);

@Idempotent
void putDelayed(List<DelayedWrite> values);

/**
* Deletes values from the key-value store.
* @param tableRef the table from which to delete the values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ <T, C extends PreCommitCondition, E extends Exception> T runTaskWithConditionRea
*/
long getImmutableTimestamp();

long getCommitImmutableTimestamp();

/**
* Returns the lock service used by this transaction manager.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.palantir.atlasdb.transaction.api.TransactionFailedException;
import com.palantir.lock.v2.LockToken;
import java.util.Map;
import javax.annotation.Nullable;
import java.util.Set;

public interface PreCommitRequirementValidator {
/**
Expand All @@ -41,10 +41,10 @@ void throwIfPreCommitConditionInvalidAtCommitOnWriteTransaction(
* user pre-commit condition is no longer valid, or possibly because of other internal state such as commit
* locks having expired.
*/
void throwIfPreCommitRequirementsNotMet(@Nullable LockToken commitLocksToken, long timestamp);
void throwIfPreCommitRequirementsNotMet(Set<LockToken> commitLocksToken, long timestamp);

/**
* Throws a {@link TransactionFailedException} if the immutable timestamp lock or commit locks have expired.
*/
void throwIfImmutableTsOrCommitLocksExpired(@Nullable LockToken commitLocksToken);
void throwIfImmutableTsOrCommitLocksExpired(Set<LockToken> commitLocksToken);
}
1 change: 1 addition & 0 deletions atlasdb-autobatch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies {
implementation 'com.palantir.conjure.java.api:errors'
implementation 'com.palantir.safe-logging:preconditions'
implementation 'com.palantir.tracing:tracing'
implementation 'com.palantir.tracing:tracing-api'
implementation 'io.dropwizard.metrics:metrics-core'
implementation project(':commons-executors')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.tracing.Observability;
import com.palantir.tracing.Tracer;
import com.palantir.tracing.Tracers;
import com.palantir.tracing.api.SpanType;
import java.time.Duration;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -97,7 +100,25 @@ public static <O> AutobatcherBuilder<SupplierKey, O> coalescing(Supplier<O> supp
*/
public static <I, O> AutobatcherBuilder<I, O> independent(Consumer<List<BatchElement<I, O>>> batchFunction) {
return new AutobatcherBuilder<>(parameters -> new IndependentBatchingEventHandler<>(
maybeWrapWithTimeout(batchFunction, parameters), parameters.batchSize()));
maybeWrapWithTimeout(
input -> {
long start = System.nanoTime();
try {
Tracer.initTraceWithSpan(
Observability.SAMPLE,
Tracers.randomId(),
parameters.safeLoggablePurpose(),
SpanType.LOCAL);
batchFunction.accept(input);
} finally {
log.info(
"Finished batch",
SafeArg.of("duration", Duration.ofNanos(System.nanoTime() - start)));
Tracer.fastCompleteSpan();
}
},
parameters),
parameters.batchSize()));
}

private static <I, O> Consumer<List<BatchElement<I, O>>> maybeWrapWithTimeout(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraClientPoolMetrics;
import com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraServer;
import com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraService;
import com.palantir.atlasdb.tracing.Tracing;
import com.palantir.atlasdb.util.MetricsManager;
import com.palantir.common.base.FunctionCheckedException;
import com.palantir.common.concurrent.InitializeableScheduledExecutorServiceSupplier;
Expand All @@ -42,6 +43,7 @@
import com.palantir.logsafe.logger.SafeLogger;
import com.palantir.logsafe.logger.SafeLoggerFactory;
import com.palantir.refreshable.Refreshable;
import com.palantir.tracing.CloseableTracer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -635,36 +637,45 @@ public <V, K extends Exception> V runWithRetryOnServer(
RetryableCassandraRequest<V, K> req = new RetryableCassandraRequest<>(specifiedServer, fn);

while (true) {
if (log.isTraceEnabled()) {
log.trace("Running function on host {}.", SafeArg.of("server", req.getCassandraServer()));
}
CassandraClientPoolingContainer hostPool = getPreferredHostOrFallBack(req);
try (CloseableTracer trace =
Tracing.startLocalTrace("CassandraClientPoolImpl.runWithRetryOnServer", tagConsumer -> {
tagConsumer.integer("try", req.getNumberOfAttempts());
})) {
if (log.isTraceEnabled()) {
log.trace("Running function on host {}.", SafeArg.of("server", req.getCassandraServer()));
}
CassandraClientPoolingContainer hostPool = getPreferredHostOrFallBack(req);

try {
V response = runWithPooledResourceRecordingMetrics(hostPool, req.getFunction());
removeFromBlacklistAfterResponse(hostPool.getCassandraServer());
return response;
} catch (Exception ex) {
exceptionHandler.handleExceptionFromRequest(req, hostPool.getCassandraServer(), ex);
try {
V response = runWithPooledResourceRecordingMetrics(hostPool, req.getFunction());
removeFromBlacklistAfterResponse(hostPool.getCassandraServer());
return response;
} catch (Exception ex) {
exceptionHandler.handleExceptionFromRequest(req, hostPool.getCassandraServer(), ex);
}
}
}
}

private <V, K extends Exception> CassandraClientPoolingContainer getPreferredHostOrFallBack(
RetryableCassandraRequest<V, K> req) {
CassandraClientPoolingContainer hostPool = cassandra.getPools().get(req.getCassandraServer());

if (blacklist.contains(req.getCassandraServer()) || hostPool == null || req.shouldGiveUpOnPreferredHost()) {
CassandraServer previousHost = hostPool == null ? req.getCassandraServer() : hostPool.getCassandraServer();
Optional<CassandraClientPoolingContainer> hostPoolCandidate = cassandra.getRandomGoodHostForPredicate(
address -> !req.alreadyTriedOnHost(address), req.getTriedHosts());
hostPool = hostPoolCandidate.orElseGet(cassandra::getRandomGoodHost);
log.warn(
"Randomly redirected a query intended for host {} to {}.",
SafeArg.of("previousHost", previousHost),
SafeArg.of("randomHost", hostPool.getCassandraServer()));
try (CloseableTracer trace =
Tracing.startLocalTrace("CassandraClientPoolImpl.getPreferredHostOfFallBack", tagConsumer -> {})) {
CassandraClientPoolingContainer hostPool = cassandra.getPools().get(req.getCassandraServer());

if (blacklist.contains(req.getCassandraServer()) || hostPool == null || req.shouldGiveUpOnPreferredHost()) {
CassandraServer previousHost =
hostPool == null ? req.getCassandraServer() : hostPool.getCassandraServer();
Optional<CassandraClientPoolingContainer> hostPoolCandidate = cassandra.getRandomGoodHostForPredicate(
address -> !req.alreadyTriedOnHost(address), req.getTriedHosts());
hostPool = hostPoolCandidate.orElseGet(cassandra::getRandomGoodHost);
log.warn(
"Randomly redirected a query intended for host {} to {}.",
SafeArg.of("previousHost", previousHost),
SafeArg.of("randomHost", hostPool.getCassandraServer()));
}
return hostPool;
}
return hostPool;
}

@Override
Expand Down Expand Up @@ -693,15 +704,20 @@ private void removeFromBlacklistAfterResponse(CassandraServer cassandraServer) {
private <V, K extends Exception> V runWithPooledResourceRecordingMetrics(
CassandraClientPoolingContainer hostPool, FunctionCheckedException<CassandraClient, V, K> fn) throws K {

metrics.recordRequestOnHost(hostPool);
try {
return hostPool.runWithPooledResource(fn);
} catch (Exception e) {
metrics.recordExceptionOnHost(hostPool);
if (CassandraRequestExceptionHandler.isConnectionException(e)) {
metrics.recordConnectionExceptionOnHost(hostPool);
try (CloseableTracer trace = Tracing.startLocalTrace(
"CassandraClientPoolImpl.runWithPooledResourceRecordingMetrics", tagConsumer -> {
tagConsumer.accept("host", hostPool.getCassandraServer().cassandraHostName());
})) {
metrics.recordRequestOnHost(hostPool);
try {
return hostPool.runWithPooledResource(fn);
} catch (Exception e) {
metrics.recordExceptionOnHost(hostPool);
if (CassandraRequestExceptionHandler.isConnectionException(e)) {
metrics.recordConnectionExceptionOnHost(hostPool);
}
throw e;
}
throw e;
}
}

Expand Down
Loading

0 comments on commit b97b416

Please sign in to comment.