Skip to content

Commit

Permalink
Beam: bump Scio to 0.10.2 (close #461)
Browse files (browse the repository at this point in the history)
  • Loading branch information
oguzhanunlu authored and istreeter committed Apr 30, 2021
1 parent ead5921 commit e236a47
Show file tree
Hide file tree
Showing 33 changed files with 419 additions and 271 deletions.
18 changes: 16 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,29 @@ lazy val beam =
buildInfoKeys := Seq[BuildInfoKey](organization, name, version, "sceVersion" -> version.value),
buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.beam.generated",
libraryDependencies ++= Seq(
Dependencies.Libraries.scio,
Dependencies.Libraries.tcnative,
Dependencies.Libraries.scioCore,
Dependencies.Libraries.scioGCP,
Dependencies.Libraries.beam,
Dependencies.Libraries.grpc,
Dependencies.Libraries.sentry,
Dependencies.Libraries.slf4j,
Dependencies.Libraries.scioTest,
Dependencies.Libraries.scalaTest,
Dependencies.Libraries.circeLiteral % Test,
),
// Pin every io.grpc artifact listed below to the single version defined in
// Dependencies.V.grpc, so that all of them resolve to the same gRPC release.
// NOTE(review): presumably needed because Scio/Beam pull in differing gRPC
// versions transitively — confirm against the dependency tree.
dependencyOverrides ++= Seq(
"io.grpc" % "grpc-alts" % Dependencies.V.grpc,
"io.grpc" % "grpc-auth" % Dependencies.V.grpc,
"io.grpc" % "grpc-core" % Dependencies.V.grpc,
"io.grpc" % "grpc-context" % Dependencies.V.grpc,
"io.grpc" % "grpc-grpclb" % Dependencies.V.grpc,
"io.grpc" % "grpc-netty" % Dependencies.V.grpc,
"io.grpc" % "grpc-netty-shaded" % Dependencies.V.grpc,
"io.grpc" % "grpc-api" % Dependencies.V.grpc,
"io.grpc" % "grpc-stub" % Dependencies.V.grpc,
"io.grpc" % "grpc-protobuf" % Dependencies.V.grpc,
"io.grpc" % "grpc-protobuf-lite" % Dependencies.V.grpc,
),
Docker / packageName := "snowplow/beam-enrich"
)
.settings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.utils.ConversionUtils

import com.spotify.scio._
import com.spotify.scio.pubsub._
import com.spotify.scio.coders.Coder
import com.spotify.scio.pubsub.PubSubAdmin
import com.spotify.scio.values.{DistCache, SCollection}

