Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ASTS] Preliminaries I: deleteFromAtomicTable #7197

Open
wants to merge 13 commits into
base: develop
Choose a base branch
from
6 changes: 6 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ acceptedBreaks:
new: "method com.palantir.lock.LockState com.palantir.atlasdb.factory.timelock.TimeoutSensitiveLockRpcClient::getLockState(java.lang.String,\
\ com.palantir.lock.LockDescriptor)"
justification: "LockRpcClient#getLockState is broken"
"0.1121.0":
com.palantir.atlasdb:atlasdb-api:
- code: "java.method.addedToInterface"
new: "method void com.palantir.atlasdb.keyvalue.api.KeyValueService::deleteFromAtomicTable(com.palantir.atlasdb.keyvalue.api.TableReference,\
\ java.util.Set<com.palantir.atlasdb.keyvalue.api.Cell>)"
justification: "internal KVS API"
"0.770.0":
com.palantir.atlasdb:atlasdb-api:
- code: "java.class.removed"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,16 @@ void multiPut(Map<TableReference, ? extends Map<Cell, byte[]>> valuesByTable, lo
*/
void setOnce(TableReference tableRef, Map<Cell, byte[]> values);

/**
* Performs a delete from an atomic table - that is, a table written to by
* {@link #putUnlessExists(TableReference, Map)} or {@link #checkAndSet(CheckAndSetRequest)}. If applied to a
* table that is read to and written from using AtlasDB timestamps, behaviour is undefined.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: having TransactionKeyValueService is nice here, because user transactions don't get to call this without doing scary stuff

*
* @param tableRef table to perform atomic deletes from
* @param cells cells to delete
*/
void deleteFromAtomicTable(TableReference tableRef, Set<Cell> cells);

/**
* Check whether CAS is supported. This check can go away when JDBC KVS is deleted.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public void testCfEqualityChecker() throws TException {
.collect(Collectors.toList()));

assertThat(ColumnFamilyDefinitions.isMatchingCf(
kvs.getCfForTable(NEVER_SEEN, getMetadata(), FOUR_DAYS_IN_SECONDS), clusterSideCf))
kvs.getCfForTable(NEVER_SEEN, getMetadata(), FOUR_DAYS_IN_SECONDS), clusterSideCf))
.as("After serialization and deserialization to database, Cf metadata did not match.")
.isTrue();
}
Expand All @@ -256,8 +256,10 @@ private static String getInternalTestTableName() {
@SuppressWarnings("CompileTimeConstant")
public void shouldNotErrorForTimestampTableWhenCreatingCassandraKvs() {
verify(logger, atLeast(0))
.error(startsWith("Found a table {} that did not have persisted"),
assertArg((Arg<String> arg) -> assertThat(arg.getValue()).doesNotContain("timestamp")));
.error(
startsWith("Found a table {} that did not have persisted"),
assertArg(
(Arg<String> arg) -> assertThat(arg.getValue()).doesNotContain("timestamp")));
}

@Test
Expand Down Expand Up @@ -492,20 +494,20 @@ public void setOnceTest() {

keyValueService.putUnlessExists(userTable, ImmutableMap.of(CELL, sad));
assertThat(keyValueService
.get(userTable, ImmutableMap.of(CELL, Long.MAX_VALUE))
.get(CELL)
.getContents())
.get(userTable, ImmutableMap.of(CELL, Long.MAX_VALUE))
.get(CELL)
.getContents())
.containsExactly(sad);

keyValueService.setOnce(userTable, ImmutableMap.of(CELL, happy));
assertThat(keyValueService
.get(userTable, ImmutableMap.of(CELL, Long.MAX_VALUE))
.get(CELL)
.getContents())
.get(userTable, ImmutableMap.of(CELL, Long.MAX_VALUE))
.get(CELL)
.getContents())
.containsExactly(happy);
assertThat(keyValueService
.getAllTimestamps(userTable, ImmutableSet.of(CELL), Long.MAX_VALUE)
.size())
.getAllTimestamps(userTable, ImmutableSet.of(CELL), Long.MAX_VALUE)
.size())
.isEqualTo(1);
keyValueService.truncateTable(userTable);
}
Expand Down Expand Up @@ -665,9 +667,9 @@ public void testMultiCheckAndSetCannotUpdateAcrossMultipleRows() {
Cell nextTestCell = Cell.create(row(1), column(1));

assertThatThrownBy(() -> keyValueService.multiCheckAndSet(MultiCheckAndSetRequest.newCells(
TEST_TABLE,
firstTestCell.getRowName(),
ImmutableMap.of(firstTestCell, val(0, 0), nextTestCell, val(0, 1)))))
TEST_TABLE,
firstTestCell.getRowName(),
ImmutableMap.of(firstTestCell, val(0, 0), nextTestCell, val(0, 1)))))
.isInstanceOf(SafeIllegalStateException.class)
.hasMessageContaining("Can only update cells in one row.");
}
Expand All @@ -683,7 +685,7 @@ public void testMultiCheckAndSetIndependentlyFails() {
TEST_TABLE, nextTestCell.getRowName(), ImmutableMap.of(nextTestCell, val(0, 1))));

assertThatThrownBy(() -> keyValueService.multiCheckAndSet(MultiCheckAndSetRequest.newCells(
TEST_TABLE, nextTestCell.getRowName(), ImmutableMap.of(nextTestCell, val(0, 2)))))
TEST_TABLE, nextTestCell.getRowName(), ImmutableMap.of(nextTestCell, val(0, 2)))))
.isInstanceOf(MultiCheckAndSetException.class);

MultiCheckAndSetException ex = catchThrowableOfType(
Expand Down Expand Up @@ -808,7 +810,7 @@ private CfDef createDefaultCfDef(String namespace, String tableName) {
.setComment("")
.setColumn_metadata(new ArrayList<>())
.setTriggers(new ArrayList<>())
.setKey_alias(new byte[]{0x6B, 0x65, 0x79})
.setKey_alias(new byte[] {0x6B, 0x65, 0x79})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for some reason, Spotless applied these manipulations - I believe they don't have any semantic changes.

.setComparator_type("org.apache.cassandra.db.marshal.CompositeType"
+ "(org.apache.cassandra.db.marshal.BytesType,org.apache.cassandra.db.marshal.LongType)")
.setCompaction_strategy_options(new HashMap<>())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import com.palantir.atlasdb.keyvalue.cassandra.RowColumnRangeExtractor.RowColumnRangeResult;
import com.palantir.atlasdb.keyvalue.cassandra.async.client.creation.ClusterFactory.CassandraClusterConfig;
import com.palantir.atlasdb.keyvalue.cassandra.cas.CheckAndSetRunner;
import com.palantir.atlasdb.keyvalue.cassandra.cas.SinglePartitionAtomicTableCellDeleter;
import com.palantir.atlasdb.keyvalue.cassandra.paging.RowGetter;
import com.palantir.atlasdb.keyvalue.cassandra.pool.CassandraServer;
import com.palantir.atlasdb.keyvalue.cassandra.sweep.CandidateRowForSweeping;
Expand Down Expand Up @@ -240,6 +241,7 @@ public void close() {
private final CassandraTableDropper cassandraTableDropper;
private final CassandraTableTruncator cassandraTableTruncator;
private final CheckAndSetRunner checkAndSetRunner;
private final SinglePartitionAtomicTableCellDeleter atomicTableCellDeleter;

private final CassandraTables cassandraTables;

Expand Down Expand Up @@ -462,6 +464,7 @@ private CassandraKeyValueServiceImpl(
wrappingQueryRunner,
mutationTimestampProvider::getSweepSentinelWriteTimestamp);
this.checkAndSetRunner = new CheckAndSetRunner(queryRunner);
this.atomicTableCellDeleter = new SinglePartitionAtomicTableCellDeleter(queryRunner, DELETE_CONSISTENCY);
this.tableMetadata = new CassandraTableMetadata(rangeLoader, cassandraTables, clientPool, wrappingQueryRunner);
this.cassandraTableCreator = new CassandraTableCreator(clientPool, config);
this.cassandraTableTruncator = new CassandraTableTruncator(queryRunner, clientPool);
Expand Down Expand Up @@ -1881,6 +1884,20 @@ public void setOnce(TableReference tableRef, Map<Cell, byte[]> values) {
}
}

@Override
public void deleteFromAtomicTable(TableReference tableRef, Set<Cell> cells) {
clientPool.runWithRetry(client -> {
for (Cell cell : cells) {
try {
atomicTableCellDeleter.deleteFromAtomicTable(client, tableRef, cell);
} catch (TException e) {
throw Throwables.unwrapAndThrowAtlasDbDependencyException(e);
}
}
return null;
});
}

public static Map<ByteString, Map<Cell, byte[]>> partitionPerRow(Map<Cell, byte[]> values) {
return values.entrySet().stream()
.collect(Collectors.groupingBy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.io.BaseEncoding;
import com.google.protobuf.ByteString;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.encoding.PtBytes;
Expand Down Expand Up @@ -46,8 +45,8 @@ public final class CassandraTimestampUtils {
public static final ByteString INVALIDATED_VALUE = ByteString.copyFrom(new byte[] {0});
public static final long INITIAL_VALUE = 10000;

private static final long CASSANDRA_TIMESTAMP = -1;
private static final String ROW_AND_COLUMN_NAME_HEX_STRING = encodeCassandraHexString(ROW_AND_COLUMN_NAME);
private static final String ROW_AND_COLUMN_NAME_HEX_STRING =
CqlUtilities.encodeCassandraHexString(ROW_AND_COLUMN_NAME);
private static final ByteString CQL_SUCCESS = ByteString.copyFrom(new byte[] {1});

@VisibleForTesting
Expand Down Expand Up @@ -187,9 +186,11 @@ private static CqlQuery constructInsertIfNotExistsQuery(String columnName, byte[
.addArgs(
LoggingArgs.internalTableName(AtlasDbConstants.TIMESTAMP_TABLE),
SafeArg.of("rowAndColumnName", ROW_AND_COLUMN_NAME_HEX_STRING),
SafeArg.of("columnName", encodeCassandraHexString(columnName)),
SafeArg.of("atlasTimestamp", CASSANDRA_TIMESTAMP),
SafeArg.of("newValue", encodeCassandraHexBytes(target)))
SafeArg.of("columnName", CqlUtilities.encodeCassandraHexString(columnName)),
SafeArg.of(
"atlasTimestamp",
CqlUtilities.CASSANDRA_REPRESENTATION_OF_ATLASDB_ATOMIC_TABLE_TIMESTAMP),
SafeArg.of("newValue", CqlUtilities.encodeCassandraHexBytes(target)))
.build();
}

Expand All @@ -199,22 +200,16 @@ private static CqlQuery constructUpdateIfEqualQuery(String columnName, byte[] ex
.safeQueryFormat("UPDATE \"%s\" SET value=%s WHERE key=%s AND column1=%s AND column2=%s IF value=%s;")
.addArgs(
LoggingArgs.internalTableName(AtlasDbConstants.TIMESTAMP_TABLE),
SafeArg.of("newValue", encodeCassandraHexBytes(target)),
SafeArg.of("newValue", CqlUtilities.encodeCassandraHexBytes(target)),
SafeArg.of("rowAndColumnName", ROW_AND_COLUMN_NAME_HEX_STRING),
SafeArg.of("columnName", encodeCassandraHexString(columnName)),
SafeArg.of("atlasTimestamp", CASSANDRA_TIMESTAMP),
SafeArg.of("oldValue", encodeCassandraHexBytes(expected)))
SafeArg.of("columnName", CqlUtilities.encodeCassandraHexString(columnName)),
SafeArg.of(
"atlasTimestamp",
CqlUtilities.CASSANDRA_REPRESENTATION_OF_ATLASDB_ATOMIC_TABLE_TIMESTAMP),
SafeArg.of("oldValue", CqlUtilities.encodeCassandraHexBytes(expected)))
.build();
}

private static String encodeCassandraHexString(String string) {
return encodeCassandraHexBytes(PtBytes.toBytes(string));
}

private static String encodeCassandraHexBytes(byte[] bytes) {
return "0x" + BaseEncoding.base16().upperCase().encode(bytes);
}

private static String wrapInQuotes(String tableName) {
return "\"" + tableName + "\"";
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.keyvalue.cassandra;

import com.google.common.io.BaseEncoding;
import com.palantir.atlasdb.encoding.PtBytes;

public final class CqlUtilities {
public static final long CASSANDRA_REPRESENTATION_OF_ATLASDB_ATOMIC_TABLE_TIMESTAMP = -1;

private CqlUtilities() {
// Utility class
}

public static String encodeCassandraHexString(String string) {
return encodeCassandraHexBytes(PtBytes.toBytes(string));
}

public static String encodeCassandraHexBytes(byte[] bytes) {
return "0x" + BaseEncoding.base16().upperCase().encode(bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@
*/
package com.palantir.atlasdb.keyvalue.cassandra.cas;

import com.google.common.io.BaseEncoding;
import com.palantir.atlasdb.keyvalue.api.CheckAndSetRequest;
import com.palantir.atlasdb.keyvalue.cassandra.CqlQuery;
import com.palantir.atlasdb.keyvalue.cassandra.CqlUtilities;
import com.palantir.atlasdb.keyvalue.cassandra.ImmutableCqlQuery;
import com.palantir.atlasdb.logging.LoggingArgs;
import com.palantir.logsafe.Preconditions;
import com.palantir.logsafe.SafeArg;
import com.palantir.logsafe.UnsafeArg;

final class CheckAndSetQueries {
private static final long CASSANDRA_TIMESTAMP = -1L;
private static final String CASSANDRA_PREFIX = "0x";

private CheckAndSetQueries() {
// Static Factory
}
Expand All @@ -38,45 +35,53 @@ static CqlQuery getQueryForRequest(CheckAndSetRequest request) {

private static CqlQuery insertIfNotExists(CheckAndSetRequest request) {
Preconditions.checkState(
!request.oldValue().isPresent(),
request.oldValue().isEmpty(),
"insertIfNotExists queries should only be made if we don't have an old value");
return ImmutableCqlQuery.builder()
.safeQueryFormat(
"INSERT INTO \"%s\" (key, column1, column2, value)" + " VALUES (%s, %s, %s, %s) IF NOT EXISTS;")
.addArgs(
LoggingArgs.internalTableName(request.table()),
UnsafeArg.of(
"row", encodeCassandraHexString(request.cell().getRowName())),
"row",
CqlUtilities.encodeCassandraHexBytes(
request.cell().getRowName())),
UnsafeArg.of(
"column",
encodeCassandraHexString(request.cell().getColumnName())),
SafeArg.of("cassandraTimestamp", CASSANDRA_TIMESTAMP),
UnsafeArg.of("newValue", encodeCassandraHexString(request.newValue())))
CqlUtilities.encodeCassandraHexBytes(
request.cell().getColumnName())),
SafeArg.of(
"cassandraTimestamp",
CqlUtilities.CASSANDRA_REPRESENTATION_OF_ATLASDB_ATOMIC_TABLE_TIMESTAMP),
UnsafeArg.of("newValue", CqlUtilities.encodeCassandraHexBytes(request.newValue())))
.build();
}

