Skip to content

Commit

Permalink
Common: fix number deserialization in API Request Enrichment (close #383
Browse files Browse the repository at this point in the history
)
  • Loading branch information
oguzhanunlu committed Oct 29, 2020
1 parent 9d4cfaa commit 7a11bd5
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 1 deletion.
5 changes: 5 additions & 0 deletions integration-tests/sce-api-lookup-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ def do_GET(self):
response = self.generate_response("GET")
self.end_headers()
self.wfile.write(response)
elif self.path.startswith("/geo"):
self.send_response(200)
response = json.dumps({"latitude":32.234,"longitude":33.564})
self.end_headers()
self.wfile.write(response)
else:
self.wfile.write('not authenticated')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ package enrichments.registry.apirequest

import cats.syntax.either._
import io.circe._
import io.circe.parser._
import io.circe.syntax._

import io.circe.jackson.enrich.parse
import utils.JsonPath.{query, wrapArray}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/

package io.circe.jackson

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import com.snowplowanalytics.snowplow.enrich.common.utils.CirceUtils
import io.circe.{Json, ParsingFailure}

import scala.util.control.NonFatal

/**
* This package is created to be able to change semantics of
* circe-jackson library in special cases by creating a custom
* ObjectMapper.
*
* These special cases include circe-jackson #217, #65 and #49
*
* This package is to be removed either when jackson is replaced
* with another parser, possibly jawn, or when circe-jackson has
* a new version that we can use out of the box.
*/
package object enrich extends JacksonCompat {

val jsonFactory: JsonFactory = new JsonFactory(CirceUtils.mapper)

def jsonStringParser(input: String): JsonParser = jsonFactory.createParser(input)

def parse(input: String): Either[ParsingFailure, Json] =
try Right(CirceUtils.mapper.readValue(jsonStringParser(input), classOf[Json]))
catch {
case NonFatal(error) => Left(ParsingFailure(error.getMessage, error))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import cats.Id
import io.circe._
import io.circe.literal._

import com.snowplowanalytics.iglu.client.CirceValidator
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}

import org.specs2.Specification
Expand All @@ -41,6 +42,7 @@ class ApiRequestEnrichmentIntegrationTest extends Specification {
This is a integration test for the ApiRequestEnrichment
Basic Case $e1
POST, Auth, JSON inputs, cache, several outputs $e2
API output with number field $e3
"""

object IntegrationTests {
Expand Down Expand Up @@ -229,6 +231,49 @@ class ApiRequestEnrichmentIntegrationTest extends Specification {
),
json"""{"name": "Snowplow ETL", "jobflow_id": "j-ZKIY4CKQRX72", "state": "RUNNING", "created_at": "2016-01-21T13:14:10.193+03:00"}"""
)

val configuration3 = json"""{
"vendor": "com.snowplowanalytics.snowplow.enrichments",
"name": "api_request_enrichment_config",
"enabled": true,
"parameters": {
"inputs": [
{
"key": "ip",
"pojo": {
"field": "user_ipaddress"
}
}
],
"api": {
"http": {
"method": "GET",
"uri": "http://localhost:8001/geo/{{ip}}?format=json",
"timeout": 5000,
"authentication": {}
}
},
"outputs": [{
"schema": "iglu:com.snowplowanalytics.snowplow/geolocation_context/jsonschema/1-0-0",
"json": {
"jsonPath": "$$"
}
}],
"cache": {
"size": 3000,
"ttl": 60
}
}
}"""

val correctResultContext4 =
SelfDescribingData(
SchemaKey("com.snowplowanalytics.snowplow", "geolocation_context", "jsonschema", SchemaVer.Full(1, 0, 0)),
json"""{"latitude":32.234,"longitude":33.564}"""
)

val schema =
json"""{ "type": "object", "properties": { "latitude": { "type": [ "number" ] }, "longitude": { "type": [ "number" ] } }, "additionalProperties": false }"""
}

val SCHEMA_KEY =
Expand Down Expand Up @@ -314,4 +359,19 @@ class ApiRequestEnrichmentIntegrationTest extends Specification {
) and (contexts must have size 2)
}
}

def e3 = {
val enrichment = ApiRequestEnrichment
.parse(IntegrationTests.configuration3, SCHEMA_KEY)
.map(_.enrichment[Id])
.toEither
val event = new EnrichedEvent
event.setUser_ipaddress("127.0.0.1")
val context = enrichment.flatMap(_.lookup(event, Nil, Nil, None).toEither)
context must beRight.like {
case contexts =>
(contexts must have size 1) and (contexts must contain(IntegrationTests.correctResultContext4)) and
(CirceValidator.validate(contexts.head.data, IntegrationTests.schema) must beRight)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class ApiRequestEnrichmentSpec extends Specification with ValidatedMatchers with
skip incorrect input (none of json or pojo) in configuration $e2
skip incorrect input (both json and pojo) in configuration $e3
extract correct configuration for POST request and perform the request $e4
parse API output with number field successfully $e5
"""

val SCHEMA_KEY =
Expand Down Expand Up @@ -401,4 +402,32 @@ class ApiRequestEnrichmentSpec extends Specification with ValidatedMatchers with

validConfig and validResult
}

def e5 = {
val inputs = List()
val api =
HttpApi(
"GET",
"http://api.acme.com/geo?format=json",
1000,
Authentication(None)
)
implicit val idHttpClient: HttpClient[Id] = new HttpClient[Id] {
override def getResponse(request: HttpRequest): Id[Either[Throwable, String]] =
"""{"latitude":32.234,"longitude":33.564}""".asRight
}
val output = Output("iglu:com.acme/geo/jsonschema/1-0-0", Some(JsonOutput("$")))
val cache = Cache(3000, 60)
val config = ApiRequestConf(SCHEMA_KEY, inputs, api, List(output), cache)

val expectedDerivation =
SelfDescribingData(
SchemaKey("com.acme", "geo", "jsonschema", SchemaVer.Full(1, 0, 0)),
json"""{"latitude": 32.234, "longitude": 33.564}"""
)

val enrichedContextResult = config.enrichment[Id].lookup(new EnrichedEvent, Nil, Nil, None)

enrichedContextResult must beValid(List(expectedDerivation))
}
}

0 comments on commit 7a11bd5

Please sign in to comment.