Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream FS2: add a test specification with POST payload #403

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
package com.snowplowanalytics.snowplow.enrich.fs2

import java.time.Instant
import java.util.UUID
import java.util.{Base64, UUID}

import scala.concurrent.duration._

Expand All @@ -29,24 +29,22 @@ import _root_.io.circe.literal._

import org.apache.http.NameValuePair
import org.apache.http.message.BasicNameValuePair

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.{Processor, BadRow, Payload => BadRowPayload}

import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.IpLookupsEnrichment
import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent

import com.snowplowanalytics.snowplow.enrich.fs2.EnrichSpec.{Expected, minimalEvent, normalizeResult}
import com.snowplowanalytics.snowplow.enrich.fs2.EnrichSpec.{normalizeResult, Expected, minimalEvent, contexts}
import com.snowplowanalytics.snowplow.enrich.fs2.test._

import org.specs2.ScalaCheck
import org.specs2.mutable.Specification

import cats.effect.testing.specs2.CatsIO

import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.Contexts

import org.specs2.scalacheck.Parameters

class EnrichSpec extends Specification with CatsIO with ScalaCheck {
Expand Down Expand Up @@ -80,6 +78,41 @@ class EnrichSpec extends Specification with CatsIO with ScalaCheck {
}
}

"enrich a minimal POST page_view CollectorPayload event with newline and without any enrichments enabled" in {
val expected = minimalEvent
.copy(
etl_tstamp = Some(Instant.ofEpochMilli(SpecHelpers.StaticTime)),
user_ipaddress = Some("175.16.199.0"),
benjben marked this conversation as resolved.
Show resolved Hide resolved
event = Some("page_view"),
event_vendor = Some("com.snowplowanalytics.snowplow"),
event_name = Some("page_view"),
event_format = Some("jsonschema"),
event_version = Some("1-0-0"),
platform = Some("app"),
v_tracker = Some("test-0.0.1"),
contexts = Contexts(List(
SelfDescribingData(
SchemaKey("com.snowplowanalytics.snowplow", "change_form","jsonschema", SchemaVer.Full(1,0,0)),
json"""{"formId":"a","elementId":"b","nodeName":"TEXTAREA","value":"line 1\nline2\tcolumn2"}"""
benjben marked this conversation as resolved.
Show resolved Hide resolved
)
)),
derived_tstamp = Some(Instant.ofEpochMilli(0L))
)

TestEnvironment.ioBlocker.use { blocker =>
Enrich
.enrichWith(TestEnvironment.enrichmentReg.pure[IO], blocker, TestEnvironment.igluClient, None, _ => IO.unit)(
EnrichSpec.payloadPost[IO]
)
.map(normalizeResult)
.map {
case List(Validated.Valid(event)) => event must beEqualTo(expected)
case other => ko(s"Expected one valid event, got $other")
}
}
}


