Consensus Compliance Engine updated to conform to Component interface
#3121
* no longer conforms to deprecated Engine interface
…:onflow/flow-go into jordan/6263-compliance-component-pattern
Codecov Report
@@                     Coverage Diff                      @@
##           feature/active-pacemaker    #3121      +/-   ##
============================================================
- Coverage                     54.84%   53.21%   -1.63%
============================================================
  Files                           487      694     +207
  Lines                         39550    65832   +26282
============================================================
+ Hits                          21692    35035   +13343
- Misses                        16068    27870   +11802
- Partials                       1790     2927    +1137
err = cs.engine.BroadcastProposalWithDelay(header, 0)
require.Error(cs.T(), err, "should fail with missing payload")
header.View--
<-util.AllClosed(broadcasted, submitted)
Suggested change:
- <-util.AllClosed(broadcasted, submitted)
+ unittest.AssertClosesBefore(cs.T(), util.AllClosed(broadcasted, submitted), time.Second)
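The suggestion above swaps an unbounded channel receive for a bounded wait, so that a regression fails the test instead of hanging it. A minimal sketch of that idea in plain Go follows. It makes no claim about how `unittest.AssertClosesBefore` is actually implemented; `closesBefore` and `shortWait` are hypothetical names introduced only for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// shortWait is a hypothetical timeout used only in this sketch.
const shortWait = 50 * time.Millisecond

// closesBefore reports whether done is closed (or yields a value)
// before the timeout d elapses.
func closesBefore(done <-chan struct{}, d time.Duration) bool {
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

func main() {
	closed := make(chan struct{})
	close(closed)
	open := make(chan struct{}) // never closed

	fmt.Println(closesBefore(closed, shortWait)) // true
	fmt.Println(closesBefore(open, shortWait))   // false
}
```

A test helper would typically wrap such a check and report a failure through `testing.T` when it returns false.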
require.Error(cs.T(), err, "should fail with missing payload")
header.View--
<-util.AllClosed(broadcasted, submitted)
cs.cancel()
Same question here, I don't think we need to stop manually
	return cs.engine.pendingVotes.(*engine.FifoMessageStore).Len() == 0
}, time.Second, time.Millisecond*10)
// stop the engine
cs.cancel()
Do we need to stop manually?
allViews := allFinalizedViews(t, nodes)
for i, views := range allViews {
is this for debugging ?
Yes - will remove this...
* catch irrecoverable errors in separate goroutine * don't cancel engine context with individual cases
@@ -149,6 +149,13 @@ func AssertReturnsBefore(t *testing.T, f func(), duration time.Duration, msgAndA
	}
}

// ClosedChannel returns a closed channel.
func ClosedChannel() <-chan struct{} {
nice.
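For readers unfamiliar with the idiom: a receive from a closed channel never blocks, so a pre-closed channel can stand in for something that is already ready or already done. The hunk above shows only the signature and doc comment, so the body below is an assumed minimal implementation, not necessarily the code from this PR.

```go
package main

import "fmt"

// ClosedChannel returns a closed channel.
// The body is an assumption; the diff above shows only the signature.
func ClosedChannel() <-chan struct{} {
	ch := make(chan struct{})
	close(ch)
	return ch
}

func main() {
	select {
	case <-ClosedChannel():
		fmt.Println("receive returned immediately")
	default:
		fmt.Println("receive would block")
	}
}
```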
engine/enqueue.go
Outdated
  // MessageStore is the interface to abstract how messages are buffered in memory before
- // being handled by the engine
+ // being handled by the engine.
Suggested change:
- // MessageStore is the interface to abstract how messages are buffered in memory before
- // being handled by the engine
- // being handled by the engine.
+ // MessageStore is the interface to abstract how messages are buffered in memory
+ // while waiting to be processed.
mempool module.MempoolMetrics
metrics module.EngineMetrics
I personally find the variable naming here a bit ambiguous, because mempool is not an actual mempool but the metrics collector for mempools. I think it would improve clarity if this were reflected in the variable name.
Suggested change:
- mempool module.MempoolMetrics
- metrics module.EngineMetrics
+ mempoolMetrics module.MempoolMetrics
+ componentMetrics module.EngineMetrics
I would suggest also renaming the corresponding variables in compliance.Core, respectively:
metrics module.EngineMetrics metrics module.EngineMetrics
@@ -47,11 +51,17 @@ func (s *Stopper) AddNode(n *Node) *StopperConsumer {
	s.Lock()
Could we add a brief GoDoc to the AddNode method, please? In particular, a comment regarding the intended usage of the return value would be great. To be honest, I don't understand why we return this consumer here, and I also don't understand why it is called a StopperConsumer. Is this legacy code? I feel we could just remove it ... but not sure.
Good point - as far as I can tell the StopperConsumer does nothing at all. Going to remove it.
	stopConsumer := &StopperConsumer{}
	return stopConsumer
}

// WithStopFunc adds a function to use to stop all nodes (typically the cancel function of the context used to start them).
func (s *Stopper) WithStopFunc(stop func()) {
This function is not concurrency safe, which should be highlighted in the goDoc.
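The reviewer asks only for the GoDoc to state that the setter is not concurrency safe. As a point of comparison, the sketch below shows what a guarded variant could look like. It is a simplified, hypothetical `Stopper` (the `stopFunc` and `stopped` fields and the `Stop` method are assumptions made for illustration), not the helper from this PR.

```go
package main

import (
	"fmt"
	"sync"
)

// Stopper is a simplified, hypothetical stand-in for the test helper
// discussed above; only the stop-function handling is modelled.
type Stopper struct {
	sync.Mutex
	stopFunc func()
	stopped  bool
}

// WithStopFunc sets the function used to stop all nodes.
// In this sketch the write is guarded by the mutex, so it may be
// called concurrently with Stop.
func (s *Stopper) WithStopFunc(stop func()) {
	s.Lock()
	defer s.Unlock()
	s.stopFunc = stop
}

// Stop invokes the stop function at most once.
func (s *Stopper) Stop() {
	s.Lock()
	if s.stopped || s.stopFunc == nil {
		s.Unlock()
		return
	}
	s.stopped = true
	stop := s.stopFunc
	s.Unlock()
	stop() // called outside the lock so it cannot deadlock on s
}

func main() {
	s := &Stopper{}
	calls := 0
	s.WithStopFunc(func() { calls++ })
	s.Stop()
	s.Stop() // second call is a no-op
	fmt.Println(calls) // 1
}
```

If the setter is only ever called during single-threaded test setup, documenting that constraint, as the comment suggests, is the lighter option.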
@@ -454,15 +434,15 @@ func (e *Engine) BroadcastTimeout(timeout *model.TimeoutObject) error {
  	return
  }
  if err != nil {
- 	log.Error().Err(err).Msg("could not broadcast timeout")
+ 	log.Err(err).Msg("could not broadcast timeout")
  	return
  }
  log.Info().Msg("consensus timeout broadcast")
Not quite sure, but my understanding is that the commonly used past tense of "broadcast" is also "broadcast". I think it would avoid ambiguity if we clarified that the broadcast has completed:
Suggested change:
- log.Info().Msg("consensus timeout broadcast")
+ log.Info().Msg("consensus timeout was broadcast")
Similarly here, I would suggest using consistent wording:
flow-go/engine/consensus/compliance/engine.go
Line 536 in f7b85fb
log.Info().Msg("block proposal broadcasted")
// submit to broadcast proposal
err := cs.engine.BroadcastProposalWithDelay(block.Header, 0)
require.NoError(cs.T(), err, "header broadcast should pass")
// unset chain and height to make sure they are correctly reconstructed
Could we put lines 141 - 172 into their own cs.Run method to improve the structure of the test?
  // TestOnFinalizedBlock tests if finalized block gets processed when send through `Engine`.
  // Tests the whole processing pipeline.
- func (cs *ComplianceSuite) TestOnFinalizedBlock() {
+ func (cs *EngineSuite) TestOnFinalizedBlock() {
For my learning: is the old implementation (below) actually buggy or just unnecessarily verbose?
flow-go/engine/consensus/compliance/engine_test.go
Lines 227 to 230 in f7c9481
require.Eventually(cs.T(),
	func() bool {
		return cs.pending.AssertCalled(cs.T(), "PruneByView", finalizedBlock.View)
	}, time.Second, time.Millisecond*20)
if e.me.NodeID() != proposal.Header.ProposerID {
	return fmt.Errorf("sanity check failed: attempted to provide another node's proposal (%x!=%x)", e.me.NodeID(), proposal.Header.ProposerID)
this sanity check is also in the compliance Engine:
flow-go/engine/consensus/compliance/engine.go
Lines 454 to 457 in f7b85fb
// first, check that we are the proposer of the block
if header.ProposerID != e.me.NodeID() {
	return fmt.Errorf("cannot broadcast proposal with non-local proposer (%x)", header.ProposerID)
}
do we need it in both places?
... the check is probably cheap enough that it makes no practical difference, so why not 🤷
Yeah I'm more inclined to just leave it
Thank you for the nice refactoring, the great documentation and the test revisions.
In response to Yurii's comment, I have created PR #3243 (targeting this branch) that adds the missing error documentation.
LGTM, great effort.
Co-authored-by: Alexander Hentschel <alex.hentschel@axiomzen.co>
bors merge
3248: Collection `compliance`, `epochmgr` Engines Implement `component.Component` r=jordanschalm a=jordanschalm
⚠️ Depends on #3121
This PR updates `collection/compliance.Engine` and `collection/epochmgr.Engine` to implement `component.Component` (#3121, but for collection nodes).
Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>
This PR updates the `consensus/compliance` engine to implement `component.Component`. The `epochmgr` and `collection/compliance` engines will be updated in a subsequent PR.
Other Changes
* … `compliance`, `sync`, and `provider` engines with specific interfaces
* … `provider` engine from a deprecated `Engine` to a `MessageProcessor`
* … `Stopper` slightly to work with context cancellation
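To illustrate the general shape of a component that is stopped through context cancellation, as the description mentions, here is a minimal sketch in plain Go. The `worker` type, its methods, and `runAndStop` are hypothetical and use the standard `context` package; they do not reproduce flow-go's `component.Component` interface or its error-handling conventions.

```go
package main

import (
	"context"
	"fmt"
)

// worker is a hypothetical component with ready/done signalling.
type worker struct {
	ready chan struct{}
	done  chan struct{}
}

func newWorker() *worker {
	return &worker{ready: make(chan struct{}), done: make(chan struct{})}
}

// Start launches the worker; it runs until ctx is cancelled.
func (w *worker) Start(ctx context.Context) {
	go func() {
		defer close(w.done) // signal that shutdown has completed
		close(w.ready)      // signal that startup has completed
		<-ctx.Done()        // block until the context is cancelled
	}()
}

// Ready is closed once the worker has started.
func (w *worker) Ready() <-chan struct{} { return w.ready }

// Done is closed once the worker has stopped.
func (w *worker) Done() <-chan struct{} { return w.done }

// runAndStop starts a worker, cancels its context, and waits for shutdown.
func runAndStop() bool {
	ctx, cancel := context.WithCancel(context.Background())
	w := newWorker()
	w.Start(ctx)
	<-w.Ready()
	cancel() // the only stop signal: no explicit Stop method is needed
	<-w.Done()
	return true
}

func main() {
	fmt.Println(runAndStop()) // true
}
```

With this shape, a test harness only needs to hold the cancel function of the context it used to start the nodes, which is the role `WithStopFunc` appears to play for `Stopper`.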