Skip to content

Commit

Permalink
frontend: do not use streaming to return the result since
Browse files Browse the repository at this point in the history
Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
  • Loading branch information
krajorama committed May 15, 2024
1 parent ec3c79f commit d688b78
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 106 deletions.
16 changes: 9 additions & 7 deletions integration/e2emimir/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,18 +628,20 @@ func (c *Client) ActiveNativeHistogramMetrics(selector string, options ...Active
_ = body.Close()
}(resp.Body)

var bodyReader io.Reader = resp.Body
if resp.Header.Get("Content-Encoding") == "x-snappy-framed" {
bodyReader = s2.NewReader(bodyReader)
}

body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(bodyReader)
return nil, fmt.Errorf("unexpected status code %d, body: %s", resp.StatusCode, body)
}

if resp.Header.Get("Content-Encoding") == "snappy" {
body, err = snappy.Decode(nil, body)
if err != nil {
return nil, fmt.Errorf("error decoding snappy response: %w", err)
}
}

res := &cardinality.ActiveNativeHistogramMetricsResponse{}
err = json.NewDecoder(bodyReader).Decode(res)
err = json.Unmarshal(body, res)
if err != nil {
return nil, fmt.Errorf("error decoding active native histograms response: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/cardinality/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,9 @@ func (m *ActiveMetricWithBucketCount) UpdateAverage() {
}

type ActiveNativeHistogramMetricsResponse struct {
Data []ActiveMetricWithBucketCount `json:"data"`
Data []ActiveMetricWithBucketCount `json:"data"`
Status string `json:"status,omitempty"`
Error string `json:"error,omitempty"`
}

// DecodeActiveSeriesRequest decodes the input http.Request into an ActiveSeriesRequest.
Expand Down
111 changes: 26 additions & 85 deletions pkg/frontend/querymiddleware/shard_active_native_histogram_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package querymiddleware

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -11,10 +12,8 @@ import (
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/snappy"
jsoniter "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -47,8 +46,6 @@ func (s *shardActiveNativeHistogramMetricsMiddleware) RoundTrip(r *http.Request)
}

func (s *shardActiveNativeHistogramMetricsMiddleware) mergeResponses(ctx context.Context, responses []*http.Response, encoding string) *http.Response {
reader, writer := io.Pipe()

mtx := sync.Mutex{}
metricIdx := make(map[string]int, 0)
metricBucketCount := make([]*cardinality.ActiveMetricWithBucketCount, 0)
Expand Down Expand Up @@ -137,95 +134,39 @@ func (s *shardActiveNativeHistogramMetricsMiddleware) mergeResponses(ctx context
})
}

// Cannot start streaming until we merged all results.
// Need to wait for all shards to be able to calculate the end result.
err := g.Wait()

// Sort the results by metric name, unless there was an error.
if err == nil {
merged := cardinality.ActiveNativeHistogramMetricsResponse{}
resp := &http.Response{StatusCode: http.StatusInternalServerError, Header: http.Header{}}
resp.Header.Set("Content-Type", "application/json")

if err != nil {
merged.Status = "error"
merged.Error = fmt.Sprintf("error merging partial responses: %s", err.Error())
} else {
resp.StatusCode = http.StatusOK
sort.Slice(metricBucketCount, func(i, j int) bool {
return metricBucketCount[i].Metric < metricBucketCount[j].Metric
})
} else {
metricBucketCount = nil
}

resp := &http.Response{Body: reader, StatusCode: http.StatusOK, Header: http.Header{}}
resp.Header.Set("Content-Type", "application/json")
if encoding == encodingTypeSnappyFramed {
resp.Header.Set("Content-Encoding", encodingTypeSnappyFramed)
for _, item := range metricBucketCount {
item.UpdateAverage()
merged.Data = append(merged.Data, *item)
}
}

go s.writeMergedResponse(ctx, err, writer, metricBucketCount, encoding)

return resp
}

