From 07011575dba0805013611497023382a21b446a8d Mon Sep 17 00:00:00 2001 From: Blake Eggleston Date: Mon, 9 Sep 2024 14:14:40 -0700 Subject: [PATCH 1/4] fix repair range splitting --- .../apache/cassandra/service/accord/repair/AccordRepair.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java index 18fb045475df..51497792cd43 100644 --- a/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java +++ b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java @@ -124,10 +124,11 @@ private List repairRange(TokenRange range) throws Throw List repairedRanges = new ArrayList<>(); int rangeStepUpdateInterval = ACCORD_REPAIR_RANGE_STEP_UPDATE_INTERVAL.getInt(); RoutingKey remainingStart = range.start(); + // TODO (expected): repair ranges should have a configurable lower limit of split size so already small repairs aren't broken up into excessively tiny ones BigInteger rangeSize = splitter.sizeOf(range); if (rangeStep == null) { - BigInteger divide = splitter.divide(rangeSize, 1000); + BigInteger divide = splitter.divide(rangeSize, 10000); rangeStep = divide.equals(BigInteger.ZERO) ? rangeSize : BigInteger.ONE.max(divide); } From be02bb65d09e6ea881369a8a36ceeae840ce0728 Mon Sep 17 00:00:00 2001 From: Blake Eggleston Date: Wed, 18 Sep 2024 13:52:55 -0700 Subject: [PATCH 2/4] streamline table metadata fetching --- src/java/org/apache/cassandra/schema/Schema.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java index 4c572c7cf7b3..422ce995fda0 100644 --- a/src/java/org/apache/cassandra/schema/Schema.java +++ b/src/java/org/apache/cassandra/schema/Schema.java @@ -279,10 +279,15 @@ public TableMetadata getTableMetadata(String keyspace, String table) @Override public TableMetadata getTableMetadata(TableId id) { - return ObjectUtils.getFirstNonNull(() -> localKeyspaces.getTableOrViewNullable(id), - () -> distributedKeyspaces().getTableOrViewNullable(id), - () -> VirtualKeyspaceRegistry.instance.getTableMetadataNullable(id)); + TableMetadata metadata = localKeyspaces.getTableOrViewNullable(id); + if (metadata != null) + return metadata; + metadata = distributedKeyspaces().getTableOrViewNullable(id); + if (metadata != null) + return metadata; + + return VirtualKeyspaceRegistry.instance.getTableMetadataNullable(id); } public TableMetadata getTableMetadata(Descriptor descriptor) From 9555bcede56b1434add22c4eeac4730415a12e43 Mon Sep 17 00:00:00 2001 From: Blake Eggleston Date: Wed, 18 Sep 2024 13:54:57 -0700 Subject: [PATCH 3/4] remove unnecessary set building from overlapping key loading --- .../service/accord/async/AsyncLoader.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java index 87183603d6b0..4edd9932fe50 100644 --- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java @@ -27,6 +27,7 @@ import accord.utils.async.AsyncResult; import accord.utils.async.Observable; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.cassandra.service.accord.*; @@ -201,30 +202,30 @@ public void onAdd(AccordCachingState state) return AsyncChains.all(root); } - private AsyncChain> findOverlappingKeys(Ranges ranges) + private AsyncChain> findOverlappingKeys(Ranges ranges) { if (ranges.isEmpty()) { // During topology changes some shards may be included with empty ranges - return AsyncChains.success(Collections.emptySet()); + return AsyncChains.success(Collections.emptyList()); } - List>> chains = new ArrayList<>(ranges.size()); + List>> chains = new ArrayList<>(ranges.size()); for (Range range : ranges) chains.add(findOverlappingKeys(range)); - return AsyncChains.reduce(chains, (a, b) -> ImmutableSet.builder().addAll(a).addAll(b).build()); + return AsyncChains.reduce(chains, (a, b) -> ImmutableList.builderWithExpectedSize(a.size() + b.size()).addAll(a).addAll(b).build()); } - private AsyncChain> findOverlappingKeys(Range range) + private AsyncChain> findOverlappingKeys(Range range) { // save to a variable as java gets confused when `.map` is called on the result of asChain - AsyncChain> map = Observable.asChain(callback -> + AsyncChain> map = Observable.asChain(callback -> AccordKeyspace.findAllKeysBetween(commandStore.id(), (AccordRoutingKey) range.start(), range.startInclusive(), (AccordRoutingKey) range.end(), range.endInclusive(), callback), - Collectors.toSet()); - return map.map(s -> ImmutableSet.builder().addAll(s).build()); + Collectors.toList()); + return map.map(ImmutableList::copyOf); } @VisibleForTesting From cc8fe9190914b1cbe449e951c32d3fc2012e5c0c Mon Sep 17 00:00:00 2001 From: Blake Eggleston Date: Thu, 19 Sep 2024 13:59:06 -0700 Subject: [PATCH 4/4] add separate recover delay for range sync points and adjust defaults --- .../org/apache/cassandra/config/AccordSpec.java | 15 ++++++++++++++- .../cassandra/service/accord/api/AccordAgent.java | 2 +- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java index 102ae68b67c7..8c8cbcb2976e 100644 --- a/src/java/org/apache/cassandra/config/AccordSpec.java +++ b/src/java/org/apache/cassandra/config/AccordSpec.java @@ -18,10 +18,15 @@ package org.apache.cassandra.config; +import accord.primitives.Routable; +import accord.primitives.Txn; +import accord.primitives.TxnId; import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.cassandra.journal.Params; import org.apache.cassandra.service.consensus.TransactionalMode; +import java.util.concurrent.TimeUnit; + public class AccordSpec { public volatile boolean enabled = false; @@ -32,7 +37,15 @@ public class AccordSpec public volatile OptionaldPositiveInt shard_count = OptionaldPositiveInt.UNDEFINED; - public volatile DurationSpec.IntMillisecondsBound recover_delay = new DurationSpec.IntMillisecondsBound(1000); + public volatile DurationSpec.IntMillisecondsBound recover_delay = new DurationSpec.IntMillisecondsBound(5000); + public volatile DurationSpec.IntMillisecondsBound range_sync_recover_delay = new DurationSpec.IntMillisecondsBound(10000); + + public long recoveryDelayFor(TxnId txnId, TimeUnit unit) + { + if (txnId.kind() == Txn.Kind.SyncPoint && txnId.domain() == Routable.Domain.Range) + return range_sync_recover_delay.to(unit); + return recover_delay.to(unit); + } /** * When a barrier transaction is requested how many times to repeat attempting the barrier before giving up diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java index 441277e7574d..bf351c323fda 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java @@ -186,7 +186,7 @@ public long attemptCoordinationDelay(Node node, SafeCommandStore safeStore, TxnI // TODO (expected): make this a configurable calculation on normal request latencies (like ContentionStrategy) long oneSecond = SECONDS.toMicros(1L); - long startTime = mostRecentAttempt.hlc() + DatabaseDescriptor.getAccord().recover_delay.to(MICROSECONDS) + long startTime = mostRecentAttempt.hlc() + DatabaseDescriptor.getAccord().recoveryDelayFor(txnId, MICROSECONDS) + (retryCount == 0 ? 0 : random.nextLong(oneSecond << Math.min(retryCount, 4))); startTime = nonClashingStartTime(startTime, shard == null ? null : shard.nodes, node.id(), oneSecond, random);