diff --git a/integration-tests/sce-api-lookup-test.py b/integration-tests/sce-api-lookup-test.py index d6f0746bf..e19a8bb72 100644 --- a/integration-tests/sce-api-lookup-test.py +++ b/integration-tests/sce-api-lookup-test.py @@ -48,6 +48,11 @@ def do_GET(self): response = self.generate_response("GET") self.end_headers() self.wfile.write(response) + elif self.path.startswith("/geo"): + self.send_response(200) + response = json.dumps({"latitude":32.234,"longitude":33.564}) + self.end_headers() + self.wfile.write(response) else: self.wfile.write('not authenticated') diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/apirequest/Output.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/apirequest/Output.scala index 253db43d4..15bb50193 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/apirequest/Output.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/apirequest/Output.scala @@ -14,12 +14,12 @@ package com.snowplowanalytics.snowplow.enrich.common package enrichments.registry.apirequest import cats.syntax.either._ +import com.fasterxml.jackson.core.JsonFactory +import com.snowplowanalytics.snowplow.enrich.common.utils.CirceUtils +import com.snowplowanalytics.snowplow.enrich.common.utils.JsonPath.{query, wrapArray} import io.circe._ -import io.circe.parser._ import io.circe.syntax._ -import utils.JsonPath.{query, wrapArray} - /** * Base trait for API output format. Primary intention of these classes is to perform transformation * of API raw output to self-describing JSON instance @@ -117,6 +117,7 @@ final case class JsonOutput(jsonPath: String) extends ApiOutput[Json] { case other => other.leftMap(JsonPathException.apply) } + private[this] val jsonFactory: JsonFactory = new JsonFactory(CirceUtils.mapper) def parseResponse(response: String): Either[Throwable, Json] = - parse(response) + Either.catchNonFatal[Json](CirceUtils.mapper.readValue(jsonFactory.createParser(response), classOf[Json])) } diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/apirequest/ApiRequestEnrichmentIntegrationTest.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/apirequest/ApiRequestEnrichmentIntegrationTest.scala index 2e7f815cb..9b04a0129 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/apirequest/ApiRequestEnrichmentIntegrationTest.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/apirequest/ApiRequestEnrichmentIntegrationTest.scala @@ -14,17 +14,14 @@ package com.snowplowanalytics.snowplow.enrich.common package enrichments.registry.apirequest import cats.Id - +import com.snowplowanalytics.iglu.client.CirceValidator +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent import io.circe._ import io.circe.literal._ - -import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} - import org.specs2.Specification import org.specs2.matcher.Matcher -import outputs.EnrichedEvent - object ApiRequestEnrichmentIntegrationTest { def continuousIntegration: Boolean = sys.env.get("CI") match { @@ -33,7 +30,7 @@ object ApiRequestEnrichmentIntegrationTest { } } -import ApiRequestEnrichmentIntegrationTest._ +import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequest.ApiRequestEnrichmentIntegrationTest._ class ApiRequestEnrichmentIntegrationTest extends Specification { def is = skipAllUnless(continuousIntegration) ^ @@ -41,6 +38,7 @@ class ApiRequestEnrichmentIntegrationTest extends Specification { This is a integration test for the ApiRequestEnrichment Basic Case $e1 POST, Auth, JSON inputs, cache, several outputs $e2 + API output with number field $e3 """ object IntegrationTests { @@ -229,6 +227,49 @@ class ApiRequestEnrichmentIntegrationTest extends Specification { ), json"""{"name": "Snowplow ETL", "jobflow_id": "j-ZKIY4CKQRX72", "state": "RUNNING", "created_at": "2016-01-21T13:14:10.193+03:00"}""" ) + + val configuration3 = json"""{ + "vendor": "com.snowplowanalytics.snowplow.enrichments", + "name": "api_request_enrichment_config", + "enabled": true, + "parameters": { + "inputs": [ + { + "key": "ip", + "pojo": { + "field": "user_ipaddress" + } + } + ], + "api": { + "http": { + "method": "GET", + "uri": "http://localhost:8001/geo/{{ip}}?format=json", + "timeout": 5000, + "authentication": {} + } + }, + "outputs": [{ + "schema": "iglu:com.snowplowanalytics.snowplow/geolocation_context/jsonschema/1-0-0", + "json": { + "jsonPath": "$$" + } + }], + "cache": { + "size": 3000, + "ttl": 60 + } + } + }""" + + val correctResultContext4 = + SelfDescribingData( + SchemaKey("com.snowplowanalytics.snowplow", "geolocation_context", "jsonschema", SchemaVer.Full(1, 0, 0)), + json"""{"latitude":32.234,"longitude":33.564}""" + ) + + val schema = + json"""{ "type": "object", "properties": { "latitude": { "type": [ "number" ] }, "longitude": { "type": [ "number" ] } }, "additionalProperties": false }""" } val SCHEMA_KEY = @@ -314,4 +355,19 @@ class ApiRequestEnrichmentIntegrationTest extends Specification { ) and (contexts must have size 2) } } + + def e3 = { + val enrichment = ApiRequestEnrichment + .parse(IntegrationTests.configuration3, SCHEMA_KEY) + .map(_.enrichment[Id]) + .toEither + val event = new EnrichedEvent + event.setUser_ipaddress("127.0.0.1") + val context = enrichment.flatMap(_.lookup(event, Nil, Nil, None).toEither) + context must beRight.like { + case contexts => + (contexts must have size 1) and (contexts must contain(IntegrationTests.correctResultContext4)) and + (CirceValidator.validate(contexts.head.data, IntegrationTests.schema) must beRight) + } + } } diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/apirequest/ApiRequestEnrichmentSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/apirequest/ApiRequestEnrichmentSpec.scala index eaf36ba21..fa8c2b6d9 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/apirequest/ApiRequestEnrichmentSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/apirequest/ApiRequestEnrichmentSpec.scala @@ -14,22 +14,17 @@ package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apireq import cats.Id import cats.syntax.either._ - -import io.circe.Json -import io.circe.literal._ -import io.circe.parser._ - -import scalaj.http.HttpRequest - import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData} - import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.ApiRequestConf import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent import com.snowplowanalytics.snowplow.enrich.common.utils.HttpClient - +import io.circe.Json +import io.circe.literal._ +import io.circe.parser._ import org.specs2.Specification import org.specs2.matcher.ValidatedMatchers import org.specs2.mock.Mockito +import scalaj.http.HttpRequest class ApiRequestEnrichmentSpec extends Specification with ValidatedMatchers with Mockito { def is = s2""" @@ -37,6 +32,7 @@ class ApiRequestEnrichmentSpec extends Specification with ValidatedMatchers with skip incorrect input (none of json or pojo) in configuration $e2 skip incorrect input (both json and pojo) in configuration $e3 extract correct configuration for POST request and perform the request $e4 + parse API output with number field successfully $e5 """ val SCHEMA_KEY = @@ -401,4 +397,32 @@ class ApiRequestEnrichmentSpec extends Specification with ValidatedMatchers with validConfig and validResult } + + def e5 = { + val inputs = List() + val api = + HttpApi( + "GET", + "http://api.acme.com/geo?format=json", + 1000, + Authentication(None) + ) + implicit val idHttpClient: HttpClient[Id] = new HttpClient[Id] { + override def getResponse(request: HttpRequest): Id[Either[Throwable, String]] = + """{"latitude":32.234,"longitude":33.564}""".asRight + } + val output = Output("iglu:com.acme/geo/jsonschema/1-0-0", Some(JsonOutput("$"))) + val cache = Cache(3000, 60) + val config = ApiRequestConf(SCHEMA_KEY, inputs, api, List(output), cache) + + val expectedDerivation = + SelfDescribingData( + SchemaKey("com.acme", "geo", "jsonschema", SchemaVer.Full(1, 0, 0)), + json"""{"latitude": 32.234, "longitude": 33.564}""" + ) + + val enrichedContextResult = config.enrichment[Id].lookup(new EnrichedEvent, Nil, Nil, None) + + enrichedContextResult must beValid(List(expectedDerivation)) + } }