Skip to content

Commit

Permalink
Stream FS2: add (close #346)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Oct 16, 2020
1 parent 9683e92 commit a91c323
Show file tree
Hide file tree
Showing 46 changed files with 3,600 additions and 20 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ jobs:
run: sbt "project kinesis" docker:publish
- name: Build and publish Stream Kafka Docker image
run: sbt "project kafka" docker:publish
- name: Build and publish Stream NSQ Docker images
- name: Build and publish Stream NSQ Docker image
run: sbt "project nsq" docker:publish
- name: Build and publish Stream NH Docker image
run: sbt "project fs2" docker:publish

deploy_sce:
needs: test
Expand Down
52 changes: 51 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
lazy val root = project.in(file("."))
.settings(name := "enrich")
.settings(BuildSettings.basicSettings)
.aggregate(common, beam, stream, kinesis, kafka, nsq, stdin)
.aggregate(common, beam, stream, kinesis, kafka, nsq, stdin, fs2)

lazy val common = project
.in(file("modules/common"))
Expand Down Expand Up @@ -183,3 +183,53 @@ lazy val beam =
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)

Global / onChangedBuildSource := ReloadOnSourceChanges

lazy val fs2 = project
.in(file("modules/fs2"))
.dependsOn(common)
.settings(BuildSettings.basicSettings)
.settings(BuildSettings.formatting)
.settings(BuildSettings.scoverageSettings)
.settings(BuildSettings.addExampleConfToTestCp)
.settings(BuildSettings.sbtAssemblySettings)
.settings(
name := "fs2-enrich",
description := "High-performance streaming Snowplow Enrich job built on top of functional streams",
buildInfoKeys := Seq[BuildInfoKey](organization, name, version, description),
buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.fs2.generated",
packageName in Docker := "snowplow/fs2-enrich",
)
.settings(parallelExecution in Test := false)
.settings(
libraryDependencies ++= Seq(
Dependencies.Libraries.decline,
Dependencies.Libraries.fs2PubSub,
Dependencies.Libraries.circeExtras,
Dependencies.Libraries.circeLiteral,
Dependencies.Libraries.circeConfig,
Dependencies.Libraries.catsEffect,
Dependencies.Libraries.fs2,
Dependencies.Libraries.fs2Io,
Dependencies.Libraries.slf4j,
Dependencies.Libraries.sentry,
Dependencies.Libraries.log4cats,
Dependencies.Libraries.catsRetry,
Dependencies.Libraries.http4sClient,
Dependencies.Libraries.fs2BlobS3,
Dependencies.Libraries.fs2BlobGcs,
Dependencies.Libraries.metrics,
Dependencies.Libraries.pureconfig.withRevision(Dependencies.V.pureconfig013),
Dependencies.Libraries.pureconfigCats.withRevision(Dependencies.V.pureconfig013),
Dependencies.Libraries.pureconfigCirce.withRevision(Dependencies.V.pureconfig013),
Dependencies.Libraries.specs2,
Dependencies.Libraries.specs2CE,
Dependencies.Libraries.scalacheck,
Dependencies.Libraries.specs2Scalacheck,
Dependencies.Libraries.http4sDsl,
Dependencies.Libraries.http4sServer
),
addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
)
.enablePlugins(BuildInfoPlugin)
.settings(BuildSettings.dockerSettings)
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, DockerPlugin)
24 changes: 24 additions & 0 deletions config/config.fs2.hocon.sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
auth = {
type = "Gcp"
projectId = "test-project"
}
input = {
type = "PubSub"
subscriptionId = "inputSub"
}
good = {
type = "PubSub"
topic = "good-topic"
}
bad = {
type = "PubSub"
topic = "bad-topic"
}

assetsUpdatePeriod = "7 days"

sentry = {
dsn = "http://sentry.acme.com"
}

metricsReportPeriod = "1 second"
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@
package com.snowplowanalytics.snowplow.enrich.common.adapters.registry

import cats.Monad
import cats.data.{NonEmptyList, ValidatedNel}
import cats.data.{NonEmptyList, Validated, ValidatedNel}
import cats.data.Validated._
import cats.syntax.either._
import cats.syntax.eq._
import cats.syntax.option._
import cats.syntax.validated._
import cats.implicits._

import cats.effect.Clock

Expand Down Expand Up @@ -265,17 +262,11 @@ trait Adapter {
* or Failures
*/
protected[registry] def rawEventsListProcessor(
rawEventsList: List[ValidatedNel[FailureDetails.AdapterFailure, RawEvent]]
rawEventsList: List[Validated[NonEmptyList[FailureDetails.AdapterFailure], RawEvent]]
): ValidatedNel[FailureDetails.AdapterFailure, NonEmptyList[RawEvent]] = {
val successes: List[RawEvent] =
for {
Valid(s) <- rawEventsList
} yield s

val failures: List[FailureDetails.AdapterFailure] =
(for {
Invalid(NonEmptyList(h, t)) <- rawEventsList
} yield h :: t).flatten
val (failures, successes) = rawEventsList.separate match {
case (nel, list) => (nel.flatMap(_.toList), list)
}

(successes, failures) match {
// No Failures collected.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import cats.{Id, Monad}
import cats.data.{EitherT, NonEmptyList, ValidatedNel}
import cats.implicits._

import cats.effect.Sync

import io.circe._
import io.circe.generic.auto._

Expand All @@ -26,7 +28,6 @@ import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.lrumap._
import com.snowplowanalytics.snowplow.badrows.FailureDetails

import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, ParseableEnrichment}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.ApiRequestConf
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
Expand Down Expand Up @@ -231,4 +232,24 @@ object CreateApiRequestEnrichment {
)
)
}

implicit def syncCreateApiRequestEnrichment[F[_]: Sync](
implicit CLM: CreateLruMap[F, String, (Either[Throwable, Json], Long)],
HTTP: HttpClient[F]
): CreateApiRequestEnrichment[F] =
new CreateApiRequestEnrichment[F] {
def create(conf: ApiRequestConf): F[ApiRequestEnrichment[F]] =
CLM
.create(conf.cache.size)
.map(c =>
ApiRequestEnrichment(
conf.schemaKey,
conf.inputs,
conf.api,
conf.outputs,
conf.cache.ttl,
c
)
)
}
}
Loading

0 comments on commit a91c323

Please sign in to comment.