From c76dbdcafc45686d07e771b9010a8f0f71820fc0 Mon Sep 17 00:00:00 2001 From: Martin Gaievski Date: Wed, 15 Nov 2023 12:01:47 -0800 Subject: [PATCH] Passing IndicesService to ingest processor factory with processor params (#10307) * Passing IngestService to processor factory with processor params Signed-off-by: Martin Gaievski * Fixed typo in the changlelog entry for this PR Signed-off-by: Martin Gaievski --------- Signed-off-by: Martin Gaievski Signed-off-by: Shivansh Arora --- CHANGELOG.md | 1 + .../org/opensearch/ingest/IngestService.java | 7 +++++-- .../java/org/opensearch/ingest/Processor.java | 7 ++++++- .../main/java/org/opensearch/node/Node.java | 20 ++++++++++--------- .../opensearch/ingest/IngestServiceTests.java | 15 +++++++++----- .../snapshots/SnapshotResiliencyTests.java | 3 ++- 6 files changed, 35 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 99c31f00278b8..202e0fa910c55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -103,6 +103,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670)) - Adding slf4j license header to LoggerMessageFormat.java ([#11069](https://github.com/opensearch-project/OpenSearch/pull/11069)) - [Streaming Indexing] Introduce new experimental server HTTP transport based on Netty 4 and Project Reactor (Reactor Netty) ([#9672](https://github.com/opensearch-project/OpenSearch/pull/9672)) +- Allowing pipeline processors to access index mapping info by passing ingest service ref as part of the processor factory parameters ([#10307](https://github.com/opensearch-project/OpenSearch/pull/10307)) ### Dependencies - Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298)) diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index baf357a4bc0d5..2d4439e86461b 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -72,6 +72,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.VersionType; import org.opensearch.index.analysis.AnalysisRegistry; +import org.opensearch.indices.IndicesService; import org.opensearch.plugins.IngestPlugin; import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ThreadPool; @@ -128,7 +129,8 @@ public IngestService( ScriptService scriptService, AnalysisRegistry analysisRegistry, List ingestPlugins, - Client client + Client client, + IndicesService indicesService ) { this.clusterService = clusterService; this.scriptService = scriptService; @@ -143,7 +145,8 @@ public IngestService( (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC), this, client, - threadPool.generic()::execute + threadPool.generic()::execute, + indicesService ) ); this.threadPool = threadPool; diff --git a/server/src/main/java/org/opensearch/ingest/Processor.java b/server/src/main/java/org/opensearch/ingest/Processor.java index 6097045a87e21..ecae1c139ea5e 100644 --- a/server/src/main/java/org/opensearch/ingest/Processor.java +++ b/server/src/main/java/org/opensearch/ingest/Processor.java @@ -36,6 +36,7 @@ import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.env.Environment; import org.opensearch.index.analysis.AnalysisRegistry; +import org.opensearch.indices.IndicesService; import org.opensearch.script.ScriptService; import org.opensearch.threadpool.Scheduler; @@ -156,6 +157,8 @@ class Parameters { */ public final Client client; + public final IndicesService indicesService; + public Parameters( Environment env, ScriptService scriptService, @@ -165,7 +168,8 @@ public Parameters( BiFunction scheduler, IngestService ingestService, Client client, - Consumer genericExecutor + Consumer genericExecutor, + IndicesService indicesService ) { this.env = env; this.scriptService = scriptService; @@ -176,6 +180,7 @@ public Parameters( this.ingestService = ingestService; this.client = client; this.genericExecutor = genericExecutor; + this.indicesService = indicesService; } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 6e8e327d51fef..ca72264f433b5 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -630,15 +630,6 @@ protected Node( metricsRegistry = metricsRegistryFactory.getMetricsRegistry(); resourcesToClose.add(tracer::close); resourcesToClose.add(metricsRegistry::close); - final IngestService ingestService = new IngestService( - clusterService, - threadPool, - this.environment, - scriptService, - analysisModule.getAnalysisRegistry(), - pluginsService.filterPlugins(IngestPlugin.class), - client - ); final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client); final UsageService usageService = new UsageService(); @@ -824,6 +815,17 @@ protected Node( recoverySettings ); + final IngestService ingestService = new IngestService( + clusterService, + threadPool, + this.environment, + scriptService, + analysisModule.getAnalysisRegistry(), + pluginsService.filterPlugins(IngestPlugin.class), + client, + indicesService + ); + final AliasValidator aliasValidator = new AliasValidator(); final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices); diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java index 1dfd41e96817e..2edfe87387c92 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java @@ -67,6 +67,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.IndexSettings; import org.opensearch.index.VersionType; +import org.opensearch.indices.IndicesService; import org.opensearch.plugins.IngestPlugin; import org.opensearch.script.MockScriptEngine; import org.opensearch.script.Script; @@ -149,7 +150,8 @@ public void testIngestPlugin() { null, null, Collections.singletonList(DUMMY_PLUGIN), - client + client, + mock(IndicesService.class) ); Map factories = ingestService.getProcessorFactories(); assertTrue(factories.containsKey("foo")); @@ -167,7 +169,8 @@ public void testIngestPluginDuplicate() { null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN), - client + client, + mock(IndicesService.class) ) ); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); @@ -182,7 +185,8 @@ public void testExecuteIndexPipelineDoesNotExist() { null, null, Collections.singletonList(DUMMY_PLUGIN), - client + client, + mock(IndicesService.class) ); final IndexRequest indexRequest = new IndexRequest("_index").id("_id") .source(emptyMap()) @@ -1485,7 +1489,8 @@ public Map getProcessors(Processor.Parameters paramet null, null, Arrays.asList(testPlugin), - client + client, + mock(IndicesService.class) ); ingestService.addIngestClusterStateListener(ingestClusterStateListener); @@ -1702,7 +1707,7 @@ private static IngestService createWithProcessors(Map public Map getProcessors(final Processor.Parameters parameters) { return processors; } - }), client); + }), client, mock(IndicesService.class)); } private CompoundProcessor mockCompoundProcessor() { diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index b7a2baacba611..352eeb779599c 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2209,7 +2209,8 @@ public void onFailure(final Exception e) { scriptService, new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(), Collections.emptyList(), - client + client, + indicesService ), transportShardBulkAction, client,