Add strong read consistency support in ingester for experimental ingest storage #7030
Conversation
Force-pushed from c80608a to 728e961.
only a few comments on the testing approaches
}()

// Wait some time and then unblock the Fetch.
time.Sleep(time.Second)
Is it possible to do this without a sleep? Sleeps are usually flaky: this one might end up not doing what we need it to (and it also slows down the test).
If my understanding is correct, we want to wait so that the Fetch fails a couple of times before we unblock it, i.e. we want to test resiliency during startup. Can we just close a channel in the ControlKey once we get the first error? Or perhaps count the number of invocations and unblock the Fetch only after the first few invocations have failed?
I start from the assumption that "every sleep in a test is bad" is not the right approach (similar to "any use of panic() in the code is bad"... well, it depends). There are times when a sleep is good enough to get what you want, it's reliable when the sleep time is large enough, and sometimes it's 10x easier than the alternatives. That being said...
We want to unblock the Fetch while the query is waiting on Ingester.enforceReadConsistency(). If we do as you suggest, that is, fail the first few Fetch requests, it's not much different from a sleep, because Fetch (or ListOffsets, the other request we could fail) is issued in a loop which is completely independent from the query path.
A better approach may be to unblock the Fetch right before calling runTestQuery(). There's still no guarantee that we would get execution in the order we want (in the end, the Fetch may run before the actual query is waiting for consistency), so to make it more reliable we may have to also add a short sleep (oh, welcome back, sleep!), or we could ignore the problem, assuming that in practice it will be very unlikely to happen.
> A better approach may be to unblock the Fetch right before calling runTestQuery()
Done here: 79d0a19. Even if there's no strong guarantee that the query will wait for read consistency before we unblock the fetch, I think that in practice it's nearly impossible to get the execution in the wrong order because of the ListOffsets fetched in a loop.
Fair enough, thanks for the explanation. Now that you put it like that, the sleep makes slightly more sense than the closed channel before running the query. Both are OK, so I'm happy to keep it as-is.
"github.com/grafana/mimir/pkg/util/validation"
)

func TestIngester_QueryStream_IngestStorageReadConsistency(t *testing.T) {
This test looks a lot like an end-to-end/integration test rather than a unit test. Do you want to move it to e2e instead? I'd even go as far as running the distributor and writing to it instead of writing to Kafka. I can help with this change in a follow-up PR if you want.
It would also remove the need to share ConsumerGroup, which IMO is an implementation detail of the storage/ingest package.
> this test looks a lot like an end-to-end/integration test rather than a unit test. Do you want to move it to e2e instead?
The way I see it, an integration test shouldn't mock any component, while this test requires mocking the Kafka cluster, which is why I think it's appropriate as a unit test (also, we do pretty much the same kind of testing in most of the functions of ingester_test.go).
> it would also remove the need to share ConsumerGroup, which IMO is an implementation detail of the storage/ingest package
I agree on this; I was in doubt too. For now I've hardcoded the consumer group name in the testkafka package. We can worry about it when we need multiple consumer groups: c6af570
> this test looks a lot like an end-to-end/integration test rather than a unit test. Do you want to move it to e2e instead?

> The way I see it is that an integration test shouldn't mock any component, while this test requires to mock the Kafka cluster, reason why I think it's appropriate for a unit test (also, we do pretty much the same kind of testing in most of the functions of ingester_test.go).
OK, I'm not feeling super strongly about this, and testing is usually more convention-based, so let's keep it as-is.
pkg/mimir/runtime_config_test.go
Outdated
}

func TestRuntimeConfigLoader_ShouldLoadEmptyFile(t *testing.T) {
	validation.SetDefaultLimitsForYAMLUnmarshalling(getDefaultLimits())
You can add a TestMain() and call this once for the whole package, as opposed to calling it in every test. Something along the lines of:

func TestMain(m *testing.M) {
	validation.SetDefaultLimitsForYAMLUnmarshalling(getDefaultLimits())
	m.Run()
}

The same applies to pkg/util/validation/limits_test.go.
Yes, we can do it. The tricky thing is that some tests override the default limits, so we have to remember to revert the default limits in the test cleanup: 3156fcf
LGTM
pkg/util/validation/limits_test.go
Outdated
@@ -776,11 +813,16 @@ func TestExtensions(t *testing.T) {
	})

	t.Run("default limits does not interfere with tenants extensions", func(t *testing.T) {
		// Reset the default limits at the end of the test.
		t.Cleanup(func() {
			SetDefaultLimitsForYAMLUnmarshalling(getDefaultLimits())
I left this comment on the individual commit too, but it didn't show up on the PR.
Should we move this closer to the other SetDefaultLimitsForYAMLUnmarshalling call, so it's easier to spot what's being cleaned up? Currently they're 10 lines apart.
I think it's a good idea. Done here: 0077fd1
…st storage Signed-off-by: Marco Pracucci <marco@pracucci.com>
…dConsistency Signed-off-by: Marco Pracucci <marco@pracucci.com>
Force-pushed from 79d0a19 to 0077fd1.
What this PR does
This is another step forward in the experimental ingest storage conditional read-after-write implementation. In this PR I'm introducing a per-tenant config option to set the default read consistency guarantee (used only when ingest storage is enabled). When the read consistency is set to strong, the ingester waits until all the data has been replayed from Kafka up until that point before executing a query.

Notes:
- Strong consistency is currently tested only on QueryStream(). The reason I suggest not testing every function for now is that we may change the strong consistency implementation later, moving it from the ingester to an upper layer in the read path.

Which issue(s) this PR fixes or relates to
N/A
Checklist
- CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX].
- about-versioning.md updated with experimental features.