Skip to content

Commit

Permalink
Common: get rid of placeholder schema in enrichment configurations (c…
Browse files Browse the repository at this point in the history
…lose #302)
  • Loading branch information
chuwy committed Aug 19, 2020
1 parent b3cd06f commit d0ab6b6
Show file tree
Hide file tree
Showing 13 changed files with 63 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package com.snowplowanalytics.snowplow.enrich.beam

import io.circe.literal._

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry._
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.AnonIpConf

Expand All @@ -25,6 +27,9 @@ import org.scalatest.freespec.AnyFreeSpec
import com.snowplowanalytics.snowplow.enrich.beam.singleton._

class SingletonSpec extends AnyFreeSpec {

val placeholder = SchemaKey("com.acme", "placeholder", "jsonschema", SchemaVer.Full(1, 0, 0))

"the singleton object should" - {
"make a ClientSingleton.get function available" - {
"which throws if the resolver can't be parsed" in {
Expand All @@ -44,7 +49,7 @@ class SingletonSpec extends AnyFreeSpec {
"which builds and stores the registry" in {
val reg =
EnrichmentRegistrySingleton.get(
List(AnonIpConf(AnonIPv4Octets.Two, AnonIPv6Segments.Two))
List(AnonIpConf(placeholder, AnonIPv4Octets.Two, AnonIPv6Segments.Two))
)
reg.anonIp shouldBe defined
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object AnonIpEnrichment extends ParseableEnrichment {
.toEither
ipv4Octets <- AnonIPv4Octets.fromInt(paramIPv4Octet)
ipv6Segment <- AnonIPv6Segments.fromInt(paramIPv6Segment)
} yield AnonIpConf(ipv4Octets, ipv6Segment)).toValidatedNel
} yield AnonIpConf(schemaKey, ipv4Octets, ipv6Segment)).toValidatedNel
}

/** How many octets (ipv4) to anonymize */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ object CampaignAttributionEnrichment extends ParseableEnrichment {
.extract[Map[String, String]](c, "parameters", "fields", "mktClickId")
.fold(_ => Map(), s => s)
CampaignAttributionConf(
schemaKey,
medium,
source,
term,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object CookieExtractorEnrichment extends ParseableEnrichment {
(for {
_ <- isParseable(config, schemaKey)
cookieNames <- CirceUtils.extract[List[String]](config, "parameters", "cookies").toEither
} yield CookieExtractorConf(cookieNames)).toValidatedNel
} yield CookieExtractorConf(schemaKey, cookieNames)).toValidatedNel
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import cats.data.EitherT

import org.joda.money.CurrencyUnit

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
import com.snowplowanalytics.iglu.core.SchemaKey

import com.snowplowanalytics.forex.CreateForex
import com.snowplowanalytics.forex.model.AccountType
Expand All @@ -35,13 +35,8 @@ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequ
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery.{CreateSqlQueryEnrichment, Rdbms, SqlQueryEnrichment}

sealed trait EnrichmentConf {
def schemaKey: SchemaKey =
SchemaKey(
"com.acme",
"placeholder",
"jsonschema",
SchemaVer.Full(1, 0, 0)
)
/** Iglu schema key to identify the enrichment in bad row, some enrichments don't use it */
def schemaKey: SchemaKey

/**
* List of files, such as local DBs that need to be downloaded and distributed across workers
Expand All @@ -53,7 +48,7 @@ sealed trait EnrichmentConf {
object EnrichmentConf {

final case class ApiRequestConf(
override val schemaKey: SchemaKey,
schemaKey: SchemaKey,
inputs: List[apirequest.Input],
api: HttpApi,
outputs: List[apirequest.Output],
Expand All @@ -64,6 +59,7 @@ object EnrichmentConf {
}

final case class PiiPseudonymizerConf(
schemaKey: SchemaKey,
fieldList: List[pii.PiiField],
emitIdentificationEvent: Boolean,
strategy: pii.PiiStrategy
Expand All @@ -73,7 +69,7 @@ object EnrichmentConf {
}

final case class SqlQueryConf(
override val schemaKey: SchemaKey,
schemaKey: SchemaKey,
inputs: List[sqlquery.Input],
db: Rdbms,
query: SqlQueryEnrichment.Query,
Expand All @@ -84,12 +80,16 @@ object EnrichmentConf {
SqlQueryEnrichment[F](this)
}

final case class AnonIpConf(octets: AnonIPv4Octets.AnonIPv4Octets, segments: AnonIPv6Segments.AnonIPv6Segments) extends EnrichmentConf {
override val filesToCache: List[(URI, String)] = Nil
final case class AnonIpConf(
schemaKey: SchemaKey,
octets: AnonIPv4Octets.AnonIPv4Octets,
segments: AnonIPv6Segments.AnonIPv6Segments
) extends EnrichmentConf {
def enrichment: AnonIpEnrichment = AnonIpEnrichment(octets, segments)
}

final case class CampaignAttributionConf(
schemaKey: SchemaKey,
mediumParameters: List[String],
sourceParameters: List[String],
termParameters: List[String],
Expand All @@ -108,12 +108,15 @@ object EnrichmentConf {
)
}

final case class CookieExtractorConf(cookieNames: List[String]) extends EnrichmentConf {
final case class CookieExtractorConf(
schemaKey: SchemaKey,
cookieNames: List[String]
) extends EnrichmentConf {
def enrichment: CookieExtractorEnrichment = CookieExtractorEnrichment(cookieNames)
}

final case class CurrencyConversionConf(
override val schemaKey: SchemaKey,
schemaKey: SchemaKey,
accountType: AccountType,
apiKey: String,
baseCurrency: CurrencyUnit
Expand All @@ -122,17 +125,24 @@ object EnrichmentConf {
CurrencyConversionEnrichment[F](this)
}

final case class EventFingerprintConf(algorithm: String => String, excludedParameters: List[String]) extends EnrichmentConf {
final case class EventFingerprintConf(
schemaKey: SchemaKey,
algorithm: String => String,
excludedParameters: List[String]
) extends EnrichmentConf {
def enrichment: EventFingerprintEnrichment =
EventFingerprintEnrichment(algorithm, excludedParameters)
}

final case class HttpHeaderExtractorConf(headersPattern: String) extends EnrichmentConf {
final case class HttpHeaderExtractorConf(
schemaKey: SchemaKey,
headersPattern: String
) extends EnrichmentConf {
def enrichment: HttpHeaderExtractorEnrichment = HttpHeaderExtractorEnrichment(headersPattern)
}

final case class IabConf(
override val schemaKey: SchemaKey,
schemaKey: SchemaKey,
ipFile: (URI, String),
excludeUaFile: (URI, String),
includeUaFile: (URI, String)
Expand All @@ -143,6 +153,7 @@ object EnrichmentConf {
}

final case class IpLookupsConf(
schemaKey: SchemaKey,
geoFile: Option[(URI, String)],
ispFile: Option[(URI, String)],
domainFile: Option[(URI, String)],
Expand All @@ -154,28 +165,32 @@ object EnrichmentConf {
IpLookupsEnrichment[F](this)
}

final case class JavascriptScriptConf(override val schemaKey: SchemaKey, rawFunction: String) extends EnrichmentConf {
final case class JavascriptScriptConf(schemaKey: SchemaKey, rawFunction: String) extends EnrichmentConf {
def enrichment: JavascriptScriptEnrichment = JavascriptScriptEnrichment(schemaKey, rawFunction)
}

final case class RefererParserConf(refererDatabase: (URI, String), internalDomains: List[String]) extends EnrichmentConf {
final case class RefererParserConf(
schemaKey: SchemaKey,
refererDatabase: (URI, String),
internalDomains: List[String]
) extends EnrichmentConf {
override val filesToCache: List[(URI, String)] = List(refererDatabase)
def enrichment[F[_]: Monad: CreateParser]: EitherT[F, String, RefererParserEnrichment] =
RefererParserEnrichment[F](this)
}

final case class UaParserConf(override val schemaKey: SchemaKey, uaDatabase: Option[(URI, String)]) extends EnrichmentConf {
final case class UaParserConf(schemaKey: SchemaKey, uaDatabase: Option[(URI, String)]) extends EnrichmentConf {
override val filesToCache: List[(URI, String)] = List(uaDatabase).flatten
def enrichment[F[_]: Monad: CreateUaParser]: EitherT[F, String, UaParserEnrichment] =
UaParserEnrichment[F](this)
}

final case class UserAgentUtilsConf(override val schemaKey: SchemaKey) extends EnrichmentConf {
final case class UserAgentUtilsConf(schemaKey: SchemaKey) extends EnrichmentConf {
def enrichment: UserAgentUtilsEnrichment = UserAgentUtilsEnrichment(schemaKey)
}

final case class WeatherConf(
override val schemaKey: SchemaKey,
schemaKey: SchemaKey,
apiHost: String,
apiKey: String,
timeout: Int,
Expand All @@ -186,7 +201,7 @@ object EnrichmentConf {
WeatherEnrichment[F](this)
}

final case class YauaaConf(cacheSize: Option[Int]) extends EnrichmentConf {
final case class YauaaConf(schemaKey: SchemaKey, cacheSize: Option[Int]) extends EnrichmentConf {
def enrichment: YauaaEnrichment = YauaaEnrichment(cacheSize)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ object EventFingerprintEnrichment extends ParseableEnrichment {
).mapN((_, _)).toEither
algorithm <- getAlgorithm(paramsAndAlgo._2)
.leftMap(e => NonEmptyList.one(e))
} yield EventFingerprintConf(algorithm, paramsAndAlgo._1)).toValidated
} yield EventFingerprintConf(schemaKey, algorithm, paramsAndAlgo._1)).toValidated

/**
* Look up the fingerprinting algorithm by name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ object HttpHeaderExtractorEnrichment extends ParseableEnrichment {
(for {
_ <- isParseable(config, schemaKey)
headersPattern <- CirceUtils.extract[String](config, "parameters", "headersPattern").toEither
} yield HttpHeaderExtractorConf(headersPattern)).toValidatedNel
} yield HttpHeaderExtractorConf(schemaKey, headersPattern)).toValidatedNel
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ object IpLookupsEnrichment extends ParseableEnrichment {
getArgumentFromName(c, "connectionType").sequence
).mapN { (geo, isp, domain, connection) =>
IpLookupsConf(
schemaKey,
file(geo, localMode),
file(isp, localMode),
file(domain, localMode),
Expand Down Expand Up @@ -107,6 +108,7 @@ object IpLookupsEnrichment extends ParseableEnrichment {
lruCacheSize = 20000
)
.map(i => IpLookupsEnrichment(i))

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ object RefererParserEnrichment extends ParseableEnrichment {
(uri, db, domains)
}.toEither
source <- getDatabaseUri(conf._1, conf._2).leftMap(NonEmptyList.one)
} yield RefererParserConf(file(source, conf._2, localFile, localMode), conf._3)).toValidated
} yield RefererParserConf(schemaKey, file(source, conf._2, localFile, localMode), conf._3)).toValidated

private def file(
uri: URI,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object YauaaEnrichment extends ParseableEnrichment {
(for {
_ <- isParseable(c, schemaKey)
cacheSize <- CirceUtils.extract[Option[Int]](c, "parameters", "cacheSize").toEither
} yield YauaaConf(cacheSize)).toValidatedNel
} yield YauaaConf(schemaKey, cacheSize)).toValidatedNel

/** Helper to decapitalize a string. Used for the names of the fields returned in the context. */
def decapitalize(s: String): String =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ object PiiPseudonymizerEnrichment extends ParseableEnrichment {
localMode: Boolean = false
): ValidatedNel[String, PiiPseudonymizerConf] = {
for {
conf <- matchesSchema(config, schemaKey)
conf <- isParseable(config, schemaKey)
emitIdentificationEvent = CirceUtils
.extract[Boolean](conf, "emitEvent")
.toOption
Expand All @@ -70,7 +70,7 @@ object PiiPseudonymizerEnrichment extends ParseableEnrichment {
.extract[PiiStrategyPseudonymize](config, "parameters", "strategy")
.toEither
piiFieldList <- extractFields(piiFields)
} yield PiiPseudonymizerConf(piiFieldList, emitIdentificationEvent, piiStrategy)
} yield PiiPseudonymizerConf(schemaKey, piiFieldList, emitIdentificationEvent, piiStrategy)
}.toValidatedNel

private[pii] def getHashFunction(strategyFunction: String): Either[String, DigestFunction] =
Expand Down Expand Up @@ -132,12 +132,6 @@ object PiiPseudonymizerEnrichment extends ParseableEnrichment {
.get(fieldName)
.map(_.asRight)
.getOrElse(s"The specified json field $fieldName is not supported".asLeft)

private def matchesSchema(config: Json, schemaKey: SchemaKey): Either[String, Json] =
if (supportedSchema.matches(schemaKey))
config.asRight
else
s"Schema key $schemaKey is not supported. A '${supportedSchema.name}' enrichment must have schema '$supportedSchema'.".asLeft
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class EnrichmentConfigsSpec extends Specification with ValidatedMatchers with Da
SchemaVer.Full(1, 0, 1)
)
val result = AnonIpEnrichment.parse(ipAnonJson, schemaKey)
result must beValid(AnonIpConf(AnonIPv4Octets(2), AnonIPv6Segments(3)))
result must beValid(AnonIpConf(schemaKey, AnonIPv4Octets(2), AnonIPv6Segments(3)))
}

"successfully construct an AnonIpEnrichment case class with default value for IPv6" in {
Expand All @@ -64,7 +64,7 @@ class EnrichmentConfigsSpec extends Specification with ValidatedMatchers with Da
SchemaVer.Full(1, 0, 0)
)
val result = AnonIpEnrichment.parse(ipAnonJson, schemaKey)
result must beValid(AnonIpConf(AnonIPv4Octets(2), AnonIPv6Segments(2)))
result must beValid(AnonIpConf(schemaKey, AnonIPv4Octets(2), AnonIPv6Segments(2)))
}
}

Expand All @@ -90,6 +90,7 @@ class EnrichmentConfigsSpec extends Specification with ValidatedMatchers with Da
SchemaVer.Full(2, 0, 0)
)
val expected = IpLookupsConf(
schemaKey,
Some(
(
new URI(
Expand Down Expand Up @@ -136,6 +137,7 @@ class EnrichmentConfigsSpec extends Specification with ValidatedMatchers with Da
SchemaVer.Full(2, 0, 0)
)
val expected = RefererParserConf(
schemaKey,
(
new URI(
"http://snowplow-hosted-assets.s3.amazonaws.com/third-party/referer/referer.json"
Expand All @@ -146,7 +148,6 @@ class EnrichmentConfigsSpec extends Specification with ValidatedMatchers with Da
)
val result = RefererParserEnrichment.parse(refererParserJson, schemaKey, false)
result must beValid(expected)

}
}

Expand Down Expand Up @@ -179,6 +180,7 @@ class EnrichmentConfigsSpec extends Specification with ValidatedMatchers with Da
SchemaVer.Full(1, 0, 0)
)
val expected = CampaignAttributionConf(
schemaKey,
List("utm_medium", "medium"),
List("utm_source", "source"),
List("utm_term"),
Expand Down Expand Up @@ -336,7 +338,7 @@ class EnrichmentConfigsSpec extends Specification with ValidatedMatchers with Da
SchemaVer.Full(1, 0, 0)
)
val result = CookieExtractorEnrichment.parse(cookieExtractorEnrichmentJson, schemaKey)
result must beValid(CookieExtractorConf(List("foo", "bar")))
result must beValid(CookieExtractorConf(schemaKey, List("foo", "bar")))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class YauaaEnrichmentSpec extends Specification with ValidatedMatchers {
}
}""").toOption.get

val expected = YauaaConf(Some(cacheSize))
val expected = YauaaConf(schemaKey, Some(cacheSize))
val actual = YauaaEnrichment.parse(yauaaConfigJson, schemaKey)
actual must beValid(expected)
}
Expand All @@ -256,7 +256,7 @@ class YauaaEnrichmentSpec extends Specification with ValidatedMatchers {
"enabled": true
}""").toOption.get

val expected = YauaaConf(None)
val expected = YauaaConf(schemaKey, None)
val actual = YauaaEnrichment.parse(yauaaConfigJson, schemaKey)
actual must beValid(expected)
}
Expand Down

0 comments on commit d0ab6b6

Please sign in to comment.