func (s *shardActiveNativeHistogramMetricsMiddleware) writeMergedResponse(ctx context.Context, mergeError error, w io.WriteCloser, metricBucketCount []*cardinality.ActiveMetricWithBucketCount, encoding string) {
defer w.Close()

span, _ := opentracing.StartSpanFromContext(ctx, "shardActiveNativeHistogramMetrics.writeMergedResponse")
defer span.Finish()

var out io.Writer = w
if encoding == encodingTypeSnappyFramed {
span.LogFields(otlog.String("encoding", encodingTypeSnappyFramed))
enc := getSnappyWriter(w)
out = enc
defer func() {
enc.Close()
// Reset the encoder before putting it back to pool to avoid it to hold the writer.
enc.Reset(nil)
snappyWriterPool.Put(enc)
}()
} else {
span.LogFields(otlog.String("encoding", "none"))
body, err := jsoniter.Marshal(merged)
if err != nil {
resp.StatusCode = http.StatusInternalServerError
body = []byte(fmt.Sprintf(`{"status":"error","error":"%s"}`, err.Error()))
}

stream := jsoniter.ConfigFastest.BorrowStream(out)
defer func(stream *jsoniter.Stream) {
_ = stream.Flush()

if cap(stream.Buffer()) > jsoniterMaxBufferSize {
return
}
jsoniter.ConfigFastest.ReturnStream(stream)
}(stream)

stream.WriteObjectStart()
defer stream.WriteObjectEnd()

stream.WriteObjectField("data")
stream.WriteArrayStart()
firstItem := true
for idx := range metricBucketCount {
if ctx.Err() != nil {
mergeError = ctx.Err()
break
}
if firstItem {
firstItem = false
} else {
stream.WriteMore()
}
// Update the average before sending
metricBucketCount[idx].UpdateAverage()
stream.WriteVal(metricBucketCount[idx])

// Flush the stream buffer if it's getting too large.
if stream.Buffered() > jsoniterMaxBufferSize {
_ = stream.Flush()
}
}
stream.WriteArrayEnd()

if mergeError != nil {
level.Error(s.logger).Log("msg", "error merging partial responses", "err", mergeError.Error())
span.LogFields(otlog.Error(mergeError))
stream.WriteMore()
stream.WriteObjectField("status")
stream.WriteString("error")
stream.WriteMore()
stream.WriteObjectField("error")
stream.WriteString(fmt.Sprintf("error merging partial responses: %s", mergeError.Error()))
if encoding == "snappy" {
resp.Header.Set("Content-Encoding", encoding)
body = snappy.Encode(nil, body)
}

resp.Body = io.NopCloser(bytes.NewReader(body))
return resp
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"testing"

"github.com/go-kit/log"
"github.com/golang/snappy"
"github.com/grafana/dskit/user"
"github.com/klauspost/compress/s2"
"github.com/pkg/errors"
Expand Down Expand Up @@ -102,26 +103,26 @@ func Test_shardActiveNativeHistogramMetricsMiddleware_RoundTrip(t *testing.T) {
},
"upstream response: invalid type for data field": {
request: validReq,
responseStatus: http.StatusOK,
responseStatus: http.StatusInternalServerError,
responseBody: `{"data": "unexpected"}`,

// We don't expect an error here because it only occurs later as the response is
// being streamed.
checkResponseErr: func(t *testing.T, err error) (continueTest bool) {
return assert.NoError(t, err)
assert.Error(t, err)
assert.Contains(t, err.Error(), "received unexpected response from upstream")
return false
},
expectedShardCount: tenantShardCount,
expect: resultActiveNativeHistogramMetrics{Status: "error", Error: "expected data field to contain an array"},
},
"upstream response: no data field": {
request: validReq,
responseStatus: http.StatusOK,
responseStatus: http.StatusInternalServerError,
responseBody: `{unexpected: "response"}`,

// We don't expect an error here because it only occurs later as the response is
// being streamed.
checkResponseErr: func(t *testing.T, err error) (continueTest bool) {
return assert.NoError(t, err)
assert.Error(t, err)
assert.Contains(t, err.Error(), "received unexpected response from upstream")
return false
},
expectedShardCount: tenantShardCount,
expect: resultActiveNativeHistogramMetrics{Status: "error", Error: "expected data field at top level"},
Expand Down Expand Up @@ -387,7 +388,7 @@ func Test_shardActiveNativeHistogramMetricsMiddleware_RoundTrip(t *testing.T) {
"honours Accept-Encoding header": {
request: func() *http.Request {
r := validReq()
r.Header.Add("Accept-Encoding", encodingTypeSnappyFramed)
r.Header.Add("Accept-Encoding", "snappy")
r.Header.Add(totalShardsControlHeader, "2")
return r
},
Expand Down Expand Up @@ -448,7 +449,7 @@ func Test_shardActiveNativeHistogramMetricsMiddleware_RoundTrip(t *testing.T) {
},
},
expectedShardCount: 2,
expectContentEncoding: encodingTypeSnappyFramed,
expectContentEncoding: "snappy",
},
"builds correct request shards for GET requests": {
request: func() *http.Request {
Expand Down Expand Up @@ -579,12 +580,14 @@ func Test_shardActiveNativeHistogramMetricsMiddleware_RoundTrip(t *testing.T) {

var br io.Reader = resp.Body
assert.Equal(t, tt.expectContentEncoding, resp.Header.Get("Content-Encoding"))
if resp.Header.Get("Content-Encoding") == encodingTypeSnappyFramed {
br = s2.NewReader(br)
}
body, err := io.ReadAll(br)
assert.NoError(t, err)

if resp.Header.Get("Content-Encoding") == "snappy" {
body, err = snappy.Decode(nil, body)
assert.NoError(t, err)
}

var res resultActiveNativeHistogramMetrics
err = json.Unmarshal(body, &res)
require.NoError(t, err)
Expand Down

0 comments on commit d688b78

Please sign in to comment.