diff --git a/gossip/election/adapter.go b/gossip/election/adapter.go index dd3025d16d5..d19e0d68f00 100644 --- a/gossip/election/adapter.go +++ b/gossip/election/adapter.go @@ -31,8 +31,8 @@ type msgImpl struct { msg *proto.GossipMessage } -func (mi *msgImpl) SenderID() string { - return string(mi.msg.GetLeadershipMsg().PkiID) +func (mi *msgImpl) SenderID() peerID { + return mi.msg.GetLeadershipMsg().PkiID } func (mi *msgImpl) IsProposal() bool { @@ -47,8 +47,8 @@ type peerImpl struct { member *discovery.NetworkMember } -func (pi *peerImpl) ID() string { - return string(pi.member.PKIid) +func (pi *peerImpl) ID() peerID { + return peerID(pi.member.PKIid) } type gossip interface { diff --git a/gossip/election/adapter_test.go b/gossip/election/adapter_test.go index 5c425e7436a..17ed177d33c 100644 --- a/gossip/election/adapter_test.go +++ b/gossip/election/adapter_test.go @@ -17,6 +17,7 @@ limitations under the License. package election import ( + "bytes" "fmt" "strings" "sync" @@ -91,7 +92,7 @@ func TestAdapterImpl_Peers(t *testing.T) { } for _, peer := range peers { - if _, exist := peersPKIDs[peer.ID()]; !exist { + if _, exist := peersPKIDs[string(peer.ID())]; !exist { t.Errorf("Peer %s PKID not found", peer.(*peerImpl).member.Endpoint) } } @@ -141,7 +142,7 @@ func TestAdapterImpl_Gossip(t *testing.T) { case msg := <-channels[fmt.Sprintf("Peer%d", 1)]: if !msg.IsDeclaration() { t.Error("Msg should be declaration") - } else if strings.Compare(msg.SenderID(), string(sender.self.PKIid)) != 0 { + } else if !bytes.Equal(msg.SenderID(), sender.self.PKIid) { t.Error("Msg Sender is wrong") } else { totalMsg++ @@ -149,7 +150,7 @@ func TestAdapterImpl_Gossip(t *testing.T) { case msg := <-channels[fmt.Sprintf("Peer%d", 2)]: if !msg.IsDeclaration() { t.Error("Msg should be declaration") - } else if strings.Compare(msg.SenderID(), string(sender.self.PKIid)) != 0 { + } else if !bytes.Equal(msg.SenderID(), sender.self.PKIid) { t.Error("Msg Sender is wrong") } else { totalMsg++ diff --git a/gossip/election/election.go b/gossip/election/election.go index f9e64aac982..67f42115f4e 100644 --- a/gossip/election/election.go +++ b/gossip/election/election.go @@ -17,6 +17,7 @@ limitations under the License. package election import ( + "bytes" "fmt" "sync" "sync/atomic" @@ -113,16 +114,18 @@ type LeaderElectionService interface { Stop() } +type peerID []byte + // Peer describes a remote peer type Peer interface { // ID returns the ID of the peer - ID() string + ID() peerID } // Msg describes a message sent from a remote peer type Msg interface { // SenderID returns the ID of the peer sent the message - SenderID() string + SenderID() peerID // IsProposal returns whether this message is a leadership proposal IsProposal() bool // IsDeclaration returns whether this message is a leadership declaration @@ -138,7 +141,7 @@ func NewLeaderElectionService(adapter LeaderElectionAdapter, id string, callback panic(fmt.Errorf("Empty id")) } le := &leaderElectionSvcImpl{ - id: id, + id: peerID(id), proposals: util.NewSet(), adapter: adapter, stopChan: make(chan struct{}, 1), @@ -157,7 +160,7 @@ func NewLeaderElectionService(adapter LeaderElectionAdapter, id string, callback // leaderElectionSvcImpl is an implementation of a LeaderElectionService type leaderElectionSvcImpl struct { - id string + id peerID proposals *util.Set sync.Mutex stopChan chan struct{} @@ -209,13 +212,13 @@ func (le *leaderElectionSvcImpl) handleMessage(msg Msg) { defer le.Unlock() if msg.IsProposal() { - le.proposals.Add(msg.SenderID()) + le.proposals.Add(string(msg.SenderID())) } else if msg.IsDeclaration() { atomic.StoreInt32(&le.leaderExists, int32(1)) if le.sleeping && len(le.interruptChan) == 0 { le.interruptChan <- struct{}{} } - if msg.SenderID() < le.id && le.IsLeader() { + if bytes.Compare(msg.SenderID(), le.id) < 0 && le.IsLeader() { le.stopBeingLeader() } } else { @@ -281,7 +284,7 @@ func (le *leaderElectionSvcImpl) leaderElection() { // for being a leader for _, o := range le.proposals.ToArray() { id := o.(string) - if id < le.id { + if bytes.Compare(peerID(id), le.id) < 0 { return } } @@ -344,9 +347,9 @@ func (le *leaderElectionSvcImpl) drainInterruptChannel() { } // isAlive returns whether peer of given id is considered alive -func (le *leaderElectionSvcImpl) isAlive(id string) bool { +func (le *leaderElectionSvcImpl) isAlive(id peerID) bool { for _, p := range le.adapter.Peers() { - if p.ID() == id { + if bytes.Equal(p.ID(), id) { return true } } diff --git a/gossip/election/election_test.go b/gossip/election/election_test.go index 0a9380e76ac..5c1d251f999 100644 --- a/gossip/election/election_test.go +++ b/gossip/election/election_test.go @@ -45,8 +45,8 @@ type msg struct { proposal bool } -func (m *msg) SenderID() string { - return m.sender +func (m *msg) SenderID() peerID { + return peerID(m.sender) } func (m *msg) IsProposal() bool { @@ -76,8 +76,8 @@ func (p *peer) On(methodName string, arguments ...interface{}) *mock.Call { return p.Mock.On(methodName, arguments...) } -func (p *peer) ID() string { - return p.id +func (p *peer) ID() peerID { + return peerID(p.id) } func (p *peer) Gossip(m Msg) {