Skip to content

Commit

Permalink
feat(bucket-assigner): Bucket assigner state machine ADT (#7259)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdaudali committed Sep 5, 2024
1 parent 2268c1e commit bd78b57
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.sweep.asts.bucketingthings;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.immutables.value.Value;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = ImmutableStart.class, name = BucketAssignerState.Start.TYPE),
@JsonSubTypes.Type(value = ImmutableOpening.class, name = BucketAssignerState.Opening.TYPE),
@JsonSubTypes.Type(
value = ImmutableWaitingUntilCloseable.class,
name = BucketAssignerState.WaitingUntilCloseable.TYPE),
@JsonSubTypes.Type(value = ImmutableClosingFromOpen.class, name = BucketAssignerState.ClosingFromOpen.TYPE),
@JsonSubTypes.Type(value = ImmutableImmediatelyClosing.class, name = BucketAssignerState.ImmediatelyClosing.TYPE),
})
public interface BucketAssignerState {
<T> T accept(BucketAssignerState.Visitor<T> visitor);

static BucketAssignerState.Start start(long startTimestampInclusive) {
return ImmutableStart.of(startTimestampInclusive);
}

static BucketAssignerState.Opening opening(long startTimestampInclusive) {
return ImmutableOpening.of(startTimestampInclusive);
}

static BucketAssignerState.WaitingUntilCloseable waitingUntilCloseable(long startTimestampInclusive) {
return ImmutableWaitingUntilCloseable.of(startTimestampInclusive);
}

static BucketAssignerState.ClosingFromOpen closingFromOpen(
long startTimestampInclusive, long endTimestampExclusive) {
return ImmutableClosingFromOpen.of(startTimestampInclusive, endTimestampExclusive);
}

static BucketAssignerState.ImmediatelyClosing immediatelyClosing(
long startTimestampInclusive, long endTimestampExclusive) {
return ImmutableImmediatelyClosing.of(startTimestampInclusive, endTimestampExclusive);
}

@Value.Immutable
@JsonSerialize(as = ImmutableStart.class)
@JsonDeserialize(as = ImmutableStart.class)
interface Start extends BucketAssignerState {
String TYPE = "start";

@Value.Parameter
long startTimestampInclusive();

@Override
default <T> T accept(BucketAssignerState.Visitor<T> visitor) {
return visitor.visit(this);
}
}

@Value.Immutable
@JsonSerialize(as = ImmutableOpening.class)
@JsonDeserialize(as = ImmutableOpening.class)
interface Opening extends BucketAssignerState {
String TYPE = "opening";

@Value.Parameter
long startTimestampInclusive();

@Override
default <T> T accept(BucketAssignerState.Visitor<T> visitor) {
return visitor.visit(this);
}
}

@Value.Immutable
@JsonSerialize(as = ImmutableWaitingUntilCloseable.class)
@JsonDeserialize(as = ImmutableWaitingUntilCloseable.class)
interface WaitingUntilCloseable extends BucketAssignerState {
String TYPE = "waitingUntilCloseable";

@Value.Parameter
long startTimestampInclusive();

@Override
default <T> T accept(BucketAssignerState.Visitor<T> visitor) {
return visitor.visit(this);
}
}

@Value.Immutable
@JsonSerialize(as = ImmutableClosingFromOpen.class)
@JsonDeserialize(as = ImmutableClosingFromOpen.class)
interface ClosingFromOpen extends BucketAssignerState {
String TYPE = "closingFromOpen";

@Value.Parameter
long startTimestampInclusive();

@Value.Parameter
long endTimestampExclusive();

@Override
default <T> T accept(BucketAssignerState.Visitor<T> visitor) {
return visitor.visit(this);
}
}

@Value.Immutable
@JsonSerialize(as = ImmutableImmediatelyClosing.class)
@JsonDeserialize(as = ImmutableImmediatelyClosing.class)
interface ImmediatelyClosing extends BucketAssignerState {
String TYPE = "immediatelyClosing";

@Value.Parameter
long startTimestampInclusive();

@Value.Parameter
long endTimestampExclusive();

@Override
default <T> T accept(BucketAssignerState.Visitor<T> visitor) {
return visitor.visit(this);
}
}

interface Visitor<T> {
T visit(BucketAssignerState.Start start);

T visit(BucketAssignerState.Opening opening);

T visit(BucketAssignerState.WaitingUntilCloseable waitingUntilCloseable);

T visit(BucketAssignerState.ClosingFromOpen closingFromOpen);

T visit(BucketAssignerState.ImmediatelyClosing immediatelyClosing);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* (c) Copyright 2024 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.atlasdb.sweep.asts.bucketingthings;

import static org.assertj.core.api.Assertions.assertThat;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.BaseEncoding;
import com.palantir.conjure.java.serialization.ObjectMappers;
import java.io.IOException;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class BucketAssignerStateTest {
private static final ObjectMapper OBJECT_MAPPER = ObjectMappers.newSmileServerObjectMapper();

// Think _very_ carefully about changing this without a migration.
private static final byte[] START_SERIALIZED =
BaseEncoding.base64().decode("OikKBfqDdHlwZURzdGFydJZzdGFydFRpbWVzdGFtcEluY2x1c2l2ZcL7");
private static final byte[] OPENING_SERIALIZED =
BaseEncoding.base64().decode("OikKBfqDdHlwZUZvcGVuaW5nlnN0YXJ0VGltZXN0YW1wSW5jbHVzaXZlxPs=");
private static final byte[] WAITING_UNTIL_CLOSED_SERIALIZED = BaseEncoding.base64()
.decode("OikKBfqDdHlwZVR3YWl0aW5nVW50aWxDbG9zZWFibGWWc3RhcnRUaW1lc3RhbXBJbmNsdXNpdmXG+w==");
private static final byte[] CLOSING_FROM_OPEN_SERIALIZED = BaseEncoding.base64()
.decode(
"OikKBfqDdHlwZU5jbG9zaW5nRnJvbU9wZW6Wc3RhcnRUaW1lc3RhbXBJbmNsdXNpdmXIlGVuZFRpbWVzdGFtcEV4Y2x1c2l2Zcr7");
private static final byte[] IMMEDIATELY_CLOSING_SERIALIZED = BaseEncoding.base64()
.decode(
"OikKBfqDdHlwZVFpbW1lZGlhdGVseUNsb3NpbmeWc3RhcnRUaW1lc3RhbXBJbmNsdXNpdmXMlGVuZFRpbWVzdGFtcEV4Y2x1c2l2Zc77");

@ParameterizedTest(name = "{0}")
@MethodSource("bucketAssignerStates")
public void canRoundTripSerdeState(BucketAssignerState state, byte[] _unused) throws IOException {
byte[] json = OBJECT_MAPPER.writeValueAsBytes(state);
BucketAssignerState deserialized = OBJECT_MAPPER.readValue(json, BucketAssignerState.class);
assertThat(deserialized).isEqualTo(state);
// Adding instance of check to be _really_ sure, since a mistake here would be very painful.
assertThat(deserialized).isInstanceOf(state.getClass());
}

@ParameterizedTest(name = "{0}")
@MethodSource("bucketAssignerStates")
public void canDeserializeExistingVersion(BucketAssignerState bucketProgress, byte[] serializedForm)
throws IOException {
assertThat(OBJECT_MAPPER.readValue(serializedForm, BucketAssignerState.class))
.isEqualTo(bucketProgress);
}

private static Stream<Arguments> bucketAssignerStates() {
return Stream.of(
Arguments.of(BucketAssignerState.start(1), START_SERIALIZED),
Arguments.of(BucketAssignerState.opening(2), OPENING_SERIALIZED),
Arguments.of(BucketAssignerState.waitingUntilCloseable(3), WAITING_UNTIL_CLOSED_SERIALIZED),
Arguments.of(BucketAssignerState.closingFromOpen(4, 5), CLOSING_FROM_OPEN_SERIALIZED),
Arguments.of(BucketAssignerState.immediatelyClosing(6, 7), IMMEDIATELY_CLOSING_SERIALIZED));
}
}

0 comments on commit bd78b57

Please sign in to comment.