Skip to content

Commit

Permalink
Do not fail when query contains duplicate row with local write (#7293)
Browse files Browse the repository at this point in the history
  • Loading branch information
pkoenig10 committed Sep 19, 2024
1 parent 7d3b69c commit 4be9234
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 66 deletions.
26 changes: 26 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,32 @@ acceptedBreaks:
\ com.palantir.atlasdb.keyvalue.cassandra.Blacklist, com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraClientPoolMetrics,\
\ com.palantir.atlasdb.keyvalue.cassandra.CassandraClientMetrics)"
justification: "Unused outside atlas"
"0.1152.0":
com.palantir.atlasdb:atlasdb-api:
- code: "java.method.parameterTypeChanged"
old: "parameter java.util.NavigableMap<byte[], com.palantir.atlasdb.keyvalue.api.RowResult<byte[]>>\
\ com.palantir.atlasdb.transaction.api.snapshot.AutoDelegate_KeyValueSnapshotReader::getRows(com.palantir.atlasdb.keyvalue.api.TableReference,\
\ java.lang.Iterable<byte[]>, com.palantir.atlasdb.keyvalue.api.ColumnSelection,\
\ ===com.google.common.collect.ImmutableMap.Builder<com.palantir.atlasdb.keyvalue.api.Cell,\
\ byte[]>===)"
new: "parameter java.util.NavigableMap<byte[], com.palantir.atlasdb.keyvalue.api.RowResult<byte[]>>\
\ com.palantir.atlasdb.transaction.api.snapshot.AutoDelegate_KeyValueSnapshotReader::getRows(com.palantir.atlasdb.keyvalue.api.TableReference,\
\ java.lang.Iterable<byte[]>, com.palantir.atlasdb.keyvalue.api.ColumnSelection,\
\ ===java.util.Map<com.palantir.atlasdb.keyvalue.api.Cell, byte[]>===)"
justification: "KeyValueSnapshotReader is a relatively new API and is not widely\
\ used"
- code: "java.method.parameterTypeChanged"
old: "parameter java.util.NavigableMap<byte[], com.palantir.atlasdb.keyvalue.api.RowResult<byte[]>>\
\ com.palantir.atlasdb.transaction.api.snapshot.KeyValueSnapshotReader::getRows(com.palantir.atlasdb.keyvalue.api.TableReference,\
\ java.lang.Iterable<byte[]>, com.palantir.atlasdb.keyvalue.api.ColumnSelection,\
\ ===com.google.common.collect.ImmutableMap.Builder<com.palantir.atlasdb.keyvalue.api.Cell,\
\ byte[]>===)"
new: "parameter java.util.NavigableMap<byte[], com.palantir.atlasdb.keyvalue.api.RowResult<byte[]>>\
\ com.palantir.atlasdb.transaction.api.snapshot.KeyValueSnapshotReader::getRows(com.palantir.atlasdb.keyvalue.api.TableReference,\
\ java.lang.Iterable<byte[]>, com.palantir.atlasdb.keyvalue.api.ColumnSelection,\
\ ===java.util.Map<com.palantir.atlasdb.keyvalue.api.Cell, byte[]>===)"
justification: "KeyValueSnapshotReader is a relatively new API and is not widely\
\ used"
"0.770.0":
com.palantir.atlasdb:atlasdb-api:
- code: "java.class.removed"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.palantir.atlasdb.transaction.api.snapshot;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.ColumnSelection;
Expand Down Expand Up @@ -51,5 +50,5 @@ NavigableMap<byte[], RowResult<byte[]>> getRows(
TableReference tableReference,
Iterable<byte[]> rows,
ColumnSelection columnSelection,
ImmutableMap.Builder<Cell, byte[]> resultCollector);
Map<Cell, byte[]> localWrites);
}
Original file line number Diff line number Diff line change
Expand Up @@ -504,16 +504,11 @@ private NavigableMap<byte[], RowResult<byte[]>> getRowsInternal(
return AbstractTransaction.EMPTY_SORTED_ROWS;
}
hasReads = true;
ImmutableSortedMap.Builder<Cell, byte[]> resultCollector = ImmutableSortedMap.naturalOrder();
NavigableMap<Cell, byte[]> writes = localWriteBuffer.getLocalWrites().get(tableRef);
if (writes != null && !writes.isEmpty()) {
for (byte[] row : rows) {
extractLocalWritesForRow(resultCollector, writes, row, columnSelection);
}
}
Map<Cell, byte[]> writesForRows = extractLocalWritesForRows(writes, rows, columnSelection);

