Skip to content

Commit

Permalink
all: add Unix datagram support
Browse files Browse the repository at this point in the history
I tried updating #123 with ~50
commits but the diff got too messy. Easier to just add the changes on
a new commit.
  • Loading branch information
kevinburkesegment committed Sep 9, 2024
1 parent 2a099d8 commit c30dfce
Show file tree
Hide file tree
Showing 6 changed files with 380 additions and 21 deletions.
76 changes: 58 additions & 18 deletions datadog/client.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package datadog

import (
"io"
"log"
"net"
"net/url"
"os"
"strings"
"time"

"github.com/segmentio/stats/v4"
Expand Down Expand Up @@ -40,6 +42,8 @@ var (
// The ClientConfig type is used to configure datadog clients.
type ClientConfig struct {
// Address of the datadog database to send metrics to.
// UDP: host:port (default)
// UDS: unixgram://dir/file.ext
Address string

// Maximum size of batch of events sent to datadog.
Expand Down Expand Up @@ -106,15 +110,22 @@ func NewClientWith(config ClientConfig) *Client {
},
}

conn, bufferSize, err := dial(config.Address, config.BufferSize)
w, err := newWriter(config.Address)
if err != nil {
log.Printf("stats/datadog: %s", err)
c.err = err
w = &noopWriter{}
}

c.conn, c.err, c.bufferSize = conn, err, bufferSize
c.buffer.BufferSize = bufferSize
newBufSize, err := w.CalcBufferSize(config.BufferSize)
if err != nil {
log.Printf("stats/datadog: unable to calc writer's buffer size. Defaulting to a buffer of size %d - %v\n", DefaultBufferSize, err)
newBufSize = DefaultBufferSize
}

c.bufferSize = newBufSize
c.buffer.Serializer = &c.serializer
log.Printf("stats/datadog: sending metrics with a buffer of size %d B", bufferSize)
log.Printf("stats/datadog: sending metrics with a buffer of size %d B", newBufSize)
return c
}

Expand All @@ -140,18 +151,7 @@ func (c *Client) Close() error {
return c.err
}

func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error) {
var f *os.File

if conn, err = net.Dial("udp", address); err != nil {
return
}

if f, err = conn.(*net.UDPConn).File(); err != nil {
conn.Close()
return
}
defer f.Close()
func bufSizeFromFD(f *os.File, sizehint int) (bufsize int, err error) {
fd := int(f.Fd())

// The kernel refuses to send UDP datagrams that are larger than the size of
Expand All @@ -160,7 +160,6 @@ func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error)
// to accept larger datagrams, or fallback to the default socket buffer size
// if it failed.
if bufsize, err = unix.GetsockoptInt(fd, unix.SOL_SOCKET, unix.SO_SNDBUF); err != nil {
conn.Close()
return
}

Expand Down Expand Up @@ -198,3 +197,44 @@ func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error)
_ = unix.SetNonblock(fd, true)
return
}

type ddWriter interface {
io.WriteCloser
CalcBufferSize(desiredBufSize int) (int, error)
}

func newWriter(addr string) (ddWriter, error) {
if strings.HasPrefix(addr, "unixgram://") ||
strings.HasPrefix(addr, "udp://") {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
switch u.Scheme {
case "unixgram":
return newUDSWriter(u.Path)
case "udp":
return newUDPWriter(u.Path)
}
}
// default assume addr host:port to use UDP
return newUDPWriter(addr)
}

// noopWriter is a writer that does nothing

Check failure on line 224 in datadog/client.go

View workflow job for this annotation

GitHub Actions / lint

Comment should end in a period (godot)
type noopWriter struct{}

// Write writes nothing

Check failure on line 227 in datadog/client.go

View workflow job for this annotation

GitHub Actions / lint

Comment should end in a period (godot)
func (w *noopWriter) Write(data []byte) (int, error) {

Check failure on line 228 in datadog/client.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'data' seems to be unused, consider removing or renaming it as _ (revive)
return 0, nil
}

// Close is a noop close

Check failure on line 232 in datadog/client.go

View workflow job for this annotation

GitHub Actions / lint

Comment should end in a period (godot)
func (w *noopWriter) Close() error {
return nil
}

// CalcBufferSize returns the sizehint
func (w *noopWriter) CalcBufferSize(sizehint int) (int, error) {
return sizehint, nil
}
58 changes: 57 additions & 1 deletion datadog/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,28 @@ func TestClientWithDistributionPrefixes(t *testing.T) {
}
}

func TestClient_UDS(t *testing.T) {
client := NewClient("unixgram://do-not-exist")

for i := 0; i != 1000; i++ {
client.HandleMeasures(time.Time{}, stats.Measure{
Name: "request",
Fields: []stats.Field{
{Name: "count", Value: stats.ValueOf(5)},
{Name: "rtt", Value: stats.ValueOf(100 * time.Millisecond)},
},
Tags: []stats.Tag{
stats.T("answer", "42"),
stats.T("hello", "world"),
},
})
}

if err := client.Close(); err != nil {
t.Error(err)
}
}

