diff --git a/go.mod b/go.mod index f3d7d16557..f3521dfe3e 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( github.com/lightningnetwork/lightning-onion v1.2.1-0.20240712235311-98bd56499dfb github.com/lightningnetwork/lnd/cert v1.2.2 github.com/lightningnetwork/lnd/clock v1.1.1 - github.com/lightningnetwork/lnd/fn v1.2.0 + github.com/lightningnetwork/lnd/fn v1.2.1 github.com/lightningnetwork/lnd/healthcheck v1.2.4 github.com/lightningnetwork/lnd/kvdb v1.4.8 github.com/lightningnetwork/lnd/queue v1.1.1 diff --git a/peer/msg_router.go b/peer/msg_router.go index 95501215c6..4aa728478f 100644 --- a/peer/msg_router.go +++ b/peer/msg_router.go @@ -64,50 +64,28 @@ type MsgRouter interface { Stop() } -// queryMsg is a message sent into the main event loop to query or modify the -// internal state. -type queryMsg[Q any, R any] struct { - query Q - - respChan chan fn.Either[R, error] -} - // sendQuery sends a query to the main event loop, and returns the response. -func sendQuery[Q any, R any](sendChan chan queryMsg[Q, R], queryArg Q, - quit chan struct{}) fn.Either[R, error] { +func sendQuery[Q any, R any](sendChan chan fn.Req[Q, R], queryArg Q, + quit chan struct{}) fn.Result[R] { - query := queryMsg[Q, R]{ - query: queryArg, - respChan: make(chan fn.Either[R, error], 1), - } + query, respChan := fn.NewReq[Q, R](queryArg) if !fn.SendOrQuit(sendChan, query, quit) { - return fn.NewRight[R](fmt.Errorf("router shutting down")) - } - - resp, err := fn.RecvResp(query.respChan, nil, quit) - if err != nil { - return fn.NewRight[R](err) + return fn.Errf[R]("router shutting down") } - return resp + return fn.NewResult(fn.RecvResp(respChan, nil, quit)) } // sendQueryErr is a helper function based on sendQuery that can be used when // the query only needs an error response. -func sendQueryErr[Q any](sendChan chan queryMsg[Q, error], queryArg Q, +func sendQueryErr[Q any](sendChan chan fn.Req[Q, error], queryArg Q, quitChan chan struct{}) error { - var err error - resp := sendQuery(sendChan, queryArg, quitChan) - resp.WhenRight(func(e error) { - err = e - }) - resp.WhenLeft(func(e error) { - err = e - }) - - return err + return fn.ElimEither( + fn.Iden, fn.Iden, + sendQuery(sendChan, queryArg, quitChan).Either, + ) } // EndpointsMap is a map of all registered endpoints. @@ -121,19 +99,19 @@ type MultiMsgRouter struct { stopOnce sync.Once // registerChan is the channel that all new endpoints will be sent to. - registerChan chan queryMsg[MsgEndpoint, error] + registerChan chan fn.Req[MsgEndpoint, error] // unregisterChan is the channel that all endpoints that are to be // removed are sent to. - unregisterChan chan queryMsg[EndPointName, error] + unregisterChan chan fn.Req[EndPointName, error] // msgChan is the channel that all messages will be sent to for // processing. - msgChan chan queryMsg[lnwire.Message, error] + msgChan chan fn.Req[lnwire.Message, error] // endpointsQueries is a channel that all queries to the endpoints map // will be sent to. - endpointQueries chan queryMsg[MsgEndpoint, EndpointsMap] + endpointQueries chan fn.Req[MsgEndpoint, EndpointsMap] wg sync.WaitGroup quit chan struct{} @@ -142,10 +120,10 @@ type MultiMsgRouter struct { // NewMultiMsgRouter creates a new instance of a peer message router. func NewMultiMsgRouter() *MultiMsgRouter { return &MultiMsgRouter{ - registerChan: make(chan queryMsg[MsgEndpoint, error]), - unregisterChan: make(chan queryMsg[EndPointName, error]), - msgChan: make(chan queryMsg[lnwire.Message, error]), - endpointQueries: make(chan queryMsg[MsgEndpoint, EndpointsMap]), + registerChan: make(chan fn.Req[MsgEndpoint, error]), + unregisterChan: make(chan fn.Req[EndPointName, error]), + msgChan: make(chan fn.Req[lnwire.Message, error]), + endpointQueries: make(chan fn.Req[MsgEndpoint, EndpointsMap]), quit: make(chan struct{}), } } @@ -189,14 +167,10 @@ func (p *MultiMsgRouter) RouteMsg(msg lnwire.Message) error { // Endpoints returns a list of all registered endpoints. func (p *MultiMsgRouter) Endpoints() EndpointsMap { - resp := sendQuery(p.endpointQueries, nil, p.quit) - - var endpoints EndpointsMap - resp.WhenLeft(func(e EndpointsMap) { - endpoints = e - }) - - return endpoints + return fn.ElimEither( + fn.Iden, fn.Const[EndpointsMap, error](nil), + sendQuery(p.endpointQueries, nil, p.quit).Either, + ) } // msgRouter is the main goroutine that handles all incoming messages. @@ -211,7 +185,7 @@ func (p *MultiMsgRouter) msgRouter() { // A new endpoint was just sent in, so we'll add it to our set // of registered endpoints. case newEndpointMsg := <-p.registerChan: - endpoint := newEndpointMsg.query + endpoint := newEndpointMsg.Request peerLog.Infof("MsgRouter: registering new "+ "MsgEndpoint(%s)", endpoint.Name()) @@ -223,36 +197,29 @@ func (p *MultiMsgRouter) msgRouter() { "duplicate endpoint: %v", endpoint.Name()) - newEndpointMsg.respChan <- fn.NewRight[error]( - ErrDuplicateEndpoint, - ) + newEndpointMsg.Resolve(ErrDuplicateEndpoint) continue } endpoints[endpoint.Name()] = endpoint - // TODO(roasbeef): put in method? - newEndpointMsg.respChan <- fn.NewRight[error, error]( - nil, - ) + newEndpointMsg.Resolve(nil) // A request to unregister an endpoint was just sent in, so // we'll attempt to remove it. case endpointName := <-p.unregisterChan: - delete(endpoints, endpointName.query) + delete(endpoints, endpointName.Request) peerLog.Infof("MsgRouter: unregistering "+ - "MsgEndpoint(%s)", endpointName.query) + "MsgEndpoint(%s)", endpointName.Request) - endpointName.respChan <- fn.NewRight[error, error]( - nil, - ) + endpointName.Resolve(nil) // A new message was just sent in. We'll attempt to route it to // all the endpoints that can handle it. case msgQuery := <-p.msgChan: - msg := msgQuery.query + msg := msgQuery.Request // Loop through all the endpoints and send the message // to those that can handle it the message. @@ -276,7 +243,7 @@ func (p *MultiMsgRouter) msgRouter() { err = ErrUnableToRouteMsg } - msgQuery.respChan <- fn.NewRight[error](err) + msgQuery.Resolve(err) // A query for the endpoint state just came in, we'll send back // a copy of our current state. @@ -284,10 +251,7 @@ func (p *MultiMsgRouter) msgRouter() { endpointsCopy := make(EndpointsMap, len(endpoints)) maps.Copy(endpointsCopy, endpoints) - //nolint:lll - endpointQuery.respChan <- fn.NewLeft[EndpointsMap, error]( - endpointsCopy, - ) + endpointQuery.Resolve(endpointsCopy) case <-p.quit: return