Skip to content

Commit

Permalink
Merge pull request #17833 from serathius/robustness-compact
Browse files Browse the repository at this point in the history
Implement compaction support in robustness test
  • Loading branch information
serathius committed Jun 7, 2024
2 parents 2ffaf5f + 5959110 commit b38d863
Show file tree
Hide file tree
Showing 13 changed files with 178 additions and 57 deletions.
4 changes: 4 additions & 0 deletions tests/robustness/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,13 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR
func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := c.client.Compact(ctx, rev)
returnTime := time.Since(c.baseTime)
c.kvOperations.AppendCompact(rev, callTime, returnTime, resp, err)
return resp, err
}

func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOption) (*clientv3.MemberListResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
Expand Down
14 changes: 12 additions & 2 deletions tests/robustness/failpoint/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,22 @@ func (t triggerCompact) Trigger(ctx context.Context, _ *testing.T, member e2e.Et
}
_, err = cc.Compact(ctx, rev)
if err != nil && !connectionError(err) {
return nil, err
return nil, fmt.Errorf("failed to compact: %w", err)
}
return []report.ClientReport{cc.Report()}, nil
}

func (t triggerCompact) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool {
func (t triggerCompact) Available(config e2e.EtcdProcessClusterConfig, _ e2e.EtcdProcess) bool {
// Since introduction of compaction into traffic, injecting compaction failpoints started interfeering with peer proxy.
// TODO: Re-enable the peer proxy for compact failpoints when we confirm the root cause.
if config.PeerProxy {
return false
}
// For multiBatchCompaction we need to guarantee that there are enough revisions between two compaction requests.
// With addition of compaction requests to traffic this might be hard if experimental-compaction-batch-limit is too high.
if t.multiBatchCompaction {
return config.ServerConfig.ExperimentalCompactionBatchLimit <= 10
}
return true
}

Expand Down
7 changes: 7 additions & 0 deletions tests/robustness/model/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
if response.Error != "" {
return fmt.Sprintf("err: %q", response.Error)
}
if response.ClientError != "" {
return fmt.Sprintf("err: %q", response.ClientError)
}
if response.PartialResponse {
return fmt.Sprintf("unknown, rev: %d", response.Revision)
}
Expand All @@ -38,6 +41,8 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
return "ok"
}
return fmt.Sprintf("ok, rev: %d", response.Revision)
case Compact:
return "ok"
default:
return fmt.Sprintf("<! unknown request type: %q !>", request.Type)
}
Expand Down Expand Up @@ -67,6 +72,8 @@ func describeEtcdRequest(request EtcdRequest) string {
return fmt.Sprintf("leaseRevoke(%d)", request.LeaseRevoke.LeaseID)
case Defragment:
return fmt.Sprintf("defragment()")
case Compact:
return fmt.Sprintf("compact(%d)", request.Compact.Revision)
default:
return fmt.Sprintf("<! unknown request type: %q !>", request.Type)
}
Expand Down
48 changes: 39 additions & 9 deletions tests/robustness/model/deterministic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sort"

"github.com/anishathalye/porcupine"

"go.etcd.io/etcd/server/v3/storage/mvcc"
)

// DeterministicModel assumes a deterministic execution of etcd requests. All
Expand Down Expand Up @@ -65,10 +67,11 @@ var DeterministicModel = porcupine.Model{
}

type EtcdState struct {
Revision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
Revision int64
CompactRevision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}

func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
Expand All @@ -77,7 +80,10 @@ func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, Etcd
}

func (s EtcdState) DeepCopy() EtcdState {
newState := EtcdState{Revision: s.Revision}
newState := EtcdState{
Revision: s.Revision,
CompactRevision: s.CompactRevision,
}

newState.KeyValues = maps.Clone(s.KeyValues)
newState.KeyLeases = maps.Clone(s.KeyLeases)
Expand All @@ -92,10 +98,12 @@ func (s EtcdState) DeepCopy() EtcdState {

func freshEtcdState() EtcdState {
return EtcdState{
Revision: 1,
KeyValues: map[string]ValueRevision{},
KeyLeases: map[string]int64{},
Leases: map[int64]EtcdLease{},
Revision: 1,
// Start from CompactRevision equal -1 as etcd allows client to compact revision 0 for some reason.
CompactRevision: -1,
KeyValues: map[string]ValueRevision{},
KeyLeases: map[string]int64{},
Leases: map[int64]EtcdLease{},
}
}

Expand All @@ -112,6 +120,9 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
if request.Range.Revision > newState.Revision {
return newState, MaybeEtcdResponse{Error: ErrEtcdFutureRev.Error()}
}
if request.Range.Revision < newState.CompactRevision {
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}}
}
return newState, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: newState.Revision}}
case Txn:
failure := false
Expand Down Expand Up @@ -190,6 +201,14 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: newState.Revision, LeaseRevoke: &LeaseRevokeResponse{}}}
case Defragment:
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: newState.Revision}}
case Compact:
if request.Compact.Revision <= newState.CompactRevision {
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}}
}
newState.CompactRevision = request.Compact.Revision
// Set fake revision as compaction returns non-linearizable revision.
// TODO: Model non-linearizable response revision in model.
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: -1}}
default:
panic(fmt.Sprintf("Unknown request type: %v", request.Type))
}
Expand Down Expand Up @@ -249,6 +268,7 @@ const (
LeaseGrant RequestType = "leaseGrant"
LeaseRevoke RequestType = "leaseRevoke"
Defragment RequestType = "defragment"
Compact RequestType = "compact"
)

