Skip to content

Commit

Permalink
peer: simplify MsgRouter with new fn tools
Browse files Browse the repository at this point in the history
  • Loading branch information
ProofOfKeags committed Aug 14, 2024
1 parent 014fe89 commit 7597854
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 68 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
github.com/lightningnetwork/lightning-onion v1.2.1-0.20240712235311-98bd56499dfb
github.com/lightningnetwork/lnd/cert v1.2.2
github.com/lightningnetwork/lnd/clock v1.1.1
github.com/lightningnetwork/lnd/fn v1.2.0
github.com/lightningnetwork/lnd/fn v1.2.1
github.com/lightningnetwork/lnd/healthcheck v1.2.4
github.com/lightningnetwork/lnd/kvdb v1.4.8
github.com/lightningnetwork/lnd/queue v1.1.1
Expand Down
98 changes: 31 additions & 67 deletions peer/msg_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,50 +64,28 @@ type MsgRouter interface {
Stop()
}

// queryMsg is a message sent into the main event loop to query or modify the
// internal state.
type queryMsg[Q any, R any] struct {
query Q

respChan chan fn.Either[R, error]
}

// sendQuery sends a query to the main event loop, and returns the response.
func sendQuery[Q any, R any](sendChan chan queryMsg[Q, R], queryArg Q,
quit chan struct{}) fn.Either[R, error] {
func sendQuery[Q any, R any](sendChan chan fn.Req[Q, R], queryArg Q,
quit chan struct{}) fn.Result[R] {

query := queryMsg[Q, R]{
query: queryArg,
respChan: make(chan fn.Either[R, error], 1),
}
query, respChan := fn.NewReq[Q, R](queryArg)

if !fn.SendOrQuit(sendChan, query, quit) {
return fn.NewRight[R](fmt.Errorf("router shutting down"))
}

resp, err := fn.RecvResp(query.respChan, nil, quit)
if err != nil {
return fn.NewRight[R](err)
return fn.Errf[R]("router shutting down")
}

return resp
return fn.NewResult(fn.RecvResp(respChan, nil, quit))
}

// sendQueryErr is a helper function based on sendQuery that can be used when
// the query only needs an error response.
func sendQueryErr[Q any](sendChan chan queryMsg[Q, error], queryArg Q,
func sendQueryErr[Q any](sendChan chan fn.Req[Q, error], queryArg Q,
quitChan chan struct{}) error {

var err error
resp := sendQuery(sendChan, queryArg, quitChan)
resp.WhenRight(func(e error) {
err = e
})
resp.WhenLeft(func(e error) {
err = e
})

return err
return fn.ElimEither(
fn.Iden, fn.Iden,
sendQuery(sendChan, queryArg, quitChan).Either,
)
}

// EndpointsMap is a map of all registered endpoints.
Expand All @@ -121,19 +99,19 @@ type MultiMsgRouter struct {
stopOnce sync.Once

// registerChan is the channel that all new endpoints will be sent to.
registerChan chan queryMsg[MsgEndpoint, error]
registerChan chan fn.Req[MsgEndpoint, error]

// unregisterChan is the channel that all endpoints that are to be
// removed are sent to.
unregisterChan chan queryMsg[EndPointName, error]
unregisterChan chan fn.Req[EndPointName, error]

// msgChan is the channel that all messages will be sent to for
// processing.
msgChan chan queryMsg[lnwire.Message, error]
msgChan chan fn.Req[lnwire.Message, error]

// endpointsQueries is a channel that all queries to the endpoints map
// will be sent to.
endpointQueries chan queryMsg[MsgEndpoint, EndpointsMap]
endpointQueries chan fn.Req[MsgEndpoint, EndpointsMap]

wg sync.WaitGroup
quit chan struct{}
Expand All @@ -142,10 +120,10 @@ type MultiMsgRouter struct {
// NewMultiMsgRouter creates a new instance of a peer message router.
func NewMultiMsgRouter() *MultiMsgRouter {
return &MultiMsgRouter{
registerChan: make(chan queryMsg[MsgEndpoint, error]),
unregisterChan: make(chan queryMsg[EndPointName, error]),
msgChan: make(chan queryMsg[lnwire.Message, error]),
endpointQueries: make(chan queryMsg[MsgEndpoint, EndpointsMap]),
registerChan: make(chan fn.Req[MsgEndpoint, error]),
unregisterChan: make(chan fn.Req[EndPointName, error]),
msgChan: make(chan fn.Req[lnwire.Message, error]),
endpointQueries: make(chan fn.Req[MsgEndpoint, EndpointsMap]),
quit: make(chan struct{}),
}
}
Expand Down Expand Up @@ -189,14 +167,10 @@ func (p *MultiMsgRouter) RouteMsg(msg lnwire.Message) error {

// Endpoints returns a list of all registered endpoints.
func (p *MultiMsgRouter) Endpoints() EndpointsMap {
resp := sendQuery(p.endpointQueries, nil, p.quit)

var endpoints EndpointsMap
resp.WhenLeft(func(e EndpointsMap) {
endpoints = e
})

return endpoints
return fn.ElimEither(
fn.Iden, fn.Const[EndpointsMap, error](nil),
sendQuery(p.endpointQueries, nil, p.quit).Either,
)
}

// msgRouter is the main goroutine that handles all incoming messages.
Expand All @@ -211,7 +185,7 @@ func (p *MultiMsgRouter) msgRouter() {
// A new endpoint was just sent in, so we'll add it to our set
// of registered endpoints.
case newEndpointMsg := <-p.registerChan:
endpoint := newEndpointMsg.query
endpoint := newEndpointMsg.Request

peerLog.Infof("MsgRouter: registering new "+
"MsgEndpoint(%s)", endpoint.Name())
Expand All @@ -223,36 +197,29 @@ func (p *MultiMsgRouter) msgRouter() {
"duplicate endpoint: %v",
endpoint.Name())

newEndpointMsg.respChan <- fn.NewRight[error](
ErrDuplicateEndpoint,
)
newEndpointMsg.Resolve(ErrDuplicateEndpoint)

continue
}

endpoints[endpoint.Name()] = endpoint

// TODO(roasbeef): put in method?
newEndpointMsg.respChan <- fn.NewRight[error, error](
nil,
)
newEndpointMsg.Resolve(nil)

// A request to unregister an endpoint was just sent in, so
// we'll attempt to remove it.
case endpointName := <-p.unregisterChan:
delete(endpoints, endpointName.query)
delete(endpoints, endpointName.Request)

peerLog.Infof("MsgRouter: unregistering "+
"MsgEndpoint(%s)", endpointName.query)
"MsgEndpoint(%s)", endpointName.Request)

endpointName.respChan <- fn.NewRight[error, error](
nil,
)
endpointName.Resolve(nil)

// A new message was just sent in. We'll attempt to route it to
// all the endpoints that can handle it.
case msgQuery := <-p.msgChan:
msg := msgQuery.query
msg := msgQuery.Request

// Loop through all the endpoints and send the message
// to those that can handle it the message.
Expand All @@ -276,18 +243,15 @@ func (p *MultiMsgRouter) msgRouter() {
err = ErrUnableToRouteMsg
}

msgQuery.respChan <- fn.NewRight[error](err)
msgQuery.Resolve(err)

// A query for the endpoint state just came in, we'll send back
// a copy of our current state.
case endpointQuery := <-p.endpointQueries:
endpointsCopy := make(EndpointsMap, len(endpoints))
maps.Copy(endpointsCopy, endpoints)

//nolint:lll
endpointQuery.respChan <- fn.NewLeft[EndpointsMap, error](
endpointsCopy,
)
endpointQuery.Resolve(endpointsCopy)

case <-p.quit:
return
Expand Down

0 comments on commit 7597854

Please sign in to comment.