"enrich a randomly generated page view event" in {
implicit val cpGen = PayloadGen.getPageViewArbitrary
prop { (collectorPayload: CollectorPayload) =>
Expand Down Expand Up @@ -189,6 +222,12 @@ object EnrichSpec {
def payload[F[_]: Applicative]: Payload[F, Array[Byte]] =
Payload(colllectorPayload.toRaw, Applicative[F].unit)

val changeForm = "{\"schema\":\"iglu:com.snowplowanalytics.snowplow/change_form/jsonschema/1-0-0\",\"data\":{\"formId\":\"a\",\"elementId\":\"b\",\"nodeName\":\"TEXTAREA\",\"value\":\"line 1\\nline2\\tcolumn2\"}}"
val contexts = new String(Base64.getUrlEncoder.encode(s"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[$changeForm]}""".getBytes))
val bodyPost = s"""{"schema":"iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4","data":[{"eid":"$eventId","e":"pv","cx":"$contexts","p":"app","tv":"test-0.0.1"}]}"""
def payloadPost[F[_]: Applicative]: Payload[F, Array[Byte]] =
Payload(colllectorPayload.copy(querystring = Nil, body = Some(bodyPost), contentType = Some("application/json")).toRaw, Applicative[F].unit)

def normalize(payload: Payload[IO, EnrichedEvent]) =
Event
.parse(Enrich.encodeEvent(payload.data))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,108 @@ object SchemaRegistry {
"additionalProperties": false
}"""

// Defined on Iglu Central
benjben marked this conversation as resolved.
Show resolved Hide resolved
val payloadData: SelfDescribingSchema[Json] =
json"""{
"$$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Schema for a Snowplow payload",
"self": {
"vendor": "com.snowplowanalytics.snowplow",
"name": "payload_data",
"format": "jsonschema",
"version": "1-0-4"
},
"type": "array",
"items":{
"type": "object",
"properties": {
"tna": {"type":"string"},
"aid": {"type":"string"},
"p": {"type":"string"},
"dtm": {"type":"string"},
"tz": {"type":"string"},
"e": {"type":"string"},
"tid": {"type":"string"},
"eid": {"type":"string"},
"tv": {"type":"string"},
"duid": {"type":"string"},
"nuid": {"type":"string"},
"uid": {"type":"string"},
"vid": {"type":"string"},
"ip": {"type":"string"},
"res": {"type":"string"},
"url": {"type":"string"},
"page": {"type":"string"},
"refr": {"type":"string"},
"fp": {"type":"string"},
"ctype": {"type":"string"},
"cookie": {"type":"string"},
"lang": {"type":"string"},
"f_pdf": {"type":"string"},
"f_qt": {"type":"string"},
"f_realp": {"type":"string"},
"f_wma": {"type":"string"},
"f_dir": {"type":"string"},
"f_fla": {"type":"string"},
"f_java": {"type":"string"},
"f_gears": {"type":"string"},
"f_ag": {"type":"string"},
"cd": {"type":"string"},
"ds": {"type":"string"},
"cs": {"type":"string"},
"vp": {"type":"string"},
"mac": {"type":"string"},
"pp_mix": {"type":"string"},
"pp_max": {"type":"string"},
"pp_miy": {"type":"string"},
"pp_may": {"type":"string"},
"ad_ba": {"type":"string"},
"ad_ca": {"type":"string"},
"ad_ad": {"type":"string"},
"ad_uid": {"type":"string"},
"tr_id": {"type":"string"},
"tr_af": {"type":"string"},
"tr_tt": {"type":"string"},
"tr_tx": {"type":"string"},
"tr_sh": {"type":"string"},
"tr_ci": {"type":"string"},
"tr_st": {"type":"string"},
"tr_co": {"type":"string"},
"tr_cu": {"type":"string"},
"ti_id": {"type":"string"},
"ti_sk": {"type":"string"},
"ti_nm": {"type":"string"},
"ti_na": {"type":"string"},
"ti_ca": {"type":"string"},
"ti_pr": {"type":"string"},
"ti_qu": {"type":"string"},
"ti_cu": {"type":"string"},
"sa": {"type":"string"},
"sn": {"type":"string"},
"st": {"type":"string"},
"sp": {"type":"string"},
"se_ca": {"type":"string"},
"se_ac": {"type":"string"},
"se_la": {"type":"string"},
"se_pr": {"type":"string"},
"se_va": {"type":"string"},
"ue_na": {"type":"string"},
"ue_pr": {"type":"string"},
"ue_px": {"type":"string"},
"co": {"type":"string"},
"cx": {"type":"string"},
"ua": {"type":"string"},
"tnuid": {"type":"string"},
"stm": {"type":"string"},
"sid": {"type":"string"},
"ttm": {"type":"string"}
},
"required": ["tv", "p", "e"],
"additionalProperties": false
},
"minItems": 1
}"""

// Defined on Iglu Central
val contexts: SelfDescribingSchema[Json] =
json"""{
Expand Down Expand Up @@ -226,6 +328,36 @@ object SchemaRegistry {
"additionalProperties": false
}"""

// Defined on Iglu Central
val changeForm: SelfDescribingSchema[Json] =
json"""{
"$$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"description": "Schema for a form field's value being changed",
"self": {
"vendor": "com.snowplowanalytics.snowplow",
"name": "change_form",
"format": "jsonschema",
"version": "1-0-0"
},

"type": "object",
"properties": {
"formId": { "type": "string" },
"elementId": { "type": "string" },
"nodeName": { "enum": ["INPUT", "TEXTAREA", "SELECT"] },
"type": {
"enum": ["button", "checkbox", "color", "date", "datetime", "datetime-local", "email", "file", "hidden", "image", "month", "number", "password", "radio", "range", "reset", "search", "submit", "tel", "text", "time", "url", "week"]
},
"elementClasses": {
"type": "array",
"items": { "type": "string" }
},
"value": { "type": ["string", "null"] }
},
"required": ["formId", "elementId", "nodeName", "value"],
"additionalProperties": false
}"""

private[test] implicit def jsonToSchema(json: Json): SelfDescribingSchema[Json] =
SelfDescribingSchema.parse(json).getOrElse(throw new IllegalStateException("InMemory SchemaRegistry JSON cannot be parsed as schema"))
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,15 @@ object TestEnvironment extends CatsIO {
Registry.Config("fs2-enrich embedded test registry", 1, List("com.acme")),
List(
SchemaRegistry.unstructEvent,
SchemaRegistry.payloadData,
SchemaRegistry.contexts,
SchemaRegistry.geolocationContext,
SchemaRegistry.iabAbdRobots,
SchemaRegistry.yauaaContext,
SchemaRegistry.acmeTest,
SchemaRegistry.acmeOutput
SchemaRegistry.acmeOutput,
SchemaRegistry.changeForm,

)
)
val igluClient: Client[IO, Json] =
Expand Down