Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix (dot/rpc, dot/state): state_subscribeStorage to only notify for value changes #1460

Merged
merged 42 commits into from
Apr 22, 2021
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
af287cb
move websocket messages and listeners into own files
edwardmack Mar 12, 2021
9d0003a
fix notifyStorageSubscriptions to only notify for changes
edwardmack Mar 12, 2021
678d946
Merge branch 'development' into ed/fixRPCsubscribeStorage
edwardmack Mar 15, 2021
bf8f887
Merge branch 'development' into ed/fixRPCsubscribeStorage
edwardmack Mar 17, 2021
090028f
address PR comments
edwardmack Mar 17, 2021
a9279a7
add to websocket tests
edwardmack Mar 18, 2021
0595ca8
repair append, cleanup filter declareation
edwardmack Mar 18, 2021
a479786
fix anti-pattern in log message
edwardmack Mar 18, 2021
a9ecb4a
create notifyStorageSubscription for individual sub notify
edwardmack Mar 18, 2021
299448b
add websocket listeners unit tests
edwardmack Mar 19, 2021
0d73536
Merge branch 'development' into ed/fixRPCsubscribeStorage
edwardmack Mar 19, 2021
ca61c54
cleanup merge conflicts
edwardmack Mar 19, 2021
3230d70
lint
edwardmack Mar 19, 2021
d076caf
add sleep timer
edwardmack Mar 19, 2021
3bd1a53
Merge branch 'development' into ed/fixRPCsubscribeStorage
edwardmack Mar 22, 2021
ea3b80a
refactor websocket files
edwardmack Mar 22, 2021
90706be
lint
edwardmack Mar 22, 2021
c76452b
a locks to fix data race
edwardmack Mar 23, 2021
db4c9ea
Merge branch 'development' into ed/fixRPCsubscribeStorage
edwardmack Mar 23, 2021
e720c6a
Merge branch 'development' into ed/fixRPCsubscribeStorage
edwardmack Apr 8, 2021
cfa1810
Merge branch 'development' into ed/fixRPCsubscribeStorage
edwardmack Apr 12, 2021
dcf5d2a
implement observer design pattern
edwardmack Apr 13, 2021
4f120fb
Merge branch 'development' into ed/fixRPCsubscribeStorage
edwardmack Apr 13, 2021
a44e40f
fix race conditions
edwardmack Apr 13, 2021
4a5dd26
Merge branch 'development' into ed/fixRPCsubscribeStorage
edwardmack Apr 13, 2021
b344e5a
add tests
edwardmack Apr 14, 2021
1d247f2
add tests
edwardmack Apr 14, 2021
9706d78
Merge branch 'development' into ed/fixRPCsubscribeStorage
edwardmack Apr 14, 2021
e57ccb5
add tests
edwardmack Apr 14, 2021
f013cf5
add tests
edwardmack Apr 14, 2021
bfcd6a8
add tests
edwardmack Apr 15, 2021
dcf1493
add tests
edwardmack Apr 15, 2021
e32e256
Merge branch 'development' into ed/fixRPCsubscribeStorage
edwardmack Apr 15, 2021
c30f34e
add troubleshooting stuff for testing transactions
edwardmack Apr 20, 2021
c3a6463
Merge branch 'development' into ed/fixRPCsubscribeStorage
edwardmack Apr 20, 2021
ab9fd78
save commit
edwardmack Apr 20, 2021
d9e228e
address PR comments
edwardmack Apr 20, 2021
390fadf
lint
edwardmack Apr 20, 2021
2ab0f55
remove unused printf and comments
edwardmack Apr 20, 2021
39d8748
fix test
edwardmack Apr 20, 2021
4de9a74
update tests
edwardmack Apr 20, 2021
a2068a3
add return from error
edwardmack Apr 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions dot/rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,8 @@ func (h *HTTPServer) Stop() error {
for _, conn := range h.wsConns {
for _, sub := range conn.Subscriptions {
switch v := sub.(type) {
case *subscription.StorageChangeListener:
h.serverConfig.StorageAPI.UnregisterStorageChangeChannel(v.ChanID)
close(v.Channel)
case *subscription.StorageObserver:
h.serverConfig.StorageAPI.UnregisterStorageObserver(v)
case *subscription.BlockListener:
h.serverConfig.BlockAPI.UnregisterImportedChannel(v.ChanID)
close(v.Channel)
Expand Down
4 changes: 2 additions & 2 deletions dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ type StorageAPI interface {
GetStorage(root *common.Hash, key []byte) ([]byte, error)
GetStorageByBlockHash(bhash common.Hash, key []byte) ([]byte, error)
Entries(root *common.Hash) (map[string][]byte, error)
RegisterStorageChangeChannel(sub state.StorageSubscription) (byte, error)
UnregisterStorageChangeChannel(id byte)
GetStateRootFromBlock(bhash *common.Hash) (*common.Hash, error)
GetKeysWithPrefix(root *common.Hash, prefix []byte) ([][]byte, error)
RegisterStorageObserver(observer state.Observer)
UnregisterStorageObserver(observer state.Observer)
}

// BlockAPI is the interface for the block state
Expand Down
227 changes: 39 additions & 188 deletions dot/rpc/subscription/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,121 +30,61 @@ type Listener interface {
Listen()
}

func (c *WSConn) startListener(lid int) {
go c.Subscriptions[lid].Listen()
// WSConnAPI interface defining methors a WSConn should have
type WSConnAPI interface {
safeSend(interface{})
}

func (c *WSConn) initStorageChangeListener(reqID float64, params interface{}) (int, error) {
scl := &StorageChangeListener{
Channel: make(chan *state.SubscriptionResult),
wsconn: c,
}
sub := &state.StorageSubscription{
Filter: make(map[string]bool),
Listener: scl.Channel,
}

pA := params.([]interface{})
for _, param := range pA {
switch p := param.(type) {
case []interface{}:
for _, pp := range param.([]interface{}) {
sub.Filter[pp.(string)] = true
}
case string:
sub.Filter[p] = true
default:
return 0, fmt.Errorf("unknow parameter type")
}
}
// StorageObserver struct to hold data for observer (Observer Design Pattern)
type StorageObserver struct {
id int
filter map[string][]byte
wsconn WSConnAPI
}

if c.StorageAPI == nil {
c.safeSendError(reqID, nil, "error StorageAPI not set")
return 0, fmt.Errorf("error StorageAPI not set")
// Update is called to notify observer of new value
func (s *StorageObserver) Update(change *state.SubscriptionResult) {
if change == nil {
return
}

chanID, err := c.StorageAPI.RegisterStorageChangeChannel(*sub)
if err != nil {
return 0, err
result := make(map[string]interface{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace this with struct.

type Result struct {
   changes []Change
   block string
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume it needs to be in this format because it's what the JSON-RPC format is expecting? correct me if I'm wrong @edwardmack

Copy link
Contributor

@arijitAD arijitAD Apr 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, we can use struct tags.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@noot yes, I did this for JSON-RPC formatting, and yes using a struct is more readable. I've added that with tags.

result["block"] = change.Hash.String()
changes := make([][]string, 0, len(change.Changes))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename Changes to Change or better, since this represents a single change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated (using new struct).

for _, v := range change.Changes {
kv := []string{common.BytesToHex(v.Key), common.BytesToHex(v.Value)}
changes = append(changes, kv)
}
scl.ChanID = chanID
result["changes"] = changes

c.qtyListeners++
scl.subID = c.qtyListeners
c.Subscriptions[scl.subID] = scl
c.StorageSubChannels[scl.subID] = chanID

initRes := newSubscriptionResponseJSON(scl.subID, reqID)
c.safeSend(initRes)

return scl.subID, nil
res := newSubcriptionBaseResponseJSON()
res.Method = "state_storage"
res.Params.Result = result
res.Params.SubscriptionID = s.GetID()
s.wsconn.safeSend(res)
}

// StorageChangeListener for listening to state change channels
type StorageChangeListener struct {
Channel chan *state.SubscriptionResult
wsconn *WSConn
ChanID byte
subID int
// GetID the id for the Observer
func (s *StorageObserver) GetID() int {
return s.id
}

// Listen implementation of Listen interface to listen for importedChan changes
func (l *StorageChangeListener) Listen() {
for change := range l.Channel {
if change == nil {
continue
}

result := make(map[string]interface{})
result["block"] = change.Hash.String()
changes := [][]string{}
for _, v := range change.Changes {
kv := []string{common.BytesToHex(v.Key), common.BytesToHex(v.Value)}
changes = append(changes, kv)
}
result["changes"] = changes

res := newSubcriptionBaseResponseJSON()
res.Method = "state_storage"
res.Params.Result = result
res.Params.SubscriptionID = l.subID
l.wsconn.safeSend(res)
}
// GetFilter returns the filter the Observer is using
func (s *StorageObserver) GetFilter() map[string][]byte {
return s.filter
}

// Listen to satisfy Listener interface (but is no longer used by StorageObserver)
func (s *StorageObserver) Listen() {}

// BlockListener to handle listening for blocks importedChan
type BlockListener struct {
Channel chan *types.Block
wsconn *WSConn
wsconn WSConnAPI
ChanID byte
subID int
}

func (c *WSConn) initBlockListener(reqID float64) (int, error) {
bl := &BlockListener{
Channel: make(chan *types.Block),
wsconn: c,
}

if c.BlockAPI == nil {
c.safeSendError(reqID, nil, "error BlockAPI not set")
return 0, fmt.Errorf("error BlockAPI not set")
}
chanID, err := c.BlockAPI.RegisterImportedChannel(bl.Channel)
if err != nil {
return 0, err
}
bl.ChanID = chanID
c.qtyListeners++
bl.subID = c.qtyListeners
c.Subscriptions[bl.subID] = bl
c.BlockSubChannels[bl.subID] = chanID
initRes := newSubscriptionResponseJSON(bl.subID, reqID)
c.safeSend(initRes)

return bl.subID, nil
}

// Listen implementation of Listen interface to listen for importedChan changes
func (l *BlockListener) Listen() {
for block := range l.Channel {
Expand All @@ -167,36 +107,11 @@ func (l *BlockListener) Listen() {
// BlockFinalizedListener to handle listening for finalized blocks
type BlockFinalizedListener struct {
channel chan *types.Header
wsconn *WSConn
wsconn WSConnAPI
chanID byte
subID int
}

func (c *WSConn) initBlockFinalizedListener(reqID float64) (int, error) {
bfl := &BlockFinalizedListener{
channel: make(chan *types.Header),
wsconn: c,
}

if c.BlockAPI == nil {
c.safeSendError(reqID, nil, "error BlockAPI not set")
return 0, fmt.Errorf("error BlockAPI not set")
}
chanID, err := c.BlockAPI.RegisterFinalizedChannel(bfl.channel)
if err != nil {
return 0, err
}
bfl.chanID = chanID
c.qtyListeners++
bfl.subID = c.qtyListeners
c.Subscriptions[bfl.subID] = bfl
c.BlockSubChannels[bfl.subID] = chanID
initRes := newSubscriptionResponseJSON(bfl.subID, reqID)
c.safeSend(initRes)

return bfl.subID, nil
}

// Listen implementation of Listen interface to listen for importedChan changes
func (l *BlockFinalizedListener) Listen() {
for header := range l.channel {
Expand All @@ -217,7 +132,7 @@ func (l *BlockFinalizedListener) Listen() {

// ExtrinsicSubmitListener to handle listening for extrinsic events
type ExtrinsicSubmitListener struct {
wsconn *WSConn
wsconn WSConnAPI
subID int
extrinsic types.Extrinsic

Expand All @@ -231,55 +146,6 @@ type ExtrinsicSubmitListener struct {
// AuthorExtrinsicUpdates method name
const AuthorExtrinsicUpdates = "author_extrinsicUpdate"

func (c *WSConn) initExtrinsicWatch(reqID float64, params interface{}) (int, error) {
pA := params.([]interface{})
extBytes, err := common.HexToBytes(pA[0].(string))
if err != nil {
return 0, err
}

// listen for built blocks
esl := &ExtrinsicSubmitListener{
importedChan: make(chan *types.Block),
wsconn: c,
extrinsic: types.Extrinsic(extBytes),
finalizedChan: make(chan *types.Header),
}

if c.BlockAPI == nil {
return 0, fmt.Errorf("error BlockAPI not set")
}
esl.importedChanID, err = c.BlockAPI.RegisterImportedChannel(esl.importedChan)
if err != nil {
return 0, err
}

esl.finalizedChanID, err = c.BlockAPI.RegisterFinalizedChannel(esl.finalizedChan)
if err != nil {
return 0, err
}

c.qtyListeners++
esl.subID = c.qtyListeners
c.Subscriptions[esl.subID] = esl
c.BlockSubChannels[esl.subID] = esl.importedChanID

err = c.CoreAPI.HandleSubmittedExtrinsic(extBytes)
if err != nil {
return 0, err
}
c.safeSend(newSubscriptionResponseJSON(esl.subID, reqID))

// TODO (ed) since HandleSubmittedExtrinsic has been called we assume the extrinsic is in the tx queue
// should we add a channel to tx queue so we're notified when it's in the queue
if c.CoreAPI.IsBlockProducer() {
c.safeSend(newSubscriptionResponse(AuthorExtrinsicUpdates, esl.subID, "ready"))
}

// todo (ed) determine which peer extrinsic has been broadcast to, and set status
return esl.subID, err
}

// Listen implementation of Listen interface to listen for importedChan changes
func (l *ExtrinsicSubmitListener) Listen() {
// listen for imported blocks with extrinsic
Expand Down Expand Up @@ -322,25 +188,10 @@ type RuntimeVersionListener struct {
subID int
}

func (c *WSConn) initRuntimeVersionListener(reqID float64) (int, error) {
rvl := &RuntimeVersionListener{
wsconn: c,
}
if c.CoreAPI == nil {
c.safeSendError(reqID, nil, "error CoreAPI not set")
return 0, fmt.Errorf("error CoreAPI not set")
}
c.qtyListeners++
rvl.subID = c.qtyListeners
c.Subscriptions[rvl.subID] = rvl
initRes := newSubscriptionResponseJSON(rvl.subID, reqID)
c.safeSend(initRes)

return rvl.subID, nil
}

// Listen implementation of Listen interface to listen for runtime version changes
func (l *RuntimeVersionListener) Listen() {
// This sends current runtime version once when subscription is created
// TODO (ed) add logic to send updates when runtime version changes
rtVersion, err := l.wsconn.CoreAPI.GetRuntimeVersion(nil)
if err != nil {
return
Expand Down
Loading