Skip to content
This repository has been archived by the owner on Aug 19, 2022. It is now read-only.

Commit

Permalink
refactor interfaces (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k authored and raulk committed Jun 21, 2019
1 parent f100eac commit 6212a92
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 122 deletions.
118 changes: 67 additions & 51 deletions basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ type basicBus struct {

var _ event.Bus = (*basicBus)(nil)

type Emitter struct {
type emitter struct {
n *node
typ reflect.Type
closed int32
dropper func(reflect.Type)
}

func (e *Emitter) Emit(evt interface{}) {
func (e *emitter) Emit(evt interface{}) {
if atomic.LoadInt32(&e.closed) != 0 {
panic("emitter is closed")
}
e.n.emit(evt)
}

func (e *Emitter) Close() error {
func (e *emitter) Close() error {
if !atomic.CompareAndSwapInt32(&e.closed, 0, 1) {
panic("closed an emitter more than once")
}
Expand Down Expand Up @@ -93,67 +93,83 @@ func (b *basicBus) tryDropNode(typ reflect.Type) {
b.lk.Unlock()
}

type sub struct {
ch chan interface{}
nodes []*node
dropper func(reflect.Type)
}

func (s *sub) Out() <-chan interface{} {
return s.ch
}

func (s *sub) Close() error {
close(s.ch)
for _, n := range s.nodes {
n.lk.Lock()
for i := 0; i < len(n.sinks); i++ {
if n.sinks[i] == s.ch {
n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], nil
n.sinks = n.sinks[:len(n.sinks)-1]
break
}
}
tryDrop := len(n.sinks) == 0 && atomic.LoadInt32(&n.nEmitters) == 0
n.lk.Unlock()
if tryDrop {
s.dropper(n.typ)
}
}
return nil
}

var _ event.Subscription = (*sub)(nil)

// Subscribe creates new subscription. Failing to drain the channel will cause
// publishers to get blocked. CancelFunc is guaranteed to return after last send
// to the channel
//
// Example:
// ch := make(chan EventT, 10)
// defer close(ch)
// cancel, err := eventbus.Subscribe(ch)
// defer cancel()
func (b *basicBus) Subscribe(typedChan interface{}, opts ...event.SubscriptionOpt) (c event.CancelFunc, err error) {
func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt) (_ event.Subscription, err error) {
var settings subSettings
for _, opt := range opts {
if err := opt(&settings); err != nil {
return nil, err
}
}

refCh := reflect.ValueOf(typedChan)
typ := refCh.Type()
if typ.Kind() != reflect.Chan {
return nil, errors.New("expected a channel")
types, ok := evtTypes.([]interface{})
if !ok {
types = []interface{}{evtTypes}
}
if typ.ChanDir()&reflect.SendDir == 0 {
return nil, errors.New("channel doesn't allow send")

out := &sub{
ch: make(chan interface{}, settings.buffer),
nodes: make([]*node, len(types)),

dropper: b.tryDropNode,
}

if settings.forcedType != nil {
if settings.forcedType.Elem().AssignableTo(typ) {
return nil, fmt.Errorf("forced type %s cannot be sent to chan %s", settings.forcedType, typ)
for i, etyp := range types {
typ := reflect.TypeOf(etyp)

if typ.Kind() != reflect.Ptr {
return nil, errors.New("subscribe called with non-pointer type")
}
typ = settings.forcedType
}

err = b.withNode(typ.Elem(), func(n *node) {
n.sinks = append(n.sinks, refCh)
c = func() {
n.lk.Lock()
for i := 0; i < len(n.sinks); i++ {
if n.sinks[i] == refCh {
n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], reflect.Value{}
n.sinks = n.sinks[:len(n.sinks)-1]
break

err = b.withNode(typ.Elem(), func(n *node) {
n.sinks = append(n.sinks, out.ch)
out.nodes[i] = n
}, func(n *node) {
if n.keepLast {
l := n.last.Load()
if l == nil {
return
}
out.ch <- l
}
tryDrop := len(n.sinks) == 0 && atomic.LoadInt32(&n.nEmitters) == 0
n.lk.Unlock()
if tryDrop {
b.tryDropNode(typ.Elem())
}
}
}, func(n *node) {
if n.keepLast {
lastVal, ok := n.last.Load().(reflect.Value)
if !ok {
return
}
})
}

refCh.Send(lastVal)
}
})
return
return out, nil
}

// Emitter creates new emitter
Expand Down Expand Up @@ -183,7 +199,7 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e eve
err = b.withNode(typ, func(n *node) {
atomic.AddInt32(&n.nEmitters, 1)
n.keepLast = n.keepLast || settings.makeStateful
e = &Emitter{n: n, typ: typ, dropper: b.tryDropNode}
e = &emitter{n: n, typ: typ, dropper: b.tryDropNode}
}, func(_ *node) {})
return
}
Expand All @@ -203,7 +219,7 @@ type node struct {
keepLast bool
last atomic.Value

sinks []reflect.Value
sinks []chan interface{}
}

func newNode(typ reflect.Type) *node {
Expand All @@ -220,11 +236,11 @@ func (n *node) emit(event interface{}) {

n.lk.RLock()
if n.keepLast {
n.last.Store(eval)
n.last.Store(event)
}

for _, ch := range n.sinks {
ch.Send(eval)
ch <- event
}
n.lk.RUnlock()
}
68 changes: 32 additions & 36 deletions basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@ func (EventA) String() string {

func TestEmit(t *testing.T) {
bus := NewBus()
events := make(chan EventA)
cancel, err := bus.Subscribe(events)
sub, err := bus.Subscribe(new(EventA))
if err != nil {
t.Fatal(err)
}

go func() {
defer cancel()
<-events
defer sub.Close()
<-sub.Out()
}()

em, err := bus.Emitter(new(EventA))
Expand All @@ -49,8 +48,7 @@ func TestEmit(t *testing.T) {

func TestSub(t *testing.T) {
bus := NewBus()
events := make(chan EventB)
cancel, err := bus.Subscribe(events)
sub, err := bus.Subscribe(new(EventB))
if err != nil {
t.Fatal(err)
}
Expand All @@ -61,8 +59,8 @@ func TestSub(t *testing.T) {
wait.Add(1)

go func() {
defer cancel()
event = <-events
defer sub.Close()
event = (<-sub.Out()).(EventB)
wait.Done()
}()

Expand Down Expand Up @@ -131,9 +129,9 @@ func TestClosingRaces(t *testing.T) {
lk.RLock()
defer lk.RUnlock()

cancel, _ := b.Subscribe(make(chan EventA))
sub, _ := b.Subscribe(new(EventA))
time.Sleep(10 * time.Millisecond)
cancel()
sub.Close()

wg.Done()
}()
Expand Down Expand Up @@ -174,15 +172,14 @@ func TestSubMany(t *testing.T) {

for i := 0; i < n; i++ {
go func() {
events := make(chan EventB)
cancel, err := bus.Subscribe(events)
sub, err := bus.Subscribe(new(EventB))
if err != nil {
panic(err)
}
defer cancel()
defer sub.Close()

ready.Done()
atomic.AddInt32(&r, int32(<-events))
atomic.AddInt32(&r, int32((<-sub.Out()).(EventB)))
wait.Done()
}()
}
Expand All @@ -205,8 +202,7 @@ func TestSubMany(t *testing.T) {

func TestSubType(t *testing.T) {
bus := NewBus()
events := make(chan fmt.Stringer)
cancel, err := bus.Subscribe(events, ForceSubType(new(EventA)))
sub, err := bus.Subscribe([]interface{}{new(EventA), new(EventB)})
if err != nil {
t.Fatal(err)
}
Expand All @@ -217,8 +213,8 @@ func TestSubType(t *testing.T) {
wait.Add(1)

go func() {
defer cancel()
event = <-events
defer sub.Close()
event = (<-sub.Out()).(EventA)
wait.Done()
}()

Expand All @@ -244,39 +240,37 @@ func TestNonStateful(t *testing.T) {
}
defer em.Close()

eventsA := make(chan EventB, 1)
cancelS, err := bus.Subscribe(eventsA)
sub1, err := bus.Subscribe(new(EventB), BufSize(1))
if err != nil {
t.Fatal(err)
}
defer cancelS()
defer sub1.Close()

select {
case <-eventsA:
case <-sub1.Out():
t.Fatal("didn't expect to get an event")
default:
}

em.Emit(EventB(1))

select {
case e := <-eventsA:
if e != 1 {
case e := <-sub1.Out():
if e.(EventB) != 1 {
t.Fatal("got wrong event")
}
default:
t.Fatal("expected to get an event")
}

eventsB := make(chan EventB, 1)
cancelS2, err := bus.Subscribe(eventsB)
sub2, err := bus.Subscribe(new(EventB), BufSize(1))
if err != nil {
t.Fatal(err)
}
defer cancelS2()
defer sub2.Close()

select {
case <-eventsA:
case <-sub2.Out():
t.Fatal("didn't expect to get an event")
default:
}
Expand All @@ -292,14 +286,13 @@ func TestStateful(t *testing.T) {

em.Emit(EventB(2))

eventsA := make(chan EventB, 1)
cancelS, err := bus.Subscribe(eventsA)
sub, err := bus.Subscribe(new(EventB), BufSize(1))
if err != nil {
t.Fatal(err)
}
defer cancelS()
defer sub.Close()

if <-eventsA != 2 {
if (<-sub.Out()).(EventB) != 2 {
t.Fatal("got wrong event")
}
}
Expand All @@ -320,16 +313,19 @@ func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {

for i := 0; i < subs; i++ {
go func() {
events := make(chan EventB)
cancel, err := bus.Subscribe(events)
sub, err := bus.Subscribe(new(EventB))
if err != nil {
panic(err)
}
defer cancel()
defer sub.Close()

ready.Done()
for i := 0; i < emits*msgs; i++ {
atomic.AddInt64(&r, int64(<-events))
e, ok := <-sub.Out()
if !ok {
panic("wat")
}
atomic.AddInt64(&r, int64(e.(EventB)))
}
wait.Done()
}()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ module github.com/libp2p/go-eventbus
go 1.12

require (
github.com/libp2p/go-libp2p-core v0.0.4
github.com/libp2p/go-libp2p-core v0.0.6
github.com/libp2p/go-libp2p-testing v0.0.4
)
Loading

0 comments on commit 6212a92

Please sign in to comment.