Skip to content

Commit

Permalink
Fix everything
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Feb 14, 2024
1 parent 99ad846 commit e67e7a6
Show file tree
Hide file tree
Showing 35 changed files with 145 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ object S3Client {

def mk[F[_]: Async]: Resource[F, Client[F]] =
for {
s3Client <- Resource.fromAutoCloseable(Sync[F].delay(S3AsyncClient.builder().region(getRegion).build()))
s3Client <- Resource.fromAutoCloseable(Sync[F].delay(S3AsyncClient.builder().region(getRegion()).build()))
store <- Resource.eval(S3Store.builder[F](s3Client).build.toEither.leftMap(_.head).pure[F].rethrow)
} yield new Client[F] {
def canDownload(uri: URI): Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.thrift.TSerializer

import java.util.Base64

import com.snowplowanalytics.iglu.core.{ SelfDescribingData, SchemaKey, SchemaVer }
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._

import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload
Expand All @@ -40,7 +40,9 @@ object CollectorPayloadGen {
generateRaw(nbGoodEvents, nbBadRows).map(_.toThrift).map(new TSerializer().serialize)

/**
 * Builds a stream of raw collector payloads: first `nbGoodEvents` payloads produced by
 * `collectorPayloadGen(true)`, then `nbBadRows` payloads produced by `collectorPayloadGen(false)`.
 *
 * @param nbGoodEvents number of payloads to take from the `valid = true` generator
 * @param nbBadRows number of payloads to take from the `valid = false` generator
 */
def generateRaw[F[_]: Sync](nbGoodEvents: Long, nbBadRows: Long): Stream[F, CollectorPayload] = {
  // Each generator is re-evaluated for every element, so every payload is generated independently.
  val good: Stream[F, CollectorPayload] =
    Stream.repeatEval(runGen(collectorPayloadGen(true))).take(nbGoodEvents)
  val bad: Stream[F, CollectorPayload] =
    Stream.repeatEval(runGen(collectorPayloadGen(false))).take(nbBadRows)
  good ++ bad
}

private def collectorPayloadGen(valid: Boolean): Gen[CollectorPayload] =
for {
Expand Down Expand Up @@ -74,46 +76,74 @@ object CollectorPayloadGen {
aid <- Gen.const("enrich-kinesis-integration-tests").withKey("aid")
e <- Gen.const("ue").withKey("e")
tv <- Gen.oneOf("scala-tracker_1.0.0", "js_2.0.0", "go_1.2.3").withKey("tv")
uePx <-
if(valid)
ueGen.map(_.toString).map(str => base64Encoder.encodeToString(str.getBytes)).withKey("ue_px")
else
Gen.const("foo").withKey("ue_px")
uePx <- if (valid)
ueGen.map(_.toString).map(str => base64Encoder.encodeToString(str.getBytes)).withKey("ue_px")
else
Gen.const("foo").withKey("ue_px")
} yield SelfDescribingData(
SchemaKey("com.snowplowanalytics.snowplow", "payload_data", "jsonschema", SchemaVer.Full(1,0,4)),
List(asObject(List(p, aid, e, uePx, tv))).asJson
SchemaKey("com.snowplowanalytics.snowplow", "payload_data", "jsonschema", SchemaVer.Full(1, 0, 4)),
List(asObject(List(p, aid, e, uePx, tv))).asJson
).asJson.toString

/**
 * Generates the JSON of an `unstruct_event` 1-0-0 envelope wrapping one self-describing event,
 * picked from `changeFormGen` or `clientSessionGen`.
 */
private def ueGen =
  // A single-generator for-comprehension is just a `map`; written as such.
  Gen.oneOf(changeFormGen, clientSessionGen).map { sdj =>
    SelfDescribingData(
      SchemaKey("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", SchemaVer.Full(1, 0, 0)),
      sdj.asJson
    ).asJson
  }


/**
 * Generates a `change_form` 1-0-0 self-describing event whose data object is assembled from
 * the `formId`, `elementId`, `nodeName`, `type` and `value` fields generated below.
 */
private def changeFormGen = {
  // Candidate values for the optional `type` field.
  val inputTypes = List(
    "button",
    "checkbox",
    "color",
    "date",
    "datetime",
    "datetime-local",
    "email",
    "file",
    "hidden",
    "image",
    "month",
    "number",
    "password",
    "radio",
    "range",
    "reset",
    "search",
    "submit",
    "tel",
    "text",
    "time",
    "url",
    "week"
  )

  // Every field is bound exactly once; a repeated binding would run the generator twice and
  // silently shadow the first result.
  for {
    formId <- strGen(32, Gen.alphaNumChar).withKey("formId")
    elementId <- strGen(32, Gen.alphaNumChar).withKey("elementId")
    nodeName <- Gen.oneOf(List("INPUT", "TEXTAREA", "SELECT")).withKey("nodeName")
    `type` <- Gen.option(Gen.oneOf(inputTypes)).withKeyOpt("type")
    value <- Gen.option(strGen(16, Gen.alphaNumChar)).withKeyNull("value")
  } yield SelfDescribingData(
    SchemaKey("com.snowplowanalytics.snowplow", "change_form", "jsonschema", SchemaVer.Full(1, 0, 0)),
    asObject(List(formId, elementId, nodeName, `type`, value))
  )
}

/**
 * Generates a `client_session` 1-0-1 self-describing event whose data object is assembled from
 * the `userId`, `sessionId`, `sessionIndex`, `previousSessionId` and `storageMechanism` fields
 * generated below.
 */
private def clientSessionGen =
  for {
    userId <- Gen.uuid.withKey("userId")
    sessionId <- Gen.uuid.withKey("sessionId")
    // Int.MaxValue == 2147483647, the upper bound previously spelled out as a literal.
    sessionIndex <- Gen.choose(0, Int.MaxValue).withKey("sessionIndex")
    previousSessionId <- Gen.option(Gen.uuid).withKeyNull("previousSessionId")
    storageMechanism <- Gen
                          .oneOf(List("SQLITE", "COOKIE_1", "COOKIE_3", "LOCAL_STORAGE", "FLASH_LSO"))
                          .withKey("storageMechanism")
  } yield SelfDescribingData(
    SchemaKey("com.snowplowanalytics.snowplow", "client_session", "jsonschema", SchemaVer.Full(1, 0, 1)),
    asObject(List(userId, sessionId, sessionIndex, previousSessionId, storageMechanism))
  )

Expand Down Expand Up @@ -159,7 +189,7 @@ object CollectorPayloadGen {

/** Syntax that turns a generator of values into a generator of always-present JSON fields. */
implicit class GenOps[A](gen: Gen[A]) {

  /**
   * Pairs each generated value, encoded as JSON with `enc`, with the field name `name`.
   * The result is always a `Some`.
   *
   * The type parameter `B` is not used by the body; it is left in place so that existing
   * call sites keep compiling unchanged.
   */
  def withKey[B](name: String)(implicit enc: Encoder[A]): Gen[Option[(String, Json)]] =
    gen.map(a => Some(name -> a.asJson))
}

implicit class GenOptOps[A](gen: Gen[Option[A]]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import java.net.URI

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.control.NonFatal

import cats.Applicative
import cats.implicits._
Expand Down Expand Up @@ -259,8 +258,7 @@ object Assets {
def worthRetrying[F[_]: Applicative](e: Throwable): F[Boolean] = {
  // Only a `Clients.RetryableFailure` is worth retrying. Every other throwable yields `false`,
  // so the separate `IllegalArgumentException` / `NonFatal` cases were redundant with the
  // catch-all and are folded into it.
  val retryable = e match {
    case _: Clients.RetryableFailure => true
    case _ => false
  }
  Applicative[F].pure(retryable)
}

def onError[F[_]: Sync](error: Throwable, details: RetryDetails): F[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ object Enrich {
payload.fold(_.asJson.noSpaces, _.map(_.toBadRowPayload.asJson.noSpaces).getOrElse("None"))

/** Log an error, turn the problematic `CollectorPayload` into `BadRow` and notify Sentry if configured */
def sendToSentry[F[_]: Sync: Clock](
def sendToSentry[F[_]: Sync](
original: Array[Byte],
sentry: Option[SentryClient],
processor: Processor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import cats.implicits._

import fs2.Stream

import scala.concurrent.ExecutionContext

import cats.effect.kernel.{Async, Resource, Sync}
import cats.effect.ExitCode

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ package com.snowplowanalytics.snowplow.enrich.common.fs2.io

import java.nio.file.{Files, Path}

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.io.{Source => SSource}

import cats.data.EitherT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/
package com.snowplowanalytics.snowplow.enrich.common.fs2.blackbox

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import org.specs2.mutable.Specification

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ trait Adapter {
formatter: FormatterFunc,
platform: String
): RawEventParameters = {
val params = formatter(parameters - ("nuid", "aid", "cv", "p"))
val params = formatter(parameters -- List("nuid", "aid", "cv", "p"))
val json = toUnstructEvent(SelfDescribingData(schema, params)).noSpaces
buildUnstructEventParams(tracker, platform, parameters, json)
}
Expand All @@ -182,7 +182,7 @@ trait Adapter {
"p" -> parameters.getOrElse("p", Option(platform)), // Required field
"ue_pr" -> Option(json)
) ++
parameters.filterKeys(AcceptedQueryParameters)
parameters.view.filterKeys(AcceptedQueryParameters).toMap

/**
* Creates a Snowplow unstructured event by nesting the provided JValue in a self-describing
Expand Down Expand Up @@ -375,7 +375,7 @@ trait Adapter {
*/
private[registry] def camelCase(snakeOrDash: String) =
  // An empty input has no first character to lower-case: return it untouched rather than
  // failing inside `charAt(0)`.
  if (snakeOrDash.isEmpty) snakeOrDash
  else
    snakeCaseOrDashTokenCapturingRegex.replaceAllIn(
      // Lower-case the first character only. Interpolation keeps this a String concatenation
      // instead of relying on `Char + String`.
      s"${Character.toLowerCase(snakeOrDash.charAt(0))}${snakeOrDash.substring(1)}",
      // Replace every regex match with its first capture group, capitalized.
      m => m.group(1).capitalize
    )

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ package com.snowplowanalytics.snowplow.enrich.common.adapters.registry

import scala.annotation.tailrec

import cats.{Applicative, Functor, Monad}
import cats.{Applicative, Monad}
import cats.data.{NonEmptyList, ValidatedNel}
import cats.implicits._

Expand Down Expand Up @@ -531,7 +531,7 @@ case class GoogleAnalyticsAdapter(schemas: GoogleAnalyticsSchemas) extends Adapt
)
schemaVal = lookupSchema(
hitType.some,
unstructEventData.mapValues(_.schemaKey)
unstructEventData.view.mapValues(_.schemaKey).toMap
).toValidatedNel
simpleContexts = buildContexts(params, contextData, fieldToSchemaMap)
compositeContexts = buildCompositeContexts(
Expand Down Expand Up @@ -675,7 +675,9 @@ case class GoogleAnalyticsAdapter(schemas: GoogleAnalyticsSchemas) extends Adapt
// composite params have digits in their key
composite <- originalParams
.collect { case (k, Some(v)) => (k, v) }
.view
.filterKeys(k => k.exists(_.isDigit))
.toMap
.asRight
brokenDown <- composite.toList.sorted.map {
case (k, v) => breakDownCompField(k, v, indicator)
Expand All @@ -684,7 +686,9 @@ case class GoogleAnalyticsAdapter(schemas: GoogleAnalyticsSchemas) extends Adapt
// we additionally make sure we have a rectangular dataset
grouped = (partitioned._2 ++ removeConsecutiveDuplicates(partitioned._1)).flatten
.groupBy(_._1)
.view
.mapValues(_.map(_._2))
.toMap
translated <- {
val m = grouped
.foldLeft(
Expand Down Expand Up @@ -821,7 +825,7 @@ case class GoogleAnalyticsAdapter(schemas: GoogleAnalyticsSchemas) extends Adapt
case head => head :: transpose(l.collect { case _ :: tail => tail })
}

private def traverseMap[G[_]: Functor: Applicative, K, V](m: Map[K, G[V]]): G[Map[K, V]] =
private def traverseMap[G[_]: Applicative, K, V](m: Map[K, G[V]]): G[Map[K, V]] =
m.toList
.traverse {
case (name, vnel) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ package com.snowplowanalytics.snowplow.enrich.common.adapters.registry
import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util.{Try, Success => TS, Failure => TF}

import cats.Monad
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ package com.snowplowanalytics.snowplow.enrich.common.adapters.registry
import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util.{Try, Success => TS, Failure => TF}

import cats.Monad
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ package com.snowplowanalytics.snowplow.enrich.common.adapters.registry
import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util.{Try, Success => TS, Failure => TF}

import cats.Monad
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ object RedirectAdapter extends Adapter {
case (None, Some(Some(co))) if co == "" => newCo.asRight
case (None, Some(Some(co))) => addToExistingCo(json, co).map(str => Map("co" -> str))
case (Some(Some(cx)), _) => addToExistingCx(json, cx).map(str => Map("cx" -> str))
case other => throw new IllegalStateException(s"Illegal state: $other")
}
} else
// Add URI redirect as an unstructured event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ object EnrichmentManager {
def setCollectorTstamp(event: EnrichedEvent, timestamp: Option[DateTime]): Either[FailureDetails.EnrichmentFailure, Unit] =
  // On a successfully formatted timestamp, store it on the (mutable) event. `map` already wraps
  // the `Unit` result in the `Either`, so no extra `asRight` is needed; a failure from
  // `formatCollectorTstamp` passes through untouched.
  EE.formatCollectorTstamp(timestamp).map { t =>
    event.collector_tstamp = t
    ()
  }

def setUseragent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ final case class AnonIpEnrichment(ipv4Octets: AnonIPv4Octets.AnonIPv4Octets, ipv
.map {
case _: Inet4Address => anonymizeIpV4(ip)
case ipv6: Inet6Address => anonymizeIpV6(ipv6.getHostAddress)
case _ => throw new IllegalStateException(s"Illegal state")
}
.getOrElse(tryAnonymizingInvalidIp(ip))
}.orNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import cats.data.ValidatedNel
import cats.syntax.either._
Expand Down Expand Up @@ -64,7 +64,7 @@ object YauaaEnrichment extends ParseableEnrichment {
s match {
case _ if s.isEmpty => s
case _ if s.length == 1 => s.toLowerCase
case _ => s.charAt(0).toLower + s.substring(1)
case _ => Character.toString(s.charAt(0).toLower) + s.substring(1)
}
}

Expand Down Expand Up @@ -112,7 +112,9 @@ final case class YauaaEnrichment(cacheSize: Option[Int]) extends Enrichment {
parsedUA.getAvailableFieldNamesSorted.asScala
.map(field => decapitalize(field) -> parsedUA.getValue(field))
.toMap
.view
.filterKeys(validFields)
.toMap
}

/** Yauaa 7.x added many new fields which are not in the 1-0-4 schema */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ object ApiRequestEnrichment extends ParseableEnrichment {
UUID.nameUUIDFromBytes(contentKey.getBytes).toString
}

def create[F[_]: Async: Clock](
def create[F[_]: Async](
schemaKey: SchemaKey,
inputs: List[Input],
api: HttpApi,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
*/
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.pii

import scala.collection.JavaConverters._
import scala.collection.mutable.MutableList
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer

import cats.data.ValidatedNel
import cats.implicits._
Expand Down Expand Up @@ -216,7 +216,7 @@ final case class PiiJson(
val jObjectMap = obj.toMap
val contextMapped = jObjectMap.map(mapContextTopFields(_, strategy))
(
Json.obj(contextMapped.mapValues(_._1).toList: _*),
Json.obj(contextMapped.view.mapValues(_._1).toList: _*),
contextMapped.values.flatMap(_._2)
)
}
Expand Down Expand Up @@ -282,7 +282,7 @@ final case class PiiJson(
): (Json, List[JsonModifiedField]) = {
val objectNode = io.circe.jackson.mapper.valueToTree[ObjectNode](json)
val documentContext = JJsonPath.using(JsonPathConf).parse(objectNode)
val modifiedFields = MutableList[JsonModifiedField]()
val modifiedFields = ListBuffer[JsonModifiedField]()
Option(documentContext.read[AnyRef](jsonPath)) match { // check that json object not null
case None => (jacksonToCirce(documentContext.json[JsonNode]()), modifiedFields.toList)
case _ =>
Expand All @@ -297,7 +297,7 @@ final case class PiiJson(

private final case class ScrambleMapFunction(
strategy: PiiStrategy,
modifiedFields: MutableList[JsonModifiedField],
modifiedFields: ListBuffer[JsonModifiedField],
fieldName: String,
jsonPath: String,
schema: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,6 @@ object DbExecutor {
if (intMap.keys.size == placeholderCount) true else false
}

/**
 * Acquires a connection to `dataSource` by delegating to the `DbExecutor` instance available
 * for `F`. Only the `DbExecutor` capability is required; nothing in the body uses `Monad`.
 */
def getConnection[F[_]: DbExecutor](dataSource: DataSource): Resource[F, Connection] =
  DbExecutor[F].getConnection(dataSource)
}
Loading

0 comments on commit e67e7a6

Please sign in to comment.