Skip to content

Commit

Permalink
raft loop prober with counter
Browse files Browse the repository at this point in the history
Signed-off-by: Chao Chen <chaochn@amazon.com>
  • Loading branch information
chaochn47 committed Oct 18, 2023
1 parent bbf59a9 commit d126728
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 5 deletions.
31 changes: 30 additions & 1 deletion server/etcdserver/api/etcdhttp/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import (
"net/http"
"path"
"strings"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/time/rate"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/types"
Expand All @@ -36,6 +39,8 @@ import (
const (
PathHealth = "/health"
PathProxyHealth = "/proxy/health"

LivezRaftLoopDeadLockCheckInterval = 5 * time.Second
)

type ServerHealth interface {
Expand All @@ -44,6 +49,7 @@ type ServerHealth interface {
Range(context.Context, *pb.RangeRequest) (*pb.RangeResponse, error)
Config() config.ServerConfig
AuthStore() auth.AuthStore
TickElapsed() uint64
}

// HandleHealth registers metrics and health handlers. it checks health by using v3 range request
Expand Down Expand Up @@ -203,11 +209,19 @@ type HealthCheck func(ctx context.Context) error
type CheckRegistry struct {
path string
checks map[string]HealthCheck

// tickElapsed and rate limiter is only used in livez check registry.
mu sync.Mutex
lastRaftTickElapsed uint64

rateLimiter *rate.Limiter
}

func installLivezEndpoints(lg *zap.Logger, mux *http.ServeMux, server ServerHealth) {
reg := CheckRegistry{path: "/livez", checks: make(map[string]HealthCheck)}
rl := rate.NewLimiter(rate.Every(LivezRaftLoopDeadLockCheckInterval), 1)
reg := CheckRegistry{path: "/livez", checks: make(map[string]HealthCheck), rateLimiter: rl}
reg.Register("serializable_read", serializableReadCheck(server))
reg.Register("raft_loop_progress", raftLoopDeadLockCheck(server, &reg))
reg.InstallHttpEndpoints(lg, mux)
}

Expand Down Expand Up @@ -365,3 +379,18 @@ func serializableReadCheck(srv ServerHealth) func(ctx context.Context) error {
return nil
}
}

func raftLoopDeadLockCheck(srv ServerHealth, reg *CheckRegistry) func(ctx context.Context) error {
return func(ctx context.Context) error {
if reg.rateLimiter.Allow() {
tickElapsed := srv.TickElapsed()
reg.mu.Lock()
defer reg.mu.Unlock()
if tickElapsed <= reg.lastRaftTickElapsed {
return fmt.Errorf("raft loop dead lock")
}
reg.lastRaftTickElapsed = tickElapsed
}
return nil
}
}
2 changes: 2 additions & 0 deletions server/etcdserver/api/etcdhttp/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func (s *fakeHealthServer) Leader() types.ID {

func (s *fakeHealthServer) AuthStore() auth.AuthStore { return s.authStore }

func (s *fakeHealthServer) TickElapsed() uint64 { return 1 }

func (s *fakeHealthServer) ClientCertAuthEnabled() bool { return false }

type healthTestCase struct {
Expand Down
13 changes: 11 additions & 2 deletions server/etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (

"go.uber.org/zap"

"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"

"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/pkg/v3/contention"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
serverstorage "go.etcd.io/etcd/server/v3/storage"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
)

const (
Expand Down Expand Up @@ -82,6 +83,7 @@ type raftNode struct {

tickMu *sync.Mutex
raftNodeConfig
tickElapsed uint64

// a chan to send/receive snapshot
msgSnapC chan raftpb.Message
Expand Down Expand Up @@ -155,9 +157,16 @@ func newRaftNode(cfg raftNodeConfig) *raftNode {
func (r *raftNode) tick() {
r.tickMu.Lock()
r.Tick()
r.tickElapsed++
r.tickMu.Unlock()
}

func (r *raftNode) safeReadTickElapsed() uint64 {
r.tickMu.Lock()
defer r.tickMu.Unlock()
return r.tickElapsed
}

// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
Expand Down
9 changes: 7 additions & 2 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/api/v3/version"
Expand Down Expand Up @@ -67,8 +70,6 @@ import (
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
)

const (
Expand Down Expand Up @@ -1643,6 +1644,10 @@ func (s *EtcdServer) AppliedIndex() uint64 { return s.getAppliedIndex() }

func (s *EtcdServer) Term() uint64 { return s.getTerm() }

// TickElapsed returns the raft tick elapsed counter.
// It is used to check if etcdserver raft loop is deadlocked.
func (s *EtcdServer) TickElapsed() uint64 { return s.r.safeReadTickElapsed() }

type confChangeResponse struct {
membs []*membership.Member
raftAdvanceC <-chan struct{}
Expand Down

0 comments on commit d126728

Please sign in to comment.