Merge pull request #892 from peternied/dlq
Dead Letter Queue for failed bulk requests
peternied committed Aug 14, 2024
2 parents 8b29700 + af7e1b1 commit 3deb5b1
Showing 11 changed files with 653 additions and 88 deletions.
58 changes: 34 additions & 24 deletions RFS/src/main/java/com/rfs/common/DocumentReindexer.java
@@ -1,12 +1,14 @@
package com.rfs.common;

import java.util.List;
import java.util.Collection;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.lucene.document.Document;

import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -27,44 +29,52 @@ public Mono<Void> reindex(
IDocumentMigrationContexts.IDocumentReindexContext context
) {

return documentStream.map(this::convertDocumentToBulkSection) // Convert each Document to part of a bulk
// operation
return documentStream.map(BulkDocSection::new)
.buffer(numDocsPerBulkRequest) // Collect until you hit the batch size
.doOnNext(bulk -> log.atInfo().log("{} documents in current bulk request", bulk.size()))
.map(this::convertToBulkRequestBody) // Assemble the bulk request body from the parts
.flatMap(
bulkJson -> client.sendBulkRequest(indexName, bulkJson, context.createBulkRequest()) // Send the request
bulkDocs -> client.sendBulkRequest(indexName, bulkDocs, context.createBulkRequest()) // Send the request
.doOnSuccess(unused -> log.atDebug().log("Batch succeeded"))
.doOnError(error -> log.atError().log("Batch failed", error))
// Prevent the error from stopping the entire stream; retries occur within sendBulkRequest
.onErrorResume(e -> Mono.empty()),
maxConcurrentRequests)
maxConcurrentRequests
)
.doOnComplete(() -> log.atDebug().log("All batches processed"))
.then();
}

@SneakyThrows
private String convertDocumentToBulkSection(Document document) {
String id = Uid.decodeId(document.getBinaryValue("_id").bytes);
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public static class BulkDocSection {
private Document doc;
@EqualsAndHashCode.Include
@Getter
private String docId;

String action = "{\"index\": {\"_id\": \"" + id + "\"}}";

// We must ensure the _source document is a "minified" JSON string, otherwise the bulk request will be corrupted.
// Specifically, we cannot have any leading or trailing whitespace, and the JSON must be on a single line.
String trimmedSource = document.getBinaryValue("_source").utf8ToString().trim();
Object jsonObject = objectMapper.readValue(trimmedSource, Object.class);
String minifiedSource = objectMapper.writeValueAsString(jsonObject);
public BulkDocSection(Document doc) {
this.doc = doc;
this.docId = Uid.decodeId(doc.getBinaryValue("_id").bytes);
}

return action + "\n" + minifiedSource;
}
@SneakyThrows
public String asBulkIndex() {
String action = "{\"index\": {\"_id\": \"" + getDocId() + "\"}}";
// We must ensure the _source document is a "minified" JSON string, otherwise the bulk request will be corrupted.
// Specifically, we cannot have any leading or trailing whitespace, and the JSON must be on a single line.
String trimmedSource = doc.getBinaryValue("_source").utf8ToString().trim();
Object jsonObject = objectMapper.readValue(trimmedSource, Object.class);
String minifiedSource = objectMapper.writeValueAsString(jsonObject);
return action + "\n" + minifiedSource;
}

private String convertToBulkRequestBody(List<String> bulkSections) {
StringBuilder builder = new StringBuilder();
for (String section : bulkSections) {
builder.append(section).append("\n");
public static String convertToBulkRequestBody(Collection<BulkDocSection> bulkSections) {
StringBuilder builder = new StringBuilder();
for (var section : bulkSections) {
var indexCommand = section.asBulkIndex();
builder.append(indexCommand).append("\n");
}
return builder.toString();
}
log.atDebug().log("Bulk request body: \n{}", builder.toString());

return builder.toString();
}
}
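The refactored DocumentReindexer wraps each Lucene document in a BulkDocSection, which pairs a bulk index action line with the minified _source, and convertToBulkRequestBody joins the sections into a newline-delimited request body. As an illustration (the document ids and fields below are made up, not taken from this commit), a two-document batch produces a body like:

{"index": {"_id": "doc-1"}}
{"field":"value for the first document"}
{"index": {"_id": "doc-2"}}
{"field":"value for the second document"}

Each section is appended with a trailing newline, so the body ends in a newline as the _bulk API expects.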
128 changes: 97 additions & 31 deletions RFS/src/main/java/com/rfs/common/OpenSearchClient.java
@@ -1,40 +1,67 @@
package com.rfs.common;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import org.opensearch.migrations.parsing.BulkResponseParser;
import org.opensearch.migrations.reindexer.FailedRequestsLogger;

import com.rfs.common.DocumentReindexer.BulkDocSection;
import com.rfs.common.http.ConnectionContext;
import com.rfs.common.http.HttpResponse;
import com.rfs.tracing.IRfsContexts;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

@Slf4j
public class OpenSearchClient {
private static final Logger logger = LogManager.getLogger(OpenSearchClient.class);

private static final ObjectMapper objectMapper = new ObjectMapper();

private static final int defaultMaxRetryAttempts = 3;
private static final Duration defaultBackoff = Duration.ofSeconds(1);
private static final Duration defaultMaxBackoff = Duration.ofSeconds(10);
private static final Retry snapshotRetryStrategy = Retry.backoff(defaultMaxRetryAttempts, defaultBackoff)
.maxBackoff(defaultMaxBackoff);
private static final Retry checkIfItemExistsRetryStrategy = Retry.backoff(defaultMaxRetryAttempts, defaultBackoff)
.maxBackoff(defaultMaxBackoff);
private static final Retry createItemExistsRetryStrategy = Retry.backoff(defaultMaxRetryAttempts, defaultBackoff)
.maxBackoff(defaultMaxBackoff)
.filter(throwable -> !(throwable instanceof InvalidResponse)); // Do not retry on this exception

private static final int bulkMaxRetryAttempts = 15;
private static final Duration bulkBackoff = Duration.ofSeconds(2);
private static final Duration bulkMaxBackoff = Duration.ofSeconds(60);
/** Retries for up to 10 minutes */
private static final Retry bulkRetryStrategy = Retry.backoff(bulkMaxRetryAttempts, bulkBackoff)
.maxBackoff(bulkMaxBackoff);

static {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

private final RestClient client;
private final FailedRequestsLogger failedRequestsLogger;

public OpenSearchClient(ConnectionContext connectionContext) {
this(new RestClient(connectionContext));
this(new RestClient(connectionContext), new FailedRequestsLogger());
}

OpenSearchClient(RestClient client) {
OpenSearchClient(RestClient client, FailedRequestsLogger failedRequestsLogger) {
this.client = client;
this.failedRequestsLogger = failedRequestsLogger;
}

/*
@@ -110,8 +137,8 @@ private Optional<ObjectNode> createObjectIdempotent(
return Mono.error(new OperationFailed(errorMessage, resp));
}
})
.doOnError(e -> logger.error(e.getMessage()))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10)))
.doOnError(e -> log.error(e.getMessage()))
.retryWhen(checkIfItemExistsRetryStrategy)
.block();

assert getResponse != null : ("getResponse should not be null; it should either be a valid response or an exception"
@@ -137,12 +164,8 @@ private Optional<ObjectNode> createObjectIdempotent(
return Mono.error(new OperationFailed(errorMessage, resp));
}
})
.doOnError(e -> logger.error(e.getMessage()))
.retryWhen(
Retry.backoff(3, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(10))
.filter(throwable -> !(throwable instanceof InvalidResponse)) // Do not retry on this exception
)
.doOnError(e -> log.error(e.getMessage()))
.retryWhen(createItemExistsRetryStrategy)
.block();

return Optional.of(settings);
@@ -175,8 +198,8 @@ public void registerSnapshotRepo(
return Mono.error(new OperationFailed(errorMessage, resp));
}
})
.doOnError(e -> logger.error(e.getMessage()))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10)))
.doOnError(e -> log.error(e.getMessage()))
.retryWhen(snapshotRetryStrategy)
.block();
}

@@ -205,8 +228,8 @@ public void createSnapshot(
return Mono.error(new OperationFailed(errorMessage, resp));
}
})
.doOnError(e -> logger.error(e.getMessage()))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10)))
.doOnError(e -> log.error(e.getMessage()))
.retryWhen(snapshotRetryStrategy)
.block();
}

@@ -233,8 +256,8 @@ public Optional<ObjectNode> getSnapshotStatus(
return Mono.error(new OperationFailed(errorMessage, resp));
}
})
.doOnError(e -> logger.error(e.getMessage()))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10)))
.doOnError(e -> log.error(e.getMessage()))
.retryWhen(snapshotRetryStrategy)
.block();

assert getResponse != null : ("getResponse should not be null; it should either be a valid response or an exception"
@@ -257,20 +280,54 @@ public Optional<ObjectNode> getSnapshotStatus(
}
}

public Mono<BulkResponse> sendBulkRequest(String indexName, String body, IRfsContexts.IRequestContext context) {
String targetPath = indexName + "/_bulk";
Retry getBulkRetryStrategy() {
return bulkRetryStrategy;
}

return client.postAsync(targetPath, body, context)
.map(response -> new BulkResponse(response.statusCode, response.statusText, response.headers, response.body))
.flatMap(resp -> {
if (resp.hasBadStatusCode() || resp.hasFailedOperations()) {
logger.error(resp.getFailureMessage());
public Mono<BulkResponse> sendBulkRequest(String indexName, List<BulkDocSection> docs, IRfsContexts.IRequestContext context) {
final var docsMap = docs.stream().collect(Collectors.toMap(d -> d.getDocId(), d -> d));
return Mono.defer(() -> {
final String targetPath = indexName + "/_bulk";
log.atTrace()
.setMessage("Creating bulk body with document ids {}")
.addArgument(() -> docsMap.keySet())
.log();
var body = BulkDocSection.convertToBulkRequestBody(docsMap.values());
return client.postAsync(targetPath, body, context)
.flatMap(response -> {
var resp = new BulkResponse(response.statusCode, response.statusText, response.headers, response.body);
if (!resp.hasBadStatusCode() && !resp.hasFailedOperations()) {
return Mono.just(resp);
}
// Remove all successful documents for the next bulk request attempt
var successfulDocs = resp.getSuccessfulDocs();
successfulDocs.forEach(docsMap::remove);
log.atWarn()
.setMessage("After bulk request on index '{}', {} more documents have succeed, {} remain")
.addArgument(indexName)
.addArgument(successfulDocs::size)
.addArgument(docsMap::size)
.log();
return Mono.error(new OperationFailed(resp.getFailureMessage(), resp));
}
return Mono.just(resp);
})
// In throttle cases, this will be low enough to get down to 1tps with 50 concurrency
.retryWhen(Retry.backoff(6, Duration.ofSeconds(2)).maxBackoff(Duration.ofSeconds(60)));
});
})
.retryWhen(getBulkRetryStrategy())
.doOnError(error -> {
if (!docsMap.isEmpty()) {
failedRequestsLogger.logBulkFailure(
indexName,
docsMap::size,
() -> BulkDocSection.convertToBulkRequestBody(docsMap.values()),
error
);
} else {
log.atWarn()
.setMessage("Unexpected empty document map for bulk request on index {}")
.addArgument(indexName)
.setCause(error)
.log();
}
});
}

public HttpResponse refresh(IRfsContexts.IRequestContext context) {
@@ -298,6 +355,15 @@ public boolean hasFailedOperations() {
return matcher.find();
}

public List<String> getSuccessfulDocs() {
try {
return BulkResponseParser.findSuccessDocs(body);
} catch (IOException ioe) {
log.warn("Unable to process bulk request for success", ioe);
return List.of();
}
}

public String getFailureMessage() {
String failureMessage;
if (hasBadStatusCode()) {
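The FailedRequestsLogger that backs the dead letter queue is imported from org.opensearch.migrations.reindexer but is not shown in the rendered portion of this diff. Based on its call site in sendBulkRequest, it takes the index name, a supplier of the remaining document count, a supplier of the bulk request body, and the error. A minimal sketch of what such a logger could look like follows; the logger name, message format, and use of a plain log4j2 logger are assumptions for illustration, not the commit's actual implementation:

package org.opensearch.migrations.reindexer;

import java.util.function.Supplier;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class FailedRequestsLogger {
    // Hypothetical logger name; in practice this would be routed to a dedicated
    // appender/file that acts as the dead letter queue for failed bulk requests.
    private static final Logger failedBulkLogger = LogManager.getLogger("FailedRequestsLogger");

    public void logBulkFailure(
        String indexName,
        Supplier<Integer> failedDocCount,
        Supplier<String> bulkRequestBody,
        Throwable error
    ) {
        // Suppliers keep the (potentially large) body lazy until a failure is actually logged.
        failedBulkLogger.atError()
            .withThrowable(error)
            .log(
                "Bulk request failed for index {} ({} documents); request body: {}",
                indexName,
                failedDocCount.get(),
                bulkRequestBody.get()
            );
    }
}

Because sendBulkRequest removes successful documents from docsMap between retry attempts, the body handed to the logger contains only the documents that never succeeded, which is what makes the log usable as a dead letter queue.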
