Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ingest processor supports excluding fields #10967

Merged
merged 15 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- Remove ingest processor supports excluding fields ([#10967](https://github.com/opensearch-project/OpenSearch/pull/10967))
- [Tiered caching] Enabling serialization for IndicesRequestCache key object ([#10275](https://github.com/opensearch-project/OpenSearch/pull/10275))
- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753](https://github.com/opensearch-project/OpenSearch/pull/10753))
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.ingest.common;

import org.opensearch.common.Nullable;
import org.opensearch.core.common.Strings;
import org.opensearch.index.VersionType;
import org.opensearch.ingest.AbstractProcessor;
Expand All @@ -42,11 +43,15 @@
import org.opensearch.script.TemplateScript;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;

/**
* Processor that removes existing fields. Nothing happens if the field is not present.
*/
Expand All @@ -55,54 +60,105 @@
public static final String TYPE = "remove";

private final List<TemplateScript.Factory> fields;
private final List<TemplateScript.Factory> excludeFields;
private final boolean ignoreMissing;

RemoveProcessor(String tag, String description, List<TemplateScript.Factory> fields, boolean ignoreMissing) {
RemoveProcessor(
reta marked this conversation as resolved.
Show resolved Hide resolved
String tag,
String description,
@Nullable List<TemplateScript.Factory> fields,
@Nullable List<TemplateScript.Factory> excludeFields,
boolean ignoreMissing
) {
super(tag, description);
this.fields = new ArrayList<>(fields);
if (fields == null && excludeFields == null || fields != null && excludeFields != null) {
gaobinlong marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalArgumentException("ether fields and excludeFields must be set");
}
if (fields != null) {
this.fields = new ArrayList<>(fields);
this.excludeFields = null;
} else {
this.fields = null;
this.excludeFields = new ArrayList<>(excludeFields);
}

this.ignoreMissing = ignoreMissing;
}

public List<TemplateScript.Factory> getFields() {
return fields;
}

public List<TemplateScript.Factory> getExcludeFields() {
return excludeFields;
}

@Override
public IngestDocument execute(IngestDocument document) {
fields.forEach(field -> {
String path = document.renderTemplate(field);
final boolean fieldPathIsNullOrEmpty = Strings.isNullOrEmpty(path);
if (fieldPathIsNullOrEmpty || document.hasField(path) == false) {
if (ignoreMissing) {
return;
} else if (fieldPathIsNullOrEmpty) {
throw new IllegalArgumentException("field path cannot be null nor empty");
} else {
throw new IllegalArgumentException("field [" + path + "] doesn't exist");
if (fields != null && !fields.isEmpty()) {
fields.forEach(field -> {
String path = document.renderTemplate(field);
final boolean fieldPathIsNullOrEmpty = Strings.isNullOrEmpty(path);
if (fieldPathIsNullOrEmpty || document.hasField(path) == false) {
if (ignoreMissing) {
return;
} else if (fieldPathIsNullOrEmpty) {
throw new IllegalArgumentException("field path cannot be null nor empty");
} else {
throw new IllegalArgumentException("field [" + path + "] doesn't exist");
}
}
}
// cannot remove _index, _version and _version_type.
if (path.equals(IngestDocument.Metadata.INDEX.getFieldName())
|| path.equals(IngestDocument.Metadata.VERSION.getFieldName())
|| path.equals(IngestDocument.Metadata.VERSION_TYPE.getFieldName())) {
throw new IllegalArgumentException("cannot remove metadata field [" + path + "]");
}
// removing _id is disallowed when there's an external version specified in the request
if (path.equals(IngestDocument.Metadata.ID.getFieldName())
&& document.hasField(IngestDocument.Metadata.VERSION_TYPE.getFieldName())) {
String versionType = document.getFieldValue(IngestDocument.Metadata.VERSION_TYPE.getFieldName(), String.class);
if (!Objects.equals(versionType, VersionType.toString(VersionType.INTERNAL))) {
Long version = document.getFieldValue(IngestDocument.Metadata.VERSION.getFieldName(), Long.class, true);
throw new IllegalArgumentException(
"cannot remove metadata field [_id] when specifying external version for the document, version: "
+ version
+ ", version_type: "
+ versionType
);

// cannot remove _index, _version and _version_type.
if (path.equals(IngestDocument.Metadata.INDEX.getFieldName())
|| path.equals(IngestDocument.Metadata.VERSION.getFieldName())
|| path.equals(IngestDocument.Metadata.VERSION_TYPE.getFieldName())) {
throw new IllegalArgumentException("cannot remove metadata field [" + path + "]");
}
// removing _id is disallowed when there's an external version specified in the request
if (path.equals(IngestDocument.Metadata.ID.getFieldName())
&& document.hasField(IngestDocument.Metadata.VERSION_TYPE.getFieldName())) {
String versionType = document.getFieldValue(IngestDocument.Metadata.VERSION_TYPE.getFieldName(), String.class);
if (!Objects.equals(versionType, VersionType.toString(VersionType.INTERNAL))) {
Long version = document.getFieldValue(IngestDocument.Metadata.VERSION.getFieldName(), Long.class, true);
throw new IllegalArgumentException(
"cannot remove metadata field [_id] when specifying external version for the document, version: "
+ version
+ ", version_type: "
+ versionType
);
}
}
document.removeField(path);
});
}

if (excludeFields != null && !excludeFields.isEmpty()) {
Set<String> excludeFieldSet = new HashSet<>();
excludeFields.forEach(field -> {
String path = document.renderTemplate(field);
// ignore the empty or null field path
if (!Strings.isNullOrEmpty(path)) {
excludeFieldSet.add(path);
}
});

if (!excludeFieldSet.isEmpty()) {
Set<String> existingFields = new HashSet<>(document.getSourceAndMetadata().keySet());
Set<String> metadataFields = document.getMetadata()
.keySet()
.stream()
.map(IngestDocument.Metadata::getFieldName)
.collect(Collectors.toSet());
existingFields.forEach(field -> {
// ignore metadata fields such as _index, _id, etc.
if (!metadataFields.contains(field) && !excludeFieldSet.contains(field)) {
document.removeField(field);
}
});
}
document.removeField(path);
});
}

return document;
}

Expand All @@ -127,20 +183,41 @@
Map<String, Object> config
) throws Exception {
final List<String> fields = new ArrayList<>();
final Object field = ConfigurationUtils.readObject(TYPE, processorTag, config, "field");
if (field instanceof List) {
@SuppressWarnings("unchecked")
List<String> stringList = (List<String>) field;
fields.addAll(stringList);
} else {
fields.add((String) field);
final List<String> excludeFields = new ArrayList<>();
final Object field = ConfigurationUtils.readOptionalObject(config, "field");
final Object excludeField = ConfigurationUtils.readOptionalObject(config, "exclude_field");

if (field == null && excludeField == null || field != null && excludeField != null) {
throw newConfigurationException(TYPE, processorTag, "field", "ether field or exclude_field must be set");
}

final List<TemplateScript.Factory> compiledTemplates = fields.stream()
.map(f -> ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", f, scriptService))
.collect(Collectors.toList());
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
return new RemoveProcessor(processorTag, description, compiledTemplates, ignoreMissing);

if (field != null) {
if (field instanceof List) {
@SuppressWarnings("unchecked")
List<String> stringList = (List<String>) field;
fields.addAll(stringList);
} else {
fields.add((String) field);
}
List<TemplateScript.Factory> fieldCompiledTemplates = fields.stream()
.map(f -> ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", f, scriptService))
.collect(Collectors.toList());
return new RemoveProcessor(processorTag, description, fieldCompiledTemplates, null, ignoreMissing);
} else {
if (excludeField instanceof List) {
@SuppressWarnings("unchecked")
List<String> stringList = (List<String>) excludeField;
excludeFields.addAll(stringList);
} else {

Check warning on line 213 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/RemoveProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/RemoveProcessor.java#L211-L213

Added lines #L211 - L213 were not covered by tests
excludeFields.add((String) excludeField);
}
List<TemplateScript.Factory> excludeFieldCompiledTemplates = excludeFields.stream()
.map(f -> ConfigurationUtils.compileTemplate(TYPE, processorTag, "exclude_field", f, scriptService))
.collect(Collectors.toList());
return new RemoveProcessor(processorTag, description, null, excludeFieldCompiledTemplates, ignoreMissing);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -79,16 +80,6 @@ public void testCreateMultipleFields() throws Exception {
);
}

public void testCreateMissingField() throws Exception {
Map<String, Object> config = new HashMap<>();
try {
factory.create(null, null, null, config);
fail("factory create should have failed");
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}

public void testInvalidMustacheTemplate() throws Exception {
RemoveProcessor.Factory factory = new RemoveProcessor.Factory(TestTemplateService.instance(true));
Map<String, Object> config = new HashMap<>();
Expand All @@ -98,4 +89,31 @@ public void testInvalidMustacheTemplate() throws Exception {
assertThat(exception.getMessage(), equalTo("java.lang.RuntimeException: could not compile script"));
assertThat(exception.getMetadata("opensearch.processor_tag").get(0), equalTo(processorTag));
}

public void testCreateWithExcludeField() throws Exception {
Map<String, Object> config = new HashMap<>();
String processorTag = randomAlphaOfLength(10);
OpenSearchException exception = expectThrows(
OpenSearchParseException.class,
() -> factory.create(null, processorTag, null, config)
);
assertThat(exception.getMessage(), equalTo("[field] ether field or exclude_field must be set"));

Map<String, Object> config2 = new HashMap<>();
config2.put("field", "field1");
config2.put("exclude_field", "field2");
exception = expectThrows(OpenSearchParseException.class, () -> factory.create(null, processorTag, null, config2));
assertThat(exception.getMessage(), equalTo("[field] ether field or exclude_field must be set"));

Map<String, Object> config6 = new HashMap<>();
config6.put("exclude_field", "exclude_field");
RemoveProcessor removeProcessor = factory.create(null, processorTag, null, config6);
assertThat(
removeProcessor.getExcludeFields()
.stream()
.map(template -> template.newInstance(Collections.emptyMap()).execute())
.collect(Collectors.toList()),
equalTo(List.of("exclude_field"))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
import org.opensearch.ingest.Processor;
import org.opensearch.ingest.RandomDocumentPicks;
import org.opensearch.ingest.TestTemplateService;
import org.opensearch.script.TemplateScript;
import org.opensearch.test.OpenSearchTestCase;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -57,12 +59,28 @@ public void testRemoveFields() throws Exception {
randomAlphaOfLength(10),
null,
Collections.singletonList(new TestTemplateService.MockTemplateScript.Factory(field)),
null,
false
);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(field), equalTo(false));
}

public void testRemoveByExcludeFields() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
ingestDocument.setFieldValue("foo_1", "value");
ingestDocument.setFieldValue("foo_2", "value");
ingestDocument.setFieldValue("foo_3", "value");
List<TemplateScript.Factory> excludeFields = new ArrayList<>();
excludeFields.add(new TestTemplateService.MockTemplateScript.Factory("foo_1"));
excludeFields.add(new TestTemplateService.MockTemplateScript.Factory("foo_2"));
Processor processor = new RemoveProcessor(randomAlphaOfLength(10), null, null, excludeFields, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField("foo_1"), equalTo(true));
assertThat(ingestDocument.hasField("foo_2"), equalTo(true));
assertThat(ingestDocument.hasField("foo_3"), equalTo(false));
}

public void testRemoveNonExistingField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Expand Down Expand Up @@ -183,6 +201,34 @@ public void testRemoveMetadataField() throws Exception {
}
}

public void testCreateRemoveProcessorWithBothFieldsAndExcludeFields() throws Exception {
assertThrows(
"ether fields and excludeFields must be set",
IllegalArgumentException.class,
() -> new RemoveProcessor(randomAlphaOfLength(10), null, null, null, false)
);

final List<TemplateScript.Factory> fields;
if (randomBoolean()) {
fields = new ArrayList<>();
} else {
fields = List.of(new TestTemplateService.MockTemplateScript.Factory("foo_1"));
}

final List<TemplateScript.Factory> excludeFields;
if (randomBoolean()) {
excludeFields = new ArrayList<>();
} else {
excludeFields = List.of(new TestTemplateService.MockTemplateScript.Factory("foo_2"));
}

assertThrows(
"ether fields and excludeFields must be set",
IllegalArgumentException.class,
() -> new RemoveProcessor(randomAlphaOfLength(10), null, fields, excludeFields, false)
);
}

public void testRemoveDocumentId() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field", IngestDocument.Metadata.ID.getFieldName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,43 @@ teardown:
}
- match: { docs.0.error.type: "illegal_argument_exception" }
- match: { docs.0.error.reason: "cannot remove metadata field [_id] when specifying external version for the document, version: 1, version_type: external_gte" }

# Related issue: https://github.com/opensearch-project/OpenSearch/issues/1578
---
"Test remove processor with exclude_field":
- skip:
version: " - 2.11.99"
reason: "exclude_field is introduced in 2.12"
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"remove" : {
"exclude_field": "bar"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 1
pipeline: "my_pipeline"
body: {
foo1: "bar",
foo2: "bar",
bar: "zoo",
zoo: "bar"
}

- do:
get:
index: test
id: 1
- match: { _source: { bar: "zoo"}}
Loading
Loading