Stream Enrich: New Hope #328
modules/fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/fs2/Payload.scala
}
.map(enriched => Payload(enriched, row.ack))

result.handleErrorWith(sendToSentry[F](row, sentry))
Shouldn't we send the payload rather than the row? That way we wouldn't need to Thrift-deserialize it to troubleshoot.
I think sendToSentry is probably slightly misleading. It does three things:
- Sends an exception to Sentry (we cannot send anything from an event because it can contain PII data)
- Creates a generic_error bad row - that's why we need a row
- Logs an error
My point is, why not use payload instead of row in the generic bad row created? An array of Thrift bytes is not very useful for troubleshooting, compared with a BadRow (CPFormatViolation) or a CollectorPayload.
Ah, ok makes sense now!
The reason was that our payload is something like ValidatedNel[CPFormatViolation, Option[CollectorPayload]], so we don't really have a parsed payload yet. We technically can pattern-match on it, like:
payload match {
  case Validated.Invalid(errors) => // what to do here? why were we trying to process an error in the first place?
    errors // thrift bytes anyway
  case Validated.Valid(payload) =>
    turnIntoAdapterFailure(payload) // but what if it's not an adapter failure? enrichment failure? then we need a raw event to construct a bad row
}
It also feels weird to produce different kinds of bad rows from the same place, so I decided to stick with the most generic one.
I was thinking of putting something like show"$payload" in the generic_error, whatever it contains.
But in that case we won't have a clear way to recover it. I think it's an important promise that whenever you have a generic_error coming from enrich, you need to be able to base64-decode the payload in order to recover it.
It's actually a bug that ThriftLoader.toCollectorPayload's signature says that it can return multiple bad rows; it can return only one.
So in the generic_error bad row we could put either the single CPFormatViolation (recoverable) or the raw extracted CollectorPayload (recoverable).
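The recovery promise discussed in this thread can be illustrated with a minimal sketch, assuming the generic_error payload field holds the base64-encoded raw Thrift bytes. GenericErrorRecovery, encode and recover are hypothetical names used only for illustration and are not part of enrich; the only library used is java.util.Base64.

```scala
import java.util.Base64

// Hypothetical helper, not part of enrich: shows the round trip that makes
// a generic_error bad row recoverable.
object GenericErrorRecovery {

  // What the producer side would store in the bad row's payload field
  // (assumption: the raw collector bytes, base64-encoded).
  def encode(raw: Array[Byte]): String =
    Base64.getEncoder.encodeToString(raw)

  // What a recovery job would do: get the original bytes back,
  // or an explanation when the field is not valid base64.
  def recover(payload: String): Either[String, Array[Byte]] =
    try Right(Base64.getDecoder.decode(payload))
    catch {
      case e: IllegalArgumentException =>
        Left(s"Payload is not valid base64: ${e.getMessage}")
    }
}
```

A rendered string such as show"$payload" would generally end up in the Left branch here, which is the concern raised above: there is no defined way to get the original bytes back from it.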
Hey @benjben! I addressed all your feedback, added a few more tests and a couple of tickets (#370 - depends on NH, #371 was also discovered while I was writing tests). If anyone else from @snowplow/com-snowplowanalytics-engineering-datacapability wants to have a look - you're welcome. Otherwise this should be ready.
@@ -0,0 +1,23 @@
auth = {
  type = "Gcp"
- Acronyms are generally all-caps. Is there a specific reason to use Gcp?
- From a user's perspective, it'd be useful if we could see valid values of configuration fields, e.g. is gcp or GCP valid here? Or do we want to rely on user-friendly error messages explaining what's wrong and how to fix it?
- It'd be nice to see which fields are optional and which values are used by default when applicable, e.g. assetsUpdatePeriod if it is not configured
Spelling was dictated by codec derivation (it uses the exact case class name). I decided not to change it for now as Gcp is the only valid value here, but I agree this is something that should be fixed.
I'll add comments to the config file.

object State {

  /** Test pair is used in tests to initialize HTTP client */
Why do we have it here if it is used in tests?
Looking at L92, it seems it isn't used in tests only, could you update this scaladoc?
I didn't notice your comment and posted the last one, still scaladoc needs an update I think
Why does production code need to know anything about test code?
Looks good in general! It's a bit too big a PR to read very carefully.
  if: ${{ always() }}
  run: sbt coveralls
  env:
    COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }}
- name: Check Scala formatting
  if: ${{ always() }}
  run: sbt scalafmtCheck
Could you change to scalafmtCheckAll and add scalafmtSbtCheck, please?
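A sketch of how the formatting step could look with this suggestion applied, assuming the sbt-scalafmt plugin is enabled in the build. scalafmtCheckAll checks sources in all configurations (including tests) and scalafmtSbtCheck checks the sbt build files. The step name and condition are taken from the hunk above; the rest of the workflow is not shown.

```yaml
- name: Check Scala formatting
  if: ${{ always() }}
  run: sbt scalafmtCheckAll scalafmtSbtCheck
```

Running both tasks in a single sbt invocation avoids starting sbt twice.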
final case class Hash private (s: String) extends AnyVal

object Hash {
  private[this] def fromBytes(bytes: Array[Byte]): Hash = {
This small function should have a test
// side-effecting get-set is inherently not thread-safe
// we need to be sure the state.stop is set to true
// before re-initializing enrichments
_ <- Logger[F].info(s"Unpausing enrich stream")
Unpausing -> Resuming
Would be good to stick to one, either show or s.
- AssetsRefresh respect non-2xx responses (retry or give up)
- blockOn in enrichWith
- develop and rebase on top of it
- 0 in assets/metrics periods counts as "disabled"