Skip to content
This repository has been archived by the owner on Apr 4, 2024. It is now read-only.

Problem: websocket client get duplicated messages #955

Merged
merged 8 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,16 @@ Ref: https://keepachangelog.com/en/1.0.0/

## Unreleased


### Improvements

* (log) [#948](https://github.com/tharsis/ethermint/pull/948) redirect go-ethereum's logs to cosmos-sdk logger.
* (rpc) [tharsis#953](https://github.com/tharsis/ethermint/pull/953) Add `eth_signTypedData` api support.
* (log) [#948](https://github.com/tharsis/ethermint/pull/948) redirect go-ethereum's logs to Cosmos SDK logger.

### Bug Fixes

* (rpc) [#955](https://github.com/tharsis/ethermint/pull/955) Fix websocket server push duplicated messages to subscriber.
* (rpc) [tharsis#953](https://github.com/tharsis/ethermint/pull/953) Add `eth_signTypedData` api support.
* (log) [#948](https://github.com/tharsis/ethermint/pull/948) redirect go-ethereum's logs to cosmos-sdk logger.

## [v0.10.0-beta1] - 2022-02-15

Expand Down
34 changes: 17 additions & 17 deletions rpc/ethereum/namespaces/eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,62 +90,62 @@ func (es *EventSystem) WithContext(ctx context.Context) {

// subscribe performs a new event subscription to a given Tendermint event.
// The subscription creates a unidirectional receive event channel to receive the ResultEvent.
func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, context.CancelFunc, error) {
func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, pubsub.UnsubscribeFunc, error) {
var (
err error
cancelFn context.CancelFunc
)

es.ctx, cancelFn = context.WithCancel(context.Background())
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

existingSubs := es.eventBus.Topics()
for _, topic := range existingSubs {
if topic == sub.event {
eventCh, err := es.eventBus.Subscribe(sub.event)
eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event)
if err != nil {
err := errors.Wrapf(err, "failed to subscribe to topic: %s", sub.event)
return nil, cancelFn, err
return nil, nil, err
}

sub.eventCh = eventCh
return sub, cancelFn, nil
return sub, unsubFn, nil
}
}

switch sub.typ {
case filters.LogsSubscription:
err = es.tmWSClient.Subscribe(es.ctx, sub.event)
err = es.tmWSClient.Subscribe(ctx, sub.event)
case filters.BlocksSubscription:
err = es.tmWSClient.Subscribe(es.ctx, sub.event)
err = es.tmWSClient.Subscribe(ctx, sub.event)
case filters.PendingTransactionsSubscription:
err = es.tmWSClient.Subscribe(es.ctx, sub.event)
err = es.tmWSClient.Subscribe(ctx, sub.event)
default:
err = fmt.Errorf("invalid filter subscription type %d", sub.typ)
}

if err != nil {
sub.err <- err
return nil, cancelFn, err
return nil, nil, err
}

// wrap events in a go routine to prevent blocking
es.install <- sub
<-sub.installed

eventCh, err := es.eventBus.Subscribe(sub.event)
eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event)
if err != nil {
err := errors.Wrapf(err, "failed to subscribe to topic after installed: %s", sub.event)
return sub, cancelFn, err
return nil, nil, errors.Wrapf(err, "failed to subscribe to topic after installed: %s", sub.event)
}

sub.eventCh = eventCh
return sub, cancelFn, nil
return sub, unsubFn, nil
}

// SubscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel. Default value for the from and to
// block is "latest". If the fromBlock > toBlock an error is returned.
func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) {
func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) {
var from, to rpc.BlockNumber
if crit.FromBlock == nil {
from = rpc.LatestBlockNumber
Expand Down Expand Up @@ -173,7 +173,7 @@ func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription

// subscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) {
func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.LogsSubscription,
Expand All @@ -188,7 +188,7 @@ func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription
}

// SubscribeNewHeads subscribes to new block headers events.
func (es EventSystem) SubscribeNewHeads() (*Subscription, context.CancelFunc, error) {
func (es EventSystem) SubscribeNewHeads() (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.BlocksSubscription,
Expand All @@ -202,7 +202,7 @@ func (es EventSystem) SubscribeNewHeads() (*Subscription, context.CancelFunc, er
}

// SubscribePendingTxs subscribes to new pending transactions events from the mempool.
func (es EventSystem) SubscribePendingTxs() (*Subscription, context.CancelFunc, error) {
func (es EventSystem) SubscribePendingTxs() (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.PendingTransactionsSubscription,
Expand Down
39 changes: 29 additions & 10 deletions rpc/ethereum/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,43 @@ package pubsub

import (
"sync"
"sync/atomic"

"github.com/pkg/errors"

coretypes "github.com/tendermint/tendermint/rpc/core/types"
)

type UnsubscribeFunc func()

type EventBus interface {
AddTopic(name string, src <-chan coretypes.ResultEvent) error
RemoveTopic(name string)
Subscribe(name string) (<-chan coretypes.ResultEvent, error)
Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error)
Topics() []string
}

type memEventBus struct {
topics map[string]<-chan coretypes.ResultEvent
topicsMux *sync.RWMutex
subscribers map[string][]chan<- coretypes.ResultEvent
subscribersMux *sync.RWMutex
topics map[string]<-chan coretypes.ResultEvent
topicsMux *sync.RWMutex
subscribers map[string]map[uint64]chan<- coretypes.ResultEvent
subscribersMux *sync.RWMutex
currentUniqueID uint64
}

func NewEventBus() EventBus {
return &memEventBus{
topics: make(map[string]<-chan coretypes.ResultEvent),
topicsMux: new(sync.RWMutex),
subscribers: make(map[string][]chan<- coretypes.ResultEvent),
subscribers: make(map[string]map[uint64]chan<- coretypes.ResultEvent),
subscribersMux: new(sync.RWMutex),
}
}

func (m *memEventBus) GenUniqueID() uint64 {
return atomic.AddUint64(&m.currentUniqueID, 1)
}

func (m *memEventBus) Topics() (topics []string) {
m.topicsMux.RLock()
defer m.topicsMux.RUnlock()
Expand Down Expand Up @@ -67,21 +75,32 @@ func (m *memEventBus) RemoveTopic(name string) {
m.topicsMux.Unlock()
}

func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, error) {
func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error) {
m.topicsMux.RLock()
_, ok := m.topics[name]
m.topicsMux.RUnlock()

if !ok {
return nil, errors.Errorf("topic not found: %s", name)
return nil, nil, errors.Errorf("topic not found: %s", name)
}

ch := make(chan coretypes.ResultEvent)
m.subscribersMux.Lock()
defer m.subscribersMux.Unlock()
m.subscribers[name] = append(m.subscribers[name], ch)

return ch, nil
id := m.GenUniqueID()
if _, ok := m.subscribers[name]; !ok {
m.subscribers[name] = make(map[uint64]chan<- coretypes.ResultEvent)
}
m.subscribers[name][id] = ch

unsubscribe := func() {
m.subscribersMux.Lock()
defer m.subscribersMux.Unlock()
delete(m.subscribers[name], id)
}

return ch, unsubscribe, nil
}

func (m *memEventBus) publishTopic(name string, src <-chan coretypes.ResultEvent) {
Expand Down
6 changes: 3 additions & 3 deletions rpc/ethereum/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ func TestSubscribe(t *testing.T) {

q.AddTopic("lol", lolSrc)

kekSubC, err := q.Subscribe("kek")
kekSubC, _, err := q.Subscribe("kek")
require.NoError(t, err)

lolSubC, err := q.Subscribe("lol")
lolSubC, _, err := q.Subscribe("lol")
require.NoError(t, err)

lol2SubC, err := q.Subscribe("lol")
lol2SubC, _, err := q.Subscribe("lol")
require.NoError(t, err)

wg := new(sync.WaitGroup)
Expand Down
Loading