From d211791df2dd3e320a8f05c3f2cebf858ba43c09 Mon Sep 17 00:00:00 2001 From: Anton Parkhomenko Date: Thu, 6 Aug 2020 13:35:55 +0300 Subject: [PATCH] Beam: add mechanism for assets refresh (close #319) --- .../AssetsManagement.scala | 245 ++++++++++++++++++ .../Enrich.scala | 72 ++--- .../config.scala | 36 ++- .../ConfigSpec.scala | 68 +++-- .../EnrichSpec.scala | 67 ++++- .../enrichments/IpLookupsEnrichmentSpec.scala | 4 +- 6 files changed, 410 insertions(+), 82 deletions(-) create mode 100644 modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/AssetsManagement.scala diff --git a/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/AssetsManagement.scala b/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/AssetsManagement.scala new file mode 100644 index 000000000..bb2b6f384 --- /dev/null +++ b/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/AssetsManagement.scala @@ -0,0 +1,245 @@ +/* + * Copyright (c) 2012-2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and + * limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.enrich.beam + +import java.io.File +import java.net.URI +import java.nio.file.NoSuchFileException +import java.nio.file.attribute.BasicFileAttributes +import java.nio.file.{Files, Paths} + +import cats.syntax.either._ + +import org.joda.time.Duration + +import org.slf4j.LoggerFactory + +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions +import org.apache.beam.sdk.io.GenerateSequence +import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode +import org.apache.beam.sdk.transforms.windowing.{AfterPane, Repeatedly} +import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior + +import com.spotify.scio.ScioContext +import com.spotify.scio.coders.Coder +import com.spotify.scio.util.RemoteFileUtil +import com.spotify.scio.values.{DistCache, SCollection, SideInput, WindowOptions} + +import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf +import com.snowplowanalytics.snowplow.enrich.beam.utils.createSymLink + +/** + * Module responsible for assets (such as MaxMind and referer-parser DBs) + * management: downloading, distributing per workers, updating when necessary + */ +object AssetsManagement { + + val SideInputName = "assets-refresh-tick" + + /** List of linked files or error messages */ + type DbList = List[Either[String, FileLink]] + + /** + * A downloaded asset. `Path` is not used because it's not serializable + * @param uri an original GCS URI + * @param original real file path on a worker + * @param link link path to `original`. It's used because enrichments + * refer to static/hardcoded filepath, whereas `original` + * is dynamic + */ + case class FileLink( + uri: String, + original: String, + link: String + ) + + private val logger = LoggerFactory.getLogger(this.getClass) + + /** + * Create a transformation whose sole purpose is to periodically refresh + * worker assets. 
If no `refreshRate` given - it's an empty transformation + * @param sc Scio context + * @param refreshRate an interval with which assets should be updated + * @param enrichmentConfs enrichment configurations to provide links + * @tparam A type of data flowing through original stream + * @return no-op or refresh transformation + */ + def mkTransformation[A: Coder]( + sc: ScioContext, + refreshRate: Option[Duration], + enrichmentConfs: List[EnrichmentConf] + ): SCollection[A] => SCollection[A] = + refreshRate match { + case Some(rate) => + val rfu = RemoteFileUtil.create(sc.optionsAs[GcsOptions]) + val refreshInput = getSideInput(sc, rate).asSingletonSideInput(false) + val distCache = buildDistCache(sc, enrichmentConfs) + withAssetsUpdate[A](distCache, refreshInput, rfu) + case None => + val distCache = buildDistCache(sc, enrichmentConfs) + // identity function analog for scio + (collection: SCollection[A]) => + collection + .withName("assets-refresh-noop") + .map { a => + val _ = distCache() + a + } + } + + /** `SCollection` ticking with `true` for a minute, after specified period of time */ + def getSideInput(sc: ScioContext, period: Duration): SCollection[Boolean] = + sc + .customInput( + SideInputName, + GenerateSequence + .from(0L) + .withRate(1L, Duration.standardMinutes(1)) + ) + .map(x => x % period.getStandardMinutes == 0) + .withName("assets-refresh-window") + .withGlobalWindow( + options = WindowOptions( + trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(1)), + accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES, + closingBehavior = ClosingBehavior.FIRE_IF_NON_EMPTY, + allowedLateness = Duration.standardSeconds(0) + ) + ) + .withName("assets-refresh-noupdate") + + /** Transformation that only updates DBs and returns data as is */ + def withAssetsUpdate[A: Coder]( + cachedFiles: DistCache[DbList], + refreshInput: SideInput[Boolean], + rfu: RemoteFileUtil + )( + raw: SCollection[A] + ): SCollection[A] = + raw + .withGlobalWindow() + 
.withSideInputs(refreshInput) + .withName("assets-refresh") // aggregate somehow, if there's true in acc - return false + .map { (raw, side) => + val update = side(refreshInput) + if (update) { + val existing = cachedFiles() // Get already downloaded + // Traverse all existing files and find out if at least one had to be deleted + val atLeastOne = existing.foldLeft(false)((acc, cur) => shouldUpdate(cur) || acc) + if (atLeastOne) { + logger.info(s"Deleting all assets: ${}") + existing.foreach(deleteFile(rfu)) + logger.info("Re-downloading expired assets") + val _ = cachedFiles() + } + } else { + val _ = cachedFiles() // In case side-input's first value wasn't true somehow, should be no-op + } + + raw + } + .toSCollection + + /** + * Builds a Scio's [[DistCache]] which downloads the needed files and create the necessary + * symlinks. + * @param sc Scio context + * @param enrichmentConfs list of enrichment configurations + * @return a properly build [[DistCache]] + */ + def buildDistCache(sc: ScioContext, enrichmentConfs: List[EnrichmentConf]): DistCache[DbList] = { + val filesToCache: List[(URI, String)] = enrichmentConfs + .flatMap(_.filesToCache) + val filesToDownload = filesToCache.map(_._1.toString) + val filesDestinations = filesToCache.map(_._2) + + sc.distCache(filesToDownload)(linkFiles(filesToDownload, filesDestinations)) + } + + /** + * Check if link is older than 2 minutes and try to delete it and its original file + * Checking that it's old allows to not re-trigger downloading + * multiple times because refreshing side input will be true for one minute + * @param fileLink data structure containing all information about a file + * that potentially has to be deleted and re-downloaded + * @return true if thread managed to delete a link and RFU + * false if there was any error or it wasn't necessary to delete it + */ + private def shouldUpdate(fileLink: Either[String, FileLink]): Boolean = + fileLink match { + case Right(FileLink(_, originalPath, _)) => + val 
path = Paths.get(originalPath) + try { + val lastModified = Files + .readAttributes(path, classOf[BasicFileAttributes]) + .lastModifiedTime() + .toMillis + val now = System.currentTimeMillis() + val should = now - lastModified > 120000L + if (should) logger.info(s"File $originalPath is timestamped with $lastModified at $now") + should + } catch { + case _: NoSuchFileException => + logger.warn( + s"File $path does not exist for lastModifiedTime, possibly another thread deleted it. Blocking the worker to avoid futher racing" + ) + Thread.sleep(2000) + false + } + case Left(_) => + // Threads ran into race condition and cannot overwrite each other's link - not a failure + false + } + + private def deleteFile(rfu: RemoteFileUtil)(fileLink: Either[String, FileLink]): Unit = + fileLink match { + case Right(FileLink(originalUri, originalPath, linkPath)) => + val deleted = deleteFilePhysically(originalPath) || deleteFilePhysically(linkPath) + if (deleted) { + rfu.delete(URI.create(originalUri)) + logger.info(s"Deleted $fileLink") + } + case Left(_) => + () + } + + private def deleteFilePhysically(path: String): Boolean = + Either.catchOnly[NoSuchFileException](Files.delete(Paths.get(path))) match { + case Left(_) => + logger.warn(s"Tried to delete nonexisting $path") + false + case Right(_) => + true + } + + /** + * Link every `downloaded` file to a destination from `links` + * Both `downloaded` and `links` must have same amount of elements + */ + private def linkFiles(uris: List[String], links: List[String])(downloaded: Seq[File]): DbList = { + val mapped = downloaded.toList + .zip(links) + .map { case (file, symLink) => createSymLink(file, symLink) } + + uris.zip(mapped).zip(downloaded).map { + case ((uri, Right(p)), file) => + logger.info(s"File $file cached at $p") + FileLink(uri, file.toString, p.toString).asRight + case ((uri, Left(e)), file) => + logger.warn(s"File $file (downloaded from $uri) could not be cached: $e") + e.asLeft + } + } +} diff --git 
a/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/Enrich.scala b/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/Enrich.scala index d67944530..5d442ab16 100644 --- a/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/Enrich.scala +++ b/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/Enrich.scala @@ -16,16 +16,29 @@ package com.snowplowanalytics.snowplow.enrich.beam import scala.collection.JavaConverters._ +import cats.Id +import cats.data.Validated +import cats.implicits._ + import io.circe.Json import io.sentry.Sentry -import cats.Id -import cats.data.Validated -import cats.implicits._ +import com.spotify.scio._ +import com.spotify.scio.coders.Coder +import com.spotify.scio.pubsub.PubSubAdmin +import com.spotify.scio.values.SCollection + +import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions + +import org.joda.time.{DateTime, Duration} +import org.slf4j.LoggerFactory import com.snowplowanalytics.iglu.client.Client + import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor} + import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry @@ -33,16 +46,6 @@ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.Enrichm import com.snowplowanalytics.snowplow.enrich.common.loaders.ThriftLoader import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent -import com.spotify.scio._ -import com.spotify.scio.coders.Coder -import com.spotify.scio.pubsub.PubSubAdmin -import com.spotify.scio.values.{DistCache, SCollection} - -import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions -import org.joda.time.DateTime -import org.slf4j.LoggerFactory - 
import com.snowplowanalytics.snowplow.enrich.beam.config._ import com.snowplowanalytics.snowplow.enrich.beam.singleton._ import com.snowplowanalytics.snowplow.enrich.beam.utils._ @@ -69,7 +72,7 @@ object Enrich { def main(cmdlineArgs: Array[String]): Unit = { val (sc, args) = ContextAndArgs(cmdlineArgs) val parsedConfig = for { - config <- EnrichConfig(args) + config <- EnrichConfig.from(args) _ = sc.setJobName(config.jobName) _ <- checkTopicExists(sc, config.enriched) _ <- checkTopicExists(sc, config.bad) @@ -83,6 +86,7 @@ object Enrich { "A pii topic needs to be used in order to use the pii enrichment".asLeft else ().asRight + refreshRate = if (config.assetsRefreshDuration == 0) None else Some(Duration.standardMinutes(config.assetsRefreshDuration.toLong)) } yield ParsedEnrichConfig( config.raw, config.enriched, @@ -92,7 +96,8 @@ object Enrich { confs, labels, config.sentryDSN, - config.metrics + config.metrics, + refreshRate ) parsedConfig match { @@ -110,14 +115,17 @@ object Enrich { if (config.labels.nonEmpty) sc.optionsAs[DataflowPipelineOptions].setLabels(config.labels.asJava) - val cachedFiles: DistCache[List[Either[String, String]]] = - buildDistCache(sc, config.enrichmentConfs) + /** Either plain no-op or assets-updating transformation no-op */ + val withAssetUpdate = AssetsManagement.mkTransformation[Array[Byte]](sc, config.assetsRefreshRate, config.enrichmentConfs) val raw: SCollection[Array[Byte]] = - sc.withName("raw-from-pubsub").pubsubSubscription[Array[Byte]](config.raw) + sc.withName("raw-from-pubsub") + .pubsubSubscription[Array[Byte]](config.raw) + .withName("assets-refresh-transformation") + .transform(withAssetUpdate) val enriched: SCollection[Validated[BadRow, EnrichedEvent]] = - enrichEvents(raw, config.resolver, config.enrichmentConfs, cachedFiles, config.sentryDSN, config.metrics) + enrichEvents(raw, config.resolver, config.enrichmentConfs, config.sentryDSN, config.metrics) val (failures, successes): (SCollection[BadRow], 
SCollection[EnrichedEvent]) = { val enrichedPartitioned = enriched.withName("split-enriched-good-bad").partition(_.isValid) @@ -189,20 +197,17 @@ object Enrich { * @param raw collection of events * @param resolver Json representing the iglu resolver * @param enrichmentConfs list of enabled enrichment configuration - * @param cachedFiles list of files to cache */ private def enrichEvents( raw: SCollection[Array[Byte]], resolver: Json, enrichmentConfs: List[EnrichmentConf], - cachedFiles: DistCache[List[Either[String, String]]], sentryDSN: Option[String], metrics: Boolean ): SCollection[Validated[BadRow, EnrichedEvent]] = raw .withName("enrich") .map { rawEvent => - cachedFiles() val (enriched, time) = timeMs { enrich( rawEvent, @@ -305,29 +310,6 @@ object Enrich { } } - /** - * Builds a Scio's [[DistCache]] which downloads the needed files and create the necessary - * symlinks. - * @param sc Scio context - * @param enrichmentConfs list of enrichment configurations - * @return a properly build [[DistCache]] - */ - private def buildDistCache(sc: ScioContext, enrichmentConfs: List[EnrichmentConf]): DistCache[List[Either[String, String]]] = { - val filesToCache: List[(String, String)] = enrichmentConfs - .flatMap(_.filesToCache) - .map { case (uri, sl) => (uri.toString, sl) } - sc.distCache(filesToCache.map(_._1)) { files => - val symLinks = files.toList - .zip(filesToCache.map(_._2)) - .map { case (file, symLink) => createSymLink(file, symLink) } - symLinks.zip(files).foreach { - case (Right(p), file) => logger.info(s"File $file cached at $p") - case (Left(e), file) => logger.warn(s"File $file could not be cached: $e") - } - symLinks.map(_.map(_.toString)) - } - } - /** * Checks a PubSub topic exists before launching the job. 
* @param sc Scio Context diff --git a/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/config.scala b/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/config.scala index fea021495..75a63fbd4 100644 --- a/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/config.scala +++ b/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/config.scala @@ -24,13 +24,19 @@ import scala.io.Source import cats.Id import cats.data.ValidatedNel import cats.implicits._ + +import org.joda.time.Duration + +import com.spotify.scio.Args + import com.snowplowanalytics.iglu.client.Client import com.snowplowanalytics.iglu.core._ import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ + import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf import com.snowplowanalytics.snowplow.enrich.common.utils.JsonUtils -import com.spotify.scio.Args + import io.circe.Json import io.circe.parser.decode import io.circe.syntax._ @@ -52,12 +58,13 @@ object config { enrichments: Option[String], labels: Option[String], sentryDSN: Option[String], - metrics: Boolean + metrics: Boolean, + assetsRefreshDuration: Int ) object EnrichConfig { /** Smart constructor taking SCIO's [[Args]] */ - def apply(args: Args): Either[String, EnrichConfig] = + def from(args: Args): Either[String, EnrichConfig] = for { _ <- if (args.optional("help").isDefined) helpString(configurations).asLeft else "".asRight l <- configurations @@ -79,7 +86,8 @@ object config { args.optional("enrichments"), args.optional("labels"), args.optional("sentry-dsn"), - args.boolean("metrics", true) + args.boolean("metrics", default = true), + args.int("assets-refresh-rate", default = 0) ) private val configurations = List( @@ -98,10 +106,11 @@ object config { Configuration.Optional("enrichments", "Path to the directory containing the enrichment files"), 
Configuration.Optional("labels", "Dataflow labels to be set ie. env=qa1;region=eu"), Configuration.Optional("sentry-dsn", "Sentry DSN"), - Configuration.Optional("metrics", "Enable ScioMetrics (default: true)") + Configuration.Optional("metrics", "Enable ScioMetrics (default: true)"), + Configuration.Optional("assets-refresh-rate", "How often (in minutes) enrich should try to update worker assets (e.g. MaxMind DB)") ) - /** Generates an help string from a list of conifugration */ + /** Generates an help string from a list of configuration */ private def helpString(configs: List[Configuration]): String = "Possible configuration are:\n" + configs @@ -135,7 +144,8 @@ object config { enrichmentConfs: List[EnrichmentConf], labels: Map[String, String], sentryDSN: Option[String], - metrics: Boolean + metrics: Boolean, + assetsRefreshRate: Option[Duration] ) /** @@ -150,11 +160,15 @@ object config { _ <- Client.parseDefault[Id](json).leftMap(_.message).value } yield json - /** Reads a resolver file at the specfied path. */ + /** Reads a resolver file at the specified path. 
*/ private def readResolverFile(path: String): Either[String, String] = { val file = new File(path) - if (file.exists) Source.fromFile(file).mkString.asRight - else s"Iglu resolver configuration file `$path` does not exist".asLeft + if (file.exists) { + val source = Source.fromFile(file) + val config = source.mkString + source.close() + config.asRight + } else s"Iglu resolver configuration file `$path` does not exist".asLeft } /** @@ -165,7 +179,7 @@ object config { def parseEnrichmentRegistry(enrichmentsPath: Option[String], client: Client[Id, Json]): Either[String, Json] = for { fileContents <- readEnrichmentFiles(enrichmentsPath) - jsons <- fileContents.map(JsonUtils.extractJson(_)).sequence[EitherS, Json] + jsons <- fileContents.map(JsonUtils.extractJson).sequence[EitherS, Json] schemaKey = SchemaKey( "com.snowplowanalytics.snowplow", "enrichments", diff --git a/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/ConfigSpec.scala b/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/ConfigSpec.scala index e221b05b4..962b2d336 100644 --- a/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/ConfigSpec.scala +++ b/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/ConfigSpec.scala @@ -16,23 +16,26 @@ package com.snowplowanalytics.snowplow.enrich.beam import java.nio.file.Files -import com.snowplowanalytics.iglu.core.SelfDescribingData -import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ import com.spotify.scio.Args + import io.circe.Json import io.circe.syntax._ -import org.scalatest._ -import matchers.should.Matchers._ -import config._ -import SpecHelpers._ +import com.snowplowanalytics.iglu.core.SelfDescribingData +import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ + +import org.scalatest._ +import org.scalatest.matchers.should.Matchers._ import org.scalatest.freespec.AnyFreeSpec +import com.snowplowanalytics.snowplow.enrich.beam.config._ +import 
com.snowplowanalytics.snowplow.enrich.beam.SpecHelpers._ + class ConfigSpec extends AnyFreeSpec with EitherValues { "the config object should" - { "make an EnrichConfig smart ctor available" - { "which fails if --job-name is not present" in { - EnrichConfig(Args(Array.empty)) shouldEqual Left( + EnrichConfig.from(Args(Array.empty)) shouldEqual Left( "Missing `job-name` argument\n" + "Missing `raw` argument\n" + "Missing `enriched` argument\n" + @@ -41,7 +44,7 @@ class ConfigSpec extends AnyFreeSpec with EitherValues { ) } "which fails if --raw is not present" in { - EnrichConfig(Args(Array("--job-name=j"))) shouldEqual Left( + EnrichConfig.from(Args(Array("--job-name=j"))) shouldEqual Left( "Missing `raw` argument\n" + "Missing `enriched` argument\n" + "Missing `bad` argument\n" + @@ -49,27 +52,27 @@ class ConfigSpec extends AnyFreeSpec with EitherValues { ) } "which fails if --enriched is not present" in { - EnrichConfig(Args(Array("--job-name=j", "--raw=i"))) shouldEqual Left( + EnrichConfig.from(Args(Array("--job-name=j", "--raw=i"))) shouldEqual Left( "Missing `enriched` argument\n" + "Missing `bad` argument\n" + "Missing `resolver` argument" ) } "which fails if --bad is not present" in { - EnrichConfig(Args(Array("--job-name=j", "--raw=i", "--enriched=o"))) shouldEqual Left( + EnrichConfig.from(Args(Array("--job-name=j", "--raw=i", "--enriched=o"))) shouldEqual Left( "Missing `bad` argument\n" + "Missing `resolver` argument" ) } "which fails if --resolver is not present" in { - EnrichConfig(Args(Array("--job-name=j", "--raw=i", "--enriched=o", "--bad=b"))) shouldEqual + EnrichConfig.from(Args(Array("--job-name=j", "--raw=i", "--enriched=o", "--bad=b"))) shouldEqual Left("Missing `resolver` argument") } "which succeeds otherwise" in { - EnrichConfig( + EnrichConfig.from( Args(Array("--job-name=j", "--raw=i", "--enriched=o", "--bad=b", "--resolver=r")) ) shouldEqual - Right(EnrichConfig("j", "i", "o", "b", None, "r", None, None, None, true)) + 
Right(EnrichConfig("j", "i", "o", "b", None, "r", None, None, None, true, 0)) } "which succeeds if --enrichments is present" in { val args = Args( @@ -82,16 +85,16 @@ class ConfigSpec extends AnyFreeSpec with EitherValues { "--enrichments=e" ) ) - EnrichConfig(args) shouldEqual Right( - EnrichConfig("j", "i", "o", "b", None, "r", Some("e"), None, None, true) + EnrichConfig.from(args) shouldEqual Right( + EnrichConfig("j", "i", "o", "b", None, "r", Some("e"), None, None, true, 0) ) } "which succeeds if --pii is present" in { val args = Args( Array("--job-name=j", "--raw=i", "--enriched=o", "--bad=b", "--pii=p", "--resolver=r") ) - EnrichConfig(args) shouldEqual Right( - EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, None, true) + EnrichConfig.from(args) shouldEqual Right( + EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, None, true, 0) ) } "which succeeds if --labels is present" in { @@ -106,7 +109,7 @@ class ConfigSpec extends AnyFreeSpec with EitherValues { "--labels={\"env\":\"abc\"}" ) ) - EnrichConfig(args) shouldEqual Right( + EnrichConfig.from(args) shouldEqual Right( EnrichConfig( "j", "i", @@ -117,7 +120,8 @@ class ConfigSpec extends AnyFreeSpec with EitherValues { None, Some("{\"env\":\"abc\"}"), None, - true + true, + 0 ) ) } @@ -133,8 +137,8 @@ class ConfigSpec extends AnyFreeSpec with EitherValues { "--sentry-dsn=DSN" ) ) - EnrichConfig(args) shouldEqual Right( - EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, Some("DSN"), true) + EnrichConfig.from(args) shouldEqual Right( + EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, Some("DSN"), true, 0) ) } "which respects --metrics=false" in { @@ -149,8 +153,24 @@ class ConfigSpec extends AnyFreeSpec with EitherValues { "--metrics=false" ) ) - EnrichConfig(args) shouldEqual Right( - EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, None, false) + EnrichConfig.from(args) shouldEqual Right( + EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, 
None, None, false, 0) + ) + } + "which succeeds if --assets-refresh-rate is present" in { + val args = Args( + Array( + "--job-name=j", + "--raw=i", + "--enriched=o", + "--bad=b", + "--pii=p", + "--resolver=r", + "--assets-refresh-rate=6000" + ) + ) + EnrichConfig.from(args) shouldEqual Right( + EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, None, true, 6000) ) } } @@ -248,7 +268,7 @@ class ConfigSpec extends AnyFreeSpec with EitherValues { .write(Files.createTempFile(d.toAbsolutePath, name, ".json"), content.getBytes) .toFile .deleteOnExit() - val f = d.toFile() + val f = d.toFile f.deleteOnExit() f.getAbsolutePath } diff --git a/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/EnrichSpec.scala b/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/EnrichSpec.scala index 99002ecf2..2e5087d6a 100644 --- a/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/EnrichSpec.scala +++ b/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/EnrichSpec.scala @@ -17,7 +17,7 @@ package com.snowplowanalytics.snowplow.enrich.beam import java.nio.file.Paths import com.spotify.scio.ScioMetrics -import com.spotify.scio.io.PubsubIO +import com.spotify.scio.io.{CustomIO, PubsubIO} import com.spotify.scio.testing._ import org.apache.commons.codec.binary.Base64 @@ -131,4 +131,69 @@ class EnrichSpec extends PipelineSpec { .run() } + "Enrich" should "enrich a struct event with --assets-refresh-rate with IAB enrichment" in { + // This test doesn't assert refresh behavior, but will fail at start if windowing is invalid + // and from logging it should be apparent it tried to download and delete assets 3 times + val localIpFile = "./iab_ipFile" + val resourceIpFile = "/iab/ip_exclude_current_cidr.txt" + val localExcludeUaFile = "./iab_excludeUseragentFile" + val resourceExcludeUaFile = "/iab/exclude_current.txt" + val localIncludeUaFile = "./iab_includeUseragentFile" + val resourceIncludeUaFile = 
"/iab/include_current.txt" + val uriIpFile = s"http://snowplow-hosted-assets.s3.amazonaws.com/third-party$resourceIpFile" + val uriExcludeUaFile = s"http://snowplow-hosted-assets.s3.amazonaws.com/third-party$resourceExcludeUaFile" + val uriIncludeUaFile = s"http://snowplow-hosted-assets.s3.amazonaws.com/third-party$resourceIncludeUaFile" + + JobTest[Enrich.type] + .args( + "--job-name=j", + "--raw=in", + "--enriched=out", + "--bad=bad", + "--assets-refresh-rate=3", // Actual value doesn't matter + "--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI), + "--enrichments=" + Paths.get(getClass.getResource("/iab").toURI) + ) + .input(PubsubIO.readCoder[Array[Byte]]("in"), raw.flatMap(x => Seq(x, x, x)).map(Base64.decodeBase64)) + .input(CustomIO(AssetsManagement.SideInputName), List(0L)) + .distCache( + DistCacheIO( + Seq( + uriIpFile, + uriExcludeUaFile, + uriIncludeUaFile + ) + ), + List( + Right(AssetsManagement.FileLink(uriIpFile, resourceIpFile, localIpFile)), + Right(AssetsManagement.FileLink(uriExcludeUaFile, resourceExcludeUaFile, localExcludeUaFile)), + Right(AssetsManagement.FileLink(uriIncludeUaFile, resourceIncludeUaFile, localIncludeUaFile)) + ) + ) + .output(PubsubIO.readString("out")) { o => + o should forAll { c: String => + expected.forall(c.contains) + }; () + } + .output(PubsubIO.readString("bad")) { b => + b should beEmpty; () + } + .distribution(Enrich.enrichedEventSizeDistribution) { d => + d.getCount shouldBe 3 + () + } + .distribution(Enrich.timeToEnrichDistribution) { d => + d.getCount shouldBe 3 + () + } + .counter(ScioMetrics.counter("snowplow", "vendor_com_google_analytics")) { c => + c shouldBe 3 + () + } + .counter(ScioMetrics.counter("snowplow", "tracker_js_0_13_1")) { c => + c shouldBe 3 + () + } + .run() + } } diff --git a/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/enrichments/IpLookupsEnrichmentSpec.scala 
b/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/enrichments/IpLookupsEnrichmentSpec.scala index 50ff90c3e..94bfc1884 100644 --- a/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/enrichments/IpLookupsEnrichmentSpec.scala +++ b/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/enrichments/IpLookupsEnrichmentSpec.scala @@ -72,7 +72,9 @@ class IpLookupsEnrichmentSpec extends PipelineSpec { .distCache(DistCacheIO(url), List(Right(localFile))) .output(PubsubIO.readString("out")) { o => o should satisfySingleValue { c: String => - expected.forall(c.contains) // Add `println(c);` before `expected` to see the enrichment output + // Add `println(c);` before `expected` to see the enrichment output + // see https://github.com/snowplow/enrich/issues/327 + expected.forall(c.contains) }; () } .output(PubsubIO.readString("bad")) { b =>