Skip to content

Commit

Permalink
Periodically resend RoundChanges for same round (ethereum#800)
Browse files Browse the repository at this point in the history
* Refactor newRoundChangeTimer

* Resend round change periodically

* Add test, make config params

* Review comments
  • Loading branch information
timmoreton committed Jan 9, 2020
1 parent c368910 commit ae12ed7
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 67 deletions.
2 changes: 1 addition & 1 deletion consensus/istanbul/backend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (sb *Backend) HandleMsg(addr common.Address, msg p2p.Msg, peer consensus.Pe
sb.coreMu.Lock()
defer sb.coreMu.Unlock()

sb.logger.Trace("HandleMsg called", "address", addr, "m", msg, "peer.Node()", peer.Node())
sb.logger.Trace("HandleMsg called", "address", addr, "m", msg, "peer", peer.Node())

if sb.isIstanbulMsg(msg) {
if (!sb.coreStarted && !sb.config.Proxy) && (msg.Code == istanbulConsensusMsg) {
Expand Down
38 changes: 22 additions & 16 deletions consensus/istanbul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ const (
)

type Config struct {
RequestTimeout uint64 `toml:",omitempty"` // The timeout for each Istanbul round in milliseconds.
BlockPeriod uint64 `toml:",omitempty"` // Default minimum difference between two consecutive block's timestamps in second
ProposerPolicy ProposerPolicy `toml:",omitempty"` // The policy for proposer selection
Epoch uint64 `toml:",omitempty"` // The number of blocks after which to checkpoint and reset the pending votes
LookbackWindow uint64 `toml:",omitempty"` // The window of blocks in which a validator is forgived from voting
ValidatorEnodeDBPath string `toml:",omitempty"` // The location for the validator enodes DB
RoundStateDBPath string `toml:",omitempty"` // The location for the round states DB
RequestTimeout uint64 `toml:",omitempty"` // The timeout for each Istanbul round in milliseconds.
TimeoutBackoffFactor uint64 `toml:",omitempty"` // Timeout at subsequent rounds is: RequestTimeout + 2**round * TimeoutBackoffFactor (in milliseconds)
MinResendRoundChangeTimeout uint64 `toml:",omitempty"` // Minimum interval with which to resend RoundChange messages for same round
MaxResendRoundChangeTimeout uint64 `toml:",omitempty"` // Maximum interval with which to resend RoundChange messages for same round
BlockPeriod uint64 `toml:",omitempty"` // Default minimum difference between two consecutive block's timestamps in second
ProposerPolicy ProposerPolicy `toml:",omitempty"` // The policy for proposer selection
Epoch uint64 `toml:",omitempty"` // The number of blocks after which to checkpoint and reset the pending votes
LookbackWindow uint64 `toml:",omitempty"` // The window of blocks in which a validator is forgived from voting
ValidatorEnodeDBPath string `toml:",omitempty"` // The location for the validator enodes DB
RoundStateDBPath string `toml:",omitempty"` // The location for the round states DB

// Proxy Configs
Proxy bool `toml:",omitempty"` // Specifies if this node is a proxy
Expand All @@ -49,13 +52,16 @@ type Config struct {
}

var DefaultConfig = &Config{
RequestTimeout: 3000,
BlockPeriod: 1,
ProposerPolicy: ShuffledRoundRobin,
Epoch: 30000,
LookbackWindow: 12,
ValidatorEnodeDBPath: "validatorenodes",
RoundStateDBPath: "roundstates",
Proxy: false,
Proxied: false,
RequestTimeout: 3000,
TimeoutBackoffFactor: 1000,
MinResendRoundChangeTimeout: 15 * 1000,
MaxResendRoundChangeTimeout: 2 * 60 * 1000,
BlockPeriod: 1,
ProposerPolicy: ShuffledRoundRobin,
Epoch: 30000,
LookbackWindow: 12,
ValidatorEnodeDBPath: "validatorenodes",
RoundStateDBPath: "roundstates",
Proxy: false,
Proxied: false,
}
104 changes: 79 additions & 25 deletions consensus/istanbul/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,14 @@ type core struct {
logger log.Logger
selectProposer istanbul.ProposerSelector

backend istanbul.Backend
events *event.TypeMuxSubscription
finalCommittedSub *event.TypeMuxSubscription
timeoutSub *event.TypeMuxSubscription
futurePreprepareTimer *time.Timer
backend istanbul.Backend
events *event.TypeMuxSubscription
finalCommittedSub *event.TypeMuxSubscription
timeoutSub *event.TypeMuxSubscription

futurePreprepareTimer *time.Timer
resendRoundChangeMessageTimer *time.Timer
roundChangeTimer *time.Timer

validateFn func([]byte, []byte) (common.Address, error)

Expand All @@ -92,8 +95,7 @@ type core struct {
current RoundState
handlerWg *sync.WaitGroup

roundChangeSet *roundChangeSet
roundChangeTimer *time.Timer
roundChangeSet *roundChangeSet

pendingRequests *prque.Prque
pendingRequestsMu *sync.Mutex
Expand Down Expand Up @@ -392,7 +394,7 @@ func (c *core) startNewRound(round *big.Int) error {
if roundChange && c.isProposer() && request != nil {
c.sendPreprepare(request, roundChangeCertificate)
}
c.newRoundChangeTimer()
c.resetRoundChangeTimer()

logger.Debug("New round", "new_round", newView.Round, "new_seq", newView.Sequence, "new_proposer", c.current.Proposer(), "valSet", c.current.ValidatorSet().List(), "size", c.current.ValidatorSet().Size(), "isProposer", c.isProposer())
return nil
Expand All @@ -409,10 +411,6 @@ func (c *core) waitForDesiredRound(r *big.Int) error {
}

logger.Debug("Round Change: Waiting for desired round")
desiredView := &istanbul.View{
Sequence: new(big.Int).Set(c.current.Sequence()),
Round: new(big.Int).Set(r),
}

// Perform all of the updates
_, headAuthor := c.backend.GetCurrentHeadBlockAndAuthor()
Expand All @@ -422,13 +420,13 @@ func (c *core) waitForDesiredRound(r *big.Int) error {
return err
}

c.newRoundChangeTimerForView(desiredView)
c.resetRoundChangeTimer()

// Process Backlog Messages
c.backlog.updateState(c.current.View(), c.current.State())

// Send round change
c.sendRoundChange(r)
c.sendRoundChange()
return nil
}

Expand Down Expand Up @@ -512,36 +510,92 @@ func (c *core) isProposer() bool {
func (c *core) stopFuturePreprepareTimer() {
if c.futurePreprepareTimer != nil {
c.futurePreprepareTimer.Stop()
c.futurePreprepareTimer = nil
}
}

func (c *core) stopTimer() {
c.stopFuturePreprepareTimer()
func (c *core) stopRoundChangeTimer() {
if c.roundChangeTimer != nil {
c.roundChangeTimer.Stop()
c.roundChangeTimer = nil
}
}

func (c *core) newRoundChangeTimer() {
c.newRoundChangeTimerForView(c.current.View())
func (c *core) stopResendRoundChangeTimer() {
if c.resendRoundChangeMessageTimer != nil {
c.resendRoundChangeMessageTimer.Stop()
c.resendRoundChangeMessageTimer = nil
}
}

func (c *core) newRoundChangeTimerForView(view *istanbul.View) {
c.stopTimer()
func (c *core) stopAllTimers() {
c.stopFuturePreprepareTimer()
c.stopRoundChangeTimer()
c.stopResendRoundChangeTimer()
}

timeout := time.Duration(c.config.RequestTimeout) * time.Millisecond
round := view.Round.Uint64()
func (c *core) getRoundChangeTimeout() time.Duration {
baseTimeout := time.Duration(c.config.RequestTimeout) * time.Millisecond
round := c.current.DesiredRound().Uint64()
if round == 0 {
// timeout for first round takes into account expected block period
timeout += time.Duration(c.config.BlockPeriod) * time.Second
return baseTimeout + time.Duration(c.config.BlockPeriod)*time.Second
} else {
// timeout for subsequent rounds adds an exponential backoff.
timeout += time.Duration(math.Pow(2, float64(round))) * time.Second
return baseTimeout + time.Duration(math.Pow(2, float64(round)))*time.Duration(c.config.TimeoutBackoffFactor)*time.Millisecond
}
}

// Reset then set the timer that causes a timeoutAndMoveToNextRoundEvent to be processed.
// This may also reset the timer for the next resendRoundChangeEvent.
func (c *core) resetRoundChangeTimer() {
// Stop all timers here since all 'resends' happen within the interval of a round's timeout.
// (Races are handled anyway by checking the seq and desired round haven't changed between
// submitting and processing events).
c.stopAllTimers()

view := &istanbul.View{Sequence: c.current.Sequence(), Round: c.current.DesiredRound()}
timeout := c.getRoundChangeTimeout()
c.roundChangeTimer = time.AfterFunc(timeout, func() {
c.sendEvent(timeoutEvent{&istanbul.View{Sequence: view.Sequence, Round: view.Round}})
c.sendEvent(timeoutAndMoveToNextRoundEvent{view})
})

if c.current.DesiredRound().Cmp(common.Big1) > 0 {
logger := c.newLogger("func", "resetRoundChangeTimer")
logger.Info("Reset round change timer", "timeout_ms", timeout/time.Millisecond)
}

c.resetResendRoundChangeTimer()
}

// Reset then, if in StateWaitingForNewRound and on round whose timeout is greater than MinResendRoundChangeTimeout,
// set a timer that is at most MaxResendRoundChangeTimeout that causes a resendRoundChangeEvent to be processed.
func (c *core) resetResendRoundChangeTimer() {
c.stopResendRoundChangeTimer()
if c.current.State() == StateWaitingForNewRound {
minResendTimeout := time.Duration(c.config.MinResendRoundChangeTimeout) * time.Millisecond
resendTimeout := c.getRoundChangeTimeout() / 2
if resendTimeout < minResendTimeout {
return
}
maxResendTimeout := time.Duration(c.config.MaxResendRoundChangeTimeout) * time.Millisecond
if resendTimeout > maxResendTimeout {
resendTimeout = maxResendTimeout
}
view := &istanbul.View{Sequence: c.current.Sequence(), Round: c.current.DesiredRound()}
c.resendRoundChangeMessageTimer = time.AfterFunc(resendTimeout, func() {
c.sendEvent(resendRoundChangeEvent{view})
})
}
}

// Rebroadcast RoundChange message for desired round if still in StateWaitingForNewRound.
// Do not advance desired round. Then clear/reset timer so we may rebroadcast again.
func (c *core) resendRoundChangeMessage() {
if c.current.State() == StateWaitingForNewRound {
c.sendRoundChange()
}
c.resetResendRoundChangeTimer()
}

func (c *core) checkValidatorSignature(data []byte, sig []byte) (common.Address, error) {
Expand Down
5 changes: 4 additions & 1 deletion consensus/istanbul/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type backlogEvent struct {
msg *istanbul.Message
}

type timeoutEvent struct {
type resendRoundChangeEvent struct {
view *istanbul.View
}
type timeoutAndMoveToNextRoundEvent struct {
view *istanbul.View
}
38 changes: 30 additions & 8 deletions consensus/istanbul/core/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ func (c *core) Start() error {
c.current = roundState
c.roundChangeSet = newRoundChangeSet(c.current.ValidatorSet())

c.newRoundChangeTimer()
// Reset the Round Change timer for the current round to timeout.
// (If we've restored RoundState such that we are in StateWaitingForRoundChange,
// this may also start a timer to send a repeat round change message.)
c.resetRoundChangeTimer()

// Process backlog
c.processPendingRequests()
Expand All @@ -50,7 +53,7 @@ func (c *core) Start() error {

// Stop implements core.Engine.Stop
func (c *core) Stop() error {
c.stopTimer()
c.stopAllTimers()
c.unsubscribeEvents()

// Make sure the handler goroutine exits
Expand All @@ -76,7 +79,8 @@ func (c *core) subscribeEvents() {
backlogEvent{},
)
c.timeoutSub = c.backend.EventMux().Subscribe(
timeoutEvent{},
timeoutAndMoveToNextRoundEvent{},
resendRoundChangeEvent{},
)
c.finalCommittedSub = c.backend.EventMux().Subscribe(
istanbul.FinalCommittedEvent{},
Expand Down Expand Up @@ -133,9 +137,13 @@ func (c *core) handleEvents() {
return
}
switch ev := event.Data.(type) {
case timeoutEvent:
if err := c.handleTimeoutMsg(ev.view); err != nil {
logger.Error("Error on handleTimeoutMsg", "err", err)
case timeoutAndMoveToNextRoundEvent:
if err := c.handleTimeoutAndMoveToNextRound(ev.view); err != nil {
logger.Error("Error on handleTimeoutAndMoveToNextRound", "err", err)
}
case resendRoundChangeEvent:
if err := c.handleResendRoundChangeEvent(ev.view); err != nil {
logger.Error("Error on handleResendRoundChangeEvent", "err", err)
}
}
case event, ok := <-c.finalCommittedSub.Chan():
Expand Down Expand Up @@ -207,9 +215,10 @@ func (c *core) handleCheckedMsg(msg *istanbul.Message, src istanbul.Validator) e
return errInvalidMessage
}

func (c *core) handleTimeoutMsg(desiredView *istanbul.View) error {
logger := c.newLogger("func", "handleTimeoutMsg", "set_at_seq", desiredView.Sequence, "set_at_desiredRound", desiredView.Round)
func (c *core) handleTimeoutAndMoveToNextRound(desiredView *istanbul.View) error {
logger := c.newLogger("func", "handleTimeoutAndMoveToNextRound", "set_at_seq", desiredView.Sequence, "set_at_desiredRound", desiredView.Round)

// Avoid races where message is enqueued then a later event advances sequence or desired round.
if c.current.Sequence().Cmp(desiredView.Sequence) != 0 || c.current.DesiredRound().Cmp(desiredView.Round) != 0 {
logger.Trace("Timed out but now on a different view")
return nil
Expand All @@ -219,3 +228,16 @@ func (c *core) handleTimeoutMsg(desiredView *istanbul.View) error {
nextRound := new(big.Int).Add(desiredView.Round, common.Big1)
return c.waitForDesiredRound(nextRound)
}

func (c *core) handleResendRoundChangeEvent(desiredView *istanbul.View) error {
logger := c.newLogger("func", "handleResendRoundChangeEvent", "set_at_seq", desiredView.Sequence, "set_at_desiredRound", desiredView.Round)

// Avoid races where message is enqueued then a later event advances sequence or desired round.
if c.current.Sequence().Cmp(desiredView.Sequence) != 0 || c.current.DesiredRound().Cmp(desiredView.Round) != 0 {
logger.Trace("Timed out but now on a different view")
return nil
}

c.resendRoundChangeMessage()
return nil
}
17 changes: 6 additions & 11 deletions consensus/istanbul/core/roundchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,11 @@ import (
"github.com/ethereum/go-ethereum/consensus/istanbul"
)

// sendRoundChange sends the ROUND CHANGE message with the given round
func (c *core) sendRoundChange(round *big.Int) {
logger := c.newLogger("func", "sendRoundChange", "target_round", round)
// sendRoundChange broadcasts a ROUND CHANGE message with the current desired round.
func (c *core) sendRoundChange() {
logger := c.newLogger("func", "sendRoundChange")

if c.current.View().Round.Cmp(round) >= 0 {
logger.Warn("Cannot send round change for previous round")
return
}

msg, err := c.buildRoundChangeMsg(round)
msg, err := c.buildRoundChangeMsg(c.current.DesiredRound())
if err != nil {
logger.Error("Could not build round change message", "err", msg)
return
Expand All @@ -45,9 +40,9 @@ func (c *core) sendRoundChange(round *big.Int) {
c.broadcast(msg)
}

// sendRoundChange sends a ROUND CHANGE message for the current round back to a single address
// sendRoundChange sends a ROUND CHANGE message for the current desired round back to a single address
func (c *core) sendRoundChangeAgain(addr common.Address) {
logger := c.newLogger("func", "sendRoundChange", "desired_round", c.current.DesiredRound(), "to", addr)
logger := c.newLogger("func", "sendRoundChangeAgain", "to", addr)

msg, err := c.buildRoundChangeMsg(c.current.DesiredRound())
if err != nil {
Expand Down
Loading

0 comments on commit ae12ed7

Please sign in to comment.