Skip to content

Commit

Permalink
Make parsing errors type-safe (closes #75)
Browse files Browse the repository at this point in the history
  • Loading branch information
aldemirenes committed Aug 15, 2019
1 parent 8e27172 commit 84fb81d
Show file tree
Hide file tree
Showing 8 changed files with 499 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright (c) 2016-2019 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.analytics.scalasdk

import cats.data.NonEmptyList
import cats.syntax.either._
import io.circe._
import io.circe.syntax._
import com.snowplowanalytics.snowplow.analytics.scalasdk.decode.Key

/**
* Represents error during parsing TSV event
*/
sealed trait ParsingError

object ParsingError {

/**
* Represents error which given line is not a TSV
*/
final case object NonTSVPayload extends ParsingError

/**
* Represents error which number of given columns is not equal
* to number of expected columns
* @param columnCount mismatched column count in the event
*/
final case class ColumnNumberMismatch(columnCount: Int) extends ParsingError

/**
* Represents error which encountered while decoding values in row
* @param errors Infos of errors which encountered during decoding
*/
final case class RowDecodingError(errors: NonEmptyList[RowDecodingErrorInfo]) extends ParsingError

/**
* Gives info about the reasons of the errors during decoding value in row
*/
sealed trait RowDecodingErrorInfo

object RowDecodingErrorInfo {
/**
* Represents cases where value in a field is not valid
* e.g invalid timestamp, invalid UUID
* @param key key of field
* @param value value of field
* @param message error message
*/
final case class InvalidValue(key: Key, value: String, message: String) extends RowDecodingErrorInfo

/**
* Represents cases which getting error is not expected while decoding row
* For example, while parsing the list of tuples to HList in the
* RowDecoder, getting more or less values than expected is impossible
* due to type check. Therefore 'UnexpectedRowDecodingError' is returned for
* these cases. These errors can be ignored since they are not possible to get
* @param error error message
*/
final case class UnexpectedRowDecodingError(error: String) extends RowDecodingErrorInfo

implicit val analyticsSdkRowDecodingErrorInfoCirceEncoder: Encoder[RowDecodingErrorInfo] =
Encoder.instance {
case InvalidValue(key, value, message) =>
Json.obj(
"type" := "InvalidValue",
"key" := key,
"value" := value,
"message" := message
)
case UnexpectedRowDecodingError(error: String) =>
Json.obj(
"type" := "UnexpectedRowDecodingError",
"error" := error
)
}

implicit val analyticsSdkRowDecodingErrorInfoCirceDecoder: Decoder[RowDecodingErrorInfo] =
Decoder.instance { cursor =>
for {
errorType <- cursor.downField("type").as[String]
result <- errorType match {
case "InvalidValue" =>
for {
key <- cursor.downField("key").as[Key]
value <- cursor.downField("value").as[String]
message <- cursor.downField("message").as[String]
} yield InvalidValue(key, value, message)

case "UnexpectedRowDecodingError" =>
cursor
.downField("error")
.as[String]
.map(UnexpectedRowDecodingError)
}
} yield result
}

implicit val analyticsSdkKeyCirceEncoder: Encoder[Key] =
Encoder.instance(_.toString.stripPrefix("'").asJson)

implicit val analyticsSdkKeyCirceDecoder: Decoder[Key] =
Decoder.instance(_.as[String].map(Symbol(_)))

}

implicit val analyticsSdkParsingErrorCirceEncoder: Encoder[ParsingError] =
Encoder.instance {
case NonTSVPayload =>
Json.obj("type" := "NonTSVPayload")
case ColumnNumberMismatch(columnCount) =>
Json.obj(
"type" := "ColumnNumberMismatch",
"columnCount" := columnCount
)
case RowDecodingError(errors) =>
Json.obj(
"type" := "RowDecodingError",
"errors" := errors.asJson
)
}

implicit val analyticsSdkParsingErrorCirceDecoder: Decoder[ParsingError] =
Decoder.instance { cursor =>
for {
error <- cursor.downField("type").as[String]
result <- error match {
case "NonTSVPayload" =>
NonTSVPayload.asRight
case "ColumnNumberMismatch" =>
cursor
.downField("columnCount")
.as[Int]
.map(ColumnNumberMismatch)
case "RowDecodingError" =>
cursor
.downField("errors")
.as[NonEmptyList[RowDecodingErrorInfo]]
.map(RowDecodingError)
case _ =>
DecodingFailure(
s"Error type $error cannot be recognized as Analytics SDK Parsing Error",
cursor.history).asLeft
}
} yield result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ package com.snowplowanalytics.snowplow.analytics.scalasdk.decode
import shapeless._
import shapeless.ops.record._
import shapeless.ops.hlist._

import Parser._
import cats.data.{NonEmptyList, Validated}
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.{ColumnNumberMismatch, NonTSVPayload, RowDecodingError}

private[scalasdk] trait Parser[A] extends Serializable {
/** Heterogeneous TSV values */
Expand All @@ -33,9 +33,16 @@ private[scalasdk] trait Parser[A] extends Serializable {

def parse(row: String): DecodeResult[A] = {
val values = row.split("\t", -1)
val zipped = knownKeys.zipAll(values, UnknownKeyPlaceholder, ValueIsMissingPlaceholder)
val decoded = decoder(zipped)
decoded.map { decodedValue => generic.from(decodedValue) }
if (values.length == 1) {
Validated.Invalid(NonTSVPayload)
}
else if (values.length != knownKeys.length) {
Validated.Invalid(ColumnNumberMismatch(values.length))
} else {
val zipped = knownKeys.zip(values)
val decoded = decoder(zipped).leftMap(e => RowDecodingError(e))
decoded.map { decodedValue => generic.from(decodedValue) }
}
}
}

Expand All @@ -61,11 +68,6 @@ object Parser {
}
}

/** Key name that will be used if TSV has more columns than a class */
val UnknownKeyPlaceholder = 'UnknownKey
/** Value that will be used if class has more fields than a TSV */
val ValueIsMissingPlaceholder = "VALUE IS MISSING"

/** Derive a TSV parser for `A` */
private[scalasdk] def deriveFor[A]: DeriveParser[A] =
new DeriveParser[A] {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,40 @@ import shapeless._
import cats.syntax.validated._
import cats.syntax.either._
import cats.syntax.apply._
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.RowDecodingErrorInfo.UnexpectedRowDecodingError

/**
* Type class to decode List of keys-value pairs into HList
* Keys derived from original class of HList,
* Values are actual TSV columns
*/
private[scalasdk] trait RowDecoder[L <: HList] extends Serializable {
def apply(row: List[(Key, String)]): DecodeResult[L]
def apply(row: List[(Key, String)]): RowDecodeResult[L]
}

private[scalasdk] object RowDecoder {
import HList.ListCompat._

def apply[L <: HList](implicit fromRow: RowDecoder[L]): RowDecoder[L] = fromRow

def fromFunc[L <: HList](f: List[(Key, String)] => DecodeResult[L]): RowDecoder[L] =
def fromFunc[L <: HList](f: List[(Key, String)] => RowDecodeResult[L]): RowDecoder[L] =
new RowDecoder[L] {
def apply(row: List[(Key, String)]) = f(row)
}

/** Parse TSV row into HList */
private def parse[H: ValueDecoder, T <: HList: RowDecoder](row: List[(Key, String)]) =
private def parse[H: ValueDecoder, T <: HList: RowDecoder](row: List[(Key, String)]): RowDecodeResult[H :: T] =
row match {
case h :: t =>
val hv: DecodeResult[H] =
ValueDecoder[H].parse(h).leftMap(_._2).toValidatedNel
val tv = RowDecoder[T].apply(t)
val hv: RowDecodeResult[H] = ValueDecoder[H].parse(h).toValidatedNel
val tv: RowDecodeResult[T] = RowDecoder[T].apply(t)
(hv, tv).mapN { _ :: _ }
case Nil => "Not enough values, format is invalid".invalidNel
case Nil => UnexpectedRowDecodingError("Not enough values, format is invalid").invalidNel
}

implicit val hnilFromRow: RowDecoder[HNil] = fromFunc {
implicit def hnilFromRow: RowDecoder[HNil] = fromFunc {
case Nil => HNil.validNel
case rows => s"No more values expected, following provided: ${rows.map(_._2).mkString(", ")}".invalidNel
case rows => UnexpectedRowDecodingError(s"No more values expected, following provided: ${rows.map(_._2).mkString(", ")}").invalidNel
}

implicit def hconsFromRow[H: ValueDecoder, T <: HList: RowDecoder]: RowDecoder[H :: T] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import io.circe.{Error, Json}
// This library
import com.snowplowanalytics.snowplow.analytics.scalasdk.Common.{ContextsCriterion, UnstructEventCriterion}
import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.{Contexts, UnstructEvent}
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.RowDecodingErrorInfo
import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.RowDecodingErrorInfo._

private[decode] trait ValueDecoder[A] {
def parse(column: (Key, String)): DecodedValue[A]
Expand All @@ -50,7 +52,7 @@ private[decode] object ValueDecoder {
implicit final val stringColumnDecoder: ValueDecoder[String] =
fromFunc[String] {
case (key, value) =>
if (value.isEmpty) (key, s"Field $key cannot be empty").asLeft else value.asRight
if (value.isEmpty) InvalidValue(key, value, s"Field $key cannot be empty").asLeft else value.asRight
}

implicit final val stringOptionColumnDecoder: ValueDecoder[Option[String]] =
Expand All @@ -67,21 +69,21 @@ private[decode] object ValueDecoder {
value.toInt.some.asRight
} catch {
case _: NumberFormatException =>
(key, s"Cannot parse key $key with value $value into integer").asLeft
InvalidValue(key, value, s"Cannot parse key $key with value $value into integer").asLeft
}
}

implicit final val uuidColumnDecoder: ValueDecoder[UUID] =
fromFunc[UUID] {
case (key, value) =>
if (value.isEmpty)
(key, s"Field $key cannot be empty").asLeft
InvalidValue(key, value, s"Field $key cannot be empty").asLeft
else
try {
UUID.fromString(value).asRight[(Key, String)]
UUID.fromString(value).asRight[RowDecodingErrorInfo]
} catch {
case _: IllegalArgumentException =>
(key, s"Cannot parse key $key with value $value into UUID").asLeft
InvalidValue(key, value, s"Cannot parse key $key with value $value into UUID").asLeft
}
}

Expand All @@ -92,7 +94,7 @@ private[decode] object ValueDecoder {
case "0" => false.some.asRight
case "1" => true.some.asRight
case "" => none[Boolean].asRight
case _ => (key, s"Cannot parse key $key with value $value into boolean").asLeft
case _ => InvalidValue(key, value, s"Cannot parse key $key with value $value into boolean").asLeft
}
}

Expand All @@ -105,22 +107,22 @@ private[decode] object ValueDecoder {
value.toDouble.some.asRight
} catch {
case _: NumberFormatException =>
(key, s"Cannot parse key $key with value $value into double").asLeft
InvalidValue(key, value, s"Cannot parse key $key with value $value into double").asLeft
}
}

implicit final val instantColumnDecoder: ValueDecoder[Instant] =
fromFunc[Instant] {
case (key, value) =>
if (value.isEmpty)
(key, s"Field $key cannot be empty").asLeft
InvalidValue(key, value, s"Field $key cannot be empty").asLeft
else {
val tstamp = reformatTstamp(value)
try {
Instant.parse(tstamp).asRight
} catch {
case _: DateTimeParseException =>
(key, s"Cannot parse key $key with value $value into datetime").asLeft
InvalidValue(key, value, s"Cannot parse key $key with value $value into datetime").asLeft
}
}
}
Expand All @@ -129,50 +131,50 @@ private[decode] object ValueDecoder {
fromFunc[Option[Instant]] {
case (key, value) =>
if (value.isEmpty)
none[Instant].asRight[(Key, String)]
none[Instant].asRight[RowDecodingErrorInfo]
else {
val tstamp = reformatTstamp(value)
try {
Instant.parse(tstamp).some.asRight
} catch {
case _: DateTimeParseException =>
(key, s"Cannot parse key $key with value $value into datetime").asLeft
InvalidValue(key, value, s"Cannot parse key $key with value $value into datetime").asLeft
}
}
}

implicit final val unstructuredJson: ValueDecoder[UnstructEvent] =
fromFunc[UnstructEvent] {
case (key, value) =>
def asLeft(error: Error): (Key, String) = (key, error.show)
def asLeft(error: Error): RowDecodingErrorInfo = InvalidValue(key, value, error.show)
if (value.isEmpty)
UnstructEvent(None).asRight[(Key, String)]
UnstructEvent(None).asRight[RowDecodingErrorInfo]
else
parseJson(value)
.flatMap(_.as[SelfDescribingData[Json]])
.leftMap(asLeft) match {
case Right(SelfDescribingData(schema, data)) if UnstructEventCriterion.matches(schema) =>
data.as[SelfDescribingData[Json]].leftMap(asLeft).map(_.some).map(UnstructEvent.apply)
case Right(SelfDescribingData(schema, _)) =>
(key, s"Unknown payload: ${schema.toSchemaUri}").asLeft[UnstructEvent]
InvalidValue(key, value, s"Unknown payload: ${schema.toSchemaUri}").asLeft[UnstructEvent]
case Left(error) => error.asLeft[UnstructEvent]
}
}

implicit final val contexts: ValueDecoder[Contexts] =
fromFunc[Contexts] {
case (key, value) =>
def asLeft(error: Error): (Key, String) = (key, error.show)
def asLeft(error: Error): RowDecodingErrorInfo = InvalidValue(key, value, error.show)
if (value.isEmpty)
Contexts(List()).asRight[(Key, String)]
Contexts(List()).asRight[RowDecodingErrorInfo]
else
parseJson(value)
.flatMap(_.as[SelfDescribingData[Json]])
.leftMap(asLeft) match {
case Right(SelfDescribingData(schema, data)) if ContextsCriterion.matches(schema) =>
data.as[List[SelfDescribingData[Json]]].leftMap(asLeft).map(Contexts.apply)
case Right(SelfDescribingData(schema, _)) =>
(key, s"Unknown payload: ${schema.toSchemaUri}").asLeft[Contexts]
InvalidValue(key, value, s"Unknown payload: ${schema.toSchemaUri}").asLeft[Contexts]
case Left(error) => error.asLeft[Contexts]
}
}
Expand Down
Loading

0 comments on commit 84fb81d

Please sign in to comment.