From 7a11bd54d3eec445ebb87b7e1ccb17f7f31c2ada Mon Sep 17 00:00:00 2001 From: Oguzhan Unlu Date: Wed, 28 Oct 2020 16:40:28 +0300 Subject: [PATCH] Common: fix number deserialization in API Request Enrichment (close #383) --- integration-tests/sce-api-lookup-test.py | 5 ++ .../registry/apirequest/Output.scala | 2 +- .../io/circe/jackson/enrich/package.scala | 44 ++++++++++++++ .../ApiRequestEnrichmentIntegrationTest.scala | 60 +++++++++++++++++++ .../apirequest/ApiRequestEnrichmentSpec.scala | 29 +++++++++ 5 files changed, 139 insertions(+), 1 deletion(-) create mode 100644 modules/common/src/main/scala/io/circe/jackson/enrich/package.scala diff --git a/integration-tests/sce-api-lookup-test.py b/integration-tests/sce-api-lookup-test.py index d6f0746bf..e19a8bb72 100644 --- a/integration-tests/sce-api-lookup-test.py +++ b/integration-tests/sce-api-lookup-test.py @@ -48,6 +48,11 @@ def do_GET(self): response = self.generate_response("GET") self.end_headers() self.wfile.write(response) + elif self.path.startswith("/geo"): + self.send_response(200) + response = json.dumps({"latitude":32.234,"longitude":33.564}) + self.end_headers() + self.wfile.write(response) else: self.wfile.write('not authenticated') diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/apirequest/Output.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/apirequest/Output.scala index 253db43d4..03471efdd 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/apirequest/Output.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/apirequest/Output.scala @@ -15,9 +15,9 @@ package enrichments.registry.apirequest import cats.syntax.either._ import io.circe._ -import io.circe.parser._ import io.circe.syntax._ +import io.circe.jackson.enrich.parse import utils.JsonPath.{query, wrapArray} /** diff 
--git a/modules/common/src/main/scala/io/circe/jackson/enrich/package.scala b/modules/common/src/main/scala/io/circe/jackson/enrich/package.scala new file mode 100644 index 000000000..ad839a9a2 --- /dev/null +++ b/modules/common/src/main/scala/io/circe/jackson/enrich/package.scala @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ + +package io.circe.jackson + +import com.fasterxml.jackson.core.{JsonFactory, JsonParser} +import com.snowplowanalytics.snowplow.enrich.common.utils.CirceUtils +import io.circe.{Json, ParsingFailure} + +import scala.util.control.NonFatal + +/** + * This package exists so that we can change the semantics of the + * circe-jackson library in special cases by using a custom + * ObjectMapper. + * + * These special cases include circe-jackson #217, #65 and #49. + * + * This package is to be removed either when Jackson is replaced + * with another parser, possibly jawn, or when circe-jackson ships + * a new version that we can use out of the box. 
+ */ +package object enrich extends JacksonCompat { + + val jsonFactory: JsonFactory = new JsonFactory(CirceUtils.mapper) + + def jsonStringParser(input: String): JsonParser = jsonFactory.createParser(input) + + def parse(input: String): Either[ParsingFailure, Json] = + try Right(CirceUtils.mapper.readValue(jsonStringParser(input), classOf[Json])) + catch { + case NonFatal(error) => Left(ParsingFailure(error.getMessage, error)) + } +} diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/apirequest/ApiRequestEnrichmentIntegrationTest.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/apirequest/ApiRequestEnrichmentIntegrationTest.scala index 2e7f815cb..0bf2e132c 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/apirequest/ApiRequestEnrichmentIntegrationTest.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/apirequest/ApiRequestEnrichmentIntegrationTest.scala @@ -18,6 +18,7 @@ import cats.Id import io.circe._ import io.circe.literal._ +import com.snowplowanalytics.iglu.client.CirceValidator import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} import org.specs2.Specification @@ -41,6 +42,7 @@ class ApiRequestEnrichmentIntegrationTest extends Specification { This is a integration test for the ApiRequestEnrichment Basic Case $e1 POST, Auth, JSON inputs, cache, several outputs $e2 + API output with number field $e3 """ object IntegrationTests { @@ -229,6 +231,49 @@ class ApiRequestEnrichmentIntegrationTest extends Specification { ), json"""{"name": "Snowplow ETL", "jobflow_id": "j-ZKIY4CKQRX72", "state": "RUNNING", "created_at": "2016-01-21T13:14:10.193+03:00"}""" ) + + val configuration3 = json"""{ + "vendor": "com.snowplowanalytics.snowplow.enrichments", + "name": "api_request_enrichment_config", + "enabled": true, + 
"parameters": { + "inputs": [ + { + "key": "ip", + "pojo": { + "field": "user_ipaddress" + } + } + ], + "api": { + "http": { + "method": "GET", + "uri": "http://localhost:8001/geo/{{ip}}?format=json", + "timeout": 5000, + "authentication": {} + } + }, + "outputs": [{ + "schema": "iglu:com.snowplowanalytics.snowplow/geolocation_context/jsonschema/1-0-0", + "json": { + "jsonPath": "$$" + } + }], + "cache": { + "size": 3000, + "ttl": 60 + } + } + }""" + + val correctResultContext4 = + SelfDescribingData( + SchemaKey("com.snowplowanalytics.snowplow", "geolocation_context", "jsonschema", SchemaVer.Full(1, 0, 0)), + json"""{"latitude":32.234,"longitude":33.564}""" + ) + + val schema = + json"""{ "type": "object", "properties": { "latitude": { "type": [ "number" ] }, "longitude": { "type": [ "number" ] } }, "additionalProperties": false }""" } val SCHEMA_KEY = @@ -314,4 +359,19 @@ class ApiRequestEnrichmentIntegrationTest extends Specification { ) and (contexts must have size 2) } } + + def e3 = { + val enrichment = ApiRequestEnrichment + .parse(IntegrationTests.configuration3, SCHEMA_KEY) + .map(_.enrichment[Id]) + .toEither + val event = new EnrichedEvent + event.setUser_ipaddress("127.0.0.1") + val context = enrichment.flatMap(_.lookup(event, Nil, Nil, None).toEither) + context must beRight.like { + case contexts => + (contexts must have size 1) and (contexts must contain(IntegrationTests.correctResultContext4)) and + (CirceValidator.validate(contexts.head.data, IntegrationTests.schema) must beRight) + } + } } diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/apirequest/ApiRequestEnrichmentSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/apirequest/ApiRequestEnrichmentSpec.scala index eaf36ba21..b526ca12d 100644 --- 
a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/apirequest/ApiRequestEnrichmentSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/apirequest/ApiRequestEnrichmentSpec.scala @@ -37,6 +37,7 @@ class ApiRequestEnrichmentSpec extends Specification with ValidatedMatchers with skip incorrect input (none of json or pojo) in configuration $e2 skip incorrect input (both json and pojo) in configuration $e3 extract correct configuration for POST request and perform the request $e4 + parse API output with number field successfully $e5 """ val SCHEMA_KEY = @@ -401,4 +402,32 @@ class ApiRequestEnrichmentSpec extends Specification with ValidatedMatchers with validConfig and validResult } + + def e5 = { + val inputs = List() + val api = + HttpApi( + "GET", + "http://api.acme.com/geo?format=json", + 1000, + Authentication(None) + ) + implicit val idHttpClient: HttpClient[Id] = new HttpClient[Id] { + override def getResponse(request: HttpRequest): Id[Either[Throwable, String]] = + """{"latitude":32.234,"longitude":33.564}""".asRight + } + val output = Output("iglu:com.acme/geo/jsonschema/1-0-0", Some(JsonOutput("$"))) + val cache = Cache(3000, 60) + val config = ApiRequestConf(SCHEMA_KEY, inputs, api, List(output), cache) + + val expectedDerivation = + SelfDescribingData( + SchemaKey("com.acme", "geo", "jsonschema", SchemaVer.Full(1, 0, 0)), + json"""{"latitude": 32.234, "longitude": 33.564}""" + ) + + val enrichedContextResult = config.enrichment[Id].lookup(new EnrichedEvent, Nil, Nil, None) + + enrichedContextResult must beValid(List(expectedDerivation)) + } }