type EtcdRequest struct {
Expand All @@ -258,6 +278,7 @@ type EtcdRequest struct {
Range *RangeRequest
Txn *TxnRequest
Defragment *DefragmentRequest
Compact *CompactRequest
}

func (r *EtcdRequest) IsRead() bool {
Expand Down Expand Up @@ -349,6 +370,8 @@ type EtcdResponse struct {
LeaseGrant *LeaseGrantReponse
LeaseRevoke *LeaseRevokeResponse
Defragment *DefragmentResponse
Compact *CompactResponse
ClientError string
Revision int64
}

Expand Down Expand Up @@ -417,3 +440,10 @@ func ToValueOrHash(value string) ValueOrHash {
}
return v
}

type CompactResponse struct {
}

type CompactRequest struct {
Revision int64
}
27 changes: 27 additions & 0 deletions tests/robustness/model/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ package model

import (
"fmt"
"strings"
"time"

"github.com/anishathalye/porcupine"

"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/tests/v3/robustness/identity"
)

Expand Down Expand Up @@ -259,6 +261,23 @@ func (h *AppendableHistory) AppendDefragment(start, end time.Duration, resp *cli
h.appendSuccessful(request, start, end, defragmentResponse(revision))
}

func (h *AppendableHistory) AppendCompact(rev int64, start, end time.Duration, resp *clientv3.CompactResponse, err error) {
request := compactRequest(rev)
if err != nil {
if strings.Contains(err.Error(), mvcc.ErrCompacted.Error()) {
h.appendSuccessful(request, start, end, MaybeEtcdResponse{
EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()},
})
return
}
h.appendFailed(request, start, end, err)
return
}
// Set fake revision as compaction returns non-linearizable revision.
// TODO: Model non-linearizable response revision in model.
h.appendSuccessful(request, start, end, compactResponse(-1))
}

func (h *AppendableHistory) appendFailed(request EtcdRequest, start, end time.Duration, err error) {
op := porcupine.Operation{
ClientId: h.streamID,
Expand Down Expand Up @@ -444,6 +463,14 @@ func defragmentResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: revision}}
}

func compactRequest(rev int64) EtcdRequest {
return EtcdRequest{Type: Compact, Compact: &CompactRequest{Revision: rev}}
}

func compactResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: revision}}
}

type History struct {
operations []porcupine.Operation
}
Expand Down
6 changes: 6 additions & 0 deletions tests/robustness/options/server_config_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ func WithSnapshotCount(input ...uint64) e2e.EPClusterOption {
}
}

func WithCompactionBatchLimit(input ...int) e2e.EPClusterOption {
return func(c *e2e.EtcdProcessClusterConfig) {
c.ServerConfig.ExperimentalCompactionBatchLimit = input[internalRand.Intn(len(input))]
}
}

