-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(bucket-assigner): Bucket assigner state machine ADT #7259
feat(bucket-assigner): Bucket assigner state machine ADT #7259
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Main question on this one is whether we need the bucket number in the state machine - I thought we did, though maybe you're planning on tracking it separately?
@JsonSubTypes.Type(value = ImmutableOpen.class, name = BucketAssignerState.Open.TYPE), | ||
@JsonSubTypes.Type(value = ImmutableWaitingClose.class, name = BucketAssignerState.WaitingClose.TYPE), | ||
@JsonSubTypes.Type(value = ImmutableCloseFromOpen.class, name = BucketAssignerState.CloseFromOpen.TYPE), | ||
@JsonSubTypes.Type(value = ImmutableImmediateClose.class, name = BucketAssignerState.ImmediateClose.TYPE), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For 2, 4 and 5: do these states describe the action that is about to happen, or what has happened? I'd probably use Opening
, ClosingFromOpen
, ImmediateClosing
or similar - if I'm not mistaken these are the stages where we might have partially succeeded the fan-out PUEs
|
||
@ParameterizedTest | ||
@MethodSource("bucketAssignerStates") | ||
public void canRoundTripSerdeState(BucketAssignerState state) throws JsonProcessingException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As done on some of the Bucket Storage pieces, I think for something like this it's probably worth even storing a desired/"golden" serialized form and ensuring those deserialize consistently
long startTimestampInclusive(); | ||
|
||
@Value.Parameter | ||
long endTimestampExclusive(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no action: confirmed endTimestamp
is only needed on these 2
default <T> T accept(BucketAssignerState.Visitor<T> visitor) { | ||
return visitor.visit(this); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need the bucket number to write at, or is this tracked somewhere else? I'm basically worried about something like us being in immediate close state, sleeping, and then having all records of buckets from before us getting cleaned up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tracked separately - I store (bucketIdentifier, state). I'm not strongly tied to storing the bucket ID in the state, but I think we may need to use elsewhere, and didn't want to have to parse each state to get the bucketId
e9e9c4c
to
166c74c
Compare
a4219f1
to
41ef74b
Compare
166c74c
to
855559b
Compare
41ef74b
to
05260bc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 modulo nit/comment
public interface BucketAssignerState { | ||
<T> T accept(BucketAssignerState.Visitor<T> visitor); | ||
|
||
static BucketAssignerState.Start start(long startTimestampInclusive) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
discussed offline: Probably let's have BucketAssignerState.X
on all the states here
855559b
to
cd85474
Compare
cd85474
to
23fe030
Compare
Generate changelog in
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
General
Before this PR:
Our previous assumptions that a bucket could just be a fine partition was invalidated by the sheer load on one of our internal products, where processing one fine partition at a time would mean we'd never keep up.
Instead, we've decided to bucket the timestamps into 10 minute windows. Each shard will have the same timestamp range for a given bucket identifier (this is not necessary for correctness, but a simplification for implementation that is currently sufficient).
This PR implements the state machine ADT that will be used to keep track of where we're at in the bucket assigning process.
In particular, we have the following states:
For more information, read the internal RFCs.
After this PR:
==COMMIT_MSG==
==COMMIT_MSG==
Priority: P2
Concerns / possible downsides (what feedback would you like?):
Not with this one, tbh.
Is documentation needed?:
Compatibility
Does this PR create any API breaks (e.g. at the Java or HTTP layers) - if so, do we have compatibility?:
No
Does this PR change the persisted format of any data - if so, do we have forward and backward compatibility?:
No
The code in this PR may be part of a blue-green deploy. Can upgrades from previous versions safely coexist? (Consider restarts of blue or green nodes.):
Yes
Does this PR rely on statements being true about other products at a deployment - if so, do we have correct product dependencies on these products (or other ways of verifying that these statements are true)?:
No
Does this PR need a schema migration?
No
Testing and Correctness
What, if any, assumptions are made about the current state of the world? If they change over time, how will we find out?:
None
What was existing testing like? What have you done to improve it?:
Added tests for serde. It may appear trivial, but it did actually catch a mistake where I missed one set of serde tags!
If this PR contains complex concurrent or asynchronous code, is it correct? The onus is on the PR writer to demonstrate this.:
N/A
If this PR involves acquiring locks or other shared resources, how do we ensure that these are always released?:
N/A
Execution
How would I tell this PR works in production? (Metrics, logs, etc.):
Serde works without failure
Has the safety of all log arguments been decided correctly?:
N/A
Will this change significantly affect our spending on metrics or logs?:
N/A
How would I tell that this PR does not work in production? (monitors, etc.):
Serde fails explicitly (great), or we start deserialising into the wrong state (BAD, hence the tests)
If this PR does not work as expected, how do I fix that state? Would rollback be straightforward?:
N/A
If the above plan is more complex than “recall and rollback”, please tag the support PoC here (if it is the end of the week, tag both the current and next PoC):
N/A
Scale
Would this PR be expected to pose a risk at scale? Think of the shopping product at our largest stack.:
No
Would this PR be expected to perform a large number of database calls, and/or expensive database calls (e.g., row range scans, concurrent CAS)?:
Not here.
Would this PR ever, with time and scale, become the wrong thing to do - and if so, how would we know that we need to do something differently?:
Not this one
Development Process
Where should we start reviewing?:
BucketAssignerState
If this PR is in excess of 500 lines excluding versions lock-files, why does it not make sense to split it?:
N/A
Please tag any other people who should be aware of this PR:
@jeremyk-91
@sverma30
@raiju