diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 6db9dae66..266d70251 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -78,8 +78,10 @@ jobs:
         run: sbt "project kinesis" docker:publish
       - name: Build and publish Stream Kafka Docker image
         run: sbt "project kafka" docker:publish
-      - name: Build and publish Stream NSQ Docker images
+      - name: Build and publish Stream NSQ Docker image
         run: sbt "project nsq" docker:publish
+      - name: Build and publish Stream FS2 Docker image
+        run: sbt "project fs2" docker:publish
   deploy_sce:
     needs: test
diff --git a/build.sbt b/build.sbt
index 6c5f9ab99..d8b8e84ed 100644
--- a/build.sbt
+++ b/build.sbt
@@ -157,7 +157,7 @@ lazy val beam =
   .settings(
     name := "beam-enrich",
     description := "Streaming enrich job written using SCIO",
-    buildInfoKeys := Seq[BuildInfoKey](organization, name, version, "sceVersion" -> version.value),
+    buildInfoKeys := Seq[BuildInfoKey](organization, name, version),
     buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.beam.generated",
     libraryDependencies ++= Seq(
       Dependencies.Libraries.scio,
@@ -183,3 +183,52 @@ lazy val beam =
   .enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)
 
 Global / onChangedBuildSource := ReloadOnSourceChanges
+
+lazy val fs2 = project
+  .in(file("modules/fs2"))
+  .dependsOn(common)
+  .settings(BuildSettings.basicSettings)
+  .settings(BuildSettings.formatting)
+  .settings(BuildSettings.scoverageSettings)
+  .settings(BuildSettings.addExampleConfToTestCp)
+  .settings(BuildSettings.sbtAssemblySettings)
+  .settings(
+    name := "fs2-enrich",
+    description := "High-performance streaming Snowplow Enrich job built on top of functional streams",
+    buildInfoKeys := Seq[BuildInfoKey](organization, name, version, "sceVersion" -> version.value, description),
+    buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.fs2.generated"
+  )
+  .settings(
+    libraryDependencies ++= Seq(
+      Dependencies.Libraries.decline,
+      Dependencies.Libraries.fs2PubSub,
+      Dependencies.Libraries.circeExtras,
+      Dependencies.Libraries.circeLiteral,
+      Dependencies.Libraries.circeConfig,
+      Dependencies.Libraries.fs2,
+      Dependencies.Libraries.fs2Io,
+      Dependencies.Libraries.monocle,
+      Dependencies.Libraries.monocleMacro,
+      Dependencies.Libraries.slf4j,
+      Dependencies.Libraries.sentry,
+      Dependencies.Libraries.log4cats,
+      Dependencies.Libraries.catsRetry,
+      Dependencies.Libraries.http4sClient,
+      Dependencies.Libraries.fs2BlobS3,
+      Dependencies.Libraries.fs2BlobGcs,
+      Dependencies.Libraries.metrics,
+      Dependencies.Libraries.pureconfig.withRevision(Dependencies.V.pureconfig013),
+      Dependencies.Libraries.pureconfigCats.withRevision(Dependencies.V.pureconfig013),
+      Dependencies.Libraries.pureconfigCirce.withRevision(Dependencies.V.pureconfig013),
+      Dependencies.Libraries.specs2,
+      Dependencies.Libraries.specs2CE,
+      Dependencies.Libraries.scalacheck,
+      Dependencies.Libraries.specs2Scalacheck,
+      Dependencies.Libraries.http4sDsl,
+      Dependencies.Libraries.http4sServer
+    ),
+    addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
+  )
+  .enablePlugins(BuildInfoPlugin)
+  .settings(BuildSettings.dockerSettings)
+  .enablePlugins(BuildInfoPlugin, JavaAppPackaging, DockerPlugin)
diff --git a/config/config.fs2.hocon.sample b/config/config.fs2.hocon.sample
new file mode 100644
index 000000000..19fd867db
--- /dev/null
+++ b/config/config.fs2.hocon.sample
@@ -0,0 +1,24 @@
+auth = {
+  type = "Gcp"
+  projectId = "test-project"
+}
+input = {
+  type = "PubSub"
+  subscriptionId = "inputSub"
+}
+good = {
+ type = "PubSub" + topic = "good-topic" +} +bad = { + type = "PubSub" + topic = "bad-topic" +} + +assetsUpdatePeriod = "7 days" + +sentry = { + dsn = "http://sentry.acme.com" +} + +metricsReportPeriod = "1 second" \ No newline at end of file diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/Adapter.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/Adapter.scala index 346dfc6b1..cd9a85738 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/Adapter.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/adapters/registry/Adapter.scala @@ -13,12 +13,9 @@ package com.snowplowanalytics.snowplow.enrich.common.adapters.registry import cats.Monad -import cats.data.{NonEmptyList, ValidatedNel} +import cats.data.{NonEmptyList, Validated, ValidatedNel} import cats.data.Validated._ -import cats.syntax.either._ -import cats.syntax.eq._ -import cats.syntax.option._ -import cats.syntax.validated._ +import cats.implicits._ import cats.effect.Clock @@ -265,17 +262,11 @@ trait Adapter { * or Failures */ protected[registry] def rawEventsListProcessor( - rawEventsList: List[ValidatedNel[FailureDetails.AdapterFailure, RawEvent]] + rawEventsList: List[Validated[NonEmptyList[FailureDetails.AdapterFailure], RawEvent]] ): ValidatedNel[FailureDetails.AdapterFailure, NonEmptyList[RawEvent]] = { - val successes: List[RawEvent] = - for { - Valid(s) <- rawEventsList - } yield s - - val failures: List[FailureDetails.AdapterFailure] = - (for { - Invalid(NonEmptyList(h, t)) <- rawEventsList - } yield h :: t).flatten + val (failures, successes) = rawEventsList.separate match { + case (nel, list) => (nel.flatMap(_.toList), list) + } (successes, failures) match { // No Failures collected. 
diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/apirequest/ApiRequestEnrichment.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/apirequest/ApiRequestEnrichment.scala index 9b9ed39b8..2d2341897 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/apirequest/ApiRequestEnrichment.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/registry/apirequest/ApiRequestEnrichment.scala @@ -18,6 +18,8 @@ import cats.{Id, Monad} import cats.data.{EitherT, NonEmptyList, ValidatedNel} import cats.implicits._ +import cats.effect.Sync + import io.circe._ import io.circe.generic.auto._ @@ -26,7 +28,6 @@ import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.lrumap._ import com.snowplowanalytics.snowplow.badrows.FailureDetails - import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, ParseableEnrichment} import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.ApiRequestConf import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent @@ -231,4 +232,24 @@ object CreateApiRequestEnrichment { ) ) } + + implicit def syncCreateApiRequestEnrichment[F[_]: Sync]( + implicit CLM: CreateLruMap[F, String, (Either[Throwable, Json], Long)], + HTTP: HttpClient[F] + ): CreateApiRequestEnrichment[F] = + new CreateApiRequestEnrichment[F] { + def create(conf: ApiRequestConf): F[ApiRequestEnrichment[F]] = + CLM + .create(conf.cache.size) + .map(c => + ApiRequestEnrichment( + conf.schemaKey, + conf.inputs, + conf.api, + conf.outputs, + conf.cache.ttl, + c + ) + ) + } } diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Assets.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Assets.scala new file mode 100644 index 000000000..29ee8523b --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Assets.scala @@ -0,0 +1,292 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2
+
+import java.net.URI
+import java.nio.file.{Path, Paths}
+
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+
+import cats.{Applicative, Parallel}
+import cats.implicits._
+
+import cats.effect.{Blocker, Concurrent, ConcurrentEffect, ContextShift, Resource, Sync, Timer}
+import cats.effect.concurrent.Ref
+
+import retry.{RetryDetails, RetryPolicies, RetryPolicy, retryingOnSomeErrors}
+
+import fs2.Stream
+import fs2.hash.md5
+import fs2.io.file.{copy, deleteIfExists, exists, readAll, tempFileResource, writeAll}
+
+import _root_.io.chrisdavenport.log4cats.Logger
+import _root_.io.chrisdavenport.log4cats.slf4j.Slf4jLogger
+
+import com.snowplowanalytics.snowplow.enrich.fs2.io.Clients
+
+/**
+ * Functions responsible for periodic updates of assets (such as MaxMind/IAB DBs).
+ * The common logic is to periodically invoke a function that:
+ * 1. Downloads a file (in background) to a temp location
+ * 2. Compares the file's checksum with the existing one (stored in a mutable hashmap)
+ * 3. If checksums match - delete the temp file, return
+ * 4. If checksums don't match - send a signal to stop the raw stream
+ *    (via `SignallingRef` in [[Environment]])
+ * 5. Once the raw stream is stopped - delete the old file and move
+ *    the temp file to the old file's location
+ * If any of those URIs has been updated and stopped the raw stream, the stream
+ * is resumed immediately once the above procedure has traversed all files
+ */
+object Assets {
+
+  private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
+    Slf4jLogger.getLogger[F]
+
+  /**
+   * State of the [[updateStream]], containing information about tracked URIs
+   * and the `stop` signal from [[Environment]], as well as all clients necessary
+   * to download URIs
+   *
+   * @param files mutable hash map of URIs and their latest known state
+   * @param stop stop signal coming from [[Environment]] that can be used
+   *             to stop the raw stream consumption
+   * @param clients HTTP, GCS, S3 clients if necessary
+   */
+  final case class State[F[_]](
+    files: Ref[F, Map[URI, Hash]],
+    stop: Ref[F, Boolean],
+    clients: Clients[F]
+  )
+
+  object State {
+
+    /** Test pair is used in tests to initialize the HTTP client */
+    private val TestPair: Asset = URI.create("http://localhost:8080") -> "index"
+
+    /**
+     * Initialize the assets state. Try to find assets on the local FS
+     * or download them if they're missing.
+     * Also initializes all necessary clients (S3, GCP, HTTP etc)
+     * @param blocker thread pool for downloading and reading files
+     * @param stop global stop signal from [[Environment]]
+     * @param assets all assets that have to be tracked
+     */
+    def make[F[_]: ConcurrentEffect: Timer: ContextShift](
+      blocker: Blocker,
+      stop: Ref[F, Boolean],
+      assets: List[Asset]
+    ): Resource[F, State[F]] =
+      for {
+        clients <- Clients.make[F](blocker, assets.map(_._1))
+        map <- Resource.liftF(build[F](blocker, clients, assets.filterNot(asset => asset == TestPair)))
+        files <- Resource.liftF(Ref.of[F, Map[URI, Hash]](map))
+      } yield State(files, stop, clients)
+
+    def build[F[_]: Concurrent: Timer: ContextShift](
+      blocker: Blocker,
+      clients: Clients[F],
+      assets: List[Asset]
+    ): F[Map[URI, Hash]] =
+      buildFromLocal(blocker, assets)
+        .flatMap { hashes =>
+          hashes.traverse {
+            case (uri, path, Some(hash)) =>
+              Logger[F].info(s"Asset from $uri is found on local system at $path").as(uri -> hash)
+            case (uri, path, None) =>
+              downloadAndHash[F](clients, blocker, uri, Paths.get(path)).map(hash => uri -> hash)
+          }
+        }
+        .map(_.toMap)
+
+    def buildFromLocal[F[_]: Sync: ContextShift](blocker: Blocker, assets: List[Asset]): F[List[(URI, String, Option[Hash])]] =
+      assets.traverse { case (uri, path) => local[F](blocker, path).map(hash => (uri, path, hash)) }
+
+    /** Check if a file already exists */
+    def local[F[_]: Sync: ContextShift](blocker: Blocker, path: String): F[Option[Hash]] = {
+      val fpath = Paths.get(path)
+      exists(blocker, fpath).ifM(
+        Hash.fromStream(readAll(fpath, blocker, 1024)).map(_.some),
+        Sync[F].pure(none)
+      )
+    }
+  }
+
+  /** Valid MD5 hash */
+  final case class Hash private (s: String) extends AnyVal
+
+  object Hash {
+    private[this] def fromBytes(bytes: Array[Byte]): Hash = {
+      val bi = new java.math.BigInteger(1, bytes)
+      Hash(String.format("%0" + (bytes.length << 1) + "x", bi))
+    }
+
+    def fromStream[F[_]: Sync](stream: Stream[F, Byte]): F[Hash] =
+      stream.through(md5).compile.to(Array).map(fromBytes)
+  }
+
+  /** Pair of a tracked `URI` and destination path on local FS (`java.nio.file.Path` is not serializable) */
+  type Asset = (URI, String)
+
+  /** Initialise the [[updateStream]] with all necessary resources if a refresh period is specified */
+  def run[F[_]: ConcurrentEffect: ContextShift: Timer: Parallel](env: Environment[F]): Stream[F, Unit] =
+    env.assetsUpdatePeriod match {
+      case Some(duration) =>
+        val init = for {
+          curDir <- getCurDir
+          _ <- Logger[F].info(s"Initializing assets refresh stream in $curDir")
+          assets <- env.enrichments.get.map(_.configs.flatMap(_.filesToCache))
+          _ <- download(env.blocker, env.assetsState, curDir, assets)
+          _ <- env.pauseEnrich.set(false)
+        } yield updateStream[F](env.blocker, env.assetsState, env.enrichments, curDir, duration, assets)
+        Stream.eval(init).flatten
+      case None =>
+        Stream.empty.covary[F]
+    }
+
+  def getCurDir[F[_]: Sync]: F[Path] =
+    Sync[F].delay(Paths.get("").toAbsolutePath)
+
+  /**
+   * Create an update stream that ticks periodically and can invoke an update action,
+   * which will download a URI and check if it has been updated. If it has, the
+   * raw stream will be stopped via the `stop` signal from [[Environment]] and the
+   * assets updated. At the end of every update, the stop signal is reset to `false`
+   */
+  def updateStream[F[_]: ConcurrentEffect: ContextShift: Parallel: Timer](
+    blocker: Blocker,
+    state: State[F],
+    enrichments: Ref[F, Environment.Enrichments[F]],
+    curDir: Path,
+    duration: FiniteDuration,
+    assets: List[Asset]
+  ): Stream[F, Unit] =
+    Stream.awakeEvery[F](duration).evalMap { _ =>
+      val log = Logger[F].debug(show"Checking remote assets: ${assets.map(_._1).mkString(", ")}")
+      val updates = download[F](blocker, state, curDir, assets)
+      val reinitialize =
+        for {
+          // side-effecting get-set is inherently not thread-safe
+          // we need to be sure the state.stop is set to true
+          // before re-initializing enrichments
+          old <- enrichments.get
+          _ <- Logger[F].info(s"Reinitializing enrichments: ${old.configs.map(_.schemaKey.name).mkString(", ")}")
+          fresh <- old.reinitialize
+          _ <- enrichments.set(fresh)
+          _ <- state.stop.set(false)
+        } yield ()
+      log *> updates.ifM(reinitialize, Sync[F].unit)
+    }
+
+  /** Download a list of assets, return false if none has been downloaded */
+  def download[F[_]: ConcurrentEffect: ContextShift: Parallel: Timer](
+    blocker: Blocker,
+    state: State[F],
+    dir: Path,
+    assets: List[Asset]
+  ): F[Boolean] =
+    assets
+      .parTraverse {
+        case (uri, path) =>
+          update(blocker, state, dir)(uri, Paths.get(path))
+      }
+      .map(_.contains(true))
+
+  /**
+   * Update a file in the current directory if it has been updated on remote storage
+   * If a new file has been discovered - stops the enriching streams (signal in `state`)
+   * Do nothing if the file hasn't been updated
+   *
+   * Note: this function is potentially thread-unsafe if download time
+   * exceeds the tick period. We assume that no two threads will be downloading the same URI
+   *
+   * @param blocker a thread pool to execute download/copy operations
+   * @param state a map of URI to MD5 hash to keep track of the latest state of remote files
+   * @param curDir a local FS destination for temporary files
+   * @param uri a remote file (S3, GCS or HTTP), the URI is used as an identifier
+   * @param path a static file name that enrich clients will access;
+   *             the file itself is placed in the current dir (`curDir`)
+   * @return true if the file has been updated
+   */
+  def update[F[_]: ConcurrentEffect: ContextShift: Timer](
+    blocker: Blocker,
+    state: State[F],
+    curDir: Path
+  )(
+    uri: URI,
+    path: Path
+  ): F[Boolean] =
+    tempFileResource[F](blocker, curDir).use { tmp =>
+      // Set the stop signal and replace the old file with the temporary one
+      def stopAndCopy(hash: Hash, delete: Boolean): F[Unit] =
+        for {
+          _ <- Logger[F].info(s"Discovered new data at $uri, pausing enrich stream")
+          _ <- state.stop.set(true)
+          _ <- if (delete) {
+                 val deleted = Logger[F].info(s"Deleted outdated $path")
+                 val notDeleted = Logger[F].warn(s"Couldn't delete $path, file didn't exist")
+                 deleteIfExists(blocker, path).ifM(deleted, notDeleted)
+               } else Sync[F].unit
+          _ <- copy(blocker, tmp, path)
+          _ <- state.files.update(_.updated(uri, hash))
+        } yield ()
+
+      for {
+        hash <- downloadAndHash(state.clients, blocker, uri, tmp)
+        localFiles <- state.files.get
+        updated <- localFiles.get(uri) match {
+                     case Some(known) if known == hash =>
+                       Sync[F].pure(false)
+                     case Some(_) =>
+                       stopAndCopy(hash, true).as(true)
+                     case None =>
+                       stopAndCopy(hash, false).as(true)
+                   }
+      } yield updated
+    }
+
+  def downloadAndHash[F[_]: Concurrent: ContextShift: Timer](
+    clients: Clients[F],
+    blocker: Blocker,
+    uri: URI,
+    destination: Path
+  ): F[Hash] = {
+    val stream = clients.download(uri).observe(writeAll[F](destination, blocker))
+    Logger[F].info(s"Downloading $uri") *> retryDownload(Hash.fromStream(stream))
+  }
+
+  def retryDownload[F[_]: Sync: Timer, A](download: F[A]): F[A] =
+    retryingOnSomeErrors[A](retryPolicy[F], worthRetrying, onError[F])(download)
+
+  def retryPolicy[F[_]: Applicative]: RetryPolicy[F] =
+    RetryPolicies.fullJitter[F](1500.milliseconds).join(RetryPolicies.limitRetries[F](5))
+
+  def worthRetrying(e: Throwable): Boolean =
+    e match {
+      case _: Clients.DownloadingFailure => true
+      case _: IllegalArgumentException => false
+      case NonFatal(_) => false
+    }
+
+  def onError[F[_]: Sync](error: Throwable, details: RetryDetails): F[Unit] =
+    if (details.givingUp)
+      Logger[F].error(show"Failed to download an asset after ${details.retriesSoFar} retries. ${error.getMessage}. Aborting the job")
+    else if (details.retriesSoFar == 0)
+      Logger[F].warn(show"Failed to download an asset. ${error.getMessage}. Keep retrying")
+    else
+      Logger[F].warn(
+        show"Failed to download an asset after ${details.retriesSoFar} retries, " +
+          show"waiting for ${details.cumulativeDelay.toMillis} ms. ${error.getMessage}. " +
+          show"Keep retrying"
+      )
+}
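
A quick reference for the retry behaviour defined above: cats-retry's `join` retries only while both policies allow it, so `fullJitter(1500.milliseconds).join(limitRetries(5))` means at most 5 retries with randomised exponential backoff. A minimal sketch of the same combinators, assuming cats-retry 1.x as used by this module (the `flaky` action is hypothetical, not part of this diff):

    import scala.concurrent.duration._
    import cats.effect.{IO, Timer}
    import retry.{RetryDetails, RetryPolicies, RetryPolicy, retryingOnSomeErrors}

    implicit val timer: Timer[IO] = IO.timer(scala.concurrent.ExecutionContext.global)

    val policy: RetryPolicy[IO] =
      RetryPolicies.fullJitter[IO](1500.milliseconds).join(RetryPolicies.limitRetries[IO](5))

    val flaky: IO[String] = IO.raiseError(new Exception("boom"))

    // Retries `flaky` while the policy allows and the error is deemed worth retrying
    val retried: IO[String] =
      retryingOnSomeErrors[String](
        policy,
        (_: Throwable) => true,
        (e: Throwable, d: RetryDetails) => IO(println(s"${e.getMessage}: $d"))
      )(flaky)
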
diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Enrich.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Enrich.scala
new file mode 100644
index 000000000..57aacb34b
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Enrich.scala
@@ -0,0 +1,154 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2
+
+import java.time.Instant
+import java.util.Base64
+import java.util.concurrent.TimeUnit
+
+import org.joda.time.DateTime
+
+import cats.data.{NonEmptyList, ValidatedNel}
+import cats.implicits._
+
+import cats.effect.{Clock, Concurrent, ContextShift, Sync}
+
+import fs2.Stream
+
+import _root_.io.sentry.SentryClient
+
+import _root_.io.circe.Json
+import _root_.io.circe.syntax._
+
+import _root_.io.chrisdavenport.log4cats.Logger
+import _root_.io.chrisdavenport.log4cats.slf4j.Slf4jLogger
+
+import com.snowplowanalytics.iglu.client.Client
+
+import com.snowplowanalytics.snowplow.badrows.{Processor, BadRow, Failure, Payload => BadRowPayload}
+
+import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline
+import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
+import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
+import com.snowplowanalytics.snowplow.enrich.common.loaders.{CollectorPayload, ThriftLoader}
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
+
+object Enrich {
+
+  /**
+   * Parallelism of an enrich stream.
+   * Unlike for thread pools it doesn't make much sense to use the `CPUs x 2` formula,
+   * since we're sizing fibers rather than threads, and memory is their only cost
+   */
+  val ConcurrencyLevel = 64
+
+  /** Default adapter registry, can be constructed dynamically in future */
+  val adapterRegistry = new AdapterRegistry()
+
+  val processor: Processor = Processor(generated.BuildInfo.name, generated.BuildInfo.version)
+
+  private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
+    Slf4jLogger.getLogger[F]
+
+  /**
+   * Run the primary enrichment stream, reading from the [[Environment]] source, enriching
+   * via [[enrichWith]] and sinking into [[GoodSink]] and [[BadSink]] respectively.
+   * Can be stopped via the _stop signal_ from [[Environment]]
+   *
+   * The stream won't download any enrichment DBs. The [[Assets]] stream or
+   * [[Assets.download]] should be responsible for that
+   */
+  def run[F[_]: Concurrent: ContextShift: Clock](env: Environment[F]): Stream[F, Unit] = {
+    val registry: F[EnrichmentRegistry[F]] = env.enrichments.get.map(_.registry)
+    val enrich: Enrich[F] = enrichWith[F](registry, env.igluClient, env.sentry, env.metrics.enrichLatency)
+    val badSink: BadSink[F] = _.evalTap(_ => env.metrics.badCount).through(env.bad)
+    val goodSink: GoodSink[F] = _.evalTap(_ => env.metrics.goodCount).through(env.good)
+
+    env.source
+      .pauseWhen(env.pauseEnrich)
+      .evalTap(_ => env.metrics.rawCount)
+      .parEvalMapUnordered(ConcurrencyLevel)(payload => env.blocker.blockOn(enrich(payload)))
+      .flatMap(_.decompose[BadRow, EnrichedEvent])
+      .observeEither(badSink, goodSink)
+      .void
+  }
+
+  /**
+   * Enrich a single [[CollectorPayload]] to get a list of bad rows and/or enriched events
+   *
+   * Along with the actual `ack` the `enrichLatency` gauge will be updated
+   */
+  def enrichWith[F[_]: Clock: Sync](
+    enrichRegistry: F[EnrichmentRegistry[F]],
+    igluClient: Client[F, Json],
+    sentry: Option[SentryClient],
+    enrichLatency: Option[Long] => F[Unit]
+  )(
+    row: Payload[F, Array[Byte]]
+  ): F[Result[F]] = {
+    val payload = ThriftLoader.toCollectorPayload(row.data, processor)
+    val collectorTstamp = payload.toOption.flatMap(_.flatMap(_.context.timestamp).map(_.getMillis))
+
+    val result =
+      for {
+        _ <- Logger[F].debug(payloadToString(payload))
+        etlTstamp <- Clock[F].realTime(TimeUnit.MILLISECONDS).map(millis => new DateTime(millis))
+        registry <- enrichRegistry
+        enriched <- EtlPipeline.processEvents[F](adapterRegistry, registry, igluClient, processor, etlTstamp, payload)
+        trackLatency = enrichLatency(collectorTstamp)
+      } yield Payload(enriched, trackLatency *> row.finalise)
+
+    result.handleErrorWith(sendToSentry[F](row, sentry))
+  }
+
+  /** Stringify the `ThriftLoader` result for debugging purposes */
+  def payloadToString(payload: ValidatedNel[BadRow.CPFormatViolation, Option[CollectorPayload]]): String =
+    payload.fold(_.asJson.noSpaces, _.map(_.toBadRowPayload.asJson.noSpaces).getOrElse("None"))
+
+  /** Transform an enriched event into canonical TSV */
+  def encodeEvent(enrichedEvent: EnrichedEvent): String =
+    enrichedEvent.getClass.getDeclaredFields
+      .filterNot(_.getName.equals("pii"))
+      .map { field =>
+        field.setAccessible(true)
+        Option(field.get(enrichedEvent)).getOrElse("")
+      }
+      .mkString("\t")
+
+  /** Log an error, turn the problematic [[CollectorPayload]] into [[BadRow]] and notify Sentry if configured */
+  def sendToSentry[F[_]: Sync: Clock](original: Payload[F, Array[Byte]], sentry: Option[SentryClient])(error: Throwable): F[Result[F]] =
+    for {
+      _ <- Logger[F].error("Runtime exception during payload enrichment.
CollectorPayload converted to generic_error and ack'ed") + now <- Clock[F].realTime(TimeUnit.MILLISECONDS).map(Instant.ofEpochMilli) + _ <- original.finalise + badRow = genericBadRow(original.data, now, error) + _ <- sentry match { + case Some(client) => + Sync[F].delay(client.sendException(error)) + case None => + Sync[F].unit + } + } yield Payload(List(badRow.invalid), Sync[F].unit) + + /** Build a `generic_error` bad row for unhandled runtime errors */ + def genericBadRow( + row: Array[Byte], + time: Instant, + error: Throwable + ): BadRow.GenericError = { + val base64 = new String(Base64.getEncoder.encode(row)) + val rawPayload = BadRowPayload.RawPayload(base64) + val failure = Failure.GenericFailure(time, NonEmptyList.one(error.toString)) + BadRow.GenericError(processor, failure, rawPayload) + } +} diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Environment.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Environment.scala new file mode 100644 index 000000000..4ead444d9 --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Environment.scala @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2
+
+import scala.concurrent.duration.FiniteDuration
+
+import cats.Show
+import cats.data.EitherT
+import cats.implicits._
+
+import cats.effect.{Async, Blocker, Clock, ConcurrentEffect, ContextShift, Resource, Sync, Timer}
+import cats.effect.concurrent.Ref
+
+import fs2.concurrent.SignallingRef
+
+import _root_.io.circe.Json
+import _root_.io.circe.syntax._
+
+import _root_.io.sentry.{Sentry, SentryClient}
+import _root_.io.chrisdavenport.log4cats.Logger
+import _root_.io.chrisdavenport.log4cats.slf4j.Slf4jLogger
+import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
+import com.snowplowanalytics.iglu.core.circe.implicits._
+
+import com.snowplowanalytics.iglu.client.Client
+
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
+import com.snowplowanalytics.snowplow.enrich.fs2.config.{CliConfig, ConfigFile}
+import com.snowplowanalytics.snowplow.enrich.fs2.io.{FileSystem, Metrics, Sinks, Source}
+
+/**
+ * All allocated resources, configs and mutable variables necessary for running the Enrich process
+ *
+ * @param igluClient Iglu Client
+ * @param enrichments enrichment registry with all clients and parsed configuration files;
+ *                    it's wrapped in a mutable variable because all resources need to be
+ *                    reinitialized after DB assets are updated via the [[Assets]] stream
+ * @param pauseEnrich a signalling reference that can pause the raw stream and enrichment,
+ *                    used by [[Assets]]
+ * @param assetsState the main entity of the [[Assets]] stream, controlling when assets
+ *                    have to be replaced with newer ones
+ * @param blocker thread pool for blocking operations and enrichments themselves
+ * @param source a stream of raw collector payloads
+ * @param good a sink for successfully enriched events
+ * @param bad a sink for events that failed validation or enrichment
+ * @param sentry optional Sentry client
+ * @param metrics common counters
+ * @param assetsUpdatePeriod time after which enrich assets should be refreshed
+ * @param metricsReportPeriod period after which metrics are updated
+ */
+final case class Environment[F[_]](
+  igluClient: Client[F, Json],
+  enrichments: Ref[F, Environment.Enrichments[F]],
+  pauseEnrich: SignallingRef[F, Boolean],
+  assetsState: Assets.State[F],
+  blocker: Blocker,
+  source: RawSource[F],
+  good: GoodSink[F],
+  bad: BadSink[F],
+  sentry: Option[SentryClient],
+  metrics: Metrics[F],
+  assetsUpdatePeriod: Option[FiniteDuration],
+  metricsReportPeriod: Option[FiniteDuration]
+)
+
+object Environment {
+
+  private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
+    Slf4jLogger.getLogger[F]
+
+  type Parsed[F[_], A] = EitherT[F, String, A]
+
+  type Allocated[F[_]] = Parsed[F, Resource[F, Environment[F]]]
+
+  /** Registry with all allocated clients (MaxMind, IAB etc) and their original configs */
+  final case class Enrichments[F[_]](registry: EnrichmentRegistry[F], configs: List[EnrichmentConf]) {
+
+    /** Initialize the same enrichments, specified by configs (in case DB files were updated) */
+    def reinitialize(implicit A: Async[F]): F[Enrichments[F]] =
+      Enrichments.buildRegistry(configs).map(registry => Enrichments(registry, configs))
+  }
+
+  object Enrichments {
+    def make[F[_]: Async: Clock](configs: List[EnrichmentConf]): Resource[F, Ref[F, Enrichments[F]]] =
+      Resource.liftF {
+        for {
+          registry <- buildRegistry[F](configs)
+          ref <- Ref.of(Enrichments[F](registry, configs))
+        }
yield ref + } + + def buildRegistry[F[_]: Async](configs: List[EnrichmentConf]) = + EnrichmentRegistry.build[F](configs).value.flatMap { + case Right(reg) => Async[F].pure(reg) + case Left(error) => Async[F].raiseError[EnrichmentRegistry[F]](new RuntimeException(error)) + } + } + + /** Schema for all enrichments combined */ + val EnrichmentsKey: SchemaKey = + SchemaKey("com.snowplowanalytics.snowplow", "enrichments", "jsonschema", SchemaVer.Full(1, 0, 0)) + + /** Initialize and allocate all necessary resources */ + def make[F[_]: ConcurrentEffect: ContextShift: Clock: Timer](config: CliConfig): Allocated[F] = + parse[F](config).map { parsedConfigs => + val file = parsedConfigs.configFile + for { + client <- Client.parseDefault[F](parsedConfigs.igluJson).resource + blocker <- Blocker[F] + metrics <- Metrics.resource[F] + rawSource = Source.read[F](blocker, file.auth, file.input) + goodSink <- Sinks.goodSink[F](blocker, file.auth, file.good) + badSink <- Sinks.badSink[F](blocker, file.auth, file.bad) + stop <- Resource.liftF(SignallingRef(true)) + assets = parsedConfigs.enrichmentConfigs.flatMap(_.filesToCache) + assets <- Assets.State.make[F](blocker, stop, assets) + enrichments <- Enrichments.make[F](parsedConfigs.enrichmentConfigs) + sentry <- file.sentry.map(_.dsn) match { + case Some(dsn) => Resource.liftF[F, Option[SentryClient]](Sync[F].delay(Sentry.init(dsn.toString).some)) + case None => Resource.pure[F, Option[SentryClient]](none[SentryClient]) + } + } yield Environment[F](client, + enrichments, + stop, + assets, + blocker, + rawSource, + goodSink, + badSink, + sentry, + metrics, + file.assetsUpdatePeriod, + file.metricsReportPeriod + ) + } + + /** Decode base64-encoded configs, passed via CLI. Read files, validate and parse */ + def parse[F[_]: Async: Clock: ContextShift](config: CliConfig): Parsed[F, ParsedConfigs] = + for { + igluJson <- config.resolver.fold(b => EitherT.rightT[F, String](b.value), p => FileSystem.readJson[F](p)) + enrichmentJsons <- config.enrichments match { + case Left(base64) => + EitherT.rightT[F, String](base64.value) + case Right(path) => + FileSystem + .readJsonDir[F](path) + .map(jsons => Json.arr(jsons: _*)) + .map(json => SelfDescribingData(EnrichmentsKey, json).asJson) + } + configFile <- ConfigFile.parse[F](config.config) + client <- Client.parseDefault[F](igluJson).leftMap(x => show"Cannot decode Iglu Client. 
$x") + _ <- EitherT.liftF(Logger[F].info(show"Parsed Iglu Client with following registries: ${client.resolver.repos.map(_.config.name)}")) + configs <- EitherT(EnrichmentRegistry.parse[F](enrichmentJsons, client, false).map(_.toEither)).leftMap { x => + show"Cannot decode enrichments ${x.mkString_(", ")}" + } + _ <- EitherT.liftF(Logger[F].info(show"Parsed following enrichments: ${configs.map(_.schemaKey.toSchemaUri)}")) + } yield ParsedConfigs(igluJson, configs, configFile) + + private[fs2] final case class ParsedConfigs( + igluJson: Json, + enrichmentConfigs: List[EnrichmentConf], + configFile: ConfigFile + ) + + private implicit class EitherTOps[F[_], E: Show, A](eitherT: EitherT[F, E, A]) { + def resource(implicit F: Sync[F]): Resource[F, A] = { + val action: F[A] = eitherT.value.flatMap { + case Right(a) => Sync[F].pure(a) + case Left(error) => Sync[F].raiseError(new RuntimeException(error.show)) // Safe since we already parsed it + } + Resource.liftF[F, A](action) + } + } +} diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Main.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Main.scala new file mode 100644 index 000000000..a78acc394 --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Main.scala @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.enrich.fs2 + +import cats.effect.{ExitCode, IO, IOApp} + +import _root_.io.sentry.SentryClient + +import _root_.io.chrisdavenport.log4cats.Logger +import _root_.io.chrisdavenport.log4cats.slf4j.Slf4jLogger + +import com.snowplowanalytics.snowplow.enrich.fs2.io.Metrics + +object Main extends IOApp { + + private implicit val logger: Logger[IO] = + Slf4jLogger.getLogger[IO] + + def run(args: List[String]): IO[ExitCode] = + config.CliConfig.command.parse(args) match { + case Right(cfg) => + for { + _ <- logger.info("Initialising resources for Enrich job") + environment <- Environment.make[IO](cfg).value + exit <- environment match { + case Right(e) => + e.use { env => + val log = logger.info("Running enrichment stream") + val enrich = Enrich.run[IO](env) + val updates = Assets.run[IO](env) + val reporting = Metrics.run[IO](env) + val flow = enrich.merge(updates).merge(reporting) + log *> flow.compile.drain.attempt.flatMap { + case Left(exception) => + unsafeSendSentry(exception, env.sentry) + IO.raiseError[ExitCode](exception).as(ExitCode.Error) + case Right(_) => + IO.pure(ExitCode.Success) + } + } + case Left(error) => + logger.error(s"Cannot initialise enrichment resources\n$error").as(ExitCode.Error) + } + } yield exit + case Left(error) => + IO(System.err.println(error)).as(ExitCode.Error) + } + + /** Last attempt to notify about an exception (possibly just interruption) */ + private def unsafeSendSentry(error: Throwable, sentry: Option[SentryClient]): Unit = { + sentry match { + case Some(client) => + client.sendException(error) + case None => () + } + logger.error(s"The Enrich job has stopped ${sentry.fold("")(_ => "Sentry report has been sent")}").unsafeRunSync() + } +} diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Payload.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Payload.scala new file mode 100644 index 000000000..10bd2b725 --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Payload.scala @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2
+
+import scala.annotation.tailrec
+
+import cats.Applicative
+import cats.syntax.either._
+import cats.data.Validated
+
+import fs2.{Pure, Stream}
+
+import com.snowplowanalytics.snowplow.enrich.fs2.Payload.Parsed
+
+/**
+ * Anything that has been read from [[RawSource]] and needs to be acknowledged,
+ * or a derivative (parsed `A`) that can be used to acknowledge the original message
+ * @param data original data or anything it has been transformed to
+ * @param finalise a side-effect to acknowledge (commit, log on-finish) the message, or
+ *                 a no-op in case the original message has been flattened into
+ *                 multiple rows and only the last row contains the actual side-effect
+ */
+case class Payload[F[_], A](data: A, finalise: F[Unit]) {
+
+  /**
+   * Flatten all payloads from a list and replace the `ack` action with a no-op everywhere
+   * except the last message, so that the original collector payload (with multiple events)
+   * will be ack'ed only when the last event has sunk into the good or bad sink
+   */
+  def decompose[L, R](implicit ev: A <:< List[Validated[L, R]], F: Applicative[F]): Stream[F, Parsed[F, L, R]] = {
+    val _ = ev
+    val noop: F[Unit] = Applicative[F].unit
+    def use(op: F[Unit])(v: Validated[L, R]): Parsed[F, L, R] =
+      v.fold(a => Payload(a, op).asLeft, b => Payload(b, op).asRight)
+
+    Payload.mapWithLast(use(noop), use(finalise))(data)
+  }
+}
+
+object Payload {
+
+  /**
+   * Original [[Payload]] that has been transformed into either `A` or `B`.
+   * Regardless of the result (`A` or `B`), the original one still has to be acknowledged
+   *
+   * If the original contained only one row (good or bad), the `Parsed` must have a real
+   * `ack` action; otherwise, if it has been accompanied by other rows, only the last
+   * element from the original will contain the `ack`, all others just `noop`
+   */
+  type Parsed[F[_], A, B] = Either[Payload[F, A], Payload[F, B]]
+
+  /** Apply the `f` function to all elements in a list, except the last one, where `lastF` is applied */
+  def mapWithLast[A, B](f: A => B, lastF: A => B)(as: List[A]): Stream[Pure, B] = {
+    @tailrec
+    def go(aas: List[A], accum: Vector[B]): Vector[B] =
+      aas match {
+        case Nil =>
+          accum
+        case last :: Nil =>
+          accum :+ lastF(last)
+        case a :: remaining =>
+          go(remaining, accum :+ f(a))
+      }
+
+    Stream.emits(go(as, Vector.empty))
+  }
+}
diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64Hocon.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64Hocon.scala
new file mode 100644
index 000000000..e40037ba3
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64Hocon.scala
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */ +package com.snowplowanalytics.snowplow.enrich.fs2.config + +import java.util.Base64 + +import cats.data.ValidatedNel +import cats.syntax.either._ + +import com.typesafe.config.{ConfigException, ConfigFactory} + +import _root_.io.circe.Json + +import pureconfig.syntax._ +import pureconfig.module.circe._ + +import com.monovore.decline.Argument + +final case class Base64Hocon(value: Json) extends AnyVal + +object Base64Hocon { + + private val base64 = Base64.getDecoder + + implicit val base64Hocon: Argument[Base64Hocon] = + new Argument[Base64Hocon] { + def read(string: String): ValidatedNel[String, Base64Hocon] = { + val result = for { + bytes <- Either.catchOnly[IllegalArgumentException](base64.decode(string)).leftMap(_.getMessage) + hocon <- parseHocon(new String(bytes)) + } yield hocon + result.toValidatedNel + } + + def defaultMetavar: String = "base64" + } + + def parseHocon(str: String): Either[String, Base64Hocon] = + for { + configValue <- Either.catchOnly[ConfigException](ConfigFactory.parseString(str)).leftMap(_.toString).map(_.toConfig) + json <- configValue.to[Json].leftMap(_.toString) + } yield Base64Hocon(json) +} diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64Json.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64Json.scala new file mode 100644 index 000000000..ab8fc4879 --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64Json.scala @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.fs2.config + +import java.util.Base64 + +import cats.data.ValidatedNel +import cats.syntax.show._ +import cats.syntax.either._ + +import _root_.io.circe.Json +import _root_.io.circe.parser.parse + +import com.monovore.decline.Argument + +final case class Base64Json(value: Json) extends AnyVal + +object Base64Json { + + private val base64 = Base64.getDecoder + + implicit val base64Json: Argument[Base64Json] = + new Argument[Base64Json] { + + def read(string: String): ValidatedNel[String, Base64Json] = { + val result = for { + bytes <- Either.catchOnly[IllegalArgumentException](base64.decode(string)).leftMap(_.getMessage) + str = new String(bytes) + json <- parse(str).leftMap(_.show) + } yield Base64Json(json) + result.toValidatedNel + } + + def defaultMetavar: String = "base64" + } +} diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/CliConfig.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/CliConfig.scala new file mode 100644 index 000000000..d15a00dab --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/CliConfig.scala @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. 
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2.config
+
+import java.nio.file.Path
+
+import cats.data.{NonEmptyList, ValidatedNel}
+import cats.implicits._
+
+import com.monovore.decline.{Argument, Command, Opts}
+
+import com.snowplowanalytics.snowplow.enrich.fs2.generated.BuildInfo
+
+final case class CliConfig(
+  config: EncodedHoconOrPath,
+  resolver: EncodedOrPath,
+  enrichments: EncodedOrPath
+)
+
+object CliConfig {
+
+  implicit val encodedOrPathArgument: Argument[EncodedOrPath] =
+    new Argument[EncodedOrPath] {
+      def read(string: String): ValidatedNel[String, EncodedOrPath] = {
+        val encoded = Argument[Base64Json].read(string).map(_.asLeft)
+        val path = Argument[Path].read(string).map(_.asRight)
+        val error = show"Value $string cannot be parsed as Base64 JSON nor as an FS path"
+        encoded.orElse(path).leftMap(_ => NonEmptyList.one(error))
+      }
+
+      def defaultMetavar: String = "input"
+    }
+
+  implicit val encodedHoconOrPathArgument: Argument[EncodedHoconOrPath] =
+    new Argument[EncodedHoconOrPath] {
+      def read(string: String): ValidatedNel[String, EncodedHoconOrPath] = {
+        val encoded = Argument[Base64Hocon].read(string).map(_.asLeft)
+        val path = Argument[Path].read(string).map(_.asRight)
+        val error = show"Value $string cannot be parsed as Base64 HOCON nor as an FS path"
+        encoded.orElse(path).leftMap(_ => NonEmptyList.one(error))
+      }
+
+      def defaultMetavar: String = "input"
+    }
+
+  val configFile: Opts[EncodedHoconOrPath] =
+    Opts.option[EncodedHoconOrPath]("config", "Base64-encoded HOCON string with the app configuration", "c", "base64")
+
+  val enrichments: Opts[EncodedOrPath] =
+    Opts.option[EncodedOrPath]("enrichments", "Base64-encoded JSON string with enrichment configurations", "e", "base64")
+
+  val igluConfig: Opts[EncodedOrPath] =
+    Opts.option[EncodedOrPath]("iglu-config", "Iglu resolver configuration JSON", "r", "base64")
+
+  val enrichedJobConfig: Opts[CliConfig] =
+    (configFile, igluConfig, enrichments).mapN(CliConfig.apply)
+
+  val command: Command[CliConfig] =
+    Command(show"${BuildInfo.name}", show"${BuildInfo.name} ${BuildInfo.version}\n${BuildInfo.description}")(enrichedJobConfig)
+}
diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/ConfigFile.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/ConfigFile.scala
new file mode 100644
index 000000000..a062f0d80
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/ConfigFile.scala
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.fs2.config + +import scala.concurrent.duration.FiniteDuration + +import cats.data.EitherT +import cats.implicits._ +import cats.effect.{Blocker, ContextShift, Sync} + +import _root_.io.circe.{Decoder, Encoder, Json} +import _root_.io.circe.config.syntax._ +import _root_.io.circe.generic.extras.semiauto.{deriveConfiguredDecoder, deriveConfiguredEncoder} + +import com.snowplowanalytics.snowplow.enrich.fs2.config.io.{Authentication, Input, Output} + +import pureconfig.ConfigSource +import pureconfig.module.catseffect.syntax._ +import pureconfig.module.circe._ + +/** + * Parsed HOCON configuration file + * + * @param auth authentication details, such as credentials + * @param input input (PubSub, Kinesis etc) + * @param good good enriched output (PubSub, Kinesis, FS etc) + * @param bad bad rows output (PubSub, Kinesis, FS etc) + * @param assetsUpdatePeriod time after which assets should be updated, in minutes + */ +final case class ConfigFile( + auth: Authentication, + input: Input, + good: Output, + bad: Output, + assetsUpdatePeriod: Option[FiniteDuration], + sentry: Option[Sentry], + metricsReportPeriod: Option[FiniteDuration] +) + +object ConfigFile { + + // Missing in circe-config + implicit val finiteDurationEncoder: Encoder[FiniteDuration] = + implicitly[Encoder[String]].contramap(_.toString) + + implicit val configFileDecoder: Decoder[ConfigFile] = + deriveConfiguredDecoder[ConfigFile] + implicit val configFileEncoder: Encoder[ConfigFile] = + deriveConfiguredEncoder[ConfigFile] + + def parse[F[_]: Sync: ContextShift](in: EncodedHoconOrPath): EitherT[F, String, ConfigFile] = + in match { + case Right(path) => + val result = Blocker[F].use { blocker => + ConfigSource + .default(ConfigSource.file(path)) + .loadF[F, Json](blocker) + .map(_.as[ConfigFile].leftMap(f => show"Couldn't parse the config $f")) + } + result.attemptT.leftMap(_.getMessage).subflatMap(identity) + case Left(encoded) => + EitherT.fromEither[F](encoded.value.as[ConfigFile].leftMap(failure => show"Couldn't parse a base64-encoded config file:\n$failure")) + } +} diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Sentry.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Sentry.scala new file mode 100644 index 000000000..3dbe4e6fc --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Sentry.scala @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.enrich.fs2.config + +import java.net.URI + +import cats.syntax.either._ + +import _root_.io.circe.{Decoder, Encoder} +import _root_.io.circe.generic.extras.semiauto._ + +case class Sentry(dsn: URI) + +object Sentry { + + implicit val javaNetUriDecoder: Decoder[URI] = + Decoder[String].emap { str => + Either.catchOnly[IllegalArgumentException](URI.create(str)).leftMap(_.getMessage) + } + + implicit val javaNetUriEncoder: Encoder[URI] = + Encoder[String].contramap(_.toString) + + implicit val authenticationDecoder: Decoder[Sentry] = + deriveConfiguredDecoder[Sentry] + implicit val authenticationEncoder: Encoder[Sentry] = + deriveConfiguredEncoder[Sentry] +} diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/io.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/io.scala new file mode 100644 index 000000000..3ca7d7f3e --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/io.scala @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.fs2.config + +import java.nio.file.{InvalidPathException, Path, Paths} + +import cats.syntax.either._ + +import _root_.io.circe.{Decoder, Encoder} +import _root_.io.circe.generic.extras.semiauto._ + +object io { + + implicit val javaPathDecoder: Decoder[Path] = + Decoder[String].emap { s => + Either.catchOnly[InvalidPathException](Paths.get(s)).leftMap(_.getMessage) + } + implicit val javaPathEncoder: Encoder[Path] = + Encoder[String].contramap(_.toString) + + sealed trait Authentication extends Product with Serializable + + object Authentication { + final case class Gcp(projectId: String) extends Authentication + + implicit val authenticationDecoder: Decoder[Authentication] = + deriveConfiguredDecoder[Authentication] + implicit val authenticationEncoder: Encoder[Authentication] = + deriveConfiguredEncoder[Authentication] + } + + /** Source of raw collector data (only PubSub supported atm) */ + sealed trait Input + + object Input { + + case class PubSub(subscriptionId: String) extends Input + case class FileSystem(dir: Path) extends Input + + implicit val inputDecoder: Decoder[Input] = + deriveConfiguredDecoder[Input] + implicit val inputEncoder: Encoder[Input] = + deriveConfiguredEncoder[Input] + } + + sealed trait Output + + object Output { + case class PubSub(topic: String) extends Output + case class FileSystem(dir: Path) extends Output + + implicit val outputDecoder: Decoder[Output] = + deriveConfiguredDecoder[Output] + implicit val outputEncoder: Encoder[Output] = + deriveConfiguredEncoder[Output] + } +} diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/package.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/package.scala 
new file mode 100644 index 000000000..543b9214b --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/package.scala @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.fs2 + +import java.nio.file.Path + +import _root_.io.circe.generic.extras.Configuration + +package object config { + + type EncodedOrPath = Either[Base64Json, Path] + type EncodedHoconOrPath = Either[Base64Hocon, Path] + + private[config] implicit def customCodecConfig: Configuration = + Configuration.default.withDiscriminator("type") + +} diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Clients.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Clients.scala new file mode 100644 index 000000000..e217c7af7 --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Clients.scala @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2.io
+
+import java.net.URI
+
+import cats.syntax.option._
+import cats.syntax.functor._
+import cats.syntax.flatMap._
+
+import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Resource, Sync}
+
+import fs2.{RaiseThrowable, Stream}
+
+import blobstore.Path
+import blobstore.s3.S3Store
+import blobstore.gcs.GcsStore
+
+import com.google.cloud.storage.StorageOptions
+
+import org.http4s.{Request, Uri}
+import org.http4s.client.{Client => HttpClient}
+import org.http4s.client.blaze.BlazeClientBuilder
+
+import software.amazon.awssdk.services.s3.S3AsyncClient
+
+case class Clients[F[_]](
+  s3Store: Option[S3Store[F]],
+  gcsStore: Option[GcsStore[F]],
+  http: Option[HttpClient[F]]
+) {
+
+  /** Download an `uri` as a stream of bytes, using the appropriate client */
+  def download(uri: URI)(implicit RT: RaiseThrowable[F]): Stream[F, Byte] =
+    Clients.Client.getByUri(uri) match {
+      case Some(Clients.Client.S3) =>
+        for {
+          s3 <- s3Store match {
+                  case Some(c) => Stream.emit(c)
+                  case None => Stream.raiseError(new IllegalStateException(s"S3 client is not initialized to download $uri"))
+                }
+          data <- s3.get(Path(uri.toString), 16 * 1024)
+        } yield data
+      case Some(Clients.Client.GCS) =>
+        for {
+          gcs <- gcsStore match {
+                   case Some(c) => Stream.emit(c)
+                   case None => Stream.raiseError(new IllegalStateException(s"GCS client is not initialized to download $uri"))
+                 }
+          data <- gcs.get(Path(uri.toString), 16 * 1024)
+        } yield data
+      case Some(Clients.Client.HTTP) =>
+        http match {
+          case Some(c) =>
+            val request = Request[F](uri = Uri.unsafeFromString(uri.toString))
+            for {
+              response <- c.stream(request)
+              body <- if (response.status.isSuccess) response.body
+                      else Stream.raiseError[F](Clients.DownloadingFailure(uri))
+            } yield body
+          case None =>
+            Stream.raiseError(new IllegalStateException(s"HTTP client is not initialized to download $uri"))
+        }
+      case None =>
+        Stream.raiseError(new IllegalStateException(s"No client initialized to download $uri"))
+    }
+}
+
+object Clients {
+
+  sealed trait Client
+  object Client {
+    case object S3 extends Client
+    case object GCS extends Client
+    case object HTTP extends Client
+
+    def getByUri(uri: URI): Option[Client] =
+      uri.getScheme match {
+        case "http" | "https" =>
+          Some(HTTP)
+        case "gs" =>
+          Some(GCS)
+        case "s3" =>
+          Some(S3)
+        case _ =>
+          None
+      }
+
+    def required(uris: List[URI]): Set[Client] =
+      uris.foldLeft(Set.empty[Client]) { (acc, uri) =>
+        getByUri(uri) match {
+          case Some(client) => acc + client
+          case None => acc // This should short-circuit on initialisation
+        }
+      }
+  }
+
+  def mkS3[F[_]: ConcurrentEffect]: F[S3Store[F]] =
+    Sync[F].delay(S3AsyncClient.builder().build()).flatMap(client => S3Store[F](client))
+
+  def mkGCS[F[_]: ConcurrentEffect: ContextShift](blocker: Blocker): F[GcsStore[F]] =
+    Sync[F].delay(StorageOptions.getDefaultInstance.getService).map { storage =>
+      GcsStore(storage, blocker, List.empty)
+    }
+
+  def mkHTTP[F[_]: ConcurrentEffect]: Resource[F, HttpClient[F]] =
+    BlazeClientBuilder[F](concurrent.ExecutionContext.global).resource
+
+  /** Initialise all necessary clients capable of fetching the provided `uris` */
+  def make[F[_]: ConcurrentEffect: ContextShift](blocker: Blocker, uris: List[URI]): Resource[F, Clients[F]] = {
+    val toInit = Client.required(uris)
+    for {
+      s3 <- if (toInit.contains(Client.S3)) Resource.liftF(mkS3[F]).map(_.some) else Resource.pure[F, Option[S3Store[F]]](none)
+      gcs <- if (toInit.contains(Client.GCS)) Resource.liftF(mkGCS[F](blocker).map(_.some)) else Resource.pure[F, Option[GcsStore[F]]](none)
+      http <- if (toInit.contains(Client.HTTP)) mkHTTP[F].map(_.some) else Resource.pure[F, Option[HttpClient[F]]](none)
+    } yield Clients(s3, gcs, http)
+  }
+
+  case class DownloadingFailure(uri: URI) extends Throwable {
+    override def getMessage: String = s"Cannot download $uri"
+  }
+}
diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/FileSystem.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/FileSystem.scala
new file mode 100644
index 000000000..3abffd966
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/FileSystem.scala
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2.io
+
+import java.nio.file.{Files, Path}
+
+import scala.collection.JavaConverters._
+
+import cats.data.EitherT
+
+import cats.effect.Sync
+import cats.implicits._
+
+import fs2.Stream
+
+import _root_.io.circe.Json
+import _root_.io.circe.parser.parse
+
+object FileSystem {
+
+  def list[F[_]: Sync](dir: Path): Stream[F, Path] =
+    for {
+      paths <- Stream.eval(Sync[F].delay(Files.list(dir)))
+      path <- Stream.fromIterator[F](paths.iterator().asScala)
+    } yield path
+
+  def readJson[F[_]: Sync](path: Path): EitherT[F, String, Json] =
+    EitherT(Sync[F].delay(Files.readString(path)).map(parse))
+      .leftMap(e => show"Cannot read JSON file ${path.toString}. ${e.getMessage()}")
+
+  def readJsonDir[F[_]: Sync](dir: Path): EitherT[F, String, List[Json]] = {
+    val action = list(dir).compile.toList
+      .flatMap(paths => paths.traverse(x => readJson(x).value))
+      .map(_.sequence)
+    EitherT(action)
+  }
+}
diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Metrics.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Metrics.scala
new file mode 100644
index 000000000..fa2d37b6d
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Metrics.scala
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
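Editor's note (not part of the diff): the scheme-based dispatch in `Clients` keeps initialisation lazy, so only the stores matching the configured URI schemes are created. A minimal usage sketch, assuming `blocker` and `uris` come from the caller:

```scala
import java.net.URI
import java.nio.file.Paths

import cats.implicits._
import cats.effect.{Blocker, ContextShift, IO}

import fs2.io.file.writeAll

import com.snowplowanalytics.snowplow.enrich.fs2.io.Clients

// Initialise only the clients required by the URI schemes, then
// stream every asset to a local file named after its path.
def fetchAll(blocker: Blocker, uris: List[URI])(implicit cs: ContextShift[IO]): IO[Unit] =
  Clients.make[IO](blocker, uris).use { clients =>
    uris.traverse_ { uri =>
      clients
        .download(uri) // dispatches to S3, GCS or HTTP by scheme
        .through(writeAll[IO](Paths.get(uri.getPath.stripPrefix("/")), blocker))
        .compile
        .drain
    }
  }
```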
+ */ +package com.snowplowanalytics.snowplow.enrich.fs2.io + +import cats.effect.{Resource, Sync, Timer} + +import fs2.Stream + +import com.codahale.metrics.{Gauge, MetricRegistry, Slf4jReporter} + +import org.slf4j.LoggerFactory + +import com.snowplowanalytics.snowplow.enrich.fs2.Environment + +trait Metrics[F[_]] { + + /** Send latest metrics to reporter */ + def report: F[Unit] + + /** + * Track latency between collector hit and enrichment + * This function gets current timestamp by itself + */ + def enrichLatency(collectorTstamp: Option[Long]): F[Unit] + + /** Increment raw payload count */ + def rawCount: F[Unit] + + /** Increment good enriched events */ + def goodCount: F[Unit] + + /** Increment bad events */ + def badCount: F[Unit] +} + +object Metrics { + + val LoggerName = "enrich.metrics" + val LatencyGaugeName = "enrich.metrics.latency" + val RawCounterName = "enrich.metrics.raw.count" + val GoodCounterName = "enrich.metrics.good.count" + val BadCounterName = "enrich.metrics.bad.count" + + def run[F[_]: Sync: Timer](env: Environment[F]): Stream[F, Unit] = + env.metricsReportPeriod match { + case Some(period) => + Stream.awakeEvery[F](period).evalMap(_ => env.metrics.report) + case None => + Stream.empty.covary[F] + } + + /** + * Technically `Resource` doesn't give us much as we don't allocate a thread pool, + * but it will make sure the last report is issued + */ + def resource[F[_]: Sync]: Resource[F, Metrics[F]] = + Resource + .make(init) { case (res, _) => Sync[F].delay(res.close()) } + .map { case (res, reg) => make[F](res, reg) } + + /** Initialise backend resources */ + def init[F[_]: Sync]: F[(Slf4jReporter, MetricRegistry)] = + Sync[F].delay { + val registry = new MetricRegistry() + val logger = LoggerFactory.getLogger(LoggerName) + val reporter = Slf4jReporter.forRegistry(registry).outputTo(logger).build() + (reporter, registry) + } + + def make[F[_]: Sync](reporter: Slf4jReporter, registry: MetricRegistry): Metrics[F] = + new Metrics[F] { + val rawCounter = registry.counter(RawCounterName) + val goodCounter = registry.counter(GoodCounterName) + val badCounter = registry.counter(BadCounterName) + + def report: F[Unit] = + Sync[F].delay(reporter.report()) + + def enrichLatency(collectorTstamp: Option[Long]): F[Unit] = + collectorTstamp match { + case Some(delay) => + Sync[F].delay { + registry.remove(LatencyGaugeName) + val _ = registry.register(LatencyGaugeName, + new Gauge[Long] { + def getValue: Long = System.currentTimeMillis() - delay + } + ) + } + case None => + Sync[F].unit + } + + def rawCount: F[Unit] = + Sync[F].delay(rawCounter.inc()) + + def goodCount: F[Unit] = + Sync[F].delay(goodCounter.inc()) + + def badCount: F[Unit] = + Sync[F].delay(badCounter.inc()) + } +} diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Sinks.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Sinks.scala new file mode 100644 index 000000000..cc20e41cc --- /dev/null +++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Sinks.scala @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. 
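Editor's note (not part of the diff): `Metrics.run` above ties reporting to the full `Environment`; a sketch of the same wiring in isolation, with an arbitrary reporting period:

```scala
import scala.concurrent.duration.FiniteDuration

import cats.effect.{ContextShift, IO, Timer}

import fs2.Stream

import com.snowplowanalytics.snowplow.enrich.fs2.io.Metrics

// While `use` runs, a report is issued every `period`; releasing the
// resource is what the code above relies on to flush the final report.
def withMetrics(period: FiniteDuration)(use: Metrics[IO] => Stream[IO, Unit])(implicit T: Timer[IO], C: ContextShift[IO]): Stream[IO, Unit] =
  Stream.resource(Metrics.resource[IO]).flatMap { metrics =>
    use(metrics).concurrently(Stream.awakeEvery[IO](period).evalMap(_ => metrics.report))
  }
```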
+ * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.fs2.io + +import java.nio.file.{Path, StandardOpenOption} + +import scala.concurrent.duration._ + +import cats.syntax.apply._ +import cats.syntax.functor._ + +import cats.effect.{Async, Blocker, ContextShift, Resource, Sync} + +import fs2.{Pipe, Stream, text} +import fs2.io.file.writeAll + +import com.snowplowanalytics.snowplow.badrows.BadRow +import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent +import com.snowplowanalytics.snowplow.enrich.fs2.{BadSink, Enrich, GoodSink, Payload} +import com.snowplowanalytics.snowplow.enrich.fs2.config.io.{Authentication, Output} + +import com.permutive.pubsub.producer.Model.{ProjectId, Topic} +import com.permutive.pubsub.producer.encoder.MessageEncoder +import com.permutive.pubsub.producer.grpc.{GooglePubsubProducer, PubsubProducerConfig} +import io.chrisdavenport.log4cats.Logger +import io.chrisdavenport.log4cats.slf4j.Slf4jLogger + +object Sinks { + + private implicit def unsafeLogger[F[_]: Sync]: Logger[F] = + Slf4jLogger.getLogger[F] + + def goodSink[F[_]: Async: ContextShift]( + blocker: Blocker, + auth: Authentication, + output: Output + ): Resource[F, GoodSink[F]] = + (auth, output) match { + case (a: Authentication.Gcp, o: Output.PubSub) => + pubsubSink[F, EnrichedEvent](a, o) + case (_, o: Output.FileSystem) => + Resource.pure(goodFileSink(o.dir, blocker)) + } + + def badSink[F[_]: Async: ContextShift]( + blocker: Blocker, + auth: Authentication, + output: Output + ): Resource[F, BadSink[F]] = + (auth, output) match { + case (a: Authentication.Gcp, o: Output.PubSub) => + pubsubSink[F, BadRow](a, o) + case (_, o: Output.FileSystem) => + Resource.pure(badFileSink(o.dir, blocker)) + } + + def pubsubSink[F[_]: Async, A: MessageEncoder]( + auth: Authentication.Gcp, + output: Output.PubSub + ): Resource[F, Pipe[F, Payload[F, A], Unit]] = { + val config = PubsubProducerConfig[F]( + batchSize = 10, + delayThreshold = 5.seconds, + onFailedTerminate = err => Logger[F].error(err)("PubSub sink termination error") + ) + + GooglePubsubProducer + .of[F, A](ProjectId(auth.projectId), Topic(output.topic), config) + .map(producer => (s: Stream[F, Payload[F, A]]) => s.evalMap(row => producer.produce(row.data) *> row.finalise)) + } + + def goodFileSink[F[_]: Sync: ContextShift](goodOut: Path, blocker: Blocker): GoodSink[F] = + goodStream => + goodStream + .evalMap(p => p.finalise.as(Enrich.encodeEvent(p.data))) + .intersperse("\n") + .through(text.utf8Encode) + .through(writeAll[F](goodOut, blocker, List(StandardOpenOption.CREATE_NEW))) + + def badFileSink[F[_]: Sync: ContextShift](badOut: Path, blocker: Blocker): BadSink[F] = + badStream => + badStream + .evalMap(p => p.finalise.as(p.data.compact)) + .intersperse("\n") + .through(text.utf8Encode) + .through(writeAll[F](badOut, blocker, List(StandardOpenOption.CREATE_NEW))) +} diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Source.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Source.scala new file mode 100644 index 000000000..38c099572 --- /dev/null +++ 
b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/Source.scala
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2.io
+
+import cats.effect.{Blocker, Concurrent, ContextShift, Sync}
+import cats.implicits._
+
+import fs2.Stream
+import fs2.io.file.{directoryStream, readAll}
+
+import com.permutive.pubsub.consumer.Model
+import com.permutive.pubsub.consumer.grpc.{PubsubGoogleConsumer, PubsubGoogleConsumerConfig}
+
+import com.snowplowanalytics.snowplow.enrich.fs2.{Payload, RawSource}
+import com.snowplowanalytics.snowplow.enrich.fs2.config.io.{Authentication, Input}
+
+import com.google.pubsub.v1.PubsubMessage
+
+object Source {
+
+  def read[F[_]: Concurrent: ContextShift](
+    blocker: Blocker,
+    auth: Authentication,
+    input: Input
+  ): RawSource[F] =
+    (auth, input) match {
+      case (g: Authentication.Gcp, p: Input.PubSub) =>
+        pubSub(blocker, g, p)
+      case (_, p: Input.FileSystem) =>
+        directoryStream(blocker, p.dir).evalMap { file =>
+          readAll[F](file, blocker, 4096).compile
+            .to(Array)
+            .map(bytes => Payload(bytes, Sync[F].unit))
+        }
+    }
+
+  def pubSub[F[_]: Concurrent: ContextShift](
+    blocker: Blocker,
+    auth: Authentication.Gcp,
+    input: Input.PubSub
+  ): Stream[F, Payload[F, Array[Byte]]] = {
+    val onFailedTerminate: Throwable => F[Unit] =
+      e => Sync[F].delay(System.err.println(s"Cannot terminate ${e.getMessage}"))
+    val pubSubConfig = PubsubGoogleConsumerConfig(onFailedTerminate = onFailedTerminate)
+    val projectId = Model.ProjectId(auth.projectId)
+    val subscriptionId = Model.Subscription(input.subscriptionId)
+    val errorHandler: (PubsubMessage, Throwable, F[Unit], F[Unit]) => F[Unit] = // Should never be called, since messages are decoded into raw bytes
+      (message, error, _, _) =>
+        Sync[F].delay(System.err.println(s"Cannot decode message ${message.getMessageId} into array of bytes. ${error.getMessage}"))
+    PubsubGoogleConsumer
+      .subscribe[F, Array[Byte]](blocker, projectId, subscriptionId, errorHandler, pubSubConfig)
+      .map(record => Payload(record.value, record.ack))
+  }
+}
diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/package.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/package.scala
new file mode 100644
index 000000000..9ad3e5eca
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/io/package.scala
@@ -0,0 +1,31 @@
+package com.snowplowanalytics.snowplow.enrich.fs2
+
+import cats.syntax.either._
+
+import com.permutive.pubsub.consumer.decoder.MessageDecoder
+import com.permutive.pubsub.producer.encoder.MessageEncoder
+
+import com.snowplowanalytics.snowplow.badrows.BadRow
+
+import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
+
+package object io {
+
+  implicit val badRowEncoder: MessageEncoder[BadRow] =
+    new MessageEncoder[BadRow] {
+      def encode(a: BadRow): Either[Throwable, Array[Byte]] =
+        a.compact.getBytes.asRight
+    }
+
+  implicit val enrichedEventEncoder: MessageEncoder[EnrichedEvent] =
+    new MessageEncoder[EnrichedEvent] {
+      def encode(enrichedEvent: EnrichedEvent): Either[Throwable, Array[Byte]] =
+        Enrich.encodeEvent(enrichedEvent).getBytes.asRight
+    }
+
+  implicit val byteArrayMessageDecoder: MessageDecoder[Array[Byte]] =
+    new MessageDecoder[Array[Byte]] {
+      def decode(message: Array[Byte]): Either[Throwable, Array[Byte]] =
+        message.asRight
+    }
+}
diff --git a/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/package.scala b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/package.scala
new file mode 100644
index 000000000..57d85526b
--- /dev/null
+++ b/modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/package.scala
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */ +package com.snowplowanalytics.snowplow.enrich + +import cats.data.Validated + +import _root_.fs2.{Pipe, Stream} + +import com.snowplowanalytics.snowplow.badrows.BadRow + +import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent + +package object fs2 { + + /** Raw Thrift payloads coming from a collector */ + type RawSource[F[_]] = Stream[F, Payload[F, Array[Byte]]] + + type BadSink[F[_]] = Pipe[F, Payload[F, BadRow], Unit] + type GoodSink[F[_]] = Pipe[F, Payload[F, EnrichedEvent], Unit] + + /** Enrichment result, containing list of (valid and invalid) results */ + type Result[F[_]] = Payload[F, List[Validated[BadRow, EnrichedEvent]]] + + /** Function to transform an origin raw payload into good and/or bad rows */ + type Enrich[F[_]] = Payload[F, Array[Byte]] => F[Result[F]] +} diff --git a/modules/fs2/src/test/resources/com.snowplowanalytics.snowplow.enrich.fs2/assets-refresh/geoip2-city-1.mmdb b/modules/fs2/src/test/resources/com.snowplowanalytics.snowplow.enrich.fs2/assets-refresh/geoip2-city-1.mmdb new file mode 100644 index 000000000..902c8d78d Binary files /dev/null and b/modules/fs2/src/test/resources/com.snowplowanalytics.snowplow.enrich.fs2/assets-refresh/geoip2-city-1.mmdb differ diff --git a/modules/fs2/src/test/resources/com.snowplowanalytics.snowplow.enrich.fs2/assets-refresh/geoip2-city-2.mmdb b/modules/fs2/src/test/resources/com.snowplowanalytics.snowplow.enrich.fs2/assets-refresh/geoip2-city-2.mmdb new file mode 100644 index 000000000..e76492b80 Binary files /dev/null and b/modules/fs2/src/test/resources/com.snowplowanalytics.snowplow.enrich.fs2/assets-refresh/geoip2-city-2.mmdb differ diff --git a/modules/fs2/src/test/resources/com.snowplowanalytics.snowplow.enrich.fs2/assets-refresh/geoip2-city-3.mmdb b/modules/fs2/src/test/resources/com.snowplowanalytics.snowplow.enrich.fs2/assets-refresh/geoip2-city-3.mmdb new file mode 100644 index 000000000..0a9098648 Binary files /dev/null and b/modules/fs2/src/test/resources/com.snowplowanalytics.snowplow.enrich.fs2/assets-refresh/geoip2-city-3.mmdb differ diff --git a/modules/fs2/src/test/resources/simplelogger.properties b/modules/fs2/src/test/resources/simplelogger.properties new file mode 100644 index 000000000..d622d38e0 --- /dev/null +++ b/modules/fs2/src/test/resources/simplelogger.properties @@ -0,0 +1,13 @@ +org.slf4j.simpleLogger.showThreadName=false +org.slf4j.simpleLogger.showDateTime=true + +org.slf4j.simpleLogger.log.org.http4s.blaze.channel.ServerChannel=off +org.slf4j.simpleLogger.log.org.http4s.blaze.channel.nio1.SelectorLoop=off +org.slf4j.simpleLogger.log.org.http4s.blaze.channel.nio1.NIO1SocketServerGroup=off +org.slf4j.simpleLogger.log.org.http4s.client.PoolManager=off +org.slf4j.simpleLogger.log.org.http4s.server.blaze.BlazeServerBuilder=off + +org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.fs2.Enrich=info +org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.fs2.Assets=warn +org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.fs2.test.HttpServer=info + diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/AssetsSpec.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/AssetsSpec.scala new file mode 100644 index 000000000..34a4eb923 --- /dev/null +++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/AssetsSpec.scala @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. 
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2
+
+import java.net.URI
+import java.nio.file.Paths
+
+import scala.concurrent.duration._
+
+import fs2.Stream
+import fs2.io.file.{exists, readAll}
+
+import cats.effect.{Blocker, Concurrent, ContextShift, IO, Resource, Timer}
+
+import com.snowplowanalytics.snowplow.enrich.fs2.test._
+import com.snowplowanalytics.snowplow.enrich.fs2.Assets.Asset
+
+import org.specs2.ScalaCheck
+import org.specs2.mutable.Specification
+import cats.effect.testing.specs2.CatsIO
+
+class AssetsSpec extends Specification with CatsIO with ScalaCheck {
+
+  sequential
+
+  "updateStream" should {
+    "not set stop signal if no updates required" in
+      AssetsSpec.run(1.second) { (state, run) =>
+        run(100.millis, List.empty) *> state.stop.get.map { pause =>
+          pause must beFalse
+        }
+      }
+
+    "download an asset and set the stop signal back to false" in {
+      val path = Paths.get("asset")
+      val input = List(
+        (URI.create("http://localhost:8080/asset"), path.toString)
+      )
+      AssetsSpec.run(1500.millis) { (state, run) =>
+        for {
+          assetExistsBefore <- Blocker[IO].use(b => exists[IO](b, path))
+          _ <- run(100.millis, input)
+          stop <- state.stop.get
+          assetExists <- Blocker[IO].use(b => exists[IO](b, path))
+        } yield {
+          assetExistsBefore must beFalse // Otherwise previous execution left the file
+          stop must beFalse
+          assetExists must beTrue
+        }
+      }
+    }
+
+    "set stop signal to true when long downloads are performed" in {
+      val input = List(
+        (URI.create("http://localhost:8080/slow"), "asset1"), // First sets stop to true
+        (URI.create("http://localhost:8080/slow"), "asset2") // Second doesn't allow update to return prematurely
+      )
+      AssetsSpec.run(3.seconds) { (state, run) =>
+        for {
+          fiber <- (IO.sleep(2.seconds) *> state.stop.get).start
+          _ <- run(500.milliseconds, input)
+          stop <- fiber.join
+        } yield stop must beTrue
+      }
+    }
+
+    "attempt to re-download a non-existing file" in {
+      val path = Paths.get("flaky-asset")
+      val input = List(
+        (URI.create("http://localhost:8080/flaky"), path.toString)
+      )
+      AssetsSpec.run(4.seconds) { (state, run) =>
+        for {
+          _ <- run(1.second, input)
+          stop <- state.stop.get
+          assetContent <- Blocker[IO].use { b =>
+                            readAll[IO](path, b, 8).compile.to(Array).map(arr => new String(arr))
+                          }
+        } yield {
+          stop must beFalse
+          assetContent must beEqualTo("3")
+        }
+      }
+    }
+  }
+
+  "Hash.fromStream" should {
+    "always create a valid MD5 hash" in {
+      prop { (bytes: Array[Byte]) =>
+        val input = Stream.emits(bytes).covary[IO]
+        Assets.Hash.fromStream(input).map { hash =>
+          hash.s.matches("^[a-f0-9]{32}$") must beTrue
+        }
+      }
+    }
+  }
+}
+
+object AssetsSpec {
+
+  /** Run assets refresh function with specified refresh interval and list of assets */
+  type Run = (FiniteDuration, List[Asset]) => IO[Unit]
+
+  /**
+   * User-written function to test effects of [[Assets]] stream
+   * * First argument - state initialised to empty, can be
inspected after + * * Second argument - [[Run]] function to specify custom refresh interval and list of assets + */ + type Test[A] = (Assets.State[IO], Run) => IO[A] + + /** + * Run a test with resources allocated specifically for it + * It will allocate thread pool, empty state, HTTP server and will + * automatically remove all files after the test is over + * + * @param time timeout after which the test will be forced to exit + * @param test the actual test suite function + */ + def run[A]( + time: FiniteDuration + )( + test: Test[A] + )( + implicit C: Concurrent[IO], + T: Timer[IO], + CS: ContextShift[IO] + ): IO[A] = { + val resources = for { + blocker <- Blocker[IO] + state <- SpecHelpers.refreshState(List(URI.create("http://localhost:8080") -> "index")) + enrichments <- Environment.Enrichments.make[IO](List()) + path <- Resource.liftF(Assets.getCurDir[IO]) + _ <- SpecHelpers.filesResource(blocker, TestFiles) + } yield (blocker, state, enrichments, path) + + resources.use { + case (blocker, state, enrichments, curDir) => + val testFunction: Run = Assets + .updateStream[IO](blocker, state, enrichments, curDir, _, _) + .withHttp + .haltAfter(time) + .compile + .drain + test(state, testFunction) + } + } + + /** List of local files that have to be deleted after every test */ + private val TestFiles = List( + Paths.get("asset"), + Paths.get("asset1"), + Paths.get("asset2"), + Paths.get("flaky-asset") + ) +} diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/EnrichSpec.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/EnrichSpec.scala new file mode 100644 index 000000000..385c187e7 --- /dev/null +++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/EnrichSpec.scala @@ -0,0 +1,229 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.enrich.fs2 + +import java.time.Instant +import java.util.UUID + +import scala.concurrent.duration._ + +import cats.Applicative +import cats.data.Validated +import cats.implicits._ + +import cats.effect.IO + +import fs2.Stream + +import _root_.io.circe.literal._ + +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} + +import com.snowplowanalytics.iglu.client.Client + +import org.apache.http.NameValuePair +import org.apache.http.message.BasicNameValuePair + +import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.snowplow.badrows.{Processor, BadRow, Payload => BadRowPayload} + +import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.IpLookupsEnrichment +import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload +import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent + +import com.snowplowanalytics.snowplow.enrich.fs2.EnrichSpec.{Expected, minimalEvent, normalize, normalizeResult} +import com.snowplowanalytics.snowplow.enrich.fs2.test._ + +import org.specs2.ScalaCheck +import org.specs2.mutable.Specification +import cats.effect.testing.specs2.CatsIO + +class EnrichSpec extends Specification with CatsIO with ScalaCheck { + + sequential + + "enrichWith" should { + "enrich a minimal page_view CollectorPayload event without any enrichments enabled" in { + val expected = minimalEvent + .copy( + etl_tstamp = Some(Instant.ofEpochMilli(SpecHelpers.StaticTime)), + user_ipaddress = Some("175.16.199.0"), + event = Some("page_view"), + event_vendor = Some("com.snowplowanalytics.snowplow"), + event_name = Some("page_view"), + event_format = Some("jsonschema"), + event_version = Some("1-0-0"), + derived_tstamp = Some(Instant.ofEpochMilli(0L)) + ) + + Enrich + .enrichWith(TestEnvironment.enrichmentReg.pure[IO], Client.IgluCentral, None, _ => IO.unit)(EnrichSpec.payload[IO]) + .map(normalizeResult) + .map { + case List(Validated.Valid(event)) => event must beEqualTo(expected) + case other => ko(s"Expected one valid event, got $other") + } + } + + "enrich a randomly generated page view event" in { + implicit val cpGen = PayloadGen.getPageViewArbitrary + prop { (collectorPayload: CollectorPayload) => + val payload = Payload(collectorPayload.toRaw, IO.unit) + Enrich + .enrichWith(TestEnvironment.enrichmentReg.pure[IO], Client.IgluCentral, None, _ => IO.unit)(payload) + .map(normalizeResult) + .map { + case List(Validated.Valid(e)) => e.event must beSome("page_view") + case other => ko(s"Expected one valid event, got $other") + } + } + } + } + + "enrich" should { + "update metrics with raw, good and bad counters" in { + val input = Stream(Payload(Array.empty[Byte], IO.unit), EnrichSpec.payload[IO]) + TestEnvironment.make(input).use { test => + val enrichStream = Enrich.run[IO](test.env) + val rows = test.bad.dequeue + .either(test.good.dequeue) + .concurrently(enrichStream) + .haltAfter(1.second) + for { + _ <- test.env.pauseEnrich.set(false) + payloads <- rows.compile.toList + _ <- IO.sleep(100.millis) + counter <- test.counter.get + } yield { + counter mustEqual Counter(2L, 1L, 1L, None) + payloads must be like { + case List(Left(_), Right(_)) => ok + case List(Right(_), Left(_)) => ok + case other => ko(s"Expected one bad and one good row, got $other") + } + } + } + } + + "enrich event using refreshing MaxMind DB" in { + // 4 enrichments can update assets: MaxMind, IAB, referer-parser, ua-parser + val input = Stream(EnrichSpec.payload[IO]) ++ 
Stream.sleep_(2.seconds) ++ Stream(EnrichSpec.payload[IO]) + val ipLookupsConf = IpLookupsEnrichment + .parse( + json"""{ + "name": "ip_lookups", + "vendor": "com.snowplowanalytics.snowplow", + "enabled": true, + "parameters": { + "geo": { + "database": "GeoIP2-City.mmdb", + "uri": "http://localhost:8080/maxmind" + } + } + }""", + SchemaKey( + "com.snowplowanalytics.snowplow", + "ip_lookups", + "jsonschema", + SchemaVer.Full(2, 0, 0) + ), + false // Unlike in other tests we actually download it + ) + .getOrElse(throw new RuntimeException("Invalid test configuration")) + + val one = Expected + .copy( + geo_country = Some("CN"), + geo_region = Some("22"), + geo_city = Some("Changchun"), + geo_latitude = Some(43.88), + geo_longitude = Some(125.3228), + geo_region_name = Some("Jilin Sheng"), + geo_timezone = Some("Asia/Harbin") + ) + val two = one.copy(geo_city = Some("Fuyu")) + + val assetsServer = HttpServer.resource(6.seconds) + (assetsServer *> TestEnvironment.make(input, List(ipLookupsConf))).use { test => + val updatedEnv = test.env.copy(assetsUpdatePeriod = Some(500.millis)) + val assetsStream = Assets.run[IO](updatedEnv) + val enrichStream = Enrich.run[IO](updatedEnv).concurrently(assetsStream) + + test.good.dequeue + .concurrently(enrichStream) + .haltAfter(5.seconds) + .map(normalize) + .compile + .toList + .map { events => + events must beEqualTo(List(Validated.Valid(one), Validated.Valid(two))) + } + } + } + } +} + +object EnrichSpec { + val eventId: UUID = UUID.fromString("deadbeef-dead-beef-dead-beefdead") + + val api: CollectorPayload.Api = + CollectorPayload.Api("com.snowplowanalytics.snowplow", "tp2") + val source: CollectorPayload.Source = + CollectorPayload.Source("ssc-0.0.0-test", "UTF-8", Some("collector.snplow.net")) + val context: CollectorPayload.Context = CollectorPayload.Context(None, Some("175.16.199.0"), None, None, List(), None) + val querystring: List[NameValuePair] = List( + new BasicNameValuePair("e", "pv"), + new BasicNameValuePair("eid", eventId.toString) + ) + val colllectorPayload: CollectorPayload = CollectorPayload(api, querystring, None, None, source, context) + def payload[F[_]: Applicative]: Payload[F, Array[Byte]] = + Payload(colllectorPayload.toRaw, Applicative[F].unit) + + def normalize(payload: Payload[IO, EnrichedEvent]) = + Event + .parse(Enrich.encodeEvent(payload.data)) + .map(_.copy(etl_tstamp = Some(Instant.ofEpochMilli(SpecHelpers.StaticTime)))) match { + case Validated.Valid(event) => + Validated.Valid(event) + case Validated.Invalid(error) => + val rawPayload = BadRowPayload.RawPayload(Enrich.encodeEvent(payload.data)) + val badRow = BadRow.LoaderParsingError(Processor("fs2-enrich-test-suite", "x"), error, rawPayload) + Validated.Invalid(badRow) + } + + def normalizeResult(payload: Result[IO]) = + payload.data.map { + case Validated.Valid(a) => normalize(Payload(a, IO.unit)) + case Validated.Invalid(e) => e.invalid + } + + val minimalEvent = Event + .minimal( + EnrichSpec.eventId, + Instant.ofEpochMilli(0L), + "ssc-0.0.0-test", + s"fs2-enrich-${generated.BuildInfo.version}-common-${generated.BuildInfo.version}" + ) + + val Expected = minimalEvent + .copy( + etl_tstamp = Some(Instant.ofEpochMilli(SpecHelpers.StaticTime)), + user_ipaddress = Some("175.16.199.0"), + event = Some("page_view"), + event_vendor = Some("com.snowplowanalytics.snowplow"), + event_name = Some("page_view"), + event_format = Some("jsonschema"), + event_version = Some("1-0-0"), + derived_tstamp = Some(Instant.ofEpochMilli(0L)) + ) +} diff --git 
a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/PayloadGen.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/PayloadGen.scala new file mode 100644 index 000000000..141f1841d --- /dev/null +++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/PayloadGen.scala @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.fs2 + +import java.nio.file.{ Path, Paths } +import java.util.Base64 + +import cats.effect.{Blocker, IO} +import cats.effect.concurrent.Ref + +import _root_.io.circe.literal._ +import fs2.{Chunk, Stream} +import fs2.io.file.{ writeAll, createDirectory } + +import org.apache.http.message.BasicNameValuePair + +import org.joda.time.{DateTimeZone, LocalDate} + +import org.scalacheck.{Arbitrary, Gen} +import cats.effect.testing.specs2.CatsIO + +import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload + +object PayloadGen extends CatsIO { + + val api: CollectorPayload.Api = + CollectorPayload.Api("com.snowplowanalytics.snowplow", "tp2") + val source: CollectorPayload.Source = + CollectorPayload.Source("ssc-0.0.0-test", "UTF-8", Some("collector.snplow.net")) + + val userAgentGen: Gen[String] = for { + os <- Gen.oneOf("Windows NT 10.0; Win64; x64", + "Windows NT 5.1; rv:7.0.1", + "Macintosh; Intel Mac OS X 10_14_5", + "Macintosh; Intel Mac OS X 10_15_4" + ) + engine <- Gen.oneOf("AppleWebKit/603.3.8 (KHTML, like Gecko)", + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169", + "AppleWebKit/605.1.15 (KHTML, like Gecko)" + ) + version <- Gen.oneOf("Version/11.1.2 Safari/605.1.15", "Chrome/60.0.3112.113 Safari/537.36", "Gecko/20100101 Firefox/40.1") + } yield s"Mozilla/5.0 ($os) $engine $version" + + val geolocationGen = for { + latitude <- Gen.choose(-90.0, 90.0) + longitude <- Gen.choose(-180.0, 180.0) + payload = json"""{"latitude":$latitude,"longitude":$longitude}""" + schemaKey = "iglu:com.snowplowanalytics.snowplow/geolocation_context/jsonschema/1-1-0" + } yield json"""{"schema":$schemaKey, "data": $payload}""" + val contextsGen = for { + geo <- Gen.option(geolocationGen).map(_.toList) + schemaKey = "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1" + } yield json"""{"schema":$schemaKey, "data": $geo}""" + + val localDateGen: Gen[LocalDate] = Gen.calendar.map(LocalDate.fromCalendarFields).suchThat(_.year().get() < 3000) + val ipGen: Gen[String] = for { + part1 <- Gen.choose(2, 255) + part2 <- Gen.choose(0, 255) + part3 <- Gen.choose(0, 255) + part4 <- Gen.choose(0, 255) + } yield s"$part1.$part2.$part3.$part4" + val contextGen: Gen[CollectorPayload.Context] = for { + timestamp <- localDateGen.map(_.toDateTimeAtStartOfDay(DateTimeZone.UTC)).map(Option.apply) + ip <- Gen.option(ipGen) + userAgent <- userAgentGen.map(x => Some(x)) + userId <- Gen.option(Gen.uuid) + } 
yield CollectorPayload.Context(timestamp, ip, userAgent, None, List(), userId) + + val getPageView = for { + eventId <- Gen.uuid + aid <- Gen.oneOf("test-app", "scalacheck") + cx <- contextsGen.map(json => Base64.getEncoder.encodeToString(json.noSpaces.getBytes)) + querystring = List( + new BasicNameValuePair("aid", aid), + new BasicNameValuePair("e", "pv"), + new BasicNameValuePair("eid", eventId.toString), + new BasicNameValuePair("cx", cx) + ) + context <- contextGen + } yield CollectorPayload(api, querystring, None, None, source, context) + + val getPageViewArbitrary: Arbitrary[CollectorPayload] = Arbitrary.apply(getPageView) + + val payloadStream = Stream.repeatEval(IO(getPageView.sample)).collect { + case Some(x) => x + } + + def write(dir: Path, cardinality: Long): IO[Unit] = + for { + counter <- Ref.of[IO, Int](0) + dir <- Blocker[IO].use { b => createDirectory[IO](b, dir) } + filename = counter.updateAndGet(_ + 1).map(i => Paths.get(s"${dir.toAbsolutePath}/payload.$i.thrift")) + _ <- Blocker[IO].use { b => + val result = + for { + payload <- payloadStream.take(cardinality) + fileName <- Stream.eval(filename) + _ <- Stream.chunk(Chunk.bytes(payload.toRaw)).through(writeAll[IO](fileName, b)) + } yield () + result.compile.drain + } + } yield () +} diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/PayloadSpec.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/PayloadSpec.scala new file mode 100644 index 000000000..d154db6c5 --- /dev/null +++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/PayloadSpec.scala @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
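Editor's note (not part of the diff): `PayloadGen.write` above can materialise fixtures for the `Input.FileSystem` source; directory and cardinality here are arbitrary examples:

```scala
import java.nio.file.Paths

import com.snowplowanalytics.snowplow.enrich.fs2.PayloadGen

// Writes payloads/payload.1.thrift … payloads/payload.100.thrift,
// creating the directory first.
val fixtures = PayloadGen.write(Paths.get("payloads"), 100L)
```

Running `fixtures` (e.g. with `unsafeRunSync()` in a test) yields raw Thrift files that the file-system source can then replay.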
+ */ +package com.snowplowanalytics.snowplow.enrich.fs2 + +import cats.implicits._ + +import cats.effect.IO +import cats.effect.concurrent.Ref +import cats.effect.testing.specs2.CatsIO + +import org.specs2.ScalaCheck +import org.specs2.mutable.Specification + +class PayloadSpec extends Specification with CatsIO with ScalaCheck { + "mapWithLast" should { + "always apply a lastF function to the last element" in { + prop { (list: List[String]) => + val lastF: String => String = _ => "unique" + val result = Payload.mapWithLast(identity[String], lastF)(list).toList + result.lastOption must (beSome("unique") or beNone) + } + } + + "always apply an f function to all elements except last" in { + prop { (list: List[String]) => + val f: String => String = _ => "unique" + val result = Payload.mapWithLast(f, identity[String])(list).toList + list match { + case Nil => ok + case _ => + val init = List.fill(list.length - 1)("unique") + result.mkString("-") must startWith(init.mkString("-")) + } + } + } + } + + "decompose" should { + "preserve the order" in { + val input = List("error-1".invalid, 42.valid, "error-2".invalid) + val payload = Payload(input, IO.unit) + payload.decompose[String, Int].compile.toList.map { + case List(error1, valid, error2) => + error1 must beLeft.like { + case Payload(data, _) => data must be("error-1") + } + valid must beRight.like { + case Payload(data, _) => data must beEqualTo(42) + } + error2 must beLeft.like { + case Payload(data, _) => data must be("error-2") + } + case other => + ko(s"Expected list of 3, got $other") + } + } + + "execute finalize action only once" in { + val input = List("error-1".invalid, 42.valid, "error-2".invalid) + for { + ref <- Ref.of[IO, Int](0) + payload = Payload(input, ref.update(_ + 1)) + parsed <- payload.decompose[String, Int].compile.toList + _ <- parsed.traverse_(_.fold(_.finalise, _.finalise)) + result <- ref.get + } yield result must beEqualTo(1) + } + + "not execute finalize action until last element" in { + val input = List("error-1".invalid, 42.valid, "error-2".invalid) + for { + ref <- Ref.of[IO, Int](0) + payload = Payload(input, ref.update(_ + 1)) + parsed <- payload.decompose[String, Int].compile.toList + _ <- parsed.init.traverse_(_.fold(_.finalise, _.finalise)) + result <- ref.get + } yield result must beEqualTo(0) + } + } +} diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/SpecHelpers.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/SpecHelpers.scala new file mode 100644 index 000000000..2c6db6f88 --- /dev/null +++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/SpecHelpers.scala @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
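Editor's note (not part of the diff): for reference, the semantics pinned down by the `mapWithLast` properties above, in a one-liner; the curried signature is inferred from the spec's own usage:

```scala
import com.snowplowanalytics.snowplow.enrich.fs2.Payload

// Identity for every element, a marker only for the last one — the same
// shape used to run a shared finalisation (an ack) exactly once per batch.
val tagged = Payload.mapWithLast((s: String) => s, (s: String) => s + "!")(List("a", "b", "c")).toList
// tagged == List("a", "b", "c!")
```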
+ */ +package com.snowplowanalytics.snowplow.enrich.fs2 + +import java.nio.file.{NoSuchFileException, Path} + +import scala.concurrent.duration.TimeUnit + +import cats.effect.{Blocker, Clock, IO, Resource} +import cats.effect.concurrent.Ref + +import cats.implicits._ + +import fs2.io.file.deleteIfExists + +import com.snowplowanalytics.snowplow.enrich.fs2.test._ + +import cats.effect.testing.specs2.CatsIO + +object SpecHelpers extends CatsIO { + implicit val ioClock: Clock[IO] = + Clock.create[IO] + + val StaticTime = 1599750938180L + + val staticIoClock: Clock[IO] = + new Clock[IO] { + def realTime(unit: TimeUnit): IO[Long] = IO.pure(StaticTime) + def monotonic(unit: TimeUnit): IO[Long] = IO.pure(StaticTime) + } + + def refreshState(uris: List[Assets.Asset]): Resource[IO, Assets.State[IO]] = + for { + b <- TestEnvironment.ioBlocker + stop <- Resource.liftF(Ref.of[IO, Boolean](false)) + state <- Assets.State.make[IO](b, stop, uris) + } yield state + + /** Clean-up predefined list of files */ + def filesCleanup(blocker: Blocker, files: List[Path]): IO[Unit] = + files.traverse_ { path => + deleteIfExists[IO](blocker, path).recover { + case _: NoSuchFileException => false + } + } + + /** Make sure files don't exist before and after test starts */ + def filesResource(blocker: Blocker, files: List[Path]): Resource[IO, Unit] = + Resource.make(filesCleanup(blocker, files))(_ => filesCleanup(blocker, files)) +} diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64HoconSpec.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64HoconSpec.scala new file mode 100644 index 000000000..10f065064 --- /dev/null +++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/Base64HoconSpec.scala @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
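Editor's note (not part of the diff): the CLI accepts configuration either as a file path or as base64-encoded HOCON; producing the latter is a one-liner, mirroring what the spec below feeds to `Argument[Base64Hocon]`:

```scala
import java.util.Base64

val hocon =
  """input = {
    |  type = "PubSub"
    |  subscriptionId = "inputSub"
    |}""".stripMargin

val cliArg = Base64.getEncoder.encodeToString(hocon.getBytes("UTF-8"))
```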
+ */ +package com.snowplowanalytics.snowplow.enrich.fs2.config + +import java.util.Base64.getEncoder + +import com.monovore.decline.Argument + +import org.specs2.mutable.Specification + +class Base64HoconSpec extends Specification { + "Argument[Base64Hocon]" should { + "parse a base64-encoded HOCON" in { + val inputStr = """input = {}""" + val input = getEncoder.encodeToString(inputStr.getBytes()) + Argument[Base64Hocon].read(input).toEither must beRight + } + + "fail to parse plain string as HOCON" in { + val inputStr = "+" + val input = getEncoder.encodeToString(inputStr.getBytes()) + Argument[Base64Hocon].read(input).toEither must beLeft + } + } +} diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/CliConfigSpec.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/CliConfigSpec.scala new file mode 100644 index 000000000..255ce17ba --- /dev/null +++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/CliConfigSpec.scala @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.fs2.config + +import cats.syntax.either._ + +import cats.effect.IO + +import org.specs2.mutable.Specification +import cats.effect.testing.specs2.CatsIO + +class CliConfigSpec extends Specification with CatsIO { + "parseHocon" should { + "parse valid HOCON" in { + val string = """ + input = { + type = "PubSub" + subscriptionId = "inputSub" + } + """.stripMargin + Base64Hocon.parseHocon(string) must beRight + } + } + + "ConfigFile.parse" should { + "parse valid HOCON" in { + val hocon = + Base64Hocon + .parseHocon(""" + auth = { + type = "Gcp" + projectId = "test-project" + } + input = { + type = "PubSub" + subscriptionId = "inputSub" + } + good = { + type = "PubSub" + topic = "good-topic" + } + bad = { + type = "PubSub" + topic = "bad-topic" + } + """) + .getOrElse(throw new RuntimeException("Cannot parse HOCON file")) + + val expected = ConfigFile( + io.Authentication.Gcp("test-project"), + io.Input.PubSub("inputSub"), + io.Output.PubSub("good-topic"), + io.Output.PubSub("bad-topic"), + None, + None, + None + ) + + ConfigFile.parse[IO](hocon.asLeft).value.map(result => result must beRight(expected)) + } + } +} diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/ConfigFileSpec.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/ConfigFileSpec.scala new file mode 100644 index 000000000..f2b010b47 --- /dev/null +++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/config/ConfigFileSpec.scala @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. 
+ * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.fs2.config + +import java.net.URI +import java.nio.file.Paths + +import scala.concurrent.duration._ + +import cats.syntax.either._ + +import cats.effect.IO + +import org.specs2.mutable.Specification +import cats.effect.testing.specs2.CatsIO + +class ConfigFileSpec extends Specification with CatsIO { + "parse" should { + "parse valid HOCON file with path provided" in { + val configPath = Paths.get(getClass.getResource("/config.fs2.hocon.sample").toURI) + val expected = ConfigFile( + io.Authentication.Gcp("test-project"), + io.Input.PubSub("inputSub"), + io.Output.PubSub("good-topic"), + io.Output.PubSub("bad-topic"), + Some(7.days), + Some(Sentry(URI.create("http://sentry.acme.com"))), + Some(1.second) + ) + ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected)) + } + + "not throw an exception if file not found" in { + val configPath = Paths.get("does-not-exist") + ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beLeft) + } + } +} diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/enrichments/ApiRequestEnrichmentSpec.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/enrichments/ApiRequestEnrichmentSpec.scala new file mode 100644 index 000000000..d931076b2 --- /dev/null +++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/enrichments/ApiRequestEnrichmentSpec.scala @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
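Editor's note (not part of the diff): outside of tests, the same entry point exercised by `ConfigFileSpec` resolves the application config; a sketch with an assumed path, relying on the same implicits the specs bring in scope:

```scala
import java.nio.file.Paths

import cats.syntax.either._
import cats.effect.IO

import com.snowplowanalytics.snowplow.enrich.fs2.config.ConfigFile

// Right is a path on disk; Left would carry a Base64Hocon payload.
val loaded = ConfigFile.parse[IO](Paths.get("/etc/enrich/config.hocon").asRight).value
```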
+ */ +package com.snowplowanalytics.snowplow.enrich.fs2.enrichments + +import java.util.Base64 + +import scala.concurrent.duration._ + +import org.apache.http.message.BasicNameValuePair + +import cats.implicits._ + +import cats.effect.IO +import cats.effect.testing.specs2.CatsIO + +import fs2.Stream + +import io.circe.Json +import io.circe.literal._ + +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaCriterion, SchemaVer, SelfDescribingData} + +import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.Contexts + +import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.ApiRequestConf +import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequest.{Authentication, HttpApi, Cache, Input, Output, JsonOutput} + +import com.snowplowanalytics.snowplow.enrich.fs2.enrichments.ApiRequestEnrichmentSpec.unstructEvent +import com.snowplowanalytics.snowplow.enrich.fs2.{EnrichSpec, Payload} +import com.snowplowanalytics.snowplow.enrich.fs2.test._ + +import org.specs2.mutable.Specification + +class ApiRequestEnrichmentSpec extends Specification with CatsIO { + "ApiRequestEnrichment" should { + "add a derived context" in { + val event = + json"""{ + "schema": "iglu:com.acme/test/jsonschema/1-0-1", + "data": {"path": {"id": 3}} + }""" + val payload = EnrichSpec.colllectorPayload.copy( + querystring = new BasicNameValuePair("ue_px", unstructEvent(event)) :: EnrichSpec.colllectorPayload.querystring + ) + val input = Stream(Payload(payload.toRaw, IO.unit)) + + /** Schemas defined at [[SchemaRegistry]] */ + val enrichment = ApiRequestConf( + SchemaKey("com.acme", "enrichment", "jsonschema", SchemaVer.Full(1,0,0)), + List(Input.Json("key1", "unstruct_event", SchemaCriterion("com.acme", "test", "jsonschema", 1), "$.path")), + HttpApi("GET", "http://localhost:8080/enrichment/api/$key1", 2000, Authentication(None)), + List(Output("iglu:com.acme/output/jsonschema/1-0-0", Some(JsonOutput("$")))), + Cache(1, 1000) + ) + + val expected = Contexts(List(SelfDescribingData( + SchemaKey("com.acme", "output", "jsonschema", SchemaVer.Full(1,0,0)), + json"""{"output": 1}""" + ))) + + val testWithHttp = HttpServer.resource(5.seconds) *> TestEnvironment.make(input, List(enrichment)) + testWithHttp.use { test => + test.env.pauseEnrich.set(false) *> + test.run(identity).map { events => events must beLike { + case List(Right(event)) => + event.derived_contexts must beEqualTo(expected) + case other => ko(s"Expected one enriched event, got ${other}") + } } + } + } + } +} + +object ApiRequestEnrichmentSpec { + private val encoder = Base64.getEncoder + + def encode(json: Json): String = + new String(encoder.encode(json.noSpaces.getBytes)) + + def unstructEvent(json: Json): String = + encode(json"""{"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":$json}""") +} diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/test/Counter.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/test/Counter.scala new file mode 100644 index 000000000..62522f9db --- /dev/null +++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/test/Counter.scala @@ -0,0 +1,47 @@ +package com.snowplowanalytics.snowplow.enrich.fs2.test + +import java.util.concurrent.TimeUnit + +import cats.Monad +import cats.syntax.flatMap._ + +import cats.effect.concurrent.Ref +import cats.effect.{Clock, Sync} + +import com.snowplowanalytics.snowplow.enrich.fs2.io.Metrics + +/** Metrics container for 
testing */
+case class Counter(
+  raw: Long,
+  good: Long,
+  bad: Long,
+  latency: Option[Long]
+)
+
+object Counter {
+  val empty: Counter = Counter(0L, 0L, 0L, None)
+
+  def make[F[_]: Sync]: F[Ref[F, Counter]] =
+    Ref.of[F, Counter](empty)
+
+  /** Create pure metrics backed by a mutable [[Counter]] reference */
+  def mkCounterMetrics[F[_]: Monad: Clock](ref: Ref[F, Counter]): Metrics[F] =
+    new Metrics[F] {
+      def report: F[Unit] =
+        Monad[F].unit
+
+      def enrichLatency(collectorTstamp: Option[Long]): F[Unit] =
+        Clock[F].realTime(TimeUnit.MILLISECONDS).flatMap { now =>
+          ref.update(_.copy(latency = collectorTstamp.map(ct => now - ct)))
+        }
+
+      def rawCount: F[Unit] =
+        ref.update(cnt => cnt.copy(raw = cnt.raw + 1))
+
+      def goodCount: F[Unit] =
+        ref.update(cnt => cnt.copy(good = cnt.good + 1))
+
+      def badCount: F[Unit] =
+        ref.update(cnt => cnt.copy(bad = cnt.bad + 1))
+    }
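+
+  // A minimal usage sketch (hypothetical, not part of the suite; assumes an
+  // implicit Clock[IO] such as SpecHelpers.ioClock in scope): create the
+  // mutable counter, wrap it into Metrics and inspect the state afterwards.
+  //
+  //   Counter.make[IO].flatMap { ref =>
+  //     val metrics = Counter.mkCounterMetrics[IO](ref)
+  //     metrics.rawCount *> metrics.goodCount *> ref.get
+  //   } // Counter(raw = 1, good = 1, bad = 0, latency = None)
+}
diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/test/HttpServer.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/test/HttpServer.scala
new file mode 100644
index 000000000..7a6217383
--- /dev/null
+++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/test/HttpServer.scala
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.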
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2.test
+
+import scala.concurrent.duration._
+
+import cats.implicits._
+
+import cats.effect.{Blocker, Fiber, IO, Resource}
+import cats.effect.concurrent.Ref
+
+import io.circe.literal._
+
+import fs2.Stream
+import fs2.io.readInputStream
+
+import io.chrisdavenport.log4cats.Logger
+import io.chrisdavenport.log4cats.slf4j.Slf4jLogger
+
+import org.http4s.HttpRoutes
+import org.http4s.Method.GET
+import org.http4s.server.blaze.BlazeServerBuilder
+import org.http4s.dsl.io._
+import org.http4s.syntax.all._
+
+import cats.effect.testing.specs2.CatsIO
+
+/**
+ * Embedded HTTP server for testing, used mostly for assets refresh,
+ * but also serving a stub enrichment API and slow/flaky endpoints
+ */
+object HttpServer extends CatsIO {
+
+  private val logger: Logger[IO] =
+    Slf4jLogger.getLogger[IO]
+
+  /**
+   * Set of testing routes:
+   * * Plain data
+   * * Imitating slow connection
+   * * Frequently updating resource
+   * * Sometimes non-working resource
+   *
+   * @param counter mutable reference holding a counter, updated on every request
+   */
+  def routes(counter: Ref[IO, Int]): HttpRoutes[IO] =
+    HttpRoutes
+      .of[IO] {
+        case r @ GET -> Root / "asset" =>
+          logger.debug(r.pathInfo) *> Ok("data")
+        case r @ GET -> Root / "slow" =>
+          val action = for {
+            i <- counter.updateAndGet(_ + 1)
+            _ <- if (i == 1) IO.sleep(100.milliseconds) else IO.sleep(10.seconds)
+            res <- Ok(s"slow data $i")
+          } yield res
+          logger.debug(r.pathInfo) *> action
+        case r @ GET -> Root / "counter" =>
+          logger.debug(r.pathInfo) *> counter.updateAndGet(_ + 1).flatMap { i =>
+            Ok(s"counter $i")
+          }
+        case r @ GET -> Root / "flaky" =>
+          logger.debug(r.pathInfo) *> counter.update(_ + 1) *>
+            counter.get.flatMap { i =>
+              val s = i.toString
+              if (i == 1 || i == 2) NotFound(s)
+              else if (i == 3) Ok(s)
+              else NotFound(s)
+            }
+        case GET -> Root / "maxmind" / "GeoIP2-City.mmdb" =>
+          counter.updateAndGet(_ + 1).flatMap { i =>
+            val is = readMaxMindDb(i)
+            Ok(Blocker[IO].use(b => readInputStream[IO](is, 256, b).compile.to(Array)))
+          }
+        case GET -> Root / "enrichment" / "api" / _ =>
+          counter.updateAndGet(_ + 1).flatMap { _ =>
+            Ok(json"""{"output": 1}""".noSpaces)
+          }
+      }
+
+  def run: Stream[IO, Unit] =
+    for {
+      counter <- Stream.eval(Ref.of[IO, Int](0))
+      stream <- BlazeServerBuilder[IO](concurrent.ExecutionContext.global)
+                  .bindHttp(8080)
+                  .withHttpApp(routes(counter).orNotFound)
+                  .withoutBanner
+                  .withoutSsl
+                  .serve
+                  .void
+    } yield stream
+
+  def resource(duration: FiniteDuration): Resource[IO, Fiber[IO, Unit]] =
+    Resource.make {
+      run
+        .haltAfter(duration)
+        .compile
+        .drain
+        .start
+        .flatTap(_ => IO.sleep(500.millis) *> logger.info("Running test HttpServer"))
+    }(_.cancel *> logger.info("Destroyed test HttpServer"))
+
+  private def readMaxMindDb(req: Int) = {
+    val path =
+      if (req < 4) s"/com.snowplowanalytics.snowplow.enrich.fs2/assets-refresh/geoip2-city-$req.mmdb"
+      else s"/com.snowplowanalytics.snowplow.enrich.fs2/assets-refresh/geoip2-city-3.mmdb"
+    IO(getClass.getResourceAsStream(path))
+  }
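+
+  // A usage sketch (hypothetical; `httpClient` stands for any http4s client
+  // in scope): keep the server alive for the duration of a test and query
+  // one of the routes above.
+  //
+  //   HttpServer.resource(10.seconds).use { _ =>
+  //     httpClient.expect[String]("http://localhost:8080/asset") // "data"
+  //   }
+}
diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/test/SchemaRegistry.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/test/SchemaRegistry.scala
new file mode 100644
index 000000000..385d7e449
--- /dev/null
+++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/test/SchemaRegistry.scala
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.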
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2.test
+
+import io.circe.Json
+import io.circe.literal._
+
+import com.snowplowanalytics.iglu.core.SelfDescribingSchema
+import com.snowplowanalytics.iglu.core.circe.implicits._
+
+object SchemaRegistry {
+  val acmeTest: SelfDescribingSchema[Json] =
+    SelfDescribingSchema
+      .parse(json"""{
+        "$$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
+        "self": {
+          "vendor": "com.acme",
+          "name": "test",
+          "format": "jsonschema",
+          "version": "1-0-1"
+        },
+        "properties": {
+          "path": {
+            "properties": {
+              "id": {
+                "type": "integer"
+              }
+            }
+          }
+        }
+      }""").getOrElse(throw new RuntimeException("invalid acmeTest schema"))
+
+  val acmeOutput: SelfDescribingSchema[Json] =
+    SelfDescribingSchema
+      .parse(json"""{
+        "$$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
+        "self": {
+          "vendor": "com.acme",
+          "name": "output",
+          "format": "jsonschema",
+          "version": "1-0-0"
+        },
+        "properties": {
+          "output": {
+            "type": "number"
+          }
+        }
+      }""").getOrElse(throw new RuntimeException("invalid acmeOutput schema"))
+
+  // Defined on Iglu Central
+  val unstructEvent: SelfDescribingSchema[Json] =
+    SelfDescribingSchema
+      .parse(json"""{
+        "$$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
+        "self": {
+          "vendor": "com.snowplowanalytics.snowplow",
+          "name": "unstruct_event",
+          "format": "jsonschema",
+          "version": "1-0-0"
+        },
+        "type": "object",
+        "properties": {
+          "schema": {
+            "type": "string",
+            "pattern": "^iglu:[a-zA-Z0-9-_.]+/[a-zA-Z0-9-_]+/[a-zA-Z0-9-_]+/[0-9]+-[0-9]+-[0-9]+$$"
+          },
+          "data": {}
+        },
+        "required": ["schema", "data"],
+        "additionalProperties": false
+      }""").getOrElse(throw new RuntimeException("invalid unstructEvent schema"))
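+
+  // For reference, an instance that validates against `acmeTest` (the same
+  // shape ApiRequestEnrichmentSpec sends as its unstruct event):
+  //
+  //   {
+  //     "schema": "iglu:com.acme/test/jsonschema/1-0-1",
+  //     "data": {"path": {"id": 3}}
+  //   }
+}
diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/test/TestEnvironment.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/test/TestEnvironment.scala
new file mode 100644
index 000000000..44459be54
--- /dev/null
+++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/test/TestEnvironment.scala
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.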
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.snowplow.enrich.fs2.test
+
+import java.nio.file.Paths
+
+import scala.concurrent.duration._
+
+import cats.Monad
+import cats.syntax.either._
+import cats.syntax.parallel._
+
+import cats.effect.{ContextShift, Blocker, Resource, Timer, Concurrent, IO}
+import cats.effect.concurrent.Ref
+
+import io.circe.Json
+import fs2.concurrent.{SignallingRef, Queue}
+
+import com.snowplowanalytics.iglu.client.{Resolver, Client, CirceValidator}
+import com.snowplowanalytics.iglu.client.resolver.registries.Registry
+
+import com.snowplowanalytics.snowplow.badrows.BadRow
+
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
+import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
+import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
+
+import com.snowplowanalytics.snowplow.enrich.fs2.Environment.Enrichments
+import com.snowplowanalytics.snowplow.enrich.fs2.SpecHelpers.{filesResource, ioClock}
+import com.snowplowanalytics.snowplow.enrich.fs2.{Assets, Environment, EnrichSpec, RawSource, Enrich, Payload}
+
+import cats.effect.testing.specs2.CatsIO
+
+import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
+
+case class TestEnvironment(
+  env: Environment[IO],
+  counter: Ref[IO, Counter],
+  good: Queue[IO, Payload[IO, EnrichedEvent]],
+  bad: Queue[IO, Payload[IO, BadRow]]
+) {
+  def run(f: Environment[IO] => Environment[IO])
+         (implicit C: Concurrent[IO], CS: ContextShift[IO], T: Timer[IO]): IO[List[Either[BadRow, Event]]] = {
+    val enrichStream = Enrich.run[IO](f(env))
+    bad.dequeue.either(good.dequeue)
+      .concurrently(enrichStream)
+      .haltAfter(5.seconds)
+      .compile
+      .toList
+      .map { rows =>
+        rows.map(_.fold(_.data.asLeft, event => EnrichSpec.normalize(event).toEither))
+      }
+  }
+}
+
+object TestEnvironment extends CatsIO {
+
+  val enrichmentReg: EnrichmentRegistry[IO] =
+    EnrichmentRegistry[IO]()
+  val enrichments: Environment.Enrichments[IO] =
+    Environment.Enrichments(enrichmentReg, Nil)
+
+  val ioBlocker: Resource[IO, Blocker] = Blocker[IO]
+
+  val embeddedRegistry =
+    Registry.InMemory(
+      Registry.Config("fs2-enrich embedded test registry", 1, List("com.acme")),
+      List(
+        SchemaRegistry.unstructEvent,
+        SchemaRegistry.acmeTest,
+        SchemaRegistry.acmeOutput
+      )
+    )
+  val igluClient: Client[IO, Json] =
+    Client[IO, Json](Resolver(List(embeddedRegistry), None), CirceValidator)
+
+  /**
+   * A dummy test environment without enrichments and with noop sinks and sources.
+   * One can replace streams and sinks via `.copy`
+   */
+  def make(source: RawSource[IO], enrichments: List[EnrichmentConf] = Nil): Resource[IO, TestEnvironment] =
+    for {
+      blocker <- ioBlocker
+      _ <- filesResource(blocker, enrichments.flatMap(_.filesToCache).map(p => Paths.get(p._2)))
+      (stop, counter) <- Resource.liftF(SignallingRef[IO, Boolean](true).parProduct(Counter.make[IO]))
+      assets <- Assets.State.make(blocker, stop, enrichments.flatMap(_.filesToCache))
+      goodQueue <- Resource.liftF(Queue.unbounded[IO, Payload[IO, EnrichedEvent]])
+      badQueue <- Resource.liftF(Queue.unbounded[IO, Payload[IO, BadRow]])
+      metrics = Counter.mkCounterMetrics[IO](counter)(Monad[IO], ioClock)
+      enrichmentsRef <- Enrichments.make[IO](enrichments)
+      environment = Environment[IO](igluClient,
+                                    enrichmentsRef,
+                                    stop,
+                                    assets,
+                                    blocker,
+                                    source,
+                                    goodQueue.enqueue,
+                                    badQueue.enqueue,
+                                    None,
+                                    metrics,
+                                    None,
+                                    None
+      )
+    } yield TestEnvironment(environment, counter, goodQueue, badQueue)
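+
+  // A usage sketch (hypothetical spec body, mirroring ApiRequestEnrichmentSpec):
+  // build an environment around an input stream, unpause enrichment and
+  // assert on the emitted rows.
+  //
+  //   TestEnvironment.make(input).use { test =>
+  //     test.env.pauseEnrich.set(false) *>
+  //       test.run(identity).map(rows => rows must not(beEmpty))
+  //   }
+}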
diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/test/package.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/test/package.scala
new file mode 100644
index 000000000..4ae834cdc
--- /dev/null
+++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/test/package.scala
@@ -0,0 +1,24 @@
+package com.snowplowanalytics.snowplow.enrich.fs2
+
+import scala.concurrent.duration.FiniteDuration
+
+import cats.effect.{Concurrent, IO, Timer}
+
+import _root_.fs2.Stream
+
+package object test {
+
+  implicit class StreamOps[F[_], A](s: Stream[F, A]) {
+
+    /** Halt the stream after the specified period of time */
+    def haltAfter(after: FiniteDuration)(implicit T: Timer[F], C: Concurrent[F]): Stream[F, A] =
+      Stream.eval_(Timer[F].sleep(after)).mergeHaltL(s)
+  }
+
+  implicit class StreamIoOps[A](s: Stream[IO, A]) {
+
+    /** Run the test [[HttpServer]] in parallel with the stream */
+    def withHttp(implicit C: Concurrent[IO]): Stream[IO, A] =
+      s.concurrently(HttpServer.run)
+  }
+}
diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala
index 6b73f77a0..4f412acf6 100644
--- a/project/BuildSettings.scala
+++ b/project/BuildSettings.scala
@@ -102,6 +102,9 @@ object BuildSettings {
   lazy val sbtAssemblySettings = Seq(
     assemblyJarName in assembly := { s"${moduleName.value}-${version.value}.jar" },
     assemblyMergeStrategy in assembly := {
+      case x if x.endsWith("native-image.properties") => MergeStrategy.first
+      case x if x.endsWith("io.netty.versions.properties") => MergeStrategy.first
+      case x if x.endsWith("public-suffix-list.txt") => MergeStrategy.first
       case x if x.endsWith("ProjectSettings$.class") => MergeStrategy.first
       case x if x.endsWith("module-info.class") => MergeStrategy.first
       case x =>
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 3f545aa97..4534a1502 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -47,6 +47,7 @@ object Dependencies {
     val maxmindIplookups = "0.7.1"
     val circe = "0.13.0"
     val circeOptics = "0.13.0"
+    val circeConfig = "0.7.0"
     val circeJackson = "0.13.0"
     val scalaForex = "1.0.0"
     val scalaWeather = "1.0.0"
@@ -67,11 +68,23 @@
     val jackson = "2.10.5"
     val config = "1.3.4"
+
+    val decline = "1.0.0"
+    val fs2 = "2.4.4"
+    val monocle = "2.0.4"
+    val fs2PubSub = "0.16.0"
+    val fs2BlobStorage = "0.7.3"
+    val http4s = "0.21.7"
+    val log4cats = "1.1.1"
+    val catsRetry = "1.1.1"
+    val metrics = "4.1.12.1"
+
     val scopt = "3.7.1"
     val pureconfig = "0.11.0"
+    val pureconfig013 = "0.13.0"
     val snowplowTracker = "0.6.1"
 
     val specs2 = "4.5.1"
+    val specs2CE = "0.4.1"
     val scalacheck = "1.14.0"
     val jinJava = "2.5.0"
@@ -98,10 +111,12 @@
     val circeCore = "io.circe" %% "circe-core" % V.circe
     val circeGeneric = "io.circe" %% "circe-generic" % V.circe
+    val circeExtras = "io.circe" %% "circe-generic-extras" % V.circe
     val circeParser = "io.circe" %% "circe-parser" % V.circe
     val circeLiteral = "io.circe" %% "circe-literal" % V.circe
     val circeJava8 = "io.circe" %% "circe-java8" % V.circe
     val circeJawn = "io.circe" %% "circe-jawn" % V.circe
+    val circeConfig = "io.circe" %% "circe-config" % V.circeConfig
     val circeOptics = "io.circe" %% "circe-optics" % V.circeOptics
     val circeJackson = "io.circe" %% "circe-jackson210" % V.circeJackson
     val scalaUri = "io.lemonlabs" %% "scala-uri" % V.scalaUri
@@ -121,6 +136,7 @@
     val specs2Cats = "org.specs2" %% "specs2-cats" % 
V.specs2 % Test val specs2Scalacheck = "org.specs2" %% "specs2-scalacheck" % V.specs2 % Test val specs2Mock = "org.specs2" %% "specs2-mock" % V.specs2 % Test + val specs2CE = "com.codecommit" %% "cats-effect-testing-specs2" % V.specs2CE % Test // Beam val sentry = "io.sentry" % "sentry" % V.sentry @@ -147,5 +163,23 @@ object Dependencies { val scalacheck = "org.scalacheck" %% "scalacheck" % V.scalacheck % Test val kafka = "org.apache.kafka" %% "kafka" % V.kafka % Test val jinJava = "com.hubspot.jinjava" % "jinjava" % V.jinJava % Test + + // FS2 + val decline = "com.monovore" %% "decline" % V.decline + val fs2PubSub = "com.permutive" %% "fs2-google-pubsub-grpc" % V.fs2PubSub + val fs2 = "co.fs2" %% "fs2-core" % V.fs2 + val fs2Io = "co.fs2" %% "fs2-io" % V.fs2 + val monocle = "com.github.julien-truffaut" %% "monocle-core" % V.monocle + val monocleMacro = "com.github.julien-truffaut" %% "monocle-macro" % V.monocle + val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.http4s + val log4cats = "io.chrisdavenport" %% "log4cats-slf4j" % V.log4cats + val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry + val fs2BlobS3 = "com.github.fs2-blobstore" %% "s3" % V.fs2BlobStorage + val fs2BlobGcs = "com.github.fs2-blobstore" %% "gcs" % V.fs2BlobStorage + val pureconfigCats = "com.github.pureconfig" %% "pureconfig-cats-effect" % V.pureconfig + val pureconfigCirce = "com.github.pureconfig" %% "pureconfig-circe" % V.pureconfig + val metrics = "io.dropwizard.metrics" % "metrics-core" % V.metrics + val http4sDsl = "org.http4s" %% "http4s-dsl" % V.http4s % Test + val http4sServer = "org.http4s" %% "http4s-blaze-server" % V.http4s % Test } }