diff --git a/etcdserver/api/rafthttp/metrics.go b/etcdserver/api/rafthttp/metrics.go index 5f862e9decc..df4c6887201 100644 --- a/etcdserver/api/rafthttp/metrics.go +++ b/etcdserver/api/rafthttp/metrics.go @@ -143,7 +143,7 @@ var ( // highest bucket start of 0.0001 sec * 2^15 == 3.2768 sec Buckets: prometheus.ExponentialBuckets(0.0001, 2, 16), }, - []string{"To"}, + []string{"ConnectionType", "To"}, ) ) diff --git a/etcdserver/api/rafthttp/probing_status.go b/etcdserver/api/rafthttp/probing_status.go index a8199dfdfad..8ece478b1d4 100644 --- a/etcdserver/api/rafthttp/probing_status.go +++ b/etcdserver/api/rafthttp/probing_status.go @@ -17,10 +17,19 @@ package rafthttp import ( "time" + "github.com/prometheus/client_golang/prometheus" "github.com/xiang90/probing" "go.uber.org/zap" ) +const ( + // RoundTripperNameRaftMessage is the name of round-tripper that sends + // all other Raft messages, other than "snap.Message". + RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE" + // RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message. + RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT" +) + var ( // proberInterval must be shorter than read timeout. // Or the connection will time-out. 
@@ -29,7 +38,7 @@ var ( statusErrorInterval = 5 * time.Second ) -func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string) { +func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) { hus := make([]string, len(us)) for i := range us { hus[i] = us[i] + ProbingPrefix @@ -47,10 +56,10 @@ func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string) { return } - go monitorProbingStatus(lg, s, id) + go monitorProbingStatus(lg, s, id, roundTripperName, rttSecProm) } -func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) { +func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) { // set the first interval short to log error early. interval := statusErrorInterval for { @@ -60,6 +69,7 @@ func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) { if lg != nil { lg.Warn( "prober detected unhealthy status", + zap.String("round-tripper-name", roundTripperName), zap.String("remote-peer-id", id), zap.Duration("rtt", s.SRTT()), zap.Error(s.Err()), @@ -75,6 +85,7 @@ func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) { if lg != nil { lg.Warn( "prober found high clock drift", + zap.String("round-tripper-name", roundTripperName), zap.String("remote-peer-id", id), - zap.Duration("clock-drift", s.SRTT()), - zap.Duration("rtt", s.ClockDiff()), + zap.Duration("clock-drift", s.ClockDiff()), + zap.Duration("rtt", s.SRTT()), @@ -84,7 +95,7 @@ func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string) { plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second) } } - rttSec.WithLabelValues(id).Observe(s.SRTT().Seconds()) + rttSecProm.WithLabelValues(roundTripperName, id).Observe(s.SRTT().Seconds()) case <-s.StopNotify(): return diff --git a/etcdserver/api/rafthttp/transport.go b/etcdserver/api/rafthttp/transport.go index e52acfceb93..d6e55091c33 100644 --- 
a/etcdserver/api/rafthttp/transport.go +++ b/etcdserver/api/rafthttp/transport.go @@ -130,7 +130,8 @@ type Transport struct { remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up peers map[types.ID]Peer // peers map - prober probing.Prober + pipelineProber probing.Prober + streamProber probing.Prober } func (t *Transport) Start() error { @@ -145,7 +146,8 @@ func (t *Transport) Start() error { } t.remotes = make(map[types.ID]*remote) t.peers = make(map[types.ID]Peer) - t.prober = probing.NewProber(t.pipelineRt) + t.pipelineProber = probing.NewProber(t.pipelineRt) + t.streamProber = probing.NewProber(t.streamRt) // If client didn't provide dial retry frequency, use the default // (100ms backoff between attempts to create a new stream), @@ -221,7 +223,8 @@ func (t *Transport) Stop() { for _, p := range t.peers { p.stop() } - t.prober.RemoveAll() + t.pipelineProber.RemoveAll() + t.streamProber.RemoveAll() if tr, ok := t.streamRt.(*http.Transport); ok { tr.CloseIdleConnections() } @@ -317,7 +320,8 @@ func (t *Transport) AddPeer(id types.ID, us []string) { } fs := t.LeaderStats.Follower(id.String()) t.peers[id] = startPeer(t, urls, id, fs) - addPeerToProber(t.Logger, t.prober, id.String(), us) + addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) + addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec) if t.Logger != nil { t.Logger.Info( @@ -358,7 +362,8 @@ func (t *Transport) removePeer(id types.ID) { } delete(t.peers, id) delete(t.LeaderStats.Followers, id.String()) - t.prober.Remove(id.String()) + t.pipelineProber.Remove(id.String()) + t.streamProber.Remove(id.String()) if t.Logger != nil { t.Logger.Info( @@ -388,8 +393,10 @@ func (t *Transport) UpdatePeer(id types.ID, us []string) { } t.peers[id].update(urls) - t.prober.Remove(id.String()) - addPeerToProber(t.Logger, t.prober, id.String(), us) + t.pipelineProber.Remove(id.String()) + 
addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec) + t.streamProber.Remove(id.String()) + addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec) if t.Logger != nil { t.Logger.Info( diff --git a/etcdserver/api/rafthttp/transport_test.go b/etcdserver/api/rafthttp/transport_test.go index 9a58ce74fee..7c8641fec4f 100644 --- a/etcdserver/api/rafthttp/transport_test.go +++ b/etcdserver/api/rafthttp/transport_test.go @@ -97,10 +97,11 @@ func TestTransportCutMend(t *testing.T) { func TestTransportAdd(t *testing.T) { ls := stats.NewLeaderStats("") tr := &Transport{ - LeaderStats: ls, - streamRt: &roundTripperRecorder{}, - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + LeaderStats: ls, + streamRt: &roundTripperRecorder{}, + peers: make(map[types.ID]Peer), + pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) @@ -125,10 +126,11 @@ func TestTransportAdd(t *testing.T) { func TestTransportRemove(t *testing.T) { tr := &Transport{ - LeaderStats: stats.NewLeaderStats(""), - streamRt: &roundTripperRecorder{}, - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + LeaderStats: stats.NewLeaderStats(""), + streamRt: &roundTripperRecorder{}, + peers: make(map[types.ID]Peer), + pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) tr.RemovePeer(types.ID(1)) @@ -142,8 +144,9 @@ func TestTransportRemove(t *testing.T) { func TestTransportUpdate(t *testing.T) { peer := newFakePeer() tr := &Transport{ - peers: map[types.ID]Peer{types.ID(1): peer}, - prober: probing.NewProber(nil), + peers: map[types.ID]Peer{types.ID(1): peer}, + pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } u := "http://localhost:2380" tr.UpdatePeer(types.ID(1), []string{u}) @@ -156,13 +159,14 @@ func 
TestTransportUpdate(t *testing.T) { func TestTransportErrorc(t *testing.T) { errorc := make(chan error, 1) tr := &Transport{ - Raft: &fakeRaft{}, - LeaderStats: stats.NewLeaderStats(""), - ErrorC: errorc, - streamRt: newRespRoundTripper(http.StatusForbidden, nil), - pipelineRt: newRespRoundTripper(http.StatusForbidden, nil), - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + Raft: &fakeRaft{}, + LeaderStats: stats.NewLeaderStats(""), + ErrorC: errorc, + streamRt: newRespRoundTripper(http.StatusForbidden, nil), + pipelineRt: newRespRoundTripper(http.StatusForbidden, nil), + peers: make(map[types.ID]Peer), + pipelineProber: probing.NewProber(nil), + streamProber: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) defer tr.Stop()