Skip to content

Commit

Permalink
Common: move EnrichmentConf into its own module (close #303)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy authored and benjben committed Sep 8, 2020
1 parent 024e2c8 commit bfc443f
Show file tree
Hide file tree
Showing 39 changed files with 488 additions and 385 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@ import scala.util.Try

import cats.Id
import cats.effect.Clock

import io.circe.Json
import io.circe.syntax._
import com.snowplowanalytics.snowplow.badrows._
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent

import org.joda.time.{DateTime, DateTimeZone}
import org.joda.time.format.DateTimeFormat

import com.snowplowanalytics.snowplow.badrows._

import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.PiiPseudonymizerConf
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.PiiPseudonymizerConf

object utils {

Expand Down Expand Up @@ -88,10 +92,8 @@ object utils {
/** Determine if we have to emit pii transformation events. */
def emitPii(confs: List[EnrichmentConf]): Boolean =
confs
.collect { case c: PiiPseudonymizerConf => c }
.headOption
.map(_.emitIdentificationEvent)
.getOrElse(false)
.collectFirst { case c: PiiPseudonymizerConf => c }
.exists(_.emitIdentificationEvent)

// We want to take one-tenth of the payload characters (not taking into account multi-bytes char)
private val ReductionFactor = 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@
*/
package com.snowplowanalytics.snowplow.enrich.beam

import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry._
import io.circe.literal._
import org.scalatest._
import matchers.should.Matchers._

import singleton._
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry._
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.AnonIpConf

import org.scalatest.matchers.should.Matchers._
import org.scalatest.freespec.AnyFreeSpec

import com.snowplowanalytics.snowplow.enrich.beam.singleton._

class SingletonSpec extends AnyFreeSpec {
"the singleton object should" - {
"make a ClientSingleton.get function available" - {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ object SpecHelpers {
contentType: Option[String] = None,
headers: List[String] = Nil,
ipAddress: String = "",
networkUserId: String = java.util.UUID.randomUUID().toString(),
networkUserId: String = java.util.UUID.randomUUID().toString,
path: String = "",
querystring: Option[String] = None,
refererUri: Option[String] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@
* See the Apache License Version 2.0 for the specific language governing permissions and
* limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.beam
package enrichments
package com.snowplowanalytics.snowplow.enrich.beam.enrichments

import java.nio.file.Paths

import cats.syntax.option._

import io.circe.literal._

import com.snowplowanalytics.snowplow.enrich.beam.{CI, Enrich, SpecHelpers}

import com.spotify.scio.io.PubsubIO
import com.spotify.scio.testing._
import io.circe.literal._

object ApiRequestEnrichmentSpec {
val contexts =
Expand Down Expand Up @@ -51,8 +54,8 @@ class ApiRequestEnrichmentSpec extends PipelineSpec {
"--raw=in",
"--enriched=out",
"--bad=bad",
"--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI()),
"--enrichments=" + Paths.get(getClass.getResource("/api_request").toURI())
"--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI),
"--enrichments=" + Paths.get(getClass.getResource("/api_request").toURI)
)
.input(PubsubIO.readCoder[Array[Byte]]("in"), raw)
.distCache(DistCacheIO(""), List.empty[Either[String, String]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@
* See the Apache License Version 2.0 for the specific language governing permissions and
* limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.beam
package enrichments
package com.snowplowanalytics.snowplow.enrich.beam.enrichments

import java.nio.file.Paths

import io.circe.literal._

import cats.syntax.option._

import com.spotify.scio.io.PubsubIO
import com.spotify.scio.testing._
import io.circe.literal._

import com.snowplowanalytics.snowplow.enrich.beam.{CI, Enrich, SpecHelpers}

object SqlQueryEnrichmentSpec {
val contexts =
Expand Down Expand Up @@ -48,8 +51,8 @@ class SqlQueryEnrichmentSpec extends PipelineSpec {
"--raw=in",
"--enriched=out",
"--bad=bad",
"--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI()),
"--enrichments=" + Paths.get(getClass.getResource("/sql_query").toURI())
"--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI),
"--enrichments=" + Paths.get(getClass.getResource("/sql_query").toURI)
)
.input(PubsubIO.readCoder[Array[Byte]]("in"), raw)
.distCache(DistCacheIO(""), List.empty[Either[String, String]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,27 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.common
package adapters
package com.snowplowanalytics.snowplow.enrich.common.adapters

import java.time.Instant

import cats.Monad
import cats.data.{NonEmptyList, Validated}

import cats.effect.Clock
import cats.syntax.functor._
import cats.syntax.validated._
import cats.implicits._

import io.circe.Json

import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.client.Client

import com.snowplowanalytics.snowplow.badrows._
import io.circe.Json

import loaders.CollectorPayload
import registry._
import registry.snowplow._
import utils.HttpClient
import com.snowplowanalytics.snowplow.enrich.common.adapters.registry._
import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.snowplow.{RedirectAdapter, Tp1Adapter, Tp2Adapter}
import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload
import com.snowplowanalytics.snowplow.enrich.common.utils.HttpClient

/**
* The AdapterRegistry lets us convert a CollectorPayload into one or more RawEvents, using a given
Expand Down Expand Up @@ -95,14 +97,15 @@ class AdapterRegistry(remoteAdapters: Map[(String, String), RemoteAdapter] = Map
processor: Processor
): F[Validated[BadRow, NonEmptyList[RawEvent]]] =
(adapters.get((payload.api.vendor, payload.api.version)) match {
case Some(adapter) => adapter.toRawEvents(payload, client)
case Some(adapter) =>
adapter.toRawEvents(payload, client)
case _ =>
val f = FailureDetails.AdapterFailure.InputData(
val f: FailureDetails.AdapterFailureOrTrackerProtocolViolation = FailureDetails.AdapterFailure.InputData(
"vendor/version",
Some(s"${payload.api.vendor}/${payload.api.version}"),
"vendor/version combination is not supported"
)
Monad[F].pure(f.invalidNel)
Monad[F].pure(f.invalidNel[NonEmptyList[RawEvent]])
}).map(_.leftMap(enrichFailure(_, payload, payload.api.vendor, payload.api.version, processor)))

private def enrichFailure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.common
package adapters
package registry
package snowplow
package com.snowplowanalytics.snowplow.enrich.common.adapters.registry.snowplow

import cats.Monad
import cats.data.{NonEmptyList, ValidatedNel}
Expand All @@ -26,15 +23,18 @@ import cats.effect.Clock
import io.circe._
import io.circe.syntax._

import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._

import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup

import com.snowplowanalytics.snowplow.badrows.FailureDetails

import loaders.CollectorPayload
import utils.{HttpClient, ConversionUtils => CU, JsonUtils => JU}
import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent
import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.Adapter
import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload
import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, JsonUtils => JU, ConversionUtils => CU}

/**
* The Redirect Adapter is essentially a pre-processor for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,25 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.common
package adapters
package registry
package snowplow
package com.snowplowanalytics.snowplow.enrich.common.adapters.registry.snowplow

import cats.Monad
import cats.data.{NonEmptyList, ValidatedNel}

import cats.effect.Clock
import cats.syntax.validated._

import io.circe.Json

import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.snowplow.badrows._
import io.circe.Json

import loaders.CollectorPayload
import utils.HttpClient
import com.snowplowanalytics.snowplow.badrows.FailureDetails

import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent
import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.Adapter
import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload
import com.snowplowanalytics.snowplow.enrich.common.utils.HttpClient

/** Version 1 of the Tracker Protocol is GET only. All data comes in on the querystring. */
object Tp1Adapter extends Adapter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,29 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.common
package adapters
package registry
package snowplow
package com.snowplowanalytics.snowplow.enrich.common.adapters.registry.snowplow

import cats.Monad
import cats.data.{EitherT, NonEmptyList, Validated, ValidatedNel}
import cats.data.Validated._
import cats.implicits._

import cats.effect.Clock

import io.circe.Json

import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.instances._

import com.snowplowanalytics.snowplow.badrows.FailureDetails

import io.circe.Json

import loaders.CollectorPayload
import utils.{HttpClient, JsonUtils => JU}
import com.snowplowanalytics.snowplow.enrich.common.RawEventParameters
import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent
import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.Adapter
import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload
import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, JsonUtils => JU}

/**
* Version 2 of the Tracker Protocol supports GET and POST. Note that with POST, data can still be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,35 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.common
package enrichments
package com.snowplowanalytics.snowplow.enrich.common.enrichments

import cats.Monad
import cats.data.{EitherT, NonEmptyList, ValidatedNel}

import cats.effect.Clock
import cats.implicits._

import io.circe._
import io.circe.syntax._

import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.instances._

import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup

import com.snowplowanalytics.forex.CreateForex
import com.snowplowanalytics.maxmind.iplookups.CreateIpLookups
import com.snowplowanalytics.refererparser.CreateParser
import com.snowplowanalytics.weather.providers.openweather.CreateOWM

import registry._
import registry.apirequest.ApiRequestEnrichment
import registry.pii.PiiPseudonymizerEnrichment
import registry.sqlquery.SqlQueryEnrichment
import utils.CirceUtils
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf._

import com.snowplowanalytics.snowplow.enrich.common.utils.CirceUtils
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry._
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequest.ApiRequestEnrichment
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.pii.PiiPseudonymizerEnrichment
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery.SqlQueryEnrichment

/** Companion which holds a constructor for the EnrichmentRegistry. */
object EnrichmentRegistry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.common
package enrichments.registry
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry

import cats.data.ValidatedNel
import cats.data.Validated
import java.net.{Inet4Address, Inet6Address}

import scala.util.Try

import cats.data.{Validated, ValidatedNel}
import cats.syntax.either._
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey}
import io.circe._

import utils.CirceUtils
import io.circe.Json

import java.net.{Inet4Address, Inet6Address}
import com.google.common.net.{InetAddresses => GuavaInetAddress}
import scala.util.Try
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey}

import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.AnonIpConf
import com.snowplowanalytics.snowplow.enrich.common.utils.CirceUtils

/** Companion object. Lets us create a AnonIpConf from a Json. */
object AnonIpEnrichment extends ParseableEnrichment {
Expand All @@ -32,7 +34,7 @@ object AnonIpEnrichment extends ParseableEnrichment {

/**
* Creates an AnonIpEnrichment instance from a Json.
* @param c The anon_ip enrichment JSON
* @param config The anon_ip enrichment JSON
* @param schemaKey provided for the enrichment, must be supported by this enrichment
* @return an AnonIpEnrichment configuration
*/
Expand Down
Loading

0 comments on commit bfc443f

Please sign in to comment.