Skip to content

Commit

Permalink
[exporter/doris] Second PR (with logs Implementation) of New componen…
Browse files Browse the repository at this point in the history
…t: Doris Exporter (#35150)

**Description:** <Describe what has changed.>

Second PR of New component: Doris Exporter. Implementation of logs.

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

**Link to tracking Issue:** #33479 

**Testing:** 

Tested via unit test.

**Documentation:** <Describe the documentation added.>

No additional documentation.
  • Loading branch information
joker-star-l committed Sep 16, 2024
1 parent 1b03346 commit 1057f72
Show file tree
Hide file tree
Showing 7 changed files with 349 additions and 13 deletions.
27 changes: 27 additions & 0 deletions .chloggen/doris-logs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: dorisexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: logs implementation

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33479]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
19 changes: 19 additions & 0 deletions exporter/dorisexporter/exporter_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package dorisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter"

import (
"net"
"testing"
"time"

Expand Down Expand Up @@ -51,3 +52,21 @@ func TestStreamLoadUrl(t *testing.T) {
url := streamLoadURL("http://doris:8030", "otel", "otel_logs")
require.Equal(t, "http://doris:8030/api/otel/otel_logs/_stream_load", url)
}

func findRandomPort() (int, error) {
l, err := net.Listen("tcp", "localhost:0")

if err != nil {
return 0, err
}

port := l.Addr().(*net.TCPAddr).Port

err = l.Close()

if err != nil {
return 0, err
}

return port, nil
}
159 changes: 159 additions & 0 deletions exporter/dorisexporter/exporter_logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package dorisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter"

import (
"context"
_ "embed" // for SQL file embedding
"encoding/json"
"fmt"
"io"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
semconv "go.opentelemetry.io/collector/semconv/v1.25.0"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
)

//go:embed sql/logs_ddl.sql
var logsDDL string

// dLog Log to Doris
type dLog struct {
ServiceName string `json:"service_name"`
Timestamp string `json:"timestamp"`
TraceID string `json:"trace_id"`
SpanID string `json:"span_id"`
SeverityNumber int32 `json:"severity_number"`
SeverityText string `json:"severity_text"`
Body string `json:"body"`
ResourceAttributes map[string]any `json:"resource_attributes"`
LogAttributes map[string]any `json:"log_attributes"`
ScopeName string `json:"scope_name"`
ScopeVersion string `json:"scope_version"`
}

type logsExporter struct {
*commonExporter
}

func newLogsExporter(logger *zap.Logger, cfg *Config, set component.TelemetrySettings) *logsExporter {
return &logsExporter{
commonExporter: newExporter(logger, cfg, set),
}
}

func (e *logsExporter) start(ctx context.Context, host component.Host) error {
client, err := createDorisHTTPClient(ctx, e.cfg, host, e.TelemetrySettings)
if err != nil {
return err
}
e.client = client

if !e.cfg.CreateSchema {
return nil
}

conn, err := createDorisMySQLClient(e.cfg)
if err != nil {
return err
}
defer conn.Close()

err = createAndUseDatabase(ctx, conn, e.cfg)
if err != nil {
return err
}

ddl := fmt.Sprintf(logsDDL, e.cfg.Table.Logs, e.cfg.propertiesStr())
_, err = conn.ExecContext(ctx, ddl)
return err
}

func (e *logsExporter) shutdown(_ context.Context) error {
if e.client != nil {
e.client.CloseIdleConnections()
}
return nil
}

func (e *logsExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
logs := make([]*dLog, 0, ld.LogRecordCount())

for i := 0; i < ld.ResourceLogs().Len(); i++ {
resourceLogs := ld.ResourceLogs().At(i)
resource := resourceLogs.Resource()
resourceAttributes := resource.Attributes()
serviceName := ""
v, ok := resourceAttributes.Get(semconv.AttributeServiceName)
if ok {
serviceName = v.AsString()
}

for j := 0; j < resourceLogs.ScopeLogs().Len(); j++ {
scopeLogs := resourceLogs.ScopeLogs().At(j)

for k := 0; k < scopeLogs.LogRecords().Len(); k++ {
logRecord := scopeLogs.LogRecords().At(k)

log := &dLog{
ServiceName: serviceName,
Timestamp: e.formatTime(logRecord.Timestamp().AsTime()),
TraceID: traceutil.TraceIDToHexOrEmptyString(logRecord.TraceID()),
SpanID: traceutil.SpanIDToHexOrEmptyString(logRecord.SpanID()),
SeverityNumber: int32(logRecord.SeverityNumber()),
SeverityText: logRecord.SeverityText(),
Body: logRecord.Body().AsString(),
ResourceAttributes: resourceAttributes.AsRaw(),
LogAttributes: logRecord.Attributes().AsRaw(),
ScopeName: scopeLogs.Scope().Name(),
ScopeVersion: scopeLogs.Scope().Version(),
}

logs = append(logs, log)
}

}

}

return e.pushLogDataInternal(ctx, logs)
}

func (e *logsExporter) pushLogDataInternal(ctx context.Context, logs []*dLog) error {
marshal, err := json.Marshal(logs)
if err != nil {
return err
}

req, err := streamLoadRequest(ctx, e.cfg, e.cfg.Table.Logs, marshal)
if err != nil {
return err
}

res, err := e.client.Do(req)
if err != nil {
return err
}
defer res.Body.Close()

body, err := io.ReadAll(res.Body)
if err != nil {
return err
}

response := streamLoadResponse{}
err = json.Unmarshal(body, &response)
if err != nil {
return err
}

if !response.success() {
return fmt.Errorf("failed to push log data: %s", response.Message)
}

return nil
}
89 changes: 89 additions & 0 deletions exporter/dorisexporter/exporter_logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package dorisexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter"

import (
"context"
"fmt"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
semconv "go.opentelemetry.io/collector/semconv/v1.25.0"
)

func TestPushLogData(t *testing.T) {
port, err := findRandomPort()
require.NoError(t, err)

config := createDefaultConfig().(*Config)
config.Endpoint = fmt.Sprintf("http://127.0.0.1:%d", port)
config.CreateSchema = false

err = config.Validate()
require.NoError(t, err)

exporter := newLogsExporter(nil, config, testTelemetrySettings)

ctx := context.Background()

client, err := createDorisHTTPClient(ctx, config, nil, testTelemetrySettings)
require.NoError(t, err)
require.NotNil(t, client)

exporter.client = client

defer func() {
_ = exporter.shutdown(ctx)
}()

server := &http.Server{
ReadTimeout: 3 * time.Second,
Addr: fmt.Sprintf(":%d", port),
}

go func() {
http.HandleFunc("/api/otel/otel_logs/_stream_load", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"Status":"Success"}`))
})
err = server.ListenAndServe()
require.Equal(t, http.ErrServerClosed, err)
}()

err0 := fmt.Errorf("Not Started")
for err0 != nil { // until server started
err0 = exporter.pushLogData(ctx, simpleLogs(10))
time.Sleep(100 * time.Millisecond)
}

_ = server.Shutdown(ctx)
}

func simpleLogs(count int) plog.Logs {
logs := plog.NewLogs()
rl := logs.ResourceLogs().AppendEmpty()
rl.Resource().Attributes().PutStr("service.name", "test-service")
sl := rl.ScopeLogs().AppendEmpty()
sl.Scope().SetName("io.opentelemetry.contrib.doris")
sl.Scope().SetVersion("1.0.0")
sl.Scope().Attributes().PutStr("lib", "doris")
timestamp := time.Now()
for i := 0; i < count; i++ {
r := sl.LogRecords().AppendEmpty()
r.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
r.SetObservedTimestamp(pcommon.NewTimestampFromTime(timestamp))
r.SetSeverityNumber(plog.SeverityNumberError2)
r.SetSeverityText("error")
r.Body().SetStr("error message")
r.Attributes().PutStr(semconv.AttributeServiceNamespace, "default")
r.SetFlags(plog.DefaultLogRecordFlags)
r.SetTraceID([16]byte{1, 2, 3, byte(i)})
r.SetSpanID([8]byte{1, 2, 3, byte(i)})
}
return logs
}
25 changes: 16 additions & 9 deletions exporter/dorisexporter/exporter_traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package dorisexporter // import "github.com/open-telemetry/opentelemetry-collect

import (
"context"
"fmt"
"net/http"
"testing"
"time"
Expand All @@ -16,11 +17,14 @@ import (
)

func TestPushTraceData(t *testing.T) {
port, err := findRandomPort()
require.NoError(t, err)

config := createDefaultConfig().(*Config)
config.Endpoint = "http://127.0.0.1:18030"
config.Endpoint = fmt.Sprintf("http://127.0.0.1:%d", port)
config.CreateSchema = false

err := config.Validate()
err = config.Validate()
require.NoError(t, err)

exporter := newTracesExporter(nil, config, testTelemetrySettings)
Expand All @@ -38,21 +42,24 @@ func TestPushTraceData(t *testing.T) {
}()

server := &http.Server{
ReadTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
Addr: fmt.Sprintf(":%d", port),
}

go func() {
http.HandleFunc("/api/otel/otel_traces/_stream_load", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"Status":"Success"}`))
})
server.Addr = ":18030"
_ = server.ListenAndServe()
err = server.ListenAndServe()
require.Equal(t, http.ErrServerClosed, err)
}()

