diff --git a/exporter/clickhousemetricsexporter/clickhouse.go b/exporter/clickhousemetricsexporter/clickhouse.go index e0a42f34..dfcbe889 100644 --- a/exporter/clickhousemetricsexporter/clickhouse.go +++ b/exporter/clickhousemetricsexporter/clickhouse.go @@ -28,7 +28,6 @@ import ( clickhouse "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/google/uuid" - "github.com/jellydator/ttlcache/v3" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/sirupsen/logrus" @@ -65,8 +64,6 @@ type clickHouse struct { database string maxTimeSeriesInQuery int - cache *ttlcache.Cache[uint64, bool] - timeSeriesRW sync.RWMutex // Maintains the lookup map for fingerprints that are // written to time series table. This map is used to eliminate the @@ -116,19 +113,11 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) { return nil, fmt.Errorf("could not connect to clickhouse: %s", err) } - cache := ttlcache.New[uint64, bool]( - ttlcache.WithTTL[uint64, bool](45*time.Minute), - ttlcache.WithDisableTouchOnHit[uint64, bool](), - ) - - go cache.Start() - ch := &clickHouse{ conn: conn, l: l, database: options.Auth.Database, maxTimeSeriesInQuery: params.MaxTimeSeriesInQuery, - cache: cache, timeSeries: make(map[uint64]struct{}, 8192), @@ -306,6 +295,48 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr return err } + // Write to distributed_time_series_v3 table + err = func() error { + + statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, timestamp_ms, labels, description, unit, type, is_monotonic) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE_V3), driver.WithReleaseConnection()) + if err != nil { + return err + } + timestamp := model.Now().Time().UnixMilli() + for fingerprint, labels := range newTimeSeries { + encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128))) + meta := metricNameToMeta[fingerprintToName[fingerprint][nameLabel]] + err = statement.Append( + fingerprintToName[fingerprint][envLabel], + meta.Temporality.String(), + fingerprintToName[fingerprint][nameLabel], + fingerprint, + timestamp, + encodedLabels, + meta.Description, + meta.Unit, + meta.Typ.String(), + meta.IsMonotonic, + ) + if err != nil { + return err + } + } + + start := time.Now() + err = statement.Send() + ctx, _ = tag.New(ctx, + tag.Upsert(exporterKey, string(component.DataTypeMetrics.String())), + tag.Upsert(tableKey, DISTRIBUTED_TIME_SERIES_TABLE_V3), + ) + stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds()))) + return err + }() + + if err != nil { + return err + } + metrics := map[string]usage.Metric{} err = func() error { ctx := context.Background() @@ -416,9 +447,6 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr unixMilli := model.Now().Time().UnixMilli() / 3600000 * 3600000 for fingerprint, labels := range timeSeries { - if ch.cache.Get(fingerprint) != nil && ch.cache.Get(fingerprint).Value() { - continue - } encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128))) meta := metricNameToMeta[fingerprintToName[fingerprint][nameLabel]] err = statement.Append( @@ -436,7 +464,6 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr if err != nil { return err } - ch.cache.Set(fingerprint, true, ttlcache.DefaultTTL) } start := time.Now() diff --git a/go.mod b/go.mod index f1484357..d07e970b 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,6 @@ require ( github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.0 github.com/hashicorp/golang-lru v1.0.2 - github.com/jellydator/ttlcache/v3 v3.2.0 github.com/knadh/koanf v1.5.0 github.com/lightstep/go-expohisto v1.0.0 github.com/oklog/ulid v1.3.1 diff --git a/go.sum b/go.sum index bca96a6b..d1077693 100644 --- a/go.sum +++ b/go.sum @@ -753,8 +753,6 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= -github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE= -github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=