From 6212a929bf670119ce9aeccad46d7f1ce6699369 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 21 Jun 2019 18:50:36 +0200 Subject: [PATCH] refactor interfaces (#9) --- basic.go | 118 ++++++++++++++++++++++++++++---------------------- basic_test.go | 68 ++++++++++++++--------------- go.mod | 2 +- go.sum | 31 ++++++++++++- opts.go | 36 ++------------- 5 files changed, 133 insertions(+), 122 deletions(-) diff --git a/basic.go b/basic.go index 8cb030d..ca102b5 100644 --- a/basic.go +++ b/basic.go @@ -21,21 +21,21 @@ type basicBus struct { var _ event.Bus = (*basicBus)(nil) -type Emitter struct { +type emitter struct { n *node typ reflect.Type closed int32 dropper func(reflect.Type) } -func (e *Emitter) Emit(evt interface{}) { +func (e *emitter) Emit(evt interface{}) { if atomic.LoadInt32(&e.closed) != 0 { panic("emitter is closed") } e.n.emit(evt) } -func (e *Emitter) Close() error { +func (e *emitter) Close() error { if !atomic.CompareAndSwapInt32(&e.closed, 0, 1) { panic("closed an emitter more than once") } @@ -93,16 +93,42 @@ func (b *basicBus) tryDropNode(typ reflect.Type) { b.lk.Unlock() } +type sub struct { + ch chan interface{} + nodes []*node + dropper func(reflect.Type) +} + +func (s *sub) Out() <-chan interface{} { + return s.ch +} + +func (s *sub) Close() error { + close(s.ch) + for _, n := range s.nodes { + n.lk.Lock() + for i := 0; i < len(n.sinks); i++ { + if n.sinks[i] == s.ch { + n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], nil + n.sinks = n.sinks[:len(n.sinks)-1] + break + } + } + tryDrop := len(n.sinks) == 0 && atomic.LoadInt32(&n.nEmitters) == 0 + n.lk.Unlock() + if tryDrop { + s.dropper(n.typ) + } + } + return nil +} + +var _ event.Subscription = (*sub)(nil) + // Subscribe creates new subscription. Failing to drain the channel will cause // publishers to get blocked. CancelFunc is guaranteed to return after last send // to the channel -// -// Example: -// ch := make(chan EventT, 10) -// defer close(ch) -// cancel, err := eventbus.Subscribe(ch) -// defer cancel() -func (b *basicBus) Subscribe(typedChan interface{}, opts ...event.SubscriptionOpt) (c event.CancelFunc, err error) { +func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt) (_ event.Subscription, err error) { var settings subSettings for _, opt := range opts { if err := opt(&settings); err != nil { @@ -110,50 +136,40 @@ func (b *basicBus) Subscribe(typedChan interface{}, opts ...event.SubscriptionOp } } - refCh := reflect.ValueOf(typedChan) - typ := refCh.Type() - if typ.Kind() != reflect.Chan { - return nil, errors.New("expected a channel") + types, ok := evtTypes.([]interface{}) + if !ok { + types = []interface{}{evtTypes} } - if typ.ChanDir()&reflect.SendDir == 0 { - return nil, errors.New("channel doesn't allow send") + + out := &sub{ + ch: make(chan interface{}, settings.buffer), + nodes: make([]*node, len(types)), + + dropper: b.tryDropNode, } - if settings.forcedType != nil { - if settings.forcedType.Elem().AssignableTo(typ) { - return nil, fmt.Errorf("forced type %s cannot be sent to chan %s", settings.forcedType, typ) + for i, etyp := range types { + typ := reflect.TypeOf(etyp) + + if typ.Kind() != reflect.Ptr { + return nil, errors.New("subscribe called with non-pointer type") } - typ = settings.forcedType - } - - err = b.withNode(typ.Elem(), func(n *node) { - n.sinks = append(n.sinks, refCh) - c = func() { - n.lk.Lock() - for i := 0; i < len(n.sinks); i++ { - if n.sinks[i] == refCh { - n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], reflect.Value{} - n.sinks = n.sinks[:len(n.sinks)-1] - break + + err = b.withNode(typ.Elem(), func(n *node) { + n.sinks = append(n.sinks, out.ch) + out.nodes[i] = n + }, func(n *node) { + if n.keepLast { + l := n.last.Load() + if l == nil { + return } + out.ch <- l } - tryDrop := len(n.sinks) == 0 && atomic.LoadInt32(&n.nEmitters) == 0 - n.lk.Unlock() - if tryDrop { - b.tryDropNode(typ.Elem()) - } - } - }, func(n *node) { - if n.keepLast { - lastVal, ok := n.last.Load().(reflect.Value) - if !ok { - return - } + }) + } - refCh.Send(lastVal) - } - }) - return + return out, nil } // Emitter creates new emitter @@ -183,7 +199,7 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e eve err = b.withNode(typ, func(n *node) { atomic.AddInt32(&n.nEmitters, 1) n.keepLast = n.keepLast || settings.makeStateful - e = &Emitter{n: n, typ: typ, dropper: b.tryDropNode} + e = &emitter{n: n, typ: typ, dropper: b.tryDropNode} }, func(_ *node) {}) return } @@ -203,7 +219,7 @@ type node struct { keepLast bool last atomic.Value - sinks []reflect.Value + sinks []chan interface{} } func newNode(typ reflect.Type) *node { @@ -220,11 +236,11 @@ func (n *node) emit(event interface{}) { n.lk.RLock() if n.keepLast { - n.last.Store(eval) + n.last.Store(event) } for _, ch := range n.sinks { - ch.Send(eval) + ch <- event } n.lk.RUnlock() } diff --git a/basic_test.go b/basic_test.go index 8d2bfae..fc23e61 100644 --- a/basic_test.go +++ b/basic_test.go @@ -27,15 +27,14 @@ func (EventA) String() string { func TestEmit(t *testing.T) { bus := NewBus() - events := make(chan EventA) - cancel, err := bus.Subscribe(events) + sub, err := bus.Subscribe(new(EventA)) if err != nil { t.Fatal(err) } go func() { - defer cancel() - <-events + defer sub.Close() + <-sub.Out() }() em, err := bus.Emitter(new(EventA)) @@ -49,8 +48,7 @@ func TestEmit(t *testing.T) { func TestSub(t *testing.T) { bus := NewBus() - events := make(chan EventB) - cancel, err := bus.Subscribe(events) + sub, err := bus.Subscribe(new(EventB)) if err != nil { t.Fatal(err) } @@ -61,8 +59,8 @@ func TestSub(t *testing.T) { wait.Add(1) go func() { - defer cancel() - event = <-events + defer sub.Close() + event = (<-sub.Out()).(EventB) wait.Done() }() @@ -131,9 +129,9 @@ func TestClosingRaces(t *testing.T) { lk.RLock() defer lk.RUnlock() - cancel, _ := b.Subscribe(make(chan EventA)) + sub, _ := b.Subscribe(new(EventA)) time.Sleep(10 * time.Millisecond) - cancel() + sub.Close() wg.Done() }() @@ -174,15 +172,14 @@ func TestSubMany(t *testing.T) { for i := 0; i < n; i++ { go func() { - events := make(chan EventB) - cancel, err := bus.Subscribe(events) + sub, err := bus.Subscribe(new(EventB)) if err != nil { panic(err) } - defer cancel() + defer sub.Close() ready.Done() - atomic.AddInt32(&r, int32(<-events)) + atomic.AddInt32(&r, int32((<-sub.Out()).(EventB))) wait.Done() }() } @@ -205,8 +202,7 @@ func TestSubMany(t *testing.T) { func TestSubType(t *testing.T) { bus := NewBus() - events := make(chan fmt.Stringer) - cancel, err := bus.Subscribe(events, ForceSubType(new(EventA))) + sub, err := bus.Subscribe([]interface{}{new(EventA), new(EventB)}) if err != nil { t.Fatal(err) } @@ -217,8 +213,8 @@ func TestSubType(t *testing.T) { wait.Add(1) go func() { - defer cancel() - event = <-events + defer sub.Close() + event = (<-sub.Out()).(EventA) wait.Done() }() @@ -244,15 +240,14 @@ func TestNonStateful(t *testing.T) { } defer em.Close() - eventsA := make(chan EventB, 1) - cancelS, err := bus.Subscribe(eventsA) + sub1, err := bus.Subscribe(new(EventB), BufSize(1)) if err != nil { t.Fatal(err) } - defer cancelS() + defer sub1.Close() select { - case <-eventsA: + case <-sub1.Out(): t.Fatal("didn't expect to get an event") default: } @@ -260,23 +255,22 @@ func TestNonStateful(t *testing.T) { em.Emit(EventB(1)) select { - case e := <-eventsA: - if e != 1 { + case e := <-sub1.Out(): + if e.(EventB) != 1 { t.Fatal("got wrong event") } default: t.Fatal("expected to get an event") } - eventsB := make(chan EventB, 1) - cancelS2, err := bus.Subscribe(eventsB) + sub2, err := bus.Subscribe(new(EventB), BufSize(1)) if err != nil { t.Fatal(err) } - defer cancelS2() + defer sub2.Close() select { - case <-eventsA: + case <-sub2.Out(): t.Fatal("didn't expect to get an event") default: } @@ -292,14 +286,13 @@ func TestStateful(t *testing.T) { em.Emit(EventB(2)) - eventsA := make(chan EventB, 1) - cancelS, err := bus.Subscribe(eventsA) + sub, err := bus.Subscribe(new(EventB), BufSize(1)) if err != nil { t.Fatal(err) } - defer cancelS() + defer sub.Close() - if <-eventsA != 2 { + if (<-sub.Out()).(EventB) != 2 { t.Fatal("got wrong event") } } @@ -320,16 +313,19 @@ func testMany(t testing.TB, subs, emits, msgs int, stateful bool) { for i := 0; i < subs; i++ { go func() { - events := make(chan EventB) - cancel, err := bus.Subscribe(events) + sub, err := bus.Subscribe(new(EventB)) if err != nil { panic(err) } - defer cancel() + defer sub.Close() ready.Done() for i := 0; i < emits*msgs; i++ { - atomic.AddInt64(&r, int64(<-events)) + e, ok := <-sub.Out() + if !ok { + panic("wat") + } + atomic.AddInt64(&r, int64(e.(EventB))) } wait.Done() }() diff --git a/go.mod b/go.mod index 01acb9a..e9c1bed 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,6 @@ module github.com/libp2p/go-eventbus go 1.12 require ( - github.com/libp2p/go-libp2p-core v0.0.4 + github.com/libp2p/go-libp2p-core v0.0.6 github.com/libp2p/go-libp2p-testing v0.0.4 ) diff --git a/go.sum b/go.sum index f520d1a..1e3414f 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c h1:aEbSeNALREWXk0G7UdNhR3ayBV7tZ4M2PNmnrCAph6Q= @@ -10,15 +12,20 @@ github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVa github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495 h1:6IyqGr3fnd0tM3YxipK27TUskaOVUjU2nG45yzwcQKY= github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU= github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= @@ -33,8 +40,8 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8= github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco= -github.com/libp2p/go-libp2p-core v0.0.4 h1:wNg7b2tKrZlSNibP+j7A1v8eatjf4+9+YRw0L3DUeFE= -github.com/libp2p/go-libp2p-core v0.0.4/go.mod h1:jyuCQP356gzfCFtRKyvAbNkyeuxb7OlyhWZ3nls5d2I= +github.com/libp2p/go-libp2p-core v0.0.6 h1:SsYhfWJ47vLP1Rd9/0hqEm/W/PlFbC/3YLZyLCcvo1w= +github.com/libp2p/go-libp2p-core v0.0.6/go.mod h1:0d9xmaYAVY5qmbp/fcgxHT3ZJsLjYeYPMJAUKpaCHrE= github.com/libp2p/go-libp2p-testing v0.0.4 h1:Qev57UR47GcLPXWjrunv5aLIQGO4n9mhI/8/EIrEEFc= github.com/libp2p/go-libp2p-testing v0.0.4/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= @@ -64,6 +71,7 @@ github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+ github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -71,9 +79,20 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 h1:IcSOAf4PyMp3U3XbIEj1/xJ2BjNN2jWv7JoyOsMxXUU= golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -81,7 +100,15 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/opts.go b/opts.go index 3826555..50c6736 100644 --- a/opts.go +++ b/opts.go @@ -1,40 +1,12 @@ package eventbus -import ( - "errors" - "reflect" - - "github.com/libp2p/go-libp2p-core/event" -) - type subSettings struct { - forcedType reflect.Type + buffer int } -// ForceSubType is a Subscribe option which overrides the type to which -// the subscription will be done. Note that the evtType must be assignable -// to channel type. -// -// This also allows for subscribing to multiple eventbus channels with one -// Go channel to get better ordering guarantees. -// -// Example: -// type Event struct{} -// func (Event) String() string { -// return "event" -// } -// -// eventCh := make(chan fmt.Stringer) // interface { String() string } -// cancel, err := eventbus.Subscribe(eventCh, event.ForceSubType(new(Event))) -// [...] -func ForceSubType(evtType interface{}) event.SubscriptionOpt { - return func(settings interface{}) error { - s := settings.(*subSettings) - typ := reflect.TypeOf(evtType) - if typ.Kind() != reflect.Ptr { - return errors.New("ForceSubType called with non-pointer type") - } - s.forcedType = typ +func BufSize(n int) func(interface{}) error { + return func(s interface{}) error { + s.(*subSettings).buffer = n return nil } }