import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions
import org.apache.beam.sdk.io.gcp.pubsub.{PubsubMessage, PubsubOptions}
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -96,8 +97,9 @@ object Enrich {
val cachedFiles: DistCache[List[Either[String, String]]] =
buildDistCache(sc, config.enrichmentConfs)

val raw: SCollection[Array[Byte]] =
sc.withName("raw-from-pubsub").pubsubSubscription[Array[Byte]](config.raw)
val in = sc.read[PubsubMessage](PubsubIO.pubsub[PubsubMessage](config.raw))(PubsubIO.ReadParam(PubsubIO.Subscription))

val raw: SCollection[Array[Byte]] = in.map(_.getPayload)

val enriched: SCollection[Validated[BadRow, EnrichedEvent]] =
enrichEvents(raw, config.resolver, config.enrichmentConfs, cachedFiles, config.sentryDSN, config.metrics)
Expand All @@ -123,7 +125,7 @@ object Enrich {
.withName("get-properly-sized-enriched")
.map(_._1)
.withName("write-enriched-to-pubsub")
.saveAsPubsub(config.enriched)
.write(PubsubIO.string(config.enriched))(PubsubIO.WriteParam())

val resizedEnriched: SCollection[BadRow] = tooBigSuccesses
.withName("resize-oversized-enriched")
Expand All @@ -139,7 +141,7 @@ object Enrich {
.withName("get-properly-sized-pii")
.map(_._1)
.withName("write-pii-to-pubsub")
.saveAsPubsub(topicPii)
.write(PubsubIO.string(topicPii))(PubsubIO.WriteParam())

tooBigPiis
.withName("resize-oversized-pii")
Expand All @@ -162,7 +164,7 @@ object Enrich {
.withName("serialize-bad-rows")
.map(_.compact)
.withName("write-bad-rows-to-pubsub")
.saveAsPubsub(config.bad)
.write(PubsubIO.string(config.bad))(PubsubIO.WriteParam())

()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package com.snowplowanalytics.snowplow.enrich.beam
import java.nio.file.Paths

import com.spotify.scio.ScioMetrics
import com.spotify.scio.io.PubsubIO
import com.spotify.scio.pubsub.PubsubIO
import com.spotify.scio.testing._
import org.apache.commons.codec.binary.Base64
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage
import scala.jdk.CollectionConverters._

object EnrichSpec {
val raw = Seq(
Expand Down Expand Up @@ -95,14 +97,16 @@ class EnrichSpec extends PipelineSpec {
"--bad=bad",
"--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI())
)
.input(PubsubIO.readCoder[Array[Byte]]("in"), raw.map(Base64.decodeBase64))
.input(PubsubIO.pubsub[PubsubMessage]("in"),
raw.map(Base64.decodeBase64).map(arr => new PubsubMessage(arr, Map.empty[String, String].asJava))
)
.distCache(DistCacheIO(""), List.empty[Either[String, String]])
.output(PubsubIO.readString("out")) { o =>
.output(PubsubIO.string("out")) { o =>
o should satisfySingleValue { c: String =>
expected.forall(c.contains)
}; ()
}
.output(PubsubIO.readString("bad")) { b =>
.output(PubsubIO.string("bad")) { b =>
b should beEmpty; ()
}
.distribution(Enrich.enrichedEventSizeDistribution) { d =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@ package adapters
import java.nio.file.Paths

import cats.syntax.option._
import com.spotify.scio.io.PubsubIO
import com.spotify.scio.pubsub.PubsubIO
import com.spotify.scio.testing._
import io.circe.literal._
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage
import scala.jdk.CollectionConverters._

object CallrailAdapterSpec {
val raw = Seq(
SpecHelpers.buildCollectorPayload(
path = "/com.callrail/v1",
querystring =
"aid=bnb&answered=true&callercity=BAKERSFIELD&callercountry=US&callername=SKYPE+CALLER&callernum=%2B166&callerstate=CA&callerzip=93307&callsource=keyword&datetime=2014-10-09+16%3A23%3A45&destinationnum=20&duration=247&first_call=true&id=30&ip=86.178.183.7&landingpage=http%3A%2F%2Flndpage.com%2F&recording=http%3A%2F%2Fapp.callrail.com%2Fcalls%2F30%2Frecording%2F9f&referrer=direct&referrermedium=Direct&trackingnum=%2B12015911668".some
new PubsubMessage(SpecHelpers.buildCollectorPayload(
path = "/com.callrail/v1",
querystring =
"aid=bnb&answered=true&callercity=BAKERSFIELD&callercountry=US&callername=SKYPE+CALLER&callernum=%2B166&callerstate=CA&callerzip=93307&callsource=keyword&datetime=2014-10-09+16%3A23%3A45&destinationnum=20&duration=247&first_call=true&id=30&ip=86.178.183.7&landingpage=http%3A%2F%2Flndpage.com%2F&recording=http%3A%2F%2Fapp.callrail.com%2Fcalls%2F30%2Frecording%2F9f&referrer=direct&referrermedium=Direct&trackingnum=%2B12015911668".some
),
Map.empty[String, String].asJava
)
)
val expected = Map(
Expand All @@ -52,12 +56,12 @@ class CallrailAdapterSpec extends PipelineSpec {
"--bad=bad",
"--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI())
)
.input(PubsubIO.readCoder[Array[Byte]]("in"), raw)
.input(PubsubIO.pubsub[PubsubMessage]("in"), raw)
.distCache(DistCacheIO(""), List.empty[Either[String, String]])
.output(PubsubIO.readString("bad")) { b =>
.output(PubsubIO.string("bad")) { b =>
b should beEmpty; ()
}
.output(PubsubIO.readString("out")) { o =>
.output(PubsubIO.string("out")) { o =>
o should satisfySingleValue { c: String =>
SpecHelpers.compareEnrichedEvent(expected, c)
}; ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@ package adapters
import java.nio.file.Paths

import cats.syntax.option._
import com.spotify.scio.io.PubsubIO
import com.spotify.scio.pubsub.PubsubIO
import com.spotify.scio.testing._
import io.circe.literal._
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage
import scala.jdk.CollectionConverters._

object GoogleAnalyticsAdapterSpec {
val body = "t=pageview&dh=host&dp=path"
val raw = Seq(
SpecHelpers.buildCollectorPayload(
path = "/com.google.analytics/v1",
body = body.some
new PubsubMessage(SpecHelpers.buildCollectorPayload(
path = "/com.google.analytics/v1",
body = body.some
),
Map.empty[String, String].asJava
)
)
val expected = Map(
Expand All @@ -52,12 +56,12 @@ class GoogleAnalyticsAdapterSpec extends PipelineSpec {
"--bad=bad",
"--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI())
)
.input(PubsubIO.readCoder[Array[Byte]]("in"), raw)
.input(PubsubIO.pubsub[PubsubMessage]("in"), raw)
.distCache(DistCacheIO(""), List.empty[Either[String, String]])
.output(PubsubIO.readString("bad")) { b =>
.output(PubsubIO.string("bad")) { b =>
b should beEmpty; ()
}
.output(PubsubIO.readString("out")) { o =>
.output(PubsubIO.string("out")) { o =>
o should satisfySingleValue { c: String =>
SpecHelpers.compareEnrichedEvent(expected, c)
}; ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,22 @@ package adapters
import java.nio.file.Paths

import cats.syntax.option._
import com.spotify.scio.io.PubsubIO
import com.spotify.scio.pubsub.PubsubIO
import com.spotify.scio.testing._
import io.circe.literal._
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage
import scala.jdk.CollectionConverters._

object HubspotAdapterSpec {
val body =
json"""[{"eventId":1,"subscriptionId":25458,"portalId":4737818,"occurredAt":1539145399845,"subscriptionType":"contact.creation","attemptNumber":0,"objectId":123,"changeSource":"CRM","changeFlag":"NEW","appId":177698}]"""
val raw = Seq(
SpecHelpers.buildCollectorPayload(
path = "/com.hubspot/v1",
body = body.noSpaces.some,
contentType = "application/json".some
new PubsubMessage(SpecHelpers.buildCollectorPayload(
path = "/com.hubspot/v1",
body = body.noSpaces.some,
contentType = "application/json".some
),
Map.empty[String, String].asJava
)
)
val expected = Map(
Expand All @@ -54,12 +58,12 @@ class HubspotAdapterSpec extends PipelineSpec {
"--bad=bad",
"--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI())
)
.input(PubsubIO.readCoder[Array[Byte]]("in"), raw)
.input(PubsubIO.pubsub[PubsubMessage]("in"), raw)
.distCache(DistCacheIO(""), List.empty[Either[String, String]])
.output(PubsubIO.readString("bad")) { b =>
.output(PubsubIO.string("bad")) { b =>
b should beEmpty; ()
}
.output(PubsubIO.readString("out")) { o =>
.output(PubsubIO.string("out")) { o =>
o should satisfySingleValue { c: String =>
SpecHelpers.compareEnrichedEvent(expected, c)
}; ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,22 @@ package adapters
import java.nio.file.Paths

import cats.syntax.option._
import com.spotify.scio.io.PubsubIO
import com.spotify.scio.pubsub.PubsubIO
import com.spotify.scio.testing._
import io.circe.literal._
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage
import scala.jdk.CollectionConverters._

object MailchimpAdapterSpec {
val body =
"type=subscribe&fired_at=2014-11-04+09%3A42%3A31&data%5Bid%5D=e7c77d3852&data%5Bemail%5D=agentsmith%40snowplowtest.com&data%5Bemail_type%5D=html&data%5Bip_opt%5D=82.225.169.220&data%5Bweb_id%5D=210833825&data%5Bmerges%5D%5BEMAIL%5D=agentsmith%40snowplowtest.com&data%5Bmerges%5D%5BFNAME%5D=Agent&data%5Bmerges%5D%5BLNAME%5D=Smith&data%5Blist_id%5D=f1243a3b12"
val raw = Seq(
SpecHelpers.buildCollectorPayload(
path = "/com.mailchimp/v1",
body = body.some,
contentType = "application/x-www-form-urlencoded".some
new PubsubMessage(SpecHelpers.buildCollectorPayload(
path = "/com.mailchimp/v1",
body = body.some,
contentType = "application/x-www-form-urlencoded".some
),
Map.empty[String, String].asJava
)
)
val expected = Map(
Expand All @@ -54,12 +58,12 @@ class MailchimpAdapterSpec extends PipelineSpec {
"--bad=bad",
"--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI())
)
.input(PubsubIO.readCoder[Array[Byte]]("in"), raw)
.input(PubsubIO.pubsub[PubsubMessage]("in"), raw)
.distCache(DistCacheIO(""), List.empty[Either[String, String]])
.output(PubsubIO.readString("bad")) { b =>
.output(PubsubIO.string("bad")) { b =>
b should beEmpty; ()
}
.output(PubsubIO.readString("out")) { o =>
.output(PubsubIO.string("out")) { o =>
o should satisfySingleValue { c: String =>
SpecHelpers.compareEnrichedEvent(expected, c)
}; ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,22 @@ package adapters
import java.nio.file.Paths

import cats.syntax.option._
import com.spotify.scio.io.PubsubIO
import com.spotify.scio.pubsub.PubsubIO
import com.spotify.scio.testing._
import io.circe.literal._
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage
import scala.jdk.CollectionConverters._

object MailgunAdapterSpec {
val body =
"domain=sandbox57070072075d4cfd9008d4332108734c.mailgun.org&my_var_1=Mailgun+Variable+%231&my-var-2=awesome&message-headers=%5B%5B%22Received%22%2C+%22by+luna.mailgun.net+with+SMTP+mgrt+8734663311733%3B+Fri%2C+03+May+2013+18%3A26%3A27+%2B0000%22%5D%2C+%5B%22Content-Type%22%2C+%5B%22multipart%2Falternative%22%2C+%7B%22boundary%22%3A+%22eb663d73ae0a4d6c9153cc0aec8b7520%22%7D%5D%5D%2C+%5B%22Mime-Version%22%2C+%221.0%22%5D%2C+%5B%22Subject%22%2C+%22Test+deliver+webhook%22%5D%2C+%5B%22From%22%2C+%22Bob+%3Cbob%40sandbox57070072075d4cfd9008d4332108734c.mailgun.org%3E%22%5D%2C+%5B%22To%22%2C+%22Alice+%3Calice%40example.com%3E%22%5D%2C+%5B%22Message-Id%22%2C+%22%3C20130503182626.18666.16540%40sandbox57070072075d4cfd9008d4332108734c.mailgun.org%3E%22%5D%2C+%5B%22X-Mailgun-Variables%22%2C+%22%7B%5C%22my_var_1%5C%22%3A+%5C%22Mailgun+Variable+%231%5C%22%2C+%5C%22my-var-2%5C%22%3A+%5C%22awesome%5C%22%7D%22%5D%2C+%5B%22Date%22%2C+%22Fri%2C+03+May+2013+18%3A26%3A27+%2B0000%22%5D%2C+%5B%22Sender%22%2C+%22bob%40sandbox57070072075d4cfd9008d4332108734c.mailgun.org%22%5D%5D&Message-Id=%3C20130503182626.18666.16540%40sandbox57070072075d4cfd9008d4332108734c.mailgun.org%3E&recipient=alice%40example.com&event=delivered&timestamp=1510161827&token=cd87f5a30002794e37aa49e67fb46990e578b1e9197773d817&signature=c902ff9e3dea54c2dbe1871f9041653292ea9689d3d2b2d2ecfa996f025b9669&body-plain="
val raw = Seq(
SpecHelpers.buildCollectorPayload(
path = "/com.mailgun/v1",
body = body.some,
contentType = "application/x-www-form-urlencoded".some
new PubsubMessage(SpecHelpers.buildCollectorPayload(
path = "/com.mailgun/v1",
body = body.some,
contentType = "application/x-www-form-urlencoded".some
),
Map.empty[String, String].asJava
)
)
val expected = Map(
Expand All @@ -54,12 +58,12 @@ class MailgunAdapterSpec extends PipelineSpec {
"--bad=bad",
"--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI())
)
.input(PubsubIO.readCoder[Array[Byte]]("in"), raw)
.input(PubsubIO.pubsub[PubsubMessage]("in"), raw)
.distCache(DistCacheIO(""), List.empty[Either[String, String]])
.output(PubsubIO.readString("bad")) { b =>
.output(PubsubIO.string("bad")) { b =>
b should beEmpty; ()
}
.output(PubsubIO.readString("out")) { o =>
.output(PubsubIO.string("out")) { o =>
o should satisfySingleValue { c: String =>
SpecHelpers.compareEnrichedEvent(expected, c)
}; ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,22 @@ package adapters
import java.nio.file.Paths

import cats.syntax.option._
import com.spotify.scio.io.PubsubIO
import com.spotify.scio.pubsub.PubsubIO
import com.spotify.scio.testing._
import io.circe.literal._
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage
import scala.jdk.CollectionConverters._

object MandrillAdapterSpec {
val body =
"mandrill_events=%5B%0A%20%20%20%20%7B%0A%20%20%20%20%20%20%20%20%22event%22%3A%20%22send%22%2C%0A%20%20%20%20%20%20%20%20%22msg%22%3A%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20%22ts%22%3A%201365109999%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%22subject%22%3A%20%22This%20an%20example%20webhook%20message%22%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%22email%22%3A%20%22example.webhook%40mandrillapp.com%22%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%22sender%22%3A%20%22example.sender%40mandrillapp.com%22%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%22tags%22%3A%20%5B%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%22webhook-example%22%0A%20%20%20%20%20%20%20%20%20%20%20%20%5D%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%22opens%22%3A%20%5B%5D%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%22clicks%22%3A%20%5B%5D%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%22state%22%3A%20%22sent%22%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%22metadata%22%3A%20%7B%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%22user_id%22%3A%20111%0A%20%20%20%20%20%20%20%20%20%20%20%20%7D%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%22_id%22%3A%20%22exampleaaaaaaaaaaaaaaaaaaaaaaaaa%22%2C%0A%20%20%20%20%20%20%20%20%20%20%20%20%22_version%22%3A%20%22exampleaaaaaaaaaaaaaaa%22%0A%20%20%20%20%20%20%20%20%7D%2C%0A%20%20%20%20%20%20%20%20%22_id%22%3A%20%22exampleaaaaaaaaaaaaaaaaaaaaaaaaa%22%2C%0A%20%20%20%20%20%20%20%20%22ts%22%3A%201415692035%0A%20%20%20%20%7D%0A%5D"
val raw = Seq(
SpecHelpers.buildCollectorPayload(
path = "/com.mandrill/v1",
body = body.some,
contentType = "application/x-www-form-urlencoded".some
new PubsubMessage(SpecHelpers.buildCollectorPayload(
path = "/com.mandrill/v1",
body = body.some,
contentType = "application/x-www-form-urlencoded".some
),
Map.empty[String, String].asJava
)
)
val expected = Map(
Expand All @@ -54,12 +58,12 @@ class MandrillAdapterSpec extends PipelineSpec {
"--bad=bad",
"--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI())
)
.input(PubsubIO.readCoder[Array[Byte]]("in"), raw)
.input(PubsubIO.pubsub[PubsubMessage]("in"), raw)
.distCache(DistCacheIO(""), List.empty[Either[String, String]])
.output(PubsubIO.readString("bad")) { b =>
.output(PubsubIO.string("bad")) { b =>
b should beEmpty; ()
}
.output(PubsubIO.readString("out")) { o =>
.output(PubsubIO.string("out")) { o =>
o should satisfySingleValue { c: String =>
SpecHelpers.compareEnrichedEvent(expected, c)
}; ()
Expand Down
Loading

0 comments on commit e236a47

Please sign in to comment.