Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Datadog UDS support #123

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 63 additions & 31 deletions datadog/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"bytes"
"io"
"log"
"net"
"net/url"
"os"
"strings"
"syscall"
"time"

Expand All @@ -14,7 +15,7 @@ import (

const (
// DefaultAddress is the default address to which the datadog client tries
// to connect to.
// to connect to. By default it connects to UDP
DefaultAddress = "localhost:8125"

// DefaultBufferSize is the default size for batches of metrics sent to
Expand All @@ -37,6 +38,8 @@ var (
// The ClientConfig type is used to configure datadog clients.
type ClientConfig struct {
// Address of the datadog database to send metrics to.
// UDP: host:port
// UDS: unixgram://dir/file.ext
Address string

// Maximum size of batch of events sent to datadog.
Expand Down Expand Up @@ -89,15 +92,24 @@ func NewClientWith(config ClientConfig) *Client {
},
}

conn, bufferSize, err := dial(config.Address, config.BufferSize)
w, err := newWriter(config.Address)
if err != nil {
log.Printf("stats/datadog: %s", err)
log.Printf("stats/datadog: unable to create writer %s", err)
c.err = err
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally i would return an error to the caller of NewClientWith if we can not create a writer, but decided to not change the API

w = &noopWriter{}
}

c.conn, c.err, c.bufferSize = conn, err, bufferSize
c.buffer.BufferSize = bufferSize
newBufSize, err := w.CalcBufferSize(config.BufferSize)
if err != nil {
log.Printf("stats/datadog: unable to calc writer's buffer size. Defaulting to a buffer of size %d - %v\n", DefaultBufferSize, err)
newBufSize = DefaultBufferSize
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we prefer seting newBufSize to 0 (existing behaviour)? i am open for suggestion

}
c.bufferSize = newBufSize
c.buffer.BufferSize = newBufSize

c.serializer.w = w
c.buffer.Serializer = &c.serializer
log.Printf("stats/datadog: sending metrics with a buffer of size %d B", bufferSize)
log.Printf("stats/datadog: sending metrics with a buffer of size %d B", newBufSize)
return c
}

Expand All @@ -124,7 +136,7 @@ func (c *Client) Close() error {
}

type serializer struct {
conn net.Conn
w io.WriteCloser
bufferSize int
filters map[string]struct{}
}
Expand All @@ -137,12 +149,9 @@ func (s *serializer) AppendMeasures(b []byte, _ time.Time, measures ...stats.Mea
}

func (s *serializer) Write(b []byte) (int, error) {
if s.conn == nil {
return 0, io.ErrClosedPipe
}

if len(b) <= s.bufferSize {
return s.conn.Write(b)
return s.w.Write(b)
}

// When the serialized metrics are larger than the configured socket buffer
Expand All @@ -167,8 +176,7 @@ func (s *serializer) Write(b []byte) (int, error) {
}
splitIndex += i + 1
}

c, err := s.conn.Write(b[:splitIndex])
c, err := s.w.Write(b[:splitIndex])
if err != nil {
return n + c, err
}
Expand All @@ -181,32 +189,17 @@ func (s *serializer) Write(b []byte) (int, error) {
}

func (s *serializer) close() {
if s.conn != nil {
s.conn.Close()
}
s.w.Close()
}

func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error) {
var f *os.File

if conn, err = net.Dial("udp", address); err != nil {
return
}

if f, err = conn.(*net.UDPConn).File(); err != nil {
conn.Close()
return
}
defer f.Close()
func bufSizeFromFD(f *os.File, sizehint int) (bufsize int, err error) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i would need an expert eye on this, i used the same existing logic against UDP for Unix socket too. I think that is right but i am not 100% sure. if not i can make this only for UDP?

fd := int(f.Fd())

// The kernel refuses to send UDP datagrams that are larger than the size of
// the size of the socket send buffer. To maximize the number of metrics
// sent in one batch we attempt to attempt to adjust the kernel buffer size
// to accept larger datagrams, or fallback to the default socket buffer size
// if it failed.
if bufsize, err = syscall.GetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_SNDBUF); err != nil {
conn.Close()
return
}

Expand Down Expand Up @@ -244,3 +237,42 @@ func dial(address string, sizehint int) (conn net.Conn, bufsize int, err error)
syscall.SetNonblock(fd, true)
return
}

