From 1663600bec8b8006fb303db8256b0cd362d37786 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 19 Jun 2023 14:12:25 +0200 Subject: [PATCH] tests/robustness: Validate stale get requests by replaying etcd state Signed-off-by: Marek Siarkowicz --- tests/robustness/model/deterministic.go | 24 ++--- tests/robustness/model/deterministic_test.go | 4 +- tests/robustness/model/non_deterministic.go | 30 +++--- .../model/non_deterministic_test.go | 2 +- tests/robustness/model/replay.go | 99 +++++++++++++++++++ tests/robustness/report.go | 7 +- tests/robustness/traffic/client.go | 13 +-- tests/robustness/validate/operations.go | 80 +++++++++++++-- tests/robustness/validate/validate.go | 11 +-- tests/robustness/validate/watch.go | 17 +++- 10 files changed, 229 insertions(+), 58 deletions(-) create mode 100644 tests/robustness/model/replay.go diff --git a/tests/robustness/model/deterministic.go b/tests/robustness/model/deterministic.go index 3a88ab4a503..c2741808bc3 100644 --- a/tests/robustness/model/deterministic.go +++ b/tests/robustness/model/deterministic.go @@ -47,12 +47,12 @@ var DeterministicModel = porcupine.Model{ return string(data) }, Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) { - var s etcdState + var s EtcdState err := json.Unmarshal([]byte(st.(string)), &s) if err != nil { panic(err) } - ok, s := s.Step(in.(EtcdRequest), out.(EtcdResponse)) + ok, s := s.apply(in.(EtcdRequest), out.(EtcdResponse)) data, err := json.Marshal(s) if err != nil { panic(err) @@ -64,20 +64,20 @@ var DeterministicModel = porcupine.Model{ }, } -type etcdState struct { +type EtcdState struct { Revision int64 KeyValues map[string]ValueRevision KeyLeases map[string]int64 Leases map[int64]EtcdLease } -func (s etcdState) Step(request EtcdRequest, response EtcdResponse) (bool, etcdState) { - newState, modelResponse := s.step(request) +func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) { + newState, modelResponse := 
s.Step(request) return Match(MaybeEtcdResponse{EtcdResponse: response}, modelResponse), newState } -func freshEtcdState() etcdState { - return etcdState{ +func freshEtcdState() EtcdState { + return EtcdState{ Revision: 1, KeyValues: map[string]ValueRevision{}, KeyLeases: map[string]int64{}, @@ -85,8 +85,8 @@ func freshEtcdState() etcdState { } } -// step handles a successful request, returning updated state and response it would generate. -func (s etcdState) step(request EtcdRequest) (etcdState, MaybeEtcdResponse) { +// Step handles a successful request, returning updated state and response it would generate. +func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) { newKVs := map[string]ValueRevision{} for k, v := range s.KeyValues { newKVs[k] = v @@ -185,7 +185,7 @@ func (s etcdState) step(request EtcdRequest) (etcdState, MaybeEtcdResponse) { } } -func (s etcdState) getRange(key string, options RangeOptions) RangeResponse { +func (s EtcdState) getRange(key string, options RangeOptions) RangeResponse { response := RangeResponse{ KVs: []KeyValue{}, } @@ -217,7 +217,7 @@ func (s etcdState) getRange(key string, options RangeOptions) RangeResponse { return response } -func detachFromOldLease(s etcdState, key string) etcdState { +func detachFromOldLease(s EtcdState, key string) EtcdState { if oldLeaseId, ok := s.KeyLeases[key]; ok { delete(s.Leases[oldLeaseId].Keys, key) delete(s.KeyLeases, key) @@ -225,7 +225,7 @@ func detachFromOldLease(s etcdState, key string) etcdState { return s } -func attachToNewLease(s etcdState, leaseID int64, key string) etcdState { +func attachToNewLease(s EtcdState, leaseID int64, key string) EtcdState { s.KeyLeases[key] = leaseID s.Leases[leaseID].Keys[key] = leased return s diff --git a/tests/robustness/model/deterministic_test.go b/tests/robustness/model/deterministic_test.go index 7a9ff0ab2d9..b8ae14e679f 100644 --- a/tests/robustness/model/deterministic_test.go +++ b/tests/robustness/model/deterministic_test.go @@ 
-33,12 +33,12 @@ func TestModelDeterministic(t *testing.T) { if op.expectFailure == ok { t.Logf("state: %v", state) t.Errorf("Unexpected operation result, expect: %v, got: %v, operation: %s", !op.expectFailure, ok, DeterministicModel.DescribeOperation(op.req, op.resp.EtcdResponse)) - var loadedState etcdState + var loadedState EtcdState err := json.Unmarshal([]byte(state.(string)), &loadedState) if err != nil { t.Fatalf("Failed to load state: %v", err) } - _, resp := loadedState.step(op.req) + _, resp := loadedState.Step(op.req) t.Errorf("Response diff: %s", cmp.Diff(op.resp, resp)) break } diff --git a/tests/robustness/model/non_deterministic.go b/tests/robustness/model/non_deterministic.go index ba89a7438b3..24ebbe299e2 100644 --- a/tests/robustness/model/non_deterministic.go +++ b/tests/robustness/model/non_deterministic.go @@ -40,7 +40,7 @@ var NonDeterministicModel = porcupine.Model{ if err != nil { panic(err) } - ok, states := states.Step(in.(EtcdRequest), out.(MaybeEtcdResponse)) + ok, states := states.apply(in.(EtcdRequest), out.(MaybeEtcdResponse)) data, err := json.Marshal(states) if err != nil { panic(err) @@ -52,27 +52,27 @@ var NonDeterministicModel = porcupine.Model{ }, } -type nonDeterministicState []etcdState +type nonDeterministicState []EtcdState -func (states nonDeterministicState) Step(request EtcdRequest, response MaybeEtcdResponse) (bool, nonDeterministicState) { +func (states nonDeterministicState) apply(request EtcdRequest, response MaybeEtcdResponse) (bool, nonDeterministicState) { var newStates nonDeterministicState switch { case response.Err != nil: - newStates = states.stepFailedRequest(request) + newStates = states.stepFailedResponse(request) case response.PartialResponse: - newStates = states.stepPartialRequest(request, response.EtcdResponse.Revision) + newStates = states.applyResponseRevision(request, response.EtcdResponse.Revision) default: - newStates = states.stepSuccessfulRequest(request, response.EtcdResponse) + newStates = 
states.applySuccessfulResponse(request, response.EtcdResponse) } return len(newStates) > 0, newStates } -// stepFailedRequest duplicates number of states by considering request persisted and lost. -func (states nonDeterministicState) stepFailedRequest(request EtcdRequest) nonDeterministicState { +// stepFailedResponse at most doubles the number of states by considering both cases: the request was persisted or the request was lost. +func (states nonDeterministicState) stepFailedResponse(request EtcdRequest) nonDeterministicState { newStates := make(nonDeterministicState, 0, len(states)*2) for _, s := range states { newStates = append(newStates, s) - newState, _ := s.step(request) + newState, _ := s.Step(request) if !reflect.DeepEqual(newState, s) { newStates = append(newStates, newState) } @@ -80,11 +80,11 @@ func (states nonDeterministicState) stepFailedRequest(request EtcdRequest) nonDe return newStates } -// stepPartialRequest filters possible states by leaving ony states that would return proper revision. -func (states nonDeterministicState) stepPartialRequest(request EtcdRequest, responseRevision int64) nonDeterministicState { +// applyResponseRevision filters possible states by leaving only states that would return proper revision. +func (states nonDeterministicState) applyResponseRevision(request EtcdRequest, responseRevision int64) nonDeterministicState { newStates := make(nonDeterministicState, 0, len(states)) for _, s := range states { - newState, modelResponse := s.step(request) + newState, modelResponse := s.Step(request) if modelResponse.Revision == responseRevision { newStates = append(newStates, newState) } @@ -92,11 +92,11 @@ func (states nonDeterministicState) stepPartialRequest(request EtcdRequest, resp return newStates } -// stepSuccessfulRequest filters possible states by leaving ony states that would respond correctly.
-func (states nonDeterministicState) stepSuccessfulRequest(request EtcdRequest, response EtcdResponse) nonDeterministicState { +// applySuccessfulResponse filters possible states by leaving only states that would respond correctly. +func (states nonDeterministicState) applySuccessfulResponse(request EtcdRequest, response EtcdResponse) nonDeterministicState { newStates := make(nonDeterministicState, 0, len(states)) for _, s := range states { - newState, modelResponse := s.step(request) + newState, modelResponse := s.Step(request) if Match(modelResponse, MaybeEtcdResponse{EtcdResponse: response}) { newStates = append(newStates, newState) } diff --git a/tests/robustness/model/non_deterministic_test.go b/tests/robustness/model/non_deterministic_test.go index e0d5c8edc8b..014d4b39dfb 100644 --- a/tests/robustness/model/non_deterministic_test.go +++ b/tests/robustness/model/non_deterministic_test.go @@ -339,7 +339,7 @@ func TestModelNonDeterministic(t *testing.T) { t.Fatalf("Failed to load state: %v", err) } for i, s := range loadedState { - _, resp := s.step(op.req) + _, resp := s.Step(op.req) t.Errorf("For state %d, response diff: %s", i, cmp.Diff(op.resp, resp)) } break diff --git a/tests/robustness/model/replay.go b/tests/robustness/model/replay.go new file mode 100644 index 00000000000..67e4aea3097 --- /dev/null +++ b/tests/robustness/model/replay.go @@ -0,0 +1,99 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package model + +import ( + "fmt" +) + +func NewReplay(eventHistory []WatchEvent) *EtcdReplay { + var lastEventRevision int64 = 1 + for _, event := range eventHistory { + if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 { + panic("Replay requires a complete event history") + } + lastEventRevision = event.Revision + } + return &EtcdReplay{ + eventHistory: eventHistory, + } +} + +type EtcdReplay struct { + eventHistory []WatchEvent + + // Cached state and event index used for its calculation + cachedState *EtcdState + eventHistoryIndex int +} + +func (r *EtcdReplay) StateForRevision(revision int64) (EtcdState, error) { + if revision < 1 { + return EtcdState{}, fmt.Errorf("invalid revision: %d", revision) + } + if r.cachedState == nil || r.cachedState.Revision > revision { + r.reset() + } + + for r.eventHistoryIndex < len(r.eventHistory) && r.cachedState.Revision < revision { + nextRequest, nextRevision, nextIndex := r.next() + newState, _ := r.cachedState.Step(nextRequest) + if newState.Revision != nextRevision { + return EtcdState{}, fmt.Errorf("model returned different revision than one present in event history, model: %d, event: %d", newState.Revision, nextRevision) + } + r.cachedState = &newState + r.eventHistoryIndex = nextIndex + } + if r.eventHistoryIndex >= len(r.eventHistory) && r.cachedState.Revision < revision { + return EtcdState{}, fmt.Errorf("requested revision higher than available in event history, requested: %d, model: %d", revision, r.cachedState.Revision) + } + return *r.cachedState, nil +} + +func (r *EtcdReplay) reset() { + state := freshEtcdState() + r.cachedState = &state + r.eventHistoryIndex = 0 +} + +func (r *EtcdReplay) next() (request EtcdRequest, revision int64, index int) { + revision = r.eventHistory[r.eventHistoryIndex].Revision + index = r.eventHistoryIndex + operations := []EtcdOperation{} + for index < len(r.eventHistory) && r.eventHistory[index].Revision == revision { + operations = append(operations, r.eventHistory[index].Op) +
index++ + } + return EtcdRequest{ + Type: Txn, + Txn: &TxnRequest{ + OperationsOnSuccess: operations, + }, + }, revision, index +} + +func operationToRequest(op EtcdOperation) EtcdRequest { + return EtcdRequest{ + Type: Txn, + Txn: &TxnRequest{ + OperationsOnSuccess: []EtcdOperation{op}, + }, + } +} + +type WatchEvent struct { + Op EtcdOperation + Revision int64 +} diff --git a/tests/robustness/report.go b/tests/robustness/report.go index 4e995acbdb0..8250cb4baeb 100644 --- a/tests/robustness/report.go +++ b/tests/robustness/report.go @@ -34,7 +34,7 @@ type report struct { lg *zap.Logger clus *e2e.EtcdProcessCluster clientReports []traffic.ClientReport - visualizeHistory func(path string) + visualizeHistory func(path string) error } func testResultsDirectory(t *testing.T) string { @@ -89,7 +89,10 @@ func (r *report) Report(t *testing.T, force bool) { } } if r.visualizeHistory != nil { - r.visualizeHistory(filepath.Join(path, "history.html")) + err := r.visualizeHistory(filepath.Join(path, "history.html")) + if err != nil { + t.Error(err) + } } } diff --git a/tests/robustness/traffic/client.go b/tests/robustness/traffic/client.go index 98bc83f510f..7db8182dbff 100644 --- a/tests/robustness/traffic/client.go +++ b/tests/robustness/traffic/client.go @@ -46,22 +46,17 @@ type RecordingClient struct { } type WatchResponse struct { - Events []WatchEvent + Events []model.WatchEvent IsProgressNotify bool Revision int64 Time time.Duration } type TimedWatchEvent struct { - WatchEvent + model.WatchEvent Time time.Duration } -type WatchEvent struct { - Op model.EtcdOperation - Revision int64 -} - func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) { cc, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, @@ -259,7 +254,7 @@ func ToWatchResponse(r clientv3.WatchResponse, baseTime time.Time) WatchResponse return resp } -func toWatchEvent(event clientv3.Event) WatchEvent { +func toWatchEvent(event clientv3.Event) 
model.WatchEvent { var op model.OperationType switch event.Type { case mvccpb.PUT: @@ -269,7 +264,7 @@ func toWatchEvent(event clientv3.Event) WatchEvent { default: panic(fmt.Sprintf("Unexpected event type: %s", event.Type)) } - return WatchEvent{ + return model.WatchEvent{ Revision: event.Kv.ModRevision, Op: model.EtcdOperation{ Type: op, diff --git a/tests/robustness/validate/operations.go b/tests/robustness/validate/operations.go index 9189af12675..b83189c5f8d 100644 --- a/tests/robustness/validate/operations.go +++ b/tests/robustness/validate/operations.go @@ -15,28 +15,92 @@ package validate import ( + "fmt" + "reflect" + "sort" "testing" "time" "github.com/anishathalye/porcupine" + "github.com/google/go-cmp/cmp" "go.uber.org/zap" "go.etcd.io/etcd/tests/v3/robustness/model" ) -func validateOperationHistoryAndReturnVisualize(t *testing.T, lg *zap.Logger, operations []porcupine.Operation) (visualize func(basepath string)) { - linearizable, info := porcupine.CheckOperationsVerbose(model.NonDeterministicModel, operations, 5*time.Minute) - if linearizable == porcupine.Illegal { - t.Error("Model is not linearizable") - } - if linearizable == porcupine.Unknown { +func validateOperationsAndVisualize(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, eventHistory []model.WatchEvent) (visualize func(basepath string) error) { + const timeout = 5 * time.Minute + lg.Info("Validating linearizable operations", zap.Duration("timeout", timeout)) + result, visualize := validateLinearizableOperationAndVisualize(lg, operations, timeout) + switch result { + case porcupine.Illegal: + t.Error("Linearization failed for provided operations") + return + case porcupine.Unknown: t.Error("Linearization timed out") + return + case porcupine.Ok: + t.Log("Linearization passed") + default: + t.Fatalf("Unknown Linearization") } - return func(path string) { + lg.Info("Validating serializable operations") + // TODO: Use linearization result instead of event history to get order of
events + // This is currently impossible as porcupine doesn't expose operation order created during linearization. + validateSerializableOperations(t, operations, eventHistory) + return visualize +} + +func validateLinearizableOperationAndVisualize(lg *zap.Logger, operations []porcupine.Operation, timeout time.Duration) (result porcupine.CheckResult, visualize func(basepath string) error) { + linearizable, info := porcupine.CheckOperationsVerbose(model.NonDeterministicModel, operations, timeout) + return linearizable, func(path string) error { lg.Info("Saving visualization", zap.String("path", path)) err := porcupine.VisualizePath(model.NonDeterministicModel, info, path) if err != nil { - t.Errorf("Failed to visualize, err: %v", err) + return fmt.Errorf("failed to visualize, err: %v", err) } + return nil + } +} + +func validateSerializableOperations(t *testing.T, operations []porcupine.Operation, totalEventHistory []model.WatchEvent) { + staleReads := filterSerializableReads(operations) + if len(staleReads) == 0 { + return + } + sort.Slice(staleReads, func(i, j int) bool { + return staleReads[i].Input.(model.EtcdRequest).Range.Revision < staleReads[j].Input.(model.EtcdRequest).Range.Revision + }) + replay := model.NewReplay(totalEventHistory) + for _, read := range staleReads { + request := read.Input.(model.EtcdRequest) + response := read.Output.(model.MaybeEtcdResponse) + validateSerializableOperation(t, replay, request, response) + } +} + +func filterSerializableReads(operations []porcupine.Operation) []porcupine.Operation { + resp := []porcupine.Operation{} + for _, op := range operations { + request := op.Input.(model.EtcdRequest) + if request.Type == model.Range && request.Range.Revision != 0 { + resp = append(resp, op) + } + } + return resp +} + +func validateSerializableOperation(t *testing.T, replay *model.EtcdReplay, request model.EtcdRequest, response model.MaybeEtcdResponse) { + if response.PartialResponse || response.Err != nil { + return + } + state, 
err := replay.StateForRevision(request.Range.Revision) + if err != nil { + t.Fatal(err) + } + + _, expectResp := state.Step(request) + if !reflect.DeepEqual(response.EtcdResponse.Range, expectResp.Range) { + t.Errorf("Invalid serializable response, diff: %s", cmp.Diff(response.EtcdResponse.Range, expectResp.Range)) } } diff --git a/tests/robustness/validate/validate.go b/tests/robustness/validate/validate.go index d7a9a806dcd..c664053fb83 100644 --- a/tests/robustness/validate/validate.go +++ b/tests/robustness/validate/validate.go @@ -24,14 +24,13 @@ import ( "go.etcd.io/etcd/tests/v3/robustness/traffic" ) -// ValidateAndReturnVisualize return visualize as porcupine.linearizationInfo used to generate visualization is private. -func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []traffic.ClientReport) (visualize func(basepath string)) { - validateWatch(t, cfg, reports) - // TODO: Validate stale reads responses. +// ValidateAndReturnVisualize returns visualize as porcupine.linearizationInfo used to generate visualization is private. 
+func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []traffic.ClientReport) (visualize func(basepath string) error) { + eventHistory := validateWatch(t, cfg, reports) allOperations := operations(reports) watchEvents := uniqueWatchEvents(reports) - newOperations := patchOperationsWithWatchEvents(allOperations, watchEvents) - return validateOperationHistoryAndReturnVisualize(t, lg, newOperations) + patchedOperations := patchOperationsWithWatchEvents(allOperations, watchEvents) + return validateOperationsAndVisualize(t, lg, patchedOperations, eventHistory) } func operations(reports []traffic.ClientReport) []porcupine.Operation { diff --git a/tests/robustness/validate/watch.go b/tests/robustness/validate/watch.go index 7b3d98c2409..cb2380f6e37 100644 --- a/tests/robustness/validate/watch.go +++ b/tests/robustness/validate/watch.go @@ -19,10 +19,11 @@ import ( "github.com/google/go-cmp/cmp" + "go.etcd.io/etcd/tests/v3/robustness/model" "go.etcd.io/etcd/tests/v3/robustness/traffic" ) -func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) { +func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) []model.WatchEvent { // Validate etcd watch properties defined in https://etcd.io/docs/v3.6/learning/api_guarantees/#watch-apis for _, r := range reports { validateOrdered(t, r) @@ -34,8 +35,10 @@ func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) { validateEventsMatch(t, reports) // Expects that longest history encompasses all events. // TODO: Use combined events from all histories instead of the longest history. + eventHistory := longestEventHistory(reports) // TODO: Validate that each watch report is reliable, not only the longest one. 
- validateReliable(t, longestEventHistory(reports)) + validateReliable(t, eventHistory) + return watchEvents(eventHistory) } func validateBookmarkable(t *testing.T, report traffic.ClientReport) { @@ -127,7 +130,7 @@ func validateEventsMatch(t *testing.T, reports []traffic.ClientReport) { key string } type eventClientId struct { - traffic.WatchEvent + model.WatchEvent ClientId int } revisionKeyToEvent := map[revisionKey]eventClientId{} @@ -158,3 +161,11 @@ func longestEventHistory(report []traffic.ClientReport) []traffic.TimedWatchEven } return toWatchEvents(report[longestIndex].Watch) } + +func watchEvents(timed []traffic.TimedWatchEvent) []model.WatchEvent { + result := make([]model.WatchEvent, 0, len(timed)) + for _, event := range timed { + result = append(result, event.WatchEvent) + } + return result +}