Skip to content

Commit

Permalink
Common: fix API Request Enrichment output deserialization (closes #374)
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu authored and chuwy committed Oct 16, 2020
1 parent 35f9e6d commit 9683e92
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ package utils

import cats.data.Validated
import cats.syntax.either._

import com.fasterxml.jackson.databind.ObjectMapper
import io.circe._
import io.circe.jackson.enrich.CirceJsonModule

object CirceUtils {

Expand All @@ -43,4 +44,19 @@ object CirceUtils {
s"Could not extract $pathStr as $clas from supplied JSON due to ${e.getMessage}"
}
}

/**
 * A custom ObjectMapper specific to the Circe JSON AST, built by registering
 * `CirceJsonModule` on a fresh Jackson `ObjectMapper`.
 *
 * The only difference from the original mapper `io.circe.jackson.mapper` is
 * how a `Long` is deserialized. The original mapper maps a `Long` to `JsonBigDecimal`,
 * whereas this custom mapper deserializes a `Long` to `JsonLong`.
 *
 * This customization saves Snowplow events from failing when derived contexts are
 * validated post-enrichment. If the output schema of the API Request Enrichment has an
 * integer field, the `JsonBigDecimal` representation of a `Long` results in a bad row
 * with the message `number found, integer expected` in Iglu Scala Client, since Jackson
 * treats a `DecimalNode` as a number in all cases.
 */
final val mapper: ObjectMapper = (new ObjectMapper).registerModule(CirceJsonModule)
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object JsonPath {
* @param any raw JVM type representing JSON
* @return Json
*/
private def anyToJson(any: Any): Json =
private[utils] def anyToJson(any: Any): Json =
if (any == null) Json.Null
else io.circe.jackson.mapper.convertValue(any, classOf[Json])
else CirceUtils.mapper.convertValue(any, classOf[Json])
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package io.circe.jackson.enrich

import java.util

import com.fasterxml.jackson.core.{JsonParser, JsonTokenId}
import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer}
import io.circe.jackson.{DeserializerContext, JacksonCompat, ReadingList, ReadingMap}
import io.circe.{Json, JsonBigDecimal, JsonLong}

import scala.annotation.{switch, tailrec}
import scala.collection.JavaConverters._

/**
 * Jackson deserializer that builds circe `Json` values from a Jackson token stream.
 *
 * Integer tokens that fit into 64 bits are represented as `JsonLong`; floating point
 * tokens and integer tokens that do not fit into a `Long` are represented as
 * `JsonBigDecimal`.
 *
 * @param klass the class the deserialized value is expected to be assignable to
 */
private[jackson] final class CirceJsonDeserializer(klass: Class[_]) extends JsonDeserializer[Object] with JacksonCompat {
  override def isCachable: Boolean = true

  /**
   * Entry point used by Jackson: reads one complete JSON value starting with an empty
   * parser context. If the result is not assignable to `klass`, the inherited
   * `handleUnexpectedToken` is invoked (its behaviour is defined in `JacksonCompat`,
   * which is not visible here).
   */
  override def deserialize(jp: JsonParser, ctxt: DeserializationContext): Json = {
    val value = deserialize(jp, ctxt, List())
    if (!klass.isAssignableFrom(value.getClass)) handleUnexpectedToken(ctxt)(klass, jp)

    value
  }

  /**
   * Consumes tokens one at a time, keeping a stack of the arrays/objects currently
   * being read, until a complete top-level value has been built.
   *
   * @param jp            parser positioned on (or just before) the next token
   * @param ctxt          Jackson deserialization context, passed through unchanged
   * @param parserContext stack of enclosing containers, innermost first
   * @return the complete JSON value once the stack is empty again
   * @throws IllegalStateException when a structural token does not match the current
   *                               context, or the token is unavailable / an embedded object
   */
  @tailrec
  def deserialize(
    jp: JsonParser,
    ctxt: DeserializationContext,
    parserContext: List[DeserializerContext]
  ): Json = {
    if (jp.getCurrentToken == null) jp.nextToken()

    val (maybeValue, nextContext) = (jp.getCurrentToken.id(): @switch) match {
      case JsonTokenId.ID_NUMBER_INT =>
        // `getLongValue` fails for integers outside the 64-bit range, so those are kept
        // as arbitrary-precision decimals instead of aborting the whole deserialization.
        // NOTE(review): such values are no longer `JsonLong`, so downstream "integer"
        // schema validation may still see them as plain numbers - confirm this is acceptable.
        val integral =
          if (jp.getNumberType == JsonParser.NumberType.BIG_INTEGER)
            Json.JNumber(JsonBigDecimal(jp.getDecimalValue))
          else
            Json.JNumber(JsonLong(jp.getLongValue))
        (Some(integral), parserContext)
      case JsonTokenId.ID_NUMBER_FLOAT => (Some(Json.JNumber(JsonBigDecimal(jp.getDecimalValue))), parserContext)
      case JsonTokenId.ID_STRING => (Some(Json.JString(jp.getText)), parserContext)
      case JsonTokenId.ID_TRUE => (Some(Json.JBoolean(true)), parserContext)
      case JsonTokenId.ID_FALSE => (Some(Json.JBoolean(false)), parserContext)
      case JsonTokenId.ID_NULL => (Some(Json.JNull), parserContext)
      case JsonTokenId.ID_START_ARRAY => (None, ReadingList(new util.ArrayList) +: parserContext)

      case JsonTokenId.ID_END_ARRAY =>
        parserContext match {
          case ReadingList(content) :: stack =>
            (Some(Json.fromValues(content.asScala)), stack)
          case _ => throw new IllegalStateException("Jackson read ']' but parser context is not an array")
        }

      case JsonTokenId.ID_START_OBJECT => (None, ReadingMap(new util.ArrayList) +: parserContext)

      case JsonTokenId.ID_FIELD_NAME =>
        parserContext match {
          case (c: ReadingMap) :: stack => (None, c.setField(jp.getCurrentName) +: stack)
          case _ =>
            throw new IllegalStateException("Jackson read a String field name but parser context is not a json object")
        }

      case JsonTokenId.ID_END_OBJECT =>
        parserContext match {
          case ReadingMap(content) :: stack =>
            (
              Some(Json.fromFields(content.asScala)),
              stack
            )
          case _ => throw new IllegalStateException("Jackson read '}' but parser context is not a json object")
        }

      case JsonTokenId.ID_NOT_AVAILABLE =>
        throw new IllegalStateException("Jackson can't return the json token yet")

      case JsonTokenId.ID_EMBEDDED_OBJECT =>
        throw new IllegalStateException("Jackson read embedded object but json object was expected")
    }

    maybeValue match {
      // A finished value with no enclosing container is the final result.
      case Some(v) if nextContext.isEmpty => v
      case maybeValue =>
        jp.nextToken()
        // A finished value is appended to the innermost open container; when no value
        // was produced (container start / field name) the context is passed on as is.
        val toPass = maybeValue
          .map { v =>
            val previous :: stack = nextContext
            previous.addValue(v) +: stack
          }
          .getOrElse(nextContext)

        deserialize(jp, ctxt, toPass)
    }
  }

  /** A JSON `null` is represented by circe's `Json.JNull` rather than a JVM null. */
  override def getNullValue = Json.JNull
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.circe.jackson.enrich

import com.fasterxml.jackson.core.Version
import com.fasterxml.jackson.databind.Module.SetupContext
import com.fasterxml.jackson.databind._
import com.fasterxml.jackson.databind.deser.Deserializers
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.databind.ser.Serializers
import io.circe.Json
import io.circe.jackson.CirceJsonSerializer

/**
 * Jackson module (registered under the name "SPCirceJson") that wires circe `Json`
 * handling into an `ObjectMapper`: `CirceJsonDeserializer` for reading and
 * `CirceJsonSerializer` for writing.
 */
object CirceJsonModule extends SimpleModule("SPCirceJson", Version.unknownVersion()) {

  /** Registers the circe-aware deserializer and serializer lookups with Jackson. */
  override final def setupModule(context: SetupContext): Unit = {
    context.addDeserializers(circeDeserializers)
    context.addSerializers(circeSerializers)
  }

  /**
   * Deserializer lookup: yields a `CirceJsonDeserializer` for `Json` (sub)types and
   * for the class of `Json.JNull`; `null` tells Jackson that no deserializer is provided.
   */
  private def circeDeserializers: Deserializers =
    new Deserializers.Base {
      override final def findBeanDeserializer(
        javaType: JavaType,
        config: DeserializationConfig,
        beanDesc: BeanDescription
      ): CirceJsonDeserializer = {
        val rawClass = javaType.getRawClass
        val handlesJson = classOf[Json].isAssignableFrom(rawClass) || rawClass == Json.JNull.getClass
        if (handlesJson) new CirceJsonDeserializer(rawClass) else null
      }
    }

  /**
   * Serializer lookup: yields `CirceJsonSerializer` when the bean class is a `Json`
   * (sub)type; `null` tells Jackson that no serializer is provided.
   */
  private def circeSerializers: Serializers =
    new Serializers.Base {
      override final def findSerializer(
        config: SerializationConfig,
        javaType: JavaType,
        beanDesc: BeanDescription
      ): JsonSerializer[Object] =
        if (classOf[Json].isAssignableFrom(beanDesc.getBeanClass))
          CirceJsonSerializer.asInstanceOf[JsonSerializer[Object]]
        else
          null
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/

package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequest

import com.snowplowanalytics.iglu.client.CirceValidator
import com.snowplowanalytics.snowplow.enrich.common.utils.JsonPath.query
import io.circe.Json
import io.circe.literal.JsonStringContext
import org.specs2.Specification
import org.specs2.matcher.ValidatedMatchers
import org.specs2.specification.core.SpecStructure

/**
 * Checks that JSON values which went through `JsonPath.query` still validate against
 * a schema with an `integer` field (`orderID`) and a `number` field (`price`).
 */
class ValidatorSpec extends Specification with ValidatedMatchers {
  override def is: SpecStructure = s2"""
validate integer field using a valid long value (maximum long) $e1
validate integer field using a valid long value (minimum long) $e2
validate number field using a positive float value $e3
validate number field using a negative float value $e4
validate number field using a negative double value $e5
validate number field using a positive double value $e6
invalidate integer field using a positive double value $e7
"""

  val schema =
    json"""{ "type": "object", "properties": { "orderID": { "type": "integer" }, "price": { "type": "number" } }, "additionalProperties": false }"""

  // Runs the root JSONPath query over `instance` and validates the first result against `schema`.
  private def queryAndValidate(instance: Json) =
    query("$", instance).flatMap(found => CirceValidator.validate(found.head, schema))

  def e1 =
    queryAndValidate(json"""{"orderID": 9223372036854775807 }""") must beRight

  def e2 =
    queryAndValidate(json"""{"orderID": -9223372036854775808 }""") must beRight

  def e3 =
    queryAndValidate(json"""{"price": ${Json.fromFloatOrString(88.92f)} }""") must beRight

  def e4 =
    queryAndValidate(json"""{"price": ${Json.fromFloatOrString(-34345328.72f)} }""") must beRight

  def e5 =
    queryAndValidate(json"""{"price": ${Json.fromDoubleOrString(-34345488.72)} }""") must beRight

  def e6 =
    queryAndValidate(json"""{"price": ${Json.fromDoubleOrString(32488.72)} }""") must beRight

  def e7 =
    queryAndValidate(json"""{"orderID": ${Json.fromDoubleOrString(32488.72)} }""") must beLeft
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.snowplowanalytics.snowplow.enrich.common.utils

import io.circe._
import io.circe.literal.JsonStringContext
import io.circe.syntax._
import org.specs2.Specification

Expand All @@ -21,9 +22,14 @@ class JsonPathSpec extends Specification {
test JSONPath query $e1
test query of non-exist value $e2
test query of empty array $e3
test primtive JSON type (JString) $e6
test primitive JSON type (JString) $e6
invalid JSONPath (JQ syntax) must fail $e4
invalid JSONPath must fail $e5
test query of long $e7
test query of integer $e8
test query of string $e9
test query of double $e10
test query of big decimal $e11
"""

val someJson = Json.obj(
Expand Down Expand Up @@ -88,4 +94,34 @@ class JsonPathSpec extends Specification {

// Querying a nested path on a primitive JSON string is expected to succeed
// with no matches, i.e. a Right holding an empty list.
def e6 =
JsonPath.query("$.store.book[2]", Json.fromString("somestring")) must beRight(List())

// A value above Int.MaxValue must come back as the same long, whether it was
// written as a literal or spliced in via Json.fromLong.
def e7 = {
  val expected = List(Json.fromLong(2147483649L))
  val fromLiteral = JsonPath.query("$.empId", json"""{ "empId": 2147483649 }""")
  val fromSplice = JsonPath.query("$.empId", json"""{ "empId": ${Json.fromLong(2147483649L)} }""")
  (fromLiteral must beRight(expected)) and (fromSplice must beRight(expected))
}

// Positive (literal) and negative (spliced) integers must be returned unchanged.
def e8 = {
  val positive = JsonPath.query("$.empId", json"""{ "empId": 1086 }""")
  val negative = JsonPath.query("$.empId", json"""{ "empId": ${Json.fromInt(-1086)} }""")
  val positiveCheck = positive must beRight(List(Json.fromInt(1086)))
  val negativeCheck = negative must beRight(List(Json.fromInt(-1086)))
  positiveCheck and negativeCheck
}

// String values must be returned unchanged for both literal and spliced input.
def e9 = {
  val literalResult = JsonPath.query("$.empName", json"""{ "empName": "ABC" }""")
  val splicedResult = JsonPath.query("$.empName", json"""{ "empName": ${Json.fromString("XYZ")} }""")
  (literalResult must beRight(List(Json.fromString("ABC")))) and
    (splicedResult must beRight(List(Json.fromString("XYZ"))))
}

// Double values spliced into the document must be returned as the equivalent JSON numbers.
def e10 = {
  val firstResult = JsonPath.query("$.id", json"""{ "id": ${Json.fromDouble(44.54)} }""")
  val secondResult = JsonPath.query("$.id", json"""{ "id": ${Json.fromDouble(20.20)} }""")
  val firstCheck = firstResult must beRight(List(Json.fromDoubleOrNull(44.54)))
  val secondCheck = secondResult must beRight(List(Json.fromDoubleOrString(20.20)))
  firstCheck and secondCheck
}

// BigDecimal values spliced into the document must be returned unchanged.
def e11 = {
  val firstExpected = List(Json.fromBigDecimal(44.54))
  val secondExpected = List(Json.fromBigDecimal(20.20))
  val firstResult = JsonPath.query("$.id", json"""{ "id": ${Json.fromBigDecimal(44.54)} }""")
  val secondResult = JsonPath.query("$.id", json"""{ "id": ${Json.fromBigDecimal(20.20)} }""")
  (firstResult must beRight(firstExpected)) and (secondResult must beRight(secondExpected))
}
}

0 comments on commit 9683e92

Please sign in to comment.