diff --git a/.chloggen/doris-logs.yaml b/.chloggen/doris-logs.yaml new file mode 100644 index 000000000000..70ea260e9dcf --- /dev/null +++ b/.chloggen/doris-logs.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: new_component + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: dorisexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: logs implementation + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33479] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/dorisexporter/exporter_common_test.go b/exporter/dorisexporter/exporter_common_test.go index aa0a93c695eb..77ecc8164bbf 100644 --- a/exporter/dorisexporter/exporter_common_test.go +++ b/exporter/dorisexporter/exporter_common_test.go @@ -4,6 +4,7 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter" import ( + "net" "testing" "time" @@ -51,3 +52,21 @@ func TestStreamLoadUrl(t *testing.T) { url := streamLoadURL("http://doris:8030", "otel", "otel_logs") require.Equal(t, "http://doris:8030/api/otel/otel_logs/_stream_load", url) } + +func findRandomPort() (int, error) { + l, err := net.Listen("tcp", "localhost:0") + + if err != nil { + return 0, err + } + + port := l.Addr().(*net.TCPAddr).Port + + err = l.Close() + + if err != nil { + return 0, err + } + + return port, nil +} diff --git a/exporter/dorisexporter/exporter_logs.go b/exporter/dorisexporter/exporter_logs.go new file mode 100644 index 000000000000..bf854e1a8943 --- /dev/null +++ b/exporter/dorisexporter/exporter_logs.go @@ -0,0 +1,159 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package dorisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter" + +import ( + "context" + _ "embed" // for SQL file embedding + "encoding/json" + "fmt" + "io" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/plog" + semconv "go.opentelemetry.io/collector/semconv/v1.25.0" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil" +) + +//go:embed sql/logs_ddl.sql +var logsDDL string + +// dLog Log to Doris +type dLog struct { + ServiceName string `json:"service_name"` + Timestamp string `json:"timestamp"` + TraceID string `json:"trace_id"` + SpanID string `json:"span_id"` + SeverityNumber int32 `json:"severity_number"` + SeverityText string `json:"severity_text"` + Body string `json:"body"` + ResourceAttributes map[string]any `json:"resource_attributes"` + LogAttributes map[string]any `json:"log_attributes"` + ScopeName string `json:"scope_name"` + ScopeVersion string `json:"scope_version"` +} + +type logsExporter struct { + *commonExporter +} + +func newLogsExporter(logger *zap.Logger, cfg *Config, set component.TelemetrySettings) *logsExporter { + return &logsExporter{ + commonExporter: newExporter(logger, cfg, set), + } +} + +func (e *logsExporter) start(ctx context.Context, host component.Host) error { + client, err := createDorisHTTPClient(ctx, e.cfg, host, e.TelemetrySettings) + if err != nil { + return err + } + e.client = client + + if !e.cfg.CreateSchema { + return nil + } + + conn, err := createDorisMySQLClient(e.cfg) + if err != nil { + return err + } + defer conn.Close() + + err = createAndUseDatabase(ctx, conn, e.cfg) + if err != nil { + return err + } + + ddl := fmt.Sprintf(logsDDL, e.cfg.Table.Logs, e.cfg.propertiesStr()) + _, err = conn.ExecContext(ctx, ddl) + return err +} + +func (e *logsExporter) shutdown(_ context.Context) error { + if e.client != nil { + e.client.CloseIdleConnections() + } + return nil +} + +func (e *logsExporter) pushLogData(ctx context.Context, ld plog.Logs) error { + logs := make([]*dLog, 0, ld.LogRecordCount()) + + for i := 0; i < ld.ResourceLogs().Len(); i++ { + resourceLogs := ld.ResourceLogs().At(i) + resource := resourceLogs.Resource() + resourceAttributes := resource.Attributes() + serviceName := "" + v, ok := resourceAttributes.Get(semconv.AttributeServiceName) + if ok { + serviceName = v.AsString() + } + + for j := 0; j < resourceLogs.ScopeLogs().Len(); j++ { + scopeLogs := resourceLogs.ScopeLogs().At(j) + + for k := 0; k < scopeLogs.LogRecords().Len(); k++ { + logRecord := scopeLogs.LogRecords().At(k) + + log := &dLog{ + ServiceName: serviceName, + Timestamp: e.formatTime(logRecord.Timestamp().AsTime()), + TraceID: traceutil.TraceIDToHexOrEmptyString(logRecord.TraceID()), + SpanID: traceutil.SpanIDToHexOrEmptyString(logRecord.SpanID()), + SeverityNumber: int32(logRecord.SeverityNumber()), + SeverityText: logRecord.SeverityText(), + Body: logRecord.Body().AsString(), + ResourceAttributes: resourceAttributes.AsRaw(), + LogAttributes: logRecord.Attributes().AsRaw(), + ScopeName: scopeLogs.Scope().Name(), + ScopeVersion: scopeLogs.Scope().Version(), + } + + logs = append(logs, log) + } + + } + + } + + return e.pushLogDataInternal(ctx, logs) +} + +func (e *logsExporter) pushLogDataInternal(ctx context.Context, logs []*dLog) error { + marshal, err := json.Marshal(logs) + if err != nil { + return err + } + + req, err := streamLoadRequest(ctx, e.cfg, e.cfg.Table.Logs, marshal) + if err != nil { + return err + } + + res, err := e.client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return err + } + + response := streamLoadResponse{} + err = json.Unmarshal(body, &response) + if err != nil { + return err + } + + if !response.success() { + return fmt.Errorf("failed to push log data: %s", response.Message) + } + + return nil +} diff --git a/exporter/dorisexporter/exporter_logs_test.go b/exporter/dorisexporter/exporter_logs_test.go new file mode 100644 index 000000000000..8e28aa1ec73b --- /dev/null +++ b/exporter/dorisexporter/exporter_logs_test.go @@ -0,0 +1,89 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package dorisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter" + +import ( + "context" + "fmt" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + semconv "go.opentelemetry.io/collector/semconv/v1.25.0" +) + +func TestPushLogData(t *testing.T) { + port, err := findRandomPort() + require.NoError(t, err) + + config := createDefaultConfig().(*Config) + config.Endpoint = fmt.Sprintf("http://127.0.0.1:%d", port) + config.CreateSchema = false + + err = config.Validate() + require.NoError(t, err) + + exporter := newLogsExporter(nil, config, testTelemetrySettings) + + ctx := context.Background() + + client, err := createDorisHTTPClient(ctx, config, nil, testTelemetrySettings) + require.NoError(t, err) + require.NotNil(t, client) + + exporter.client = client + + defer func() { + _ = exporter.shutdown(ctx) + }() + + server := &http.Server{ + ReadTimeout: 3 * time.Second, + Addr: fmt.Sprintf(":%d", port), + } + + go func() { + http.HandleFunc("/api/otel/otel_logs/_stream_load", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"Status":"Success"}`)) + }) + err = server.ListenAndServe() + require.Equal(t, http.ErrServerClosed, err) + }() + + err0 := fmt.Errorf("Not Started") + for err0 != nil { // until server started + err0 = exporter.pushLogData(ctx, simpleLogs(10)) + time.Sleep(100 * time.Millisecond) + } + + _ = server.Shutdown(ctx) +} + +func simpleLogs(count int) plog.Logs { + logs := plog.NewLogs() + rl := logs.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("service.name", "test-service") + sl := rl.ScopeLogs().AppendEmpty() + sl.Scope().SetName("io.opentelemetry.contrib.doris") + sl.Scope().SetVersion("1.0.0") + sl.Scope().Attributes().PutStr("lib", "doris") + timestamp := time.Now() + for i := 0; i < count; i++ { + r := sl.LogRecords().AppendEmpty() + r.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + r.SetObservedTimestamp(pcommon.NewTimestampFromTime(timestamp)) + r.SetSeverityNumber(plog.SeverityNumberError2) + r.SetSeverityText("error") + r.Body().SetStr("error message") + r.Attributes().PutStr(semconv.AttributeServiceNamespace, "default") + r.SetFlags(plog.DefaultLogRecordFlags) + r.SetTraceID([16]byte{1, 2, 3, byte(i)}) + r.SetSpanID([8]byte{1, 2, 3, byte(i)}) + } + return logs +} diff --git a/exporter/dorisexporter/exporter_traces_test.go b/exporter/dorisexporter/exporter_traces_test.go index efc1f1bdca5d..aafc08416132 100644 --- a/exporter/dorisexporter/exporter_traces_test.go +++ b/exporter/dorisexporter/exporter_traces_test.go @@ -5,6 +5,7 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect import ( "context" + "fmt" "net/http" "testing" "time" @@ -16,11 +17,14 @@ import ( ) func TestPushTraceData(t *testing.T) { + port, err := findRandomPort() + require.NoError(t, err) + config := createDefaultConfig().(*Config) - config.Endpoint = "http://127.0.0.1:18030" + config.Endpoint = fmt.Sprintf("http://127.0.0.1:%d", port) config.CreateSchema = false - err := config.Validate() + err = config.Validate() require.NoError(t, err) exporter := newTracesExporter(nil, config, testTelemetrySettings) @@ -38,21 +42,24 @@ func TestPushTraceData(t *testing.T) { }() server := &http.Server{ - ReadTimeout: 5 * time.Second, + ReadTimeout: 3 * time.Second, + Addr: fmt.Sprintf(":%d", port), } + go func() { http.HandleFunc("/api/otel/otel_traces/_stream_load", func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"Status":"Success"}`)) }) - server.Addr = ":18030" - _ = server.ListenAndServe() + err = server.ListenAndServe() + require.Equal(t, http.ErrServerClosed, err) }() - time.Sleep(1 * time.Second) - - err = exporter.pushTraceData(ctx, simpleTraces(10)) - require.NoError(t, err) + err0 := fmt.Errorf("Not Started") + for err0 != nil { // until server started + err0 = exporter.pushTraceData(ctx, simpleTraces(10)) + time.Sleep(100 * time.Millisecond) + } _ = server.Shutdown(ctx) } diff --git a/exporter/dorisexporter/factory.go b/exporter/dorisexporter/factory.go index ea7c7e50a834..be7259cb4a9e 100644 --- a/exporter/dorisexporter/factory.go +++ b/exporter/dorisexporter/factory.go @@ -12,7 +12,6 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" - "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter/internal/metadata" @@ -51,13 +50,19 @@ func createDefaultConfig() component.Config { } func createLogsExporter(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Logs, error) { + c := cfg.(*Config) + exporter := newLogsExporter(set.Logger, c, set.TelemetrySettings) return exporterhelper.NewLogsExporter( ctx, set, cfg, - func(_ context.Context, _ plog.Logs) error { - return nil - }, + exporter.pushLogData, + exporterhelper.WithStart(exporter.start), + exporterhelper.WithShutdown(exporter.shutdown), + // we config the timeout option in http client, so we don't need to set timeout here + exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), + exporterhelper.WithQueue(c.QueueSettings), + exporterhelper.WithRetry(c.BackOffConfig), ) } diff --git a/exporter/dorisexporter/sql/logs_ddl.sql b/exporter/dorisexporter/sql/logs_ddl.sql new file mode 100644 index 000000000000..b69abf71a8b4 --- /dev/null +++ b/exporter/dorisexporter/sql/logs_ddl.sql @@ -0,0 +1,30 @@ +CREATE TABLE IF NOT EXISTS %s +( + service_name VARCHAR(200), + timestamp DATETIME(6), + trace_id VARCHAR(200), + span_id STRING, + severity_number INT, + severity_text STRING, + body STRING, + resource_attributes VARIANT, + log_attributes VARIANT, + scope_name STRING, + scope_version STRING, + INDEX idx_service_name(service_name) USING INVERTED, + INDEX idx_timestamp(timestamp) USING INVERTED, + INDEX idx_trace_id(trace_id) USING INVERTED, + INDEX idx_span_id(span_id) USING INVERTED, + INDEX idx_severity_number(severity_number) USING INVERTED, + INDEX idx_body(body) USING INVERTED PROPERTIES("parser"="unicode", "support_phrase"="true"), + INDEX idx_severity_text(severity_text) USING INVERTED, + INDEX idx_resource_attributes(resource_attributes) USING INVERTED, + INDEX idx_log_attributes(log_attributes) USING INVERTED, + INDEX idx_scope_name(scope_name) USING INVERTED, + INDEX idx_scope_version(scope_version) USING INVERTED +) +ENGINE = OLAP +DUPLICATE KEY(service_name, timestamp) +PARTITION BY RANGE(timestamp) () +DISTRIBUTED BY HASH(trace_id) BUCKETS AUTO +%s; \ No newline at end of file