From 2a6185b5d4b10279b96aecbe11d52c404d2135ee Mon Sep 17 00:00:00 2001 From: Anton Parkhomenko Date: Tue, 1 Sep 2020 14:35:13 +0300 Subject: [PATCH] Stream NH: add (close #346) --- build.sbt | 39 ++++ config/config.fs2.hocon.sample | 32 +++ .../common/adapters/registry/Adapter.scala | 21 +- .../apirequest/ApiRequestEnrichment.scala | 23 +- .../snowplow/enrich/fs2/AssetsRefresh.scala | 208 ++++++++++++++++++ .../snowplow/enrich/fs2/Enrich.scala | 132 +++++++++++ .../snowplow/enrich/fs2/Environment.scala | 139 ++++++++++++ .../snowplow/enrich/fs2/Main.scala | 63 ++++++ .../snowplow/enrich/fs2/Payload.scala | 77 +++++++ .../enrich/fs2/config/Base64Hocon.scala | 53 +++++ .../enrich/fs2/config/Base64Json.scala | 46 ++++ .../enrich/fs2/config/CliConfig.scala | 70 ++++++ .../enrich/fs2/config/ConfigFile.scala | 73 ++++++ .../snowplow/enrich/fs2/config/io.scala | 44 ++++ .../snowplow/enrich/fs2/config/package.scala | 27 +++ .../snowplow/enrich/fs2/io/Clients.scala | 132 +++++++++++ .../snowplow/enrich/fs2/io/FileSystem.scala | 46 ++++ .../snowplow/enrich/fs2/io/Sinks.scala | 83 +++++++ .../snowplow/enrich/fs2/io/Source.scala | 57 +++++ .../snowplow/enrich/fs2/io/package.scala | 31 +++ .../snowplow/enrich/fs2/package.scala | 36 +++ .../test/resources/simplelogger.properties | 9 + .../enrich/fs2/AssetsRefreshSpec.scala | 208 ++++++++++++++++++ .../snowplow/enrich/fs2/EnrichSpec.scala | 85 +++++++ .../snowplow/enrich/fs2/SpecHelpers.scala | 67 ++++++ .../enrich/fs2/config/CliConfigSpec.scala | 78 +++++++ project/Dependencies.scala | 28 ++- 27 files changed, 1890 insertions(+), 17 deletions(-) create mode 100644 config/config.fs2.hocon.sample create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/AssetsRefresh.scala create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Enrich.scala create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Environment.scala create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Main.scala create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Payload.scala create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64Hocon.scala create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64Json.scala create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/CliConfig.scala create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/ConfigFile.scala create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/io.scala create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/package.scala create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Clients.scala create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/FileSystem.scala create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Sinks.scala create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Source.scala create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/package.scala create mode 100644 modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/package.scala create mode 100644 modules/fs2/src/test/resources/simplelogger.properties create mode 
100644 modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/AssetsRefreshSpec.scala create mode 100644 modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/EnrichSpec.scala create mode 100644 modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/SpecHelpers.scala create mode 100644 modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/CliConfigSpec.scala

diff --git a/build.sbt b/build.sbt
index 09db15707..2bfebe806 100644
--- a/build.sbt
+++ b/build.sbt
@@ -178,6 +178,45 @@ lazy val beam =
   )
   .enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)

+lazy val fs2 = project
+  .in(file("modules/fs2"))
+  .dependsOn(common)
+  .settings(BuildSettings.basicSettings)
+  .settings(BuildSettings.formatting)
+  .settings(BuildSettings.scoverageSettings)
+  .settings(
+    name := "fs2-enrich",
+    description := "High-performance streaming Snowplow Enrich job built on top of functional streams",
+    buildInfoKeys := Seq[BuildInfoKey](organization, name, version, "sceVersion" -> version.value, description),
+    buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.fs2.generated"
+  )
+  .settings(
+    libraryDependencies ++= Seq(
+      Dependencies.Libraries.decline,
+      Dependencies.Libraries.fs2PubSub,
+      Dependencies.Libraries.circeExtras,
+      Dependencies.Libraries.circeLiteral,
+      Dependencies.Libraries.circeConfig,
+      Dependencies.Libraries.fs2,
+      Dependencies.Libraries.fs2Io,
+      Dependencies.Libraries.slf4j,
+      Dependencies.Libraries.sentry,
+      Dependencies.Libraries.log4cats,
+      Dependencies.Libraries.catsRetry,
+      Dependencies.Libraries.http4sClient,
+      Dependencies.Libraries.fs2BlobS3,
+      Dependencies.Libraries.fs2BlobGcs,
+      Dependencies.Libraries.pureconfig,
+      Dependencies.Libraries.pureconfigCats,
+      Dependencies.Libraries.pureconfigCirce,
+      Dependencies.Libraries.specs2,
+      Dependencies.Libraries.http4sDsl,
+      Dependencies.Libraries.http4sServer
+    ),
+    addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
+  )
+  .enablePlugins(BuildInfoPlugin)
+
 lazy val integrationTests = project
   .in(file("modules/integration-tests"))
   .settings(moduleName := "integration-tests")

diff --git a/config/config.fs2.hocon.sample b/config/config.fs2.hocon.sample
new file mode 100644
index 000000000..90b955e22
--- /dev/null
+++ b/config/config.fs2.hocon.sample
@@ -0,0 +1,32 @@
+auth = {
+  type = "Gcp"
+  projectId = "project"
+}
+
+input = {
+  type = "PubSub"
+  subscriptionId = "subscription"
+}
+
+// input = {
+//   type = "fs"
+//   path = "/Users/chuwy/enriched"
+// }
+
+good = {
+  type = "PubSub"
+  topic = "good-topic"
+}
+
+bad = {
+  type = "PubSub"
+  topic = "bad-topic"
+}
+
+// Not read by the config decoder yet:
+// pii = {
+//   type = "PubSub"
+//   topic = "pii-topic"
+// }
+
+assetsUpdatePeriod = "10 minutes"
+
+sentryDsn = "http://smth"
\ No newline at end of file

diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/Adapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/Adapter.scala
index 844177543..4bb871ee0 100644
--- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/Adapter.scala
+++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/Adapter.scala
@@ -13,12 +13,9 @@ package com.snowplowanalytics.snowplow.enrich.common.adapters.registry

 import cats.Monad
-import cats.data.{NonEmptyList, ValidatedNel}
+import cats.data.{NonEmptyList, Validated, ValidatedNel}
 import
cats.data.Validated._ -import cats.syntax.either._ -import cats.syntax.eq._ -import cats.syntax.option._ -import cats.syntax.validated._ +import cats.implicits._ import cats.effect.Clock @@ -265,17 +262,11 @@ trait Adapter { * or Failures */ protected[registry] def rawEventsListProcessor( - rawEventsList: List[ValidatedNel[FailureDetails.AdapterFailure, RawEvent]] + rawEventsList: List[Validated[NonEmptyList[FailureDetails.AdapterFailure], RawEvent]] ): ValidatedNel[FailureDetails.AdapterFailure, NonEmptyList[RawEvent]] = { - val successes: List[RawEvent] = - for { - Valid(s) <- rawEventsList - } yield s - - val failures: List[FailureDetails.AdapterFailure] = - (for { - Invalid(NonEmptyList(h, t)) <- rawEventsList - } yield h :: t).flatten + val (failures, successes) = rawEventsList.separate match { + case (nel, list) => (nel.flatMap(_.toList), list) + } (successes, failures) match { // No Failures collected. diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/apirequest/ApiRequestEnrichment.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/apirequest/ApiRequestEnrichment.scala index 9b9ed39b8..2d2341897 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/apirequest/ApiRequestEnrichment.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/apirequest/ApiRequestEnrichment.scala @@ -18,6 +18,8 @@ import cats.{Id, Monad} import cats.data.{EitherT, NonEmptyList, ValidatedNel} import cats.implicits._ +import cats.effect.Sync + import io.circe._ import io.circe.generic.auto._ @@ -26,7 +28,6 @@ import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.lrumap._ import com.snowplowanalytics.snowplow.badrows.FailureDetails - import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, ParseableEnrichment} import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.ApiRequestConf import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent @@ -231,4 +232,24 @@ object CreateApiRequestEnrichment { ) ) } + + implicit def syncCreateApiRequestEnrichment[F[_]: Sync]( + implicit CLM: CreateLruMap[F, String, (Either[Throwable, Json], Long)], + HTTP: HttpClient[F] + ): CreateApiRequestEnrichment[F] = + new CreateApiRequestEnrichment[F] { + def create(conf: ApiRequestConf): F[ApiRequestEnrichment[F]] = + CLM + .create(conf.cache.size) + .map(c => + ApiRequestEnrichment( + conf.schemaKey, + conf.inputs, + conf.api, + conf.outputs, + conf.cache.ttl, + c + ) + ) + } } diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/AssetsRefresh.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/AssetsRefresh.scala new file mode 100644 index 000000000..21db34d5c --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/AssetsRefresh.scala @@ -0,0 +1,208 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. 
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2
+
+import java.net.URI
+import java.nio.file.{Path, Paths}
+
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+
+import cats.{Applicative, Parallel}
+import cats.implicits._
+
+import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Sync, Timer}
+import cats.effect.concurrent.Ref
+
+import retry.{RetryDetails, RetryPolicies, RetryPolicy, retryingOnSomeErrors}
+
+import fs2.Stream
+import fs2.hash.md5
+import fs2.io.file.{copy, deleteIfExists, tempFileResource, writeAll}
+
+import _root_.io.chrisdavenport.log4cats.Logger
+import _root_.io.chrisdavenport.log4cats.slf4j.Slf4jLogger
+
+import com.snowplowanalytics.snowplow.enrich.fs2.io.Clients
+
+/**
+ * Functions responsible for periodic updates of assets (such as MaxMind/IAB DBs)
+ * The common logic is to periodically invoke a function that:
+ * 1. Downloads a file (in background) to a temp location
+ * 2. Compares the file's checksum with the existing one (stored in a mutable hashmap)
+ * 3. If checksums match - delete the temp file, return
+ * 4. If checksums don't match - send a signal to stop the raw stream
+ *    (via `SignallingRef` in [[Environment]])
+ * 5. Once the raw stream is stopped - delete the old file and move
+ *    the temp file to the old file's location
+ * If any of those URIs has been updated and stopped the raw stream, the stream will be
+ * resumed immediately once the above procedure has traversed all files
+ */
+object AssetsRefresh {
+
+  private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
+    Slf4jLogger.getLogger[F]
+
+  /**
+   * State of the [[updateStream]], containing information about tracked URIs
+   * and the `stop` signal from [[Environment]], as well as all clients necessary
+   * to download URIs
+   *
+   * @param files mutable hash map of URIs and their latest known state
+   * @param stop stop signal coming from [[Environment]] that can be used
+   *             to stop the raw stream consumption
+   * @param clients HTTP, GCS, S3 clients if necessary
+   */
+  case class State[F[_]](
+    files: Ref[F, Map[URI, Hash]],
+    stop: Ref[F, Boolean],
+    clients: Clients[F]
+  )
+
+  case class Hash private (s: String) extends AnyVal
+
+  object Hash {
+    def apply(bytes: Array[Byte]): Hash = {
+      val bi = new java.math.BigInteger(1, bytes)
+      Hash(String.format("%0" + (bytes.length << 1) + "x", bi))
+    }
+  }
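+  // Illustrative check of the hex encoding above (not part of the original changeset):
+  // Hash(Array(0x0f.toByte)).s == "0f" - i.e. raw MD5 bytes are rendered as a
+  // lower-case hex string, zero-padded to two characters per byte.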
+  /** Pair of a tracked `URI` and destination path on local FS (`java.nio.file.Path` is not serializable) */
+  type Asset = (URI, String)
+
+  /** Initialise the [[updateStream]] with all necessary resources if refresh period is specified */
+  def run[F[_]: ConcurrentEffect: ContextShift: Timer: Parallel](environment: Environment[F]): Stream[F, Unit] =
+    environment.config.assetsUpdatePeriod match {
+      case Some(duration) =>
+        val init = for {
+          files <- Ref.of(Map.empty[URI, Hash])
+          curDir <- Sync[F].delay(Paths.get("").toAbsolutePath)
+          _ <- Logger[F].info("Initializing AssetsRefresh stream")
+          uris = environment.enrichments.configs.flatMap(_.filesToCache).map(_._1)
+          stream = for {
+            clients <- Stream.resource(Clients.initialize[F](environment.blocker, uris))
+            state = State(files, environment.stop, clients)
+            assets = environment.enrichments.configs.flatMap(_.filesToCache)
+            _ <- updateStream[F](environment.blocker, state, curDir, duration, assets)
+          } yield ()
+        } yield stream
+        Stream.eval(init).flatten
+      case None =>
+        Stream.empty.covary[F]
+    }
+
+  /**
+   * Create an update stream that ticks periodically and can invoke an update action,
+   * which will download a URI and check if it has been updated. If it has, the
+   * raw stream will be stopped via the `stop` signal from [[Environment]] and the assets updated.
+   * At the end of every update, the stop signal will be reset to `false`
+   */
+  def updateStream[F[_]: ConcurrentEffect: ContextShift: Parallel: Timer](
+    blocker: Blocker,
+    state: State[F],
+    curDir: Path,
+    duration: FiniteDuration,
+    assets: List[Asset]
+  ): Stream[F, Unit] =
+    Stream.awakeEvery[F](duration).evalMap { _ =>
+      val log = Logger[F].debug(show"Ticking with ${assets.map(_._2)}")
+      val updates = assets.parTraverse {
+        case (uri, path) =>
+          update(blocker, state, curDir)(uri, Paths.get(path))
+      }
+      log *> updates.map(_.contains(true)).flatMap { stopped =>
+        if (stopped) Logger[F].info("Resuming signalling ref") *> state.stop.set(false) else Sync[F].unit
+      }
+    }
+
+  /**
+   * Update a file in the current directory if it has been updated on remote storage
+   * If a new file has been discovered - stops the enriching streams (signal in `state`)
+   * Do nothing if the file hasn't been updated
+   *
+   * Note: this function can be thread-unsafe if the download time
+   * exceeds the tick period. We assume that no two threads will be downloading the same URI
+   *
+   * @param blocker a thread pool to execute download/copy operations
+   * @param state a map of URI to MD5 hash to keep track of the latest state of remote files
+   * @param curDir a local FS destination for temporary files
+   * @param uri a remote file (S3, GCS or HTTP), the URI is used as an identifier
+   * @param path a static file name that enrich clients will access;
+   *             the file itself is placed in the current dir (`curDir`)
+   * @return true if the file has been updated
+   */
+  def update[F[_]: ConcurrentEffect: ContextShift: Timer](
+    blocker: Blocker,
+    state: State[F],
+    curDir: Path
+  )(
+    uri: URI,
+    path: Path
+  ): F[Boolean] =
+    tempFileResource[F](blocker, curDir).use { tmp =>
+      // Set stop signal and replace old file with temporary
+      def stopAndCopy(hash: Hash, delete: Boolean): F[Unit] =
+        for {
+          _ <- Logger[F].info(s"Discovered new data at $uri, stopping signalling ref")
+          _ <- state.stop.set(true)
+          _ <- if (delete) {
+                 val deleted = Logger[F].info(s"Deleted outdated $path")
+                 val notDeleted = Logger[F].warn(s"Couldn't delete $path, file didn't exist")
+                 deleteIfExists(blocker, path).ifM(deleted, notDeleted)
+               } else Sync[F].unit
+          _ <- copy(blocker, tmp, path)
+          _ <- state.files.update(_.updated(uri, hash))
+        } yield ()
+
+      val data = state.clients.download(uri).observe(writeAll(tmp, blocker)).through(md5)
+      for {
+        _ <- Logger[F].info(s"Downloading $uri")
+        hash <- retryDownload(data.compile.to(Array)).map(Hash.apply)
+        localFiles <- state.files.get
+        updated <- localFiles.get(uri) match {
+                     case Some(known) if known == hash =>
+                       Sync[F].pure(false)
+                     case Some(_) =>
+                       stopAndCopy(hash, true).as(true)
+                     case None =>
+                       stopAndCopy(hash, false).as(true)
+                   }
+      } yield updated
+    }
+
+  def retryDownload[F[_]: Sync: Timer, A](download: F[A]): F[A] =
+    retryingOnSomeErrors[A](retryPolicy[F], worthRetrying, onError[F])(download)
+
+  def retryPolicy[F[_]: Applicative]: RetryPolicy[F] =
+    RetryPolicies.fullJitter[F](1500.milliseconds).join(RetryPolicies.limitRetries[F](5))
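+  // Behaviour sketch of the policy above (assuming cats-retry semantics):
+  // retry n sleeps a random duration in [0, 1500ms * 2^n] (full jitter),
+  // and the download is abandoned after 5 retries, at which point `onError`
+  // logs the "giving up" message and the original error is rethrown.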
+  def worthRetrying(e: Throwable): Boolean =
+    e match {
+      case _: Clients.DownloadingFailure => true
+      case _: IllegalArgumentException => false
+      case NonFatal(_) => false
+      case _ => false
+    }
+
+  def onError[F[_]: Sync](error: Throwable, details: RetryDetails): F[Unit] =
+    if (details.givingUp)
+      Logger[F].error(show"Failed to download an asset after ${details.retriesSoFar} retries. ${error.getMessage}. Aborting the job")
+    else if (details.retriesSoFar == 0)
+      Logger[F].warn(show"Failed to download an asset. ${error.getMessage}. Keep retrying")
+    else
+      Logger[F].warn(
+        show"Failed to download an asset after ${details.retriesSoFar} retries, " +
+          show"waiting for ${details.cumulativeDelay.toMillis} ms. ${error.getMessage}. " +
+          show"Keep retrying"
+      )
+}

diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Enrich.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Enrich.scala
new file mode 100644
index 000000000..da45fc412
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Enrich.scala
@@ -0,0 +1,132 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2
+
+import java.time.Instant
+import java.util.Base64
+import java.util.concurrent.TimeUnit
+
+import org.joda.time.DateTime
+
+import cats.data.{NonEmptyList, ValidatedNel}
+import cats.implicits._
+
+import cats.effect.{Clock, Concurrent, ContextShift, Sync}
+
+import fs2.Stream
+
+import _root_.io.sentry.SentryClient
+
+import _root_.io.circe.Json
+import _root_.io.circe.syntax._
+
+import _root_.io.chrisdavenport.log4cats.Logger
+import _root_.io.chrisdavenport.log4cats.slf4j.Slf4jLogger
+
+import com.snowplowanalytics.iglu.client.Client
+
+import com.snowplowanalytics.snowplow.badrows.{Processor, BadRow, Failure, Payload => BadRowPayload}
+
+import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline
+import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
+import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
+import com.snowplowanalytics.snowplow.enrich.common.loaders.{CollectorPayload, ThriftLoader}
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
+
+object Enrich {
+
+  /** Default adapter registry, can be constructed dynamically in the future */
+  val adapterRegistry = new AdapterRegistry()
+
+  val processor: Processor = Processor(generated.BuildInfo.name, generated.BuildInfo.version)
+
+  private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
+    Slf4jLogger.getLogger[F]
+
+  /**
+   * Run a primary enrichment stream, reading from [[Environment]] source, enriching
+   * via [[enrichWith]] and sinking into [[GoodSink]] and [[BadSink]] respectively.
+ * Can be stopped via _stop signal_ from [[Environment]] + */ + def run[F[_]: Concurrent: ContextShift: Clock](env: Environment[F]): Stream[F, Unit] = { + val enrich: Enrich[F] = enrichWith[F](env.enrichments.registry, env.resolver, env.sentry) + env.source + .pauseWhen(env.stop) + .parEvalMapUnordered(16)(payload => env.blocker.blockOn(enrich(payload))) + .flatMap(_.decompose[BadRow, EnrichedEvent]) + .observeEither(env.bad, env.good) + .void + } + + /** Enrich a single [[CollectorPayload]] to get list of bad rows and/or enriched events */ + def enrichWith[F[_]: Clock: Sync]( + enrichRegistry: EnrichmentRegistry[F], + igluClient: Client[F, Json], + sentry: Option[SentryClient] + )( + row: Payload[F, Array[Byte]] + ): F[Result[F]] = { + val payload = ThriftLoader.toCollectorPayload(row.data, processor) + val result = Logger[F].debug(payloadToString(payload)) *> + Clock[F] + .realTime(TimeUnit.MILLISECONDS) + .map(millis => new DateTime(millis)) + .flatMap { now => + EtlPipeline.processEvents[F](adapterRegistry, enrichRegistry, igluClient, processor, now, payload) + } + .map(enriched => Payload(enriched, row.ack)) + + result.handleErrorWith(sendToSentry[F](row, sentry)) + } + + /** Stringify `ThriftLoader` result for debugging purposes */ + def payloadToString(payload: ValidatedNel[BadRow.CPFormatViolation, Option[CollectorPayload]]): String = + payload.fold(_.asJson.noSpaces, _.map(_.toBadRowPayload.asJson.noSpaces).getOrElse("None")) + + /** Transform enriched event into canonical TSV */ + def encodeEvent(enrichedEvent: EnrichedEvent): String = + enrichedEvent.getClass.getDeclaredFields + .filterNot(_.getName.equals("pii")) + .map { field => + field.setAccessible(true) + Option(field.get(enrichedEvent)).getOrElse("") + } + .mkString("\t") + + /** Log an error, turn the problematic [[CollectorPayload]] into [[BadRow]] and notify Sentry if configured */ + def sendToSentry[F[_]: Sync: Clock](original: Payload[F, Array[Byte]], sentry: Option[SentryClient])(error: Throwable): F[Result[F]] = + for { + _ <- Logger[F].error("Runtime exception during payload enrichment. CollectorPayload converted to generic_error and ack'ed") + now <- Clock[F].realTime(TimeUnit.MILLISECONDS).map(Instant.ofEpochMilli) + _ <- original.ack + badRow = genericBadRow(original.data, now, error) + _ <- sentry match { + case Some(client) => + Sync[F].delay(client.sendException(error)) + case None => + Sync[F].unit + } + } yield Payload(List(badRow.invalid), Sync[F].unit) + + /** Build a `generic_error` bad row for unhandled runtime errors */ + def genericBadRow( + row: Array[Byte], + time: Instant, + error: Throwable + ): BadRow = { + val base64 = new String(Base64.getEncoder.encode(row)) + val rawPayload = BadRowPayload.RawPayload(base64) + val failure = Failure.GenericFailure(time, NonEmptyList.one(error.toString)) + BadRow.GenericError(processor, failure, rawPayload) + } +} diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Environment.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Environment.scala new file mode 100644 index 000000000..7c8a98ada --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Environment.scala @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. 
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.fs2 + +import cats.Show +import cats.data.EitherT +import cats.implicits._ + +import cats.effect.{Async, Blocker, Clock, Concurrent, ContextShift, Resource, Sync} + +import fs2.concurrent.SignallingRef + +import _root_.io.circe.Json +import _root_.io.circe.syntax._ + +import _root_.io.sentry.{Sentry, SentryClient} + +import _root_.io.chrisdavenport.log4cats.Logger +import _root_.io.chrisdavenport.log4cats.slf4j.Slf4jLogger + +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.iglu.core.circe.implicits._ + +import com.snowplowanalytics.iglu.client.Client + +import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry +import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf + +import com.snowplowanalytics.snowplow.enrich.fs2.config.{CliConfig, ConfigFile} +import com.snowplowanalytics.snowplow.enrich.fs2.io.{FileSystem, Sinks, Source} + +/** + * All allocated resources, configs and mutable variables necessary for running Enrich process + * @param config original HOCON configuration + * @param resolver Iglu Client + * @param enrichments enrichment registry with all its clients and parsed configuration files + * @param stop a signalling reference that can pause a raw stream and enrichment, + * used for [[AssetsRefresh]] + * @param blocker thread pool for blocking operations and enrichments itself + * @param source a stream of raw collector payloads + * @param good a sink for successfully enriched events + * @param bad a sink for events that failed validation or enrichment + * @param sentry optional sentry client + */ +case class Environment[F[_]]( + config: ConfigFile, + resolver: Client[F, Json], + enrichments: Environment.Enrichments[F], + stop: SignallingRef[F, Boolean], + blocker: Blocker, + source: RawSource[F], + good: GoodSink[F], + bad: BadSink[F], + sentry: Option[SentryClient] +) + +object Environment { + + private implicit def unsafeLogger[F[_]: Sync]: Logger[F] = + Slf4jLogger.getLogger[F] + + type Parsed[F[_], A] = EitherT[F, String, A] + + type Allocated[F[_]] = Parsed[F, Resource[F, Environment[F]]] + + /** Registry with all allocated clients (MaxMind, IAB etc) and their original configs */ + case class Enrichments[F[_]](registry: EnrichmentRegistry[F], configs: List[EnrichmentConf]) + + /** Schema for all enrichments combined */ + val EnrichmentsKey: SchemaKey = + SchemaKey("com.snowplowanalytics.snowplow", "enrichments", "jsonschema", SchemaVer.Full(1, 0, 0)) + + /** Initialize and allocate all necessary resources */ + def init[F[_]: Concurrent: ContextShift: Clock](config: CliConfig): Allocated[F] = + parse[F](config).map { parsedConfigs => + for { + client <- Client.parseDefault[F](parsedConfigs.igluJson).resource + registry <- EnrichmentRegistry.build[F](parsedConfigs.enrichmentConfigs).resource + blocker <- Blocker[F] + rawSource = Source.read[F](blocker, parsedConfigs.configFile.auth, parsedConfigs.configFile.input) + goodSink <- 
Sinks.goodSink[F](parsedConfigs.configFile.auth, parsedConfigs.configFile.good)
+        badSink <- Sinks.badSink[F](parsedConfigs.configFile.auth, parsedConfigs.configFile.bad)
+        stop <- Resource.liftF(SignallingRef(false)) // start unpaused; AssetsRefresh raises the signal during asset updates
+        enrichments = Enrichments(registry, parsedConfigs.enrichmentConfigs)
+        sentry <- parsedConfigs.configFile.sentryDsn match {
+                    case Some(dsn) => Resource.liftF[F, Option[SentryClient]](Sync[F].delay(Sentry.init(dsn.toString).some))
+                    case None => Resource.pure[F, Option[SentryClient]](none[SentryClient])
+                  }
+      } yield Environment[F](parsedConfigs.configFile, client, enrichments, stop, blocker, rawSource, goodSink, badSink, sentry)
+    }
+
+  /** Decode base64-encoded configs, passed via CLI. Read files, validate and parse */
+  def parse[F[_]: Async: Clock: ContextShift](config: CliConfig): Parsed[F, ParsedConfigs] =
+    for {
+      igluJson <- config.resolver.fold(b => EitherT.rightT[F, String](b.value), p => FileSystem.readJson[F](p))
+      enrichmentJsons <- config.enrichments match {
+                           case Left(base64) =>
+                             EitherT.rightT[F, String](base64.value)
+                           case Right(path) =>
+                             FileSystem
+                               .readJsonDir[F](path)
+                               .map(jsons => Json.arr(jsons: _*))
+                               .map(json => SelfDescribingData(EnrichmentsKey, json).asJson)
+                         }
+      configFile <- ConfigFile.parse[F](config.config)
+      client <- Client.parseDefault[F](igluJson).leftMap(x => show"Cannot decode Iglu Client. $x")
+      _ <- EitherT.liftF(Logger[F].info(show"Parsed Iglu Client with following registries: ${client.resolver.repos.map(_.config.name)}"))
+      configs <- EitherT(EnrichmentRegistry.parse[F](enrichmentJsons, client, false).map(_.toEither)).leftMap { x =>
+                   show"Cannot decode enrichments ${x.mkString_(", ")}"
+                 }
+      _ <- EitherT.liftF(Logger[F].info(show"Parsed following enrichments: ${configs.map(_.schemaKey.toSchemaUri)}"))
+    } yield ParsedConfigs(igluJson, configs, configFile)
+
+  private[fs2] case class ParsedConfigs(
+    igluJson: Json,
+    enrichmentConfigs: List[EnrichmentConf],
+    configFile: ConfigFile
+  )
+
+  private implicit class EitherTOps[F[_], E: Show, A](eitherT: EitherT[F, E, A]) {
+    def resource(implicit F: Sync[F]): Resource[F, A] = {
+      val action: F[A] = eitherT.value.flatMap {
+        case Right(a) => Sync[F].pure(a)
+        case Left(error) => Sync[F].raiseError(new RuntimeException(error.show)) // Safe since we already parsed it
+      }
+      Resource.liftF[F, A](action)
+    }
+  }
+}
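+// Usage sketch (mirrors the wiring in Main; `cliConfig` is a parsed config.CliConfig):
+//   Environment.init[IO](cliConfig).value.flatMap {
+//     case Right(resource) => resource.use(env => Enrich.run(env).compile.drain)
+//     case Left(error) => IO.raiseError(new RuntimeException(error))
+//   }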
diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Main.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Main.scala
new file mode 100644
index 000000000..bf756cabf
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Main.scala
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2
+
+import cats.effect.{ExitCode, IO, IOApp}
+
+import _root_.io.sentry.SentryClient
+
+import _root_.io.chrisdavenport.log4cats.Logger
+import _root_.io.chrisdavenport.log4cats.slf4j.Slf4jLogger
+
+object Main extends IOApp {
+
+  private implicit val logger: Logger[IO] =
+    Slf4jLogger.getLogger[IO]
+
+  def run(args: List[String]): IO[ExitCode] =
+    config.CliConfig.command.parse(args) match {
+      case Right(cfg) =>
+        for {
+          _ <- logger.info("Initialising resources for Enrich job")
+          environment <- Environment.init[IO](cfg).value
+          exit <- environment match {
+                    case Right(e) =>
+                      e.use { env =>
+                        val enrich = Enrich.run[IO](env)
+                        val updates = AssetsRefresh.run[IO](env)
+                        val log = logger.info("Running enrichment stream")
+                        log *> enrich.merge(updates).compile.drain.attempt.flatMap {
+                          case Left(exception) =>
+                            unsafeSendSentry(exception, env.sentry)
+                            IO.raiseError[ExitCode](exception)
+                          case Right(_) =>
+                            IO.pure(ExitCode.Success)
+                        }
+                      }
+                    case Left(error) =>
+                      logger.error(s"Cannot initialise enrichment resources\n$error").as(ExitCode.Error)
+                  }
+        } yield exit
+      case Left(error) =>
+        IO(System.err.println(error)).as(ExitCode.Error)
+    }
+
+  /** Last attempt to notify about an exception (possibly just an interruption) */
+  private def unsafeSendSentry(error: Throwable, sentry: Option[SentryClient]): Unit = {
+    sentry match {
+      case Some(client) =>
+        client.sendException(error)
+      case None => ()
+    }
+    logger.error(s"The Enrich job has stopped${sentry.fold("")(_ => ". Sentry report has been sent")}").unsafeRunSync()
+  }
+}
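+// Illustrative invocations (paths and values are hypothetical):
+//   fs2-enrich --config config.hocon --iglu-config resolver.json --enrichments enrichments/
+//   fs2-enrich -c $(base64 -w0 config.hocon) -r $(base64 -w0 resolver.json) -e $(base64 -w0 enrichments.json)
+// Each option accepts either a filesystem path or a base64-encoded payload (see config.CliConfig).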
diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Payload.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Payload.scala
new file mode 100644
index 000000000..189c95f6f
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Payload.scala
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2
+
+import scala.annotation.tailrec
+
+import cats.Applicative
+import cats.syntax.either._
+import cats.data.Validated
+
+import fs2.{Pure, Stream}
+
+import com.snowplowanalytics.snowplow.enrich.fs2.Payload.Parsed
+
+/**
+ * Anything that has been read from [[RawSource]] and needs to be acknowledged,
+ * or a derivative (parsed `A`) that can be used to acknowledge the original message
+ * @param data original data or anything it has been transformed to
+ * @param ack a side-effect to acknowledge the message, or a no-op in case the original
+ *            message has been flattened into multiple rows and only the last row contains
+ *            the actual side-effect
+ */
+case class Payload[F[_], A](data: A, ack: F[Unit]) {
+
+  /**
+   * Flatten all payloads from a list and replace the `ack` action with a no-op everywhere
+   * except the last message, so that the original collector payload (with multiple events)
+   * will be ack'ed only when the last event has been sunk into the good or bad sink
+   */
+  def decompose[L, R](implicit ev: A <:< List[Validated[L, R]], F: Applicative[F]): Stream[F, Parsed[F, L, R]] = {
+    val _ = ev
+    val noop: F[Unit] = Applicative[F].unit
+    def use(op: F[Unit])(v: Validated[L, R]): Parsed[F, L, R] =
+      v.fold(a => Payload(a, op).asLeft, b => Payload(b, op).asRight)
+
+    Payload.mapWithLast(use(noop), use(ack))(data)
+  }
+}
+
+object Payload {
+
+  /**
+   * Original [[Payload]] that has been transformed into either `A` or `B`.
+   * Regardless of the result (`A` or `B`), the original one still has to be acknowledged
+   *
+   * If the original contained only one row (good or bad), the `Parsed` must have a real
+   * `ack` action; if it has been accompanied by other rows, only the last
+   * element from the original will contain the `ack`, all others just a no-op
+   */
+  type Parsed[F[_], A, B] = Either[Payload[F, A], Payload[F, B]]
+
+  /**
+   * Apply `f` to all elements in a list except the last one, to which `lastF` is applied,
+   * e.g. `mapWithLast(f, g)(List(a, b, c))` emits `f(a), f(b), g(c)`
+   */
+  def mapWithLast[A, B](f: A => B, lastF: A => B)(as: List[A]): Stream[Pure, B] = {
+    @tailrec
+    def go(aas: List[A], accum: Vector[B]): Vector[B] =
+      aas match {
+        case Nil =>
+          accum
+        case last :: Nil =>
+          accum :+ lastF(last)
+        case a :: remaining =>
+          go(remaining, accum :+ f(a))
+      }
+
+    Stream.emits(go(as, Vector.empty))
+  }
+}

diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64Hocon.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64Hocon.scala
new file mode 100644
index 000000000..e40037ba3
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64Hocon.scala
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */ +package com.snowplowanalytics.snowplow.enrich.fs2.config + +import java.util.Base64 + +import cats.data.ValidatedNel +import cats.syntax.either._ + +import com.typesafe.config.{ConfigException, ConfigFactory} + +import _root_.io.circe.Json + +import pureconfig.syntax._ +import pureconfig.module.circe._ + +import com.monovore.decline.Argument + +final case class Base64Hocon(value: Json) extends AnyVal + +object Base64Hocon { + + private val base64 = Base64.getDecoder + + implicit val base64Hocon: Argument[Base64Hocon] = + new Argument[Base64Hocon] { + def read(string: String): ValidatedNel[String, Base64Hocon] = { + val result = for { + bytes <- Either.catchOnly[IllegalArgumentException](base64.decode(string)).leftMap(_.getMessage) + hocon <- parseHocon(new String(bytes)) + } yield hocon + result.toValidatedNel + } + + def defaultMetavar: String = "base64" + } + + def parseHocon(str: String): Either[String, Base64Hocon] = + for { + configValue <- Either.catchOnly[ConfigException](ConfigFactory.parseString(str)).leftMap(_.toString).map(_.toConfig) + json <- configValue.to[Json].leftMap(_.toString) + } yield Base64Hocon(json) +} diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64Json.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64Json.scala new file mode 100644 index 000000000..ab8fc4879 --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64Json.scala @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.fs2.config + +import java.util.Base64 + +import cats.data.ValidatedNel +import cats.syntax.show._ +import cats.syntax.either._ + +import _root_.io.circe.Json +import _root_.io.circe.parser.parse + +import com.monovore.decline.Argument + +final case class Base64Json(value: Json) extends AnyVal + +object Base64Json { + + private val base64 = Base64.getDecoder + + implicit val base64Json: Argument[Base64Json] = + new Argument[Base64Json] { + + def read(string: String): ValidatedNel[String, Base64Json] = { + val result = for { + bytes <- Either.catchOnly[IllegalArgumentException](base64.decode(string)).leftMap(_.getMessage) + str = new String(bytes) + json <- parse(str).leftMap(_.show) + } yield Base64Json(json) + result.toValidatedNel + } + + def defaultMetavar: String = "base64" + } +} diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/CliConfig.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/CliConfig.scala new file mode 100644 index 000000000..d15a00dab --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/CliConfig.scala @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. 
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2.config
+
+import java.nio.file.Path
+
+import cats.data.{NonEmptyList, ValidatedNel}
+import cats.implicits._
+
+import com.monovore.decline.{Argument, Command, Opts}
+
+import com.snowplowanalytics.snowplow.enrich.fs2.generated.BuildInfo
+
+final case class CliConfig(
+  config: EncodedHoconOrPath,
+  resolver: EncodedOrPath,
+  enrichments: EncodedOrPath
+)
+
+object CliConfig {
+
+  implicit val encodedOrPathArgument: Argument[EncodedOrPath] =
+    new Argument[EncodedOrPath] {
+      def read(string: String): ValidatedNel[String, EncodedOrPath] = {
+        val encoded = Argument[Base64Json].read(string).map(_.asLeft)
+        val path = Argument[Path].read(string).map(_.asRight)
+        val error = show"Value $string cannot be parsed as a Base64-encoded JSON nor as a filesystem path"
+        encoded.orElse(path).leftMap(_ => NonEmptyList.one(error))
+      }
+
+      def defaultMetavar: String = "input"
+    }
+
+  implicit val encodedHoconOrPathArgument: Argument[EncodedHoconOrPath] =
+    new Argument[EncodedHoconOrPath] {
+      def read(string: String): ValidatedNel[String, EncodedHoconOrPath] = {
+        val encoded = Argument[Base64Hocon].read(string).map(_.asLeft)
+        val path = Argument[Path].read(string).map(_.asRight)
+        val error = show"Value $string cannot be parsed as a Base64-encoded HOCON nor as a filesystem path"
+        encoded.orElse(path).leftMap(_ => NonEmptyList.one(error))
+      }
+
+      def defaultMetavar: String = "input"
+    }
+
+  val configFile: Opts[EncodedHoconOrPath] =
+    Opts.option[EncodedHoconOrPath]("config", "Base64-encoded HOCON string with the app configuration", "c", "base64")
+
+  val enrichments: Opts[EncodedOrPath] =
+    Opts.option[EncodedOrPath]("enrichments", "Base64-encoded JSON string with enrichment configurations", "e", "base64")
+
+  val igluConfig: Opts[EncodedOrPath] =
+    Opts.option[EncodedOrPath]("iglu-config", "Iglu resolver configuration JSON", "r", "base64")
+
+  val enrichedJobConfig: Opts[CliConfig] =
+    (configFile, igluConfig, enrichments).mapN(CliConfig.apply)
+
+  val command: Command[CliConfig] =
+    Command(show"${BuildInfo.name}", show"${BuildInfo.name} ${BuildInfo.version}\n${BuildInfo.description}")(enrichedJobConfig)
+}
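+// Illustrative argument resolution (values are hypothetical): "--enrichments /etc/enrich/enrichments"
+// parses as Right(path), a base64 blob parses as Left(Base64Json(...)),
+// and anything else fails with the combined error message above.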
diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/ConfigFile.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/ConfigFile.scala
new file mode 100644
index 000000000..95f0f220a
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/ConfigFile.scala
@@ -0,0 +1,73 @@
+package com.snowplowanalytics.snowplow.enrich.fs2.config
+
+import scala.concurrent.duration.FiniteDuration
+
+import java.net.URI
+
+import cats.data.EitherT
+import cats.syntax.either._
+import cats.syntax.functor._
+import cats.syntax.show._
+import cats.effect.{Blocker, ContextShift, Sync}
+
+import _root_.io.circe.{Decoder, Encoder, Json}
+import _root_.io.circe.config.syntax._
+import _root_.io.circe.generic.extras.semiauto.{deriveConfiguredDecoder, deriveConfiguredEncoder}
+
+import com.snowplowanalytics.snowplow.enrich.fs2.config.io.{Authentication, Input, Output}
+
+import pureconfig.ConfigSource
+import pureconfig.module.catseffect.syntax._
+import pureconfig.module.circe._
+
+/**
+ * Parsed HOCON configuration file
+ *
+ * @param auth authentication details, such as the GCP project id
+ * @param input input (raw collector payloads) source
+ * @param good sink for successfully enriched events
+ * @param bad sink for events that failed validation or enrichment
+ * @param assetsUpdatePeriod period after which enrichment assets (e.g. the MaxMind DB)
+ *                           should be checked for updates; disabled if not set
+ * @param sentryDsn optional Sentry DSN to report runtime exceptions to
+ */
+final case class ConfigFile(
+  auth: Authentication,
+  input: Input,
+  good: Output,
+  bad: Output,
+  assetsUpdatePeriod: Option[FiniteDuration],
+  sentryDsn: Option[URI]
+)
+
+object ConfigFile {
+
+  // Missing in circe-config
+  implicit val finiteDurationEncoder: Encoder[FiniteDuration] =
+    implicitly[Encoder[String]].contramap(_.toString)
+
+  implicit val javaNetUriDecoder: Decoder[URI] =
+    Decoder[String].emap { str =>
+      Either.catchOnly[IllegalArgumentException](URI.create(str)).leftMap(_.getMessage)
+    }
+
+  implicit val javaNetUriEncoder: Encoder[URI] =
+    Encoder[String].contramap(_.toString)
+
+  implicit val configFileDecoder: Decoder[ConfigFile] =
+    deriveConfiguredDecoder[ConfigFile]
+  implicit val configFileEncoder: Encoder[ConfigFile] =
+    deriveConfiguredEncoder[ConfigFile]
+
+  def parse[F[_]: Sync: ContextShift](in: EncodedHoconOrPath): EitherT[F, String, ConfigFile] =
+    in match {
+      case Right(path) =>
+        val result = Blocker[F].use { blocker =>
+          ConfigSource
+            .default(ConfigSource.file(path))
+            .loadF[F, Json](blocker)
+            .map(_.as[ConfigFile].leftMap(f => show"Couldn't parse the config $f"))
+        }
+        EitherT(result)
+      case Left(encoded) =>
+        EitherT.fromEither[F](encoded.value.as[ConfigFile].leftMap(failure => show"Couldn't parse a base64-encoded config file:\n$failure"))
+    }
+}

diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/io.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/io.scala
new file mode 100644
index 000000000..6278a4f49
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/io.scala
@@ -0,0 +1,44 @@
+package com.snowplowanalytics.snowplow.enrich.fs2.config
+
+import _root_.io.circe.{Decoder, Encoder}
+import _root_.io.circe.generic.extras.semiauto._
+
+object io {
+
+  sealed trait Authentication extends Product with Serializable
+
+  object Authentication {
+    final case class Gcp(projectId: String) extends Authentication
+    final case class Aws() extends Authentication
+
+    implicit val authenticationDecoder: Decoder[Authentication] =
+      deriveConfiguredDecoder[Authentication]
+    implicit val authenticationEncoder: Encoder[Authentication] =
+      deriveConfiguredEncoder[Authentication]
+  }
+
+  /** Source of raw collector data (only PubSub is supported at the moment) */
+  sealed trait Input
+
+  object Input {
+
+    case class PubSub(subscriptionId: String) extends Input
+
+    implicit val inputDecoder: Decoder[Input] =
+      deriveConfiguredDecoder[Input]
+    implicit val inputEncoder: Encoder[Input] =
+      deriveConfiguredEncoder[Input]
+  }
+
+  sealed trait Output
+
+  object Output {
+    case class PubSub(topic: String) extends Output
+
+    implicit val outputDecoder: Decoder[Output] =
+      deriveConfiguredDecoder[Output]
+    implicit val outputEncoder: Encoder[Output] =
+      deriveConfiguredEncoder[Output]
+  }
+
+}

diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/package.scala
b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/package.scala new file mode 100644 index 000000000..543b9214b --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/package.scala @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.fs2 + +import java.nio.file.Path + +import _root_.io.circe.generic.extras.Configuration + +package object config { + + type EncodedOrPath = Either[Base64Json, Path] + type EncodedHoconOrPath = Either[Base64Hocon, Path] + + private[config] implicit def customCodecConfig: Configuration = + Configuration.default.withDiscriminator("type") + +} diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Clients.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Clients.scala new file mode 100644 index 000000000..0d96670f4 --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Clients.scala @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2.io
+
+import java.net.URI
+
+import cats.syntax.option._
+import cats.syntax.functor._
+import cats.syntax.flatMap._
+
+import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Resource, Sync}
+
+import fs2.{RaiseThrowable, Stream}
+
+import blobstore.Path
+import blobstore.s3.S3Store
+import blobstore.gcs.GcsStore
+
+import com.google.cloud.storage.StorageOptions
+
+import org.http4s.{Request, Uri}
+import org.http4s.client.{Client => HttpClient}
+import org.http4s.client.blaze.BlazeClientBuilder
+
+import software.amazon.awssdk.services.s3.S3AsyncClient
+
+case class Clients[F[_]](
+  s3Store: Option[S3Store[F]],
+  gcsStore: Option[GcsStore[F]],
+  http: Option[HttpClient[F]]
+) {
+
+  /** Download an `uri` as a stream of bytes, using the appropriate client */
+  def download(uri: URI)(implicit RT: RaiseThrowable[F]): Stream[F, Byte] =
+    Clients.Client.getByUri(uri) match {
+      case Some(Clients.Client.S3) =>
+        for {
+          s3 <- s3Store match {
+                  case Some(c) => Stream.emit(c)
+                  case None => Stream.raiseError(new IllegalStateException(s"S3 client is not initialized to download $uri"))
+                }
+          data <- s3.get(Path(uri.toString), 16 * 1024)
+        } yield data
+      case Some(Clients.Client.GCS) =>
+        for {
+          gcs <- gcsStore match {
+                   case Some(c) => Stream.emit(c)
+                   case None => Stream.raiseError(new IllegalStateException(s"GCS client is not initialized to download $uri"))
+                 }
+          data <- gcs.get(Path(uri.toString), 16 * 1024)
+        } yield data
+      case Some(Clients.Client.HTTP) =>
+        http match {
+          case Some(c) =>
+            val request = Request[F](uri = Uri.unsafeFromString(uri.toString))
+            for {
+              response <- c.stream(request)
+              body <- if (response.status.isSuccess) response.body
+                      else Stream.raiseError[F](Clients.DownloadingFailure(uri))
+            } yield body
+          case None =>
+            Stream.raiseError(new IllegalStateException(s"HTTP client is not initialized to download $uri"))
+        }
+      case None =>
+        Stream.raiseError(new IllegalStateException(s"No client initialized to download $uri"))
+    }
+}
+
+object Clients {
+
+  sealed trait Client
+  object Client {
+    case object S3 extends Client
+    case object GCS extends Client
+    case object HTTP extends Client
+
+    def getByUri(uri: URI): Option[Client] =
+      uri.getScheme match {
+        case "http" | "https" =>
+          Some(HTTP)
+        case "gs" =>
+          Some(GCS)
+        case "s3" =>
+          Some(S3)
+        case _ =>
+          None
+      }
+
+    def required(uris: List[URI]): Set[Client] =
+      uris.foldLeft(Set.empty[Client]) { (acc, uri) =>
+        getByUri(uri) match {
+          case Some(client) => acc + client
+          case None => acc // This should short-circuit on initialisation
+        }
+      }
+  }
+
+  def mkS3[F[_]: ConcurrentEffect]: F[S3Store[F]] =
+    Sync[F].delay(S3AsyncClient.builder().build()).flatMap(client => S3Store[F](client))
+
+  def mkGCS[F[_]: ConcurrentEffect: ContextShift](blocker: Blocker): F[GcsStore[F]] =
+    Sync[F].delay(StorageOptions.getDefaultInstance.getService).map { storage =>
+      GcsStore(storage, blocker, List.empty)
+    }
+
+  def mkHTTP[F[_]: ConcurrentEffect]: Resource[F, HttpClient[F]] =
+    BlazeClientBuilder[F](concurrent.ExecutionContext.global).resource
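+  // Illustrative: for uris = List(URI.create("gs://sp-assets/GeoLite2-City.mmdb")),
+  // `initialize` below builds only the GCS client; trying to download an s3:// URI
+  // would then fail with "S3 client is not initialized".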
+  def initialize[F[_]: ConcurrentEffect: ContextShift](blocker: Blocker, uris: List[URI]): Resource[F, Clients[F]] = {
+    val toInit = Client.required(uris)
+    for {
+      s3 <- if (toInit.contains(Client.S3)) Resource.liftF(mkS3[F]).map(_.some) else Resource.pure[F, Option[S3Store[F]]](none)
+      gcs <- if (toInit.contains(Client.GCS)) Resource.liftF(mkGCS[F](blocker).map(_.some)) else Resource.pure[F, Option[GcsStore[F]]](none)
+      http <- if (toInit.contains(Client.HTTP)) mkHTTP[F].map(_.some) else Resource.pure[F, Option[HttpClient[F]]](none)
+    } yield Clients(s3, gcs, http)
+  }
+
+  case class DownloadingFailure(uri: URI) extends Throwable {
+    override def getMessage: String = s"Cannot download $uri"
+  }
+}

diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/FileSystem.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/FileSystem.scala
new file mode 100644
index 000000000..9701d9a44
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/FileSystem.scala
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2.io
+
+import java.nio.file.{Files, Path}
+
+import scala.collection.JavaConverters._
+
+import cats.data.EitherT
+
+import cats.effect.Sync
+import cats.implicits._
+
+import fs2.Stream
+
+import _root_.io.circe.Json
+import _root_.io.circe.parser.parse
+
+object FileSystem {
+
+  def readJson[F[_]: Sync](path: Path): EitherT[F, String, Json] =
+    EitherT(Sync[F].delay(Files.readString(path)).map(parse))
+      .leftMap(e => s"Cannot read the JSON file at $path. ${e.getMessage}")
+
+  def readJsonDir[F[_]: Sync](dir: Path): EitherT[F, String, List[Json]] = {
+    val files = for {
+      paths <- Stream.eval(Sync[F].delay(Files.list(dir)))
+      path <- Stream.fromIterator(paths.iterator().asScala)
+    } yield path
+    val action = files.compile.toList
+      .flatMap(paths => paths.traverse(x => readJson(x).value))
+      .map(_.sequence)
+    EitherT(action)
+  }
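+  // Behaviour sketch: for a directory containing a.json and b.json, readJsonDir
+  // yields Right(List(jsonA, jsonB)); the first unparsable file short-circuits
+  // the whole result into a Left with its error message.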
diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Sinks.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Sinks.scala
new file mode 100644
index 000000000..cef3586b0
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Sinks.scala
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2.io
+
+import java.nio.file.Path
+
+import scala.concurrent.duration._
+
+import cats.syntax.apply._
+
+import cats.effect.{Async, Blocker, ContextShift, Resource, Sync}
+
+import fs2.{Pipe, Stream, text}
+import fs2.io.file.writeAll
+
+import com.snowplowanalytics.snowplow.badrows.BadRow
+import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
+import com.snowplowanalytics.snowplow.enrich.fs2.{BadSink, Enrich, GoodSink, Payload}
+import com.snowplowanalytics.snowplow.enrich.fs2.config.io.{Authentication, Output}
+
+import com.permutive.pubsub.producer.Model.{ProjectId, Topic}
+import com.permutive.pubsub.producer.encoder.MessageEncoder
+import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerConfig}
+
+object Sinks {
+
+  def goodSink[F[_]: Async](auth: Authentication, output: Output): Resource[F, GoodSink[F]] =
+    (auth, output) match {
+      case (a: Authentication.Gcp, o: Output.PubSub) =>
+        pubsubSink[F, EnrichedEvent](a, o)
+      case _ =>
+        Resource.liftF(Async[F].raiseError[GoodSink[F]](new IllegalArgumentException(s"Only GCP PubSub is supported as a good sink for now, got $output")))
+    }
+
+  def badSink[F[_]: Async](
+    auth: Authentication,
+    output: Output
+  ): Resource[F, BadSink[F]] =
+    (auth, output) match {
+      case (a: Authentication.Gcp, o: Output.PubSub) =>
+        pubsubSink[F, BadRow](a, o)
+      case _ =>
+        Resource.liftF(Async[F].raiseError[BadSink[F]](new IllegalArgumentException(s"Only GCP PubSub is supported as a bad sink for now, got $output")))
+    }
+
+  def pubsubSink[F[_]: Async, A: MessageEncoder](auth: Authentication.Gcp, output: Output.PubSub): Resource[F, Pipe[F, Payload[F, A], Unit]] = {
+    val config = PubsubProducerConfig[F](
+      batchSize = 10,
+      delayThreshold = 5.seconds,
+      onFailedTerminate = err => Sync[F].delay(System.err.println(err))
+    )
+
+    GooglePubsubProducer
+      .of[F, A](ProjectId(auth.projectId), Topic(output.topic), config)
+      .map(producer => (s: Stream[F, Payload[F, A]]) => s.evalMap(row => producer.produce(row.data) *> row.ack))
+  }
+
+  def goodFileSink[F[_]: Sync: ContextShift](goodOut: Path, blocker: Blocker): GoodSink[F] =
+    goodStream =>
+      goodStream
+        .map(p => Enrich.encodeEvent(p.data))
+        .intersperse("\n")
+        .through(text.utf8Encode)
+        .through(writeAll[F](goodOut, blocker))
+
+  def badFileSink[F[_]: Sync: ContextShift](badOut: Path, blocker: Blocker): BadSink[F] =
+    badStream =>
+      badStream
+        .map(_.data.compact)
+        .intersperse("\n")
+        .through(text.utf8Encode)
+        .through(writeAll[F](badOut, blocker))
+}
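The file sinks mirror the PubSub ones, which makes local runs easy to wire up. A hypothetical wiring of `goodFileSink`; the output path and the single empty event are made up:

    import java.nio.file.Paths

    import cats.effect.{Blocker, ExitCode, IO, IOApp}

    import fs2.Stream

    import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
    import com.snowplowanalytics.snowplow.enrich.fs2.Payload
    import com.snowplowanalytics.snowplow.enrich.fs2.io.Sinks

    // Hypothetical sketch: write enriched payloads as TSV lines to a local file
    object FileSinkExample extends IOApp {
      def run(args: List[String]): IO[ExitCode] =
        Blocker[IO].use { blocker =>
          val sink = Sinks.goodFileSink[IO](Paths.get("enriched.tsv"), blocker)
          val events = Stream.emit(Payload(new EnrichedEvent(), IO.unit)).covary[IO]
          events.through(sink).compile.drain.as(ExitCode.Success)
        }
    }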
diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Source.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Source.scala
new file mode 100644
index 000000000..9bc7a8063
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Source.scala
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2.io
+
+import cats.effect.{Blocker, Concurrent, ContextShift, Sync}
+
+import fs2.Stream
+
+import com.google.pubsub.v1.PubsubMessage
+
+import com.permutive.pubsub.consumer.Model
+import com.permutive.pubsub.consumer.grpc.{PubsubGoogleConsumer, PubsubGoogleConsumerConfig}
+
+import com.snowplowanalytics.snowplow.enrich.fs2.{Payload, RawSource}
+import com.snowplowanalytics.snowplow.enrich.fs2.config.io.{Authentication, Input}
+import com.snowplowanalytics.snowplow.enrich.fs2.config.io.Input.PubSub
+
+object Source {
+
+  def read[F[_]: Concurrent: ContextShift](
+    blocker: Blocker,
+    auth: Authentication,
+    input: Input
+  ): RawSource[F] =
+    (auth, input) match {
+      case (g: Authentication.Gcp, p: Input.PubSub) => pubSub(blocker, g, p)
+      case _ =>
+        Stream.raiseError[F](new IllegalArgumentException(s"Only GCP PubSub is supported as a source for now, got $input"))
+    }
+
+  def pubSub[F[_]: Concurrent: ContextShift](
+    blocker: Blocker,
+    auth: Authentication.Gcp,
+    input: PubSub
+  ): Stream[F, Payload[F, Array[Byte]]] = {
+    val onFailedTerminate: Throwable => F[Unit] =
+      e => Sync[F].delay(System.err.println(s"Cannot terminate the PubSub consumer properly: ${e.getMessage}"))
+    val pubSubConfig = PubsubGoogleConsumerConfig(onFailedTerminate = onFailedTerminate)
+    val projectId = Model.ProjectId(auth.projectId)
+    val subscriptionId = Model.Subscription(input.subscriptionId)
+    val errorHandler: (PubsubMessage, Throwable, F[Unit], F[Unit]) => F[Unit] = // Should never be triggered - decoding into Array[Byte] cannot fail
+      (message, error, _, _) =>
+        Sync[F].delay(System.err.println(s"Cannot decode message ${message.getMessageId} into array of bytes. ${error.getMessage}"))
+    PubsubGoogleConsumer
+      .subscribe[F, Array[Byte]](blocker, projectId, subscriptionId, errorHandler, pubSubConfig)
+      .map(record => Payload(record.value, record.ack))
+  }
+}
diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/package.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/package.scala
new file mode 100644
index 000000000..9ad3e5eca
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/package.scala
@@ -0,0 +1,31 @@
+package com.snowplowanalytics.snowplow.enrich.fs2
+
+import java.nio.charset.StandardCharsets.UTF_8
+
+import cats.syntax.either._
+
+import com.permutive.pubsub.consumer.decoder.MessageDecoder
+import com.permutive.pubsub.producer.encoder.MessageEncoder
+
+import com.snowplowanalytics.snowplow.badrows.BadRow
+
+import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
+
+package object io {
+
+  implicit val badRowEncoder: MessageEncoder[BadRow] =
+    new MessageEncoder[BadRow] {
+      def encode(a: BadRow): Either[Throwable, Array[Byte]] =
+        a.compact.getBytes(UTF_8).asRight
+    }
+
+  implicit val enrichedEventEncoder: MessageEncoder[EnrichedEvent] =
+    new MessageEncoder[EnrichedEvent] {
+      def encode(enrichedEvent: EnrichedEvent): Either[Throwable, Array[Byte]] =
+        Enrich.encodeEvent(enrichedEvent).getBytes(UTF_8).asRight
+    }
+
+  implicit val byteArrayMessageDecoder: MessageDecoder[Array[Byte]] =
+    new MessageDecoder[Array[Byte]] {
+      def decode(message: Array[Byte]): Either[Throwable, Array[Byte]] =
+        message.asRight
+    }
+}
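A `RawSource` pairs each byte array with its ack action, so the decision of when to confirm a message is left to the consumer. A hypothetical consumption sketch; the project and subscription ids are made up:

    import cats.syntax.apply._

    import cats.effect.{Blocker, ExitCode, IO, IOApp}

    import com.snowplowanalytics.snowplow.enrich.fs2.config.io.{Authentication, Input}
    import com.snowplowanalytics.snowplow.enrich.fs2.io.Source

    // Hypothetical sketch: print payload sizes, then ack each message
    object SourceExample extends IOApp {
      def run(args: List[String]): IO[ExitCode] =
        Blocker[IO].use { blocker =>
          Source
            .read[IO](blocker, Authentication.Gcp("my-project"), Input.PubSub("my-subscription"))
            .evalMap(payload => IO(println(s"Got ${payload.data.length} bytes")) *> payload.ack)
            .compile
            .drain
            .as(ExitCode.Success)
        }
    }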
diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/package.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/package.scala
new file mode 100644
index 000000000..57d85526b
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/package.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich
+
+import cats.data.Validated
+
+import _root_.fs2.{Pipe, Stream}
+
+import com.snowplowanalytics.snowplow.badrows.BadRow
+
+import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
+
+package object fs2 {
+
+  /** Raw Thrift payloads coming from a collector */
+  type RawSource[F[_]] = Stream[F, Payload[F, Array[Byte]]]
+
+  type BadSink[F[_]] = Pipe[F, Payload[F, BadRow], Unit]
+  type GoodSink[F[_]] = Pipe[F, Payload[F, EnrichedEvent], Unit]
+
+  /** Enrichment result, containing a list of (valid and invalid) results */
+  type Result[F[_]] = Payload[F, List[Validated[BadRow, EnrichedEvent]]]
+
+  /** Function to transform an original raw payload into good and/or bad rows */
+  type Enrich[F[_]] = Payload[F, Array[Byte]] => F[Result[F]]
+}
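Together, these aliases describe the whole job as stream plumbing: a source of acked byte arrays, an enrich function, and two sinks. A hypothetical sketch of how they compose; the `pipeline` name and the routing are illustrative, not the module's actual main loop:

    import cats.data.Validated
    import cats.effect.{ContextShift, IO}

    import fs2.Stream

    import com.snowplowanalytics.snowplow.badrows.BadRow
    import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
    import com.snowplowanalytics.snowplow.enrich.fs2._

    implicit val cs: ContextShift[IO] =
      IO.contextShift(concurrent.ExecutionContext.global)

    // Hypothetical sketch: enrich every payload and route results, keeping
    // the ack action attached to each derived row
    def pipeline(source: RawSource[IO], enrich: Enrich[IO], good: GoodSink[IO], bad: BadSink[IO]): Stream[IO, Unit] =
      source
        .evalMap(enrich)
        .flatMap { result =>
          Stream.emits[IO, Either[Payload[IO, BadRow], Payload[IO, EnrichedEvent]]](
            result.data.map {
              case Validated.Valid(event) => Right(Payload(event, result.ack))
              case Validated.Invalid(row) => Left(Payload(row, result.ack))
            }
          )
        }
        .observeEither(bad, good) // bad rows to the left pipe, enriched events to the right
        .void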
diff --git a/modules/fs2/src/test/resources/simplelogger.properties b/modules/fs2/src/test/resources/simplelogger.properties
new file mode 100644
index 000000000..d697b7813
--- /dev/null
+++ b/modules/fs2/src/test/resources/simplelogger.properties
@@ -0,0 +1,9 @@
+org.slf4j.simpleLogger.log.org.http4s.blaze.channel.ServerChannel=off
+org.slf4j.simpleLogger.log.org.http4s.blaze.channel.nio1.SelectorLoop=off
+org.slf4j.simpleLogger.log.org.http4s.blaze.channel.nio1.NIO1SocketServerGroup=off
+org.slf4j.simpleLogger.log.org.http4s.client.PoolManager=off
+org.slf4j.simpleLogger.log.org.http4s.server.blaze.BlazeServerBuilder=off
+
+org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.fs2.Enrich=debug
+org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.fs2.AssetsRefresh=warn
+
diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/AssetsRefreshSpec.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/AssetsRefreshSpec.scala
new file mode 100644
index 000000000..146c6cd4e
--- /dev/null
+++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/AssetsRefreshSpec.scala
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2
+
+import java.net.URI
+import java.nio.file.{NoSuchFileException, Path, Paths}
+
+import scala.concurrent.duration._
+
+import fs2.{Pipe, Stream}
+import fs2.io.file.{deleteIfExists, exists, readAll}
+import cats.implicits._
+
+import cats.effect.{Blocker, IO, Resource, Timer}
+import cats.effect.concurrent.Ref
+
+import org.http4s.{HttpApp, HttpRoutes}
+import org.http4s.dsl.io.{Path => _, _}
+import org.http4s.implicits._
+import org.http4s.Method.GET
+import org.http4s.server.blaze.BlazeServerBuilder
+
+import com.snowplowanalytics.snowplow.enrich.fs2.io.Clients
+import com.snowplowanalytics.snowplow.enrich.fs2.AssetsRefresh.Hash
+
+import org.specs2.mutable.Specification
+import com.snowplowanalytics.snowplow.enrich.fs2.SpecHelpers.{CS, T}
+
+class AssetsRefreshSpec extends Specification {
+
+  sequential
+
+  "updateStream" should {
+    "not set the stop signal if no updates are required" in {
+      val result = AssetsRefreshSpec.run(1.second) { (blocker, state, curDir, halt) =>
+        val updateStream = AssetsRefresh.updateStream[IO](blocker, state, curDir, 100.millis, List.empty)
+        halt(updateStream).compile.drain *> state.stop.get
+      }
+      result.unsafeRunSync() must beFalse
+    }
+
+    "download an asset and leave the stop signal set to false" in {
+      val path = Paths.get("asset")
+      val input = List(
+        (URI.create("http://localhost:8080/asset"), path.toString)
+      )
+      val result = AssetsRefreshSpec.run(1.second) { (blocker, state, curDir, halt) =>
+        val updateStream = AssetsRefresh.updateStream[IO](blocker, state, curDir, 300.millis, input)
+        for {
+          _ <- halt(updateStream).compile.drain
+          stop <- state.stop.get
+          assetExists <- exists[IO](blocker, path)
+        } yield (stop, assetExists)
+      }
+
+      val (stop, assetExists) = result.unsafeRunSync()
+      (stop must beFalse) and (assetExists must beTrue)
+    }
+
+    "set the stop signal to true while long downloads are being performed" in {
+      val input = List(
+        (URI.create("http://localhost:8080/slow"), "asset1"), // First sets stop to true
+        (URI.create("http://localhost:8080/slow"), "asset2") // Second doesn't allow update to return prematurely
+      )
+      val result = AssetsRefreshSpec.run(3.seconds) { (blocker, state, curDir, halt) =>
+        val updateStream = AssetsRefresh.updateStream[IO](blocker, state, curDir, 500.milliseconds, input)
+
+        for {
+          fiber <- (IO.sleep(2.seconds) *> state.stop.get).start
+          _ <- halt(updateStream).compile.drain
+          stop <- fiber.join
+        } yield stop
+      }
+
+      val stop = result.unsafeRunSync()
+      stop must beTrue
+    }
+
+    "attempt to re-download a non-existing file" in {
+      val path = Paths.get("flaky-asset")
+      val input = List(
+        (URI.create("http://localhost:8080/flaky"), path.toString)
+      )
+      val result = AssetsRefreshSpec.run(4.seconds) { (blocker, state, curDir, halt) =>
+        val updateStream = AssetsRefresh.updateStream[IO](blocker, state, curDir, 2.seconds, input)
+        for {
+          _ <- halt(updateStream).compile.drain
+          stop <- state.stop.get
+          assetContent <- readAll[IO](path, blocker, 8).compile.to(Array).map(b => new String(b))
+        } yield (stop, assetContent)
+      }
+
+      val (stop, assetContent) = result.unsafeRunSync()
+      (stop must beFalse) and (assetContent must beEqualTo("3"))
+    }
+
+  }
+}
+
+object AssetsRefreshSpec {
+
+  /**
+   * Test function with allocated resources
+   * First argument - blocking thread pool
+   * Second argument - empty state
+   * Third argument - directory assets are downloaded to
+   * Fourth argument - pipe merging in a stream that forces the test to exit
+   */
+  type Test[A] = (Blocker, AssetsRefresh.State[IO], Path, Pipe[IO, Unit, Unit]) => IO[A]
+
+  /**
+   * Run a test with resources allocated specifically for it
+   * It allocates a thread pool, an empty state and an HTTP server, and
+   * automatically removes all downloaded files after the test is over
+   *
+   * @param time timeout after which the test will be forced to exit
+   * @param test the actual test suite function
+   */
+  def run[A](time: FiniteDuration)(test: Test[A]): IO[A] = {
+    val resources = for {
+      state <- AssetsRefreshSpec.mkState
+      blocker <- SpecHelpers.blocker
+    } yield (state, blocker)
+
+    resources.use {
+      case ((path, state), blocker) =>
+        val haltStream = AssetsRefreshSpec.haltStream[IO](time)
+        test(blocker, state, path, s => haltStream.mergeHaltL(s.merge(runServer))).flatTap(_ => filesCleanup(blocker))
+    }
+  }
+
+  private val TestFiles = List(
+    Paths.get("asset"),
+    Paths.get("asset1"),
+    Paths.get("asset2"),
+    Paths.get("flaky-asset")
+  )
+
+  /** Allocate an empty state with an HTTP client */
+  val mkState: Resource[IO, (Path, AssetsRefresh.State[IO])] =
+    for {
+      blocker <- Blocker[IO]
+      clients <- Clients.initialize[IO](blocker, List(URI.create("http://localhost:8080")))
+      map <- Resource.liftF(Ref.of[IO, Map[URI, Hash]](Map.empty))
+      stop <- Resource.liftF(Ref.of[IO, Boolean](false))
+      path = Paths.get("")
+    } yield (path, AssetsRefresh.State(map, stop, clients))
+
+  /** A stream that will halt after the specified duration */
+  def haltStream[F[_]: Timer](after: FiniteDuration): Stream[F, Unit] =
+    Stream.eval(Timer[F].sleep(after))
+
+  /** Clean up the predefined list of files */
+  def filesCleanup(blocker: Blocker): IO[Unit] =
+    TestFiles.traverse_ { path =>
+      deleteIfExists[IO](blocker, path).recover {
+        case _: NoSuchFileException => false
+      }
+    }
+
+  /** Minimal web service that counts its requests */
+  def assetService(counter: Ref[IO, Int]): HttpApp[IO] =
+    HttpRoutes
+      .of[IO] {
+        case GET -> Root / "asset" =>
+          Ok("data")
+        case GET -> Root / "slow" =>
+          for {
+            i <- counter.updateAndGet(_ + 1)
+            _ <- if (i == 1) IO.sleep(100.milliseconds) else IO.sleep(10.seconds)
+            res <- Ok(s"slow data $i")
+          } yield res
+        case GET -> Root / "counter" =>
+          counter.updateAndGet(_ + 1).flatMap { i =>
+            Ok(s"counter $i")
+          }
+        case GET -> Root / "flaky" =>
+          counter.update(_ + 1) *>
+            counter.get.flatMap { i =>
+              val s = i.toString
+              if (i == 1 || i == 2) NotFound(s)
+              else if (i == 3) Ok(s)
+              else NotFound(s)
+            }
+      }
+      .orNotFound
+
+  def runServer: Stream[IO, Unit] =
+    for {
+      counter <- Stream.eval(Ref.of[IO, Int](0))
+      stream <- BlazeServerBuilder[IO](concurrent.ExecutionContext.global)
+                  .bindHttp(8080)
+                  .withHttpApp(assetService(counter))
+                  .withoutBanner
+                  .withoutSsl
+                  .serve
+                  .void
+    } yield stream
+}
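The timer-plus-`mergeHaltL` trick above is what keeps otherwise infinite streams bounded in tests: `mergeHaltL` halts the merged stream as soon as its left side completes. A minimal sketch of the pattern in isolation; the durations are arbitrary:

    import scala.concurrent.duration._

    import cats.effect.{ContextShift, IO, Timer}

    import fs2.Stream

    implicit val cs: ContextShift[IO] = IO.contextShift(concurrent.ExecutionContext.global)
    implicit val timer: Timer[IO] = IO.timer(concurrent.ExecutionContext.global)

    // An infinite tick stream, bounded by a one-second halting stream
    val ticks: Stream[IO, Unit] = Stream.repeatEval(Timer[IO].sleep(100.millis))
    val halt: Stream[IO, Unit] = Stream.eval(Timer[IO].sleep(1.second))
    val bounded: Stream[IO, Unit] = halt.mergeHaltL(ticks) // completes after about one second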
diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/EnrichSpec.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/EnrichSpec.scala
new file mode 100644
index 000000000..5c5fe3be1
--- /dev/null
+++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/EnrichSpec.scala
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2
+
+import java.time.Instant
+import java.util.UUID
+
+import cats.Applicative
+import cats.data.Validated
+import cats.effect.IO
+
+import _root_.io.circe.syntax._
+
+import org.apache.http.message.BasicNameValuePair
+
+import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
+import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload
+import com.snowplowanalytics.snowplow.enrich.fs2.SpecHelpers.staticIoClock
+
+import org.specs2.mutable.Specification
+
+class EnrichSpec extends Specification {
+
+  "enrichWith" should {
+    "enrich a minimal page_view CollectorPayload event without any enrichments enabled" in {
+      val result = for {
+        igluClient <- SpecHelpers.igluClient
+        registry = SpecHelpers.enrichmentReg
+        result <- Enrich.enrichWith(registry, igluClient, None)(EnrichSpec.payload[IO])
+      } yield result.data.map(event => event.map(Enrich.encodeEvent))
+
+      val Expected = Event
+        .minimal(
+          EnrichSpec.eventId,
+          Instant.ofEpochMilli(0L),
+          "ssc-0.0.0-test",
+          "fs2-enrich-1.3.1-common-1.3.1"
+        )
+        .copy(
+          etl_tstamp = Some(Instant.ofEpochMilli(SpecHelpers.StaticTime)),
+          user_ipaddress = Some("127.10.1.3"),
+          event = Some("page_view"),
+          event_vendor = Some("com.snowplowanalytics.snowplow"),
+          event_name = Some("page_view"),
+          event_format = Some("jsonschema"),
+          event_version = Some("1-0-0"),
+          derived_tstamp = Some(Instant.ofEpochMilli(0L))
+        )
+
+      result.unsafeRunSync() must be like {
+        case List(Validated.Valid(tsv)) =>
+          Event.parse(tsv) match {
+            case Validated.Valid(Expected) => ok
+            case Validated.Valid(event) => ko(s"Event is valid, but event:\n ${event.asJson}\ndoesn't match expected:\n${Expected.asJson}")
+            case Validated.Invalid(error) => ko(s"Analytics SDK cannot parse the output. $error")
+          }
+        case _ => ko("Expected one valid event")
+      }
+    }
+  }
+}
+
+object EnrichSpec {
+  val eventId = UUID.fromString("deadbeef-dead-beef-dead-beefdead")
+
+  val api = CollectorPayload.Api("com.snowplowanalytics.snowplow", "tp2")
+  val source = CollectorPayload.Source("ssc-0.0.0-test", "UTF-8", Some("collector.snplow.net"))
+  val context = CollectorPayload.Context(None, Some("127.10.1.3"), None, None, List(), None)
+  val querystring = List(
+    new BasicNameValuePair("e", "pv"),
+    new BasicNameValuePair("eid", eventId.toString)
+  )
+  val collectorPayload = CollectorPayload(api, querystring, None, None, source, context)
+  def payload[F[_]: Applicative] = Payload(collectorPayload.toRaw, Applicative[F].unit)
+}
diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/SpecHelpers.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/SpecHelpers.scala
new file mode 100644
index 000000000..0df81e53b
--- /dev/null
+++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/SpecHelpers.scala
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2
+
+import scala.concurrent.duration.TimeUnit
+
+import cats.effect.{Blocker, Clock, ContextShift, IO, Resource, Timer}
+
+import _root_.io.circe.literal._
+
+import com.snowplowanalytics.iglu.client.Client
+
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
+
+object SpecHelpers {
+  implicit val ioClock: Clock[IO] =
+    Clock.create[IO]
+
+  val StaticTime = 1599750938180L
+
+  /** Clock that always returns [[StaticTime]], keeping enrichment output deterministic; import explicitly instead of [[ioClock]] */
+  implicit val staticIoClock: Clock[IO] =
+    new Clock[IO] {
+      def realTime(unit: TimeUnit): IO[Long] = IO.pure(StaticTime)
+      def monotonic(unit: TimeUnit): IO[Long] = IO.pure(StaticTime)
+    }
+
+  val blocker: Resource[IO, Blocker] = Blocker[IO]
+
+  implicit def CS: ContextShift[IO] = IO.contextShift(concurrent.ExecutionContext.global)
+
+  implicit val T: Timer[IO] = IO.timer(concurrent.ExecutionContext.global)
+
+  val resolverConfig = json"""
+  {
+    "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-2",
+    "data": {
+      "cacheSize": 500,
+      "cacheTtl": 600,
+      "repositories": [ {
+        "name": "Iglu Central",
+        "priority": 1,
+        "vendorPrefixes": [],
+        "connection": {
+          "http": { "uri": "http://iglucentral.com" }
+        }
+      }
+      ]
+    }
+  }"""
+
+  val igluClient = Client.parseDefault[IO](resolverConfig).value.flatMap {
+    case Right(client) => IO.pure(client)
+    case Left(error) => IO.raiseError(new RuntimeException(error))
+  }
+
+  val enrichmentReg: EnrichmentRegistry[IO] = EnrichmentRegistry[IO]()
+}
diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/CliConfigSpec.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/CliConfigSpec.scala
new file mode 100644
index 000000000..d9404cf6b
--- /dev/null
+++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/CliConfigSpec.scala
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2.config
+
+import cats.syntax.either._
+
+import cats.effect.{ContextShift, IO}
+
+import org.specs2.mutable.Specification
+
+class CliConfigSpec extends Specification {
+  import CliConfigSpec._
+
+  "parseHocon" should {
+    "parse valid HOCON" in {
+      val string = """
+        input = {
+          type = "PubSub"
+          subscriptionId = "inputSub"
+        }
+      """.stripMargin
+      Base64Hocon.parseHocon(string) must beRight
+    }
+  }
+
+  "ConfigFile.parse" should {
+    "parse valid HOCON" in {
+      val hocon =
+        Base64Hocon
+          .parseHocon("""
+            auth = {
+              type = "Gcp"
+              projectId = "test-project"
+            }
+            input = {
+              type = "PubSub"
+              subscriptionId = "inputSub"
+            }
+            good = {
+              type = "PubSub"
+              topic = "good-topic"
+            }
+            bad = {
+              type = "PubSub"
+              topic = "bad-topic"
+            }
+          """.stripMargin)
+          .toOption
+          .getOrElse(throw new RuntimeException("Cannot parse HOCON file"))
+
+      val expected = ConfigFile(
+        io.Authentication.Gcp("test-project"),
+        io.Input.PubSub("inputSub"),
+        io.Output.PubSub("good-topic"),
+        io.Output.PubSub("bad-topic"),
+        None,
+        None
+      )
+
+      val action = ConfigFile.parse[IO](hocon.asLeft).value
+
+      action.unsafeRunSync() must beRight(expected)
+    }
+  }
+}
+
+object CliConfigSpec {
+  implicit val CS: ContextShift[IO] = IO.contextShift(concurrent.ExecutionContext.global)
+}
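Since `Base64Hocon` implies the CLI takes its configuration as Base64-encoded HOCON, any of the snippets above can be turned into a CLI argument with plain JDK tools. A hypothetical helper; the file name is made up:

    import java.nio.file.{Files, Paths}
    import java.util.Base64

    // Hypothetical sketch: Base64-encode a HOCON file for the job's CLI
    val encoded: String =
      Base64.getEncoder.encodeToString(Files.readAllBytes(Paths.get("config.hocon")))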
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index a45685850..021433633 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -46,6 +46,7 @@ object Dependencies {
     val maxmindIplookups = "0.7.1"
     val circe = "0.13.0"
     val circeOptics = "0.13.0"
+    val circeConfig = "0.7.0"
     val circeJackson = "0.13.0"
     val scalaForex = "1.0.0"
     val scalaWeather = "1.0.0"
@@ -66,8 +67,16 @@
     val jackson = "2.10.3"
     val config = "1.3.4"
 
+    val decline = "1.0.0"
+    val fs2 = "2.4.4"
+    val fs2PubSub = "0.16.0"
+    val fs2BlobStorage = "0.7.3"
+    val http4s = "0.21.6"
+    val log4cats = "1.1.1"
+    val catsRetry = "1.1.1"
+
     val scopt = "3.7.1"
-    val pureconfig = "0.11.0"
+    val pureconfig = "0.13.0"
     val snowplowTracker = "0.6.1"
     val specs2 = "4.5.1"
@@ -95,10 +104,12 @@
     val circeCore = "io.circe" %% "circe-core" % V.circe
     val circeGeneric = "io.circe" %% "circe-generic" % V.circe
+    val circeExtras = "io.circe" %% "circe-generic-extras" % V.circe
     val circeParser = "io.circe" %% "circe-parser" % V.circe
     val circeLiteral = "io.circe" %% "circe-literal" % V.circe
     val circeJava8 = "io.circe" %% "circe-java8" % V.circe
     val circeJawn = "io.circe" %% "circe-jawn" % V.circe
+    val circeConfig = "io.circe" %% "circe-config" % V.circeConfig
     val circeOptics = "io.circe" %% "circe-optics" % V.circeOptics
     val circeJackson = "io.circe" %% "circe-jackson210" % V.circeJackson
     val scalaUri = "io.lemonlabs" %% "scala-uri" % V.scalaUri
@@ -144,5 +155,20 @@
     val scalacheck = "org.scalacheck" %% "scalacheck" % V.scalacheck % Test
     val kafka = "org.apache.kafka" %% "kafka" % V.kafka % Test
     val jinJava = "com.hubspot.jinjava" % "jinjava" % V.jinJava % Test
+
+    // FS2
+    val decline = "com.monovore" %% "decline" % V.decline
+    val fs2PubSub = "com.permutive" %% "fs2-google-pubsub-grpc" % V.fs2PubSub
+    val fs2 = "co.fs2" %% "fs2-core" % V.fs2
+    val fs2Io = "co.fs2" %% "fs2-io" % V.fs2
+    val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.http4s
+    val log4cats = "io.chrisdavenport" %% "log4cats-slf4j" % V.log4cats
+    val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry
+    val fs2BlobS3 = "com.github.fs2-blobstore" %% "s3" % V.fs2BlobStorage
+    val fs2BlobGcs = "com.github.fs2-blobstore" %% "gcs" % V.fs2BlobStorage
+    val pureconfigCats = "com.github.pureconfig" %% "pureconfig-cats-effect" % V.pureconfig
+    val pureconfigCirce = "com.github.pureconfig" %% "pureconfig-circe" % V.pureconfig
+    val http4sDsl = "org.http4s" %% "http4s-dsl" % V.http4s % Test
+    val http4sServer = "org.http4s" %% "http4s-blaze-server" % V.http4s % Test
   }
 }