From 22d3f74f350531b0eedc781e81eae185007d3783 Mon Sep 17 00:00:00 2001 From: Chris Helma <25470211+chelma@users.noreply.github.com> Date: Thu, 6 Jun 2024 07:16:12 -0500 Subject: [PATCH] [RFS] Updated RFS to perform coordinated index migration (#693) * Coordinated index creation working; need to add whitelist and tests Signed-off-by: Chris Helma * Fixed unit tests; addred refresh before RFS searches Signed-off-by: Chris Helma * Unit tested coordinated RFS index creation Signed-off-by: Chris Helma * Updated per PR discussion Signed-off-by: Chris Helma --------- Signed-off-by: Chris Helma --- .../java/com/rfs/DemoPrintOutSnapshot.java | 4 +- .../java/com/rfs/ReindexFromSnapshot.java | 8 +- RFS/src/main/java/com/rfs/RunRfsWorker.java | 15 +- RFS/src/main/java/com/rfs/cms/CmsClient.java | 46 +- RFS/src/main/java/com/rfs/cms/CmsEntry.java | 153 ++++- .../java/com/rfs/cms/OpenSearchCmsClient.java | 156 ++++- .../java/com/rfs/cms/OpenSearchCmsEntry.java | 116 ++++ .../java/com/rfs/common/IndexMetadata.java | 27 +- .../java/com/rfs/common/OpenSearchClient.java | 123 ++-- .../IndexMetadataFactory_ES_6_8.java | 14 +- .../IndexMetadataFactory_ES_7_10.java | 14 +- .../version_os_2_11/IndexCreator_OS_2_11.java | 16 +- .../main/java/com/rfs/worker/GlobalState.java | 5 +- .../main/java/com/rfs/worker/IndexRunner.java | 59 ++ .../main/java/com/rfs/worker/IndexStep.java | 473 ++++++++++++++++ .../java/com/rfs/worker/MetadataStep.java | 38 +- .../java/com/rfs/worker/SnapshotStep.java | 37 +- .../test/java/com/rfs/cms/CmsEntryTest.java | 2 +- .../SimpleRestoreFromSnapshot_ES_7_10.java | 5 +- .../java/com/rfs/worker/IndexRunnerTest.java | 44 ++ .../java/com/rfs/worker/IndexStepTest.java | 534 ++++++++++++++++++ .../com/rfs/worker/MetadataRunnerTest.java | 23 +- .../java/com/rfs/worker/MetadataStepTest.java | 26 +- .../com/rfs/worker/SnapshotRunnerTest.java | 22 +- .../java/com/rfs/worker/SnapshotStepTest.java | 45 +- 25 files changed, 1847 insertions(+), 158 deletions(-) create mode 100644 RFS/src/main/java/com/rfs/worker/IndexRunner.java create mode 100644 RFS/src/main/java/com/rfs/worker/IndexStep.java create mode 100644 RFS/src/test/java/com/rfs/worker/IndexRunnerTest.java create mode 100644 RFS/src/test/java/com/rfs/worker/IndexStepTest.java diff --git a/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java b/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java index 1589fb503..f7dc877ba 100644 --- a/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java +++ b/RFS/src/main/java/com/rfs/DemoPrintOutSnapshot.java @@ -155,12 +155,12 @@ public static void main(String[] args) { Map indexMetadatas = new HashMap<>(); if (sourceVersion == ClusterVersion.ES_6_8) { for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) { - IndexMetadata.Data indexMetadata = new IndexMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName, index.getName()); + IndexMetadata.Data indexMetadata = new IndexMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, index.getName()); indexMetadatas.put(index.getName(), indexMetadata); } } else { for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) { - IndexMetadata.Data indexMetadata = new IndexMetadataFactory_ES_7_10().fromRepo(repo, repoDataProvider, snapshotName, index.getName()); + IndexMetadata.Data indexMetadata = new IndexMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, index.getName()); indexMetadatas.put(index.getName(), indexMetadata); } } diff --git a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java index 6ba31b4dd..c9f6ddb3a 100644 --- a/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java +++ b/RFS/src/main/java/com/rfs/ReindexFromSnapshot.java @@ -290,9 +290,9 @@ public static void main(String[] args) throws InterruptedException { logger.info("Reading Index Metadata for index: " + index.getName()); IndexMetadata.Data indexMetadata; if (sourceVersion == ClusterVersion.ES_6_8) { - indexMetadata = new IndexMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName, index.getName()); + indexMetadata = new IndexMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, index.getName()); } else { - indexMetadata = new IndexMetadataFactory_ES_7_10().fromRepo(repo, repoDataProvider, snapshotName, index.getName()); + indexMetadata = new IndexMetadataFactory_ES_7_10(repoDataProvider).fromRepo(snapshotName, index.getName()); } indexMetadatas.add(indexMetadata); } @@ -305,14 +305,14 @@ public static void main(String[] args) throws InterruptedException { // ========================================================================================================== logger.info("=================================================================="); logger.info("Attempting to recreate the indices..."); + IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient); for (IndexMetadata.Data indexMetadata : indexMetadatas) { String reindexName = indexMetadata.getName() + indexSuffix; logger.info("Recreating index " + indexMetadata.getName() + " as " + reindexName + " on target..."); ObjectNode root = indexMetadata.toObjectNode(); ObjectNode transformedRoot = transformer.transformIndexMetadata(root); - IndexMetadataData_OS_2_11 indexMetadataOS211 = new IndexMetadataData_OS_2_11(transformedRoot, indexMetadata.getId(), reindexName); - IndexCreator_OS_2_11.create(reindexName, indexMetadataOS211, targetClient); + indexCreator.create(transformedRoot, reindexName, indexMetadata.getId()); } } diff --git a/RFS/src/main/java/com/rfs/RunRfsWorker.java b/RFS/src/main/java/com/rfs/RunRfsWorker.java index 0192cc665..11b6b1580 100644 --- a/RFS/src/main/java/com/rfs/RunRfsWorker.java +++ b/RFS/src/main/java/com/rfs/RunRfsWorker.java @@ -22,6 +22,7 @@ import com.rfs.common.ClusterVersion; import com.rfs.common.ConnectionDetails; import com.rfs.common.GlobalMetadata; +import com.rfs.common.IndexMetadata; import com.rfs.common.Logging; import com.rfs.common.OpenSearchClient; import com.rfs.common.S3Uri; @@ -33,9 +34,12 @@ import com.rfs.transformers.TransformFunctions; import com.rfs.transformers.Transformer; import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10; +import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10; import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10; import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11; +import com.rfs.version_os_2_11.IndexCreator_OS_2_11; import com.rfs.worker.GlobalState; +import com.rfs.worker.IndexRunner; import com.rfs.worker.MetadataRunner; import com.rfs.worker.Runner; import com.rfs.worker.SnapshotRunner; @@ -75,7 +79,11 @@ public static class Args { @Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false) public String targetPass = null; - @Parameter(names = {"--index-template-whitelist"}, description = ("Optional. List of template names to migrate" + @Parameter(names = {"--index-whitelist"}, description = ("Optional. List of index names to migrate" + + " (e.g. 'logs_2024_01, logs_2024_02'). Default: all indices"), required = false) + public List indexWhitelist = List.of(); + + @Parameter(names = {"--index-template-whitelist"}, description = ("Optional. List of index template names to migrate" + " (e.g. 'posts_index_template1, posts_index_template2'). Default: empty list"), required = false) public List indexTemplateWhitelist = List.of(); @@ -139,6 +147,11 @@ public static void main(String[] args) throws Exception { Transformer transformer = TransformFunctions.getTransformer(ClusterVersion.ES_7_10, ClusterVersion.OS_2_11, awarenessDimensionality); MetadataRunner metadataWorker = new MetadataRunner(globalState, cmsClient, snapshotName, metadataFactory, metadataCreator, transformer); metadataWorker.run(); + + IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider); + IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient); + IndexRunner indexWorker = new IndexRunner(globalState, cmsClient, snapshotName, indexMetadataFactory, indexCreator, transformer); + indexWorker.run(); } catch (Runner.PhaseFailed e) { logPhaseFailureRecord(e.phase, e.nextStep, e.cmsEntry, e.e); diff --git a/RFS/src/main/java/com/rfs/cms/CmsClient.java b/RFS/src/main/java/com/rfs/cms/CmsClient.java index 86b53c2c0..8acb7e8e1 100644 --- a/RFS/src/main/java/com/rfs/cms/CmsClient.java +++ b/RFS/src/main/java/com/rfs/cms/CmsClient.java @@ -1,5 +1,6 @@ package com.rfs.cms; +import java.util.List; import java.util.Optional; /* @@ -23,7 +24,7 @@ public interface CmsClient { * Updates the Snapshot entry in the CMS. Returns an Optional; if the document was updated, it will be * the updated entry and empty otherwise. */ - public Optional updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status); + public Optional updateSnapshotEntry(CmsEntry.Snapshot newEntry, CmsEntry.Snapshot lastEntry); /* * Creates a new entry in the CMS for the Metadata Migration's progress. Returns an Optional; if the document was @@ -41,5 +42,46 @@ public interface CmsClient { * Updates the Metadata Migration entry in the CMS. Returns an Optional; if the document was updated, * it will be the updated entry and empty otherwise. */ - public Optional updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts); + public Optional updateMetadataEntry(CmsEntry.Metadata newEntry, CmsEntry.Metadata lastEntry); + + /* + * Creates a new entry in the CMS for the Index Migration's progress. Returns an Optional; if the document was + * created, it will be the created entry and empty otherwise. + */ + public Optional createIndexEntry(); + + /* + * Attempt to retrieve the Index Migration entry from the CMS, if it exists. Returns an Optional; if the document + * exists, it will be the retrieved entry and empty otherwise. + */ + public Optional getIndexEntry(); + + /* + * Updates the Index Migration entry in the CMS. Returns an Optional; if the document was updated, + * it will be the updated entry and empty otherwise. + */ + public Optional updateIndexEntry(CmsEntry.Index newEntry, CmsEntry.Index lastEntry); + + /* + * Creates a new entry in the CMS for an Index Work Item. Returns an Optional; if the document was + * created, it will be the created entry and empty otherwise. + */ + public Optional createIndexWorkItem(String name, int numShards); + + /* + * Updates the Index Work Item in the CMS. Returns an Optional; if the document was updated, + * it will be the updated entry and empty otherwise. + */ + public Optional updateIndexWorkItem(CmsEntry.IndexWorkItem newEntry, CmsEntry.IndexWorkItem lastEntry); + + /* + * Forcefully updates the Index Work Item in the CMS. This method should be used when you don't care about collisions + * and just want to overwrite the existing entry no matter what. Returns the updated entry. + */ + public CmsEntry.IndexWorkItem updateIndexWorkItemForceful(CmsEntry.IndexWorkItem newEntry); + + /* + * Retrieves a set of Index Work Items from the CMS that appear ready to be worked on, up to the specified limit. + */ + public List getAvailableIndexWorkItems(int maxItems); } diff --git a/RFS/src/main/java/com/rfs/cms/CmsEntry.java b/RFS/src/main/java/com/rfs/cms/CmsEntry.java index da69910e5..1eec3e84f 100644 --- a/RFS/src/main/java/com/rfs/cms/CmsEntry.java +++ b/RFS/src/main/java/com/rfs/cms/CmsEntry.java @@ -1,11 +1,64 @@ package com.rfs.cms; + import com.rfs.common.RfsException; public class CmsEntry { + public static enum EntryType { + SNAPSHOT, + METADATA, + INDEX, + INDEX_WORK_ITEM, + } + public abstract static class Base { protected Base() {} + + @Override public abstract String toString(); + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || !(obj instanceof Base)) { + return false; + } + Base other = (Base) obj; + return this.toString().equals(other.toString()); + } + + @Override + public int hashCode() { + return this.toString().hashCode(); + } + } + + /* + * Provides a base class for leasable entry types. Doesn't allow for customization of lease mechanics, but it's + * unclear how to achieve that in Java given the constraints around static methods. + */ + public abstract static class Leasable extends Base { + public static final int LEASE_MS = 1 * 60 * 1000; // 1 minute, arbitrarily chosen + public static final int MAX_ATTEMPTS = 3; // arbitrarily chosen + + protected Leasable() {} + + public static int getLeaseDurationMs(int numAttempts) { + if (numAttempts > MAX_ATTEMPTS) { + throw new CouldNotFindNextLeaseDuration("numAttempts=" + numAttempts + " is greater than MAX_ATTEMPTS=" + MAX_ATTEMPTS); + } else if (numAttempts < 1) { + throw new CouldNotFindNextLeaseDuration("numAttempts=" + numAttempts + " is less than 1"); + } + return LEASE_MS * numAttempts; // Arbitratily chosen algorithm + } + + // TODO: We should be ideally setting the lease expiry using the server's clock, but it's unclear on the best + // way to do this. For now, we'll just use the client's clock. + public static String getLeaseExpiry(long currentTime, int numAttempts) { + return Long.toString(currentTime + getLeaseDurationMs(numAttempts)); + } } public static enum SnapshotStatus { @@ -15,7 +68,11 @@ public static enum SnapshotStatus { FAILED, } + /* + * Used to track the progress of taking a snapshot of the source cluster + */ public static class Snapshot extends Base { + public final EntryType type = EntryType.SNAPSHOT; public final String name; public final SnapshotStatus status; @@ -28,8 +85,9 @@ public Snapshot(String name, SnapshotStatus status) { @Override public String toString() { return "Snapshot(" + + "type='" + type.toString() + "," + "name='" + name + "," - + "status=" + status + + + "status=" + status.toString() + ")"; } } @@ -40,30 +98,50 @@ public static enum MetadataStatus { FAILED, } - public static class Metadata extends Base { - public static final int METADATA_LEASE_MS = 1 * 60 * 1000; // 1 minute, arbitrarily chosen - public static final int MAX_ATTEMPTS = 3; // arbitrarily chosen + /* + * Used to track the progress of migrating all the templates from the source cluster + */ + public static class Metadata extends Leasable { + public final EntryType type = EntryType.METADATA; + public final MetadataStatus status; + public final String leaseExpiry; + public final Integer numAttempts; - public static int getLeaseDurationMs(int numAttempts) { - if (numAttempts > MAX_ATTEMPTS) { - throw new CouldNotFindNextLeaseDuration("numAttempts=" + numAttempts + " is greater than MAX_ATTEMPTS=" + MAX_ATTEMPTS); - } else if (numAttempts < 1) { - throw new CouldNotFindNextLeaseDuration("numAttempts=" + numAttempts + " is less than 1"); - } - return METADATA_LEASE_MS * numAttempts; // Arbitratily chosen algorithm + public Metadata(MetadataStatus status, String leaseExpiry, int numAttempts) { + super(); + this.status = status; + this.leaseExpiry = leaseExpiry; + this.numAttempts = numAttempts; } - // TODO: We should be ideally setting the lease expiry using the server's clock, but it's unclear on the best - // way to do this. For now, we'll just use the client's clock. - public static String getLeaseExpiry(long currentTime, int numAttempts) { - return Long.toString(currentTime + getLeaseDurationMs(numAttempts)); + @Override + public String toString() { + return "Metadata(" + + "type='" + type.toString() + "," + + "status=" + status.toString() + "," + + "leaseExpiry=" + leaseExpiry + "," + + "numAttempts=" + numAttempts.toString() + + ")"; } + } - public final MetadataStatus status; + public static enum IndexStatus { + SETUP, + IN_PROGRESS, + COMPLETED, + FAILED, + } + + /* + * Used to track the progress of migrating all the indices from the soruce cluster + */ + public static class Index extends Leasable { + public final EntryType type = EntryType.INDEX; + public final IndexStatus status; public final String leaseExpiry; public final Integer numAttempts; - public Metadata(MetadataStatus status, String leaseExpiry, int numAttempts) { + public Index(IndexStatus status, String leaseExpiry, int numAttempts) { super(); this.status = status; this.leaseExpiry = leaseExpiry; @@ -72,7 +150,8 @@ public Metadata(MetadataStatus status, String leaseExpiry, int numAttempts) { @Override public String toString() { - return "Metadata(" + return "Index(" + + "type='" + type.toString() + "," + "status=" + status.toString() + "," + "leaseExpiry=" + leaseExpiry + "," + "numAttempts=" + numAttempts.toString() + @@ -80,6 +159,44 @@ public String toString() { } } + public static enum IndexWorkItemStatus { + NOT_STARTED, + COMPLETED, + FAILED, + } + + /* + * Used to track the migration of a particular index from the source cluster + */ + public static class IndexWorkItem extends Base { + public final EntryType type = EntryType.INDEX_WORK_ITEM; + public static final int ATTEMPTS_SOFT_LIMIT = 3; // will make at least this many attempts; arbitrarily chosen + + public final String name; + public final IndexWorkItemStatus status; + public final Integer numAttempts; + public final Integer numShards; + + public IndexWorkItem(String name, IndexWorkItemStatus status, int numAttempts, int numShards) { + super(); + this.name = name; + this.status = status; + this.numAttempts = numAttempts; + this.numShards = numShards; + } + + @Override + public String toString() { + return "IndexWorkItem(" + + "type='" + type.toString() + "," + + "name=" + name.toString() + "," + + "status=" + status.toString() + "," + + "numAttempts=" + numAttempts.toString() + "," + + "numShards=" + numShards.toString() + + ")"; + } + } + public static class CouldNotFindNextLeaseDuration extends RfsException { public CouldNotFindNextLeaseDuration(String message) { super("Could not find next lease duration. Reason: " + message); diff --git a/RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java b/RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java index 28cdea373..31efa9e73 100644 --- a/RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java +++ b/RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java @@ -1,14 +1,28 @@ package com.rfs.cms; +import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import com.fasterxml.jackson.databind.node.ObjectNode; import com.rfs.common.OpenSearchClient; +import com.rfs.common.RfsException; public class OpenSearchCmsClient implements CmsClient { + private static final Logger logger = LogManager.getLogger(OpenSearchCmsClient.class); + public static final String CMS_INDEX_NAME = "cms-reindex-from-snapshot"; public static final String CMS_SNAPSHOT_DOC_ID = "snapshot_status"; public static final String CMS_METADATA_DOC_ID = "metadata_status"; + public static final String CMS_INDEX_DOC_ID = "index_status"; + + public static String getIndexWorkItemDocId(String name) { + // iwi => index work item + return "iwi_" + name; + } private final OpenSearchClient client; @@ -31,9 +45,20 @@ public Optional getSnapshotEntry(String snapshotName) { } @Override - public Optional updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status) { - OpenSearchCmsEntry.Snapshot entry = new OpenSearchCmsEntry.Snapshot(snapshotName, status); - Optional updatedEntry = client.updateDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID, entry.toJson()); + public Optional updateSnapshotEntry(CmsEntry.Snapshot newEntry, CmsEntry.Snapshot lastEntry) { + // Pull the existing entry to ensure that it hasn't changed since we originally retrieved it + ObjectNode currentEntryRaw = client.getDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID) + .orElseThrow(() -> new RfsException("Failed to update snapshot entry: " + CMS_SNAPSHOT_DOC_ID + " does not exist")); + + OpenSearchCmsEntry.Snapshot currentEntry = OpenSearchCmsEntry.Snapshot.fromJson((ObjectNode) currentEntryRaw.get("_source")); + if (!currentEntry.equals(new OpenSearchCmsEntry.Snapshot(lastEntry))) { + logger.info("Failed to update snapshot entry: " + CMS_SNAPSHOT_DOC_ID + " has changed we first retrieved it"); + return Optional.empty(); + } + + // Now attempt the update + ObjectNode newEntryJson = new OpenSearchCmsEntry.Snapshot(newEntry).toJson(); + Optional updatedEntry = client.updateDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID, newEntryJson, currentEntryRaw); return updatedEntry.map(OpenSearchCmsEntry.Snapshot::fromJson); } @@ -53,9 +78,126 @@ public Optional getMetadataEntry() { } @Override - public Optional updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts) { - OpenSearchCmsEntry.Metadata metadata = new OpenSearchCmsEntry.Metadata(status, leaseExpiry, numAttempts); - Optional updatedEntry = client.updateDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, metadata.toJson()); + public Optional updateMetadataEntry(CmsEntry.Metadata newEntry, CmsEntry.Metadata lastEntry) { + // Pull the existing entry to ensure that it hasn't changed since we originally retrieved it + ObjectNode currentEntryRaw = client.getDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID) + .orElseThrow(() -> new RfsException("Failed to update metadata entry: " + CMS_METADATA_DOC_ID + " does not exist")); + + OpenSearchCmsEntry.Metadata currentEntry = OpenSearchCmsEntry.Metadata.fromJson((ObjectNode) currentEntryRaw.get("_source")); + if (!currentEntry.equals(new OpenSearchCmsEntry.Metadata(lastEntry))) { + logger.info("Failed to update metadata entry: " + CMS_METADATA_DOC_ID + " has changed we first retrieved it"); + return Optional.empty(); + } + + // Now attempt the update + ObjectNode newEntryJson = new OpenSearchCmsEntry.Metadata(newEntry).toJson(); + Optional updatedEntry = client.updateDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, newEntryJson, currentEntryRaw); return updatedEntry.map(OpenSearchCmsEntry.Metadata::fromJson); - } + } + + @Override + public Optional createIndexEntry() { + OpenSearchCmsEntry.Index entry = OpenSearchCmsEntry.Index.getInitial(); + Optional createdEntry = client.createDocument(CMS_INDEX_NAME, CMS_INDEX_DOC_ID, entry.toJson()); + return createdEntry.map(OpenSearchCmsEntry.Index::fromJson); + + } + + @Override + public Optional getIndexEntry() { + Optional document = client.getDocument(CMS_INDEX_NAME, CMS_INDEX_DOC_ID); + return document.map(doc -> (ObjectNode) doc.get("_source")) + .map(OpenSearchCmsEntry.Index::fromJson); + } + + @Override + public Optional updateIndexEntry(CmsEntry.Index newEntry, CmsEntry.Index lastEntry) { + // Pull the existing entry to ensure that it hasn't changed since we originally retrieved it + ObjectNode currentEntryRaw = client.getDocument(CMS_INDEX_NAME, CMS_INDEX_DOC_ID) + .orElseThrow(() -> new RfsException("Failed to update index entry: " + CMS_INDEX_DOC_ID + " does not exist")); + + OpenSearchCmsEntry.Index currentEntry = OpenSearchCmsEntry.Index.fromJson((ObjectNode) currentEntryRaw.get("_source")); + if (!currentEntry.equals(new OpenSearchCmsEntry.Index(lastEntry))) { + logger.info("Failed to update index entry: " + CMS_INDEX_DOC_ID + " has changed we first retrieved it"); + return Optional.empty(); + } + + // Now attempt the update + ObjectNode newEntryJson = new OpenSearchCmsEntry.Index(newEntry).toJson(); + Optional updatedEntry = client.updateDocument(CMS_INDEX_NAME, CMS_INDEX_DOC_ID, newEntryJson, currentEntryRaw); + return updatedEntry.map(OpenSearchCmsEntry.Index::fromJson); + } + + @Override + public Optional createIndexWorkItem(String name, int numShards) { + OpenSearchCmsEntry.IndexWorkItem entry = OpenSearchCmsEntry.IndexWorkItem.getInitial(name, numShards); + Optional createdEntry = client.createDocument(CMS_INDEX_NAME, getIndexWorkItemDocId(entry.name), entry.toJson()); + return createdEntry.map(OpenSearchCmsEntry.IndexWorkItem::fromJson); + } + + @Override + public Optional updateIndexWorkItem(CmsEntry.IndexWorkItem newEntry, CmsEntry.IndexWorkItem lastEntry) { + // Pull the existing entry to ensure that it hasn't changed since we originally retrieved it + ObjectNode currentEntryRaw = client.getDocument(CMS_INDEX_NAME, getIndexWorkItemDocId(lastEntry.name)) + .orElseThrow(() -> new RfsException("Failed to update index work item: " + lastEntry.name + " does not exist")); + + OpenSearchCmsEntry.IndexWorkItem currentEntry = OpenSearchCmsEntry.IndexWorkItem.fromJson((ObjectNode) currentEntryRaw.get("_source")); + if (!currentEntry.equals(new OpenSearchCmsEntry.IndexWorkItem(lastEntry))) { + logger.info("Failed to update index work item: " + lastEntry.name + " has changed we first retrieved it"); + return Optional.empty(); + } + + // Now attempt the update + ObjectNode newEntryJson = new OpenSearchCmsEntry.IndexWorkItem(newEntry).toJson(); + Optional updatedEntry = client.updateDocument(CMS_INDEX_NAME, getIndexWorkItemDocId(newEntry.name), newEntryJson, currentEntryRaw); + return updatedEntry.map(OpenSearchCmsEntry.IndexWorkItem::fromJson); + } + + @Override + public CmsEntry.IndexWorkItem updateIndexWorkItemForceful(CmsEntry.IndexWorkItem newEntry) { + // Now attempt the update + ObjectNode newEntryJson = new OpenSearchCmsEntry.IndexWorkItem(newEntry).toJson(); + ObjectNode updatedEntry = client.updateDocumentForceful(CMS_INDEX_NAME, getIndexWorkItemDocId(newEntry.name), newEntryJson); + return OpenSearchCmsEntry.IndexWorkItem.fromJson(updatedEntry); + } + + @Override + public List getAvailableIndexWorkItems(int maxItems) { + // Ensure we have a relatively fresh view of the index + client.refresh(); + + // Pull the docs + String queryBody = "{\n" + + " \"query\": {\n" + + " \"function_score\": {\n" + + " \"query\": {\n" + + " \"bool\": {\n" + + " \"must\": [\n" + + " {\n" + + " \"match\": {\n" + + " \"type\": \"INDEX_WORK_ITEM\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"match\": {\n" + + " \"status\": \"NOT_STARTED\"\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " \"random_score\": {}\n" + // Try to avoid the workers fighting for the same work items + " }\n" + + " },\n" + + " \"size\": " + maxItems + "\n" + + "}"; + + List hits = client.searchDocuments(CMS_INDEX_NAME, queryBody); + List workItems = hits.stream() + .map(hit -> (ObjectNode) hit.get("_source")) + .map(OpenSearchCmsEntry.IndexWorkItem::fromJson) + .collect(Collectors.toList()); + + return workItems; + } } diff --git a/RFS/src/main/java/com/rfs/cms/OpenSearchCmsEntry.java b/RFS/src/main/java/com/rfs/cms/OpenSearchCmsEntry.java index a083c2494..b339eec49 100644 --- a/RFS/src/main/java/com/rfs/cms/OpenSearchCmsEntry.java +++ b/RFS/src/main/java/com/rfs/cms/OpenSearchCmsEntry.java @@ -10,6 +10,7 @@ public class OpenSearchCmsEntry { private static final ObjectMapper objectMapper = new ObjectMapper(); public static class Snapshot extends CmsEntry.Snapshot { + public static final String FIELD_TYPE = "type"; public static final String FIELD_NAME = "name"; public static final String FIELD_STATUS = "status"; @@ -32,8 +33,13 @@ public Snapshot(String name, CmsEntry.SnapshotStatus status) { super(name, status); } + public Snapshot(CmsEntry.Snapshot entry) { + this(entry.name, entry.status); + } + public ObjectNode toJson() { ObjectNode node = objectMapper.createObjectNode(); + node.put(FIELD_TYPE, type.toString()); node.put(FIELD_STATUS, name); node.put(FIELD_STATUS, status.toString()); return node; @@ -46,6 +52,7 @@ public String toString() { } public static class Metadata extends CmsEntry.Metadata { + public static final String FIELD_TYPE = "type"; public static final String FIELD_STATUS = "status"; public static final String FIELD_LEASE_EXPIRY = "leaseExpiry"; public static final String FIELD_NUM_ATTEMPTS = "numAttempts"; @@ -76,8 +83,64 @@ public Metadata(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numA super(status, leaseExpiry, numAttempts); } + public Metadata(CmsEntry.Metadata entry) { + this(entry.status, entry.leaseExpiry, entry.numAttempts); + } + + public ObjectNode toJson() { + ObjectNode node = objectMapper.createObjectNode(); + node.put(FIELD_TYPE, type.toString()); + node.put(FIELD_STATUS, status.toString()); + node.put(FIELD_LEASE_EXPIRY, leaseExpiry); + node.put(FIELD_NUM_ATTEMPTS, numAttempts); + return node; + } + + @Override + public String toString() { + return this.toJson().toString(); + } + } + + public static class Index extends CmsEntry.Index { + public static final String FIELD_TYPE = "type"; + public static final String FIELD_STATUS = "status"; + public static final String FIELD_LEASE_EXPIRY = "leaseExpiry"; + public static final String FIELD_NUM_ATTEMPTS = "numAttempts"; + + public static Index getInitial() { + return new Index( + CmsEntry.IndexStatus.IN_PROGRESS, + // TODO: We should be ideally setting the lease using the server's clock, but it's unclear on the best way + // to do this. For now, we'll just use the client's clock. + CmsEntry.Index.getLeaseExpiry(Instant.now().toEpochMilli(), 1), + 1 + ); + } + + public static Index fromJson(ObjectNode node) { + try { + return new Index( + CmsEntry.IndexStatus.valueOf(node.get(FIELD_STATUS).asText()), + node.get(FIELD_LEASE_EXPIRY).asText(), + node.get(FIELD_NUM_ATTEMPTS).asInt() + ); + } catch (Exception e) { + throw new CantParseCmsEntryFromJson(Index.class, node.toString(), e); + } + } + + public Index(CmsEntry.IndexStatus status, String leaseExpiry, Integer numAttempts) { + super(status, leaseExpiry, numAttempts); + } + + public Index(CmsEntry.Index entry) { + this(entry.status, entry.leaseExpiry, entry.numAttempts); + } + public ObjectNode toJson() { ObjectNode node = objectMapper.createObjectNode(); + node.put(FIELD_TYPE, type.toString()); node.put(FIELD_STATUS, status.toString()); node.put(FIELD_LEASE_EXPIRY, leaseExpiry); node.put(FIELD_NUM_ATTEMPTS, numAttempts); @@ -90,6 +153,59 @@ public String toString() { } } + public static class IndexWorkItem extends CmsEntry.IndexWorkItem { + public static final String FIELD_TYPE = "type"; + public static final String FIELD_NAME = "name"; + public static final String FIELD_STATUS = "status"; + public static final String FIELD_NUM_ATTEMPTS = "numAttempts"; + public static final String FIELD_NUM_SHARDS = "numShards"; + + public static IndexWorkItem getInitial(String name, int numShards) { + return new IndexWorkItem( + name, + CmsEntry.IndexWorkItemStatus.NOT_STARTED, + 1, + numShards + ); + } + + public static IndexWorkItem fromJson(ObjectNode node) { + try { + return new IndexWorkItem( + node.get(FIELD_NAME).asText(), + CmsEntry.IndexWorkItemStatus.valueOf(node.get(FIELD_STATUS).asText()), + node.get(FIELD_NUM_ATTEMPTS).asInt(), + node.get(FIELD_NUM_SHARDS).asInt() + ); + } catch (Exception e) { + throw new CantParseCmsEntryFromJson(Index.class, node.toString(), e); + } + } + + public IndexWorkItem(String name, CmsEntry.IndexWorkItemStatus status, Integer numAttempts, Integer numShards) { + super(name, status, numAttempts, numShards); + } + + public IndexWorkItem(CmsEntry.IndexWorkItem entry) { + this(entry.name, entry.status, entry.numAttempts, entry.numShards); + } + + public ObjectNode toJson() { + ObjectNode node = objectMapper.createObjectNode(); + node.put(FIELD_TYPE, type.toString()); + node.put(FIELD_NAME, name); + node.put(FIELD_STATUS, status.toString()); + node.put(FIELD_NUM_ATTEMPTS, numAttempts); + node.put(FIELD_NUM_SHARDS, numShards); + return node; + } + + @Override + public String toString() { + return this.toJson().toString(); + } + } + public static class CantParseCmsEntryFromJson extends RfsException { public CantParseCmsEntryFromJson(Class entryClass, String json, Exception e) { super("Failed to parse CMS entry of type " + entryClass.getName() + " from JSON: " + json, e); diff --git a/RFS/src/main/java/com/rfs/common/IndexMetadata.java b/RFS/src/main/java/com/rfs/common/IndexMetadata.java index 56e92cf38..eb062e06f 100644 --- a/RFS/src/main/java/com/rfs/common/IndexMetadata.java +++ b/RFS/src/main/java/com/rfs/common/IndexMetadata.java @@ -18,8 +18,8 @@ public class IndexMetadata { * Defines the behavior required to read a snapshot's index metadata as JSON and convert it into a Data object */ public static interface Factory { - private JsonNode getJsonNode(SourceRepo repo, SnapshotRepo.Provider repoDataProvider, String indexId, String indexFileId, SmileFactory smileFactory) throws Exception { - Path filePath = repo.getIndexMetadataFilePath(indexId, indexFileId); + private JsonNode getJsonNode(String indexId, String indexFileId, SmileFactory smileFactory) { + Path filePath = getRepoDataProvider().getRepo().getIndexMetadataFilePath(indexId, indexFileId); try (InputStream fis = new FileInputStream(filePath.toFile())) { // Don't fully understand what the value of this code is, but it progresses the stream so we need to do it @@ -33,19 +33,30 @@ private JsonNode getJsonNode(SourceRepo repo, SnapshotRepo.Provider repoDataProv ObjectMapper smileMapper = new ObjectMapper(smileFactory); return smileMapper.readTree(bis); + } catch (Exception e) { + throw new RfsException("Could not load index metadata file: " + filePath.toString(), e); } } - default IndexMetadata.Data fromRepo(SourceRepo repo, SnapshotRepo.Provider repoDataProvider, String snapshotName, String indexName) throws Exception { + default IndexMetadata.Data fromRepo(String snapshotName, String indexName) { SmileFactory smileFactory = getSmileFactory(); - String indexId = repoDataProvider.getIndexId(indexName); - String indexFileId = getIndexFileId(repoDataProvider, snapshotName, indexName); - JsonNode root = getJsonNode(repo, repoDataProvider, indexId, indexFileId, smileFactory); + String indexId = getRepoDataProvider().getIndexId(indexName); + String indexFileId = getIndexFileId(snapshotName, indexName); + JsonNode root = getJsonNode(indexId, indexFileId, smileFactory); return fromJsonNode(root, indexId, indexName); } - public IndexMetadata.Data fromJsonNode(JsonNode root, String indexId, String indexName) throws Exception; + + // Version-specific implementation + public IndexMetadata.Data fromJsonNode(JsonNode root, String indexId, String indexName); + + // Version-specific implementation public SmileFactory getSmileFactory(); - public String getIndexFileId(SnapshotRepo.Provider repoDataProvider, String snapshotName, String indexName); + + // Version-specific implementation + public String getIndexFileId(String snapshotName, String indexName); + + // Get the underlying SnapshotRepo Provider + public SnapshotRepo.Provider getRepoDataProvider(); } /** diff --git a/RFS/src/main/java/com/rfs/common/OpenSearchClient.java b/RFS/src/main/java/com/rfs/common/OpenSearchClient.java index a17e4a6dd..abf3431a5 100644 --- a/RFS/src/main/java/com/rfs/common/OpenSearchClient.java +++ b/RFS/src/main/java/com/rfs/common/OpenSearchClient.java @@ -2,6 +2,8 @@ import java.net.HttpURLConnection; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -9,7 +11,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import reactor.core.publisher.Mono; @@ -81,23 +85,19 @@ private Optional createObjectIdempotent(String objectPath, ObjectNod if (response.code == HttpURLConnection.HTTP_NOT_FOUND) { client.put(objectPath, settings.toString()); return Optional.of(settings); - } else if (response.code == HttpURLConnection.HTTP_OK) { - logger.info(objectPath + " already exists. Skipping creation."); - } else { - logger.warn("Could not confirm that " + objectPath + " does not already exist. Skipping creation."); - } + } + // The only response code that can end up here is HTTP_OK, which means the object already existed return Optional.empty(); } /* - * Attempts to register a snapshot repository; no-op if the repo already exists. Returns an Optional; if the repo - * was created, it will be the settings used and empty if it already existed. + * Attempts to register a snapshot repository; no-op if the repo already exists. */ - public Optional registerSnapshotRepo(String repoName, ObjectNode settings){ + public void registerSnapshotRepo(String repoName, ObjectNode settings){ String targetPath = "_snapshot/" + repoName; - RestClient.Response response = client.putAsync(targetPath, settings.toString()) + client.putAsync(targetPath, settings.toString()) .flatMap(resp -> { - if (resp.code == HttpURLConnection.HTTP_OK || resp.code == HttpURLConnection.HTTP_CREATED) { + if (resp.code == HttpURLConnection.HTTP_OK) { return Mono.just(resp); } else { String errorMessage = ("Could not register snapshot repo: " + targetPath + ". Response Code: " + resp.code @@ -108,24 +108,16 @@ public Optional registerSnapshotRepo(String repoName, ObjectNode set .doOnError(e -> logger.error(e.getMessage())) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10))) .block(); - - if (response.code == HttpURLConnection.HTTP_CREATED) { - return Optional.of(settings); - } else { - logger.info("Snapshot repo already exists. Registration is a no-op."); - return Optional.empty(); - } } /* - * Attempts to create a snapshot; no-op if the snapshot already exists. Returns an Optional; if the snapshot - * was created, it will be the settings used and empty if it already existed. + * Attempts to create a snapshot; no-op if the snapshot already exists. */ - public Optional createSnapshot(String repoName, String snapshotName, ObjectNode settings){ + public void createSnapshot(String repoName, String snapshotName, ObjectNode settings){ String targetPath = "_snapshot/" + repoName + "/" + snapshotName; - RestClient.Response response = client.putAsync(targetPath, settings.toString()) + client.putAsync(targetPath, settings.toString()) .flatMap(resp -> { - if (resp.code == HttpURLConnection.HTTP_OK || resp.code == HttpURLConnection.HTTP_CREATED) { + if (resp.code == HttpURLConnection.HTTP_OK) { return Mono.just(resp); } else { String errorMessage = ("Could not create snapshot: " + targetPath + ". Response Code: " + resp.code @@ -136,14 +128,6 @@ public Optional createSnapshot(String repoName, String snapshotName, .doOnError(e -> logger.error(e.getMessage())) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10))) .block(); - - - if (response.code == HttpURLConnection.HTTP_CREATED) { - return Optional.of(settings); - } else { - logger.info("Snapshot already exists. Creation is a no-op."); - return Optional.empty(); - } } /* @@ -243,27 +227,22 @@ public Optional getDocument(String indexName, String documentId) { } /* - * Update a document using optimistic locking. Returns an Optional; if the document was updated, it - * will be the new value and empty otherwise. + * Update a document using optimistic locking with the versioning info in an original copy of the doc that is passed in. + * Returns an Optional; if the document was updated, it will be the new value and empty otherwise. */ - public Optional updateDocument(String indexName, String documentId, ObjectNode body) { - Optional document = getDocument(indexName, documentId); - if (document.isEmpty()) { - throw new UpdateFailed("Document not found: " + indexName + "/" + documentId); - } - + public Optional updateDocument(String indexName, String documentId, ObjectNode newBody, ObjectNode originalCopy) { String currentSeqNum; String currentPrimaryTerm; try { - currentSeqNum = document.get().get("_seq_no").asText(); - currentPrimaryTerm = document.get().get("_primary_term").asText(); + currentSeqNum = originalCopy.get("_seq_no").asText(); + currentPrimaryTerm = originalCopy.get("_primary_term").asText(); } catch (Exception e) { String errorMessage = "Could not update document: " + indexName + "/" + documentId; throw new RfsException(errorMessage, e); } ObjectNode upsertBody = new ObjectMapper().createObjectNode(); - upsertBody.set("doc", body); + upsertBody.set("doc", newBody); String targetPath = indexName + "/_update/" + documentId + "?if_seq_no=" + currentSeqNum + "&if_primary_term=" + currentPrimaryTerm; RestClient.Response response = client.postAsync(targetPath, upsertBody.toString()) @@ -281,13 +260,40 @@ public Optional updateDocument(String indexName, String documentId, .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10))) .block(); if (response.code == HttpURLConnection.HTTP_OK) { - return Optional.of(body); + return Optional.of(newBody); } else { // The only response code that can end up here is HTTP_CONFLICT, as everything is an error above // This indicates that we didn't acquire the optimistic lock return Optional.empty(); } } + + /* + * Update a document forcefully by skipping optimistic locking and overwriting the document regardless of versioning. + * Returns the new body used in the update. + */ + public ObjectNode updateDocumentForceful(String indexName, String documentId, ObjectNode newBody) { + ObjectNode upsertBody = new ObjectMapper().createObjectNode(); + upsertBody.set("doc", newBody); + + String targetPath = indexName + "/_update/" + documentId; + client.postAsync(targetPath, upsertBody.toString()) + .flatMap(resp -> { + if (resp.code == HttpURLConnection.HTTP_OK) { + return Mono.just(resp); + } else { + + String errorMessage = ("Could not update document: " + indexName + "/" + documentId + ". Response Code: " + resp.code + + ", Response Message: " + resp.message + ", Response Body: " + resp.body); + return Mono.error(new OperationFailed(errorMessage, resp)); + } + }) + .doOnError(e -> logger.error(e.getMessage())) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10))) + .block(); + + return newBody; + } public Mono sendBulkRequest(String indexName, String body) { String targetPath = indexName + "/_bulk"; @@ -310,6 +316,39 @@ public RestClient.Response refresh() { return client.get(targetPath); } + /* + * Retrieves documents from the specified index with the specified query. Returns a list of the hits. + */ + public List searchDocuments(String indexName, String queryBody) { + String targetPath = indexName + "/_search"; + RestClient.Response response = client.postAsync(targetPath, queryBody.toString()) + .flatMap(resp -> { + if (resp.code == HttpURLConnection.HTTP_OK) { + return Mono.just(resp); + } else { + String errorMessage = ("Could not retrieve documents from index " + indexName + ". Response Code: " + resp.code + + ", Response Message: " + resp.message + ", Response Body: " + resp.body); + return Mono.error(new OperationFailed(errorMessage, resp)); + } + }) + .doOnError(e -> logger.error(e.getMessage())) + .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10))) + .block(); + + try { + // Pull the hits out of the surrounding response and return them + ObjectNode responseJson = objectMapper.readValue(response.body, ObjectNode.class); + ArrayNode hits = (ArrayNode) responseJson.get("hits").get("hits"); + + JavaType type = objectMapper.getTypeFactory().constructCollectionType(ArrayList.class, ObjectNode.class); + List docs = objectMapper.convertValue(hits, type); + return docs; + } catch (Exception e) { + String errorMessage = "Could not parse response for: " + indexName; + throw new OperationFailed(errorMessage, response); + } + } + public static class BulkResponse extends RestClient.Response { public BulkResponse(int responseCode, String responseBody, String responseMessage) { super(responseCode, responseBody, responseMessage); diff --git a/RFS/src/main/java/com/rfs/version_es_6_8/IndexMetadataFactory_ES_6_8.java b/RFS/src/main/java/com/rfs/version_es_6_8/IndexMetadataFactory_ES_6_8.java index d95b2a754..2bf52273e 100644 --- a/RFS/src/main/java/com/rfs/version_es_6_8/IndexMetadataFactory_ES_6_8.java +++ b/RFS/src/main/java/com/rfs/version_es_6_8/IndexMetadataFactory_ES_6_8.java @@ -7,9 +7,14 @@ import com.rfs.common.SnapshotRepo; public class IndexMetadataFactory_ES_6_8 implements com.rfs.common.IndexMetadata.Factory { + private final SnapshotRepo.Provider repoDataProvider; + + public IndexMetadataFactory_ES_6_8(SnapshotRepo.Provider repoDataProvider) { + this.repoDataProvider = repoDataProvider; + } @Override - public IndexMetadata.Data fromJsonNode(JsonNode root, String indexId, String indexName) throws Exception { + public IndexMetadata.Data fromJsonNode(JsonNode root, String indexId, String indexName) { ObjectNode objectNodeRoot = (ObjectNode) root.get(indexName); return new IndexMetadataData_ES_6_8(objectNodeRoot, indexId, indexName); } @@ -20,7 +25,12 @@ public SmileFactory getSmileFactory() { } @Override - public String getIndexFileId(SnapshotRepo.Provider repoDataProvider, String snapshotName, String indexName) { + public String getIndexFileId(String snapshotName, String indexName) { return repoDataProvider.getSnapshotId(snapshotName); } + + @Override + public SnapshotRepo.Provider getRepoDataProvider() { + return repoDataProvider; + } } diff --git a/RFS/src/main/java/com/rfs/version_es_7_10/IndexMetadataFactory_ES_7_10.java b/RFS/src/main/java/com/rfs/version_es_7_10/IndexMetadataFactory_ES_7_10.java index cebd8a3f2..732e8e116 100644 --- a/RFS/src/main/java/com/rfs/version_es_7_10/IndexMetadataFactory_ES_7_10.java +++ b/RFS/src/main/java/com/rfs/version_es_7_10/IndexMetadataFactory_ES_7_10.java @@ -7,9 +7,14 @@ import com.rfs.common.SnapshotRepo; public class IndexMetadataFactory_ES_7_10 implements com.rfs.common.IndexMetadata.Factory { + private final SnapshotRepo.Provider repoDataProvider; + + public IndexMetadataFactory_ES_7_10(SnapshotRepo.Provider repoDataProvider) { + this.repoDataProvider = repoDataProvider; + } @Override - public IndexMetadata.Data fromJsonNode(JsonNode root, String indexId, String indexName) throws Exception { + public IndexMetadata.Data fromJsonNode(JsonNode root, String indexId, String indexName) { ObjectNode objectNodeRoot = (ObjectNode) root.get(indexName); return new IndexMetadataData_ES_7_10(objectNodeRoot, indexId, indexName); } @@ -20,9 +25,14 @@ public SmileFactory getSmileFactory() { } @Override - public String getIndexFileId(SnapshotRepo.Provider repoDataProvider, String snapshotName, String indexName) { + public String getIndexFileId(String snapshotName, String indexName) { SnapshotRepoProvider_ES_7_10 providerES710 = (SnapshotRepoProvider_ES_7_10) repoDataProvider; return providerES710.getIndexMetadataId(snapshotName, indexName); } + + @Override + public SnapshotRepo.Provider getRepoDataProvider() { + return repoDataProvider; + } } diff --git a/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java b/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java index 0464ac0c1..2979a4a2d 100644 --- a/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java +++ b/RFS/src/main/java/com/rfs/version_os_2_11/IndexCreator_OS_2_11.java @@ -1,14 +1,22 @@ package com.rfs.version_os_2_11; +import java.util.Optional; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.rfs.common.IndexMetadata; import com.rfs.common.OpenSearchClient; public class IndexCreator_OS_2_11 { private static final ObjectMapper mapper = new ObjectMapper(); + protected final OpenSearchClient client; + + public IndexCreator_OS_2_11 (OpenSearchClient client) { + this.client = client; + } + + public Optional create(ObjectNode root, String indexName, String indexId) { + IndexMetadataData_OS_2_11 indexMetadata = new IndexMetadataData_OS_2_11(root, indexId, indexName); - public static void create(String indexName, IndexMetadata.Data indexMetadata, OpenSearchClient client) throws Exception { // Remove some settings which will cause errors if you try to pass them to the API ObjectNode settings = indexMetadata.getSettings(); @@ -23,7 +31,7 @@ public static void create(String indexName, IndexMetadata.Data indexMetadata, Op body.set("mappings", indexMetadata.getMappings()); body.set("settings", settings); - // Idempotently create the index - client.createIndex(indexName, body); + // Create the index; it's fine if it already exists + return client.createIndex(indexName, body); } } diff --git a/RFS/src/main/java/com/rfs/worker/GlobalState.java b/RFS/src/main/java/com/rfs/worker/GlobalState.java index 75d085bf6..813448792 100644 --- a/RFS/src/main/java/com/rfs/worker/GlobalState.java +++ b/RFS/src/main/java/com/rfs/worker/GlobalState.java @@ -15,7 +15,10 @@ public enum Phase { SNAPSHOT_FAILED, METADATA_IN_PROGRESS, METADATA_COMPLETED, - METADATA_FAILED + METADATA_FAILED, + INDEX_IN_PROGRESS, + INDEX_COMPLETED, + INDEX_FAILED } private AtomicReference phase = new AtomicReference<>(Phase.UNSET); diff --git a/RFS/src/main/java/com/rfs/worker/IndexRunner.java b/RFS/src/main/java/com/rfs/worker/IndexRunner.java new file mode 100644 index 000000000..0b8885527 --- /dev/null +++ b/RFS/src/main/java/com/rfs/worker/IndexRunner.java @@ -0,0 +1,59 @@ +package com.rfs.worker; + +import java.util.Optional; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.rfs.cms.CmsClient; +import com.rfs.cms.CmsEntry; +import com.rfs.common.IndexMetadata; +import com.rfs.transformers.Transformer; +import com.rfs.version_os_2_11.IndexCreator_OS_2_11; + +public class IndexRunner implements Runner { + private static final Logger logger = LogManager.getLogger(IndexRunner.class); + + private final IndexStep.SharedMembers members; + + public IndexRunner(GlobalState globalState, CmsClient cmsClient, String snapshotName, IndexMetadata.Factory metadataFactory, + IndexCreator_OS_2_11 indexCreator, Transformer transformer) { + this.members = new IndexStep.SharedMembers(globalState, cmsClient, snapshotName, metadataFactory, indexCreator, transformer); + } + + @Override + public void runInternal() { + WorkerStep nextStep = null; + try { + nextStep = new IndexStep.EnterPhase(members); + + while (nextStep != null) { + nextStep.run(); + nextStep = nextStep.nextStep(); + } + } catch (Exception e) { + throw new IndexMigrationPhaseFailed( + members.globalState.getPhase(), + nextStep, + members.cmsEntry.map(bar -> (CmsEntry.Base) bar), + e + ); + } + } + + @Override + public String getPhaseName() { + return "Index Migration"; + } + + @Override + public Logger getLogger() { + return logger; + } + + public static class IndexMigrationPhaseFailed extends Runner.PhaseFailed { + public IndexMigrationPhaseFailed(GlobalState.Phase phase, WorkerStep nextStep, Optional cmsEntry, Exception e) { + super("Index Migration Phase failed", phase, nextStep, cmsEntry, e); + } + } +} diff --git a/RFS/src/main/java/com/rfs/worker/IndexStep.java b/RFS/src/main/java/com/rfs/worker/IndexStep.java new file mode 100644 index 000000000..1af30a3de --- /dev/null +++ b/RFS/src/main/java/com/rfs/worker/IndexStep.java @@ -0,0 +1,473 @@ +package com.rfs.worker; + +import java.time.Instant; +import java.util.List; +import java.util.Optional; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.rfs.cms.CmsClient; +import com.rfs.cms.CmsEntry; +import com.rfs.cms.OpenSearchCmsClient; +import com.rfs.common.IndexMetadata; +import com.rfs.common.RfsException; +import com.rfs.common.SnapshotRepo; +import com.rfs.transformers.Transformer; +import com.rfs.version_os_2_11.IndexCreator_OS_2_11; + +public class IndexStep { + + public static class SharedMembers { + protected final GlobalState globalState; + protected final CmsClient cmsClient; + protected final String snapshotName; + protected final IndexMetadata.Factory metadataFactory; + protected final IndexCreator_OS_2_11 indexCreator; + protected final Transformer transformer; + protected Optional cmsEntry; + + public SharedMembers(GlobalState globalState, CmsClient cmsClient, String snapshotName, IndexMetadata.Factory metadataFactory, + IndexCreator_OS_2_11 indexCreator, Transformer transformer) { + this.globalState = globalState; + this.cmsClient = cmsClient; + this.snapshotName = snapshotName; + this.metadataFactory = metadataFactory; + this.indexCreator = indexCreator; + this.transformer = transformer; + this.cmsEntry = Optional.empty(); + } + + // A convient way to check if the CMS entry is present before retrieving it. In some places, it's fine/expected + // for the CMS entry to be missing, but in others, it's a problem. + public CmsEntry.Index getCmsEntryNotMissing() { + return cmsEntry.orElseThrow( + () -> new MissingIndexEntry() + ); + } + } + + public static abstract class Base implements WorkerStep { + protected final Logger logger = LogManager.getLogger(getClass()); + protected final SharedMembers members; + + public Base(SharedMembers members) { + this.members = members; + } + } + + /* + * Updates the Worker's phase to indicate we're doing work on an Index Migration + */ + public static class EnterPhase extends Base { + public EnterPhase(SharedMembers members) { + super(members); + } + + @Override + public void run() { + logger.info("Index Migration not yet completed, entering Index Phase..."); + members.globalState.updatePhase(GlobalState.Phase.INDEX_IN_PROGRESS); + } + + @Override + public WorkerStep nextStep() { + return new GetEntry(members); + } + } + + /* + * Gets the current Index Migration entry from the CMS, if it exists + */ + public static class GetEntry extends Base { + + public GetEntry(SharedMembers members) { + super(members); + } + + @Override + public void run() { + logger.info("Pulling the Index Migration entry from the CMS, if it exists..."); + members.cmsEntry = members.cmsClient.getIndexEntry(); + } + + @Override + public WorkerStep nextStep() { + if (members.cmsEntry.isEmpty()) { + return new CreateEntry(members); + } + + CmsEntry.Index currentEntry = members.cmsEntry.get(); + switch (currentEntry.status) { + case SETUP: + // TODO: This uses the client-side clock to evaluate the lease expiration, when we should + // ideally be using the server-side clock. Consider this a temporary solution until we find + // out how to use the server-side clock. + long leaseExpiryMillis = Long.parseLong(currentEntry.leaseExpiry); + Instant leaseExpiryInstant = Instant.ofEpochMilli(leaseExpiryMillis); + boolean leaseExpired = leaseExpiryInstant.isBefore(Instant.now()); + + // Don't try to acquire the lease if we're already at the max number of attempts + if (currentEntry.numAttempts >= CmsEntry.Index.MAX_ATTEMPTS && leaseExpired) { + return new ExitPhaseFailed(members, new MaxAttemptsExceeded()); + } + + if (leaseExpired) { + return new AcquireLease(members); + } + + logger.info("Index Migration entry found, but there's already a valid work lease on it"); + return new RandomWait(members); + + case IN_PROGRESS: + return new GetIndicesToMigrate(members); + case COMPLETED: + return new ExitPhaseSuccess(members); + case FAILED: + return new ExitPhaseFailed(members, new FoundFailedIndexMigration()); + default: + throw new IllegalStateException("Unexpected metadata migration status: " + currentEntry.status); + } + } + } + + public static class CreateEntry extends Base { + + public CreateEntry(SharedMembers members) { + super(members); + } + + @Override + public void run() { + logger.info("Index Migration CMS Entry not found, attempting to create it..."); + members.cmsEntry = members.cmsClient.createIndexEntry(); + logger.info("Index Migration CMS Entry created"); + } + + @Override + public WorkerStep nextStep() { + // Set up the index work entries if we successfully created the CMS entry; otherwise, circle back to the beginning + if (members.cmsEntry.isPresent()) { + return new SetupIndexWorkEntries(members); + } else { + return new GetEntry(members); + } + } + } + + public static class AcquireLease extends Base { + + public AcquireLease(SharedMembers members) { + super(members); + } + + protected long getNowMs() { + return Instant.now().toEpochMilli(); + } + + @Override + public void run() { + // We only get here if we know we want to acquire the lock, so we know the CMS entry should not be null + CmsEntry.Index lastCmsEntry = members.getCmsEntryNotMissing(); + + logger.info("Current Metadata Migration work lease appears to have expired; attempting to acquire it..."); + + CmsEntry.Index updatedEntry = new CmsEntry.Index( + CmsEntry.IndexStatus.IN_PROGRESS, + // Set the next CMS entry based on the current one + // TODO: Should be using the server-side clock here + CmsEntry.Index.getLeaseExpiry(getNowMs(), lastCmsEntry.numAttempts + 1), + lastCmsEntry.numAttempts + 1 + ); + members.cmsEntry = members.cmsClient.updateIndexEntry(updatedEntry, lastCmsEntry); + + if (members.cmsEntry.isPresent()) { + logger.info("Lease acquired"); + } else { + logger.info("Failed to acquire lease"); + } + } + + @Override + public WorkerStep nextStep() { + // Set up the index work entries if we acquired the lease; otherwise, circle back to the beginning after a backoff + if (members.cmsEntry.isPresent()) { + return new SetupIndexWorkEntries(members); + } else { + return new RandomWait(members); + } + } + } + + public static class SetupIndexWorkEntries extends Base { + + public SetupIndexWorkEntries(SharedMembers members) { + super(members); + } + + @Override + public void run() { + // We only get here if we acquired the lock, so we know the CMS entry should not be missing + CmsEntry.Index lastCmsEntry = members.getCmsEntryNotMissing(); + + logger.info("Setting the worker's current work item to be creating the index work entries..."); + members.globalState.updateWorkItem(new OpenSearchWorkItem(OpenSearchCmsClient.CMS_INDEX_NAME, OpenSearchCmsClient.CMS_INDEX_DOC_ID)); + logger.info("Work item set"); + + logger.info("Setting up the Index Work Items..."); + SnapshotRepo.Provider repoDatProvider = members.metadataFactory.getRepoDataProvider(); + for (SnapshotRepo.Index index : repoDatProvider.getIndicesInSnapshot(members.snapshotName)) { + IndexMetadata.Data indexMetadata = members.metadataFactory.fromRepo(members.snapshotName, index.getName()); + logger.info("Creating Index Work Item for index: " + indexMetadata.getName()); + members.cmsClient.createIndexWorkItem(indexMetadata.getName(), indexMetadata.getNumberOfShards()); + } + logger.info("Finished setting up the Index Work Items."); + + logger.info("Updating the Index Migration entry to indicate setup has been completed..."); + CmsEntry.Index updatedEntry = new CmsEntry.Index( + CmsEntry.IndexStatus.COMPLETED, + lastCmsEntry.leaseExpiry, + lastCmsEntry.numAttempts + ); + + members.cmsEntry = members.cmsClient.updateIndexEntry(updatedEntry, lastCmsEntry); + logger.info("Index Migration entry updated"); + + logger.info("Clearing the worker's current work item..."); + members.globalState.updateWorkItem(null); + logger.info("Work item cleared"); + } + + @Override + public WorkerStep nextStep() { + if (members.cmsEntry.isEmpty()) { + // In this scenario, we've done all the work, but failed to update the CMS entry so that we know we've + // done the work. We circle back around to try again, which is made more reasonable by the fact we + // don't re-migrate templates that already exist on the target cluster. If we didn't circle back + // around, there would be a chance that the CMS entry would never be marked as completed. + // + // The CMS entry's retry limit still applies in this case, so there's a limiting factor here. + logger.warn("Completed creating the index work entries but failed to update the Index Migration entry; retrying..."); + return new GetEntry(members); + } + return new GetIndicesToMigrate(members); + } + } + + public static class GetIndicesToMigrate extends Base { + public static final int MAX_WORK_ITEMS = 10; //Arbitrarily chosen + + protected List workItems; + + public GetIndicesToMigrate(SharedMembers members) { + super(members); + workItems = List.of(); + } + + @Override + public void run() { + logger.info("Pulling a list of indices to migrate from the CMS..."); + workItems = members.cmsClient.getAvailableIndexWorkItems(MAX_WORK_ITEMS); + logger.info("Pulled " + workItems.size() + " indices to migrate:"); + logger.info(workItems.toString()); + } + + @Override + public WorkerStep nextStep() { + if (workItems.isEmpty()) { + return new ExitPhaseSuccess(members); + } else { + return new MigrateIndices(members, workItems); + } + } + } + + public static class MigrateIndices extends Base { + protected final List workItems; + + public MigrateIndices(SharedMembers members, List workItems) { + super(members); + this.workItems = workItems; + } + + @Override + public void run() { + logger.info("Migrating current batch of indices..."); + for (CmsEntry.IndexWorkItem workItem : workItems) { + /* + * Try to migrate the index. + * + * If we succeed, we forcefully mark it as completed. When we do so, we don't care if someone else has changed + * the record in the meantime; *we* completed it successfully and that's what matters. Because this is the only + * forceful operation on the entry, the other operations are safe to be non-forceful. + * + * If it's already exceeded the number of attempts, we attempt to mark it as failed. If someone else + * has updated the entry in the meantime, we just move on to the next work item. This is safe because + * it means someone else has either marked it as completed or failed, and either is fine. + * + * If we fail to migrate it, we attempt to increment the attempt count. It's fine if the increment + * fails because we guarantee that we'll attempt the work at least N times, not exactly N times. + */ + if (workItem.numAttempts > CmsEntry.IndexWorkItem.ATTEMPTS_SOFT_LIMIT) { + logger.info("Index Work Item " + workItem.name + " has exceeded the maximum number of attempts; marking it as failed..."); + CmsEntry.IndexWorkItem updatedEntry = new CmsEntry.IndexWorkItem( + workItem.name, + CmsEntry.IndexWorkItemStatus.FAILED, + workItem.numAttempts, + workItem.numShards + ); + + members.cmsClient.updateIndexWorkItem(updatedEntry, workItem).ifPresentOrElse( + value -> logger.info("Index Work Item " + workItem.name + " marked as failed"), + () ->logger.info("Unable to mark Index Work Item " + workItem.name + " as failed") + ); + continue; + } + + try { + logger.info("Migrating index: " + workItem.name); + IndexMetadata.Data indexMetadata = members.metadataFactory.fromRepo(members.snapshotName, workItem.name); + + ObjectNode root = indexMetadata.toObjectNode(); + ObjectNode transformedRoot = members.transformer.transformIndexMetadata(root); + + members.indexCreator.create(transformedRoot, workItem.name, indexMetadata.getId()).ifPresentOrElse( + value -> logger.info("Index " + workItem.name + " created successfully"), + () -> logger.info("Index " + workItem.name + " already existed; no work required") + ); + + logger.info("Forcefully updating the Index Work Item to indicate it has been completed..."); + CmsEntry.IndexWorkItem updatedEntry = new CmsEntry.IndexWorkItem( + workItem.name, + CmsEntry.IndexWorkItemStatus.COMPLETED, + workItem.numAttempts, + workItem.numShards + ); + members.cmsClient.updateIndexWorkItemForceful(updatedEntry); + logger.info("Index Work Item updated"); + } catch (Exception e) { + logger.info("Failed to migrate index: " + workItem.name, e); + logger.info("Updating the Index Work Item with incremented attempt count..."); + CmsEntry.IndexWorkItem updatedEntry = new CmsEntry.IndexWorkItem( + workItem.name, + workItem.status, + workItem.numAttempts + 1, + workItem.numShards + ); + + members.cmsClient.updateIndexWorkItem(updatedEntry, workItem).ifPresentOrElse( + value -> logger.info("Index Work Item " + workItem.name + " attempt count was incremented"), + () ->logger.info("Unable to increment attempt count of Index Work Item " + workItem.name) + ); + } + } + } + + @Override + public WorkerStep nextStep() { + return new GetIndicesToMigrate(members); + } + } + + public static class RandomWait extends Base { + private final static int WAIT_TIME_MS = 5 * 1000; // arbitrarily chosen + + public RandomWait(SharedMembers members) { + super(members); + } + + protected void waitABit() { + try { + Thread.sleep(WAIT_TIME_MS); + } catch (InterruptedException e) { + logger.error("Interrupted while performing a wait", e); + throw new IndexMigrationFailed("Interrupted"); + } + } + + @Override + public void run() { + logger.info("Backing off for " + WAIT_TIME_MS + " milliseconds before checking the Index Migration entry again..."); + waitABit(); + } + + @Override + public WorkerStep nextStep() { + return new GetEntry(members); + } + } + + public static class ExitPhaseSuccess extends Base { + public ExitPhaseSuccess(SharedMembers members) { + super(members); + } + + @Override + public void run() { + logger.info("Index Migration completed, exiting Index Phase..."); + members.globalState.updatePhase(GlobalState.Phase.INDEX_COMPLETED); + } + + @Override + public WorkerStep nextStep() { + return null; + } + } + + public static class ExitPhaseFailed extends Base { + private final IndexMigrationFailed e; + + public ExitPhaseFailed(SharedMembers members, IndexMigrationFailed e) { + super(members); + this.e = e; + } + + @Override + public void run() { + // We either failed the Metadata Migration or found it had already been failed; either way this + // should not be missing + CmsEntry.Index lastCmsEntry = members.getCmsEntryNotMissing(); + + logger.error("Metadata Migration failed"); + CmsEntry.Index updatedEntry = new CmsEntry.Index( + CmsEntry.IndexStatus.FAILED, + lastCmsEntry.leaseExpiry, + lastCmsEntry.numAttempts + ); + members.cmsClient.updateIndexEntry(updatedEntry, lastCmsEntry); + members.globalState.updatePhase(GlobalState.Phase.INDEX_FAILED); + } + + @Override + public WorkerStep nextStep() { + throw e; + } + } + + public static class IndexMigrationFailed extends RfsException { + public IndexMigrationFailed(String message) { + super("The Index Migration has failed. Reason: " + message); + } + } + + public static class MissingIndexEntry extends RfsException { + public MissingIndexEntry() { + super("The Index Migration CMS entry we expected to be stored in local memory was null." + + " This should never happen." + ); + } + } + + public static class FoundFailedIndexMigration extends IndexMigrationFailed { + public FoundFailedIndexMigration() { + super("We checked the status in the CMS and found it had failed. Aborting."); + } + } + + public static class MaxAttemptsExceeded extends IndexMigrationFailed { + public MaxAttemptsExceeded() { + super("We reached the limit of " + CmsEntry.Index.MAX_ATTEMPTS + " attempts to complete the Index Migration"); + } + } +} diff --git a/RFS/src/main/java/com/rfs/worker/MetadataStep.java b/RFS/src/main/java/com/rfs/worker/MetadataStep.java index 255673c29..fec23f94b 100644 --- a/RFS/src/main/java/com/rfs/worker/MetadataStep.java +++ b/RFS/src/main/java/com/rfs/worker/MetadataStep.java @@ -41,7 +41,7 @@ public SharedMembers(GlobalState globalState, CmsClient cmsClient, String snapsh // for the CMS entry to be missing, but in others, it's a problem. public CmsEntry.Metadata getCmsEntryNotMissing() { return cmsEntry.orElseThrow( - () -> new MissingMigrationEntry("The Metadata Migration CMS entry we expected to be stored in local memory was empty") + () -> new MissingMigrationEntry() ); } } @@ -170,17 +170,18 @@ protected long getNowMs() { @Override public void run() { // We only get here if we know we want to acquire the lock, so we know the CMS entry should not be null - CmsEntry.Metadata currentCmsEntry = members.getCmsEntryNotMissing(); + CmsEntry.Metadata lastCmsEntry = members.getCmsEntryNotMissing(); logger.info("Current Metadata Migration work lease appears to have expired; attempting to acquire it..."); - // Set the next CMS entry based on the current one - members.cmsEntry = members.cmsClient.updateMetadataEntry( + CmsEntry.Metadata updatedEntry = new CmsEntry.Metadata( CmsEntry.MetadataStatus.IN_PROGRESS, + // Set the next CMS entry based on the current one // TODO: Should be using the server-side clock here - CmsEntry.Metadata.getLeaseExpiry(getNowMs(), currentCmsEntry.numAttempts + 1), - currentCmsEntry.numAttempts + 1 + CmsEntry.Metadata.getLeaseExpiry(getNowMs(), lastCmsEntry.numAttempts + 1), + lastCmsEntry.numAttempts + 1 ); + members.cmsEntry = members.cmsClient.updateMetadataEntry(updatedEntry, lastCmsEntry); if (members.cmsEntry.isPresent()) { logger.info("Lease acquired"); @@ -208,8 +209,8 @@ public MigrateTemplates(SharedMembers members) { @Override public void run() { - // We only get here if we acquired the lock, so we know the CMS entry should not be null - CmsEntry.Metadata currentCmsEntry = members.getCmsEntryNotMissing(); + // We only get here if we acquired the lock, so we know the CMS entry should not be missing + CmsEntry.Metadata lastCmsEntry = members.getCmsEntryNotMissing(); logger.info("Setting the worker's current work item to be the Metadata Migration..."); members.globalState.updateWorkItem(new OpenSearchWorkItem(OpenSearchCmsClient.CMS_INDEX_NAME, OpenSearchCmsClient.CMS_METADATA_DOC_ID)); @@ -223,11 +224,12 @@ public void run() { logger.info("Templates migration complete"); logger.info("Updating the Metadata Migration entry to indicate completion..."); - members.cmsEntry = members.cmsClient.updateMetadataEntry( + CmsEntry.Metadata updatedEntry = new CmsEntry.Metadata( CmsEntry.MetadataStatus.COMPLETED, - currentCmsEntry.leaseExpiry, - currentCmsEntry.numAttempts + lastCmsEntry.leaseExpiry, + lastCmsEntry.numAttempts ); + members.cmsEntry = members.cmsClient.updateMetadataEntry(updatedEntry, lastCmsEntry); logger.info("Metadata Migration entry updated"); logger.info("Clearing the worker's current work item..."); @@ -307,15 +309,16 @@ public ExitPhaseFailed(SharedMembers members, MetadataMigrationFailed e) { @Override public void run() { // We either failed the Metadata Migration or found it had already been failed; either way this - // should not be null - CmsEntry.Metadata currentCmsEntry = members.getCmsEntryNotMissing(); + // should not be missing + CmsEntry.Metadata lastCmsEntry = members.getCmsEntryNotMissing(); logger.error("Metadata Migration failed"); - members.cmsClient.updateMetadataEntry( + CmsEntry.Metadata updatedEntry = new CmsEntry.Metadata( CmsEntry.MetadataStatus.FAILED, - currentCmsEntry.leaseExpiry, - currentCmsEntry.numAttempts + lastCmsEntry.leaseExpiry, + lastCmsEntry.numAttempts ); + members.cmsClient.updateMetadataEntry(updatedEntry, lastCmsEntry); members.globalState.updatePhase(GlobalState.Phase.METADATA_FAILED); } @@ -324,8 +327,9 @@ public WorkerStep nextStep() { throw e; } } + public static class MissingMigrationEntry extends RfsException { - public MissingMigrationEntry(String message) { + public MissingMigrationEntry() { super("The Metadata Migration CMS entry we expected to be stored in local memory was null." + " This should never happen." ); diff --git a/RFS/src/main/java/com/rfs/worker/SnapshotStep.java b/RFS/src/main/java/com/rfs/worker/SnapshotStep.java index 798bc6139..6b0e38f3f 100644 --- a/RFS/src/main/java/com/rfs/worker/SnapshotStep.java +++ b/RFS/src/main/java/com/rfs/worker/SnapshotStep.java @@ -8,6 +8,7 @@ import com.rfs.cms.CmsClient; import com.rfs.cms.CmsEntry; import com.rfs.cms.CmsEntry.SnapshotStatus; +import com.rfs.common.RfsException; import com.rfs.common.SnapshotCreator; import com.rfs.common.SnapshotCreator.SnapshotCreationFailed; @@ -25,6 +26,14 @@ public SharedMembers(GlobalState globalState, CmsClient cmsClient, SnapshotCreat this.snapshotCreator = snapshotCreator; this.cmsEntry = Optional.empty(); } + + // A convient way to check if the CMS entry is present before retrieving it. In some places, it's fine/expected + // for the CMS entry to be missing, but in others, it's a problem. + public CmsEntry.Snapshot getCmsEntryNotMissing() { + return cmsEntry.orElseThrow( + () -> new MissingSnapshotEntry() + ); + } } public static abstract class Base implements WorkerStep { @@ -87,7 +96,7 @@ public CreateEntry(SharedMembers members) { @Override public void run() { logger.info("Snapshot CMS Entry not found, attempting to create it..."); - members.cmsClient.createSnapshotEntry(members.snapshotCreator.getSnapshotName()); + members.cmsEntry = members.cmsClient.createSnapshotEntry(members.snapshotCreator.getSnapshotName()); logger.info("Snapshot CMS Entry created"); } @@ -107,12 +116,16 @@ public InitiateSnapshot(SharedMembers members) { @Override public void run() { + // We only get here if we know we want to create a snapshot, so we know the CMS entry should not be null + CmsEntry.Snapshot lastCmsEntry = members.getCmsEntryNotMissing(); + logger.info("Attempting to initiate the snapshot..."); members.snapshotCreator.registerRepo(); members.snapshotCreator.createSnapshot(); logger.info("Snapshot in progress..."); - members.cmsClient.updateSnapshotEntry(members.snapshotCreator.getSnapshotName(), SnapshotStatus.IN_PROGRESS); + CmsEntry.Snapshot updatedEntry = new CmsEntry.Snapshot(members.snapshotCreator.getSnapshotName(), SnapshotStatus.IN_PROGRESS); + members.cmsEntry = members.cmsClient.updateSnapshotEntry(updatedEntry, lastCmsEntry); } @Override @@ -170,7 +183,10 @@ public ExitPhaseSuccess(SharedMembers members) { @Override public void run() { - members.cmsClient.updateSnapshotEntry(members.snapshotCreator.getSnapshotName(), SnapshotStatus.COMPLETED); + CmsEntry.Snapshot lastCmsEntry = members.getCmsEntryNotMissing(); + CmsEntry.Snapshot updatedEntry = new CmsEntry.Snapshot(members.snapshotCreator.getSnapshotName(), SnapshotStatus.COMPLETED); + members.cmsClient.updateSnapshotEntry(updatedEntry, lastCmsEntry); + members.globalState.updatePhase(GlobalState.Phase.SNAPSHOT_COMPLETED); logger.info("Snapshot completed, exiting Snapshot Phase..."); } @@ -195,7 +211,10 @@ public ExitPhaseSnapshotFailed(SharedMembers members, SnapshotCreationFailed e) @Override public void run() { logger.error("Snapshot creation failed"); - members.cmsClient.updateSnapshotEntry(members.snapshotCreator.getSnapshotName(), SnapshotStatus.FAILED); + CmsEntry.Snapshot lastCmsEntry = members.getCmsEntryNotMissing(); + CmsEntry.Snapshot updatedEntry = new CmsEntry.Snapshot(members.snapshotCreator.getSnapshotName(), SnapshotStatus.FAILED); + members.cmsClient.updateSnapshotEntry(updatedEntry, lastCmsEntry); + members.globalState.updatePhase(GlobalState.Phase.SNAPSHOT_FAILED); } @@ -204,4 +223,14 @@ public WorkerStep nextStep() { throw e; } } + + + + public static class MissingSnapshotEntry extends RfsException { + public MissingSnapshotEntry() { + super("The Snapshot CMS entry we expected to be stored in local memory was null." + + " This should never happen." + ); + } + } } diff --git a/RFS/src/test/java/com/rfs/cms/CmsEntryTest.java b/RFS/src/test/java/com/rfs/cms/CmsEntryTest.java index c0c853477..e25960c1b 100644 --- a/RFS/src/test/java/com/rfs/cms/CmsEntryTest.java +++ b/RFS/src/test/java/com/rfs/cms/CmsEntryTest.java @@ -27,7 +27,7 @@ void Metadata_getLeaseExpiry_HappyPath(int numAttempts) { String result = CmsEntry.Metadata.getLeaseExpiry(0, numAttempts); // Check the results - assertEquals(Long.toString(CmsEntry.Metadata.METADATA_LEASE_MS * numAttempts), result); + assertEquals(Long.toString(CmsEntry.Metadata.LEASE_MS * numAttempts), result); } static Stream provide_Metadata_getLeaseExpiry_UnhappyPath_args() { diff --git a/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java b/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java index 82b6dbecc..5ca87ee1a 100644 --- a/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java +++ b/RFS/src/test/java/com/rfs/framework/SimpleRestoreFromSnapshot_ES_7_10.java @@ -13,6 +13,7 @@ import com.rfs.common.IndexMetadata; import com.rfs.common.LuceneDocumentsReader; import com.rfs.common.OpenSearchClient; +import com.rfs.common.SnapshotRepo; import com.rfs.common.SnapshotShardUnpacker; import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10; import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10; @@ -29,12 +30,12 @@ public List extraSnapshotIndexData(final String localPath, f IOUtils.rm(unpackedShardDataDir); final var repo = new FileSystemRepo(Path.of(localPath)); - final var snapShotProvider = new SnapshotRepoProvider_ES_7_10(repo); + SnapshotRepo.Provider snapShotProvider = new SnapshotRepoProvider_ES_7_10(repo); final List indices = snapShotProvider.getIndicesInSnapshot(snapshotName) .stream() .map(index -> { try { - return new IndexMetadataFactory_ES_7_10().fromRepo(repo, snapShotProvider, snapshotName, index.getName()); + return new IndexMetadataFactory_ES_7_10(snapShotProvider).fromRepo(snapshotName, index.getName()); } catch (final Exception e) { throw new RuntimeException(e); } diff --git a/RFS/src/test/java/com/rfs/worker/IndexRunnerTest.java b/RFS/src/test/java/com/rfs/worker/IndexRunnerTest.java new file mode 100644 index 000000000..fd0ce72a3 --- /dev/null +++ b/RFS/src/test/java/com/rfs/worker/IndexRunnerTest.java @@ -0,0 +1,44 @@ +package com.rfs.worker; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.*; + +import java.util.Optional; + +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import com.rfs.cms.CmsClient; +import com.rfs.common.IndexMetadata; +import com.rfs.common.RfsException; +import com.rfs.transformers.Transformer; +import com.rfs.version_os_2_11.IndexCreator_OS_2_11; + +public class IndexRunnerTest { + + @Test + void run_encountersAnException_asExpected() { + // Setup + GlobalState globalState = Mockito.mock(GlobalState.class); + CmsClient cmsClient = Mockito.mock(CmsClient.class); + String snapshotName = "testSnapshot"; + IndexMetadata.Factory metadataFactory = Mockito.mock(IndexMetadata.Factory.class); + IndexCreator_OS_2_11 creator = Mockito.mock(IndexCreator_OS_2_11.class); + Transformer transformer = Mockito.mock(Transformer.class); + RfsException testException = new RfsException("Unit test"); + + doThrow(testException).when(cmsClient).getIndexEntry(); + when(globalState.getPhase()).thenReturn(GlobalState.Phase.INDEX_IN_PROGRESS); + + // Run the test + IndexRunner testRunner = new IndexRunner(globalState, cmsClient, snapshotName, metadataFactory, creator, transformer); + final var e = assertThrows(IndexRunner.IndexMigrationPhaseFailed.class, () -> testRunner.run()); + + // Verify the results + assertEquals(GlobalState.Phase.INDEX_IN_PROGRESS, e.phase); + assertEquals(IndexStep.GetEntry.class, e.nextStep.getClass()); + assertEquals(Optional.empty(), e.cmsEntry); + assertEquals(testException, e.e); + } +} diff --git a/RFS/src/test/java/com/rfs/worker/IndexStepTest.java b/RFS/src/test/java/com/rfs/worker/IndexStepTest.java new file mode 100644 index 000000000..ba201ff14 --- /dev/null +++ b/RFS/src/test/java/com/rfs/worker/IndexStepTest.java @@ -0,0 +1,534 @@ +package com.rfs.worker; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.rfs.cms.CmsClient; +import com.rfs.cms.CmsEntry; +import com.rfs.cms.OpenSearchCmsClient; +import com.rfs.common.IndexMetadata; +import com.rfs.common.SnapshotRepo; +import com.rfs.transformers.Transformer; +import com.rfs.version_os_2_11.IndexCreator_OS_2_11; +import com.rfs.worker.IndexStep.SharedMembers; +import com.rfs.worker.IndexStep.MaxAttemptsExceeded; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +public class IndexStepTest { + private SharedMembers testMembers; + + @BeforeEach + void setUp() { + GlobalState globalState = Mockito.mock(GlobalState.class); + CmsClient cmsClient = Mockito.mock(CmsClient.class); + String snapshotName = "test"; + + IndexMetadata.Factory metadataFactory = Mockito.mock(IndexMetadata.Factory.class); + IndexCreator_OS_2_11 indexCreator = Mockito.mock(IndexCreator_OS_2_11.class); + Transformer transformer = Mockito.mock(Transformer.class); + testMembers = new SharedMembers(globalState, cmsClient, snapshotName, metadataFactory, indexCreator, transformer); + } + + @Test + void EnterPhase_AsExpected() { + // Run the test + IndexStep.EnterPhase testStep = new IndexStep.EnterPhase(testMembers); + testStep.run(); + WorkerStep nextStep = testStep.nextStep(); + + // Check the results + Mockito.verify(testMembers.globalState, times(1)).updatePhase(GlobalState.Phase.INDEX_IN_PROGRESS); + assertEquals(IndexStep.GetEntry.class, nextStep.getClass()); + } + + static Stream provideGetEntryArgs() { + return Stream.of( + // There is no CMS entry, so we need to create one + Arguments.of( + Optional.empty(), + IndexStep.CreateEntry.class + ), + + // The CMS entry has an expired lease and is under the retry limit, so we try to acquire the lease + Arguments.of( + Optional.of(new CmsEntry.Index( + CmsEntry.IndexStatus.SETUP, + String.valueOf(Instant.now().minus(Duration.ofDays(1)).toEpochMilli()), + CmsEntry.Index.MAX_ATTEMPTS - 1 + )), + IndexStep.AcquireLease.class + ), + + // The CMS entry has an expired lease and is at the retry limit, so we exit as failed + Arguments.of( + Optional.of(new CmsEntry.Index( + CmsEntry.IndexStatus.SETUP, + String.valueOf(Instant.now().minus(Duration.ofDays(1)).toEpochMilli()), + CmsEntry.Index.MAX_ATTEMPTS + )), + IndexStep.ExitPhaseFailed.class + ), + + // The CMS entry has an expired lease and is over the retry limit, so we exit as failed + Arguments.of( + Optional.of(new CmsEntry.Index( + CmsEntry.IndexStatus.SETUP, + String.valueOf(Instant.now().minus(Duration.ofDays(1)).toEpochMilli()), + CmsEntry.Index.MAX_ATTEMPTS + 1 + )), + IndexStep.ExitPhaseFailed.class + ), + + // The CMS entry has valid lease and is under the retry limit, so we back off a bit + Arguments.of( + Optional.of(new CmsEntry.Index( + CmsEntry.IndexStatus.SETUP, + String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()), + CmsEntry.Index.MAX_ATTEMPTS - 1 + )), + IndexStep.RandomWait.class + ), + + // The CMS entry has valid lease and is at the retry limit, so we back off a bit + Arguments.of( + Optional.of(new CmsEntry.Index( + CmsEntry.IndexStatus.SETUP, + String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()), + CmsEntry.Index.MAX_ATTEMPTS + )), + IndexStep.RandomWait.class + ), + + // The CMS entry is marked as in progress, so we try to do some work + Arguments.of( + Optional.of(new CmsEntry.Index( + CmsEntry.IndexStatus.IN_PROGRESS, + String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()), + CmsEntry.Index.MAX_ATTEMPTS - 1 + )), + IndexStep.GetIndicesToMigrate.class + ), + + // The CMS entry is marked as completed, so we exit as success + Arguments.of( + Optional.of(new CmsEntry.Index( + CmsEntry.IndexStatus.COMPLETED, + String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()), + CmsEntry.Index.MAX_ATTEMPTS - 1 + )), + IndexStep.ExitPhaseSuccess.class + ), + + // The CMS entry is marked as failed, so we exit as failed + Arguments.of( + Optional.of(new CmsEntry.Index( + CmsEntry.IndexStatus.FAILED, + String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()), + CmsEntry.Index.MAX_ATTEMPTS - 1 + )), + IndexStep.ExitPhaseFailed.class + ) + ); + } + + @ParameterizedTest + @MethodSource("provideGetEntryArgs") + void GetEntry_AsExpected(Optional index, Class nextStepClass) { + // Set up the test + Mockito.when(testMembers.cmsClient.getIndexEntry()).thenReturn(index); + + // Run the test + IndexStep.GetEntry testStep = new IndexStep.GetEntry(testMembers); + testStep.run(); + WorkerStep nextStep = testStep.nextStep(); + + // Check the results + Mockito.verify(testMembers.cmsClient, times(1)).getIndexEntry(); + assertEquals(nextStepClass, nextStep.getClass()); + } + + static Stream provideCreateEntryArgs() { + return Stream.of( + // We were able to create the CMS entry ourselves, so we have the work lease + Arguments.of( + Optional.of(new CmsEntry.Index( + CmsEntry.IndexStatus.IN_PROGRESS, + String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()), + 1 + )), + IndexStep.SetupIndexWorkEntries.class + ), + + // We were unable to create the CMS entry ourselves, so we do not have the work lease + Arguments.of(Optional.empty(), IndexStep.GetEntry.class) + ); + } + + @ParameterizedTest + @MethodSource("provideCreateEntryArgs") + void CreateEntry_AsExpected(Optional createdEntry, Class nextStepClass) { + // Set up the test + Mockito.when(testMembers.cmsClient.createIndexEntry()).thenReturn(createdEntry); + + // Run the test + IndexStep.CreateEntry testStep = new IndexStep.CreateEntry(testMembers); + testStep.run(); + WorkerStep nextStep = testStep.nextStep(); + + // Check the results + Mockito.verify(testMembers.cmsClient, times(1)).createIndexEntry(); + assertEquals(nextStepClass, nextStep.getClass()); + } + + public static class TestAcquireLease extends IndexStep.AcquireLease { + public static final int milliSinceEpoch = 42; // Arbitrarily chosen, but predictable + + public TestAcquireLease(SharedMembers members) { + super(members); + } + + @Override + protected long getNowMs() { + return milliSinceEpoch; + } + } + + static Stream provideAcquireLeaseArgs() { + return Stream.of( + // We were able to acquire the lease + Arguments.of( + Optional.of(new CmsEntry.Index( + CmsEntry.IndexStatus.IN_PROGRESS, + String.valueOf(Instant.now().plus(Duration.ofDays(1)).toEpochMilli()), + 1 + )), + IndexStep.SetupIndexWorkEntries.class + ), + + // We were unable to acquire the lease + Arguments.of(Optional.empty(), IndexStep.RandomWait.class) + ); + } + + @ParameterizedTest + @MethodSource("provideAcquireLeaseArgs") + void AcquireLease_AsExpected(Optional updatedEntry, Class nextStepClass) { + // Set up the test + var existingEntry = Optional.of(new CmsEntry.Index( + CmsEntry.IndexStatus.IN_PROGRESS, + CmsEntry.Index.getLeaseExpiry(0L, CmsEntry.Index.MAX_ATTEMPTS - 1), + CmsEntry.Index.MAX_ATTEMPTS - 1 + )); + testMembers.cmsEntry = existingEntry; + + Mockito.when(testMembers.cmsClient.updateIndexEntry( + any(CmsEntry.Index.class), eq(existingEntry.get()) + )).thenReturn(updatedEntry); + + // Run the test + IndexStep.AcquireLease testStep = new TestAcquireLease(testMembers); + testStep.run(); + WorkerStep nextStep = testStep.nextStep(); + + // Check the results + var expectedEntry = new CmsEntry.Index( + CmsEntry.IndexStatus.IN_PROGRESS, + CmsEntry.Index.getLeaseExpiry(TestAcquireLease.milliSinceEpoch, CmsEntry.Index.MAX_ATTEMPTS), + CmsEntry.Index.MAX_ATTEMPTS + ); + Mockito.verify(testMembers.cmsClient, times(1)).updateIndexEntry( + expectedEntry, existingEntry.get() + ); + assertEquals(nextStepClass, nextStep.getClass()); + } + + static Stream provideSetupIndexWorkEntriesArgs() { + return Stream.of( + // We were able to acquire the lease + Arguments.of( + Optional.of(new CmsEntry.Index( + CmsEntry.IndexStatus.COMPLETED, + String.valueOf(42), + 1 + )), + IndexStep.GetIndicesToMigrate.class + ), + + // We were unable to acquire the lease + Arguments.of(Optional.empty(), IndexStep.GetEntry.class) + ); + } + + @ParameterizedTest + @MethodSource("provideSetupIndexWorkEntriesArgs") + void SetupIndexWorkEntries_AsExpected(Optional updatedEntry, Class nextStepClass) { + // Set up the test + var existingEntry = Optional.of(new CmsEntry.Index( + CmsEntry.IndexStatus.IN_PROGRESS, + String.valueOf(42), + 1 + )); + testMembers.cmsEntry = existingEntry; + + SnapshotRepo.Provider repoDatProvider = Mockito.mock(SnapshotRepo.Provider.class); + Mockito.when(testMembers.metadataFactory.getRepoDataProvider()).thenReturn(repoDatProvider); + + SnapshotRepo.Index index1 = Mockito.mock(SnapshotRepo.Index.class); + Mockito.when(index1.getName()).thenReturn("index1"); + SnapshotRepo.Index index2 = Mockito.mock(SnapshotRepo.Index.class); + Mockito.when(index2.getName()).thenReturn("index2"); + Mockito.when(repoDatProvider.getIndicesInSnapshot(testMembers.snapshotName)).thenReturn( + Stream.of(index1, index2).collect(Collectors.toList()) + ); + + IndexMetadata.Data indexMetadata1 = Mockito.mock(IndexMetadata.Data.class); + Mockito.when(indexMetadata1.getName()).thenReturn("index1"); + Mockito.when(indexMetadata1.getNumberOfShards()).thenReturn(1); + Mockito.when(testMembers.metadataFactory.fromRepo(testMembers.snapshotName, "index1")).thenReturn(indexMetadata1); + + IndexMetadata.Data indexMetadata2 = Mockito.mock(IndexMetadata.Data.class); + Mockito.when(indexMetadata2.getName()).thenReturn("index2"); + Mockito.when(indexMetadata2.getNumberOfShards()).thenReturn(2); + Mockito.when(testMembers.metadataFactory.fromRepo(testMembers.snapshotName, "index2")).thenReturn(indexMetadata2); + + CmsEntry.Index expectedEntry = new CmsEntry.Index( + CmsEntry.IndexStatus.COMPLETED, + existingEntry.get().leaseExpiry, + existingEntry.get().numAttempts + ); + Mockito.when(testMembers.cmsClient.updateIndexEntry( + expectedEntry, existingEntry.get() + )).thenReturn(updatedEntry); + + // Run the test + IndexStep.SetupIndexWorkEntries testStep = new IndexStep.SetupIndexWorkEntries(testMembers); + testStep.run(); + WorkerStep nextStep = testStep.nextStep(); + + // Check the results + Mockito.verify(testMembers.globalState, times(1)).updateWorkItem( + argThat(argument -> { + if (!(argument instanceof OpenSearchWorkItem)) { + return false; + } + OpenSearchWorkItem workItem = (OpenSearchWorkItem) argument; + return workItem.indexName.equals(OpenSearchCmsClient.CMS_INDEX_NAME) && + workItem.documentId.equals(OpenSearchCmsClient.CMS_INDEX_DOC_ID); + }) + ); + Mockito.verify(testMembers.cmsClient, times(1)).createIndexWorkItem( + "index1", 1 + ); + Mockito.verify(testMembers.cmsClient, times(1)).createIndexWorkItem( + "index2", 2 + ); + + Mockito.verify(testMembers.cmsClient, times(1)).updateIndexEntry( + expectedEntry, existingEntry.get() + ); + Mockito.verify(testMembers.globalState, times(1)).updateWorkItem( + null + ); + + assertEquals(nextStepClass, nextStep.getClass()); + } + + static Stream provideGetIndicesToMigrateArgs() { + return Stream.of( + // There's still work to do + Arguments.of( + Stream.of( + new CmsEntry.IndexWorkItem("index1", CmsEntry.IndexWorkItemStatus.NOT_STARTED, 1, 1), + new CmsEntry.IndexWorkItem("index2", CmsEntry.IndexWorkItemStatus.NOT_STARTED, 1, 2) + ).collect(Collectors.toList()), + IndexStep.MigrateIndices.class + ), + + // There's no more work to do + Arguments.of(List.of(), IndexStep.ExitPhaseSuccess.class) + ); + } + + @ParameterizedTest + @MethodSource("provideGetIndicesToMigrateArgs") + void GetIndicesToMigrate_AsExpected(List workItems, Class nextStepClass) { + // Set up the test + Mockito.when(testMembers.cmsClient.getAvailableIndexWorkItems(IndexStep.GetIndicesToMigrate.MAX_WORK_ITEMS)).thenReturn(workItems); + + // Run the test + IndexStep.GetIndicesToMigrate testStep = new IndexStep.GetIndicesToMigrate(testMembers); + testStep.run(); + WorkerStep nextStep = testStep.nextStep(); + + // Check the results + assertEquals(nextStepClass, nextStep.getClass()); + } + + static Stream provideMigrateIndicesArgs() { + return Stream.of( + // We have an to migrate and we create it + Arguments.of( + new CmsEntry.IndexWorkItem("index1", CmsEntry.IndexWorkItemStatus.NOT_STARTED, 1, 1), + Optional.of(Mockito.mock(ObjectNode.class)), + new CmsEntry.IndexWorkItem("index1", CmsEntry.IndexWorkItemStatus.COMPLETED, 1, 1) + ), + + // We have an index to migrate and someone else created it before us + Arguments.of( + new CmsEntry.IndexWorkItem("index2", CmsEntry.IndexWorkItemStatus.NOT_STARTED, 1, 2), + Optional.empty(), + new CmsEntry.IndexWorkItem("index2", CmsEntry.IndexWorkItemStatus.COMPLETED, 1, 2) + ) + ); + } + + @ParameterizedTest + @MethodSource("provideMigrateIndicesArgs") + void MigrateIndices_workToDo_AsExpected(CmsEntry.IndexWorkItem workItem, Optional createResponse, CmsEntry.IndexWorkItem updatedItem) { + // Set up the test + IndexMetadata.Data indexMetadata = Mockito.mock(IndexMetadata.Data.class); + Mockito.when(testMembers.metadataFactory.fromRepo(testMembers.snapshotName, workItem.name)).thenReturn(indexMetadata); + + ObjectNode root = Mockito.mock(ObjectNode.class); + Mockito.when(indexMetadata.toObjectNode()).thenReturn(root); + Mockito.when(indexMetadata.getId()).thenReturn("index-id"); + ObjectNode transformedRoot = Mockito.mock(ObjectNode.class); + Mockito.when(testMembers.transformer.transformIndexMetadata(root)).thenReturn(transformedRoot); + Mockito.when(testMembers.indexCreator.create(transformedRoot, workItem.name, "index-id")).thenReturn(createResponse); + + // Run the test + IndexStep.MigrateIndices testStep = new IndexStep.MigrateIndices(testMembers, List.of(workItem)); + testStep.run(); + WorkerStep nextStep = testStep.nextStep(); + + // Check the results + Mockito.verify(testMembers.cmsClient, times(1)).updateIndexWorkItemForceful(updatedItem); + assertEquals(IndexStep.GetIndicesToMigrate.class, nextStep.getClass()); + } + + @Test + void MigrateIndices_exceededAttempts_AsExpected(){ + // Set up the test + CmsEntry.IndexWorkItem workItem = new CmsEntry.IndexWorkItem("index1", CmsEntry.IndexWorkItemStatus.NOT_STARTED, CmsEntry.IndexWorkItem.ATTEMPTS_SOFT_LIMIT + 1, 1); + CmsEntry.IndexWorkItem updatedItem = new CmsEntry.IndexWorkItem("index1", CmsEntry.IndexWorkItemStatus.FAILED, CmsEntry.IndexWorkItem.ATTEMPTS_SOFT_LIMIT + 1, 1); + + // Run the test + IndexStep.MigrateIndices testStep = new IndexStep.MigrateIndices(testMembers, List.of(workItem)); + testStep.run(); + WorkerStep nextStep = testStep.nextStep(); + + // Check the results + Mockito.verify(testMembers.cmsClient, times(1)).updateIndexWorkItem(updatedItem, workItem); + assertEquals(IndexStep.GetIndicesToMigrate.class, nextStep.getClass()); + } + + @Test + void MigrateIndices_workItemFailed_AsExpected(){ + // Set up the test + CmsEntry.IndexWorkItem workItem = new CmsEntry.IndexWorkItem("index1", CmsEntry.IndexWorkItemStatus.NOT_STARTED, 1, 1); + CmsEntry.IndexWorkItem updatedItem = new CmsEntry.IndexWorkItem("index1", CmsEntry.IndexWorkItemStatus.NOT_STARTED, 2, 1); + doThrow(new RuntimeException("Test exception")).when(testMembers.metadataFactory).fromRepo(testMembers.snapshotName, workItem.name); + + // Run the test + IndexStep.MigrateIndices testStep = new IndexStep.MigrateIndices(testMembers, List.of(workItem)); + testStep.run(); + WorkerStep nextStep = testStep.nextStep(); + + // Check the results + Mockito.verify(testMembers.cmsClient, times(1)).updateIndexWorkItem(updatedItem, workItem); + assertEquals(IndexStep.GetIndicesToMigrate.class, nextStep.getClass()); + } + + public static class TestRandomWait extends IndexStep.RandomWait { + public TestRandomWait(SharedMembers members) { + super(members); + } + + @Override + protected void waitABit() { + // do nothing + } + } + + @Test + void RandomWait_AsExpected() { + // Run the test + IndexStep.RandomWait testStep = new TestRandomWait(testMembers); + testStep.run(); + WorkerStep nextStep = testStep.nextStep(); + + // Check the results + assertEquals(IndexStep.GetEntry.class, nextStep.getClass()); + } + + @Test + void ExitPhaseSuccess_AsExpected() { + // Run the test + IndexStep.ExitPhaseSuccess testStep = new IndexStep.ExitPhaseSuccess(testMembers); + testStep.run(); + WorkerStep nextStep = testStep.nextStep(); + + // Check the results + Mockito.verify(testMembers.globalState, times(1)).updatePhase( + GlobalState.Phase.INDEX_COMPLETED + ); + assertEquals(null, nextStep); + } + + @Test + void ExitPhaseFailed_AsExpected() { + // Set up the test + MaxAttemptsExceeded e = new MaxAttemptsExceeded(); + + var existingEntry = Optional.of(new CmsEntry.Index( + CmsEntry.IndexStatus.SETUP, + String.valueOf(42), + 1 + )); + testMembers.cmsEntry = existingEntry; + + // Run the test + IndexStep.ExitPhaseFailed testStep = new IndexStep.ExitPhaseFailed(testMembers, e); + testStep.run(); + assertThrows(MaxAttemptsExceeded.class, () -> { + testStep.nextStep(); + }); + + // Check the results + var expectedEntry = new CmsEntry.Index( + CmsEntry.IndexStatus.FAILED, + existingEntry.get().leaseExpiry, + existingEntry.get().numAttempts + ); + Mockito.verify(testMembers.cmsClient, times(1)).updateIndexEntry( + expectedEntry, existingEntry.get() + ); + Mockito.verify(testMembers.globalState, times(1)).updatePhase( + GlobalState.Phase.INDEX_FAILED + ); + } + +} diff --git a/RFS/src/test/java/com/rfs/worker/MetadataRunnerTest.java b/RFS/src/test/java/com/rfs/worker/MetadataRunnerTest.java index 2e4f6cf79..83e18c279 100644 --- a/RFS/src/test/java/com/rfs/worker/MetadataRunnerTest.java +++ b/RFS/src/test/java/com/rfs/worker/MetadataRunnerTest.java @@ -1,7 +1,7 @@ package com.rfs.worker; +import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.*; import java.util.Optional; @@ -12,7 +12,6 @@ import com.rfs.cms.CmsClient; import com.rfs.common.GlobalMetadata; import com.rfs.common.RfsException; -import com.rfs.common.SnapshotCreator; import com.rfs.transformers.Transformer; import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11; @@ -32,21 +31,15 @@ void run_encountersAnException_asExpected() { doThrow(testException).when(cmsClient).getMetadataEntry(); when(globalState.getPhase()).thenReturn(GlobalState.Phase.METADATA_IN_PROGRESS); - + // Run the test MetadataRunner testRunner = new MetadataRunner(globalState, cmsClient, snapshotName, metadataFactory, metadataCreator, transformer); + final var e = assertThrows(MetadataRunner.MetadataMigrationPhaseFailed.class, () -> testRunner.run()); - // Run the test - try { - testRunner.run(); - } catch (MetadataRunner.MetadataMigrationPhaseFailed e) { - assertEquals(GlobalState.Phase.METADATA_IN_PROGRESS, e.phase); - assertEquals(null, e.nextStep); - assertEquals(Optional.empty(), e.cmsEntry); - assertEquals(testException, e.e); - - } catch (Exception e) { - fail("Unexpected exception thrown: " + e.getClass().getName()); - } + // Verify the results + assertEquals(GlobalState.Phase.METADATA_IN_PROGRESS, e.phase); + assertEquals(null, e.nextStep); + assertEquals(Optional.empty(), e.cmsEntry); + assertEquals(testException, e.e); } } \ No newline at end of file diff --git a/RFS/src/test/java/com/rfs/worker/MetadataStepTest.java b/RFS/src/test/java/com/rfs/worker/MetadataStepTest.java index 2a7bf66a9..3d0ec01f0 100644 --- a/RFS/src/test/java/com/rfs/worker/MetadataStepTest.java +++ b/RFS/src/test/java/com/rfs/worker/MetadataStepTest.java @@ -29,8 +29,6 @@ import com.rfs.worker.MetadataStep.SharedMembers; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.*; @@ -130,7 +128,7 @@ static Stream provideGetEntryArgs() { MetadataStep.ExitPhaseSuccess.class ), - // The CMS entry is marked as completed, so we exit as success + // The CMS entry is marked as failed, so we exit as faile Arguments.of( Optional.of(new CmsEntry.Metadata( CmsEntry.MetadataStatus.FAILED, @@ -233,7 +231,7 @@ void AcquireLease_AsExpected(Optional updatedEntry, Class testMembers.cmsEntry = existingEntry; Mockito.when(testMembers.cmsClient.updateMetadataEntry( - any(CmsEntry.MetadataStatus.class), anyString(), anyInt() + any(CmsEntry.Metadata.class), eq(existingEntry.get()) )).thenReturn(updatedEntry); // Run the test @@ -242,11 +240,14 @@ void AcquireLease_AsExpected(Optional updatedEntry, Class WorkerStep nextStep = testStep.nextStep(); // Check the results - Mockito.verify(testMembers.cmsClient, times(1)).updateMetadataEntry( + var expectedEntry = new CmsEntry.Metadata( CmsEntry.MetadataStatus.IN_PROGRESS, CmsEntry.Metadata.getLeaseExpiry(TestAcquireLease.milliSinceEpoch, CmsEntry.Metadata.MAX_ATTEMPTS), CmsEntry.Metadata.MAX_ATTEMPTS ); + Mockito.verify(testMembers.cmsClient, times(1)).updateMetadataEntry( + expectedEntry, existingEntry.get() + ); assertEquals(nextStepClass, nextStep.getClass()); } @@ -284,10 +285,14 @@ void MigrateTemplates_AsExpected(Optional updatedEntry, Class Mockito.when(testMembers.metadataFactory.fromRepo(testMembers.snapshotName)).thenReturn(testGlobalMetadata); Mockito.when(testGlobalMetadata.toObjectNode()).thenReturn(testNode); Mockito.when(testMembers.transformer.transformGlobalMetadata(testNode)).thenReturn(testTransformedNode); - Mockito.when(testMembers.cmsClient.updateMetadataEntry( + + var expectedEntry = new CmsEntry.Metadata( CmsEntry.MetadataStatus.COMPLETED, existingEntry.get().leaseExpiry, existingEntry.get().numAttempts + ); + Mockito.when(testMembers.cmsClient.updateMetadataEntry( + eq(expectedEntry), eq(existingEntry.get()) )).thenReturn(updatedEntry); @@ -317,9 +322,7 @@ void MigrateTemplates_AsExpected(Optional updatedEntry, Class testTransformedNode ); Mockito.verify(testMembers.cmsClient, times(1)).updateMetadataEntry( - CmsEntry.MetadataStatus.COMPLETED, - existingEntry.get().leaseExpiry, - existingEntry.get().numAttempts + expectedEntry, existingEntry.get() ); Mockito.verify(testMembers.globalState, times(1)).updateWorkItem( null @@ -384,11 +387,14 @@ void ExitPhaseFailed_AsExpected() { }); // Check the results - Mockito.verify(testMembers.cmsClient, times(1)).updateMetadataEntry( + var expectedEntry = new CmsEntry.Metadata( CmsEntry.MetadataStatus.FAILED, existingEntry.get().leaseExpiry, existingEntry.get().numAttempts ); + Mockito.verify(testMembers.cmsClient, times(1)).updateMetadataEntry( + expectedEntry, existingEntry.get() + ); Mockito.verify(testMembers.globalState, times(1)).updatePhase( GlobalState.Phase.METADATA_FAILED ); diff --git a/RFS/src/test/java/com/rfs/worker/SnapshotRunnerTest.java b/RFS/src/test/java/com/rfs/worker/SnapshotRunnerTest.java index 5cbd26b3e..33096a405 100644 --- a/RFS/src/test/java/com/rfs/worker/SnapshotRunnerTest.java +++ b/RFS/src/test/java/com/rfs/worker/SnapshotRunnerTest.java @@ -1,7 +1,7 @@ package com.rfs.worker; +import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.*; import java.util.Optional; @@ -21,7 +21,6 @@ void run_encountersAnException_asExpected() { GlobalState globalState = mock(GlobalState.class); CmsClient cmsClient = mock(CmsClient.class); SnapshotCreator snapshotCreator = mock(SnapshotCreator.class); - SnapshotRunner testRunner = new SnapshotRunner(globalState, cmsClient, snapshotCreator); RfsException testException = new RfsException("Unit test"); doThrow(testException).when(cmsClient).getSnapshotEntry(snapshotName); @@ -29,17 +28,14 @@ void run_encountersAnException_asExpected() { when(snapshotCreator.getSnapshotName()).thenReturn(snapshotName); // Run the test - try { - testRunner.run(); - } catch (SnapshotRunner.SnapshotPhaseFailed e) { - assertEquals(GlobalState.Phase.SNAPSHOT_IN_PROGRESS, e.phase); - assertEquals(null, e.nextStep); - assertEquals(Optional.empty(), e.cmsEntry); - assertEquals(testException, e.e); - - } catch (Exception e) { - fail("Unexpected exception thrown: " + e.getClass().getName()); - } + SnapshotRunner testRunner = new SnapshotRunner(globalState, cmsClient, snapshotCreator); + final var e = assertThrows(SnapshotRunner.SnapshotPhaseFailed.class, () -> testRunner.run()); + + // Verify the results + assertEquals(GlobalState.Phase.SNAPSHOT_IN_PROGRESS, e.phase); + assertEquals(null, e.nextStep); + assertEquals(Optional.empty(), e.cmsEntry); + assertEquals(testException, e.e); } } diff --git a/RFS/src/test/java/com/rfs/worker/SnapshotStepTest.java b/RFS/src/test/java/com/rfs/worker/SnapshotStepTest.java index d944443c4..5d12cfe78 100644 --- a/RFS/src/test/java/com/rfs/worker/SnapshotStepTest.java +++ b/RFS/src/test/java/com/rfs/worker/SnapshotStepTest.java @@ -14,6 +14,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import com.rfs.cms.CmsClient; +import com.rfs.cms.CmsEntry; import com.rfs.cms.CmsEntry.Snapshot; import com.rfs.cms.CmsEntry.SnapshotStatus; import com.rfs.common.SnapshotCreator; @@ -80,6 +81,12 @@ void CreateEntry_AsExpected() { @Test void InitiateSnapshot_AsExpected() { // Set up the test + var existingEntry = new CmsEntry.Snapshot( + "test", + SnapshotStatus.NOT_STARTED + ); + testMembers.cmsEntry = Optional.of(existingEntry); + when(testMembers.snapshotCreator.getSnapshotName()).thenReturn("test"); // Run the test @@ -90,7 +97,15 @@ void InitiateSnapshot_AsExpected() { // Check the results Mockito.verify(testMembers.snapshotCreator, times(1)).registerRepo(); Mockito.verify(testMembers.snapshotCreator, times(1)).createSnapshot(); - Mockito.verify(testMembers.cmsClient, times(1)).updateSnapshotEntry("test", SnapshotStatus.IN_PROGRESS); + + var expectedEntry = new CmsEntry.Snapshot( + "test", + SnapshotStatus.IN_PROGRESS + ); + + Mockito.verify(testMembers.cmsClient, times(1)).updateSnapshotEntry( + expectedEntry, existingEntry + ); assertEquals(SnapshotStep.WaitForSnapshot.class, nextStep.getClass()); } @@ -141,6 +156,12 @@ void WaitForSnapshot_failedSnapshot_AsExpected() { @Test void ExitPhaseSuccess_AsExpected() { // Set up the test + var existingEntry = new CmsEntry.Snapshot( + "test", + SnapshotStatus.IN_PROGRESS + ); + testMembers.cmsEntry = Optional.of(existingEntry); + when(testMembers.snapshotCreator.getSnapshotName()).thenReturn("test"); // Run the test @@ -149,7 +170,13 @@ void ExitPhaseSuccess_AsExpected() { WorkerStep nextStep = exitPhase.nextStep(); // Check the results - Mockito.verify(testMembers.cmsClient, times(1)).updateSnapshotEntry("test", SnapshotStatus.COMPLETED); + var expectedEntry = new CmsEntry.Snapshot( + "test", + SnapshotStatus.COMPLETED + ); + Mockito.verify(testMembers.cmsClient, times(1)).updateSnapshotEntry( + expectedEntry, existingEntry + ); Mockito.verify(testMembers.globalState, times(1)).updatePhase(GlobalState.Phase.SNAPSHOT_COMPLETED); assertEquals(null, nextStep); } @@ -157,6 +184,12 @@ void ExitPhaseSuccess_AsExpected() { @Test void ExitPhaseSnapshotFailed_AsExpected() { // Set up the test + var existingEntry = new CmsEntry.Snapshot( + "test", + SnapshotStatus.IN_PROGRESS + ); + testMembers.cmsEntry = Optional.of(existingEntry); + when(testMembers.snapshotCreator.getSnapshotName()).thenReturn("test"); SnapshotCreationFailed e = new SnapshotCreationFailed("test"); @@ -168,7 +201,13 @@ void ExitPhaseSnapshotFailed_AsExpected() { }); // Check the results - Mockito.verify(testMembers.cmsClient, times(1)).updateSnapshotEntry("test", SnapshotStatus.FAILED); + var expectedEntry = new CmsEntry.Snapshot( + "test", + SnapshotStatus.FAILED + ); + Mockito.verify(testMembers.cmsClient, times(1)).updateSnapshotEntry( + expectedEntry, existingEntry + ); Mockito.verify(testMembers.globalState, times(1)).updatePhase(GlobalState.Phase.SNAPSHOT_FAILED); } }