Skip to content

Commit

Permalink
Beam: crash at startup if Sentry DSN is not correct (close #398)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Nov 19, 2020
1 parent 96d90f6 commit a58e7e7
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
import java.net.URI

import com.snowplowanalytics.snowplow.enrich.beam.config._
import com.snowplowanalytics.snowplow.enrich.beam.singleton._
Expand All @@ -68,45 +69,26 @@ object Enrich {

def main(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)
val parsedConfig = for {
config <- EnrichConfig(args)
_ = sc.setJobName(config.jobName)
_ <- checkTopicExists(sc, config.enriched)
_ <- checkTopicExists(sc, config.bad)
_ <- config.pii.map(checkTopicExists(sc, _)).getOrElse(().asRight)
resolverJson <- parseResolver(config.resolver)
client <- Client.parseDefault[Id](resolverJson).leftMap(_.toString).value
registryJson <- parseEnrichmentRegistry(config.enrichments, client)
confs <- EnrichmentRegistry.parse(registryJson, client, false).leftMap(_.toString).toEither
labels <- config.labels.map(parseLabels).getOrElse(Right(Map.empty[String, String]))
_ <- if (emitPii(confs) && config.pii.isEmpty)
"A pii topic needs to be used in order to use the pii enrichment".asLeft
else
().asRight
} yield ParsedEnrichConfig(
config.raw,
config.enriched,
config.bad,
config.pii,
resolverJson,
confs,
labels,
config.sentryDSN,
config.metrics
)

parsedConfig match {
val config = for {
conf <- EnrichConfig(args)
_ = sc.setJobName(conf.jobName)
_ <- checkTopicExists(sc, conf.enriched)
_ <- checkTopicExists(sc, conf.bad)
_ <- conf.pii.map(checkTopicExists(sc, _)).getOrElse(().asRight)
} yield conf

config match {
case Left(e) =>
System.err.println(e)
System.exit(1)
case Right(config) =>
run(sc, config)
case Right(c) =>
run(sc, c)
sc.run()
()
}
}

def run(sc: ScioContext, config: ParsedEnrichConfig): Unit = {
def run(sc: ScioContext, config: EnrichConfig): Unit = {
if (config.labels.nonEmpty)
sc.optionsAs[DataflowPipelineOptions].setLabels(config.labels.asJava)

Expand Down Expand Up @@ -196,7 +178,7 @@ object Enrich {
resolver: Json,
enrichmentConfs: List[EnrichmentConf],
cachedFiles: DistCache[List[Either[String, String]]],
sentryDSN: Option[String],
sentryDSN: Option[URI],
metrics: Boolean
): SCollection[Validated[BadRow, EnrichedEvent]] =
raw
Expand Down Expand Up @@ -278,7 +260,7 @@ object Enrich {
data: Array[Byte],
enrichmentRegistry: EnrichmentRegistry[Id],
client: Client[Id, Json],
sentryDSN: Option[String]
sentryDSN: Option[URI]
): List[Validated[BadRow, EnrichedEvent]] = {
val collectorPayload = ThriftLoader.toCollectorPayload(data, processor)
Either.catchNonFatal(
Expand All @@ -297,7 +279,7 @@ object Enrich {
throwable
)
sentryDSN.foreach { dsn =>
System.setProperty("sentry.dsn", dsn)
System.setProperty("sentry.dsn", dsn.toString())
Sentry.capture(throwable)
}
Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package enrich
package beam

import java.io.File
import java.net.URI

import scala.io.Source

Expand Down Expand Up @@ -48,10 +49,10 @@ object config {
enriched: String,
bad: String,
pii: Option[String],
resolver: String,
enrichments: Option[String],
labels: Option[String],
sentryDSN: Option[String],
resolver: Json,
enrichmentConfs: List[EnrichmentConf],
labels: Map[String, String],
sentryDSN: Option[URI],
metrics: Boolean
)
object EnrichConfig {
Expand All @@ -69,17 +70,32 @@ object config {
.leftMap(_.toList.mkString("\n"))
.toEither
List(jobName, raw, enriched, bad, resolver) = l
pii = args.optional("pii")
enrichments = args.optional("enrichments")
labels = args.optional("labels")
sentryDsn = args.asMap.get("sentry-dsn").map(_.mkString(",")) // see #391
metrics = args.boolean("metrics", true)
resolverJson <- parseResolver(resolver)
client <- Client.parseDefault[Id](resolverJson).leftMap(_.toString).value
registryJson <- parseEnrichmentRegistry(enrichments, client)
confs <- EnrichmentRegistry.parse(registryJson, client, false).leftMap(_.toString).toEither
labels <- labels.map(parseLabels).getOrElse(Right(Map.empty[String, String]))
sentryDSN <- parseSentryDsn(sentryDsn)
_ <- if (emitPii(confs) && pii.isEmpty)
"A pii topic needs to be used in order to use the pii enrichment".asLeft
else
().asRight
} yield EnrichConfig(
jobName,
raw,
enriched,
bad,
args.optional("pii"),
resolver,
args.optional("enrichments"),
args.optional("labels"),
args.asMap.get("sentry-dsn").map(_.mkString(",")), // see #391
args.boolean("metrics", true)
pii,
resolverJson,
confs,
labels,
sentryDSN,
metrics
)

private val configurations = List(
Expand Down Expand Up @@ -125,19 +141,6 @@ object config {

}

/** Case class holding the parsed job configuration */
final case class ParsedEnrichConfig(
raw: String,
enriched: String,
bad: String,
pii: Option[String],
resolver: Json,
enrichmentConfs: List[EnrichmentConf],
labels: Map[String, String],
sentryDSN: Option[String],
metrics: Boolean
)

/**
* Parses a resolver at the specified path.
* @param resolverPath path where the resolver is located
Expand Down Expand Up @@ -197,4 +200,15 @@ object config {
def parseLabels(input: String): Either[String, Map[String, String]] =
decode[Map[String, String]](input)
.leftMap(_ => s"Invalid `labels` format, expected json object, received: $input")

/** Create a java.net.URI from string with Sentry DSN. */
private def parseSentryDsn(maybeDSN: Option[String]): Either[String, Option[URI]] =
maybeDSN match {
case Some(uri) =>
Either
.catchNonFatal(URI.create(uri))
.leftMap(e => s"Could not parse Sentry DSN as URI. Error: [${e.getMessage}]")
.map(Some(_))
case _ => None.asRight
}
}
1 change: 1 addition & 0 deletions modules/beam/src/test/resources/enrichments_wrong/foo.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
bar
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
package com.snowplowanalytics.snowplow.enrich.beam

import java.nio.file.Files
import java.nio.file.Paths
import java.net.URI

import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._
import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.core.SchemaVer
import com.spotify.scio.Args
import io.circe.Json
import io.circe.syntax._
Expand All @@ -28,6 +32,8 @@ import config._
import SpecHelpers._
import org.scalatest.freespec.AnyFreeSpec

import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.YauaaConf

class ConfigSpec extends AnyFreeSpec with EitherValues {
"the config object should" - {
"make an EnrichConfig smart ctor available" - {
Expand Down Expand Up @@ -65,93 +71,130 @@ class ConfigSpec extends AnyFreeSpec with EitherValues {
EnrichConfig(Args(Array("--job-name=j", "--raw=i", "--enriched=o", "--bad=b"))) shouldEqual
Left("Missing `resolver` argument")
}
"which succeeds otherwise" in {
EnrichConfig(
Args(Array("--job-name=j", "--raw=i", "--enriched=o", "--bad=b", "--resolver=r"))
) shouldEqual
Right(EnrichConfig("j", "i", "o", "b", None, "r", None, None, None, true))
}
"which succeeds if --enrichments is present" in {
"which fails if --resolver can't be parsed" in {
val resolverPath = Paths.get(getClass.getResource("/referer-tests.json").toURI()).toString()
val enrichmentsPath = Paths.get(getClass.getResource("/yauaa").toURI()).toString()
val args = Args(
Array(
"--job-name=j",
"--raw=i",
"--enriched=o",
"--bad=b",
"--resolver=r",
"--enrichments=e"
"--pii=p",
"--resolver=" + resolverPath,
"--enrichments=" + enrichmentsPath,
"--labels={\"env\":\"abc\"}",
"--sentry-dsn=https://foo.bar?stacktrace.app.packages=com.snowplowanalytics.snowplow.enrich.beam",
"--metrics=false"
)
)
EnrichConfig(args) shouldEqual Right(
EnrichConfig("j", "i", "o", "b", None, "r", Some("e"), None, None, true)
)
}
"which succeeds if --pii is present" in {
val args = Args(
Array("--job-name=j", "--raw=i", "--enriched=o", "--bad=b", "--pii=p", "--resolver=r")
)
EnrichConfig(args) shouldEqual Right(
EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, None, true)
)
EnrichConfig(args) shouldEqual Left("schema key is not available")
}
"which succeeds if --labels is present" in {
"which fails if --enrichments config files can't be parsed" in {
val resolverPath = Paths.get(getClass.getResource("/iglu_resolver.json").toURI()).toString()
val enrichmentsPath = Paths.get(getClass.getResource("/enrichments_wrong").toURI()).toString()
val args = Args(
Array(
"--job-name=j",
"--raw=i",
"--enriched=o",
"--bad=b",
"--pii=p",
"--resolver=r",
"--labels={\"env\":\"abc\"}"
)
)
EnrichConfig(args) shouldEqual Right(
EnrichConfig(
"j",
"i",
"o",
"b",
Some("p"),
"r",
None,
Some("{\"env\":\"abc\"}"),
None,
true
"--resolver=" + resolverPath,
"--enrichments=" + enrichmentsPath,
"--labels={\"env\":\"abc\"}",
"--sentry-dsn=https://foo.bar?stacktrace.app.packages=com.snowplowanalytics.snowplow.enrich.beam",
"--metrics=false"
)
)
EnrichConfig(args) shouldEqual Left("invalid json: expected json value got 'bar' (line 1, column 1)")
}
"which succeeds if --sentry-dsn is present" in {
val dsn = "https://foo.bar?stacktrace.app.packages=com.snowplowanalytics.snowplow.enrich.beam&tags=cloud:GCP,pipeline_name:dev,client_name:tests,region:ES&release=1.0.0&async=false"
"which fails if --labels is not a map" in {
val resolverPath = Paths.get(getClass.getResource("/iglu_resolver.json").toURI()).toString()
val enrichmentsPath = Paths.get(getClass.getResource("/yauaa").toURI()).toString()
val args = Args(
Array(
"--job-name=j",
"--raw=i",
"--enriched=o",
"--bad=b",
"--pii=p",
"--resolver=r",
s"--sentry-dsn=$dsn"
"--resolver=" + resolverPath,
"--enrichments=" + enrichmentsPath,
"--labels=foo",
"--sentry-dsn=https://foo.bar?stacktrace.app.packages=com.snowplowanalytics.snowplow.enrich.beam",
"--metrics=false"
)
)
EnrichConfig(args) shouldEqual Right(
EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, Some(dsn), true)
)
EnrichConfig(args) shouldEqual Left("Invalid `labels` format, expected json object, received: foo")
}
"which respects --metrics=false" in {
"which fails if --sentry-dsn is not a valid URI" in {
val resolverPath = Paths.get(getClass.getResource("/iglu_resolver.json").toURI()).toString()
val enrichmentsPath = Paths.get(getClass.getResource("/yauaa").toURI()).toString()
val args = Args(
Array(
"--job-name=j",
"--raw=i",
"--enriched=o",
"--bad=b",
"--pii=p",
"--resolver=r",
"--resolver=" + resolverPath,
"--enrichments=" + enrichmentsPath,
"--labels={\"env\":\"abc\"}",
"--sentry-dsn='https://foo.bar",
"--metrics=false"
)
)
EnrichConfig(args) shouldEqual Left(
"Could not parse Sentry DSN as URI. Error: [Illegal character in scheme name at index 0: 'https://foo.bar]"
)
}
"which succeeds if all the configuration is valid" in {
val resolverPath = Paths.get(getClass.getResource("/iglu_resolver.json").toURI()).toString()
val enrichmentsPath = Paths.get(getClass.getResource("/yauaa").toURI()).toString()
val jobName = "j"
val raw = "i"
val enriched = "o"
val bad = "b"
val pii = "p"
val resolver = parseResolver(resolverPath).getOrElse(throw new IllegalArgumentException(s"can't parse $resolverPath"))
val enrichments = List(
YauaaConf(
SchemaKey("com.snowplowanalytics.snowplow.enrichments", "yauaa_enrichment_config", "jsonschema", SchemaVer.Full(1, 0, 0)),
None
)
)
val labels = Map("env" -> "abc")
val sentryDsn = URI.create(
"https://foo.bar?stacktrace.app.packages=com.snowplowanalytics.snowplow.enrich.beam&tags=cloud:GCP,pipeline_name:dev,client_name:tests,region:ES&release=1.0.0&async=false"
)
val args = Args(
Array(
s"--job-name=$jobName",
s"--raw=$raw",
s"--enriched=$enriched",
s"--bad=$bad",
s"--pii=$pii",
"--resolver=" + resolverPath,
"--enrichments=" + enrichmentsPath,
"--labels={\"env\":\"abc\"}",
s"--sentry-dsn=${sentryDsn.toString}",
"--metrics=false"
)
)
EnrichConfig(args) shouldEqual Right(
EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, None, false)
EnrichConfig(
jobName,
raw,
enriched,
bad,
Some(pii),
resolver,
enrichments,
labels,
Some(sentryDsn),
false
)
)
}
}
Expand Down

0 comments on commit a58e7e7

Please sign in to comment.