Common: add toThrift and toRaw methods to CollectorPayload (close #345)
chuwy committed Sep 18, 2020
1 parent 5f16d66 commit dd45f7b
Showing 7 changed files with 440 additions and 231 deletions.
@@ -97,10 +97,9 @@ class AdapterRegistry(remoteAdapters: Map[(String, String), RemoteAdapter] = Map
processor: Processor
): F[Validated[BadRow, NonEmptyList[RawEvent]]] =
(adapters.get((payload.api.vendor, payload.api.version)) match {
case Some(adapter) =>
adapter.toRawEvents(payload, client)
case _ =>
val f: FailureDetails.AdapterFailureOrTrackerProtocolViolation = FailureDetails.AdapterFailure.InputData(
case Some(adapter) => adapter.toRawEvents(payload, client)
case None =>
val f = FailureDetails.AdapterFailure.InputData(
"vendor/version",
Some(s"${payload.api.vendor}/${payload.api.version}"),
"vendor/version combination is not supported"
@@ -10,9 +10,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.common
package adapters
package registry
package com.snowplowanalytics.snowplow.enrich.common.adapters.registry

import cats.Monad
import cats.data.{NonEmptyList, ValidatedNel}
@@ -29,31 +27,31 @@ import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._

import com.snowplowanalytics.snowplow.badrows.FailureDetails

import io.circe._
import io.circe.syntax._

import org.apache.http.NameValuePair

import org.joda.time.{DateTime, DateTimeZone}
import org.joda.time.format.DateTimeFormat

import loaders.CollectorPayload
import utils.{HttpClient, JsonUtils => JU}
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
import com.snowplowanalytics.snowplow.badrows.FailureDetails
import com.snowplowanalytics.snowplow.enrich.common.RawEventParameters
import com.snowplowanalytics.snowplow.enrich.common.adapters.RawEvent
import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload
import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, JsonUtils => JU}

trait Adapter {

// Signature for a Formatter function
type FormatterFunc = (RawEventParameters) => Json
type FormatterFunc = RawEventParameters => Json

// The encoding type to be used
val EventEncType = "UTF-8"

private val AcceptedQueryParameters = Set("nuid", "aid", "cv", "eid", "ttm", "url")

// Datetime format we need to convert timestamps to
val JsonSchemaDateTimeFormat =
val JsonSchemaDateTimeFormat: DateTimeFormatter =
DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(DateTimeZone.UTC)
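For reference, a quick editor's sketch (not part of this diff) of what the formatter above emits; the epoch-zero instant is just an example value:

import org.joda.time.DateTime
import org.joda.time.DateTimeZone
import org.joda.time.format.DateTimeFormat

// Same pattern and zone as JsonSchemaDateTimeFormat above
val fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(DateTimeZone.UTC)
fmt.print(new DateTime(0L)) // "1970-01-01T00:00:00.000Z"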

private def toStringField(seconds: Long): String = {
@@ -112,7 +110,7 @@ trait Adapter {

/**
* Converts a CollectorPayload instance into raw events.
* @param payload The CollectorPaylod containing one or more raw events as collected by a
* @param payload The CollectorPayload containing one or more raw events as collected by a
* Snowplow collector
* @param client The Iglu client used for schema lookup and validation
* @return a Validation boxing either a NEL of RawEvents on Success, or a NEL of Failure Strings
@@ -168,14 +166,22 @@ trait Adapter {
): RawEventParameters = {
val params = formatter(parameters - ("nuid", "aid", "cv", "p"))
val json = toUnstructEvent(SelfDescribingData(schema, params)).noSpaces
buildUnstructEventParams(tracker, platform, parameters, json)
}

def buildUnstructEventParams(
tracker: String,
platform: String,
parameters: RawEventParameters,
json: String
): Map[String, String] =
Map(
"tv" -> tracker,
"e" -> "ue",
"p" -> parameters.getOrElse("p", platform), // Required field
"ue_pr" -> json
) ++
parameters.filterKeys(AcceptedQueryParameters)
}
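Editor's illustration (not part of the commit) of what the newly extracted helper returns when called from a concrete Adapter; all parameter values below are made up:

// Toy input: "p" and "aid" are real Snowplow fields, "foo" stands in for noise
val parameters = Map("p" -> "srv", "aid" -> "my-app", "foo" -> "bar")
val out = buildUnstructEventParams(
  tracker = "com.acme-v1",
  platform = "app",
  parameters = parameters,
  json = """{"schema":"iglu:com.acme/event/jsonschema/1-0-0","data":{}}"""
)
// out("tv") == "com.acme-v1"
// out("e") == "ue"
// out("p") == "srv"          // taken from parameters; platform is only a fallback
// out("ue_pr") == the json passed in
// out("aid") == "my-app"     // kept: "aid" is in AcceptedQueryParameters
// !out.contains("foo")       // dropped: not an accepted query parameter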

/**
* Creates a Snowplow unstructured event by nesting the provided JValue in a self-describing
Expand Down Expand Up @@ -223,13 +229,7 @@ trait Adapter {
platform: String
): RawEventParameters = {
val json = toUnstructEvent(SelfDescribingData(schema, eventJson.asJson)).noSpaces
Map(
"tv" -> tracker,
"e" -> "ue",
"p" -> qsParams.getOrElse("p", platform), // Required field
"ue_pr" -> json
) ++
qsParams.filterKeys(AcceptedQueryParameters)
buildUnstructEventParams(tracker, platform, qsParams, json)
}

/**
@@ -251,14 +251,7 @@
platform: String
): RawEventParameters = {
val json = toUnstructEvent(SelfDescribingData(schema, eventJson)).noSpaces

Map(
"tv" -> tracker,
"e" -> "ue",
"p" -> qsParams.getOrElse("p", platform), // Required field
"ue_pr" -> json
) ++
qsParams.filterKeys(AcceptedQueryParameters)
buildUnstructEventParams(tracker, platform, qsParams, json)
}

/**
@@ -411,15 +404,15 @@ trait Adapter {
object Adapter {

/** The Iglu schema URI for a Snowplow unstructured event */
val UnstructEvent = SchemaKey(
val UnstructEvent: SchemaKey = SchemaKey(
"com.snowplowanalytics.snowplow",
"unstruct_event",
"jsonschema",
SchemaVer.Full(1, 0, 0)
)

/** The Iglu schema URI for Snowplow custom contexts */
val Contexts = SchemaKey(
val Contexts: SchemaKey = SchemaKey(
"com.snowplowanalytics.snowplow",
"contexts",
"jsonschema",
@@ -77,9 +77,7 @@ object IgluAdapter extends Adapter {
)
case (None, Some(body), Some(contentType)) =>
Monad[F].pure(payloadSdJsonToEvent(payload, body, contentType, params))
case (Some(schemaUri), Some(_), Some(_)) =>
Monad[F].pure(payloadToEventWithSchema(payload, schemaUri, params))
case (Some(schemaUri), None, _) =>
case (Some(schemaUri), _, _) => // Ignore body
Monad[F].pure(payloadToEventWithSchema(payload, schemaUri, params))
case (None, None, _) =>
val nel = NonEmptyList.of(
@@ -10,37 +10,55 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.common
package loaders
package com.snowplowanalytics.snowplow.enrich.common.loaders

import java.util.UUID

import scala.collection.JavaConverters._

import cats.syntax.either._
import cats.syntax.option._

import com.snowplowanalytics.snowplow.badrows
import com.snowplowanalytics.snowplow.badrows.{FailureDetails, NVP}

import org.apache.http.NameValuePair
import org.apache.http.client.utils.URIBuilder
import org.apache.thrift.TSerializer

import org.joda.time.DateTime

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.{CollectorPayload => CollectorPayload1}

import com.snowplowanalytics.snowplow.badrows.{FailureDetails, NVP, Payload}

/**
* The canonical input format for the ETL process: it should be possible to convert any collector
* input format to this format, ready for the main, collector-agnostic stage of the ETL.
*
* Unlike `RawEvent`, where `parameters` contain a single event,
* [[CollectorPayload]]'s `body` can contain a POST payload with multiple events
* [[CollectorPayload]]'s `body` can contain a POST payload with multiple events,
* hence [[CollectorPayload]] with `body` is potentially identical to `List[RawEvent]`,
* while [[CollectorPayload]] with `querystring` is identical to a single `RawEvent`
*
* @param api collector's endpoint
* @param querystring GET parameters, would be empty for buffered events and most webhooks,
* an actual payload if `body` is empty
* @param contentType derived from HTTP header (should be in `Context`)
* @param body POST body, for buffered events and most webhooks,
* an actual payload if `querystring` is empty
* @param source information to identify the collector
* @param context event's meta-information, some properties can be used to augment payload
*/
final case class CollectorPayload(
api: CollectorPayload.Api,
querystring: List[NameValuePair], // Could be empty in future trackers
contentType: Option[String], // Not always set
body: Option[String], // Not set for GETs
querystring: List[NameValuePair],
contentType: Option[String],
body: Option[String],
source: CollectorPayload.Source,
context: CollectorPayload.Context
) {
def toBadRowPayload: badrows.Payload.CollectorPayload =
badrows.Payload.CollectorPayload(
def toBadRowPayload: Payload.CollectorPayload =
Payload.CollectorPayload(
api.vendor,
api.version,
querystring.map(nvp => NVP(nvp.getName, Option(nvp.getValue))),
@@ -56,39 +74,88 @@ final case class CollectorPayload(
context.headers,
context.userId
)

/**
* Cast back to Thrift-generated `CollectorPayload` class, coming from collector
* Reverse of [[ThriftLoader.toCollectorPayload]]
* Used for tests and debugging
*/
def toThrift: CollectorPayload1 = {
// Timestamp must always be set, otherwise the underlying long will fall back to 1970-01-01
val timestamp: Long = context.timestamp.map(_.getMillis.asInstanceOf[java.lang.Long]).orNull

new CollectorPayload1(CollectorPayload.IgluUri.toSchemaUri, context.ipAddress.orNull, timestamp, source.encoding, source.name)
.setQuerystring((new URIBuilder).setParameters(querystring.asJava).build().getQuery)
.setHostname(source.hostname.orNull)
.setRefererUri(context.refererUri.orNull)
.setContentType(contentType.orNull)
.setUserAgent(context.useragent.orNull)
.setBody(body.orNull)
.setNetworkUserId(context.userId.map(_.toString).orNull)
.setHeaders(context.headers.asJava)
.setPath(api.toRaw)
}

/**
* Transform back to array of bytes coming from collector topic
* Used for tests and debugging
*/
def toRaw: Array[Byte] =
CollectorPayload.serializer.serialize(toThrift)
}
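For illustration only (not part of the diff), a sketch of how the two new methods might be exercised; every value below is invented:

import java.util.UUID

import org.apache.http.message.BasicNameValuePair
import org.joda.time.{DateTime, DateTimeZone}

val payload = CollectorPayload(
  api = CollectorPayload.Api("com.snowplowanalytics.snowplow", "tp2"),
  querystring = List(new BasicNameValuePair("aid", "my-app")),
  contentType = Some("application/json"),
  body = None,
  source = CollectorPayload.Source("ssc-1.0.1-kafka", "UTF-8", Some("collector.example.com")),
  context = CollectorPayload.Context(
    timestamp = Some(new DateTime(1600416000000L, DateTimeZone.UTC)),
    ipAddress = Some("192.0.2.1"),
    useragent = Some("curl/7.64.1"),
    refererUri = None,
    headers = List("Host: collector.example.com"),
    userId = Some(UUID.randomUUID())
  )
)

val thrift = payload.toThrift // Thrift-generated class, exactly what a collector emits
val bytes  = payload.toRaw    // serialized bytes, as read from the collector topic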

object CollectorPayload {

/** Latest payload SchemaKey */
val IgluUri: SchemaKey = SchemaKey("com.snowplowanalytics.snowplow", "CollectorPayload", "thrift", SchemaVer.Full(1, 0, 0))

/**
* Unambiguously identifies the collector source of this input line.
* @param name kind and version of the collector (e.g. ssc-1.0.1-kafka)
* @param encoding usually "UTF-8"
* @param hostname the actual host the collector was running on
*/
final case class Source(
name: String,
encoding: String,
hostname: Option[String]
)

/** Context derived by the collector. */
/**
* Information *derived* by the collector to be used as meta-data (meta-payload)
* Everything else in [[CollectorPayload]] is directly payload (body and queryparams)
* @param timestamp collector_tstamp (not optional in fact)
* @param ipAddress client's IP address, can be later overwritten by `ip` param in
* `enrichments.Transform`
* @param useragent UA header, can be later overwritten by `ua` param in `enrichments.Transform`
* @param refererUri extracted from corresponding HTTP header
* @param headers all headers, including UA and referer URI
* @param userId generated by collector-set third-party cookie
*/
final case class Context(
timestamp: Option[DateTime], // Must have a timestamp
timestamp: Option[DateTime],
ipAddress: Option[String],
useragent: Option[String],
refererUri: Option[String],
headers: List[String], // Could be empty
userId: Option[UUID] // User ID generated by collector-set third-party cookie
headers: List[String],
userId: Option[UUID]
)

/** Define the vendor and version of the payload. */
final case class Api(vendor: String, version: String)
/**
* Define the vendor and version of the payload, defined by collector endpoint
* Coming from [[com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry]]
*/
final case class Api(vendor: String, version: String) {

// Defaults for the tracker vendor and version before we implemented this into Snowplow.
// TODO: make private once the ThriftLoader is updated
val SnowplowTp1 = Api("com.snowplowanalytics.snowplow", "tp1")
/** Reverse back to collector's endpoint */
def toRaw: String = if (this == SnowplowTp1) "/i" else s"$vendor/$version"
}

/** Defaults for the tracker vendor and version before we implemented this into Snowplow */
val SnowplowTp1: Api = Api("com.snowplowanalytics.snowplow", "tp1")

// To extract the API vendor and version from the path to the requested object.
// TODO: move this to somewhere not specific to this collector
private val ApiPathRegex = """^[\/]?([^\/]+)\/([^\/]+)[\/]?$""".r
private val ApiPathRegex = """^[/]?([^/]+)/([^/]+)[/]?$""".r

/**
* Parses the requested URI path to determine the specific API version this payload follows.
@@ -115,4 +182,7 @@ object CollectorPayload {
path.startsWith("/ice.png") || // Legacy name for /i
path.equals("/i") || // Legacy name for /com.snowplowanalytics.snowplow/tp1
path.startsWith("/i?")

/** Thrift serializer, used for tests and debugging with `toThrift` */
private[loaders] lazy val serializer = new TSerializer()
}
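A small editor's note on the new Api.toRaw above: it maps the legacy tp1 vendor/version pair back to the shortened /i path and everything else to vendor/version (the vendor below is only an example):

CollectorPayload.SnowplowTp1.toRaw              // "/i"
CollectorPayload.Api("com.mailgun", "v1").toRaw // "com.mailgun/v1"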
@@ -41,7 +41,7 @@ import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey, ParseError =
object ThriftLoader extends Loader[Array[Byte]] {
private val thriftDeserializer = new TDeserializer

private val ExpectedSchema =
private[loaders] val ExpectedSchema =
SchemaCriterion("com.snowplowanalytics.snowplow", "CollectorPayload", "thrift", 1, 0)

/** Parse Error -> Collector Payload violation */
@@ -133,7 +133,7 @@ object ThriftLoader extends Loader[Array[Byte]] {

val headers = Option(collectorPayload.headers).map(_.asScala.toList).getOrElse(Nil)

val ip = IpAddressExtractor.extractIpAddress(headers, collectorPayload.ipAddress).some // Required
val ip = Option(IpAddressExtractor.extractIpAddress(headers, collectorPayload.ipAddress)) // Required

val api = Option(collectorPayload.path) match {
case None =>
@@ -196,7 +196,7 @@ object ThriftLoader extends Loader[Array[Byte]] {

val headers = Option(snowplowRawEvent.headers).map(_.asScala.toList).getOrElse(Nil)

val ip = IpAddressExtractor.extractIpAddress(headers, snowplowRawEvent.ipAddress).some // Required
val ip = Option(IpAddressExtractor.extractIpAddress(headers, snowplowRawEvent.ipAddress)) // Required

(querystring.toValidatedNel, networkUserId).mapN { (q, nuid) =>
val timestamp = Some(new DateTime(snowplowRawEvent.timestamp, DateTimeZone.UTC))
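Finally, since toRaw is documented as the reverse of ThriftLoader.toCollectorPayload, the natural consumer is a round-trip check. A hedged sketch, assuming a payload and a processor are already in scope and that the loader keeps its usual (bytes, processor) signature:

// Serialize with the new toRaw, then parse back with the Thrift loader;
// if the two are true inverses, the loaded payload should equal the original.
val bytes = payload.toRaw
val roundTripped = ThriftLoader.toCollectorPayload(bytes, processor)
// expected: a successful Validated wrapping Some(payload)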