diff --git a/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/SingletonSpec.scala b/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/SingletonSpec.scala index 2a250baa3..618369dad 100644 --- a/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/SingletonSpec.scala +++ b/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/SingletonSpec.scala @@ -16,6 +16,8 @@ package com.snowplowanalytics.snowplow.enrich.beam import io.circe.literal._ +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} + import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry._ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.AnonIpConf @@ -25,6 +27,9 @@ import org.scalatest.freespec.AnyFreeSpec import com.snowplowanalytics.snowplow.enrich.beam.singleton._ class SingletonSpec extends AnyFreeSpec { + + val placeholder = SchemaKey("com.acme", "placeholder", "jsonschema", SchemaVer.Full(1, 0, 0)) + "the singleton object should" - { "make a ClientSingleton.get function available" - { "which throws if the resolver can't be parsed" in { @@ -44,7 +49,7 @@ class SingletonSpec extends AnyFreeSpec { "which builds and stores the registry" in { val reg = EnrichmentRegistrySingleton.get( - List(AnonIpConf(AnonIPv4Octets.Two, AnonIPv6Segments.Two)) + List(AnonIpConf(placeholder, AnonIPv4Octets.Two, AnonIPv6Segments.Two)) ) reg.anonIp shouldBe defined } diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/AnonIpEnrichment.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/AnonIpEnrichment.scala index 5dfe3dc95..0c22fd0b1 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/AnonIpEnrichment.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/AnonIpEnrichment.scala @@ -54,7 +54,7 @@ object AnonIpEnrichment extends ParseableEnrichment { .toEither ipv4Octets <- AnonIPv4Octets.fromInt(paramIPv4Octet) ipv6Segment <- AnonIPv6Segments.fromInt(paramIPv6Segment) - } yield AnonIpConf(ipv4Octets, ipv6Segment)).toValidatedNel + } yield AnonIpConf(schemaKey, ipv4Octets, ipv6Segment)).toValidatedNel } /** How many octets (ipv4) to anonymize */ diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/CampaignAttributionEnrichment.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/CampaignAttributionEnrichment.scala index f4f1cd874..e6d53d7d6 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/CampaignAttributionEnrichment.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/CampaignAttributionEnrichment.scala @@ -61,6 +61,7 @@ object CampaignAttributionEnrichment extends ParseableEnrichment { .extract[Map[String, String]](c, "parameters", "fields", "mktClickId") .fold(_ => Map(), s => s) CampaignAttributionConf( + schemaKey, medium, source, term, diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/CookieExtractorEnrichment.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/CookieExtractorEnrichment.scala index 13721cebb..d6b9bb244 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/CookieExtractorEnrichment.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/CookieExtractorEnrichment.scala @@ -43,7 +43,7 @@ object CookieExtractorEnrichment extends ParseableEnrichment { (for { _ <- isParseable(config, schemaKey) cookieNames <- CirceUtils.extract[List[String]](config, "parameters", "cookies").toEither - } yield CookieExtractorConf(cookieNames)).toValidatedNel + } yield CookieExtractorConf(schemaKey, cookieNames)).toValidatedNel } /** diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/EnrichmentConf.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/EnrichmentConf.scala index e4f6dab0d..9e8743ce9 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/EnrichmentConf.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/EnrichmentConf.scala @@ -19,7 +19,7 @@ import cats.data.EitherT import org.joda.money.CurrencyUnit -import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} +import com.snowplowanalytics.iglu.core.SchemaKey import com.snowplowanalytics.forex.CreateForex import com.snowplowanalytics.forex.model.AccountType @@ -35,13 +35,9 @@ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery.{CreateSqlQueryEnrichment, Rdbms, SqlQueryEnrichment} sealed trait EnrichmentConf { - def schemaKey: SchemaKey = - SchemaKey( - "com.acme", - "placeholder", - "jsonschema", - SchemaVer.Full(1, 0, 0) - ) + + /** Iglu schema key to identify the enrichment in bad row, some enrichments don't use it */ + def schemaKey: SchemaKey /** * List of files, such as local DBs that need to be downloaded and distributed across workers @@ -53,7 +49,7 @@ sealed trait EnrichmentConf { object EnrichmentConf { final case class ApiRequestConf( - override val schemaKey: SchemaKey, + schemaKey: SchemaKey, inputs: List[apirequest.Input], api: HttpApi, outputs: List[apirequest.Output], @@ -64,6 +60,7 @@ object EnrichmentConf { } final case class PiiPseudonymizerConf( + schemaKey: SchemaKey, fieldList: List[pii.PiiField], emitIdentificationEvent: Boolean, strategy: pii.PiiStrategy @@ -73,7 +70,7 @@ object EnrichmentConf { } final case class SqlQueryConf( - override val schemaKey: SchemaKey, + schemaKey: SchemaKey, inputs: List[sqlquery.Input], db: Rdbms, query: SqlQueryEnrichment.Query, @@ -84,12 +81,16 @@ object EnrichmentConf { SqlQueryEnrichment[F](this) } - final case class AnonIpConf(octets: AnonIPv4Octets.AnonIPv4Octets, segments: AnonIPv6Segments.AnonIPv6Segments) extends EnrichmentConf { - override val filesToCache: List[(URI, String)] = Nil + final case class AnonIpConf( + schemaKey: SchemaKey, + octets: AnonIPv4Octets.AnonIPv4Octets, + segments: AnonIPv6Segments.AnonIPv6Segments + ) extends EnrichmentConf { def enrichment: AnonIpEnrichment = AnonIpEnrichment(octets, segments) } final case class CampaignAttributionConf( + schemaKey: SchemaKey, mediumParameters: List[String], sourceParameters: List[String], termParameters: List[String], @@ -108,12 +109,15 @@ object EnrichmentConf { ) } - final case class CookieExtractorConf(cookieNames: List[String]) extends EnrichmentConf { + final case class CookieExtractorConf( + schemaKey: SchemaKey, + cookieNames: List[String] + ) extends EnrichmentConf { def enrichment: CookieExtractorEnrichment = CookieExtractorEnrichment(cookieNames) } final case class CurrencyConversionConf( - override val schemaKey: SchemaKey, + schemaKey: SchemaKey, accountType: AccountType, apiKey: String, baseCurrency: CurrencyUnit @@ -122,17 +126,24 @@ object EnrichmentConf { CurrencyConversionEnrichment[F](this) } - final case class EventFingerprintConf(algorithm: String => String, excludedParameters: List[String]) extends EnrichmentConf { + final case class EventFingerprintConf( + schemaKey: SchemaKey, + algorithm: String => String, + excludedParameters: List[String] + ) extends EnrichmentConf { def enrichment: EventFingerprintEnrichment = EventFingerprintEnrichment(algorithm, excludedParameters) } - final case class HttpHeaderExtractorConf(headersPattern: String) extends EnrichmentConf { + final case class HttpHeaderExtractorConf( + schemaKey: SchemaKey, + headersPattern: String + ) extends EnrichmentConf { def enrichment: HttpHeaderExtractorEnrichment = HttpHeaderExtractorEnrichment(headersPattern) } final case class IabConf( - override val schemaKey: SchemaKey, + schemaKey: SchemaKey, ipFile: (URI, String), excludeUaFile: (URI, String), includeUaFile: (URI, String) @@ -143,6 +154,7 @@ object EnrichmentConf { } final case class IpLookupsConf( + schemaKey: SchemaKey, geoFile: Option[(URI, String)], ispFile: Option[(URI, String)], domainFile: Option[(URI, String)], @@ -154,28 +166,32 @@ object EnrichmentConf { IpLookupsEnrichment[F](this) } - final case class JavascriptScriptConf(override val schemaKey: SchemaKey, rawFunction: String) extends EnrichmentConf { + final case class JavascriptScriptConf(schemaKey: SchemaKey, rawFunction: String) extends EnrichmentConf { def enrichment: JavascriptScriptEnrichment = JavascriptScriptEnrichment(schemaKey, rawFunction) } - final case class RefererParserConf(refererDatabase: (URI, String), internalDomains: List[String]) extends EnrichmentConf { + final case class RefererParserConf( + schemaKey: SchemaKey, + refererDatabase: (URI, String), + internalDomains: List[String] + ) extends EnrichmentConf { override val filesToCache: List[(URI, String)] = List(refererDatabase) def enrichment[F[_]: Monad: CreateParser]: EitherT[F, String, RefererParserEnrichment] = RefererParserEnrichment[F](this) } - final case class UaParserConf(override val schemaKey: SchemaKey, uaDatabase: Option[(URI, String)]) extends EnrichmentConf { + final case class UaParserConf(schemaKey: SchemaKey, uaDatabase: Option[(URI, String)]) extends EnrichmentConf { override val filesToCache: List[(URI, String)] = List(uaDatabase).flatten def enrichment[F[_]: Monad: CreateUaParser]: EitherT[F, String, UaParserEnrichment] = UaParserEnrichment[F](this) } - final case class UserAgentUtilsConf(override val schemaKey: SchemaKey) extends EnrichmentConf { + final case class UserAgentUtilsConf(schemaKey: SchemaKey) extends EnrichmentConf { def enrichment: UserAgentUtilsEnrichment = UserAgentUtilsEnrichment(schemaKey) } final case class WeatherConf( - override val schemaKey: SchemaKey, + schemaKey: SchemaKey, apiHost: String, apiKey: String, timeout: Int, @@ -186,7 +202,7 @@ object EnrichmentConf { WeatherEnrichment[F](this) } - final case class YauaaConf(cacheSize: Option[Int]) extends EnrichmentConf { + final case class YauaaConf(schemaKey: SchemaKey, cacheSize: Option[Int]) extends EnrichmentConf { def enrichment: YauaaEnrichment = YauaaEnrichment(cacheSize) } } diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/EventFingerprintEnrichment.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/EventFingerprintEnrichment.scala index 607012426..91696c001 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/EventFingerprintEnrichment.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/EventFingerprintEnrichment.scala @@ -57,7 +57,7 @@ object EventFingerprintEnrichment extends ParseableEnrichment { ).mapN((_, _)).toEither algorithm <- getAlgorithm(paramsAndAlgo._2) .leftMap(e => NonEmptyList.one(e)) - } yield EventFingerprintConf(algorithm, paramsAndAlgo._1)).toValidated + } yield EventFingerprintConf(schemaKey, algorithm, paramsAndAlgo._1)).toValidated /** * Look up the fingerprinting algorithm by name diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/HttpHeaderExtractorEnrichment.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/HttpHeaderExtractorEnrichment.scala index 64d934ebd..c7e6060e0 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/HttpHeaderExtractorEnrichment.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/HttpHeaderExtractorEnrichment.scala @@ -48,7 +48,7 @@ object HttpHeaderExtractorEnrichment extends ParseableEnrichment { (for { _ <- isParseable(config, schemaKey) headersPattern <- CirceUtils.extract[String](config, "parameters", "headersPattern").toEither - } yield HttpHeaderExtractorConf(headersPattern)).toValidatedNel + } yield HttpHeaderExtractorConf(schemaKey, headersPattern)).toValidatedNel } /** diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/IpLookupsEnrichment.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/IpLookupsEnrichment.scala index 956221f55..bfab38a0c 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/IpLookupsEnrichment.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/IpLookupsEnrichment.scala @@ -57,6 +57,7 @@ object IpLookupsEnrichment extends ParseableEnrichment { getArgumentFromName(c, "connectionType").sequence ).mapN { (geo, isp, domain, connection) => IpLookupsConf( + schemaKey, file(geo, localMode), file(isp, localMode), file(domain, localMode), @@ -107,6 +108,7 @@ object IpLookupsEnrichment extends ParseableEnrichment { lruCacheSize = 20000 ) .map(i => IpLookupsEnrichment(i)) + } /** diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/RefererParserEnrichment.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/RefererParserEnrichment.scala index 5bc29aedd..4be2235b2 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/RefererParserEnrichment.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/RefererParserEnrichment.scala @@ -56,7 +56,7 @@ object RefererParserEnrichment extends ParseableEnrichment { (uri, db, domains) }.toEither source <- getDatabaseUri(conf._1, conf._2).leftMap(NonEmptyList.one) - } yield RefererParserConf(file(source, conf._2, localFile, localMode), conf._3)).toValidated + } yield RefererParserConf(schemaKey, file(source, conf._2, localFile, localMode), conf._3)).toValidated private def file( uri: URI, diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/YauaaEnrichment.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/YauaaEnrichment.scala index 9f0b9de36..1a3efc8c0 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/YauaaEnrichment.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/YauaaEnrichment.scala @@ -54,7 +54,7 @@ object YauaaEnrichment extends ParseableEnrichment { (for { _ <- isParseable(c, schemaKey) cacheSize <- CirceUtils.extract[Option[Int]](c, "parameters", "cacheSize").toEither - } yield YauaaConf(cacheSize)).toValidatedNel + } yield YauaaConf(schemaKey, cacheSize)).toValidatedNel /** Helper to decapitalize a string. Used for the names of the fields returned in the context. */ def decapitalize(s: String): String = diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/PiiPseudonymizerEnrichment.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/PiiPseudonymizerEnrichment.scala index 474922111..9c5aaa2e6 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/PiiPseudonymizerEnrichment.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/pii/PiiPseudonymizerEnrichment.scala @@ -58,7 +58,7 @@ object PiiPseudonymizerEnrichment extends ParseableEnrichment { localMode: Boolean = false ): ValidatedNel[String, PiiPseudonymizerConf] = { for { - conf <- matchesSchema(config, schemaKey) + conf <- isParseable(config, schemaKey) emitIdentificationEvent = CirceUtils .extract[Boolean](conf, "emitEvent") .toOption @@ -70,7 +70,7 @@ object PiiPseudonymizerEnrichment extends ParseableEnrichment { .extract[PiiStrategyPseudonymize](config, "parameters", "strategy") .toEither piiFieldList <- extractFields(piiFields) - } yield PiiPseudonymizerConf(piiFieldList, emitIdentificationEvent, piiStrategy) + } yield PiiPseudonymizerConf(schemaKey, piiFieldList, emitIdentificationEvent, piiStrategy) }.toValidatedNel private[pii] def getHashFunction(strategyFunction: String): Either[String, DigestFunction] = @@ -132,12 +132,6 @@ object PiiPseudonymizerEnrichment extends ParseableEnrichment { .get(fieldName) .map(_.asRight) .getOrElse(s"The specified json field $fieldName is not supported".asLeft) - - private def matchesSchema(config: Json, schemaKey: SchemaKey): Either[String, Json] = - if (supportedSchema.matches(schemaKey)) - config.asRight - else - s"Schema key $schemaKey is not supported. A '${supportedSchema.name}' enrichment must have schema '$supportedSchema'.".asLeft } /** diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/EnrichmentConfigsSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/EnrichmentConfigsSpec.scala index 49460c360..7dac5ca0f 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/EnrichmentConfigsSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/EnrichmentConfigsSpec.scala @@ -47,7 +47,7 @@ class EnrichmentConfigsSpec extends Specification with ValidatedMatchers with Da SchemaVer.Full(1, 0, 1) ) val result = AnonIpEnrichment.parse(ipAnonJson, schemaKey) - result must beValid(AnonIpConf(AnonIPv4Octets(2), AnonIPv6Segments(3))) + result must beValid(AnonIpConf(schemaKey, AnonIPv4Octets(2), AnonIPv6Segments(3))) } "successfully construct an AnonIpEnrichment case class with default value for IPv6" in { @@ -64,7 +64,7 @@ class EnrichmentConfigsSpec extends Specification with ValidatedMatchers with Da SchemaVer.Full(1, 0, 0) ) val result = AnonIpEnrichment.parse(ipAnonJson, schemaKey) - result must beValid(AnonIpConf(AnonIPv4Octets(2), AnonIPv6Segments(2))) + result must beValid(AnonIpConf(schemaKey, AnonIPv4Octets(2), AnonIPv6Segments(2))) } } @@ -90,6 +90,7 @@ class EnrichmentConfigsSpec extends Specification with ValidatedMatchers with Da SchemaVer.Full(2, 0, 0) ) val expected = IpLookupsConf( + schemaKey, Some( ( new URI( @@ -136,6 +137,7 @@ class EnrichmentConfigsSpec extends Specification with ValidatedMatchers with Da SchemaVer.Full(2, 0, 0) ) val expected = RefererParserConf( + schemaKey, ( new URI( "http://snowplow-hosted-assets.s3.amazonaws.com/third-party/referer/referer.json" @@ -146,7 +148,6 @@ class EnrichmentConfigsSpec extends Specification with ValidatedMatchers with Da ) val result = RefererParserEnrichment.parse(refererParserJson, schemaKey, false) result must beValid(expected) - } } @@ -179,6 +180,7 @@ class EnrichmentConfigsSpec extends Specification with ValidatedMatchers with Da SchemaVer.Full(1, 0, 0) ) val expected = CampaignAttributionConf( + schemaKey, List("utm_medium", "medium"), List("utm_source", "source"), List("utm_term"), @@ -336,7 +338,7 @@ class EnrichmentConfigsSpec extends Specification with ValidatedMatchers with Da SchemaVer.Full(1, 0, 0) ) val result = CookieExtractorEnrichment.parse(cookieExtractorEnrichmentJson, schemaKey) - result must beValid(CookieExtractorConf(List("foo", "bar"))) + result must beValid(CookieExtractorConf(schemaKey, List("foo", "bar"))) } } diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/YauaaEnrichmentSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/YauaaEnrichmentSpec.scala index 50089a985..0e6f0e4af 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/YauaaEnrichmentSpec.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/enrichments/registry/YauaaEnrichmentSpec.scala @@ -246,7 +246,7 @@ class YauaaEnrichmentSpec extends Specification with ValidatedMatchers { } }""").toOption.get - val expected = YauaaConf(Some(cacheSize)) + val expected = YauaaConf(schemaKey, Some(cacheSize)) val actual = YauaaEnrichment.parse(yauaaConfigJson, schemaKey) actual must beValid(expected) } @@ -256,7 +256,7 @@ class YauaaEnrichmentSpec extends Specification with ValidatedMatchers { "enabled": true }""").toOption.get - val expected = YauaaConf(None) + val expected = YauaaConf(schemaKey, None) val actual = YauaaEnrichment.parse(yauaaConfigJson, schemaKey) actual must beValid(expected) }