[RFS] Refactored DocumentReindexer, HTTP client usage, added an OpenSearchClient (#623)

* Cleaned up DocumentReindexer error handling and logging

Signed-off-by: Chris Helma <chelma+github@amazon.com>

* Made reactor-netty the sole HTTP client provider

Signed-off-by: Chris Helma <chelma+github@amazon.com>

* Created an OpenSearchClient layer above the RestClient

Signed-off-by: Chris Helma <chelma+github@amazon.com>

* Updates per PR comments

Signed-off-by: Chris Helma <chelma+github@amazon.com>

---------

Signed-off-by: Chris Helma <chelma+github@amazon.com>
chelma committed May 2, 2024
1 parent afa6599 commit 4bfdb0f
Showing 10 changed files with 248 additions and 274 deletions.
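
At a high level, the refactor moves callers from passing raw ConnectionDetails into the snapshot/metadata/index creators to handing them an OpenSearchClient (a new layer over the existing RestClient), and DocumentReindexer.reindex now returns a Mono<Void> instead of subscribing internally. As a quick orientation before the per-file diffs, a minimal sketch of the resulting call shape; the migrateShard wrapper and its argument list are illustrative, not part of the commit, but the class and method names match the diffs below:

```java
import java.nio.file.Path;

import org.apache.lucene.document.Document;
import reactor.core.publisher.Flux;

import com.rfs.common.ConnectionDetails;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.LuceneDocumentsReader;
import com.rfs.common.OpenSearchClient;

public class ReindexCallShapeSketch {
    // Illustrative wiring only; mirrors the flow ReindexFromSnapshot uses after this commit.
    public static void migrateShard(ConnectionDetails target, Path luceneDir,
                                    String indexName, int shardId) throws Exception {
        // Cluster operations now go through the OpenSearchClient layer
        OpenSearchClient targetClient = new OpenSearchClient(target);

        // Read the shard's documents as a reactive stream
        Flux<Document> documents = new LuceneDocumentsReader().readDocuments(luceneDir, indexName, shardId);

        // reindex() now returns Mono<Void>; the caller decides how to wait
        DocumentReindexer.reindex(indexName, documents, target).block();

        targetClient.refresh(); // make the new documents searchable
    }
}
```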
24 changes: 16 additions & 8 deletions RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
@@ -182,9 +182,10 @@ public static void main(String[] args) throws InterruptedException {
             // ==========================================================================================================
             logger.info("==================================================================");
             logger.info("Attempting to create the snapshot...");
+            OpenSearchClient sourceClient = new OpenSearchClient(sourceConnection);
             SnapshotCreator snapshotCreator = repo instanceof S3Repo
-                ? new S3SnapshotCreator(snapshotName, sourceConnection, s3RepoUri, s3Region)
-                : new FileSystemSnapshotCreator(snapshotName, sourceConnection, snapshotLocalRepoDirPath.toString());
+                ? new S3SnapshotCreator(snapshotName, sourceClient, s3RepoUri, s3Region)
+                : new FileSystemSnapshotCreator(snapshotName, sourceClient, snapshotLocalRepoDirPath.toString());
             snapshotCreator.registerRepo();
             snapshotCreator.createSnapshot();
             while (!snapshotCreator.isSnapshotFinished()) {
@@ -266,14 +267,15 @@ public static void main(String[] args) throws InterruptedException {
             logger.info("==================================================================");
             logger.info("Attempting to recreate the Global Metadata...");
 
+            OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
             if (sourceVersion == ClusterVersion.ES_6_8) {
                 ObjectNode root = globalMetadata.toObjectNode();
                 ObjectNode transformedRoot = transformer.transformGlobalMetadata(root);
-                GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetConnection, Collections.emptyList(), templateWhitelist);
+                GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetClient, Collections.emptyList(), templateWhitelist);
             } else if (sourceVersion == ClusterVersion.ES_7_10) {
                 ObjectNode root = globalMetadata.toObjectNode();
                 ObjectNode transformedRoot = transformer.transformGlobalMetadata(root);
-                GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetConnection, componentTemplateWhitelist, templateWhitelist);
+                GlobalMetadataCreator_OS_2_11.create(transformedRoot, targetClient, componentTemplateWhitelist, templateWhitelist);
             }
         }
 
@@ -301,14 +303,15 @@ public static void main(String[] args) throws InterruptedException {
             // ==========================================================================================================
             logger.info("==================================================================");
             logger.info("Attempting to recreate the indices...");
+            OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
             for (IndexMetadata.Data indexMetadata : indexMetadatas) {
                 String reindexName = indexMetadata.getName() + indexSuffix;
                 logger.info("Recreating index " + indexMetadata.getName() + " as " + reindexName + " on target...");
 
                 ObjectNode root = indexMetadata.toObjectNode();
                 ObjectNode transformedRoot = transformer.transformIndexMetadata(root);
                 IndexMetadataData_OS_2_11 indexMetadataOS211 = new IndexMetadataData_OS_2_11(transformedRoot, indexMetadata.getId(), reindexName);
-                IndexCreator_OS_2_11.create(reindexName, indexMetadataOS211, targetConnection);
+                IndexCreator_OS_2_11.create(reindexName, indexMetadataOS211, targetClient);
             }
         }
 
@@ -358,12 +361,17 @@ public static void main(String[] args) throws InterruptedException {
 
                 Flux<Document> documents = new LuceneDocumentsReader().readDocuments(luceneDirPath, indexMetadata.getName(), shardId);
                 String targetIndex = indexMetadata.getName() + indexSuffix;
-                DocumentReindexer.reindex(targetIndex, documents, targetConnection);
 
-                logger.info("Shard reindexing completed");
+                final int finalShardId = shardId; // Define in local context for the lambda
+                DocumentReindexer.reindex(targetIndex, documents, targetConnection)
+                    .doOnError(error -> logger.error("Error during reindexing: " + error))
+                    .doOnSuccess(done -> logger.info("Reindexing completed for index " + targetIndex + ", shard " + finalShardId))
+                    // Wait for the shard reindexing to complete before proceeding; fine in this demo script, but
+                    // shouldn't be done quite this way in the real RFS Worker.
+                    .block();
             }
         }
-        logger.info("Refreshing newly added documents");
+        logger.info("Refreshing target cluster to reflect newly added documents");
         DocumentReindexer.refreshAllDocuments(targetConnection);
         logger.info("Refresh complete");
     }
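
The comment in the hunk above flags the per-shard block() as demo-only. For contrast, a sketch of one non-blocking way a worker could sequence shards, composing the Mono<Void> values instead of blocking inside the loop; readShardDocuments is a hypothetical helper standing in for the per-shard LuceneDocumentsReader setup, and none of this is part of the commit:

```java
import java.util.List;

import org.apache.lucene.document.Document;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import com.rfs.common.ConnectionDetails;
import com.rfs.common.DocumentReindexer;

public class ShardSequencingSketch {
    // Hypothetical helper; a real worker would build this from LuceneDocumentsReader
    static Flux<Document> readShardDocuments(int shardId) {
        return Flux.empty();
    }

    // Chain the per-shard Monos; nothing runs until the caller subscribes (or blocks once)
    public static Mono<Void> reindexAllShards(String targetIndex, List<Integer> shardIds,
                                              ConnectionDetails targetConnection) {
        return Flux.fromIterable(shardIds)
            .concatMap(shardId -> {
                try {
                    return DocumentReindexer.reindex(targetIndex, readShardDocuments(shardId), targetConnection);
                } catch (Exception e) {
                    return Mono.error(e); // reindex declares "throws Exception"
                }
            })
            .then();
    }
}
```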
93 changes: 13 additions & 80 deletions RFS/src/main/java/com/rfs/common/DocumentReindexer.java
@@ -1,47 +1,36 @@
 package com.rfs.common;
 
-import java.time.Duration;
-import java.util.Base64;
 import java.util.List;
 
-import io.netty.buffer.Unpooled;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.document.Document;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.netty.http.client.HttpClient;
-import reactor.util.retry.Retry;
 
 
 public class DocumentReindexer {
     private static final Logger logger = LogManager.getLogger(DocumentReindexer.class);
     private static final int MAX_BATCH_SIZE = 1000; // Arbitrarily chosen
 
-    public static void reindex(String indexName, Flux<Document> documentStream, ConnectionDetails targetConnection) throws Exception {
-        String targetUrl = "/" + indexName + "/_bulk";
-        HttpClient client = HttpClient.create()
-            .host(targetConnection.hostName)
-            .port(targetConnection.port)
-            .headers(h -> {
-                h.set("Content-Type", "application/json");
-                if (targetConnection.authType == ConnectionDetails.AuthType.BASIC) {
-                    String credentials = targetConnection.username + ":" + targetConnection.password;
-                    String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes());
-                    h.set("Authorization", "Basic " + encodedCredentials);
-                }
-            });
+    public static Mono<Void> reindex(String indexName, Flux<Document> documentStream, ConnectionDetails targetConnection) throws Exception {
+        OpenSearchClient client = new OpenSearchClient(targetConnection);
 
-        documentStream
+        return documentStream
             .map(DocumentReindexer::convertDocumentToBulkSection) // Convert each Document to part of a bulk operation
             .buffer(MAX_BATCH_SIZE) // Collect until you hit the batch size
+            .doOnNext(bulk -> logger.info(bulk.size() + " documents in current bulk request"))
             .map(DocumentReindexer::convertToBulkRequestBody) // Assemble the bulk request body from the parts
-            .flatMap(bulkJson -> sendBulkRequest(client, targetUrl, bulkJson)) // Send the request
-            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)))
-            .subscribe(
-                response -> logger.info("Batch uploaded successfully"),
-                error -> logger.error("Failed to upload batch", error)
-            );
+            .flatMap(bulkJson -> client.sendBulkRequest(indexName, bulkJson) // Send the request
+                .doOnSuccess(unused -> logger.debug("Batch succeeded"))
+                .doOnError(error -> logger.error("Batch failed", error))
+                .onErrorResume(e -> Mono.empty()) // Prevent the error from stopping the entire stream
+            )
+            .doOnComplete(() -> logger.debug("All batches processed"))
+            .then();
    }
 
    private static String convertDocumentToBulkSection(Document document) {
@@ -53,72 +42,16 @@ private static String convertDocumentToBulkSection(Document document) {
     }
 
     private static String convertToBulkRequestBody(List<String> bulkSections) {
-        logger.info(bulkSections.size() + " documents in current bulk request");
         StringBuilder builder = new StringBuilder();
         for (String section : bulkSections) {
             builder.append(section).append("\n");
         }
         return builder.toString();
     }
 
-    private static Mono<Void> sendBulkRequest(HttpClient client, String url, String bulkJson) {
-        return client.post()
-            .uri(url)
-            .send(Flux.just(Unpooled.wrappedBuffer(bulkJson.getBytes())))
-            .responseSingle((res, content) ->
-                content.asString() // Convert the response content to a string
-                    .map(body -> new BulkResponseDetails(res.status().code(), body)) // Map both status code and body into a response details object
-            )
-            .flatMap(responseDetails -> {
-                // Something bad happened with our request, log it
-                if (responseDetails.hasBadStatusCode()) {
-                    logger.error(responseDetails.getFailureMessage());
-                }
-                // Some of the bulk operations failed, log it
-                else if (responseDetails.hasFailedOperations()) {
-                    logger.error(responseDetails.getFailureMessage());
-                }
-                return Mono.just(responseDetails);
-            })
-            .doOnError(err -> {
-                // We weren't even able to complete the request, log it
-                logger.error("Bulk request failed", err);
-            })
-            .then();
-    }
-
     public static void refreshAllDocuments(ConnectionDetails targetConnection) throws Exception {
         // Send the request
-        RestClient client = new RestClient(targetConnection);
-        client.get("_refresh", false);
+        OpenSearchClient client = new OpenSearchClient(targetConnection);
+        client.refresh();
     }
-
-    static class BulkResponseDetails {
-        public final int statusCode;
-        public final String body;
-
-        BulkResponseDetails(int statusCode, String body) {
-            this.statusCode = statusCode;
-            this.body = body;
-        }
-
-        public boolean hasBadStatusCode() {
-            return !(statusCode == 200 || statusCode == 201);
-        }
-
-        public boolean hasFailedOperations() {
-            return body.contains("\"errors\":true");
-        }
-
-        public String getFailureMessage() {
-            String failureMessage;
-            if (hasBadStatusCode()) {
-                failureMessage = "Bulk request failed. Status code: " + statusCode + ", Response body: " + body;
-            } else {
-                failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body;
-            }
-
-            return failureMessage;
-        }
-    }
 }
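
For reference, the body handed to sendBulkRequest is NDJSON in the shape the _bulk API expects, and convertToBulkRequestBody simply joins the per-document sections with trailing newlines. A standalone illustration with made-up documents, assuming each section already pairs an action line with a source line (the section builder's body is collapsed in this view):

```java
import java.util.List;

public class BulkBodySketch {
    public static void main(String[] args) {
        // Each section is an action line plus a source line (hypothetical IDs and sources)
        List<String> bulkSections = List.of(
            "{\"index\":{\"_id\":\"doc-1\"}}\n{\"title\":\"first\"}",
            "{\"index\":{\"_id\":\"doc-2\"}}\n{\"title\":\"second\"}"
        );

        // Same assembly as convertToBulkRequestBody: join with newlines, trailing newline included
        StringBuilder builder = new StringBuilder();
        for (String section : bulkSections) {
            builder.append(section).append("\n");
        }
        System.out.print(builder);
    }
}
```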
RFS/src/main/java/com/rfs/common/FileSystemSnapshotCreator.java
@@ -9,14 +9,14 @@ public class FileSystemSnapshotCreator extends SnapshotCreator {
     private static final Logger logger = LogManager.getLogger(FileSystemSnapshotCreator.class);
     private static final ObjectMapper mapper = new ObjectMapper();
 
-    private final ConnectionDetails connectionDetails;
+    private final OpenSearchClient client;
     private final String snapshotName;
     private final String snapshotRepoDirectoryPath;
 
-    public FileSystemSnapshotCreator(String snapshotName, ConnectionDetails connectionDetails, String snapshotRepoDirectoryPath) {
-        super(snapshotName, connectionDetails);
+    public FileSystemSnapshotCreator(String snapshotName, OpenSearchClient client, String snapshotRepoDirectoryPath) {
+        super(snapshotName, client);
         this.snapshotName = snapshotName;
-        this.connectionDetails = connectionDetails;
+        this.client = client;
         this.snapshotRepoDirectoryPath = snapshotRepoDirectoryPath;
     }
 
6 changes: 5 additions & 1 deletion RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java
@@ -10,8 +10,11 @@
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.util.BytesRef;
+
+import lombok.Lombok;
 import reactor.core.publisher.Flux;
+
 
 public class LuceneDocumentsReader {
     private static final Logger logger = LogManager.getLogger(LuceneDocumentsReader.class);
 
@@ -36,6 +39,7 @@ public Flux<Document> readDocuments(Path luceneFilesBasePath, String indexName,
                     reader.close();
                 } catch (IOException e) {
                     logger.error("Failed to close IndexReader", e);
+                    Lombok.sneakyThrow(e);
                 }
             }
         );
@@ -61,7 +65,7 @@ protected Document getDocument(IndexReader reader, int docId) {
         }
         if (source_bytes == null || source_bytes.bytes.length == 0) {
             logger.warn("Document " + id + " is deleted or doesn't have the _source field enabled");
-            return null; // Skip deleted documents or those without the _source field
+            return null; // Skip these too
         }
 
         logger.debug("Document " + id + " read successfully");
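
A note on the Lombok.sneakyThrow(e) call added above: it rethrows the checked IOException without the enclosing code having to declare it. sneakyThrow never actually returns; its RuntimeException return type exists so callers can write `throw Lombok.sneakyThrow(e);` and satisfy the compiler's flow analysis. A small standalone illustration, not from the commit:

```java
import java.io.IOException;

import lombok.Lombok;

public class SneakyThrowSketch {
    // Rethrows a checked IOException without declaring "throws IOException"
    static void failToClose() {
        try {
            throw new IOException("simulated close failure");
        } catch (IOException e) {
            throw Lombok.sneakyThrow(e); // throws e as-is; never returns
        }
    }

    public static void main(String[] args) {
        try {
            failToClose();
        } catch (Exception e) {
            System.out.println("caught: " + e); // the original, undeclared IOException
        }
    }
}
```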
130 changes: 130 additions & 0 deletions RFS/src/main/java/com/rfs/common/OpenSearchClient.java
@@ -0,0 +1,130 @@
+package com.rfs.common;
+
+import java.net.HttpURLConnection;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import reactor.core.publisher.Mono;
+
+public class OpenSearchClient {
+    private static final Logger logger = LogManager.getLogger(OpenSearchClient.class);
+
+    public static class BulkResponse extends RestClient.Response {
+        public BulkResponse(int responseCode, String responseBody, String responseMessage) {
+            super(responseCode, responseBody, responseMessage);
+        }
+
+        public boolean hasBadStatusCode() {
+            return !(code == HttpURLConnection.HTTP_OK || code == HttpURLConnection.HTTP_CREATED);
+        }
+
+        public boolean hasFailedOperations() {
+            // The OpenSearch Bulk API response body is JSON and contains a top-level "errors" field that indicates
+            // whether any of the individual operations in the bulk request failed. Rather than marshalling the entire
+            // response as JSON, just check for the string value.
+            return body.contains("\"errors\":true");
+        }
+
+        public String getFailureMessage() {
+            String failureMessage;
+            if (hasBadStatusCode()) {
+                failureMessage = "Bulk request failed. Status code: " + code + ", Response body: " + body;
+            } else {
+                failureMessage = "Bulk request succeeded, but some operations failed. Response body: " + body;
+            }
+
+            return failureMessage;
+        }
+    }
+
+    public final ConnectionDetails connectionDetails;
+    private final RestClient client;
+
+    public OpenSearchClient(ConnectionDetails connectionDetails) {
+        this.connectionDetails = connectionDetails;
+        this.client = new RestClient(connectionDetails);
+    }
+
+    /*
+     * Create a legacy template if it does not already exist; return true if created, false otherwise.
+     */
+    public boolean createLegacyTemplateIdempotent(String templateName, ObjectNode settings){
+        String targetPath = "_template/" + templateName;
+        return createObjectIdempotent(targetPath, settings);
+    }
+
+    /*
+     * Create a component template if it does not already exist; return true if created, false otherwise.
+     */
+    public boolean createComponentTemplateIdempotent(String templateName, ObjectNode settings){
+        String targetPath = "_component_template/" + templateName;
+        return createObjectIdempotent(targetPath, settings);
+    }
+
+    /*
+     * Create an index template if it does not already exist; return true if created, false otherwise.
+     */
+    public boolean createIndexTemplateIdempotent(String templateName, ObjectNode settings){
+        String targetPath = "_index_template/" + templateName;
+        return createObjectIdempotent(targetPath, settings);
+    }
+
+    /*
+     * Create an index if it does not already exist; return true if created, false otherwise.
+     */
+    public boolean createIndexIdempotent(String indexName, ObjectNode settings){
+        String targetPath = indexName;
+        return createObjectIdempotent(targetPath, settings);
+    }
+
+    private boolean createObjectIdempotent(String objectPath, ObjectNode settings){
+        RestClient.Response response = client.get(objectPath, true);
+        if (response.code == HttpURLConnection.HTTP_NOT_FOUND) {
+            client.put(objectPath, settings.toString(), false);
+            return true;
+        } else if (response.code == HttpURLConnection.HTTP_OK) {
+            logger.info(objectPath + " already exists. Skipping creation.");
+        } else {
+            logger.warn("Could not confirm that " + objectPath + " does not already exist. Skipping creation.");
+        }
+        return false;
+    }
+
+    public RestClient.Response registerSnapshotRepo(String repoName, ObjectNode settings){
+        String targetPath = "_snapshot/" + repoName;
+        return client.put(targetPath, settings.toString(), false);
+    }
+
+    public RestClient.Response createSnapshot(String repoName, String snapshotName, ObjectNode settings){
+        String targetPath = "_snapshot/" + repoName + "/" + snapshotName;
+        return client.put(targetPath, settings.toString(), false);
+    }
+
+    public RestClient.Response getSnapshotStatus(String repoName, String snapshotName){
+        String targetPath = "_snapshot/" + repoName + "/" + snapshotName;
+        return client.get(targetPath, false);
+    }
+
+    public Mono<BulkResponse> sendBulkRequest(String indexName, String body) {
+        String targetPath = indexName + "/_bulk";
+
+        return client.postAsync(targetPath, body)
+            .map(response -> new BulkResponse(response.code, response.body, response.message))
+            .flatMap(responseDetails -> {
+                if (responseDetails.hasBadStatusCode() || responseDetails.hasFailedOperations()) {
+                    logger.error(responseDetails.getFailureMessage());
+                    return Mono.error(new RuntimeException(responseDetails.getFailureMessage()));
+                }
+                return Mono.just(responseDetails);
+            });
+    }
+
+    public RestClient.Response refresh() {
+        String targetPath = "_refresh";
+
+        return client.get(targetPath, false);
+    }
+}
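
Taken together, a minimal usage sketch of the new client against a reachable target cluster; the index name and settings JSON below are made up for illustration:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import com.rfs.common.ConnectionDetails;
import com.rfs.common.OpenSearchClient;

public class OpenSearchClientUsageSketch {
    public static void demo(ConnectionDetails targetConnection) throws Exception {
        OpenSearchClient client = new OpenSearchClient(targetConnection);

        // Idempotent creation: true only if the index was actually created by this call
        ObjectNode settings = (ObjectNode) new ObjectMapper()
            .readTree("{\"settings\":{\"index\":{\"number_of_shards\":1}}}");
        boolean created = client.createIndexIdempotent("my-index", settings);
        System.out.println("created: " + created);

        // Bulk ingest: the Mono errors on a bad status code or any failed operation
        String bulkBody = "{\"index\":{\"_id\":\"1\"}}\n{\"field\":\"value\"}\n";
        client.sendBulkRequest("my-index", bulkBody)
            .doOnError(err -> System.err.println("bulk failed: " + err.getMessage()))
            .block();

        client.refresh(); // make the documents searchable
    }
}
```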