Skip to content

Commit

Permalink
offset progress
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyk-91 committed Sep 3, 2024
1 parent fba150b commit 8671e7f
Showing 1 changed file with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void setUp() {
public void updateBucketProgressToAtLeastRetriesOnFailure() {
doThrow(GENERIC_RUNTIME_EXCEPTION).doNothing().when(keyValueService).checkAndSet(any());
assertThatCode(() -> bucketProgressStore.updateBucketProgressToAtLeast(
DEFAULT_BUCKET, BucketProgress.createForTimestampOffset(1000L)))
DEFAULT_BUCKET, BucketProgress.createForTimestampProgress(1000L)))
.doesNotThrowAnyException();
verify(keyValueService, times(2)).checkAndSet(any());
}
Expand All @@ -80,7 +80,7 @@ public void updateBucketProgressToAtLeastRetriesOnFailure() {
public void updateBucketProgressToAtLeastDoesNotRetryIndefinitely() {
doThrow(GENERIC_RUNTIME_EXCEPTION).when(keyValueService).checkAndSet(any());
assertThatThrownBy(() -> bucketProgressStore.updateBucketProgressToAtLeast(
DEFAULT_BUCKET, BucketProgress.createForTimestampOffset(1000L)))
DEFAULT_BUCKET, BucketProgress.createForTimestampProgress(1000L)))
.isEqualTo(GENERIC_RUNTIME_EXCEPTION);
verify(keyValueService, times(10)).checkAndSet(any());
}
Expand All @@ -92,10 +92,10 @@ public void updateBucketProgressToAtLeastDoesNotWriteIfInDatabaseProgressIsHighe
DEFAULT_BUCKET_CELL,
Value.create(
BUCKET_PROGRESS_SERIALIZER.serializeProgress(
BucketProgress.createForTimestampOffset(200L)),
BucketProgress.createForTimestampProgress(200L)),
AtlasDbConstants.TRANSACTION_TS)));

BucketProgress bucketProgress = BucketProgress.createForTimestampOffset(100L);
BucketProgress bucketProgress = BucketProgress.createForTimestampProgress(100L);
assertThatCode(() -> bucketProgressStore.updateBucketProgressToAtLeast(DEFAULT_BUCKET, bucketProgress))
.doesNotThrowAnyException();
verify(keyValueService, never()).checkAndSet(any());
Expand All @@ -108,24 +108,24 @@ public void updateBucketProgressToAtLeastDoesNotWriteIfInDatabaseProgressIsEqual
DEFAULT_BUCKET_CELL,
Value.create(
BUCKET_PROGRESS_SERIALIZER.serializeProgress(
BucketProgress.createForTimestampOffset(100L)),
BucketProgress.createForTimestampProgress(100L)),
AtlasDbConstants.TRANSACTION_TS)));

BucketProgress bucketProgress = BucketProgress.createForTimestampOffset(100L);
BucketProgress bucketProgress = BucketProgress.createForTimestampProgress(100L);
assertThatCode(() -> bucketProgressStore.updateBucketProgressToAtLeast(DEFAULT_BUCKET, bucketProgress))
.doesNotThrowAnyException();
verify(keyValueService, never()).checkAndSet(any());
}

@Test
public void updateBucketProgressToAtLeastRetriesWithNewExpectationsIfReceivingCheckAndSetExceptions() {
BucketProgress bucketProgress = BucketProgress.createForTimestampOffset(100L);
BucketProgress bucketProgress = BucketProgress.createForTimestampProgress(100L);

byte[] serializedBucketProgress = BUCKET_PROGRESS_SERIALIZER.serializeProgress(bucketProgress);
byte[] serializedIntermediateProgressOne =
BUCKET_PROGRESS_SERIALIZER.serializeProgress(BucketProgress.createForTimestampOffset(10L));
BUCKET_PROGRESS_SERIALIZER.serializeProgress(BucketProgress.createForTimestampProgress(10L));
byte[] serializedIntermediateProgressTwo =
BUCKET_PROGRESS_SERIALIZER.serializeProgress(BucketProgress.createForTimestampOffset(50L));
BUCKET_PROGRESS_SERIALIZER.serializeProgress(BucketProgress.createForTimestampProgress(50L));
when(keyValueService.get(any(), anyMap()))
.thenReturn(
ImmutableMap.of(),
Expand All @@ -137,9 +137,9 @@ public void updateBucketProgressToAtLeastRetriesWithNewExpectationsIfReceivingCh
Value.create(serializedIntermediateProgressTwo, AtlasDbConstants.TRANSACTION_TS)));

doThrow(createCheckAndSetExceptionForDefaultBucket(
bucketProgress, BucketProgress.createForTimestampOffset(30L)))
bucketProgress, BucketProgress.createForTimestampProgress(30L)))
.doThrow(createCheckAndSetExceptionForDefaultBucket(
bucketProgress, BucketProgress.createForTimestampOffset(70L)))
bucketProgress, BucketProgress.createForTimestampProgress(70L)))
.doNothing()
.when(keyValueService)
.checkAndSet(any());
Expand Down

0 comments on commit 8671e7f

Please sign in to comment.