diff --git a/gossip/gossip/batcher.go b/gossip/gossip/batcher.go new file mode 100644 index 00000000000..cd39960d776 --- /dev/null +++ b/gossip/gossip/batcher.go @@ -0,0 +1,134 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gossip + +import ( + "sync" + "sync/atomic" + "time" +) + +type emitBatchCallback func([]interface{}) + +//batchingEmitter is used for the gossip push/forwarding phase. +// Messages are added into the batchingEmitter, and they are forwarded periodically T times in batches and then discarded. +// If the batchingEmitter's stored message count reaches a certain capacity, that also triggers a message dispatch +type batchingEmitter interface { + // Add adds a message to be batched + Add(interface{}) + + // Stop stops the component + Stop() + + // Size returns the amount of pending messages to be emitted + Size() int +} + +// newBatchingEmitter accepts the following parameters: +// iterations: number of times each message is forwarded +// burstSize: a threshold that triggers a forwarding because of message count +// latency: the maximum delay that each message can be stored without being forwarded +// cb: a callback that is called in order for the forwarding to take place +func newBatchingEmitter(iterations, burstSize int, latency time.Duration, cb emitBatchCallback) batchingEmitter { + p := &batchingEmitterImpl{ + cb: cb, + delay: latency, + iterations: iterations, + burstSize: burstSize, + lock: &sync.Mutex{}, + buff: make([]*batchedMessage, 0), + stopFlag: int32(0), + } + + go p.periodicEmit() + return p +} + +func (p *batchingEmitterImpl) periodicEmit() { + for !p.toDie() { + time.Sleep(p.delay) + p.lock.Lock() + p.emit() + p.lock.Unlock() + } +} + +func (p *batchingEmitterImpl) emit() { + if len(p.buff) == 0 { + return + } + msgs2beEmitted := make([]interface{}, len(p.buff)) + for i, v := range p.buff { + msgs2beEmitted[i] = v.data + } + + p.cb(msgs2beEmitted) + p.decrementCounters() +} + +func (p *batchingEmitterImpl) decrementCounters() { + n := len(p.buff) + for i := 0; i < n; i++ { + msg := p.buff[i] + msg.iterationsLeft-- + if msg.iterationsLeft == 0 { + p.buff = append(p.buff[:i], p.buff[i+1:]...) + n-- + i-- + } + } +} + +func (p *batchingEmitterImpl) toDie() bool { + return atomic.LoadInt32(&(p.stopFlag)) == int32(1) +} + +type batchingEmitterImpl struct { + iterations int + burstSize int + delay time.Duration + cb emitBatchCallback + lock *sync.Mutex + buff []*batchedMessage + stopFlag int32 +} + +type batchedMessage struct { + data interface{} + iterationsLeft int +} + +func (p *batchingEmitterImpl) Stop() { + atomic.StoreInt32(&(p.stopFlag), int32(1)) +} + +func (p *batchingEmitterImpl) Size() int { + p.lock.Lock() + defer p.lock.Unlock() + return len(p.buff) +} + +func (p *batchingEmitterImpl) Add(message interface{}) { + p.lock.Lock() + defer p.lock.Unlock() + + p.buff = append(p.buff, &batchedMessage{data: message, iterationsLeft: p.iterations}) + + if len(p.buff) >= p.burstSize { + p.emit() + } +} diff --git a/gossip/gossip/batcher_test.go b/gossip/gossip/batcher_test.go new file mode 100644 index 00000000000..9ced01f8a33 --- /dev/null +++ b/gossip/gossip/batcher_test.go @@ -0,0 +1,118 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gossip + +import ( + "github.com/stretchr/testify/assert" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestBatchingEmitterAddAndSize(t *testing.T) { + emitter := newBatchingEmitter(1, 10, time.Second, func(a []interface{}) {}) + defer emitter.Stop() + emitter.Add(1) + emitter.Add(2) + emitter.Add(3) + assert.Equal(t, 3, emitter.Size()) +} + +func TestBatchingEmitterStop(t *testing.T) { + // In this test we make sure the emitter doesn't do anything after it's stopped + disseminationAttempts := int32(0) + cb := func(a []interface{}) { + atomic.AddInt32(&disseminationAttempts, int32(1)) + } + + emitter := newBatchingEmitter(10, 1, time.Duration(10)*time.Millisecond, cb) + emitter.Add(1) + time.Sleep(time.Duration(10) * time.Millisecond) + emitter.Stop() + time.Sleep(time.Duration(100) * time.Millisecond) + assert.True(t, atomic.LoadInt32(&disseminationAttempts) < int32(5)) +} + +func TestBatchingEmitterExpiration(t *testing.T) { + // In this test we make sure that a message is expired and is discarded after enough time + // and that it was forwarded an adequate amount of times + disseminationAttempts := int32(0) + cb := func(a []interface{}) { + atomic.AddInt32(&disseminationAttempts, int32(1)) + } + + emitter := newBatchingEmitter(10, 1, time.Duration(1)*time.Millisecond, cb) + defer emitter.Stop() + + emitter.Add(1) + time.Sleep(time.Duration(50) * time.Millisecond) + assert.Equal(t, int32(10), atomic.LoadInt32(&disseminationAttempts), "Inadaquate amount of dissemination attempts detected") + assert.Equal(t, 0, emitter.Size()) +} + +func TestBatchingEmitterCounter(t *testing.T) { + // In this test we count the number of times each message is forwarded, with relation to the time passed + counters := make(map[int]int) + lock := &sync.Mutex{} + cb := func(a []interface{}) { + lock.Lock() + defer lock.Unlock() + for _, e := range a { + n := e.(int) + if _, exists := counters[n]; !exists { + counters[n] = 0 + } else { + counters[n]++ + } + } + } + + emitter := newBatchingEmitter(5, 100, time.Duration(50)*time.Millisecond, cb) + defer emitter.Stop() + + for i := 1; i <= 5; i++ { + emitter.Add(i) + if i == 5 { + break + } + time.Sleep(time.Duration(60) * time.Millisecond) + } + emitter.Stop() + + lock.Lock() + assert.Equal(t, 0, counters[4]) + assert.Equal(t, 1, counters[3]) + assert.Equal(t, 2, counters[2]) + assert.Equal(t, 3, counters[1]) + lock.Unlock() +} + +// TestBatchingEmitterBurstSizeCap tests that the emitter +func TestBatchingEmitterBurstSizeCap(t *testing.T) { + disseminationAttempts := int32(0) + cb := func(a []interface{}) { + atomic.AddInt32(&disseminationAttempts, int32(1)) + } + emitter := newBatchingEmitter(1, 10, time.Duration(800)*time.Millisecond, cb) + defer emitter.Stop() + + for i := 0; i < 50; i++ { + emitter.Add(i) + } + assert.Equal(t, int32(5), atomic.LoadInt32(&disseminationAttempts)) +} diff --git a/gossip/gossip/msgs.go b/gossip/gossip/msgs.go new file mode 100644 index 00000000000..f9ebc769508 --- /dev/null +++ b/gossip/gossip/msgs.go @@ -0,0 +1,118 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gossip + +import "sync" + +type invalidationResult int + +const ( + MESSAGE_NO_ACTION = invalidationResult(0) + MESSAGE_INVALIDATES = invalidationResult(1) + MESSAGE_INVALIDATED = invalidationResult(2) +) + +// Returns: +// MESSAGE_INVALIDATES if this message invalidates that +// MESSAGE_INVALIDATED if this message is invalidated by that +// MESSAGE_NO_ACTION otherwise +type messageReplacingPolicy func(this interface{}, that interface{}) invalidationResult + +// invalidationTrigger is invoked on each message that was invalidated because of a message addition +// i.e: if add(0), add(1) was called one after the other, and the store has only {1} after the sequence of invocations +// then the invalidation trigger on 0 was called when 1 was added. +type invalidationTrigger func(message interface{}) + +func newMessageStore(pol messageReplacingPolicy, trigger invalidationTrigger) messageStore { + return &messageStoreImpl{pol: pol, lock: &sync.RWMutex{}, messages: make([]*msg, 0), invTrigger: trigger} +} + +// messageStore adds messages to an internal buffer. +// When a message is received, it might: +// - Be added to the buffer +// - Discarded because of some message already in the buffer (invalidated) +// - Make a message already in the buffer to be discarded (invalidates) +// When a message is invalidated, the invalidationTrigger is invoked on that message. +type messageStore interface { + // add adds a message to the store + // returns true or false whether the message was added to the store + add(msg interface{}) bool + + // size returns the amount of messages in the store + size() int + + // get returns all messages in the store + get() []interface{} +} + +type messageStoreImpl struct { + pol messageReplacingPolicy + lock *sync.RWMutex + messages []*msg + invTrigger invalidationTrigger +} + +type msg struct { + data interface{} +} + +// add adds a message to the store +func (s *messageStoreImpl) add(message interface{}) bool { + s.lock.Lock() + defer s.lock.Unlock() + + n := len(s.messages) + for i := 0; i < n; i++ { + m := s.messages[i] + switch s.pol(message, m.data) { + case MESSAGE_INVALIDATED: + return false + break + case MESSAGE_INVALIDATES: + s.invTrigger(m.data) + s.messages = append(s.messages[:i], s.messages[i+1:]...) + n-- + i-- + break + default: + break + } + } + + s.messages = append(s.messages, &msg{data: message}) + return true +} + +// size returns the amount of messages in the store +func (s *messageStoreImpl) size() int { + s.lock.RLock() + defer s.lock.RUnlock() + return len(s.messages) +} + +// get returns all messages in the store +func (s *messageStoreImpl) get() []interface{} { + s.lock.RLock() + defer s.lock.RUnlock() + + n := len(s.messages) + res := make([]interface{}, n) + for i := 0; i < n; i++ { + res[i] = s.messages[i].data + } + return res +} diff --git a/gossip/gossip/msgs_test.go b/gossip/gossip/msgs_test.go new file mode 100644 index 00000000000..083a31857ba --- /dev/null +++ b/gossip/gossip/msgs_test.go @@ -0,0 +1,141 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gossip + +import ( + "github.com/stretchr/testify/assert" + "math/rand" + "sync/atomic" + "testing" + "time" +) + +func init() { + rand.Seed(42) +} + +func alwaysNoAction(this interface{}, that interface{}) invalidationResult { + return MESSAGE_NO_ACTION +} + +func noopTrigger(m interface{}) { + +} + +func compareInts(this interface{}, that interface{}) invalidationResult { + a := this.(int) + b := that.(int) + if a == b { + return MESSAGE_NO_ACTION + } + if a > b { + return MESSAGE_INVALIDATES + } + + return MESSAGE_INVALIDATED +} + +func TestSize(t *testing.T) { + msgStore := newMessageStore(alwaysNoAction, noopTrigger) + msgStore.add(0) + msgStore.add(1) + msgStore.add(2) + assert.Equal(t, 3, msgStore.size()) +} + +func TestNewMessagesInvalidates(t *testing.T) { + invalidated := make([]int, 9) + msgStore := newMessageStore(compareInts, func(m interface{}) { + invalidated = append(invalidated, m.(int)) + }) + assert.True(t, msgStore.add(0)) + for i := 1; i < 10; i++ { + assert.True(t, msgStore.add(i)) + assert.Equal(t, i - 1, invalidated[len(invalidated) - 1]) + assert.Equal(t, 1, msgStore.size()) + assert.Equal(t, i, msgStore.get()[0].(int)) + } +} + +func TestMessagesGet(t *testing.T) { + contains := func(a []interface{}, e interface{}) bool { + for _, v := range a { + if v == e { + return true + } + } + return false + } + + msgStore := newMessageStore(alwaysNoAction, noopTrigger) + expected := make([]int, 0) + for i := 0; i < 2; i++ { + n := rand.Int() + expected = append(expected, n) + msgStore.add(n) + } + + for _, num2Search := range expected { + assert.True(t, contains(msgStore.get(), num2Search), "Value %v not found in array", num2Search) + } + +} + +func TestNewMessagesInvalidated(t *testing.T) { + msgStore := newMessageStore(compareInts, noopTrigger) + assert.True(t, msgStore.add(10)) + for i := 9; i >= 0; i-- { + assert.False(t, msgStore.add(i)) + assert.Equal(t, 1, msgStore.size()) + assert.Equal(t, 10, msgStore.get()[0].(int)) + } +} + +func TestConcurrency(t *testing.T) { + stopFlag := int32(0) + msgStore := newMessageStore(compareInts, noopTrigger) + looper := func(f func()) func() { + return func() { + for { + if atomic.LoadInt32(&stopFlag) == int32(1) { + return + } + f() + } + } + } + + addProcess := looper(func() { + msgStore.add(rand.Int()) + }) + + getProcess := looper(func() { + msgStore.get() + }) + + sizeProcess := looper(func() { + msgStore.size() + }) + + go addProcess() + go getProcess() + go sizeProcess() + + time.Sleep(time.Duration(3) * time.Second) + + atomic.CompareAndSwapInt32(&stopFlag, 0, 1) +}