Skip to content

Commit

Permalink
Merge "Leader election log"
Browse files Browse the repository at this point in the history
  • Loading branch information
christo4ferris authored and Gerrit Code Review committed Mar 1, 2017
2 parents 6325834 + b5e73c2 commit 6a3f766
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 20 deletions.
8 changes: 4 additions & 4 deletions gossip/election/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ type msgImpl struct {
msg *proto.GossipMessage
}

func (mi *msgImpl) SenderID() string {
return string(mi.msg.GetLeadershipMsg().PkiID)
func (mi *msgImpl) SenderID() peerID {
return mi.msg.GetLeadershipMsg().PkiID
}

func (mi *msgImpl) IsProposal() bool {
Expand All @@ -47,8 +47,8 @@ type peerImpl struct {
member *discovery.NetworkMember
}

func (pi *peerImpl) ID() string {
return string(pi.member.PKIid)
func (pi *peerImpl) ID() peerID {
return peerID(pi.member.PKIid)
}

type gossip interface {
Expand Down
7 changes: 4 additions & 3 deletions gossip/election/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package election

import (
"bytes"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -91,7 +92,7 @@ func TestAdapterImpl_Peers(t *testing.T) {
}

for _, peer := range peers {
if _, exist := peersPKIDs[peer.ID()]; !exist {
if _, exist := peersPKIDs[string(peer.ID())]; !exist {
t.Errorf("Peer %s PKID not found", peer.(*peerImpl).member.Endpoint)
}
}
Expand Down Expand Up @@ -141,15 +142,15 @@ func TestAdapterImpl_Gossip(t *testing.T) {
case msg := <-channels[fmt.Sprintf("Peer%d", 1)]:
if !msg.IsDeclaration() {
t.Error("Msg should be declaration")
} else if strings.Compare(msg.SenderID(), string(sender.self.PKIid)) != 0 {
} else if !bytes.Equal(msg.SenderID(), sender.self.PKIid) {
t.Error("Msg Sender is wrong")
} else {
totalMsg++
}
case msg := <-channels[fmt.Sprintf("Peer%d", 2)]:
if !msg.IsDeclaration() {
t.Error("Msg should be declaration")
} else if strings.Compare(msg.SenderID(), string(sender.self.PKIid)) != 0 {
} else if !bytes.Equal(msg.SenderID(), sender.self.PKIid) {
t.Error("Msg Sender is wrong")
} else {
totalMsg++
Expand Down
21 changes: 12 additions & 9 deletions gossip/election/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package election

import (
"bytes"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -113,16 +114,18 @@ type LeaderElectionService interface {
Stop()
}

type peerID []byte

// Peer describes a remote peer
type Peer interface {
// ID returns the ID of the peer
ID() string
ID() peerID
}

// Msg describes a message sent from a remote peer
type Msg interface {
// SenderID returns the ID of the peer sent the message
SenderID() string
SenderID() peerID
// IsProposal returns whether this message is a leadership proposal
IsProposal() bool
// IsDeclaration returns whether this message is a leadership declaration
Expand All @@ -138,7 +141,7 @@ func NewLeaderElectionService(adapter LeaderElectionAdapter, id string, callback
panic(fmt.Errorf("Empty id"))
}
le := &leaderElectionSvcImpl{
id: id,
id: peerID(id),
proposals: util.NewSet(),
adapter: adapter,
stopChan: make(chan struct{}, 1),
Expand All @@ -157,7 +160,7 @@ func NewLeaderElectionService(adapter LeaderElectionAdapter, id string, callback

// leaderElectionSvcImpl is an implementation of a LeaderElectionService
type leaderElectionSvcImpl struct {
id string
id peerID
proposals *util.Set
sync.Mutex
stopChan chan struct{}
Expand Down Expand Up @@ -209,13 +212,13 @@ func (le *leaderElectionSvcImpl) handleMessage(msg Msg) {
defer le.Unlock()

if msg.IsProposal() {
le.proposals.Add(msg.SenderID())
le.proposals.Add(string(msg.SenderID()))
} else if msg.IsDeclaration() {
atomic.StoreInt32(&le.leaderExists, int32(1))
if le.sleeping && len(le.interruptChan) == 0 {
le.interruptChan <- struct{}{}
}
if msg.SenderID() < le.id && le.IsLeader() {
if bytes.Compare(msg.SenderID(), le.id) < 0 && le.IsLeader() {
le.stopBeingLeader()
}
} else {
Expand Down Expand Up @@ -281,7 +284,7 @@ func (le *leaderElectionSvcImpl) leaderElection() {
// for being a leader
for _, o := range le.proposals.ToArray() {
id := o.(string)
if id < le.id {
if bytes.Compare(peerID(id), le.id) < 0 {
return
}
}
Expand Down Expand Up @@ -344,9 +347,9 @@ func (le *leaderElectionSvcImpl) drainInterruptChannel() {
}

// isAlive returns whether peer of given id is considered alive
func (le *leaderElectionSvcImpl) isAlive(id string) bool {
func (le *leaderElectionSvcImpl) isAlive(id peerID) bool {
for _, p := range le.adapter.Peers() {
if p.ID() == id {
if bytes.Equal(p.ID(), id) {
return true
}
}
Expand Down
8 changes: 4 additions & 4 deletions gossip/election/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ type msg struct {
proposal bool
}

func (m *msg) SenderID() string {
return m.sender
func (m *msg) SenderID() peerID {
return peerID(m.sender)
}

func (m *msg) IsProposal() bool {
Expand Down Expand Up @@ -76,8 +76,8 @@ func (p *peer) On(methodName string, arguments ...interface{}) *mock.Call {
return p.Mock.On(methodName, arguments...)
}

func (p *peer) ID() string {
return p.id
func (p *peer) ID() peerID {
return peerID(p.id)
}

func (p *peer) Gossip(m Msg) {
Expand Down

0 comments on commit 6a3f766

Please sign in to comment.