From a0574d7daa6d81215749196e348762da54f65dc0 Mon Sep 17 00:00:00 2001 From: Justin Lei Date: Fri, 26 Apr 2024 10:56:21 -0700 Subject: [PATCH] Scripts to find discrepancies in aggregated histograms --- cmd/foobar/main.go | 185 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) create mode 100644 cmd/foobar/main.go diff --git a/cmd/foobar/main.go b/cmd/foobar/main.go new file mode 100644 index 00000000000..0d1fef196f3 --- /dev/null +++ b/cmd/foobar/main.go @@ -0,0 +1,185 @@ +package main + +import ( + "bufio" + "context" + "fmt" + "net/url" + "os" + "time" + + commonconfig "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/tsdb/chunkenc" +) + +const ( + rawURL = "http://localhost:8080/prometheus/api/v1/read" + tenantID = "10428" + histogramBasesFilepath = "/Users/leizor/tmp/science/20240409/histogram_bases.out" +) + +func main() { + histogramBases, err := readHistogramBases() + if err != nil { + panic(err) + } + + client, err := initializeReadClient() + if err != nil { + panic(err) + } + + now := time.Now().Truncate(time.Hour) + + for _, hb := range histogramBases { + // Start at the 12-hour mark and go backwards from there to avoid querying against the ingesters. + for i := 12; i < 24; i++ { + hourOffset := time.Duration((i+2)*-1) * time.Hour + start := now.Add(hourOffset) + + ts, err := checkHistogram(client, hb, start) + if err != nil { + panic(err) + } + + if ts > 0 { + fmt.Printf("Found! %s at %s\n", hb, time.UnixMilli(ts).UTC().String()) + + fmt.Printf(`sum({__aggregation__="%s_bucket:sum:counter",le="+Inf"})`, hb) + fmt.Println() + fmt.Printf(`sum({__aggregation__="%s_count:sum:counter"})`, hb) + fmt.Println() + fmt.Printf("Start: %s, End: %s\n", start.UTC().String(), start.Add(time.Hour).UTC().String()) + fmt.Println() + } + } + } +} + +func readHistogramBases() ([]string, error) { + f, err := os.Open(histogramBasesFilepath) + if err != nil { + return nil, err + } + defer func() { _ = f.Close() }() + + var histogramBases []string + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + hb := scanner.Text() + histogramBases = append(histogramBases, hb) + } + if err = scanner.Err(); err != nil { + return nil, err + } + + return histogramBases, nil +} + +func initializeReadClient() (remote.ReadClient, error) { + u, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + + return remote.NewReadClient("foobar", &remote.ClientConfig{ + URL: &commonconfig.URL{URL: u}, + Headers: map[string]string{ + "x-scope-orgid": tenantID, + }, + Timeout: model.Duration(5 * time.Minute), + ChunkedReadLimit: config.DefaultChunkedReadLimit, + }) +} + +func checkHistogram(client remote.ReadClient, histogramBase string, hourStart time.Time) (int64, error) { + startTimestampMs := hourStart.UnixMilli() + endTimestampMs := hourStart.Add(time.Hour).UnixMilli() + + queries := []*prompb.Query{ + { + StartTimestampMs: startTimestampMs, + EndTimestampMs: endTimestampMs, + Matchers: []*prompb.LabelMatcher{ + { + Type: prompb.LabelMatcher_EQ, + Name: "__aggregation__", + Value: fmt.Sprintf("%s_count:sum:counter", histogramBase), + }, + }, + }, + { + StartTimestampMs: startTimestampMs, + EndTimestampMs: endTimestampMs, + Matchers: []*prompb.LabelMatcher{ + { + Type: prompb.LabelMatcher_EQ, + Name: "__aggregation__", + Value: fmt.Sprintf("%s_bucket:sum:counter", histogramBase), + }, + { + Type: prompb.LabelMatcher_EQ, + Name: "le", + Value: "+Inf", + }, + }, + }, + } + + res := []map[int64]float64{ + make(map[int64]float64), + make(map[int64]float64), + } + + for i, q := range queries { + ss, err := client.Read(context.Background(), q, false) + if err != nil { + return -1, err + } + + for ss.Next() { + if err = ss.Err(); err != nil { + return -1, err + } + + series := ss.At() + it := series.Iterator(nil) + + for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() { + if err = it.Err(); err != nil { + return -1, fmt.Errorf("error iterating through %v: %w", series.Labels(), err) + } + if vt != chunkenc.ValFloat { + return -1, fmt.Errorf("expected float result for query '%v', got %s", q, vt.String()) + } + + ts, v := it.At() + + if _, ok := res[i][ts]; !ok { + res[i][ts] = v + } else { + res[i][ts] += v + } + } + } + } + + if len(res[0]) != len(res[1]) { + return -1, fmt.Errorf("result sets different lengths, %v != %v", len(res[0]), len(res[1])) + } + + for ts, v0 := range res[0] { + v1 := res[1][ts] + if v0 != v1 { + return ts, nil + } + } + + return 0, nil +}