From f1c5e0461933901188025fa1df5f52e88a9f1d14 Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Fri, 20 Nov 2020 09:18:43 +0100 Subject: [PATCH] Beam: crash at startup if Sentry DSN is not correct (close #398) --- .../Enrich.scala | 50 ++---- .../config.scala | 62 +++++--- .../test/resources/enrichments_wrong/foo.json | 1 + .../ConfigSpec.scala | 150 ++++++++++++------ 4 files changed, 161 insertions(+), 102 deletions(-) create mode 100644 modules/beam/src/test/resources/enrichments_wrong/foo.json diff --git a/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/Enrich.scala b/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/Enrich.scala index d67944530..3dc297b84 100644 --- a/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/Enrich.scala +++ b/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/Enrich.scala @@ -42,6 +42,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions import org.joda.time.DateTime import org.slf4j.LoggerFactory +import java.net.URI import com.snowplowanalytics.snowplow.enrich.beam.config._ import com.snowplowanalytics.snowplow.enrich.beam.singleton._ @@ -68,45 +69,26 @@ object Enrich { def main(cmdlineArgs: Array[String]): Unit = { val (sc, args) = ContextAndArgs(cmdlineArgs) - val parsedConfig = for { - config <- EnrichConfig(args) - _ = sc.setJobName(config.jobName) - _ <- checkTopicExists(sc, config.enriched) - _ <- checkTopicExists(sc, config.bad) - _ <- config.pii.map(checkTopicExists(sc, _)).getOrElse(().asRight) - resolverJson <- parseResolver(config.resolver) - client <- Client.parseDefault[Id](resolverJson).leftMap(_.toString).value - registryJson <- parseEnrichmentRegistry(config.enrichments, client) - confs <- EnrichmentRegistry.parse(registryJson, client, false).leftMap(_.toString).toEither - labels <- config.labels.map(parseLabels).getOrElse(Right(Map.empty[String, String])) - _ <- if (emitPii(confs) && config.pii.isEmpty) - "A pii topic needs to be used in order to use the pii enrichment".asLeft - else - ().asRight - } yield ParsedEnrichConfig( - config.raw, - config.enriched, - config.bad, - config.pii, - resolverJson, - confs, - labels, - config.sentryDSN, - config.metrics - ) - - parsedConfig match { + val config = for { + conf <- EnrichConfig(args) + _ = sc.setJobName(conf.jobName) + _ <- checkTopicExists(sc, conf.enriched) + _ <- checkTopicExists(sc, conf.bad) + _ <- conf.pii.map(checkTopicExists(sc, _)).getOrElse(().asRight) + } yield conf + + config match { case Left(e) => System.err.println(e) System.exit(1) - case Right(config) => - run(sc, config) + case Right(c) => + run(sc, c) sc.run() () } } - def run(sc: ScioContext, config: ParsedEnrichConfig): Unit = { + def run(sc: ScioContext, config: EnrichConfig): Unit = { if (config.labels.nonEmpty) sc.optionsAs[DataflowPipelineOptions].setLabels(config.labels.asJava) @@ -196,7 +178,7 @@ object Enrich { resolver: Json, enrichmentConfs: List[EnrichmentConf], cachedFiles: DistCache[List[Either[String, String]]], - sentryDSN: Option[String], + sentryDSN: Option[URI], metrics: Boolean ): SCollection[Validated[BadRow, EnrichedEvent]] = raw @@ -278,7 +260,7 @@ object Enrich { data: Array[Byte], enrichmentRegistry: EnrichmentRegistry[Id], client: Client[Id, Json], - sentryDSN: Option[String] + sentryDSN: Option[URI] ): List[Validated[BadRow, EnrichedEvent]] = { val collectorPayload = ThriftLoader.toCollectorPayload(data, processor) Either.catchNonFatal( @@ -297,7 +279,7 @@ object Enrich { throwable ) sentryDSN.foreach { dsn => - System.setProperty("sentry.dsn", dsn) + System.setProperty("sentry.dsn", dsn.toString()) Sentry.capture(throwable) } Nil diff --git a/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/config.scala b/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/config.scala index 419dabc45..f773c32af 100644 --- a/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/config.scala +++ b/modules/beam/src/main/scala/com.snowplowanalytics.snowplow.enrich.beam/config.scala @@ -18,6 +18,7 @@ package enrich package beam import java.io.File +import java.net.URI import scala.io.Source @@ -48,10 +49,10 @@ object config { enriched: String, bad: String, pii: Option[String], - resolver: String, - enrichments: Option[String], - labels: Option[String], - sentryDSN: Option[String], + resolver: Json, + enrichmentConfs: List[EnrichmentConf], + labels: Map[String, String], + sentryDSN: Option[URI], metrics: Boolean ) object EnrichConfig { @@ -69,17 +70,32 @@ object config { .leftMap(_.toList.mkString("\n")) .toEither List(jobName, raw, enriched, bad, resolver) = l + pii = args.optional("pii") + enrichments = args.optional("enrichments") + labels = args.optional("labels") + sentryDsn = args.asMap.get("sentry-dsn").map(_.mkString(",")) // see #391 + metrics = args.boolean("metrics", true) + resolverJson <- parseResolver(resolver) + client <- Client.parseDefault[Id](resolverJson).leftMap(_.toString).value + registryJson <- parseEnrichmentRegistry(enrichments, client) + confs <- EnrichmentRegistry.parse(registryJson, client, false).leftMap(_.toString).toEither + labels <- labels.map(parseLabels).getOrElse(Right(Map.empty[String, String])) + sentryDSN <- parseSentryDsn(sentryDsn) + _ <- if (emitPii(confs) && pii.isEmpty) + "A pii topic needs to be used in order to use the pii enrichment".asLeft + else + ().asRight } yield EnrichConfig( jobName, raw, enriched, bad, - args.optional("pii"), - resolver, - args.optional("enrichments"), - args.optional("labels"), - args.asMap.get("sentry-dsn").map(_.mkString(",")), // see #391 - args.boolean("metrics", true) + pii, + resolverJson, + confs, + labels, + sentryDSN, + metrics ) private val configurations = List( @@ -125,19 +141,6 @@ object config { } - /** Case class holding the parsed job configuration */ - final case class ParsedEnrichConfig( - raw: String, - enriched: String, - bad: String, - pii: Option[String], - resolver: Json, - enrichmentConfs: List[EnrichmentConf], - labels: Map[String, String], - sentryDSN: Option[String], - metrics: Boolean - ) - /** * Parses a resolver at the specified path. * @param resolverPath path where the resolver is located @@ -197,4 +200,17 @@ object config { def parseLabels(input: String): Either[String, Map[String, String]] = decode[Map[String, String]](input) .leftMap(_ => s"Invalid `labels` format, expected json object, received: $input") + + /** Create a java.net.URI from string with Sentry DSN. */ + private def parseSentryDsn(maybeDSN: Option[String]): Either[String, Option[URI]] = + maybeDSN match { + case Some(uri) if uri.startsWith("http://") || uri.startsWith("https://") => + Either + .catchNonFatal(URI.create(uri)) + .leftMap(e => s"Could not parse Sentry DSN as URI. Error: [${e.getMessage}]") + .map(Some(_)) + case Some(uri) => + Left(s"Sentry DSN [$uri] doesn't start with http:// or https://") + case _ => None.asRight + } } diff --git a/modules/beam/src/test/resources/enrichments_wrong/foo.json b/modules/beam/src/test/resources/enrichments_wrong/foo.json new file mode 100644 index 000000000..ba0e162e1 --- /dev/null +++ b/modules/beam/src/test/resources/enrichments_wrong/foo.json @@ -0,0 +1 @@ +bar \ No newline at end of file diff --git a/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/ConfigSpec.scala b/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/ConfigSpec.scala index f513e4b45..812e4c79d 100644 --- a/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/ConfigSpec.scala +++ b/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/ConfigSpec.scala @@ -15,9 +15,13 @@ package com.snowplowanalytics.snowplow.enrich.beam import java.nio.file.Files +import java.nio.file.Paths +import java.net.URI import com.snowplowanalytics.iglu.core.SelfDescribingData import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ +import com.snowplowanalytics.iglu.core.SchemaKey +import com.snowplowanalytics.iglu.core.SchemaVer import com.spotify.scio.Args import io.circe.Json import io.circe.syntax._ @@ -28,6 +32,8 @@ import config._ import SpecHelpers._ import org.scalatest.freespec.AnyFreeSpec +import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.YauaaConf + class ConfigSpec extends AnyFreeSpec with EitherValues { "the config object should" - { "make an EnrichConfig smart ctor available" - { @@ -65,36 +71,47 @@ class ConfigSpec extends AnyFreeSpec with EitherValues { EnrichConfig(Args(Array("--job-name=j", "--raw=i", "--enriched=o", "--bad=b"))) shouldEqual Left("Missing `resolver` argument") } - "which succeeds otherwise" in { - EnrichConfig( - Args(Array("--job-name=j", "--raw=i", "--enriched=o", "--bad=b", "--resolver=r")) - ) shouldEqual - Right(EnrichConfig("j", "i", "o", "b", None, "r", None, None, None, true)) - } - "which succeeds if --enrichments is present" in { + "which fails if --resolver can't be parsed" in { + val resolverPath = Paths.get(getClass.getResource("/referer-tests.json").toURI()).toString() + val enrichmentsPath = Paths.get(getClass.getResource("/yauaa").toURI()).toString() val args = Args( Array( "--job-name=j", "--raw=i", "--enriched=o", "--bad=b", - "--resolver=r", - "--enrichments=e" + "--pii=p", + "--resolver=" + resolverPath, + "--enrichments=" + enrichmentsPath, + "--labels={\"env\":\"abc\"}", + "--sentry-dsn=https://foo.bar?stacktrace.app.packages=com.snowplowanalytics.snowplow.enrich.beam", + "--metrics=false" ) ) - EnrichConfig(args) shouldEqual Right( - EnrichConfig("j", "i", "o", "b", None, "r", Some("e"), None, None, true) - ) + EnrichConfig(args) shouldEqual Left("schema key is not available") } - "which succeeds if --pii is present" in { + "which fails if --enrichments config files can't be parsed" in { + val resolverPath = Paths.get(getClass.getResource("/iglu_resolver.json").toURI()).toString() + val enrichmentsPath = Paths.get(getClass.getResource("/enrichments_wrong").toURI()).toString() val args = Args( - Array("--job-name=j", "--raw=i", "--enriched=o", "--bad=b", "--pii=p", "--resolver=r") - ) - EnrichConfig(args) shouldEqual Right( - EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, None, true) + Array( + "--job-name=j", + "--raw=i", + "--enriched=o", + "--bad=b", + "--pii=p", + "--resolver=" + resolverPath, + "--enrichments=" + enrichmentsPath, + "--labels={\"env\":\"abc\"}", + "--sentry-dsn=https://foo.bar?stacktrace.app.packages=com.snowplowanalytics.snowplow.enrich.beam", + "--metrics=false" + ) ) + EnrichConfig(args) shouldEqual Left("invalid json: expected json value got 'bar' (line 1, column 1)") } - "which succeeds if --labels is present" in { + "which fails if --labels is not a map" in { + val resolverPath = Paths.get(getClass.getResource("/iglu_resolver.json").toURI()).toString() + val enrichmentsPath = Paths.get(getClass.getResource("/yauaa").toURI()).toString() val args = Args( Array( "--job-name=j", @@ -102,27 +119,18 @@ class ConfigSpec extends AnyFreeSpec with EitherValues { "--enriched=o", "--bad=b", "--pii=p", - "--resolver=r", - "--labels={\"env\":\"abc\"}" - ) - ) - EnrichConfig(args) shouldEqual Right( - EnrichConfig( - "j", - "i", - "o", - "b", - Some("p"), - "r", - None, - Some("{\"env\":\"abc\"}"), - None, - true + "--resolver=" + resolverPath, + "--enrichments=" + enrichmentsPath, + "--labels=foo", + "--sentry-dsn=https://foo.bar?stacktrace.app.packages=com.snowplowanalytics.snowplow.enrich.beam", + "--metrics=false" ) ) + EnrichConfig(args) shouldEqual Left("Invalid `labels` format, expected json object, received: foo") } - "which succeeds if --sentry-dsn is present" in { - val dsn = "https://foo.bar?stacktrace.app.packages=com.snowplowanalytics.snowplow.enrich.beam&tags=cloud:GCP,pipeline_name:dev,client_name:tests,region:ES&release=1.0.0&async=false" + "which fails if --sentry-dsn is not a valid URI" in { + val resolverPath = Paths.get(getClass.getResource("/iglu_resolver.json").toURI()).toString() + val enrichmentsPath = Paths.get(getClass.getResource("/yauaa").toURI()).toString() val args = Args( Array( "--job-name=j", @@ -130,28 +138,80 @@ class ConfigSpec extends AnyFreeSpec with EitherValues { "--enriched=o", "--bad=b", "--pii=p", - "--resolver=r", - s"--sentry-dsn=$dsn" + "--resolver=" + resolverPath, + "--enrichments=" + enrichmentsPath, + "--labels={\"env\":\"abc\"}", + "--sentry-dsn=http://", + "--metrics=false" ) ) - EnrichConfig(args) shouldEqual Right( - EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, Some(dsn), true) + EnrichConfig(args) shouldEqual Left( + "Could not parse Sentry DSN as URI. Error: [Expected authority at index 7: http://]" ) - } - "which respects --metrics=false" in { - val args = Args( + val args2 = Args( Array( "--job-name=j", "--raw=i", "--enriched=o", "--bad=b", "--pii=p", - "--resolver=r", + "--resolver=" + resolverPath, + "--enrichments=" + enrichmentsPath, + "--labels={\"env\":\"abc\"}", + "--sentry-dsn=ftp://hello", + "--metrics=false" + ) + ) + EnrichConfig(args2) shouldEqual Left( + "Sentry DSN [ftp://hello] doesn't start with http:// or https://" + ) + } + "which succeeds if all the configuration is valid" in { + val resolverPath = Paths.get(getClass.getResource("/iglu_resolver.json").toURI()).toString() + val enrichmentsPath = Paths.get(getClass.getResource("/yauaa").toURI()).toString() + val jobName = "j" + val raw = "i" + val enriched = "o" + val bad = "b" + val pii = "p" + val resolver = parseResolver(resolverPath).getOrElse(throw new IllegalArgumentException(s"can't parse $resolverPath")) + val enrichments = List( + YauaaConf( + SchemaKey("com.snowplowanalytics.snowplow.enrichments", "yauaa_enrichment_config", "jsonschema", SchemaVer.Full(1, 0, 0)), + None + ) + ) + val labels = Map("env" -> "abc") + val sentryDsn = URI.create( + "https://foo.bar?stacktrace.app.packages=com.snowplowanalytics.snowplow.enrich.beam&tags=cloud:GCP,pipeline_name:dev,client_name:tests,region:ES&release=1.0.0&async=false" + ) + val args = Args( + Array( + s"--job-name=$jobName", + s"--raw=$raw", + s"--enriched=$enriched", + s"--bad=$bad", + s"--pii=$pii", + "--resolver=" + resolverPath, + "--enrichments=" + enrichmentsPath, + "--labels={\"env\":\"abc\"}", + s"--sentry-dsn=${sentryDsn.toString}", "--metrics=false" ) ) EnrichConfig(args) shouldEqual Right( - EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, None, false) + EnrichConfig( + jobName, + raw, + enriched, + bad, + Some(pii), + resolver, + enrichments, + labels, + Some(sentryDsn), + false + ) ) } }