NavigableMap<byte[], RowResult<byte[]>> results =
keyValueSnapshotReader.getRows(tableRef, rows, columnSelection, resultCollector);
keyValueSnapshotReader.getRows(tableRef, rows, columnSelection, writesForRows);
long getRowsMillis = TimeUnit.NANOSECONDS.toMillis(timer.stop());
if (perfLogger.isDebugEnabled()) {
perfLogger.debug(
Expand Down Expand Up @@ -926,53 +921,55 @@ public SortedMap<byte[], RowResult<byte[]>> getRowsIgnoringLocalWrites(

// can't skip lock check as we don't know how many cells to expect for the column selection
validatePreCommitRequirementsOnNonExhaustiveReadIfNecessary(tableRef, getStartTimestamp());
return filterRowResults(tableRef, rawResults, ImmutableMap.builderWithExpectedSize(rawResults.size()));
}

private NavigableMap<byte[], RowResult<byte[]>> filterRowResults(
TableReference tableRef, Map<Cell, Value> rawResults, ImmutableMap.Builder<Cell, byte[]> resultCollector) {
ImmutableMap<Cell, byte[]> collected = resultCollector
.putAll(getWithPostFilteringSync(tableRef, rawResults, Value.GET_VALUE))
.buildOrThrow();
Map<Cell, byte[]> filterDeletedValues = removeEmptyColumns(collected, tableRef);
return RowResults.viewOfSortedMap(Cells.breakCellsUpByRow(filterDeletedValues));
Map<Cell, byte[]> results = Maps.newHashMapWithExpectedSize(rawResults.size());
getWithPostFilteringSync(tableRef, rawResults, Value.GET_VALUE).forEach(e -> {
results.put(e.getKey(), e.getValue());
});

removeEmptyColumns(tableRef, results);

return RowResults.viewOfSortedMap(Cells.breakCellsUpByRow(results));
}

private Map<Cell, byte[]> removeEmptyColumns(Map<Cell, byte[]> unfiltered, TableReference tableReference) {
Map<Cell, byte[]> filtered = Maps.filterValues(unfiltered, Predicates.not(Value::isTombstone));
// compute filtered size without traversing lazily transformed map `size()` as that allocates entries
long filteredCount = unfiltered.values().stream()
.filter(Predicates.not(Value::isTombstone))
.count();
private void removeEmptyColumns(TableReference tableReference, Map<Cell, byte[]> results) {
int unfilteredSize = results.size();

results.values().removeIf(Value::isTombstone);

long emptyValues = unfiltered.size() - filteredCount;
int emptyValues = unfilteredSize - results.size();
snapshotEventRecorder.recordFilteredEmptyValues(tableReference, emptyValues);
TraceStatistics.incEmptyValues(emptyValues);

return filtered;
}

/**
* This will add any local writes for this row to the result map.
* This will return any local writes for these rows.
* <p>
* If an empty value was written as a delete, this will also be included in the map.
*/
private void extractLocalWritesForRow(
@Output ImmutableMap.Builder<Cell, byte[]> result,
SortedMap<Cell, byte[]> writes,
byte[] row,
ColumnSelection columnSelection) {
Cell lowCell = Cells.createSmallestCellForRow(row);
for (Map.Entry<Cell, byte[]> entry : writes.tailMap(lowCell).entrySet()) {
Cell cell = entry.getKey();
if (!Arrays.equals(row, cell.getRowName())) {
break;
}
if (columnSelection.allColumnsSelected()
|| columnSelection.getSelectedColumns().contains(cell.getColumnName())) {
result.put(cell, entry.getValue());
private Map<Cell, byte[]> extractLocalWritesForRows(
SortedMap<Cell, byte[]> writes, Iterable<byte[]> rows, ColumnSelection columnSelection) {
if (writes == null || writes.isEmpty()) {
return Map.of();
}

Map<Cell, byte[]> results = new HashMap<>();

for (byte[] row : rows) {
Cell lowCell = Cells.createSmallestCellForRow(row);
for (Map.Entry<Cell, byte[]> entry : writes.tailMap(lowCell).entrySet()) {
Cell cell = entry.getKey();
if (!Arrays.equals(row, cell.getRowName())) {
break;
}
if (columnSelection.allColumnsSelected()
|| columnSelection.getSelectedColumns().contains(cell.getColumnName())) {
results.put(cell, entry.getValue());
}
}
}

return Collections.unmodifiableMap(results);
}

@Override
Expand Down Expand Up @@ -1019,14 +1016,14 @@ ListenableFuture<Map<Cell, byte[]>> getInternal(
}
hasReads = true;

Map<Cell, byte[]> result = new HashMap<>();
Map<Cell, byte[]> results = new HashMap<>();
Map<Cell, byte[]> writes = localWriteBuffer.getLocalWrites().get(tableRef);
long numberOfNonDeleteLocalWrites = 0;
if (writes != null && !writes.isEmpty()) {
for (Cell cell : cells) {
byte[] value = writes.get(cell);
if (value != null) {
result.put(cell, value);
results.put(cell, value);
if (value != PtBytes.EMPTY_BYTE_ARRAY) {
numberOfNonDeleteLocalWrites++;
}
Expand All @@ -1036,20 +1033,20 @@ ListenableFuture<Map<Cell, byte[]>> getInternal(

// We don't need to read any cells that were written locally.
long expectedNumberOfPresentCellsToFetch = numberOfExpectedPresentCells - numberOfNonDeleteLocalWrites;
Set<Cell> cellsToFetch = Sets.difference(cells, result.keySet());
Set<Cell> cellsToFetch = Sets.difference(cells, results.keySet());
ListenableFuture<Map<Cell, byte[]>> initialResults = keyValueSnapshotReader.getAsync(tableRef, cellsToFetch);
return Futures.transform(
initialResults,
fromKeyValueService -> {
result.putAll(fromKeyValueService);
results.putAll(fromKeyValueService);

long getMillis = TimeUnit.NANOSECONDS.toMillis(timer.stop());
if (perfLogger.isDebugEnabled()) {
perfLogger.debug(
"Snapshot transaction get cells (some possibly deleted)",
LoggingArgs.tableRef(tableRef),
SafeArg.of("numberOfCells", cells.size()),
SafeArg.of("numberOfCellsRetrieved", result.size()),
SafeArg.of("numberOfCellsRetrieved", results.size()),
SafeArg.of("getOperation", operationName),
SafeArg.of("durationMillis", getMillis));
}
Expand All @@ -1060,7 +1057,9 @@ ListenableFuture<Map<Cell, byte[]>> getInternal(
fromKeyValueService.size() == expectedNumberOfPresentCellsToFetch;
validatePreCommitRequirementsOnReadIfNecessary(
tableRef, getStartTimestamp(), allPossibleCellsReadAndPresent);
return removeEmptyColumns(result, tableRef);
removeEmptyColumns(tableRef, results);

return Collections.unmodifiableMap(results);
},
MoreExecutors.directExecutor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.palantir.atlasdb.transaction.impl.snapshot;

import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
Expand Down Expand Up @@ -118,12 +117,22 @@ public NavigableMap<byte[], RowResult<byte[]>> getRows(
TableReference tableReference,
Iterable<byte[]> rows,
ColumnSelection columnSelection,
ImmutableMap.Builder<Cell, byte[]> resultCollector) {
Map<Cell, byte[]> localWrites) {
Map<Cell, Value> rawResults = new HashMap<>(transactionKeyValueService.getRows(
tableReference, rows, columnSelection, startTimestampSupplier.getAsLong()));

// We don't need to do work postFiltering if we have a write locally.
rawResults.keySet().removeAll(resultCollector.buildOrThrow().keySet());
return filterRowResults(tableReference, rawResults, resultCollector);
rawResults.keySet().removeAll(localWrites.keySet());

Map<Cell, byte[]> results = Maps.newHashMapWithExpectedSize(rawResults.size());
getWithPostFilteringSync(tableReference, rawResults, Value.GET_VALUE).forEach(e -> {
results.put(e.getKey(), e.getValue());
});
results.putAll(localWrites);

removeEmptyColumns(tableReference, results);

return RowResults.viewOfSortedMap(Cells.breakCellsUpByRow(results));
}

private ListenableFuture<Map<Cell, byte[]>> getInternal(TableReference tableReference, Set<Cell> cells) {
Expand Down Expand Up @@ -356,28 +365,19 @@ private boolean rollbackOtherTransaction(long startTs) {
}
}

private NavigableMap<byte[], RowResult<byte[]>> filterRowResults(
TableReference tableRef, Map<Cell, Value> rawResults, ImmutableMap.Builder<Cell, byte[]> resultCollector) {
ImmutableMap<Cell, byte[]> collected = resultCollector
.putAll(getWithPostFilteringSync(tableRef, rawResults, Value.GET_VALUE))
.buildOrThrow();
Map<Cell, byte[]> filterDeletedValues = removeEmptyColumns(collected, tableRef);
return RowResults.viewOfSortedMap(Cells.breakCellsUpByRow(filterDeletedValues));
}

private <T> Collection<Map.Entry<Cell, T>> getWithPostFilteringSync(
TableReference tableRef, Map<Cell, Value> rawResults, Function<Value, T> transformer) {
return AtlasFutures.getUnchecked(getWithPostFilteringAsync(tableRef, rawResults, transformer));
}

private Map<Cell, byte[]> removeEmptyColumns(Map<Cell, byte[]> unfiltered, TableReference tableReference) {
Map<Cell, byte[]> filtered = Maps.filterValues(unfiltered, Predicates.not(Value::isTombstone));
private void removeEmptyColumns(TableReference tableReference, Map<Cell, byte[]> results) {
int unfilteredSize = results.size();

int emptyValues = unfiltered.size() - filtered.size();
results.values().removeIf(Value::isTombstone);

int emptyValues = unfilteredSize - results.size();
metricRecorder.recordFilteredEmptyValues(tableReference, emptyValues);
TraceStatistics.incEmptyValues(emptyValues);

return filtered;
}

private static boolean isSweepSentinel(Value value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1574,6 +1574,20 @@ public void getRowsWithDuplicateQueries() {
assertThat(readRows).hasSize(1);
}

@Test
public void getRowsWithDuplicateQueriesOfLocalWrites() {
Transaction tx = startTransaction();
byte[] row0 = row(0);
byte[] anotherRow0 = row(0);
byte[] col0 = column(0);
tx.put(TEST_TABLE, ImmutableMap.of(Cell.create(row0, col0), value(0)));

SortedMap<byte[], RowResult<byte[]>> readRows =
tx.getRows(TEST_TABLE, ImmutableList.of(row0, anotherRow0), ColumnSelection.all());
assertThat(readRows.firstKey()).containsExactly(row0);
assertThat(readRows).hasSize(1);
}

@Test
public void getRowsAppliesColumnSelection() {
Transaction tx = startTransaction();
Expand Down
6 changes: 6 additions & 0 deletions changelog/@unreleased/pr-7293.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type: fix
fix:
description: Fix bug that causes queries with duplicate rows to fail when the duplicate
row has a local write.
links:
- https://github.com/palantir/atlasdb/pull/7293

0 comments on commit 4be9234

Please sign in to comment.