Skip to content

Commit

Permalink
Beam: crash at startup if Sentry DSN is not correct (close #398)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Nov 20, 2020
1 parent 96d90f6 commit 2843dd8
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
import java.net.URI

import com.snowplowanalytics.snowplow.enrich.beam.config._
import com.snowplowanalytics.snowplow.enrich.beam.singleton._
Expand All @@ -68,45 +69,26 @@ object Enrich {

def main(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)
val parsedConfig = for {
config <- EnrichConfig(args)
_ = sc.setJobName(config.jobName)
_ <- checkTopicExists(sc, config.enriched)
_ <- checkTopicExists(sc, config.bad)
_ <- config.pii.map(checkTopicExists(sc, _)).getOrElse(().asRight)
resolverJson <- parseResolver(config.resolver)
client <- Client.parseDefault[Id](resolverJson).leftMap(_.toString).value
registryJson <- parseEnrichmentRegistry(config.enrichments, client)
confs <- EnrichmentRegistry.parse(registryJson, client, false).leftMap(_.toString).toEither
labels <- config.labels.map(parseLabels).getOrElse(Right(Map.empty[String, String]))
_ <- if (emitPii(confs) && config.pii.isEmpty)
"A pii topic needs to be used in order to use the pii enrichment".asLeft
else
().asRight
} yield ParsedEnrichConfig(
config.raw,
config.enriched,
config.bad,
config.pii,
resolverJson,
confs,
labels,
config.sentryDSN,
config.metrics
)

parsedConfig match {
val config = for {
conf <- EnrichConfig(args)
_ = sc.setJobName(conf.jobName)
_ <- checkTopicExists(sc, conf.enriched)
_ <- checkTopicExists(sc, conf.bad)
_ <- conf.pii.map(checkTopicExists(sc, _)).getOrElse(().asRight)
} yield conf

config match {
case Left(e) =>
System.err.println(e)
System.exit(1)
case Right(config) =>
run(sc, config)
case Right(c) =>
run(sc, c)
sc.run()
()
}
}

def run(sc: ScioContext, config: ParsedEnrichConfig): Unit = {
def run(sc: ScioContext, config: EnrichConfig): Unit = {
if (config.labels.nonEmpty)
sc.optionsAs[DataflowPipelineOptions].setLabels(config.labels.asJava)

Expand Down Expand Up @@ -196,7 +178,7 @@ object Enrich {
resolver: Json,
enrichmentConfs: List[EnrichmentConf],
cachedFiles: DistCache[List[Either[String, String]]],
sentryDSN: Option[String],
sentryDSN: Option[URI],
metrics: Boolean
): SCollection[Validated[BadRow, EnrichedEvent]] =
raw
Expand Down Expand Up @@ -278,7 +260,7 @@ object Enrich {
data: Array[Byte],
enrichmentRegistry: EnrichmentRegistry[Id],
client: Client[Id, Json],
sentryDSN: Option[String]
sentryDSN: Option[URI]
): List[Validated[BadRow, EnrichedEvent]] = {
val collectorPayload = ThriftLoader.toCollectorPayload(data, processor)
Either.catchNonFatal(
Expand All @@ -297,7 +279,7 @@ object Enrich {
throwable
)
sentryDSN.foreach { dsn =>
System.setProperty("sentry.dsn", dsn)
System.setProperty("sentry.dsn", dsn.toString())
Sentry.capture(throwable)
}
Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package enrich
package beam

import java.io.File
import java.net.URI

import scala.io.Source

Expand Down Expand Up @@ -48,10 +49,10 @@ object config {
enriched: String,
bad: String,
pii: Option[String],
resolver: String,
enrichments: Option[String],
labels: Option[String],
sentryDSN: Option[String],
resolver: Json,
enrichmentConfs: List[EnrichmentConf],
labels: Map[String, String],
sentryDSN: Option[URI],
metrics: Boolean
)
object EnrichConfig {
Expand All @@ -69,17 +70,32 @@ object config {
.leftMap(_.toList.mkString("\n"))
.toEither
List(jobName, raw, enriched, bad, resolver) = l
pii = args.optional("pii")
enrichments = args.optional("enrichments")
labels = args.optional("labels")
sentryDsn = args.asMap.get("sentry-dsn").map(_.mkString(",")) // see #391
metrics = args.boolean("metrics", true)
resolverJson <- parseResolver(resolver)
client <- Client.parseDefault[Id](resolverJson).leftMap(_.toString).value
registryJson <- parseEnrichmentRegistry(enrichments, client)
confs <- EnrichmentRegistry.parse(registryJson, client, false).leftMap(_.toString).toEither
labels <- labels.map(parseLabels).getOrElse(Right(Map.empty[String, String]))
sentryDSN <- parseSentryDsn(sentryDsn)
_ <- if (emitPii(confs) && pii.isEmpty)
"A pii topic needs to be used in order to use the pii enrichment".asLeft
else
().asRight
} yield EnrichConfig(
jobName,
raw,
enriched,
bad,
args.optional("pii"),
resolver,
args.optional("enrichments"),
args.optional("labels"),
args.asMap.get("sentry-dsn").map(_.mkString(",")), // see #391
args.boolean("metrics", true)
pii,
resolverJson,
confs,
labels,
sentryDSN,
metrics
)

private val configurations = List(
Expand Down Expand Up @@ -125,19 +141,6 @@ object config {

}

/** Case class holding the parsed job configuration */
final case class ParsedEnrichConfig(
raw: String,
enriched: String,
bad: String,
pii: Option[String],
resolver: Json,
enrichmentConfs: List[EnrichmentConf],
labels: Map[String, String],
sentryDSN: Option[String],
metrics: Boolean
)

/**
* Parses a resolver at the specified path.
* @param resolverPath path where the resolver is located
Expand Down Expand Up @@ -197,4 +200,17 @@ object config {
def parseLabels(input: String): Either[String, Map[String, String]] =
decode[Map[String, String]](input)
.leftMap(_ => s"Invalid `labels` format, expected json object, received: $input")

/** Create a java.net.URI from string with Sentry DSN. */
private def parseSentryDsn(maybeDSN: Option[String]): Either[String, Option[URI]] =
maybeDSN match {
case Some(uri) if (uri.startsWith("http://") || uri.startsWith("https://")) =>
Either
.catchNonFatal(URI.create(uri))
.leftMap(e => s"Could not parse Sentry DSN as URI. Error: [${e.getMessage}]")
.map(Some(_))
case Some(uri) =>
Left(s"Sentry DSN [$uri] doesn't start with http:// or https://")
case _ => None.asRight
}
}
1 change: 1 addition & 0 deletions modules/beam/src/test/resources/enrichments_wrong/foo.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
bar
Loading

0 comments on commit 2843dd8

Please sign in to comment.