time.Sleep(1 * time.Second)

err = exporter.pushTraceData(ctx, simpleTraces(10))
require.NoError(t, err)
err0 := fmt.Errorf("Not Started")
for err0 != nil { // until server started
err0 = exporter.pushTraceData(ctx, simpleTraces(10))
time.Sleep(100 * time.Millisecond)
}

_ = server.Shutdown(ctx)
}
Expand Down
13 changes: 9 additions & 4 deletions exporter/dorisexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/dorisexporter/internal/metadata"
Expand Down Expand Up @@ -51,13 +50,19 @@ func createDefaultConfig() component.Config {
}

func createLogsExporter(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Logs, error) {
c := cfg.(*Config)
exporter := newLogsExporter(set.Logger, c, set.TelemetrySettings)
return exporterhelper.NewLogsExporter(
ctx,
set,
cfg,
func(_ context.Context, _ plog.Logs) error {
return nil
},
exporter.pushLogData,
exporterhelper.WithStart(exporter.start),
exporterhelper.WithShutdown(exporter.shutdown),
// we config the timeout option in http client, so we don't need to set timeout here
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithQueue(c.QueueSettings),
exporterhelper.WithRetry(c.BackOffConfig),
)
}

Expand Down
Loading

0 comments on commit 1057f72

Please sign in to comment.