Skip to content

Commit

Permalink
Common: fix number deserialization in API Request Enrichment (close #383
Browse files Browse the repository at this point in the history
)
  • Loading branch information
oguzhanunlu committed Oct 28, 2020
1 parent 9d4cfaa commit 12727ee
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 20 deletions.
5 changes: 5 additions & 0 deletions integration-tests/sce-api-lookup-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ def do_GET(self):
response = self.generate_response("GET")
self.end_headers()
self.wfile.write(response)
elif self.path.startswith("/geo"):
self.send_response(200)
response = json.dumps({"latitude":32.234,"longitude":33.564})
self.end_headers()
self.wfile.write(response)
else:
self.wfile.write('not authenticated')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ package com.snowplowanalytics.snowplow.enrich.common
package enrichments.registry.apirequest

import cats.syntax.either._
import com.fasterxml.jackson.core.JsonFactory
import com.snowplowanalytics.snowplow.enrich.common.utils.CirceUtils
import com.snowplowanalytics.snowplow.enrich.common.utils.JsonPath.{query, wrapArray}
import io.circe._
import io.circe.parser._
import io.circe.syntax._

import utils.JsonPath.{query, wrapArray}

/**
* Base trait for API output format. Primary intention of these classes is to perform transformation
* of API raw output to self-describing JSON instance
Expand Down Expand Up @@ -117,6 +117,7 @@ final case class JsonOutput(jsonPath: String) extends ApiOutput[Json] {
case other => other.leftMap(JsonPathException.apply)
}

private[this] val jsonFactory: JsonFactory = new JsonFactory(CirceUtils.mapper)
def parseResponse(response: String): Either[Throwable, Json] =
parse(response)
Either.catchNonFatal[Json](CirceUtils.mapper.readValue(jsonFactory.createParser(response), classOf[Json]))
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@ package com.snowplowanalytics.snowplow.enrich.common
package enrichments.registry.apirequest

import cats.Id

import com.snowplowanalytics.iglu.client.CirceValidator
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import io.circe._
import io.circe.literal._

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}

import org.specs2.Specification
import org.specs2.matcher.Matcher

import outputs.EnrichedEvent

object ApiRequestEnrichmentIntegrationTest {
def continuousIntegration: Boolean =
sys.env.get("CI") match {
Expand All @@ -33,14 +30,15 @@ object ApiRequestEnrichmentIntegrationTest {
}
}

import ApiRequestEnrichmentIntegrationTest._
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequest.ApiRequestEnrichmentIntegrationTest._
class ApiRequestEnrichmentIntegrationTest extends Specification {
def is =
skipAllUnless(continuousIntegration) ^
s2"""
This is a integration test for the ApiRequestEnrichment
Basic Case $e1
POST, Auth, JSON inputs, cache, several outputs $e2
API output with number field $e3
"""

object IntegrationTests {
Expand Down Expand Up @@ -229,6 +227,49 @@ class ApiRequestEnrichmentIntegrationTest extends Specification {
),
json"""{"name": "Snowplow ETL", "jobflow_id": "j-ZKIY4CKQRX72", "state": "RUNNING", "created_at": "2016-01-21T13:14:10.193+03:00"}"""
)

val configuration3 = json"""{
"vendor": "com.snowplowanalytics.snowplow.enrichments",
"name": "api_request_enrichment_config",
"enabled": true,
"parameters": {
"inputs": [
{
"key": "ip",
"pojo": {
"field": "user_ipaddress"
}
}
],
"api": {
"http": {
"method": "GET",
"uri": "http://localhost:8001/geo/{{ip}}?format=json",
"timeout": 5000,
"authentication": {}
}
},
"outputs": [{
"schema": "iglu:com.snowplowanalytics.snowplow/geolocation_context/jsonschema/1-0-0",
"json": {
"jsonPath": "$$"
}
}],
"cache": {
"size": 3000,
"ttl": 60
}
}
}"""

val correctResultContext4 =
SelfDescribingData(
SchemaKey("com.snowplowanalytics.snowplow", "geolocation_context", "jsonschema", SchemaVer.Full(1, 0, 0)),
json"""{"latitude":32.234,"longitude":33.564}"""
)

val schema =
json"""{ "type": "object", "properties": { "latitude": { "type": [ "number" ] }, "longitude": { "type": [ "number" ] } }, "additionalProperties": false }"""
}

val SCHEMA_KEY =
Expand Down Expand Up @@ -314,4 +355,19 @@ class ApiRequestEnrichmentIntegrationTest extends Specification {
) and (contexts must have size 2)
}
}

def e3 = {
val enrichment = ApiRequestEnrichment
.parse(IntegrationTests.configuration3, SCHEMA_KEY)
.map(_.enrichment[Id])
.toEither
val event = new EnrichedEvent
event.setUser_ipaddress("127.0.0.1")
val context = enrichment.flatMap(_.lookup(event, Nil, Nil, None).toEither)
context must beRight.like {
case contexts =>
(contexts must have size 1) and (contexts must contain(IntegrationTests.correctResultContext4)) and
(CirceValidator.validate(contexts.head.data, IntegrationTests.schema) must beRight)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,25 @@ package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apireq

import cats.Id
import cats.syntax.either._

import io.circe.Json
import io.circe.literal._
import io.circe.parser._

import scalaj.http.HttpRequest

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SchemaVer, SelfDescribingData}

import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.ApiRequestConf
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.utils.HttpClient

import io.circe.Json
import io.circe.literal._
import io.circe.parser._
import org.specs2.Specification
import org.specs2.matcher.ValidatedMatchers
import org.specs2.mock.Mockito
import scalaj.http.HttpRequest

class ApiRequestEnrichmentSpec extends Specification with ValidatedMatchers with Mockito {
def is = s2"""
extract correct configuration for GET request and perform the request $e1
skip incorrect input (none of json or pojo) in configuration $e2
skip incorrect input (both json and pojo) in configuration $e3
extract correct configuration for POST request and perform the request $e4
parse API output with number field successfully $e5
"""

val SCHEMA_KEY =
Expand Down Expand Up @@ -401,4 +397,32 @@ class ApiRequestEnrichmentSpec extends Specification with ValidatedMatchers with

validConfig and validResult
}

def e5 = {
val inputs = List()
val api =
HttpApi(
"GET",
"http://api.acme.com/geo?format=json",
1000,
Authentication(None)
)
implicit val idHttpClient: HttpClient[Id] = new HttpClient[Id] {
override def getResponse(request: HttpRequest): Id[Either[Throwable, String]] =
"""{"latitude":32.234,"longitude":33.564}""".asRight
}
val output = Output("iglu:com.acme/geo/jsonschema/1-0-0", Some(JsonOutput("$")))
val cache = Cache(3000, 60)
val config = ApiRequestConf(SCHEMA_KEY, inputs, api, List(output), cache)

val expectedDerivation =
SelfDescribingData(
SchemaKey("com.acme", "geo", "jsonschema", SchemaVer.Full(1, 0, 0)),
json"""{"latitude": 32.234, "longitude": 33.564}"""
)

val enrichedContextResult = config.enrichment[Id].lookup(new EnrichedEvent, Nil, Nil, None)

enrichedContextResult must beValid(List(expectedDerivation))
}
}

0 comments on commit 12727ee

Please sign in to comment.