Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CEP-15 (C*) - misc accord perf improvements #3558

Open
wants to merge 4 commits into
base: cep-15-accord
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/java/org/apache/cassandra/config/AccordSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@

package org.apache.cassandra.config;

import accord.primitives.Routable;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.cassandra.journal.Params;
import org.apache.cassandra.service.consensus.TransactionalMode;

import java.util.concurrent.TimeUnit;

public class AccordSpec
{
public volatile boolean enabled = false;
Expand All @@ -32,7 +37,15 @@ public class AccordSpec

public volatile OptionaldPositiveInt shard_count = OptionaldPositiveInt.UNDEFINED;

public volatile DurationSpec.IntMillisecondsBound recover_delay = new DurationSpec.IntMillisecondsBound(1000);
public volatile DurationSpec.IntMillisecondsBound recover_delay = new DurationSpec.IntMillisecondsBound(5000);
public volatile DurationSpec.IntMillisecondsBound range_sync_recover_delay = new DurationSpec.IntMillisecondsBound(10000);

/**
 * Selects the recovery delay that applies to the given transaction and converts it to {@code unit}.
 *
 * Transactions whose kind is {@code SyncPoint} and whose domain is {@code Range} use
 * {@code range_sync_recover_delay}; every other transaction uses {@code recover_delay}.
 *
 * @param txnId the transaction whose kind and domain decide which delay is used
 * @param unit  the time unit the chosen delay is converted to
 * @return the chosen delay expressed in {@code unit}
 */
public long recoveryDelayFor(TxnId txnId, TimeUnit unit)
{
    boolean rangeSyncPoint = txnId.kind() == Txn.Kind.SyncPoint
                             && txnId.domain() == Routable.Domain.Range;
    // Only the selected field is read, so a single volatile load happens per call.
    return rangeSyncPoint ? range_sync_recover_delay.to(unit)
                          : recover_delay.to(unit);
}

/**
* When a barrier transaction is requested, how many times to retry the barrier before giving up
Expand Down
11 changes: 8 additions & 3 deletions src/java/org/apache/cassandra/schema/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,15 @@ public TableMetadata getTableMetadata(String keyspace, String table)
@Override
public TableMetadata getTableMetadata(TableId id)
{
return ObjectUtils.getFirstNonNull(() -> localKeyspaces.getTableOrViewNullable(id),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<3 hidden Stream

() -> distributedKeyspaces().getTableOrViewNullable(id),
() -> VirtualKeyspaceRegistry.instance.getTableMetadataNullable(id));
TableMetadata metadata = localKeyspaces.getTableOrViewNullable(id);
if (metadata != null)
return metadata;

metadata = distributedKeyspaces().getTableOrViewNullable(id);
if (metadata != null)
return metadata;

return VirtualKeyspaceRegistry.instance.getTableMetadataNullable(id);
}

public TableMetadata getTableMetadata(Descriptor descriptor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public long attemptCoordinationDelay(Node node, SafeCommandStore safeStore, TxnI

// TODO (expected): make this a configurable calculation on normal request latencies (like ContentionStrategy)
long oneSecond = SECONDS.toMicros(1L);
long startTime = mostRecentAttempt.hlc() + DatabaseDescriptor.getAccord().recover_delay.to(MICROSECONDS)
long startTime = mostRecentAttempt.hlc() + DatabaseDescriptor.getAccord().recoveryDelayFor(txnId, MICROSECONDS)
+ (retryCount == 0 ? 0 : random.nextLong(oneSecond << Math.min(retryCount, 4)));

startTime = nonClashingStartTime(startTime, shard == null ? null : shard.nodes, node.id(), oneSecond, random);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import accord.utils.async.AsyncResult;
import accord.utils.async.Observable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.cassandra.service.accord.*;
Expand Down Expand Up @@ -201,30 +202,30 @@ public void onAdd(AccordCachingState<Key, CommandsForKey> state)
return AsyncChains.all(root);
}

private AsyncChain<Set<? extends Key>> findOverlappingKeys(Ranges ranges)
private AsyncChain<List<? extends Key>> findOverlappingKeys(Ranges ranges)
{
if (ranges.isEmpty())
{
// During topology changes some shards may be included with empty ranges
return AsyncChains.success(Collections.emptySet());
return AsyncChains.success(Collections.emptyList());
}

List<AsyncChain<Set<PartitionKey>>> chains = new ArrayList<>(ranges.size());
List<AsyncChain<List<PartitionKey>>> chains = new ArrayList<>(ranges.size());
for (Range range : ranges)
chains.add(findOverlappingKeys(range));
return AsyncChains.reduce(chains, (a, b) -> ImmutableSet.<Key>builder().addAll(a).addAll(b).build());
return AsyncChains.reduce(chains, (a, b) -> ImmutableList.<Key>builderWithExpectedSize(a.size() + b.size()).addAll(a).addAll(b).build());
}

private AsyncChain<Set<PartitionKey>> findOverlappingKeys(Range range)
private AsyncChain<List<PartitionKey>> findOverlappingKeys(Range range)
{
// save to a variable as java gets confused when `.map` is called on the result of asChain
AsyncChain<Set<PartitionKey>> map = Observable.asChain(callback ->
AsyncChain<List<PartitionKey>> map = Observable.asChain(callback ->
AccordKeyspace.findAllKeysBetween(commandStore.id(),
(AccordRoutingKey) range.start(), range.startInclusive(),
(AccordRoutingKey) range.end(), range.endInclusive(),
callback),
Collectors.toSet());
return map.map(s -> ImmutableSet.<PartitionKey>builder().addAll(s).build());
Collectors.toList());
return map.map(ImmutableList::copyOf);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,11 @@ private List<accord.primitives.Range> repairRange(TokenRange range) throws Throw
List<accord.primitives.Range> repairedRanges = new ArrayList<>();
int rangeStepUpdateInterval = ACCORD_REPAIR_RANGE_STEP_UPDATE_INTERVAL.getInt();
RoutingKey remainingStart = range.start();
// TODO (expected): repair ranges should have a configurable lower limit of split size so already small repairs aren't broken up into excessively tiny ones
BigInteger rangeSize = splitter.sizeOf(range);
if (rangeStep == null)
{
BigInteger divide = splitter.divide(rangeSize, 1000);
BigInteger divide = splitter.divide(rangeSize, 10000);
rangeStep = divide.equals(BigInteger.ZERO) ? rangeSize : BigInteger.ONE.max(divide);
}

Expand Down