Skip to content

Commit

Permalink
Common: replace json4s with circe (close snowplow/snowplow#3602)
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet authored and chuwy committed Jun 13, 2020
1 parent cbf0da4 commit 4d0b947
Show file tree
Hide file tree
Showing 152 changed files with 7,871 additions and 7,811 deletions.
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
style = default
align = none
maxColumn = 120
maxColumn = 100
docstrings = JavaDoc
optIn.breakChainOnFirstMethodDot = true
spaces.afterKeywordBeforeParen = true
Expand Down
10 changes: 5 additions & 5 deletions common.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
lazy val root = project
.in(file("."))
.settings(
name := "snowplow-common-enrich",
version := "0.38.0",
name := "snowplow-common-enrich",
version := "0.38.0",
description := "Common functionality for enriching raw Snowplow events"
)
.settings(BuildSettings.formatting)
Expand Down Expand Up @@ -47,14 +47,14 @@ lazy val root = project
Dependencies.Libraries.yauaa,
Dependencies.Libraries.kryo,
// Scala
Dependencies.Libraries.circeOptics,
Dependencies.Libraries.circeJackson,
Dependencies.Libraries.scalaz7,
Dependencies.Libraries.snowplowRawEvent,
Dependencies.Libraries.collectorPayload,
Dependencies.Libraries.schemaSniffer,
Dependencies.Libraries.refererParser,
Dependencies.Libraries.maxmindIplookups,
Dependencies.Libraries.json4sJackson,
Dependencies.Libraries.json4sScalaz,
Dependencies.Libraries.igluClient,
Dependencies.Libraries.scalaUri,
Dependencies.Libraries.scalaForex,
Expand All @@ -67,5 +67,5 @@ lazy val root = project
Dependencies.Libraries.scalaCheck,
Dependencies.Libraries.scaldingArgs,
Dependencies.Libraries.mockito
)
) ++ Dependencies.Libraries.circeDeps
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,102 +16,64 @@ import scalaz._

/**
* The parent for our ETL-specific exceptions
*
* Note that the SnowPlow ETL does **not**
* use exceptions for control flow - it uses
* Scalaz Validation and ValidationNel objects.
*
* However two types of exception we do support
* are:
*
* 1. FatalEtlException - should always cause
* the ETL to die
* 2. UnexpectedEtlException - ETL may die or
* continue, depending on the ETL config
* Note that the SnowPlow ETL does **not** use exceptions for control flow - it uses Scalaz
* Validation and ValidationNel objects.
* However two types of exception we do support are:
* 1. FatalEtlException - should always cause the ETL to die
* 2. UnexpectedEtlException - ETL may die or continue, depending on the ETL config
*/
sealed class EtlException(msg: String) extends RuntimeException(msg)

/**
* Holds ways of constructing the
* exception message from a Scalaz
* Validation or ValidationNel.
*
* Mixed into the companion objects
* for the exceptions below.
* Holds ways of constructing the exception message from a Scalaz Validation or ValidationNel.
* Mixed into the companion objects for the exceptions below.
*/
trait EtlExceptionConstructors[E <: EtlException] {

// Structured type lets us pass in
// a factory to construct our E
self: {
val fac: (String => E)
} =>

/**
* Alternative constructor for
* the companion object.
*
* Converts a Scalaz
* NonEmptyList[String] into a single
* String error message.
*
* @param errs The list of
* error messages
* @return a new EtlException of
* type E
* Alternative constructor for the companion object.
* Converts a Scalaz NonEmptyList[String] into a single String error message.
* @param errs The list of error messages
* @return a new EtlException of type E
*/
def apply(errs: NonEmptyList[String]): E =
apply(errs.list)

/**
* Alternative constructor for
* the companion object.
*
* Converts a List[String] into
* a single String error message.
*
* @param errs The list of
* error messages
* @return a new EtlException of
* type E
* Alternative constructor for the companion object.
* Converts a List[String] into a single String error message.
* @param errs The list of error messages
* @return a new EtlException of type E
*/
def apply(errs: List[String]): E =
fac(formatErrors(errs))

/**
* A helper to format the list of
* error messages.
*
* @param errs The list of error
* messages
* @return a nicely formatted
* error String
* A helper to format the list of error messages.
* @param errs The list of error messages
* @return a nicely formatted error String
*/
private def formatErrors(errs: List[String]): String =
"EtlException Errors:\n - %s".format(errs.mkString("\n - "))
}

/**
* Companion object for
* FatalEtlException
*
* Contains an apply() constructor
* which takes a Scalaz
* NonEmptyList[String] - see
* EtlExceptionConstructors trait
* for details.
* Companion object for FatalEtlException
* Contains an apply() constructor which takes a Scalaz NonEmptyList[String] - see
* EtlExceptionConstructors trait for details.
*/
object FatalEtlException extends EtlExceptionConstructors[FatalEtlException] {
val fac = (msg: String) => FatalEtlException(msg)
}

