Skip to content

Commit

Permalink
Merge pull request etcd-io#16072 from serathius/robustness-stale-read
Browse files Browse the repository at this point in the history
Validate stale read
  • Loading branch information
serathius committed Jun 19, 2023
2 parents 1420292 + 1663600 commit 9c659eb
Show file tree
Hide file tree
Showing 10 changed files with 229 additions and 58 deletions.
24 changes: 12 additions & 12 deletions tests/robustness/model/deterministic.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ var DeterministicModel = porcupine.Model{
return string(data)
},
Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) {
var s etcdState
var s EtcdState
err := json.Unmarshal([]byte(st.(string)), &s)
if err != nil {
panic(err)
}
ok, s := s.Step(in.(EtcdRequest), out.(EtcdResponse))
ok, s := s.apply(in.(EtcdRequest), out.(EtcdResponse))
data, err := json.Marshal(s)
if err != nil {
panic(err)
Expand All @@ -64,29 +64,29 @@ var DeterministicModel = porcupine.Model{
},
}

type etcdState struct {
type EtcdState struct {
Revision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}

func (s etcdState) Step(request EtcdRequest, response EtcdResponse) (bool, etcdState) {
newState, modelResponse := s.step(request)
func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
newState, modelResponse := s.Step(request)
return Match(MaybeEtcdResponse{EtcdResponse: response}, modelResponse), newState
}

func freshEtcdState() etcdState {
return etcdState{
func freshEtcdState() EtcdState {
return EtcdState{
Revision: 1,
KeyValues: map[string]ValueRevision{},
KeyLeases: map[string]int64{},
Leases: map[int64]EtcdLease{},
}
}

// step handles a successful request, returning updated state and response it would generate.
func (s etcdState) step(request EtcdRequest) (etcdState, MaybeEtcdResponse) {
// Step handles a successful request, returning updated state and response it would generate.
func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
newKVs := map[string]ValueRevision{}
for k, v := range s.KeyValues {
newKVs[k] = v
Expand Down Expand Up @@ -185,7 +185,7 @@ func (s etcdState) step(request EtcdRequest) (etcdState, MaybeEtcdResponse) {
}
}

func (s etcdState) getRange(key string, options RangeOptions) RangeResponse {
func (s EtcdState) getRange(key string, options RangeOptions) RangeResponse {
response := RangeResponse{
KVs: []KeyValue{},
}
Expand Down Expand Up @@ -217,15 +217,15 @@ func (s etcdState) getRange(key string, options RangeOptions) RangeResponse {
return response
}

func detachFromOldLease(s etcdState, key string) etcdState {
func detachFromOldLease(s EtcdState, key string) EtcdState {
if oldLeaseId, ok := s.KeyLeases[key]; ok {
delete(s.Leases[oldLeaseId].Keys, key)
delete(s.KeyLeases, key)
}
return s
}

func attachToNewLease(s etcdState, leaseID int64, key string) etcdState {
func attachToNewLease(s EtcdState, leaseID int64, key string) EtcdState {
s.KeyLeases[key] = leaseID
s.Leases[leaseID].Keys[key] = leased
return s
Expand Down
4 changes: 2 additions & 2 deletions tests/robustness/model/deterministic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ func TestModelDeterministic(t *testing.T) {
if op.expectFailure == ok {
t.Logf("state: %v", state)
t.Errorf("Unexpected operation result, expect: %v, got: %v, operation: %s", !op.expectFailure, ok, DeterministicModel.DescribeOperation(op.req, op.resp.EtcdResponse))
var loadedState etcdState
var loadedState EtcdState
err := json.Unmarshal([]byte(state.(string)), &loadedState)
if err != nil {
t.Fatalf("Failed to load state: %v", err)
}
_, resp := loadedState.step(op.req)
_, resp := loadedState.Step(op.req)
t.Errorf("Response diff: %s", cmp.Diff(op.resp, resp))
break
}
Expand Down
30 changes: 15 additions & 15 deletions tests/robustness/model/non_deterministic.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var NonDeterministicModel = porcupine.Model{
if err != nil {
panic(err)
}
ok, states := states.Step(in.(EtcdRequest), out.(MaybeEtcdResponse))
ok, states := states.apply(in.(EtcdRequest), out.(MaybeEtcdResponse))
data, err := json.Marshal(states)
if err != nil {
panic(err)
Expand All @@ -52,51 +52,51 @@ var NonDeterministicModel = porcupine.Model{
},
}

type nonDeterministicState []etcdState
type nonDeterministicState []EtcdState

func (states nonDeterministicState) Step(request EtcdRequest, response MaybeEtcdResponse) (bool, nonDeterministicState) {
func (states nonDeterministicState) apply(request EtcdRequest, response MaybeEtcdResponse) (bool, nonDeterministicState) {
var newStates nonDeterministicState
switch {
case response.Err != nil:
newStates = states.stepFailedRequest(request)
newStates = states.stepFailedResponse(request)
case response.PartialResponse:
newStates = states.stepPartialRequest(request, response.EtcdResponse.Revision)
newStates = states.applyResponseRevision(request, response.EtcdResponse.Revision)
default:
newStates = states.stepSuccessfulRequest(request, response.EtcdResponse)
newStates = states.applySuccessfulResponse(request, response.EtcdResponse)
}
return len(newStates) > 0, newStates
}

// stepFailedRequest duplicates number of states by considering request persisted and lost.
func (states nonDeterministicState) stepFailedRequest(request EtcdRequest) nonDeterministicState {
// stepFailedResponse duplicates number of states by considering both cases, request was persisted and request was lost.
func (states nonDeterministicState) stepFailedResponse(request EtcdRequest) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states)*2)
for _, s := range states {
newStates = append(newStates, s)
newState, _ := s.step(request)
newState, _ := s.Step(request)
if !reflect.DeepEqual(newState, s) {
newStates = append(newStates, newState)
}
}
return newStates
}

// stepPartialRequest filters possible states by leaving ony states that would return proper revision.
func (states nonDeterministicState) stepPartialRequest(request EtcdRequest, responseRevision int64) nonDeterministicState {
// applyResponseRevision filters possible states by leaving ony states that would return proper revision.
func (states nonDeterministicState) applyResponseRevision(request EtcdRequest, responseRevision int64) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states))
for _, s := range states {
newState, modelResponse := s.step(request)
newState, modelResponse := s.Step(request)
if modelResponse.Revision == responseRevision {
newStates = append(newStates, newState)
}
}
return newStates
}

// stepSuccessfulRequest filters possible states by leaving ony states that would respond correctly.
func (states nonDeterministicState) stepSuccessfulRequest(request EtcdRequest, response EtcdResponse) nonDeterministicState {
// applySuccessfulResponse filters possible states by leaving ony states that would respond correctly.
func (states nonDeterministicState) applySuccessfulResponse(request EtcdRequest, response EtcdResponse) nonDeterministicState {
newStates := make(nonDeterministicState, 0, len(states))
for _, s := range states {
newState, modelResponse := s.step(request)
newState, modelResponse := s.Step(request)
if Match(modelResponse, MaybeEtcdResponse{EtcdResponse: response}) {
newStates = append(newStates, newState)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/robustness/model/non_deterministic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func TestModelNonDeterministic(t *testing.T) {
t.Fatalf("Failed to load state: %v", err)
}
for i, s := range loadedState {
_, resp := s.step(op.req)
_, resp := s.Step(op.req)
t.Errorf("For state %d, response diff: %s", i, cmp.Diff(op.resp, resp))
}
break
Expand Down
99 changes: 99 additions & 0 deletions tests/robustness/model/replay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package model

import (
"fmt"
)

func NewReplay(eventHistory []WatchEvent) *EtcdReplay {
var lastEventRevision int64 = 1
for _, event := range eventHistory {
if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 {
panic("Replay requires a complete event history")
}
lastEventRevision = event.Revision
}
return &EtcdReplay{
eventHistory: eventHistory,
}
}

type EtcdReplay struct {
eventHistory []WatchEvent

// Cached state and event index used for it's calculation
cachedState *EtcdState
eventHistoryIndex int
}

func (r *EtcdReplay) StateForRevision(revision int64) (EtcdState, error) {
if revision < 1 {
return EtcdState{}, fmt.Errorf("invalid revision: %d", revision)
}
if r.cachedState == nil || r.cachedState.Revision > revision {
r.reset()
}

for r.eventHistoryIndex < len(r.eventHistory) && r.cachedState.Revision < revision {
nextRequest, nextRevision, nextIndex := r.next()
newState, _ := r.cachedState.Step(nextRequest)
if newState.Revision != nextRevision {
return EtcdState{}, fmt.Errorf("model returned different revision than one present in event history, model: %d, event: %d", newState.Revision, nextRevision)
}
r.cachedState = &newState
r.eventHistoryIndex = nextIndex
}
if r.eventHistoryIndex > len(r.eventHistory) && r.cachedState.Revision < revision {
return EtcdState{}, fmt.Errorf("requested revision higher then available in even history, requested: %d, model: %d", revision, r.cachedState.Revision)
}
return *r.cachedState, nil
}

func (r *EtcdReplay) reset() {
state := freshEtcdState()
r.cachedState = &state
r.eventHistoryIndex = 0
}

func (r *EtcdReplay) next() (request EtcdRequest, revision int64, index int) {
revision = r.eventHistory[r.eventHistoryIndex].Revision
index = r.eventHistoryIndex
operations := []EtcdOperation{}
for r.eventHistory[index].Revision == revision {
operations = append(operations, r.eventHistory[index].Op)
index++
}
return EtcdRequest{
Type: Txn,
Txn: &TxnRequest{
OperationsOnSuccess: operations,
},
}, revision, index
}

func operationToRequest(op EtcdOperation) EtcdRequest {
return EtcdRequest{
Type: Txn,
Txn: &TxnRequest{
OperationsOnSuccess: []EtcdOperation{op},
},
}
}

type WatchEvent struct {
Op EtcdOperation
Revision int64
}
7 changes: 5 additions & 2 deletions tests/robustness/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type report struct {
lg *zap.Logger
clus *e2e.EtcdProcessCluster
clientReports []traffic.ClientReport
visualizeHistory func(path string)
visualizeHistory func(path string) error
}

func testResultsDirectory(t *testing.T) string {
Expand Down Expand Up @@ -89,7 +89,10 @@ func (r *report) Report(t *testing.T, force bool) {
}
}
if r.visualizeHistory != nil {
r.visualizeHistory(filepath.Join(path, "history.html"))
err := r.visualizeHistory(filepath.Join(path, "history.html"))
if err != nil {
t.Error(err)
}
}
}

Expand Down
13 changes: 4 additions & 9 deletions tests/robustness/traffic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,17 @@ type RecordingClient struct {
}

type WatchResponse struct {
Events []WatchEvent
Events []model.WatchEvent
IsProgressNotify bool
Revision int64
Time time.Duration
}

type TimedWatchEvent struct {
WatchEvent
model.WatchEvent
Time time.Duration
}

type WatchEvent struct {
Op model.EtcdOperation
Revision int64
}

func NewClient(endpoints []string, ids identity.Provider, baseTime time.Time) (*RecordingClient, error) {
cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
Expand Down Expand Up @@ -259,7 +254,7 @@ func ToWatchResponse(r clientv3.WatchResponse, baseTime time.Time) WatchResponse
return resp
}

func toWatchEvent(event clientv3.Event) WatchEvent {
func toWatchEvent(event clientv3.Event) model.WatchEvent {
var op model.OperationType
switch event.Type {
case mvccpb.PUT:
Expand All @@ -269,7 +264,7 @@ func toWatchEvent(event clientv3.Event) WatchEvent {
default:
panic(fmt.Sprintf("Unexpected event type: %s", event.Type))
}
return WatchEvent{
return model.WatchEvent{
Revision: event.Kv.ModRevision,
Op: model.EtcdOperation{
Type: op,
Expand Down
Loading

0 comments on commit 9c659eb

Please sign in to comment.