Skip to content

Commit

Permalink
Beam: make metrics optional (close #285)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Jul 12, 2020
1 parent 0b97d2e commit 5e35898
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,38 @@
*/
package com.snowplowanalytics.snowplow.enrich.beam

import scala.collection.JavaConverters._

import io.circe.Json

import io.sentry.Sentry

import cats.Id
import cats.data.Validated
import cats.implicits._

import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.snowplow.badrows._
import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor}
import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.snowplowanalytics.snowplow.enrich.common.loaders.ThriftLoader
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent

import com.spotify.scio._
import com.spotify.scio.coders.Coder
import com.spotify.scio.pubsub.PubSubAdmin
import com.spotify.scio.values.{DistCache, SCollection}
import _root_.io.circe.Json

import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
import _root_.io.sentry.Sentry
import scala.collection.JavaConverters._
import config._
import singleton._
import utils._

import com.snowplowanalytics.snowplow.enrich.beam.config._
import com.snowplowanalytics.snowplow.enrich.beam.singleton._
import com.snowplowanalytics.snowplow.enrich.beam.utils._

/** Enrich job using the Beam API through SCIO */
object Enrich {
Expand Down Expand Up @@ -83,7 +90,8 @@ object Enrich {
resolverJson,
confs,
labels,
config.sentryDSN
config.sentryDSN,
config.metrics
)

parsedConfig match {
Expand All @@ -108,7 +116,7 @@ object Enrich {
sc.withName("raw-from-pubsub").pubsubSubscription[Array[Byte]](config.raw)

val enriched: SCollection[Validated[BadRow, EnrichedEvent]] =
enrichEvents(raw, config.resolver, config.enrichmentConfs, cachedFiles, config.sentryDSN)
enrichEvents(raw, config.resolver, config.enrichmentConfs, cachedFiles, config.sentryDSN, config.metrics)

val (failures, successes): (SCollection[BadRow], SCollection[EnrichedEvent]) = {
val enrichedPartitioned = enriched.withName("split-enriched-good-bad").partition(_.isValid)
Expand All @@ -126,7 +134,7 @@ object Enrich {
(failures, successes)
}

val (tooBigSuccesses, properlySizedSuccesses) = formatEnrichedEvents(successes)
val (tooBigSuccesses, properlySizedSuccesses) = formatEnrichedEvents(config.metrics, successes)
properlySizedSuccesses
.withName("get-properly-sized-enriched")
.map(_._1)
Expand Down Expand Up @@ -187,7 +195,8 @@ object Enrich {
resolver: Json,
enrichmentConfs: List[EnrichmentConf],
cachedFiles: DistCache[List[Either[String, String]]],
sentryDSN: Option[String]
sentryDSN: Option[String],
metrics: Boolean
): SCollection[Validated[BadRow, EnrichedEvent]] =
raw
.withName("enrich")
Expand All @@ -201,7 +210,7 @@ object Enrich {
sentryDSN
)
}
timeToEnrichDistribution.update(time)
if (metrics) timeToEnrichDistribution.update(time) else ()
enriched
}
.withName("flatten-enriched")
Expand All @@ -213,15 +222,19 @@ object Enrich {
* @param enriched collection of events that went through the enrichment phase
* @return a collection of properly-sized enriched events and another of oversized ones
*/
private def formatEnrichedEvents(enriched: SCollection[EnrichedEvent]): (SCollection[(String, Int)], SCollection[(String, Int)]) =
private def formatEnrichedEvents(
metrics: Boolean,
enriched: SCollection[EnrichedEvent]
): (SCollection[(String, Int)], SCollection[(String, Int)]) =
enriched
.withName("format-enriched")
.map { enrichedEvent =>
getEnrichedEventMetrics(enrichedEvent)
.foreach(ScioMetrics.counter(MetricsNamespace, _).inc())
if (metrics)
getEnrichedEventMetrics(enrichedEvent).foreach(metric => ScioMetrics.counter(MetricsNamespace, metric).inc())
else ()
val formattedEnrichedEvent = tabSeparatedEnrichedEvent(enrichedEvent)
val size = getSize(formattedEnrichedEvent)
enrichedEventSizeDistribution.update(size.toLong)
if (metrics) enrichedEventSizeDistribution.update(size.toLong) else ()
(formattedEnrichedEvent, size)
}
.withName("split-oversized")
Expand Down Expand Up @@ -266,7 +279,6 @@ object Enrich {
client: Client[Id, Json],
sentryDSN: Option[String]
): List[Validated[BadRow, EnrichedEvent]] = {
val processor = Processor(generated.BuildInfo.name, generated.BuildInfo.version)
val collectorPayload = ThriftLoader.toCollectorPayload(data, processor)
Either.catchNonFatal(
EtlPipeline.processEvents(
Expand All @@ -293,16 +305,15 @@ object Enrich {
}

/**
* Builds a SCIO's [[DistCache]] which downloads the needed files and create the necessary
* Builds a Scio's [[DistCache]] which downloads the needed files and creates the necessary
* symlinks.
* @param sc [[ScioContext]]
* @param sc Scio context
* @param enrichmentConfs list of enrichment configurations
* @return a properly built [[DistCache]]
*/
private def buildDistCache(sc: ScioContext, enrichmentConfs: List[EnrichmentConf]): DistCache[List[Either[String, String]]] = {
val filesToCache: List[(String, String)] = enrichmentConfs
.map(_.filesToCache)
.flatten
.flatMap(_.filesToCache)
.map { case (uri, sl) => (uri.toString, sl) }
sc.distCache(filesToCache.map(_._1)) { files =>
val symLinks = files.toList
Expand All @@ -318,7 +329,7 @@ object Enrich {

/**
* Checks a PubSub topic exists before launching the job.
* @param sc [[ScioContext]]
* @param sc Scio Context
* @param topicName name of the topic to check for existence, projects/{project}/topics/{topic}
* @return Right if it exists, left otherwise
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ object config {
resolver: String,
enrichments: Option[String],
labels: Option[String],
sentryDSN: Option[String]
sentryDSN: Option[String],
metrics: Boolean
)
object EnrichConfig {

Expand All @@ -61,7 +62,7 @@ object config {
_ <- if (args.optional("help").isDefined) helpString(configurations).asLeft else "".asRight
l <- configurations
.collect {
case RequiredConfiguration(key, _) =>
case Configuration.Required(key, _) =>
args.optional(key).toValidNel(s"Missing `$key` argument")
}
.sequence[ValidatedNelS, String]
Expand All @@ -77,34 +78,36 @@ object config {
resolver,
args.optional("enrichments"),
args.optional("labels"),
args.optional("sentry-dsn")
args.optional("sentry-dsn"),
args.boolean("metrics", true)
)

private val configurations = List(
RequiredConfiguration("job-name", "Name of the Dataflow job that will be launched"),
RequiredConfiguration(
Configuration.Required("job-name", "Name of the Dataflow job that will be launched"),
Configuration.Required(
"raw",
"Name of the subscription to the raw topic projects/{project}/subscriptions/{subscription}"
),
RequiredConfiguration(
Configuration.Required(
"enriched",
"Name of the enriched topic projects/{project}/topics/{topic}"
),
RequiredConfiguration("bad", "Name of the bad topic projects/{project}/topics/{topic}"),
OptionalConfiguration("pii", "Name of the pii topic projects/{project}/topics/{topic}"),
RequiredConfiguration("resolver", "Path to the resolver file"),
OptionalConfiguration("enrichments", "Path to the directory containing the enrichment files"),
OptionalConfiguration("labels", "Dataflow labels to be set ie. env=qa1;region=eu"),
OptionalConfiguration("sentry-dsn", "Sentry DSN")
Configuration.Required("bad", "Name of the bad topic projects/{project}/topics/{topic}"),
Configuration.Optional("pii", "Name of the pii topic projects/{project}/topics/{topic}"),
Configuration.Required("resolver", "Path to the resolver file"),
Configuration.Optional("enrichments", "Path to the directory containing the enrichment files"),
Configuration.Optional("labels", "Dataflow labels to be set ie. env=qa1;region=eu"),
Configuration.Optional("sentry-dsn", "Sentry DSN"),
Configuration.Optional("metrics", "Enable ScioMetrics (default: true)")
)

/** Generates a help string from a list of configurations */
private def helpString(configs: List[Configuration]): String =
"Possible configuration are:\n" +
configs
.map {
case OptionalConfiguration(key, desc) => s"--$key=VALUE, optional, $desc"
case RequiredConfiguration(key, desc) => s"--$key=VALUE, required, $desc"
case Configuration.Optional(key, desc) => s"--$key=VALUE, optional, $desc"
case Configuration.Required(key, desc) => s"--$key=VALUE, required, $desc"
}
.mkString("\n") +
"\n--help, Display this message" +
Expand All @@ -116,8 +119,11 @@ object config {
def key: String
def desc: String
}
final case class OptionalConfiguration(key: String, desc: String) extends Configuration
final case class RequiredConfiguration(key: String, desc: String) extends Configuration
/**
 * Concrete kinds of `Configuration` entries describing the job's command-line arguments.
 * Each entry carries the argument `key` (used as `--key=VALUE`) and a human-readable `desc`
 * that is shown in the help output.
 */
object Configuration {
/** An argument that may be omitted; rendered as "optional" in the help string. */
final case class Optional(key: String, desc: String) extends Configuration
/** An argument that must be supplied; its absence is reported as a missing-argument error. */
final case class Required(key: String, desc: String) extends Configuration

}

/** Case class holding the parsed job configuration */
final case class ParsedEnrichConfig(
Expand All @@ -128,7 +134,8 @@ object config {
resolver: Json,
enrichmentConfs: List[EnrichmentConf],
labels: Map[String, String],
sentryDSN: Option[String]
sentryDSN: Option[String],
metrics: Boolean
)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class ConfigSpec extends AnyFreeSpec with EitherValues {
EnrichConfig(
Args(Array("--job-name=j", "--raw=i", "--enriched=o", "--bad=b", "--resolver=r"))
) shouldEqual
Right(EnrichConfig("j", "i", "o", "b", None, "r", None, None, None))
Right(EnrichConfig("j", "i", "o", "b", None, "r", None, None, None, true))
}
"which succeeds if --enrichments is present" in {
val args = Args(
Expand All @@ -83,15 +83,15 @@ class ConfigSpec extends AnyFreeSpec with EitherValues {
)
)
EnrichConfig(args) shouldEqual Right(
EnrichConfig("j", "i", "o", "b", None, "r", Some("e"), None, None)
EnrichConfig("j", "i", "o", "b", None, "r", Some("e"), None, None, true)
)
}
"which succeeds if --pii is present" in {
val args = Args(
Array("--job-name=j", "--raw=i", "--enriched=o", "--bad=b", "--pii=p", "--resolver=r")
)
EnrichConfig(args) shouldEqual Right(
EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, None)
EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, None, true)
)
}
"which succeeds if --labels is present" in {
Expand All @@ -116,7 +116,8 @@ class ConfigSpec extends AnyFreeSpec with EitherValues {
"r",
None,
Some("{\"env\":\"abc\"}"),
None
None,
true
)
)
}
Expand All @@ -133,7 +134,23 @@ class ConfigSpec extends AnyFreeSpec with EitherValues {
)
)
EnrichConfig(args) shouldEqual Right(
EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, Some("DSN"))
EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, Some("DSN"), true)
)
}
"which respects --metrics=false" in {
  // Every required argument plus --pii, with metrics explicitly switched off.
  val cliArgs = Array(
    "--job-name=j",
    "--raw=i",
    "--enriched=o",
    "--bad=b",
    "--pii=p",
    "--resolver=r",
    "--metrics=false"
  )
  // The last field is the metrics flag, which must be false rather than the default of true.
  val expected = EnrichConfig("j", "i", "o", "b", Some("p"), "r", None, None, None, false)
  EnrichConfig(Args(cliArgs)) shouldEqual Right(expected)
}
}
Expand Down

0 comments on commit 5e35898

Please sign in to comment.