From 8446d14c35c5b3c9428cd31d84df370b607fefa0 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Fri, 17 Aug 2018 16:17:17 -0700 Subject: [PATCH 1/5] etcdserver/api/rafthttp: rename to "pipelineProber" Preliminary work to add prober to "streamRt" Signed-off-by: Gyuho Lee --- etcdserver/api/rafthttp/transport.go | 14 +++++----- etcdserver/api/rafthttp/transport_test.go | 34 +++++++++++------------ 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/etcdserver/api/rafthttp/transport.go b/etcdserver/api/rafthttp/transport.go index e52acfceb93..78b15956520 100644 --- a/etcdserver/api/rafthttp/transport.go +++ b/etcdserver/api/rafthttp/transport.go @@ -130,7 +130,7 @@ type Transport struct { remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up peers map[types.ID]Peer // peers map - prober probing.Prober + pipelineProber probing.Prober } func (t *Transport) Start() error { @@ -145,7 +145,7 @@ func (t *Transport) Start() error { } t.remotes = make(map[types.ID]*remote) t.peers = make(map[types.ID]Peer) - t.prober = probing.NewProber(t.pipelineRt) + t.pipelineProber = probing.NewProber(t.pipelineRt) // If client didn't provide dial retry frequency, use the default // (100ms backoff between attempts to create a new stream), @@ -221,7 +221,7 @@ func (t *Transport) Stop() { for _, p := range t.peers { p.stop() } - t.prober.RemoveAll() + t.pipelineProber.RemoveAll() if tr, ok := t.streamRt.(*http.Transport); ok { tr.CloseIdleConnections() } @@ -317,7 +317,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) { } fs := t.LeaderStats.Follower(id.String()) t.peers[id] = startPeer(t, urls, id, fs) - addPeerToProber(t.Logger, t.prober, id.String(), us) + addPeerToProber(t.Logger, t.pipelineProber, id.String(), us) if t.Logger != nil { t.Logger.Info( @@ -358,7 +358,7 @@ func (t *Transport) removePeer(id types.ID) { } delete(t.peers, id) delete(t.LeaderStats.Followers, id.String()) - t.prober.Remove(id.String()) + t.pipelineProber.Remove(id.String()) if t.Logger != nil { t.Logger.Info( @@ -388,8 +388,8 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) { } t.peers[id].update(urls) - t.prober.Remove(id.String()) - addPeerToProber(t.Logger, t.prober, id.String(), us) + t.pipelineProber.Remove(id.String()) + addPeerToProber(t.Logger, t.pipelineProber, id.String(), us) if t.Logger != nil { t.Logger.Info( diff --git a/etcdserver/api/rafthttp/transport_test.go b/etcdserver/api/rafthttp/transport_test.go index 9a58ce74fee..ef208ef8ed9 100644 --- a/etcdserver/api/rafthttp/transport_test.go +++ b/etcdserver/api/rafthttp/transport_test.go @@ -97,10 +97,10 @@ func TestTransportCutMend(t *testing.T) { func TestTransportAdd(t *testing.T) { ls := stats.NewLeaderStats("") tr := &Transport{ - LeaderStats: ls, - streamRt: &roundTripperRecorder{}, - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + LeaderStats: ls, + streamRt: &roundTripperRecorder{}, + peers: make(map[types.ID]Peer), + pipelineProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) @@ -125,10 +125,10 @@ func TestTransportAdd(t *testing.T) { func TestTransportRemove(t *testing.T) { tr := &Transport{ - LeaderStats: stats.NewLeaderStats(""), - streamRt: &roundTripperRecorder{}, - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + LeaderStats: stats.NewLeaderStats(""), + streamRt: &roundTripperRecorder{}, + peers: make(map[types.ID]Peer), + pipelineProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) tr.RemovePeer(types.ID(1)) @@ -142,8 +142,8 @@ func TestTransportRemove(t *testing.T) { func TestTransportUpdate(t *testing.T) { peer := newFakePeer() tr := &Transport{ - peers: map[types.ID]Peer{types.ID(1): peer}, - prober: probing.NewProber(nil), + peers: map[types.ID]Peer{types.ID(1): peer}, + pipelineProber: probing.NewProber(nil), } u := "http://localhost:2380" tr.UpdatePeer(types.ID(1), []string{u}) @@ -156,13 +156,13 @@ func TestTransportUpdate(t *testing.T) { func TestTransportErrorc(t *testing.T) { errorc := make(chan error, 1) tr := &Transport{ - Raft: &fakeRaft{}, - LeaderStats: stats.NewLeaderStats(""), - ErrorC: errorc, - streamRt: newRespRoundTripper(http.StatusForbidden, nil), - pipelineRt: newRespRoundTripper(http.StatusForbidden, nil), - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + Raft: &fakeRaft{}, + LeaderStats: stats.NewLeaderStats(""), + ErrorC: errorc, + streamRt: newRespRoundTripper(http.StatusForbidden, nil), + pipelineRt: newRespRoundTripper(http.StatusForbidden, nil), + peers: make(map[types.ID]Peer), + pipelineProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) defer tr.Stop() From f0d7043662bdbf5c8fcb9c0b3201e39a223a437f Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Fri, 17 Aug 2018 16:22:00 -0700 Subject: [PATCH 2/5] etcdserver/api/rafthttp: display roundtripper name in warnings Signed-off-by: Gyuho Lee --- etcdserver/api/rafthttp/probing_status.go | 16 ++++++++++++---- etcdserver/api/rafthttp/transport.go | 4 ++-- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/etcdserver/api/rafthttp/probing_status.go b/etcdserver/api/rafthttp/probing_status.go index a8199dfdfad..d97590730c2 100644 --- a/etcdserver/api/rafthttp/probing_status.go +++ b/etcdserver/api/rafthttp/probing_status.go @@ -17,10 +17,16 @@ package rafthttp import ( "time" + "github.com/prometheus/client_golang/prometheus" "github.com/xiang90/probing" "go.uber.org/zap" ) +const ( + // RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message. + RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT" +) + var ( // proberInterval must be shorter than read timeout. // Or the connection will time-out. @@ -29,7 +35,7 @@ var ( statusErrorInterval = 5 * time.Second ) -func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string) { +func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) { hus := make([]string, len(us)) for i := range us { hus[i] = us[i] + ProbingPrefix @@ -47,10 +53,10 @@ func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string) { return } - go monitorProbingStatus(lg, s, id) + go monitorProbingStatus(lg, s, id, roundTripperName, rttSecProm) } -func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) { +func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) { // set the first interval short to log error early. interval := statusErrorInterval for { @@ -60,6 +66,7 @@ func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) { if lg != nil { lg.Warn( "prober detected unhealthy status", + zap.String("round-tripper-name", roundTripperName), zap.String("remote-peer-id", id), zap.Duration("rtt", s.SRTT()), zap.Error(s.Err()), @@ -75,6 +82,7 @@ func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) { if lg != nil { lg.Warn( "prober found high clock drift", + zap.String("round-tripper-name", roundTripperName), zap.String("remote-peer-id", id), zap.Duration("clock-drift", s.SRTT()), zap.Duration("rtt", s.ClockDiff()), @@ -84,7 +92,7 @@ func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) { plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second) } } - rttSec.WithLabelValues(id).Observe(s.SRTT().Seconds()) + rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds()) case <-s.StopNotify(): return diff --git a/etcdserver/api/rafthttp/transport.go b/etcdserver/api/rafthttp/transport.go index 78b15956520..30642ced648 100644 --- a/etcdserver/api/rafthttp/transport.go +++ b/etcdserver/api/rafthttp/transport.go @@ -317,7 +317,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) { } fs := t.LeaderStats.Follower(id.String()) t.peers[id] = startPeer(t, urls, id, fs) - addPeerToProber(t.Logger, t.pipelineProber, id.String(), us) + addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) if t.Logger != nil { t.Logger.Info( @@ -389,7 +389,7 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) { t.peers[id].update(urls) t.pipelineProber.Remove(id.String()) - addPeerToProber(t.Logger, t.pipelineProber, id.String(), us) + addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) if t.Logger != nil { t.Logger.Info( From 6c94debe6c293d7803ba9014f97bfa05b423e105 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Fri, 17 Aug 2018 16:31:55 -0700 Subject: [PATCH 3/5] etcdserver/api/rafthttp: add Raft message stream prober In our production cluster, we found one TCP connection had >8-sec latencies to a remote peer, but "etcd_network_peer_round_trip_time_seconds" metrics shows <1-sec latency distribution, which means we weren't sampling enough, or all the latency spikes happen outside of snapshot pipeline connection. The later is most likely the case, since the cluster had leader elections from missing heartbeats. This PR adds another probing routine to monitor the connection for Raft message transports. Signed-off-by: Gyuho Lee --- etcdserver/api/rafthttp/probing_status.go | 3 +++ etcdserver/api/rafthttp/transport.go | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/etcdserver/api/rafthttp/probing_status.go b/etcdserver/api/rafthttp/probing_status.go index d97590730c2..4d10ec87d06 100644 --- a/etcdserver/api/rafthttp/probing_status.go +++ b/etcdserver/api/rafthttp/probing_status.go @@ -23,6 +23,9 @@ import ( ) const ( + // RoundTripperNameRaftMessage is the name of round-tripper that sends + // all other Raft messages, other than "snap.Message". + RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE" // RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message. RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT" ) diff --git a/etcdserver/api/rafthttp/transport.go b/etcdserver/api/rafthttp/transport.go index 30642ced648..d6e55091c33 100644 --- a/etcdserver/api/rafthttp/transport.go +++ b/etcdserver/api/rafthttp/transport.go @@ -131,6 +131,7 @@ type Transport struct { peers map[types.ID]Peer // peers map pipelineProber probing.Prober + streamProber probing.Prober } func (t *Transport) Start() error { @@ -146,6 +147,7 @@ func (t *Transport) Start() error { t.remotes = make(map[types.ID]*remote) t.peers = make(map[types.ID]Peer) t.pipelineProber = probing.NewProber(t.pipelineRt) + t.streamProber = probing.NewProber(t.streamRt) // If client didn't provide dial retry frequency, use the default // (100ms backoff between attempts to create a new stream), @@ -222,6 +224,7 @@ func (t *Transport) Stop() { p.stop() } t.pipelineProber.RemoveAll() + t.streamProber.RemoveAll() if tr, ok := t.streamRt.(*http.Transport); ok { tr.CloseIdleConnections() } @@ -318,6 +321,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) { fs := t.LeaderStats.Follower(id.String()) t.peers[id] = startPeer(t, urls, id, fs) addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) + addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec) if t.Logger != nil { t.Logger.Info( @@ -359,6 +363,7 @@ func (t *Transport) removePeer(id types.ID) { delete(t.peers, id) delete(t.LeaderStats.Followers, id.String()) t.pipelineProber.Remove(id.String()) + t.streamProber.Remove(id.String()) if t.Logger != nil { t.Logger.Info( @@ -390,6 +395,8 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) { t.pipelineProber.Remove(id.String()) addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) + t.streamProber.Remove(id.String()) + addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec) if t.Logger != nil { t.Logger.Info( From b796e437e78ae0b04be7f210911ad706a41c6f97 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Fri, 17 Aug 2018 16:38:37 -0700 Subject: [PATCH 4/5] etcdserver/api/rafthttp: add 'ConnectionType' label to round-trip metric We need to track which connection had high latency spikes. ``` etcd_network_peer_round_trip_time_seconds_bucket{ConnectionType="ROUND_TRIPPER_RAFT_MESSAGE",To="729934363faa4a24",le="0.0001"} 0 etcd_network_peer_round_trip_time_seconds_bucket{ConnectionType="ROUND_TRIPPER_RAFT_MESSAGE",To="729934363faa4a24",le="0.0002"} 1 etcd_network_peer_round_trip_time_seconds_bucket{ConnectionType="ROUND_TRIPPER_SNAPSHOT",To="729934363faa4a24",le="0.0001"} 0 etcd_network_peer_round_trip_time_seconds_bucket{ConnectionType="ROUND_TRIPPER_SNAPSHOT",To="729934363faa4a24",le="0.0002"} 1 ``` Signed-off-by: Gyuho Lee --- etcdserver/api/rafthttp/metrics.go | 2 +- etcdserver/api/rafthttp/probing_status.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/etcdserver/api/rafthttp/metrics.go b/etcdserver/api/rafthttp/metrics.go index 5f862e9decc..df4c6887201 100644 --- a/etcdserver/api/rafthttp/metrics.go +++ b/etcdserver/api/rafthttp/metrics.go @@ -143,7 +143,7 @@ var ( // highest bucket start of 0.0001 sec * 2^15 == 3.2768 sec Buckets: prometheus.ExponentialBuckets(0.0001, 2, 16), }, - []string{"To"}, + []string{"ConnectionType", "To"}, ) ) diff --git a/etcdserver/api/rafthttp/probing_status.go b/etcdserver/api/rafthttp/probing_status.go index 4d10ec87d06..8ece478b1d4 100644 --- a/etcdserver/api/rafthttp/probing_status.go +++ b/etcdserver/api/rafthttp/probing_status.go @@ -95,7 +95,7 @@ func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string, roundTrip plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second) } } - rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds()) + rttSecProm.WithLabelValues(roundTripperName, id).Observe(s.SRTT().Seconds()) case <-s.StopNotify(): return From 37cf84cb8f9dbaa30a225cf468daf49cfc961154 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Fri, 17 Aug 2018 18:04:08 -0700 Subject: [PATCH 5/5] etcdserver/api/rafthttp: fix prober test failures Fix ``` panic: runtime error: invalid memory address or nil pointer dereference [recovered] panic: runtime error: invalid memory address or nil pointer dereference ``` Signed-off-by: Gyuho Lee --- etcdserver/api/rafthttp/transport_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/etcdserver/api/rafthttp/transport_test.go b/etcdserver/api/rafthttp/transport_test.go index ef208ef8ed9..7c8641fec4f 100644 --- a/etcdserver/api/rafthttp/transport_test.go +++ b/etcdserver/api/rafthttp/transport_test.go @@ -101,6 +101,7 @@ func TestTransportAdd(t *testing.T) { streamRt: &roundTripperRecorder{}, peers: make(map[types.ID]Peer), pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) @@ -129,6 +130,7 @@ func TestTransportRemove(t *testing.T) { streamRt: &roundTripperRecorder{}, peers: make(map[types.ID]Peer), pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) tr.RemovePeer(types.ID(1)) @@ -144,6 +146,7 @@ func TestTransportUpdate(t *testing.T) { tr := &Transport{ peers: map[types.ID]Peer{types.ID(1): peer}, pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } u := "http://localhost:2380" tr.UpdatePeer(types.ID(1), []string{u}) @@ -163,6 +166,7 @@ func TestTransportErrorc(t *testing.T) { pipelineRt: newRespRoundTripper(http.StatusForbidden, nil), peers: make(map[types.ID]Peer), pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) defer tr.Stop()