/
influx_batch.go
42 lines (35 loc) · 762 Bytes
/
influx_batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package dispatchers
import (
"bytes"
"errors"
)
// InfluxMaxBatchSize is the line-count limit for an InfluxBatch: CanAdd
// reports false once the batch holds this many lines.
const InfluxMaxBatchSize = 500
// InfluxBatch accumulates messages into a single buffer, one message per
// line, and tracks how many lines have been added.
type InfluxBatch struct {
// lines holds the added messages, each followed by a "\n".
lines bytes.Buffer
// lineCount is the number of messages written to lines so far.
lineCount int
}
// NewInfluxBatch returns a pointer to an empty, zero-valued InfluxBatch that
// is ready to accept lines.
func NewInfluxBatch() *InfluxBatch {
	return new(InfluxBatch)
}
// Add appends message to the batch as a single newline-terminated line.
//
// Trailing newline characters are stripped from message before it is stored,
// so the buffer does not gain a blank line when the caller's message is
// already newline-terminated. A message that is empty (or consists only of
// newlines) is ignored: nil is returned and the batch is left untouched.
//
// When CanAdd reports that the batch has no room, Add leaves the batch
// unchanged and returns a new error created from ErrBatchTooLarge.
func (batch *InfluxBatch) Add(message []byte) error {
	// Drop trailing newlines: exactly one separator is appended below.
	line := bytes.TrimRight(message, "\n")
	if len(line) == 0 {
		// Nothing to store; treat as a no-op rather than an error.
		return nil
	}

	// Reject the line once the batch has reached its capacity.
	if !batch.CanAdd(line) {
		return errors.New(ErrBatchTooLarge)
	}

	// bytes.Buffer writes always return a nil error, so the results are
	// intentionally not checked.
	batch.lines.Write(line)
	batch.lines.WriteByte('\n')
	batch.lineCount++
	return nil
}
// CanAdd reports whether the batch has room for another line. The decision
// is based solely on the current line count compared with
// InfluxMaxBatchSize; message itself is not inspected.
func (batch *InfluxBatch) CanAdd(message []byte) bool {
	hasRoom := InfluxMaxBatchSize > batch.lineCount
	return hasRoom
}
// Len returns the number of lines that have been added to the batch.
func (batch *InfluxBatch) Len() int {
	added := batch.lineCount
	return added
}