-
Notifications
You must be signed in to change notification settings - Fork 32
/
server.go
118 lines (95 loc) · 2.46 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package datadog
import (
"bytes"
"io"
"net"
"runtime"
"time"
)
// Handler defines the interface that types must satisfy to process metrics
// received by a dogstatsd server.
type Handler interface {
// HandleMetric is called when a dogstatsd server receives a metric.
// The method receives the metric and the address from which it was sent.
HandleMetric(Metric, net.Addr)
// HandleEvent is called when a dogstatsd server receives an event.
// The method receives the metric and the address from which it was sent.
HandleEvent(Event, net.Addr)
}
// HandlerFunc makes it possible for function types to be used as metric
// handlers on dogstatsd servers.
type HandlerFunc func(Metric, net.Addr)
// HandleMetric calls f(m, a).
func (f HandlerFunc) HandleMetric(m Metric, a net.Addr) {
f(m, a)
}
// HandleEvent is a no-op for backwards compatibility.
func (f HandlerFunc) HandleEvent(e Event, a net.Addr) {
return
}
// ListenAndServe starts a new dogstatsd server, listening for UDP datagrams on
// addr and forwarding the metrics to handler.
func ListenAndServe(addr string, handler Handler) (err error) {
var conn net.PacketConn
if conn, err = net.ListenPacket("udp", addr); err != nil {
return
}
err = Serve(conn, handler)
return
}
// Serve runs a dogstatsd server, listening for datagrams on conn and forwarding
// the metrics to handler.
func Serve(conn net.PacketConn, handler Handler) (err error) {
defer conn.Close()
concurrency := runtime.GOMAXPROCS(-1)
if concurrency <= 0 {
concurrency = 1
}
done := make(chan error, concurrency)
conn.SetDeadline(time.Time{})
for i := 0; i != concurrency; i++ {
go serve(conn, handler, done)
}
for i := 0; i != concurrency; i++ {
switch e := <-done; e {
case nil, io.EOF, io.ErrClosedPipe, io.ErrUnexpectedEOF:
default:
err = e
}
conn.Close()
}
return
}
func serve(conn net.PacketConn, handler Handler, done chan<- error) {
b := make([]byte, 65536)
for {
n, a, err := conn.ReadFrom(b)
if err != nil {
done <- err
return
}
for s := b[:n]; len(s) != 0; {
var ln []byte
var off int
if off = bytes.IndexByte(s, '\n'); off < 0 {
off = len(s)
} else {
off++
}
ln, s = s[:off], s[off:]
if bytes.HasPrefix(ln, []byte("_e")) {
e, err := parseEvent(string(ln))
if err != nil {
continue
}
handler.HandleEvent(e, a)
} else {
m, err := parseMetric(string(ln))
if err != nil {
continue
}
handler.HandleMetric(m, a)
}
}
}
}