Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix PII pseudonymizer enrichment + unit tests #336

Merged
merged 6 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import io.circe.jackson._
import io.circe.syntax._

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode, TextNode}
import com.fasterxml.jackson.databind.node.{ArrayNode, NullNode, ObjectNode, TextNode}
import com.fasterxml.jackson.databind.ObjectMapper

import com.jayway.jsonpath.{Configuration, JsonPath => JJsonPath}
import com.jayway.jsonpath.MapFunction
Expand Down Expand Up @@ -132,6 +133,25 @@ object PiiPseudonymizerEnrichment extends ParseableEnrichment {
.get(fieldName)
.map(_.asRight)
.getOrElse(s"The specified json field $fieldName is not supported".asLeft)

/** Helper to remove fields that were wrongly added and are not in the original JSON. See #351. */
private[pii] def removeAddedFields(hashed: Json, original: Json): Json = {
  // Object case: keep only the keys that existed in the original, recursing into each value.
  val repairedObject = for {
    hashedObj <- hashed.asObject
    originalObj <- original.asObject
    keptFields = hashedObj.toList.flatMap {
      case (key, hashedValue) =>
        originalObj(key).map(originalValue => key -> removeAddedFields(hashedValue, originalValue))
    }
  } yield Json.fromFields(keptFields)

  // Array case (evaluated lazily, only if the object case didn't apply):
  // pair elements positionally and repair each pair.
  lazy val repairedArray = for {
    hashedElems <- hashed.asArray
    originalElems <- original.asArray
    repairedElems = hashedElems.zip(originalElems).map {
      case (h, o) => removeAddedFields(h, o)
    }
  } yield Json.fromValues(repairedElems)

  // Scalars (or mismatched shapes) pass through unchanged.
  repairedObject.orElse(repairedArray).getOrElse(hashed)
}
}

/**
Expand Down Expand Up @@ -203,7 +223,8 @@ final case class PiiJson(
)
}
.getOrElse((parsed, List.empty[JsonModifiedField]))
} yield (substituted.noSpaces, modifiedFields.toList)).getOrElse((null, List.empty))
} yield (PiiPseudonymizerEnrichment.removeAddedFields(substituted, parsed).noSpaces, modifiedFields.toList))
.getOrElse((null, List.empty))

/** Map context top fields with strategy if they match. */
private def mapContextTopFields(tuple: (String, Json), strategy: PiiStrategy): (String, (Json, List[JsonModifiedField])) =
Expand Down Expand Up @@ -264,15 +285,15 @@ final case class PiiJson(
val objectNode = io.circe.jackson.mapper.valueToTree[ObjectNode](json)
val documentContext = JJsonPath.using(JsonPathConf).parse(objectNode)
val modifiedFields = MutableList[JsonModifiedField]()
val documentContext2 = documentContext.map(
jsonPath,
new ScrambleMapFunction(strategy, modifiedFields, fieldMutator.fieldName, jsonPath, schema)
)
// make sure it is a structure preserving method, see #3636
//val transformedJValue = JsonMethods.fromJsonNode(documentContext.json[JsonNode]())
//val Diff(_, erroneouslyAdded, _) = jValue diff transformedJValue
//val Diff(_, withoutCruft, _) = erroneouslyAdded diff transformedJValue
(jacksonToCirce(documentContext2.json[JsonNode]()), modifiedFields.toList)
Option(documentContext.read[AnyRef](jsonPath)) match { // check that json object not null
case None => (jacksonToCirce(documentContext.json[JsonNode]()), modifiedFields.toList)
case _ =>
val documentContext2 = documentContext.map(
jsonPath,
new ScrambleMapFunction(strategy, modifiedFields, fieldMutator.fieldName, jsonPath, schema)
)
(jacksonToCirce(documentContext2.json[JsonNode]()), modifiedFields.toList)
}
}
}