func WithSnapshotCatchUpEntries(input ...uint64) e2e.EPClusterOption {
return func(c *e2e.EtcdProcessClusterConfig) {
c.ServerConfig.SnapshotCatchUpEntries = input[internalRand.Intn(len(input))]
Expand Down
6 changes: 5 additions & 1 deletion tests/robustness/report/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,11 @@ func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) {
case raftReq.ClusterVersionSet != nil:
return nil, nil
case raftReq.Compaction != nil:
return nil, nil
request := model.EtcdRequest{
Type: model.Compact,
Compact: &model.CompactRequest{Revision: raftReq.Compaction.Revision},
}
return &request, nil
case raftReq.Txn != nil:
txn := model.TxnRequest{
Conditions: []model.EtcdCondition{},
Expand Down
3 changes: 2 additions & 1 deletion tests/robustness/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ func exploratoryScenarios(_ *testing.T) []testScenario {
options.WithSnapshotCount(50, 100, 1000),
options.WithSubsetOptions(randomizableOptions...),
e2e.WithGoFailEnabled(true),
e2e.WithCompactionBatchLimit(100),
// Set low minimal compaction batch limit to allow for triggering multi batch compaction failpoints.
options.WithCompactionBatchLimit(10, 100, 1000),
e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond),
}

Expand Down
15 changes: 12 additions & 3 deletions tests/robustness/traffic/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ var (
{choice: List, weight: 15},
{choice: StaleGet, weight: 10},
{choice: StaleList, weight: 10},
{choice: Put, weight: 23},
{choice: LargePut, weight: 2},
{choice: Delete, weight: 5},
{choice: MultiOpTxn, weight: 5},
{choice: PutWithLease, weight: 5},
{choice: LeaseRevoke, weight: 5},
{choice: CompareAndSet, weight: 5},
{choice: Put, weight: 15},
{choice: LargePut, weight: 5},
{choice: Compact, weight: 5},
},
}
EtcdPut = etcdTraffic{
Expand All @@ -56,9 +57,10 @@ var (
{choice: List, weight: 15},
{choice: StaleGet, weight: 10},
{choice: StaleList, weight: 10},
{choice: Put, weight: 40},
{choice: MultiOpTxn, weight: 5},
{choice: LargePut, weight: 5},
{choice: Put, weight: 35},
{choice: Compact, weight: 5},
},
}
)
Expand Down Expand Up @@ -89,6 +91,7 @@ const (
LeaseRevoke etcdRequestType = "leaseRevoke"
CompareAndSet etcdRequestType = "compareAndSet"
Defragment etcdRequestType = "defragment"
Compact etcdRequestType = "compact"
)

func (t etcdTraffic) Name() string {
Expand Down Expand Up @@ -266,6 +269,12 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
if resp != nil {
rev = resp.Header.Revision
}
case Compact:
var resp *clientv3.CompactResponse
resp, err = c.client.Compact(opCtx, lastRev)
if resp != nil {
rev = resp.Header.Revision
}
default:
panic("invalid choice")
}
Expand Down
17 changes: 13 additions & 4 deletions tests/robustness/traffic/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ var (
resource: "pods",
namespace: "default",
writeChoices: []choiceWeight[KubernetesRequestType]{
{choice: KubernetesUpdate, weight: 90},
{choice: KubernetesUpdate, weight: 85},
{choice: KubernetesDelete, weight: 5},
{choice: KubernetesCreate, weight: 5},
{choice: KubernetesCompact, weight: 5},
},
}
)
Expand Down Expand Up @@ -168,6 +169,8 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
_, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev)
case KubernetesCreate:
err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID()))
case KubernetesCompact:
err = kc.Compact(writeCtx, rev)
default:
panic(fmt.Sprintf("invalid choice: %q", op))
}
Expand Down Expand Up @@ -213,9 +216,10 @@ func (t kubernetesTraffic) generateKey() string {
type KubernetesRequestType string

const (
KubernetesDelete KubernetesRequestType = "delete"
KubernetesUpdate KubernetesRequestType = "update"
KubernetesCreate KubernetesRequestType = "create"
KubernetesDelete KubernetesRequestType = "delete"
KubernetesUpdate KubernetesRequestType = "update"
KubernetesCreate KubernetesRequestType = "create"
KubernetesCompact KubernetesRequestType = "compact"
)

type kubernetesClient struct {
Expand Down Expand Up @@ -254,6 +258,11 @@ func (k kubernetesClient) RequestProgress(ctx context.Context) error {
return k.client.RequestProgress(clientv3.WithRequireLeader(ctx))
}

func (k kubernetesClient) Compact(ctx context.Context, rev int64) error {
_, err := k.client.Compact(ctx, rev)
return err
}

// Kubernetes optimistically assumes that key didn't change since it was last observed, so it executes operations within a transaction conditioned on key not changing.
// However, if the keys value changed it wants imminently to read it, thus the Get operation on failure.
func (k kubernetesClient) optimisticOperationOrGet(ctx context.Context, key string, operation clientv3.Op, expectedRevision int64) (*mvccpb.KeyValue, error) {
Expand Down
2 changes: 2 additions & 0 deletions tests/robustness/validate/patch_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func persistedOperationsReturnTime(allOperations []porcupine.Operation, persiste
}
case model.LeaseGrant:
case model.LeaseRevoke:
case model.Compact:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
Expand All @@ -218,6 +219,7 @@ func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperati
case model.Range:
case model.LeaseGrant:
case model.LeaseRevoke:
case model.Compact:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
Expand Down
Loading

0 comments on commit b38d863

Please sign in to comment.