diff --git a/tests/robustness/model/replay.go b/tests/robustness/model/replay.go index b8ea784683a3..bda6eb2a60e3 100644 --- a/tests/robustness/model/replay.go +++ b/tests/robustness/model/replay.go @@ -36,14 +36,16 @@ func NewReplay(persistedRequests []EtcdRequest) *EtcdReplay { state = newState } return &EtcdReplay{ + Requests: persistedRequests, revisionToEtcdState: revisionToEtcdState, - Events: events, + events: events, } } type EtcdReplay struct { + Requests []EtcdRequest revisionToEtcdState []EtcdState - Events []PersistedEvent + events []PersistedEvent } func (r *EtcdReplay) StateForRevision(revision int64) (EtcdState, error) { @@ -54,7 +56,7 @@ func (r *EtcdReplay) StateForRevision(revision int64) (EtcdState, error) { } func (r *EtcdReplay) EventsForWatch(watch WatchRequest) (events []PersistedEvent) { - for _, e := range r.Events { + for _, e := range r.events { if e.Revision < watch.Revision || !e.Match(watch) { continue } diff --git a/tests/robustness/validate/validate_test.go b/tests/robustness/validate/validate_test.go index d752b8bb2576..bfb03267d7e9 100644 --- a/tests/robustness/validate/validate_test.go +++ b/tests/robustness/validate/validate_test.go @@ -1331,6 +1331,34 @@ func TestValidateWatch(t *testing.T) { putRequest("c", "3"), }, }, + { + name: "Reliable - issue #18089 - pass", + reports: []report.ClientReport{ + { + Watch: []model.WatchOperation{ + { + Request: model.WatchRequest{ + WithPrefix: true, + Revision: 3, + }, + Responses: []model.WatchResponse{ + { + Events: []model.WatchEvent{ + putWatchEvent("b", "2", 4, true), + }, + }, + }, + }, + }, + }, + }, + persistedRequests: []model.EtcdRequest{ + putRequest("a", "1"), + deleteRequest("a"), + putRequest("b", "2"), + compactRequest(3), + }, + }, { name: "Resumable - watch revision from middle event - pass", reports: []report.ClientReport{ @@ -1953,3 +1981,12 @@ func deleteRequest(key string) model.EtcdRequest { Defragment: nil, } } + +func compactRequest(revision int64) model.EtcdRequest { + return model.EtcdRequest{ + Type: model.Compact, + Compact: &model.CompactRequest{ + Revision: revision, + }, + } +} diff --git a/tests/robustness/validate/watch.go b/tests/robustness/validate/watch.go index 506cbeca431f..e270e4118f3b 100644 --- a/tests/robustness/validate/watch.go +++ b/tests/robustness/validate/watch.go @@ -205,6 +205,11 @@ func validateReliable(lg *zap.Logger, replay *model.EtcdReplay, report report.Cl gotEvents = append(gotEvents, event.PersistedEvent) } } + // TODO(https://github.com/etcd-io/etcd/issues/18089): Remove when bug is fixed + detected, newWantEvents := checkIssue18089(lg, report.ClientID, wantEvents, gotEvents, replay) + if detected { + wantEvents = newWantEvents + } if diff := cmp.Diff(wantEvents, gotEvents, cmpopts.IgnoreFields(model.PersistedEvent{}, "IsCreate")); diff != "" { lg.Error("Broke watch guarantee", zap.String("guarantee", "reliable"), zap.Int("client", report.ClientID), zap.String("diff", diff)) err = errBrokeReliable @@ -218,18 +223,22 @@ func validateResumable(lg *zap.Logger, replay *model.EtcdReplay, report report.C if watch.Request.Revision == 0 { continue } - events := replay.EventsForWatch(watch.Request) + wantEvents := replay.EventsForWatch(watch.Request) index := 0 - for index < len(events) && (events[index].Revision < watch.Request.Revision || !events[index].Match(watch.Request)) { + for index < len(wantEvents) && (wantEvents[index].Revision < watch.Request.Revision || !wantEvents[index].Match(watch.Request)) { index++ } - if index == len(events) { + if index == len(wantEvents) { continue } - firstEvent := firstWatchEvent(watch) + gotFirstEvent := firstWatchEvent(watch) // If watch is resumable, first event it gets should the first event that happened after the requested revision. - if firstEvent != nil && events[index] != firstEvent.PersistedEvent { - lg.Error("Broke watch guarantee", zap.String("guarantee", "resumable"), zap.Int("client", report.ClientID), zap.Any("request", watch.Request), zap.Any("got-event", *firstEvent), zap.Any("want-event", events[index])) + if gotFirstEvent != nil && wantEvents[index] != gotFirstEvent.PersistedEvent { + // TODO(https://github.com/etcd-io/etcd/issues/18089): Remove when bug is fixed + if detected, _ := checkIssue18089(lg, report.ClientID, wantEvents, []model.PersistedEvent{gotFirstEvent.PersistedEvent}, replay); detected { + continue + } + lg.Error("Broke watch guarantee", zap.String("guarantee", "resumable"), zap.Int("client", report.ClientID), zap.Any("request", watch.Request), zap.Any("got-event", *gotFirstEvent), zap.Any("want-event", wantEvents[index])) err = errBrokeResumable } } @@ -332,3 +341,41 @@ func firstWatchEvent(op model.WatchOperation) *model.WatchEvent { } return nil } + +func checkIssue18089(lg *zap.Logger, clientID int, want, got []model.PersistedEvent, replay *model.EtcdReplay) (bool, []model.PersistedEvent) { + type keyRevision struct { + Key string + Revision int64 + } + gotKeyRevision := map[keyRevision]struct{}{} + for _, event := range got { + gotKeyRevision[keyRevision{ + Key: event.Key, + Revision: event.Revision, + }] = struct{}{} + } + newWant := []model.PersistedEvent{} + issueDetected := false + for _, event := range want { + _, found := gotKeyRevision[keyRevision{ + Key: event.Key, + Revision: event.Revision, + }] + if !found && event.Type == model.DeleteOperation && matchingCompaction(event.Revision, replay) { + issueDetected = true + lg.Info("Detected issue 18089 still present, missing delete watch event for a compacted revision", zap.Int("client", clientID), zap.Any("missing-event", event)) + continue + } + newWant = append(newWant, event) + } + return issueDetected, newWant +} + +func matchingCompaction(revision int64, replay *model.EtcdReplay) bool { + for _, req := range replay.Requests { + if req.Type == model.Compact && req.Compact.Revision == revision { + return true + } + } + return false +}