private static CqlQuery updateIfMatching(CheckAndSetRequest request) {
Preconditions.checkState(
request.oldValue().isPresent(),
"updateIfMatching queries should only be made if we do have an old value");
byte[] data = request.newValue();
return ImmutableCqlQuery.builder()
.safeQueryFormat("UPDATE \"%s\" SET value=%s WHERE key=%s AND column1=%s AND column2=%s IF value=%s;")
.addArgs(
LoggingArgs.internalTableName(request.table()),
UnsafeArg.of("newValue", encodeCassandraHexString(request.newValue())),
UnsafeArg.of("newValue", CqlUtilities.encodeCassandraHexBytes(data)),
UnsafeArg.of(
"row", encodeCassandraHexString(request.cell().getRowName())),
"row",
CqlUtilities.encodeCassandraHexBytes(
request.cell().getRowName())),
UnsafeArg.of(
"column",
encodeCassandraHexString(request.cell().getColumnName())),
SafeArg.of("cassandraTimestamp", CASSANDRA_TIMESTAMP),
CqlUtilities.encodeCassandraHexBytes(
request.cell().getColumnName())),
SafeArg.of(
"cassandraTimestamp",
CqlUtilities.CASSANDRA_REPRESENTATION_OF_ATLASDB_ATOMIC_TABLE_TIMESTAMP),
UnsafeArg.of(
"oldValue",
encodeCassandraHexString(request.oldValue().get())))
CqlUtilities.encodeCassandraHexBytes(
request.oldValue().get())))
.build();
}

private static String encodeCassandraHexString(byte[] data) {
return CASSANDRA_PREFIX + BaseEncoding.base16().upperCase().encode(data);
}
}
Loading