Skip to content

Commit

Permalink
Revert "Skip write to time_series_v4 table for the same fingerprint i…
Browse files Browse the repository at this point in the history
…n curren…" (#335)

This reverts commit f60d60d.
  • Loading branch information
srikanthccv committed Jun 14, 2024
1 parent f60d60d commit fd180d5
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 18 deletions.
57 changes: 42 additions & 15 deletions exporter/clickhousemetricsexporter/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
clickhouse "github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/google/uuid"
"github.com/jellydator/ttlcache/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -65,8 +64,6 @@ type clickHouse struct {
database string
maxTimeSeriesInQuery int

cache *ttlcache.Cache[uint64, bool]

timeSeriesRW sync.RWMutex
// Maintains the lookup map for fingerprints that are
// written to time series table. This map is used to eliminate the
Expand Down Expand Up @@ -116,19 +113,11 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
return nil, fmt.Errorf("could not connect to clickhouse: %s", err)
}

cache := ttlcache.New[uint64, bool](
ttlcache.WithTTL[uint64, bool](45*time.Minute),
ttlcache.WithDisableTouchOnHit[uint64, bool](),
)

go cache.Start()

ch := &clickHouse{
conn: conn,
l: l,
database: options.Auth.Database,
maxTimeSeriesInQuery: params.MaxTimeSeriesInQuery,
cache: cache,

timeSeries: make(map[uint64]struct{}, 8192),

Expand Down Expand Up @@ -306,6 +295,48 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
return err
}

// Write to distributed_time_series_v3 table
err = func() error {

statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, timestamp_ms, labels, description, unit, type, is_monotonic) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE_V3), driver.WithReleaseConnection())
if err != nil {
return err
}
timestamp := model.Now().Time().UnixMilli()
for fingerprint, labels := range newTimeSeries {
encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
meta := metricNameToMeta[fingerprintToName[fingerprint][nameLabel]]
err = statement.Append(
fingerprintToName[fingerprint][envLabel],
meta.Temporality.String(),
fingerprintToName[fingerprint][nameLabel],
fingerprint,
timestamp,
encodedLabels,
meta.Description,
meta.Unit,
meta.Typ.String(),
meta.IsMonotonic,
)
if err != nil {
return err
}
}

start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, string(component.DataTypeMetrics.String())),
tag.Upsert(tableKey, DISTRIBUTED_TIME_SERIES_TABLE_V3),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
}()

if err != nil {
return err
}

metrics := map[string]usage.Metric{}
err = func() error {
ctx := context.Background()
Expand Down Expand Up @@ -416,9 +447,6 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
unixMilli := model.Now().Time().UnixMilli() / 3600000 * 3600000

for fingerprint, labels := range timeSeries {
if ch.cache.Get(fingerprint) != nil && ch.cache.Get(fingerprint).Value() {
continue
}
encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
meta := metricNameToMeta[fingerprintToName[fingerprint][nameLabel]]
err = statement.Append(
Expand All @@ -436,7 +464,6 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
if err != nil {
return err
}
ch.cache.Set(fingerprint, true, ttlcache.DefaultTTL)
}

start := time.Now()
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v1.0.2
github.com/jellydator/ttlcache/v3 v3.2.0
github.com/knadh/koanf v1.5.0
github.com/lightstep/go-expohisto v1.0.0
github.com/oklog/ulid v1.3.1
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -753,8 +753,6 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6
github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE=
github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
Expand Down

0 comments on commit fd180d5

Please sign in to comment.