Commit

Stream NH: add (close #346)

chuwy committed Sep 18, 2020
1 parent dd45f7b commit 2a6185b
Showing 27 changed files with 1,890 additions and 17 deletions.
39 changes: 39 additions & 0 deletions build.sbt
@@ -178,6 +178,45 @@ lazy val beam =
)
.enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)

lazy val fs2 = project
.in(file("modules/fs2"))
.dependsOn(common)
.settings(BuildSettings.basicSettings)
.settings(BuildSettings.formatting)
.settings(BuildSettings.scoverageSettings)
.settings(
name := "fs2-enrich",
description := "High-performance streaming Snowplow Enrich job built on top of functional streams",
buildInfoKeys := Seq[BuildInfoKey](organization, name, version, "sceVersion" -> version.value, description),
buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.fs2.generated",
)
.settings(
libraryDependencies ++= Seq(
Dependencies.Libraries.decline,
Dependencies.Libraries.fs2PubSub,
Dependencies.Libraries.circeExtras,
Dependencies.Libraries.circeLiteral,
Dependencies.Libraries.circeConfig,
Dependencies.Libraries.fs2,
Dependencies.Libraries.fs2Io,
Dependencies.Libraries.slf4j,
Dependencies.Libraries.sentry,
Dependencies.Libraries.log4cats,
Dependencies.Libraries.catsRetry,
Dependencies.Libraries.http4sClient,
Dependencies.Libraries.fs2BlobS3,
Dependencies.Libraries.fs2BlobGcs,
Dependencies.Libraries.pureconfig,
Dependencies.Libraries.pureconfigCats,
Dependencies.Libraries.pureconfigCirce,
Dependencies.Libraries.specs2,
Dependencies.Libraries.http4sDsl,
Dependencies.Libraries.http4sServer
),
addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
)
.enablePlugins(BuildInfoPlugin)

lazy val integrationTests = project
.in(file("modules/integration-tests"))
.settings(moduleName := "integration-tests")
32 changes: 32 additions & 0 deletions config/config.fs2.hocon.sample
@@ -0,0 +1,32 @@
raw = {
type = "pubsub"
project = "project"
subscription = "subscription"
}

// raw = {
// type = "fs"
// path = "/Users/chuwy/enriched"
// }

good = {
type = "pubsub"
project = "project"
subscription = "subscription"
}

bad = {
type = "pubsub"
project = "project"
subscription = "subscription"
}

pii = {
type = "pubsub"
project = "project"
subscription = "subscription"
}

sentry = {
dsn = "http://smth"
}
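
The new module's dependency list (pureconfig, pureconfigCirce, circeConfig) suggests this HOCON sample is decoded into case classes. A minimal, hypothetical sketch of decoding one such block with pureconfig; the case class and its field shape are illustrative, not the commit's actual types:

import pureconfig._
import pureconfig.generic.auto._

// Hypothetical mirror of one block from the sample above; the commit's
// real config types live in the fs2 module and may differ
final case class PubSub(`type`: String, project: String, subscription: String)

val source = ConfigSource.string(
  """{ type = "pubsub", project = "project", subscription = "subscription" }"""
)
// Left(failures) on a malformed block, Right(PubSub(...)) otherwise
val decoded = source.load[PubSub]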
@@ -13,12 +13,9 @@
package com.snowplowanalytics.snowplow.enrich.common.adapters.registry

 import cats.Monad
-import cats.data.{NonEmptyList, ValidatedNel}
+import cats.data.{NonEmptyList, Validated, ValidatedNel}
-import cats.data.Validated._
-import cats.syntax.either._
-import cats.syntax.eq._
-import cats.syntax.option._
-import cats.syntax.validated._
+import cats.implicits._

 import cats.effect.Clock

@@ -265,17 +262,11 @@ trait Adapter {
* or Failures
*/
 protected[registry] def rawEventsListProcessor(
-  rawEventsList: List[ValidatedNel[FailureDetails.AdapterFailure, RawEvent]]
+  rawEventsList: List[Validated[NonEmptyList[FailureDetails.AdapterFailure], RawEvent]]
 ): ValidatedNel[FailureDetails.AdapterFailure, NonEmptyList[RawEvent]] = {
-  val successes: List[RawEvent] =
-    for {
-      Valid(s) <- rawEventsList
-    } yield s
-
-  val failures: List[FailureDetails.AdapterFailure] =
-    (for {
-      Invalid(NonEmptyList(h, t)) <- rawEventsList
-    } yield h :: t).flatten
+  val (failures, successes) = rawEventsList.separate match {
+    case (nel, list) => (nel.flatMap(_.toList), list)
+  }

   (successes, failures) match {
     // No Failures collected.
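The rewritten rawEventsListProcessor body above leans on cats' separate, which partitions a collection of Validated values into failures and successes in a single pass. A small self-contained sketch with illustrative values (not from the commit):

import cats.data.{NonEmptyList, Validated}
import cats.implicits._

val events: List[Validated[NonEmptyList[String], Int]] = List(
  Validated.valid(1),
  Validated.invalid(NonEmptyList.of("bad schema", "bad field")),
  Validated.valid(2)
)

// separate yields (List[NonEmptyList[String]], List[Int]);
// flattening the inner NonEmptyLists mirrors rawEventsListProcessor
val (failureNels, successes) = events.separate
val failures = failureNels.flatMap(_.toList)
// failures == List("bad schema", "bad field"), successes == List(1, 2)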
@@ -18,6 +18,8 @@ import cats.{Id, Monad}
import cats.data.{EitherT, NonEmptyList, ValidatedNel}
import cats.implicits._

import cats.effect.Sync

import io.circe._
import io.circe.generic.auto._

@@ -26,7 +28,6 @@ import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.lrumap._
import com.snowplowanalytics.snowplow.badrows.FailureDetails

import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, ParseableEnrichment}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.ApiRequestConf
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
@@ -231,4 +232,24 @@ object CreateApiRequestEnrichment {
)
)
}

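// Sync-based instance: allocates the LRU cache effectfully, then wires it into
// the enrichment together with the implicit HTTP client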
implicit def syncCreateApiRequestEnrichment[F[_]: Sync](
implicit CLM: CreateLruMap[F, String, (Either[Throwable, Json], Long)],
HTTP: HttpClient[F]
): CreateApiRequestEnrichment[F] =
new CreateApiRequestEnrichment[F] {
def create(conf: ApiRequestConf): F[ApiRequestEnrichment[F]] =
CLM
.create(conf.cache.size)
.map(c =>
ApiRequestEnrichment(
conf.schemaKey,
conf.inputs,
conf.api,
conf.outputs,
conf.cache.ttl,
c
)
)
}
}
@@ -0,0 +1,208 @@
/*
* Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.fs2

import java.net.URI
import java.nio.file.{Path, Paths}

import scala.concurrent.duration._
import scala.util.control.NonFatal

import cats.{Applicative, Parallel}
import cats.implicits._

import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Sync, Timer}
import cats.effect.concurrent.Ref

import retry.{RetryDetails, RetryPolicies, RetryPolicy, retryingOnSomeErrors}

import fs2.Stream
import fs2.hash.md5
import fs2.io.file.{copy, deleteIfExists, tempFileResource, writeAll}

import _root_.io.chrisdavenport.log4cats.Logger
import _root_.io.chrisdavenport.log4cats.slf4j.Slf4jLogger

import com.snowplowanalytics.snowplow.enrich.fs2.io.Clients

/**
 * Functions responsible for periodic updates of assets (such as MaxMind/IAB DBs)
 * The common logic is to periodically invoke a function that:
 * 1. Downloads a file (in background) to a temp location
 * 2. Compares the file's checksum with the existing one (stored in a mutable hashmap)
 * 3. If the checksums match - deletes the temp file and returns
 * 4. If the checksums don't match - sends a signal to stop the raw stream
 *    (via `SignallingRef` in [[Environment]])
 * 5. Once the raw stream is stopped - deletes the old file and moves the
 *    temp file to the old file's location
 * If any of those URIs has been updated and stopped the raw stream, it will be
 * immediately resumed once the above procedure has traversed all files
 */
object AssetsRefresh {

private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
Slf4jLogger.getLogger[F]

/**
 * State of the [[updateStream]], containing information about tracked URIs
 * and the `stop` signal from [[Environment]], as well as all clients necessary
 * to download URIs
 *
 * @param files mutable hash map of URIs and their latest known state
 * @param stop stop signal coming from [[Environment]] that can be used
 *             to stop the raw stream consumption
 * @param clients HTTP, GCS, S3 clients if necessary
*/
case class State[F[_]](
files: Ref[F, Map[URI, Hash]],
stop: Ref[F, Boolean],
clients: Clients[F]
)

case class Hash private (s: String) extends AnyVal

object Hash {
def apply(bytes: Array[Byte]): Hash = {
val bi = new java.math.BigInteger(1, bytes)
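// zero-pad to two hex chars per byte: a 16-byte MD5 digest becomes a 32-char hex string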
Hash(String.format("%0" + (bytes.length << 1) + "x", bi))
}
}

/** Pair of a tracked `URI` and its destination path on the local FS (`java.nio.file.Path` is not serializable) */
type Asset = (URI, String)

/** Initialise the [[updateStream]] with all necessary resources, if a refresh period is specified */
def run[F[_]: ConcurrentEffect: ContextShift: Timer: Parallel](environment: Environment[F]): Stream[F, Unit] =
environment.config.assetsUpdatePeriod match {
case Some(duration) =>
val init = for {
files <- Ref.of(Map.empty[URI, Hash])
curDir <- Sync[F].delay(Paths.get("").toAbsolutePath)
_ <- Logger[F].info("Initializing AssetsRefresh stream")
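// Collect every remote URI that enrichment configs need cached locally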
uris = environment.enrichments.configs.flatMap(_.filesToCache).map(_._1)
stream = for {
clients <- Stream.resource(Clients.initialize[F](environment.blocker, uris))
state = State(files, environment.stop, clients)
assets = environment.enrichments.configs.flatMap(_.filesToCache)
_ <- updateStream[F](environment.blocker, state, curDir, duration, assets)
} yield ()
} yield stream
Stream.eval(init).flatten
case None =>
Stream.empty.covary[F]
}

/**
 * Create an update stream that ticks periodically and can invoke an update action,
 * which will download a URI and check if it has been updated. If it has, the
 * raw stream will be stopped via the `stop` signal from [[Environment]] and the assets updated.
 * At the end of every update, the stop signal will be reset to `false`
 */
def updateStream[F[_]: ConcurrentEffect: ContextShift: Parallel: Timer](
blocker: Blocker,
state: State[F],
curDir: Path,
duration: FiniteDuration,
assets: List[Asset]
): Stream[F, Unit] =
Stream.awakeEvery[F](duration).evalMap { _ =>
val log = Logger[F].debug(show"Ticking with ${assets.map(_._2)}")
val updates = assets.parTraverse {
case (uri, path) =>
update(blocker, state, curDir)(uri, Paths.get(path))
}
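// If any asset changed, `update` paused the raw stream; flip the signal back to resume it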
log *> updates.map(_.contains(true)).flatMap { stopped =>
if (stopped) Logger[F].info("Resuming signalling ref") *> state.stop.set(false) else Sync[F].unit
}
}

/**
 * Update a file in the current directory if it has been updated on remote storage
 * If a new file has been discovered - stops the enriching streams (signal in `state`)
 * Does nothing if the file hasn't been updated
 *
 * Note: this function has the potential to be thread-unsafe if download time
 * exceeds the tick period. We assume that no two threads will be downloading the same URI
 *
 * @param blocker a thread pool to execute download/copy operations
 * @param state a map of URI to MD5 hash to keep track of the latest state of remote files
 * @param curDir a local FS destination for temporary files
 * @param uri a remote file (S3, GCS or HTTP), the URI is used as an identifier
 * @param path a static file name that enrich clients will access;
 *             the file itself is placed in the current dir (`curDir`)
 * @return true if the file has been updated
 */
def update[F[_]: ConcurrentEffect: ContextShift: Timer](
blocker: Blocker,
state: State[F],
curDir: Path
)(
uri: URI,
path: Path
): F[Boolean] =
tempFileResource[F](blocker, curDir).use { tmp =>
// Set stop signal and replace old file with temporary
def stopAndCopy(hash: Hash, delete: Boolean): F[Unit] =
for {
_ <- Logger[F].info(s"Discovered new data at $uri, stopping signalling ref")
_ <- state.stop.set(true)
_ <- if (delete) {
val deleted = Logger[F].info(s"Deleted outdated $path")
val notDeleted = Logger[F].warn(s"Couldn't delete $path, file didn't exist")
deleteIfExists(blocker, path).ifM(deleted, notDeleted)
} else Sync[F].unit
_ <- copy(blocker, tmp, path)
_ <- state.files.update(_.updated(uri, hash))
} yield ()

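// Stream the download into the temp file while computing its MD5 hash in one pass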
val data = state.clients.download(uri).observe(writeAll(tmp, blocker)).through(md5)
for {
_ <- Logger[F].info(s"Downloading $uri")
hash <- retryDownload(data.compile.to(Array)).map(Hash.apply)
localFiles <- state.files.get
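// Compare the fresh digest against the last known one for this URI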
updated <- localFiles.get(uri) match {
case Some(known) if known == hash =>
Sync[F].pure(false)
case Some(_) =>
stopAndCopy(hash, true).as(true)
case None =>
stopAndCopy(hash, false).as(true)
}
} yield updated
}

def retryDownload[F[_]: Sync: Timer, A](download: F[A]): F[A] =
retryingOnSomeErrors[A](retryPolicy[F], worthRetrying, onError[F])(download)

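// Retry while both policies allow it: full-jitter backoff from a 1.5s base, giving up after 5 retries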
def retryPolicy[F[_]: Applicative]: RetryPolicy[F] =
RetryPolicies.fullJitter[F](1500.milliseconds).join(RetryPolicies.limitRetries[F](5))

def worthRetrying(e: Throwable): Boolean =
e match {
case _: Clients.DownloadingFailure => true
case _: IllegalArgumentException => false
case NonFatal(_) => false
}

def onError[F[_]: Sync](error: Throwable, details: RetryDetails): F[Unit] =
if (details.givingUp)
Logger[F].error(show"Failed to download an asset after ${details.retriesSoFar} retries. ${error.getMessage}. Aborting the job")
else if (details.retriesSoFar == 0)
Logger[F].warn(show"Failed to download an asset. ${error.getMessage}. Keep retrying")
else
Logger[F].warn(
show"Failed to download an asset after ${details.retriesSoFar} retries, " +
show"waiting for ${details.cumulativeDelay.toMillis} ms. ${error.getMessage}. " +
show"Keep retrying"
)
}