From 1fddaec6b9058597b5a3d851cf0f6e259edc6261 Mon Sep 17 00:00:00 2001 From: EclesioMeloJunior Date: Mon, 9 May 2022 10:35:48 -0400 Subject: [PATCH 01/15] fix: websocket message checks from untrusted data --- dot/rpc/subscription/websocket.go | 43 ++++++++++++++------------ dot/rpc/subscription/websocket_test.go | 38 +++++++++++++++++++++++ 2 files changed, 62 insertions(+), 19 deletions(-) diff --git a/dot/rpc/subscription/websocket.go b/dot/rpc/subscription/websocket.go index cb53b91438..e654ebfa2d 100644 --- a/dot/rpc/subscription/websocket.go +++ b/dot/rpc/subscription/websocket.go @@ -22,6 +22,12 @@ import ( "github.com/gorilla/websocket" ) +type websocketMessage struct { + ID float64 `json:"id"` + Method string `json:"method"` + Params any +} + type httpclient interface { Do(*http.Request) (*http.Response, error) } @@ -46,7 +52,7 @@ type WSConn struct { } // readWebsocketMessage will read and parse the message data to a string->interface{} data -func (c *WSConn) readWebsocketMessage() ([]byte, map[string]interface{}, error) { +func (c *WSConn) readWebsocketMessage() ([]byte, *websocketMessage, error) { _, mbytes, err := c.Wsconn.ReadMessage() if err != nil { logger.Debugf("websocket failed to read message: %s", err) @@ -55,8 +61,7 @@ func (c *WSConn) readWebsocketMessage() ([]byte, map[string]interface{}, error) logger.Tracef("websocket message received: %s", string(mbytes)) - // determine if request is for subscribe method type - var msg map[string]interface{} + msg := new(websocketMessage) err = json.Unmarshal(mbytes, &msg) if err != nil { @@ -80,23 +85,24 @@ func (c *WSConn) HandleComm() { continue } - params := msg["params"] - reqid := msg["id"].(float64) - method := msg["method"].(string) + if msg.Method == "" { + c.safeSendError(0, big.NewInt(InvalidRequestCode), InvalidRequestMessage) + continue + } - logger.Debugf("ws method %s called with params %v", method, params) + logger.Debugf("ws method %s called with params %v", msg.Method, msg.Params) - if !strings.Contains(method, "_unsubscribe") && !strings.Contains(method, "_unwatch") { - setupListener := c.getSetupListener(method) + if !strings.Contains(msg.Method, "_unsubscribe") && !strings.Contains(msg.Method, "_unwatch") { + setupListener := c.getSetupListener(msg.Method) if setupListener == nil { c.executeRPCCall(mbytes) continue } - listener, err := setupListener(reqid, params) + listener, err := setupListener(msg.ID, msg.Params) if err != nil { - logger.Warnf("failed to create listener (method=%s): %s", method, err) + logger.Warnf("failed to create listener (method=%s): %s", msg.Method, err) continue } @@ -104,29 +110,28 @@ func (c *WSConn) HandleComm() { continue } - listener, err := c.getUnsubListener(params) - + listener, err := c.getUnsubListener(msg.Params) if err != nil { - logger.Warnf("failed to get unsubscriber (method=%s): %s", method, err) + logger.Warnf("failed to get unsubscriber (method=%s): %s", msg.Method, err) if errors.Is(err, errUknownParamSubscribeID) || errors.Is(err, errCannotFindUnsubsriber) { - c.safeSendError(reqid, big.NewInt(InvalidRequestCode), InvalidRequestMessage) + c.safeSendError(msg.ID, big.NewInt(InvalidRequestCode), InvalidRequestMessage) continue } if errors.Is(err, errCannotParseID) || errors.Is(err, errCannotFindListener) { - c.safeSend(newBooleanResponseJSON(false, reqid)) + c.safeSend(newBooleanResponseJSON(false, msg.ID)) continue } } err = listener.Stop() if err != nil { - logger.Warnf("failed to stop listener goroutine (method=%s): %s", method, err) - c.safeSend(newBooleanResponseJSON(false, reqid)) + logger.Warnf("failed to stop listener goroutine (method=%s): %s", msg.Method, err) + c.safeSend(newBooleanResponseJSON(false, msg.ID)) } - c.safeSend(newBooleanResponseJSON(true, reqid)) + c.safeSend(newBooleanResponseJSON(true, msg.ID)) continue } } diff --git a/dot/rpc/subscription/websocket_test.go b/dot/rpc/subscription/websocket_test.go index a085dc6f2e..d959014fb5 100644 --- a/dot/rpc/subscription/websocket_test.go +++ b/dot/rpc/subscription/websocket_test.go @@ -21,6 +21,44 @@ import ( "github.com/stretchr/testify/require" ) +func TestWSConn_EmptyMethod(t *testing.T) { + wsconn, c, cancel := setupWSConn(t) + wsconn.Subscriptions = make(map[uint32]Listener) + defer cancel() + + go wsconn.HandleComm() + + tests := []struct { + sentMessage []byte + expected []byte + }{ + { + sentMessage: []byte(`{ + "jsonrpc": "2.0", + "method": "", + "id": 0, + "params": [] + }`), + expected: []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":0}` + "\n"), + }, + { + sentMessage: []byte(`{ + "jsonrpc": "2.0", + "params": [] + }`), + expected: []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":0}` + "\n"), + }, + } + + for _, tt := range tests { + c.WriteMessage(websocket.TextMessage, tt.sentMessage) + + _, msg, err := c.ReadMessage() + require.NoError(t, err) + require.Equal(t, tt.expected, msg) + } +} + func TestWSConn_HandleComm(t *testing.T) { wsconn, c, cancel := setupWSConn(t) wsconn.Subscriptions = make(map[uint32]Listener) From c748d1a413240c73714a686afa99ec7974d0d3de Mon Sep 17 00:00:00 2001 From: EclesioMeloJunior Date: Mon, 9 May 2022 11:03:42 -0400 Subject: [PATCH 02/15] chore: ensure no panic while ID is not number --- dot/rpc/subscription/websocket_test.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/dot/rpc/subscription/websocket_test.go b/dot/rpc/subscription/websocket_test.go index d959014fb5..6f58862fba 100644 --- a/dot/rpc/subscription/websocket_test.go +++ b/dot/rpc/subscription/websocket_test.go @@ -21,7 +21,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestWSConn_EmptyMethod(t *testing.T) { +func TestWSConn_CheckWebsocketInvalidData(t *testing.T) { wsconn, c, cancel := setupWSConn(t) wsconn.Subscriptions = make(map[uint32]Listener) defer cancel() @@ -48,6 +48,15 @@ func TestWSConn_EmptyMethod(t *testing.T) { }`), expected: []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":0}` + "\n"), }, + { + sentMessage: []byte(`{ + "jsonrpc": "2.0", + "id": "abcdef" + "method": "some_method_name" + "params": [] + }`), + expected: []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":0}` + "\n"), + }, } for _, tt := range tests { From 6815b4c2a44f57078ed6ac75879c01c6d5d7ec8d Mon Sep 17 00:00:00 2001 From: EclesioMeloJunior Date: Fri, 13 May 2022 14:30:30 -0400 Subject: [PATCH 03/15] chore: addressing comments --- dot/rpc/http.go | 2 +- dot/rpc/subscription/websocket.go | 4 ++-- dot/rpc/subscription/websocket_test.go | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/dot/rpc/http.go b/dot/rpc/http.go index fb50e79503..9427c8ef67 100644 --- a/dot/rpc/http.go +++ b/dot/rpc/http.go @@ -233,7 +233,7 @@ func (h *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { wsc := NewWSConn(ws, h.serverConfig) h.wsConns = append(h.wsConns, wsc) - go wsc.HandleComm() + go wsc.HandleConn() } // NewWSConn to create new WebSocket Connection struct diff --git a/dot/rpc/subscription/websocket.go b/dot/rpc/subscription/websocket.go index e654ebfa2d..c0eceabb8c 100644 --- a/dot/rpc/subscription/websocket.go +++ b/dot/rpc/subscription/websocket.go @@ -72,8 +72,8 @@ func (c *WSConn) readWebsocketMessage() ([]byte, *websocketMessage, error) { return mbytes, msg, nil } -//HandleComm handles messages received on websocket connections -func (c *WSConn) HandleComm() { +//HandleConn handles messages received on websocket connections +func (c *WSConn) HandleConn() { for { mbytes, msg, err := c.readWebsocketMessage() if errors.Is(err, errCannotReadFromWebsocket) { diff --git a/dot/rpc/subscription/websocket_test.go b/dot/rpc/subscription/websocket_test.go index 6f58862fba..ec23f6fa75 100644 --- a/dot/rpc/subscription/websocket_test.go +++ b/dot/rpc/subscription/websocket_test.go @@ -26,7 +26,7 @@ func TestWSConn_CheckWebsocketInvalidData(t *testing.T) { wsconn.Subscriptions = make(map[uint32]Listener) defer cancel() - go wsconn.HandleComm() + go wsconn.HandleConn() tests := []struct { sentMessage []byte @@ -68,12 +68,12 @@ func TestWSConn_CheckWebsocketInvalidData(t *testing.T) { } } -func TestWSConn_HandleComm(t *testing.T) { +func TestWSConn_HandleConn(t *testing.T) { wsconn, c, cancel := setupWSConn(t) wsconn.Subscriptions = make(map[uint32]Listener) defer cancel() - go wsconn.HandleComm() + go wsconn.HandleConn() time.Sleep(time.Second * 2) // test storageChangeListener @@ -341,7 +341,7 @@ func TestSubscribeAllHeads(t *testing.T) { wsconn.Subscriptions = make(map[uint32]Listener) defer cancel() - go wsconn.HandleComm() + go wsconn.HandleConn() time.Sleep(time.Second * 2) _, err := wsconn.initAllBlocksListerner(1, nil) From affaecf09c52187dbca9e081d5ef96ea1b3fd58d Mon Sep 17 00:00:00 2001 From: EclesioMeloJunior Date: Fri, 13 May 2022 14:33:45 -0400 Subject: [PATCH 04/15] chore: returning a value instead of a pointer --- dot/rpc/subscription/websocket.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/dot/rpc/subscription/websocket.go b/dot/rpc/subscription/websocket.go index c0eceabb8c..36de853f18 100644 --- a/dot/rpc/subscription/websocket.go +++ b/dot/rpc/subscription/websocket.go @@ -52,24 +52,23 @@ type WSConn struct { } // readWebsocketMessage will read and parse the message data to a string->interface{} data -func (c *WSConn) readWebsocketMessage() ([]byte, *websocketMessage, error) { - _, mbytes, err := c.Wsconn.ReadMessage() +func (c *WSConn) readWebsocketMessage() (bytes []byte, msg websocketMessage, err error) { + _, bytes, err = c.Wsconn.ReadMessage() if err != nil { logger.Debugf("websocket failed to read message: %s", err) - return nil, nil, errCannotReadFromWebsocket + return bytes, msg, errCannotReadFromWebsocket } - logger.Tracef("websocket message received: %s", string(mbytes)) + logger.Tracef("websocket message received: %s", string(bytes)) - msg := new(websocketMessage) - err = json.Unmarshal(mbytes, &msg) + err = json.Unmarshal(bytes, &msg) if err != nil { logger.Debugf("websocket failed to unmarshal request message: %s", err) - return nil, nil, errCannotUnmarshalMessage + return bytes, msg, errCannotUnmarshalMessage } - return mbytes, msg, nil + return bytes, msg, nil } //HandleConn handles messages received on websocket connections From 1689e515732a24a24f1dec6829a7f844af164bab Mon Sep 17 00:00:00 2001 From: EclesioMeloJunior Date: Tue, 17 May 2022 16:24:07 -0400 Subject: [PATCH 05/15] chore: supernit --- dot/rpc/subscription/websocket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/rpc/subscription/websocket.go b/dot/rpc/subscription/websocket.go index 36de853f18..502e78ebc9 100644 --- a/dot/rpc/subscription/websocket.go +++ b/dot/rpc/subscription/websocket.go @@ -71,7 +71,7 @@ func (c *WSConn) readWebsocketMessage() (bytes []byte, msg websocketMessage, err return bytes, msg, nil } -//HandleConn handles messages received on websocket connections +// HandleConn handles messages received on websocket connections func (c *WSConn) HandleConn() { for { mbytes, msg, err := c.readWebsocketMessage() From 6277304f6d39cfe9b4a1d21a63b5e60b590a433a Mon Sep 17 00:00:00 2001 From: EclesioMeloJunior Date: Mon, 23 May 2022 15:51:24 -0400 Subject: [PATCH 06/15] chore: unmarshal the struct in the caller --- dot/rpc/subscription/websocket.go | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/dot/rpc/subscription/websocket.go b/dot/rpc/subscription/websocket.go index 502e78ebc9..5efa193f33 100644 --- a/dot/rpc/subscription/websocket.go +++ b/dot/rpc/subscription/websocket.go @@ -52,35 +52,32 @@ type WSConn struct { } // readWebsocketMessage will read and parse the message data to a string->interface{} data -func (c *WSConn) readWebsocketMessage() (bytes []byte, msg websocketMessage, err error) { +func (c *WSConn) readWebsocketMessage() (bytes []byte, err error) { _, bytes, err = c.Wsconn.ReadMessage() if err != nil { logger.Debugf("websocket failed to read message: %s", err) - return bytes, msg, errCannotReadFromWebsocket + return bytes, errCannotReadFromWebsocket } logger.Tracef("websocket message received: %s", string(bytes)) - - err = json.Unmarshal(bytes, &msg) - - if err != nil { - logger.Debugf("websocket failed to unmarshal request message: %s", err) - return bytes, msg, errCannotUnmarshalMessage - } - - return bytes, msg, nil + return bytes, nil } // HandleConn handles messages received on websocket connections func (c *WSConn) HandleConn() { for { - mbytes, msg, err := c.readWebsocketMessage() + mbytes, err := c.readWebsocketMessage() if errors.Is(err, errCannotReadFromWebsocket) { return + } else if errors.Is(err, errCannotUnmarshalMessage) { + c.safeSendError(0, big.NewInt(InvalidRequestCode), InvalidRequestMessage) + continue } - if errors.Is(err, errCannotUnmarshalMessage) { - c.safeSendError(0, big.NewInt(InvalidRequestCode), InvalidRequestMessage) + msg := new(websocketMessage) + err = json.Unmarshal(mbytes, &msg) + if err != nil { + logger.Debugf("websocket failed to unmarshal request message: %s", err) continue } From 076a9fe101e21e8d7bfbb47d194b19afdf8f6454 Mon Sep 17 00:00:00 2001 From: EclesioMeloJunior Date: Mon, 23 May 2022 17:15:49 -0400 Subject: [PATCH 07/15] chore: fix errors wrappers --- dot/rpc/subscription/websocket.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dot/rpc/subscription/websocket.go b/dot/rpc/subscription/websocket.go index 5efa193f33..0d49dd0e90 100644 --- a/dot/rpc/subscription/websocket.go +++ b/dot/rpc/subscription/websocket.go @@ -56,7 +56,7 @@ func (c *WSConn) readWebsocketMessage() (bytes []byte, err error) { _, bytes, err = c.Wsconn.ReadMessage() if err != nil { logger.Debugf("websocket failed to read message: %s", err) - return bytes, errCannotReadFromWebsocket + return nil, errCannotReadFromWebsocket } logger.Tracef("websocket message received: %s", string(bytes)) @@ -77,7 +77,7 @@ func (c *WSConn) HandleConn() { msg := new(websocketMessage) err = json.Unmarshal(mbytes, &msg) if err != nil { - logger.Debugf("websocket failed to unmarshal request message: %s", err) + logger.Debugf("failed to unmarshal websocket request message: %s", err) continue } From cf9b5d7f7d0150d89319ab23ecdfca3673c672bb Mon Sep 17 00:00:00 2001 From: EclesioMeloJunior Date: Tue, 24 May 2022 11:56:24 -0400 Subject: [PATCH 08/15] chore: fix writing the error on the websocket conn --- dot/rpc/subscription/websocket.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dot/rpc/subscription/websocket.go b/dot/rpc/subscription/websocket.go index 0d49dd0e90..04d0f1c532 100644 --- a/dot/rpc/subscription/websocket.go +++ b/dot/rpc/subscription/websocket.go @@ -69,15 +69,14 @@ func (c *WSConn) HandleConn() { mbytes, err := c.readWebsocketMessage() if errors.Is(err, errCannotReadFromWebsocket) { return - } else if errors.Is(err, errCannotUnmarshalMessage) { + } else if err != nil { c.safeSendError(0, big.NewInt(InvalidRequestCode), InvalidRequestMessage) - continue } msg := new(websocketMessage) err = json.Unmarshal(mbytes, &msg) if err != nil { - logger.Debugf("failed to unmarshal websocket request message: %s", err) + c.safeSendError(0, big.NewInt(InvalidRequestCode), InvalidRequestMessage) continue } From d22ce0e9cbc6c5b75c01481e67f8074c9e13f686 Mon Sep 17 00:00:00 2001 From: EclesioMeloJunior Date: Tue, 24 May 2022 12:10:44 -0400 Subject: [PATCH 09/15] chore: remove unneeded error var --- dot/rpc/subscription/websocket.go | 1 - 1 file changed, 1 deletion(-) diff --git a/dot/rpc/subscription/websocket.go b/dot/rpc/subscription/websocket.go index 04d0f1c532..cf74b9d56f 100644 --- a/dot/rpc/subscription/websocket.go +++ b/dot/rpc/subscription/websocket.go @@ -33,7 +33,6 @@ type httpclient interface { } var errCannotReadFromWebsocket = errors.New("cannot read message from websocket") -var errCannotUnmarshalMessage = errors.New("cannot unmarshal webasocket message data") var logger = log.NewFromGlobal(log.AddContext("pkg", "rpc/subscription")) // WSConn struct to hold WebSocket Connection references From 9c5f01b19c9b3115abe5b1f14fbe1e282e2842f9 Mon Sep 17 00:00:00 2001 From: EclesioMeloJunior Date: Wed, 25 May 2022 09:32:17 -0400 Subject: [PATCH 10/15] chore: remove unneeded condition branch --- dot/rpc/subscription/websocket.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dot/rpc/subscription/websocket.go b/dot/rpc/subscription/websocket.go index cf74b9d56f..507458d75c 100644 --- a/dot/rpc/subscription/websocket.go +++ b/dot/rpc/subscription/websocket.go @@ -66,10 +66,8 @@ func (c *WSConn) readWebsocketMessage() (bytes []byte, err error) { func (c *WSConn) HandleConn() { for { mbytes, err := c.readWebsocketMessage() - if errors.Is(err, errCannotReadFromWebsocket) { + if err != nil { return - } else if err != nil { - c.safeSendError(0, big.NewInt(InvalidRequestCode), InvalidRequestMessage) } msg := new(websocketMessage) From 6b9bcc3b4090e05cf7e69ff41753eca6173596f0 Mon Sep 17 00:00:00 2001 From: EclesioMeloJunior Date: Thu, 26 May 2022 11:39:49 -0400 Subject: [PATCH 11/15] smooth the diffs --- dot/rpc/subscription/websocket_test.go | 94 +++++++++++++------------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/dot/rpc/subscription/websocket_test.go b/dot/rpc/subscription/websocket_test.go index ec23f6fa75..5ad9c6db4b 100644 --- a/dot/rpc/subscription/websocket_test.go +++ b/dot/rpc/subscription/websocket_test.go @@ -21,53 +21,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestWSConn_CheckWebsocketInvalidData(t *testing.T) { - wsconn, c, cancel := setupWSConn(t) - wsconn.Subscriptions = make(map[uint32]Listener) - defer cancel() - - go wsconn.HandleConn() - - tests := []struct { - sentMessage []byte - expected []byte - }{ - { - sentMessage: []byte(`{ - "jsonrpc": "2.0", - "method": "", - "id": 0, - "params": [] - }`), - expected: []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":0}` + "\n"), - }, - { - sentMessage: []byte(`{ - "jsonrpc": "2.0", - "params": [] - }`), - expected: []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":0}` + "\n"), - }, - { - sentMessage: []byte(`{ - "jsonrpc": "2.0", - "id": "abcdef" - "method": "some_method_name" - "params": [] - }`), - expected: []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":0}` + "\n"), - }, - } - - for _, tt := range tests { - c.WriteMessage(websocket.TextMessage, tt.sentMessage) - - _, msg, err := c.ReadMessage() - require.NoError(t, err) - require.Equal(t, tt.expected, msg) - } -} - func TestWSConn_HandleConn(t *testing.T) { wsconn, c, cancel := setupWSConn(t) wsconn.Subscriptions = make(map[uint32]Listener) @@ -419,3 +372,50 @@ func TestSubscribeAllHeads(t *testing.T) { require.NoError(t, l.Stop()) mockBlockAPI.On("FreeImportedBlockNotifierChannel", mock.AnythingOfType("chan *types.Block")) } + +func TestWSConn_CheckWebsocketInvalidData(t *testing.T) { + wsconn, c, cancel := setupWSConn(t) + wsconn.Subscriptions = make(map[uint32]Listener) + defer cancel() + + go wsconn.HandleConn() + + tests := []struct { + sentMessage []byte + expected []byte + }{ + { + sentMessage: []byte(`{ + "jsonrpc": "2.0", + "method": "", + "id": 0, + "params": [] + }`), + expected: []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":0}` + "\n"), + }, + { + sentMessage: []byte(`{ + "jsonrpc": "2.0", + "params": [] + }`), + expected: []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":0}` + "\n"), + }, + { + sentMessage: []byte(`{ + "jsonrpc": "2.0", + "id": "abcdef" + "method": "some_method_name" + "params": [] + }`), + expected: []byte(`{"jsonrpc":"2.0","error":{"code":-32600,"message":"Invalid request"},"id":0}` + "\n"), + }, + } + + for _, tt := range tests { + c.WriteMessage(websocket.TextMessage, tt.sentMessage) + + _, msg, err := c.ReadMessage() + require.NoError(t, err) + require.Equal(t, tt.expected, msg) + } +} From 4275e5b0c3e09fba4447886fb4643690e3098a6e Mon Sep 17 00:00:00 2001 From: EclesioMeloJunior Date: Thu, 26 May 2022 15:03:41 -0400 Subject: [PATCH 12/15] chore: unmarshaling the message inside `readWebsocketMessage` --- dot/rpc/subscription/websocket.go | 59 +++++++++++++++++-------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/dot/rpc/subscription/websocket.go b/dot/rpc/subscription/websocket.go index 507458d75c..5c1de8fd17 100644 --- a/dot/rpc/subscription/websocket.go +++ b/dot/rpc/subscription/websocket.go @@ -51,50 +51,55 @@ type WSConn struct { } // readWebsocketMessage will read and parse the message data to a string->interface{} data -func (c *WSConn) readWebsocketMessage() (bytes []byte, err error) { - _, bytes, err = c.Wsconn.ReadMessage() +func (c *WSConn) readWebsocketMessage() (rawBytes []byte, wsMessage *websocketMessage, err error) { + _, rawBytes, err = c.Wsconn.ReadMessage() if err != nil { - logger.Debugf("websocket failed to read message: %s", err) - return nil, errCannotReadFromWebsocket + return nil, nil, fmt.Errorf("%w: %s", errCannotReadFromWebsocket, err.Error()) } - logger.Tracef("websocket message received: %s", string(bytes)) - return bytes, nil + msg := new(websocketMessage) + err = json.Unmarshal(rawBytes, &msg) + if err != nil { + return nil, nil, err + } + + return rawBytes, msg, nil } // HandleConn handles messages received on websocket connections func (c *WSConn) HandleConn() { + defer c.Wsconn.Close() + for { - mbytes, err := c.readWebsocketMessage() + rawBytes, wsMessage, err := c.readWebsocketMessage() if err != nil { - return - } + logger.Debugf("websocket failed to read message: %s", err) + if errors.Is(err, errCannotReadFromWebsocket) { + return + } - msg := new(websocketMessage) - err = json.Unmarshal(mbytes, &msg) - if err != nil { c.safeSendError(0, big.NewInt(InvalidRequestCode), InvalidRequestMessage) continue } - if msg.Method == "" { + logger.Tracef("websocket message received: %s", string(rawBytes)) + if wsMessage.Method == "" { c.safeSendError(0, big.NewInt(InvalidRequestCode), InvalidRequestMessage) continue } - logger.Debugf("ws method %s called with params %v", msg.Method, msg.Params) - - if !strings.Contains(msg.Method, "_unsubscribe") && !strings.Contains(msg.Method, "_unwatch") { - setupListener := c.getSetupListener(msg.Method) + logger.Debugf("ws method %s called with params %v", wsMessage.Method, wsMessage.Params) + if !strings.Contains(wsMessage.Method, "_unsubscribe") && !strings.Contains(wsMessage.Method, "_unwatch") { + setupListener := c.getSetupListener(wsMessage.Method) if setupListener == nil { - c.executeRPCCall(mbytes) + c.executeRPCCall(rawBytes) continue } - listener, err := setupListener(msg.ID, msg.Params) + listener, err := setupListener(wsMessage.ID, wsMessage.Params) if err != nil { - logger.Warnf("failed to create listener (method=%s): %s", msg.Method, err) + logger.Warnf("failed to create listener (method=%s): %s", wsMessage.Method, err) continue } @@ -102,28 +107,28 @@ func (c *WSConn) HandleConn() { continue } - listener, err := c.getUnsubListener(msg.Params) + listener, err := c.getUnsubListener(wsMessage.Params) if err != nil { - logger.Warnf("failed to get unsubscriber (method=%s): %s", msg.Method, err) + logger.Warnf("failed to get unsubscriber (method=%s): %s", wsMessage.Method, err) if errors.Is(err, errUknownParamSubscribeID) || errors.Is(err, errCannotFindUnsubsriber) { - c.safeSendError(msg.ID, big.NewInt(InvalidRequestCode), InvalidRequestMessage) + c.safeSendError(wsMessage.ID, big.NewInt(InvalidRequestCode), InvalidRequestMessage) continue } if errors.Is(err, errCannotParseID) || errors.Is(err, errCannotFindListener) { - c.safeSend(newBooleanResponseJSON(false, msg.ID)) + c.safeSend(newBooleanResponseJSON(false, wsMessage.ID)) continue } } err = listener.Stop() if err != nil { - logger.Warnf("failed to stop listener goroutine (method=%s): %s", msg.Method, err) - c.safeSend(newBooleanResponseJSON(false, msg.ID)) + logger.Warnf("failed to stop listener goroutine (method=%s): %s", wsMessage.Method, err) + c.safeSend(newBooleanResponseJSON(false, wsMessage.ID)) } - c.safeSend(newBooleanResponseJSON(true, msg.ID)) + c.safeSend(newBooleanResponseJSON(true, wsMessage.ID)) continue } } From 226c7df2827313f87cf643241b488eb882b571df Mon Sep 17 00:00:00 2001 From: EclesioMeloJunior Date: Thu, 26 May 2022 15:08:37 -0400 Subject: [PATCH 13/15] chore: addressing comments --- dot/rpc/subscription/websocket.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dot/rpc/subscription/websocket.go b/dot/rpc/subscription/websocket.go index 5c1de8fd17..cefa30cdce 100644 --- a/dot/rpc/subscription/websocket.go +++ b/dot/rpc/subscription/websocket.go @@ -25,7 +25,7 @@ import ( type websocketMessage struct { ID float64 `json:"id"` Method string `json:"method"` - Params any + Params any `json:"params"` } type httpclient interface { @@ -57,13 +57,13 @@ func (c *WSConn) readWebsocketMessage() (rawBytes []byte, wsMessage *websocketMe return nil, nil, fmt.Errorf("%w: %s", errCannotReadFromWebsocket, err.Error()) } - msg := new(websocketMessage) - err = json.Unmarshal(rawBytes, &msg) + wsMessage = new(websocketMessage) + err = json.Unmarshal(rawBytes, wsMessage) if err != nil { return nil, nil, err } - return rawBytes, msg, nil + return rawBytes, wsMessage, nil } // HandleConn handles messages received on websocket connections From adad86e0d60860bd14706d2afe92c0a145424949 Mon Sep 17 00:00:00 2001 From: EclesioMeloJunior Date: Thu, 26 May 2022 15:09:56 -0400 Subject: [PATCH 14/15] chore: smooth diffs --- dot/rpc/subscription/websocket.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dot/rpc/subscription/websocket.go b/dot/rpc/subscription/websocket.go index cefa30cdce..688869e312 100644 --- a/dot/rpc/subscription/websocket.go +++ b/dot/rpc/subscription/websocket.go @@ -68,8 +68,6 @@ func (c *WSConn) readWebsocketMessage() (rawBytes []byte, wsMessage *websocketMe // HandleConn handles messages received on websocket connections func (c *WSConn) HandleConn() { - defer c.Wsconn.Close() - for { rawBytes, wsMessage, err := c.readWebsocketMessage() if err != nil { @@ -83,12 +81,14 @@ func (c *WSConn) HandleConn() { } logger.Tracef("websocket message received: %s", string(rawBytes)) + if wsMessage.Method == "" { c.safeSendError(0, big.NewInt(InvalidRequestCode), InvalidRequestMessage) continue } logger.Debugf("ws method %s called with params %v", wsMessage.Method, wsMessage.Params) + if !strings.Contains(wsMessage.Method, "_unsubscribe") && !strings.Contains(wsMessage.Method, "_unwatch") { setupListener := c.getSetupListener(wsMessage.Method) From e7024e46d9c70a701153fcbd4a8e497c71d1079d Mon Sep 17 00:00:00 2001 From: EclesioMeloJunior Date: Thu, 26 May 2022 15:49:04 -0400 Subject: [PATCH 15/15] chore: checking method inside `readWebsocketMessage` --- dot/rpc/subscription/websocket.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/dot/rpc/subscription/websocket.go b/dot/rpc/subscription/websocket.go index 688869e312..faf951db14 100644 --- a/dot/rpc/subscription/websocket.go +++ b/dot/rpc/subscription/websocket.go @@ -32,7 +32,11 @@ type httpclient interface { Do(*http.Request) (*http.Response, error) } -var errCannotReadFromWebsocket = errors.New("cannot read message from websocket") +var ( + errCannotReadFromWebsocket = errors.New("cannot read message from websocket") + errEmptyMethod = errors.New("empty method") +) + var logger = log.NewFromGlobal(log.AddContext("pkg", "rpc/subscription")) // WSConn struct to hold WebSocket Connection references @@ -63,6 +67,10 @@ func (c *WSConn) readWebsocketMessage() (rawBytes []byte, wsMessage *websocketMe return nil, nil, err } + if wsMessage.Method == "" { + return nil, nil, errEmptyMethod + } + return rawBytes, wsMessage, nil } @@ -81,12 +89,6 @@ func (c *WSConn) HandleConn() { } logger.Tracef("websocket message received: %s", string(rawBytes)) - - if wsMessage.Method == "" { - c.safeSendError(0, big.NewInt(InvalidRequestCode), InvalidRequestMessage) - continue - } - logger.Debugf("ws method %s called with params %v", wsMessage.Method, wsMessage.Params) if !strings.Contains(wsMessage.Method, "_unsubscribe") && !strings.Contains(wsMessage.Method, "_unwatch") {