func TestClientWithUseDistributions(t *testing.T) {
// Start a goroutine listening for packets and giving them back on packets chan
packets := make(chan []byte)
Expand Down Expand Up @@ -117,7 +139,7 @@ main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_
count := int32(0)
expect := int32(strings.Count(data, "\n"))

addr, closer := startTestServer(t, HandlerFunc(func(_ Metric, _ net.Addr) {
addr, closer := startUDPTestServer(t, HandlerFunc(func(_ Metric, _ net.Addr) {
atomic.AddInt32(&count, 1)
}))
defer closer.Close()
Expand All @@ -135,6 +157,40 @@ main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_
}
}

func TestClientWriteLargeMetrics_UDS(t *testing.T) {
const data = `main.http.error.count:0|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity
main.http.message.count:1|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.header.size:2|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.header.bytes:240|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.body.bytes:0|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.count:1|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
main.http.message.header.size:1|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
main.http.message.header.bytes:70|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
main.http.message.body.bytes:839|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
`

count := int32(0)
expect := int32(strings.Count(data, "\n"))

addr, closer := startUDSTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) {

Check failure on line 176 in datadog/client_test.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'm' seems to be unused, consider removing or renaming it as _ (revive)
atomic.AddInt32(&count, 1)
}))
defer closer.Close()

client := NewClient("unixgram://" + addr)

if _, err := client.Write([]byte(data)); err != nil {
t.Error(err)
}

time.Sleep(100 * time.Millisecond)

if n := atomic.LoadInt32(&count); n != expect {
t.Error("bad metric count:", n)
}
}

func BenchmarkClient(b *testing.B) {
log.SetOutput(io.Discard)

Expand Down
70 changes: 68 additions & 2 deletions datadog/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package datadog
import (
"io"
"net"
"os"
"path/filepath"
"sort"
"sync"
"sync/atomic"
Expand All @@ -21,7 +23,7 @@ func TestServer(t *testing.T) {
seenGauges := make([]Metric, 0)
var mu sync.Mutex

addr, closer := startTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) {
addr, closer := startUDPTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) {
switch m.Name {
case "datadog.test.A":
atomic.AddUint32(&a, uint32(m.Value))
Expand Down Expand Up @@ -94,7 +96,7 @@ func TestServer(t *testing.T) {
}
}

func startTestServer(t *testing.T, handler Handler) (addr string, closer io.Closer) {
func startUDPTestServer(t *testing.T, handler Handler) (addr string, closer io.Closer) {
conn, err := net.ListenPacket("udp", "127.0.0.1:0")
if err != nil {
t.Error(err)
Expand All @@ -105,3 +107,67 @@ func startTestServer(t *testing.T, handler Handler) (addr string, closer io.Clos

return conn.LocalAddr().String(), conn
}

// startUDSTestServerWithSocketFile starts a UDS server with a given socket file
func startUDSTestServerWithSocketFile(t *testing.T, socketPath string, handler Handler) (closer io.Closer) {
udsAddr, err := net.ResolveUnixAddr("unixgram", socketPath)
if err != nil {
t.Error(err)
t.FailNow()
}

conn, err := net.ListenUnixgram("unixgram", udsAddr)
if err != nil {
t.Error(err)
t.FailNow()
}

go Serve(conn, handler)

return &testUnixgramServer{
UnixConn: conn,
pathToDelete: socketPath,
}
}

// startUDSTestServer starts a UDS server with a random socket file internally generated
func startUDSTestServer(t *testing.T, handler Handler) (socketPath string, closer io.Closer) {
// generate a random dir
dir, err := os.MkdirTemp("", "socket")
if err != nil {
t.Error(err)
t.FailNow()
}

socketPath = filepath.Join(dir, "dsd.socket")

udsAddr, err := net.ResolveUnixAddr("unixgram", socketPath)
if err != nil {
t.Error(err)
t.FailNow()
}

conn, err := net.ListenUnixgram("unixgram", udsAddr)
if err != nil {
t.Error(err)
t.FailNow()
}

ts := testUnixgramServer{
UnixConn: conn,
pathToDelete: dir, // so we delete any tmp dir we created
}

go Serve(conn, handler)
return socketPath, &ts
}

type testUnixgramServer struct {
*net.UnixConn
pathToDelete string
}

func (ts *testUnixgramServer) Close() error {
os.RemoveAll(ts.pathToDelete) // clean up
return ts.UnixConn.Close()
}
40 changes: 40 additions & 0 deletions datadog/udp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package datadog

import "net"

type udpWriter struct {
conn net.Conn
}

// newUDPWriter returns a pointer to a new newUDPWriter given a socket file path as addr.
func newUDPWriter(addr string) (*udpWriter, error) {
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, err
}
return &udpWriter{conn: conn}, nil

}

Check failure on line 21 in datadog/udp.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)

// Write data to the UDP connection
func (w *udpWriter) Write(data []byte) (int, error) {
return w.conn.Write(data)
}

func (w *udpWriter) Close() error {
return w.conn.Close()
}

func (w *udpWriter) CalcBufferSize(sizehint int) (int, error) {
f, err := w.conn.(*net.UDPConn).File()
if err != nil {
return 0, err
}
defer f.Close()

return bufSizeFromFD(f, sizehint)
}
Loading

0 comments on commit c30dfce

Please sign in to comment.