type ddWriter interface {
io.WriteCloser
CalcBufferSize(desiredBufSize int) (int, error)
}

func newWriter(addr string) (ddWriter, error) {
if strings.HasPrefix(addr, "unixgram://") ||
strings.HasPrefix(addr, "udp://") {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
switch u.Scheme {
case "unixgram":
return newUDSWriter(u.Path)
case "udp":
return newUDPWriter(u.Path)
}
}
// default assume addr host:port to use UDP
return newUDPWriter(addr)
}

// noopWriter is a writer that does not do anything
type noopWriter struct{}

// Write writes nothing
func (w *noopWriter) Write(data []byte) (int, error) {
return 0, nil
}

func (w *noopWriter) Close() error {
return nil
}

func (w *noopWriter) CalcBufferSize(sizehint int) (int, error) {
return sizehint, nil
}
62 changes: 59 additions & 3 deletions datadog/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/segmentio/stats/v4"
)

func TestClient(t *testing.T) {
func TestClient_UDP(t *testing.T) {
client := NewClient(DefaultAddress)

for i := 0; i != 1000; i++ {
Expand All @@ -35,7 +35,29 @@ func TestClient(t *testing.T) {
}
}

func TestClientWriteLargeMetrics(t *testing.T) {
func TestClient_UDS(t *testing.T) {
client := NewClient("unixgram://do-not-exist")

for i := 0; i != 1000; i++ {
client.HandleMeasures(time.Time{}, stats.Measure{
Name: "request",
Fields: []stats.Field{
{Name: "count", Value: stats.ValueOf(5)},
{Name: "rtt", Value: stats.ValueOf(100 * time.Millisecond)},
},
Tags: []stats.Tag{
stats.T("answer", "42"),
stats.T("hello", "world"),
},
})
}

if err := client.Close(); err != nil {
t.Error(err)
}
}

func TestClientWriteLargeMetrics_UDP(t *testing.T) {
const data = `main.http.error.count:0|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity
main.http.message.count:1|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.header.size:2|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
Expand All @@ -51,7 +73,7 @@ main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_
count := int32(0)
expect := int32(strings.Count(data, "\n"))

addr, closer := startTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) {
addr, closer := startUDPTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) {
atomic.AddInt32(&count, 1)
}))
defer closer.Close()
Expand All @@ -69,6 +91,40 @@ main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_
}
}

func TestClientWriteLargeMetrics_UDS(t *testing.T) {
const data = `main.http.error.count:0|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity
main.http.message.count:1|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.header.size:2|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.header.bytes:240|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.body.bytes:0|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,operation:read,type:request
main.http.message.count:1|c|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
main.http.message.header.size:1|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
main.http.message.header.bytes:70|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
main.http.message.body.bytes:839|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
main.http.rtt.seconds:0.001215296|h|#http_req_content_charset:,http_req_content_endoing:,http_req_content_type:,http_req_host:localhost:3011,http_req_method:GET,http_req_protocol:HTTP/1.1,http_req_transfer_encoding:identity,http_res_content_charset:,http_res_content_endoing:,http_res_content_type:application/json,http_res_protocol:HTTP/1.1,http_res_server:,http_res_transfer_encoding:identity,http_res_upgrade:,http_status:200,http_status_bucket:2xx,operation:write,type:response
`

count := int32(0)
expect := int32(strings.Count(data, "\n"))

addr, closer := startUDSTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) {
atomic.AddInt32(&count, 1)
}))
defer closer.Close()

client := NewClient("unixgram://" + addr)

if _, err := client.Write([]byte(data)); err != nil {
t.Error(err)
}

time.Sleep(100 * time.Millisecond)

if n := atomic.LoadInt32(&count); n != expect {
t.Error("bad metric count:", n)
}
}

func BenchmarkClient(b *testing.B) {
log.SetOutput(ioutil.Discard)

Expand Down
70 changes: 68 additions & 2 deletions datadog/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package datadog

import (
"io"
"io/ioutil"
"net"
"os"
"path/filepath"
"sync/atomic"
"testing"
"time"
Expand All @@ -17,7 +20,7 @@ func TestServer(t *testing.T) {
b := uint32(0)
c := uint32(0)

addr, closer := startTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) {
addr, closer := startUDPTestServer(t, HandlerFunc(func(m Metric, _ net.Addr) {
switch m.Name {
case "datadog.test.A":
atomic.AddUint32(&a, uint32(m.Value))
Expand Down Expand Up @@ -68,7 +71,7 @@ func TestServer(t *testing.T) {
}
}

func startTestServer(t *testing.T, handler Handler) (addr string, closer io.Closer) {
func startUDPTestServer(t *testing.T, handler Handler) (addr string, closer io.Closer) {
conn, err := net.ListenPacket("udp", "127.0.0.1:0")

if err != nil {
Expand All @@ -80,3 +83,66 @@ func startTestServer(t *testing.T, handler Handler) (addr string, closer io.Clos

return conn.LocalAddr().String(), conn
}

// startUDSTestServerWithSocketFile starts a UDS server with a given socket file
func startUDSTestServerWithSocketFile(t *testing.T, socketPath string, handler Handler) (closer io.Closer) {
udsAddr, err := net.ResolveUnixAddr("unixgram", socketPath)
if err != nil {
t.Error(err)
t.FailNow()
}

conn, err := net.ListenUnixgram("unixgram", udsAddr)
if err != nil {
t.Error(err)
t.FailNow()
}

go Serve(conn, handler)

return &testUnixgramServer{
UnixConn: conn,
pathToDelete: socketPath,
}
}

// startUDSTestServer starts a Unix domain socket server with a random socket file
func startUDSTestServer(t *testing.T, handler Handler) (socketPath string, closer io.Closer) {
dir, err := ioutil.TempDir("", "socket")
if err != nil {
t.Error(err)
t.FailNow()
}

socketPath = filepath.Join(dir, "dsd.socket")

udsAddr, err := net.ResolveUnixAddr("unixgram", socketPath)
if err != nil {
t.Error(err)
t.FailNow()
}

conn, err := net.ListenUnixgram("unixgram", udsAddr)
if err != nil {
t.Error(err)
t.FailNow()
}

ts := testUnixgramServer{
UnixConn: conn,
pathToDelete: dir, // so we delete any tmp dir we created
}

go Serve(conn, handler)
return socketPath, &ts
}

type testUnixgramServer struct {
*net.UnixConn
pathToDelete string
}

func (ts *testUnixgramServer) Close() error {
os.RemoveAll(ts.pathToDelete) // clean up
return ts.UnixConn.Close()
}
40 changes: 40 additions & 0 deletions datadog/udp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package datadog

import "net"

type udpWriter struct {
conn net.Conn
}

// newUDPWriter returns a pointer to a new newUDPWriter given a socket file path as addr.
func newUDPWriter(addr string) (*udpWriter, error) {
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, err
}
return &udpWriter{conn: conn}, nil

}

// Write data to the UDP connection
func (w *udpWriter) Write(data []byte) (int, error) {
return w.conn.Write(data)
}

func (w *udpWriter) Close() error {
return w.conn.Close()
}

func (w *udpWriter) CalcBufferSize(sizehint int) (int, error) {
f, err := w.conn.(*net.UDPConn).File()
if err != nil {
return 0, err
}
defer f.Close()

return bufSizeFromFD(f, sizehint)
}
Loading