Expand All @@ -290,7 +311,9 @@ private final case class ScrambleMapFunction(
val _ = modifiedFields += JsonModifiedField(fieldName, s, newValue, jsonPath, schema)
newValue
case a: ArrayNode =>
a.elements.asScala.map {
val mapper = new ObjectMapper()
val arr = mapper.createArrayNode()
a.elements.asScala.foreach {
case t: TextNode =>
val originalValue = t.asText()
val newValue = strategy.scramble(originalValue)
Expand All @@ -301,9 +324,11 @@ private final case class ScrambleMapFunction(
jsonPath,
schema
)
newValue
case default: AnyRef => default
arr.add(newValue)
case default: AnyRef => arr.add(default)
case null => arr.add(NullNode.getInstance())
}
case default: AnyRef => default
arr
case _ => currentValue
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
},
"emailAddress2": {
"type": "string"
},
"emailAddress3": {
"type": "string"
}
},
"required": ["emailAddress", "emailAddress2"],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Schema for acme stuff",
"self": {
"vendor": "com.acme",
"name": "email_sent",
"format": "jsonschema",
"version": "1-1-0"
},
"type": "object",
"properties": {
"emailAddress": {
"type": "string"
},
"emailAddress2": {
"type": "string"
},
"emailAddress3": {
"type": ["string", "null"]
}
},
"required": ["emailAddress", "emailAddress2"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Schema to test scrambling of array in PII enrichment",
"self": {
"vendor": "com.test",
"name": "array",
"format": "jsonschema",
"version": "1-0-0"
},
"type": "object",
"properties": {
"field": {
"type": "array",
"items": {
"type": ["string", "null" ]
}
},
"field2": {
"type": ["string", "null"]
},
"field3": {
"type": ["object", "null"],
"properties": {
"a": {
"type": "string"
},
"b": {
"type": "string"
}
}
},
"field4": {
"type": "string",
"maxLength": 64
}
},
"required": ["field"],
"additionalProperties": false
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.snowplowanalytics.iglu.client.resolver.registries.Registry
import com.snowplowanalytics.iglu.client.validator.CirceValidator

import com.snowplowanalytics.snowplow.badrows.Processor
import com.snowplowanalytics.snowplow.badrows.BadRow

import org.apache.thrift.TSerializer

Expand All @@ -44,6 +45,8 @@ class EtlPipelineSpec extends Specification with ValidatedMatchers {
def is = s2"""
EtlPipeline should always produce either bad or good row for each event of the payload $e1
Processing of events with malformed query string should be supported $e2
Processing of invalid CollectorPayload (CPFormatViolation bad row) should be supported $e3
Absence of CollectorPayload (None) should be supported $e4
benjben marked this conversation as resolved.
Show resolved Hide resolved
"""

val adapterRegistry = new AdapterRegistry()
Expand Down Expand Up @@ -88,6 +91,33 @@ class EtlPipelineSpec extends Specification with ValidatedMatchers {
case res => ko(s"[$res] doesn't contain one enriched event")
}
}

def e3 = {
  // A single garbage byte cannot be deserialized as a CollectorPayload,
  // so the loader yields a CPFormatViolation bad row.
  val brokenPayload = ThriftLoader.toCollectorPayload(Array(1.toByte), processor)
  val result = EtlPipeline.processEvents[Id](
    adapterRegistry,
    enrichmentReg,
    client,
    processor,
    dateTime,
    brokenPayload
  )
  result must be like {
    case Validated.Invalid(_: BadRow.CPFormatViolation) :: Nil => ok
    case other => ko(s"One invalid CPFormatViolation expected, got ${other}")
  }
}

def e4 = {
  // A valid-but-absent payload (None) must produce no rows at all.
  val noPayload: Option[CollectorPayload] = None
  val result = EtlPipeline.processEvents[Id](
    adapterRegistry,
    enrichmentReg,
    client,
    processor,
    dateTime,
    noPayload.validNel[BadRow]
  )
  result must beEqualTo(Nil)
}
}

object EtlPipelineSpec {
Expand Down
Loading