Receive: Don't rebatch already replicated requests #5818

Status: Closed · 3 commits
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -18,6 +18,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Changed

- [#5818] Receive: Don't re-batch requests that are already replicated; this improves performance by removing an unnecessary operation. *breaking :warning: If you have replication enabled, ensure all your receive nodes are running at least `v0.28.0` before updating to this version.*

## [v0.29.0](https://github.com/thanos-io/thanos/tree/release-0.29) - Release in progress

### Fixed
14 changes: 12 additions & 2 deletions pkg/receive/handler.go
@@ -552,14 +552,24 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
return errors.New("hashring is not ready")
}

wreqs := make(map[endpointReplica]*prompb.WriteRequest)

// If the request was already replicated, we know it's intended for the
// current endpoint, so we can go to the local write directly (taken care
// of in fanoutForward).
if r.replicated {
wreqs[endpointReplica{endpoint: h.options.Endpoint, replica: r}] = wreq
h.mtx.RUnlock()
return h.fanoutForward(ctx, tenant, wreqs, 1)
}

// Batch all of the time series in the write request
// into several smaller write requests that are
// grouped by target endpoint. This ensures that
// for any incoming write request to a node,
// at most one outgoing write request will be made
// to every other node in the hashring, rather than
// one request per time series.
wreqs := make(map[endpointReplica]*prompb.WriteRequest)
for i := range wreq.Timeseries {
endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.n)
if err != nil {
@@ -573,8 +583,8 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
wr := wreqs[key]
wr.Timeseries = append(wr.Timeseries, wreq.Timeseries[i])
}
h.mtx.RUnlock()

h.mtx.RUnlock()
return h.fanoutForward(ctx, tenant, wreqs, len(wreqs))
}

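For readers skimming the hunk above, here is a self-contained, simplified sketch of the control flow this PR introduces. The types and the `pickEndpoint` callback are stand-ins invented for illustration; the real handler works with `prompb.WriteRequest`, `endpointReplica`, and the hashring's `GetN`.

```go
package main

import "fmt"

// Simplified stand-ins for the real types: the actual handler works with
// prompb.WriteRequest, endpointReplica, and the hashring's GetN lookup.
type timeSeries struct{ labels string }

type writeRequest struct{ Timeseries []timeSeries }

type endpointReplica struct {
	endpoint string
	replica  int
}

// batchForForward mirrors the control flow added in this PR: an already
// replicated request is addressed to the local endpoint as a single batch,
// while a fresh request is still split into at most one batch per target
// endpoint in the hashring.
func batchForForward(localEndpoint string, replicated bool, replica int,
	req writeRequest, pickEndpoint func(timeSeries, int) string) map[endpointReplica]writeRequest {

	wreqs := make(map[endpointReplica]writeRequest)

	if replicated {
		// Fast path: the sender already replicated this request, so it is
		// intended for this node only; no per-series hashring lookups needed.
		wreqs[endpointReplica{endpoint: localEndpoint, replica: replica}] = req
		return wreqs
	}

	// Slow path: group series by target endpoint, as before this change.
	for _, ts := range req.Timeseries {
		key := endpointReplica{endpoint: pickEndpoint(ts, replica), replica: replica}
		wr := wreqs[key]
		wr.Timeseries = append(wr.Timeseries, ts)
		wreqs[key] = wr
	}
	return wreqs
}

func main() {
	req := writeRequest{Timeseries: []timeSeries{{labels: "a"}, {labels: "b"}}}

	// Already replicated: one batch, addressed to the local node.
	fmt.Println(len(batchForForward("node-1", true, 1, req, nil)))

	// Not yet replicated: batched per endpoint chosen by the hashring stand-in.
	pick := func(ts timeSeries, _ int) string {
		if ts.labels == "a" {
			return "node-2"
		}
		return "node-3"
	}
	fmt.Println(len(batchForForward("node-1", false, 0, req, pick)))
}
```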
34 changes: 34 additions & 0 deletions pkg/receive/handler_test.go
@@ -1425,6 +1425,40 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
}
})

handler.options.DefaultTenantID = fmt.Sprintf("%v-ok-w-replicated", tcase.name)
handler.options.ReplicaHeader = "test-header"
handler.writer.multiTSDB = &tsOverrideTenantStorage{TenantStorage: m, interval: 1}

// It takes time to create a new tenant, wait for it.
{
app, err := m.TenantAppendable(handler.options.DefaultTenantID)
testutil.Ok(b, err)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

testutil.Ok(b, runutil.Retry(1*time.Second, ctx.Done(), func() error {
_, err = app.Appender(ctx)
return err
}))
}

b.Run("OK-with-replicated", func(b testutil.TB) {
n := b.N()
headers := make(http.Header)
headers.Set(handler.options.ReplicaHeader, "1")
b.ResetTimer()
for i := 0; i < n; i++ {
r := httptest.NewRecorder()
handler.receiveHTTP(r, &http.Request{
ContentLength: int64(len(tcase.writeRequest)),
Body: io.NopCloser(bytes.NewReader(tcase.writeRequest)),
Header: headers,
})
testutil.Equals(b, http.StatusOK, r.Code, "got non 200 error: %v", r.Body.String())
}
})

handler.options.DefaultTenantID = fmt.Sprintf("%v-conflicting", tcase.name)
handler.writer.multiTSDB = &tsOverrideTenantStorage{TenantStorage: m, interval: -1} // Timestamp can't go down, which will cause conflict error.

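For completeness, a hedged example of how a request marked as already replicated could be sent to a receiver over HTTP, mirroring what the new benchmark does with `handler.options.ReplicaHeader`. The header name `THANOS-REPLICA`, the port, and the path are the commonly used defaults, but treat them as assumptions here since all of them are configurable and none of them are part of this diff.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// A real request body would be a snappy-compressed prompb.WriteRequest;
	// the empty body here only illustrates how the replica header is set.
	body := bytes.NewReader(nil)

	// URL and header name are assumptions for illustration: both the remote
	// write port/path and the replica header are configurable on the receiver.
	req, err := http.NewRequest(http.MethodPost, "http://localhost:19291/api/v1/receive", body)
	if err != nil {
		panic(err)
	}
	// Setting the replica header tells the receiver this request was already
	// replicated, so it takes the new fast path instead of re-batching.
	req.Header.Set("THANOS-REPLICA", "1")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```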