diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Enrich.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Enrich.scala index 0ee5a099e..af258b212 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Enrich.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Enrich.scala @@ -144,7 +144,7 @@ object Enrich { } def sinkBad[F[_]: Monad](env: Environment[F], bad: BadRow): F[Unit] = - env.metrics.badCount >> env.bad(bad.compact.getBytes(UTF_8)) + env.metrics.badCount >> env.bad(badRowResize(bad)) def sinkGood[F[_]: Concurrent: Parallel](env: Environment[F], enriched: EnrichedEvent): F[Unit] = serializeEnriched(enriched) match { @@ -181,6 +181,27 @@ object Enrich { def sinkResult[F[_]: Concurrent: Parallel](env: Environment[F])(result: Validated[BadRow, EnrichedEvent]): F[Unit] = result.fold(sinkBad(env, _), sinkGood(env, _)) + /** + * Check if plain bad row (such as `enrichment_failure`) exceeds the `MaxRecordSize` + * If it does - turn into size violation with trimmed + */ + def badRowResize(badRow: BadRow): Array[Byte] = { + val asStr = badRow.compact + val originalBytes = asStr.getBytes(UTF_8) + val size = originalBytes.size + if (size > MaxRecordSize) { + val msg = s"event failed enrichment, but resulting bad row exceeds allowed size $MaxRecordSize" + BadRow + .SizeViolation( + Enrich.processor, + Failure.SizeViolation(Instant.now(), MaxRecordSize, size, msg), + BadRowPayload.RawPayload(asStr.take(MaxErrorMessageSize)) + ) + .compact + .getBytes(UTF_8) + } else originalBytes + } + /** * The maximum size of a serialized payload that can be written to pubsub. *