From 910eb1149da59e751cab88e1eacdfbf67c2eb13a Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Wed, 13 Mar 2024 13:56:48 +0800 Subject: [PATCH] Convert ingest processor supports ip type Signed-off-by: Gao Binlong --- CHANGELOG.md | 1 + .../ingest/common/ConvertProcessor.java | 11 +++ .../ingest/common/ConvertProcessorTests.java | 25 ++++++ .../test/ingest/330_convert_processor.yml | 83 +++++++++++++++++++ 4 files changed, 120 insertions(+) create mode 100644 modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/330_convert_processor.yml diff --git a/CHANGELOG.md b/CHANGELOG.md index c65b20b99bc25..6c3ba6d1163a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -121,6 +121,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Introduce a new setting `index.check_pending_flush.enabled` to expose the ability to disable the check for pending flushes by write threads ([#12710](https://github.com/opensearch-project/OpenSearch/pull/12710)) - Built-in secure transports support ([#12435](https://github.com/opensearch-project/OpenSearch/pull/12435)) - Lightweight Transport action to verify local term before fetching cluster-state from remote ([#12252](https://github.com/opensearch-project/OpenSearch/pull/12252/)) +- Convert ingest processor supports ip type ### Dependencies - Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288)) diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/ConvertProcessor.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/ConvertProcessor.java index 2a81fa5f4986e..d3df6ef89ce77 100644 --- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/ConvertProcessor.java +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/ConvertProcessor.java @@ -32,6 +32,7 @@ package org.opensearch.ingest.common; +import org.opensearch.common.network.InetAddresses; import org.opensearch.ingest.AbstractProcessor; import org.opensearch.ingest.ConfigurationUtils; import org.opensearch.ingest.IngestDocument; @@ -118,6 +119,16 @@ public Object convert(Object value) { return value.toString(); } }, + IP { + @Override + public Object convert(Object value) { + if (value instanceof String && InetAddresses.isInetAddress(value.toString())) { + return value; + } else { + throw new IllegalArgumentException("[" + value + "] is not a valid ipv4/ipv6 address"); + } + } + }, AUTO { @Override public Object convert(Object value) { diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/ConvertProcessorTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/ConvertProcessorTests.java index 0ba0a39261d00..50ece9282888f 100644 --- a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/ConvertProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/ConvertProcessorTests.java @@ -550,4 +550,29 @@ public void testTargetField() throws Exception { assertThat(ingestDocument.getFieldValue(fieldName, String.class), equalTo(String.valueOf(randomInt))); assertThat(ingestDocument.getFieldValue(targetField, Integer.class), equalTo(randomInt)); } + + public void testConvertIP() throws Exception { + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); + String validIPString; + if (randomBoolean()) { + validIPString = "1.2.3.4"; + } else { + validIPString = "::1"; + } + String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, validIPString); + + Processor processor = new ConvertProcessor(randomAlphaOfLength(10), null, fieldName, fieldName, Type.IP, false); + processor.execute(ingestDocument); + assertThat(ingestDocument.getFieldValue(fieldName, String.class), equalTo(validIPString)); + + String invalidIPString = randomAlphaOfLength(10); + fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, invalidIPString); + Processor processorWithInvalidIP = new ConvertProcessor(randomAlphaOfLength(10), null, fieldName, fieldName, Type.IP, false); + try { + processorWithInvalidIP.execute(ingestDocument); + fail("processor execute should have failed"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), equalTo("[" + invalidIPString + "] is not a valid ipv4/ipv6 address")); + } + } } diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/330_convert_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/330_convert_processor.yml new file mode 100644 index 0000000000000..994ed225dd624 --- /dev/null +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/330_convert_processor.yml @@ -0,0 +1,83 @@ +--- +teardown: + - do: + ingest.delete_pipeline: + id: "1" + ignore: 404 + +--- +"Test convert processor with ip type": + - skip: + version: " - 2.13.99" + reason: "introduced in 2.14.0" + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "convert" : { + "field" : "raw_ip", + "type": "ip" + } + } + ] + } + - match: { acknowledged: true } + + - do: + catch: /\[1.1.1.\] is not a valid ipv4\/ipv6 address/ + index: + index: test + id: 1 + pipeline: "1" + body: { + raw_ip: "1.1.1." + } + + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "convert" : { + "field" : "raw_ip", + "target_field" : "ip_field", + "type" : "ip", + "ignore_failure" : true + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "1" + body: { + raw_ip: "1.1.1." + } + - do: + get: + index: test + id: 1 + - match: { _source: { raw_ip: "1.1.1."} } + + - do: + index: + index: test + id: 1 + pipeline: "1" + body: { + raw_ip: "1.1.1.1" + } + - do: + get: + index: test + id: 1 + - match: { _source: { raw_ip: "1.1.1.1", ip_field: "1.1.1.1"} }