/**
* Companion object for
* FatalEtlError
*/
/** Companion object for FatalEtlError */
// TODO: delete when Cascading FailureTrap supports exclusions
object FatalEtlError {

def apply(errs: NonEmptyList[String]): FatalEtlError =
apply(errs.list)

Expand All @@ -123,56 +85,26 @@ object FatalEtlError {
}

/**
* Companion object for
* UnexpectedEtlException
*
* Contains an apply() constructor
* which takes a Scalaz
* NonEmptyList[String] - see
* EtlExceptionConstructors trait
* for details.
* Companion object for UnexpectedEtlException
* Contains an apply() constructor which takes a Scalaz NonEmptyList[String] - see
* EtlExceptionConstructors trait for details.
*/
object UnexpectedEtlException extends EtlExceptionConstructors[UnexpectedEtlException] {
val fac = (msg: String) => UnexpectedEtlException(msg)
}

/**
* A fatal exception in our ETL.
*
* Will only be thrown if the ETL cannot
* feasibly be run - **do not** try to catch
* it, or a kitten dies.
*
* This should be explicitly excluded from
* Cascading Failure Traps, as soon as they
* support this (Cascading 2.2).
* A fatal exception in our ETL. Will only be thrown if the ETL cannot feasibly be run - **do not**
* try to catch it, or a kitten dies.
*/
case class FatalEtlException(msg: String) extends EtlException(msg)

/**
* A fatal error in our ETL.
*
* We are using this as a workaround:
* because Cascading cannot yet support
* excluding a specific Exception subclass
* (e.g. FatalEtlException) from a Failure
* Trap, we need to throw an Error instead.
*
* For details see:
* https://groups.google.com/forum/?fromgroups=#!topic/cascading-user/Ld5sg1baOyc
*/
// TODO: delete when Cascading FailureTrap supports exclusions
/** A fatal error in our ETL. */
case class FatalEtlError(msg: String) extends Error(msg)

/**
* An unexpected exception in our
* ETL.
*
* Will be thrown in the event of
* an unexpected exception. How to
* handle it will depend on the
* setting of the Continue On
* Unexpected Error? flag passed in
* to the ETL.
* An unexpected exception in our ETL.
* Will be thrown in the event of an unexpected exception. How to handle it will depend on the
* setting of the Continue On Unexpected Error? flag passed in to the ETL.
*/
case class UnexpectedEtlException(msg: String) extends EtlException(msg)
Original file line number Diff line number Diff line change
Expand Up @@ -25,42 +25,33 @@ import adapters.AdapterRegistry
import enrichments.{EnrichmentManager, EnrichmentRegistry}
import outputs.EnrichedEvent

/**
* Expresses the end-to-end event pipeline
* supported by the Scala Common Enrich
* project.
*/
/** Expresses the end-to-end event pipeline supported by the Scala Common Enrich project. */
object EtlPipeline {

/**
* A helper method to take a ValidatedMaybeCanonicalInput
* and transform it into a List (possibly empty) of
* ValidatedCanonicalOutputs.
*
* We have to do some unboxing because enrichEvent
* expects a raw CanonicalInput as its argument, not
* a MaybeCanonicalInput.
*
* A helper method to take a ValidatedMaybeCanonicalInput and transform it into a List (possibly
* empty) of ValidatedCanonicalOutputs.
* We have to do some unboxing because enrichEvent expects a raw CanonicalInput as its argument,
* not a MaybeCanonicalInput.
* @param adapterRegistry Contains all of the events adapters
* @param enrichmentRegistry Contains configuration for all
* enrichments to apply
* @param enrichmentRegistry Contains configuration for all enrichments to apply
* @param etlVersion The ETL version
* @param etlTstamp The ETL timestamp
* @param input The ValidatedMaybeCollectorPayload
* @param resolver (implicit) The Iglu resolver used for
* schema lookup and validation
* @return the ValidatedMaybeCanonicalOutput. Thanks to
* flatMap, will include any validation errors
* contained within the ValidatedMaybeCanonicalInput
* @param resolver (implicit) The Iglu resolver used for schema lookup and validation
* @return the ValidatedMaybeCanonicalOutput. Thanks to flatMap, will include any validation
* errors contained within the ValidatedMaybeCanonicalInput
*/
def processEvents(
adapterRegistry: AdapterRegistry,
enrichmentRegistry: EnrichmentRegistry,
etlVersion: String,
etlTstamp: DateTime,
input: ValidatedMaybeCollectorPayload)(implicit resolver: Resolver): List[ValidatedEnrichedEvent] = {

def flattenToList[A](v: Validated[Option[Validated[NonEmptyList[Validated[A]]]]]): List[Validated[A]] = v match {
input: ValidatedMaybeCollectorPayload)(
implicit resolver: Resolver
): List[ValidatedEnrichedEvent] = {
def flattenToList[A](
v: Validated[Option[Validated[NonEmptyList[Validated[A]]]]]): List[Validated[A]] = v match {
case Success(Some(Success(nel))) => nel.toList
case Success(Some(Failure(f))) => List(f.fail)
case Failure(f) => List(f.fail)
Expand All @@ -80,7 +71,11 @@ object EtlPipeline {
} yield
for {
event <- events
enriched = EnrichmentManager.enrichEvent(enrichmentRegistry, etlVersion, etlTstamp, event)
enriched = EnrichmentManager.enrichEvent(
enrichmentRegistry,
etlVersion,
etlTstamp,
event)
} yield enriched

flattenToList[EnrichedEvent](e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ import Scalaz._

import loaders.CollectorPayload
import registry._
import registry.snowplow.{Tp1Adapter => SpTp1Adapter, Tp2Adapter => SpTp2Adapter, RedirectAdapter => SpRedirectAdapter}
import registry.snowplow.{
Tp1Adapter => SpTp1Adapter,
Tp2Adapter => SpTp2Adapter,
RedirectAdapter => SpRedirectAdapter
}

/**
* The AdapterRegistry lets us convert a CollectorPayload
Expand All @@ -28,26 +32,26 @@ import registry.snowplow.{Tp1Adapter => SpTp1Adapter, Tp2Adapter => SpTp2Adapter
class AdapterRegistry(remoteAdapters: Map[(String, String), RemoteAdapter] = Map.empty) {

val adapters: Map[(String, String), Adapter] = Map(
(Vendor.Snowplow, "tp1") -> SpTp1Adapter,
(Vendor.Snowplow, "tp2") -> SpTp2Adapter,
(Vendor.Redirect, "tp2") -> SpRedirectAdapter,
(Vendor.Iglu, "v1") -> IgluAdapter,
(Vendor.Callrail, "v1") -> CallrailAdapter,
(Vendor.Snowplow, "tp1") -> SpTp1Adapter,
(Vendor.Snowplow, "tp2") -> SpTp2Adapter,
(Vendor.Redirect, "tp2") -> SpRedirectAdapter,
(Vendor.Iglu, "v1") -> IgluAdapter,
(Vendor.Callrail, "v1") -> CallrailAdapter,
(Vendor.Cloudfront, "wd_access_log") -> CloudfrontAccessLogAdapter.WebDistribution,
(Vendor.Mailchimp, "v1") -> MailchimpAdapter,
(Vendor.Mailgun, "v1") -> MailgunAdapter,
(Vendor.GoogleAnalytics, "v1") -> GoogleAnalyticsAdapter,
(Vendor.Mandrill, "v1") -> MandrillAdapter,
(Vendor.Olark, "v1") -> OlarkAdapter,
(Vendor.Pagerduty, "v1") -> PagerdutyAdapter,
(Vendor.Pingdom, "v1") -> PingdomAdapter,
(Vendor.Sendgrid, "v3") -> SendgridAdapter,
(Vendor.StatusGator, "v1") -> StatusGatorAdapter,
(Vendor.Unbounce, "v1") -> UnbounceAdapter,
(Vendor.UrbanAirship, "v1") -> UrbanAirshipAdapter,
(Vendor.Marketo, "v1") -> MarketoAdapter,
(Vendor.Vero, "v1") -> VeroAdapter,
(Vendor.HubSpot, "v1") -> HubSpotAdapter
(Vendor.Mailchimp, "v1") -> MailchimpAdapter,
(Vendor.Mailgun, "v1") -> MailgunAdapter,
(Vendor.GoogleAnalytics, "v1") -> GoogleAnalyticsAdapter,
(Vendor.Mandrill, "v1") -> MandrillAdapter,
(Vendor.Olark, "v1") -> OlarkAdapter,
(Vendor.Pagerduty, "v1") -> PagerdutyAdapter,
(Vendor.Pingdom, "v1") -> PingdomAdapter,
(Vendor.Sendgrid, "v3") -> SendgridAdapter,
(Vendor.StatusGator, "v1") -> StatusGatorAdapter,
(Vendor.Unbounce, "v1") -> UnbounceAdapter,
(Vendor.UrbanAirship, "v1") -> UrbanAirshipAdapter,
(Vendor.Marketo, "v1") -> MarketoAdapter,
(Vendor.Vero, "v1") -> VeroAdapter,
(Vendor.HubSpot, "v1") -> HubSpotAdapter
) ++ remoteAdapters

/**
Expand Down Expand Up @@ -105,7 +109,8 @@ class AdapterRegistry(remoteAdapters: Map[(String, String), RemoteAdapter] = Map
case (Vendor.Redirect, "tp2") => SpRedirectAdapter.toRawEvents(payload)
case (Vendor.Iglu, "v1") => IgluAdapter.toRawEvents(payload)
case (Vendor.Callrail, "v1") => CallrailAdapter.toRawEvents(payload)
case (Vendor.Cloudfront, "wd_access_log") => CloudfrontAccessLogAdapter.WebDistribution.toRawEvents(payload)
case (Vendor.Cloudfront, "wd_access_log") =>
CloudfrontAccessLogAdapter.WebDistribution.toRawEvents(payload)
case (Vendor.Mailchimp, "v1") => MailchimpAdapter.toRawEvents(payload)
case (Vendor.Mailgun, "v1") => MailgunAdapter.toRawEvents(payload)
case (Vendor.GoogleAnalytics, "v1") => GoogleAnalyticsAdapter.toRawEvents(payload)
Expand Down
Loading

0 comments on commit 4d0b947

Please sign in to comment.