How to get structured event data without a separate enrich step?

I’ve got a Snowplow tracker pushing custom structured events to a collector on GCP, which forwards those events to a Pub/Sub topic/subscription. Where can I find basic documentation or examples on how to parse the data in those Pub/Sub messages in a Dataflow job?

Specifically, I’m hoping to do this without setting up separate jobs/deployments for the “validate” and “enrich” steps (I don’t need any enrichment; I just need to parse the metadata and use it to kick off some other workflows). It seems like I should be able to use some of the code from scala-common-enrich to deserialize and parse my events myself, but there’s no documentation on how to use the classes in that library.

In general, am I even thinking about this the right way? Or is setting up a pre-built “Enrich” step (separate from my actual data processing) really the only correct way to use Snowplow? If that’s the case, where can I find docs on the data types/structures that come out of an enrich step? It feels like a lot of basic usage direction is either hidden or missing.

In general, consumers are expected to subscribe to the enriched topic rather than the raw Pub/Sub topic. Although it is possible to read from the raw topic directly, the data hasn’t been validated or parsed at that stage, so its utility may be a bit limited compared to the enriched format.

If you do want to read from raw, you can look at something like the ThriftLoader in scala-common-enrich. Each message in the raw Pub/Sub stream is Thrift-encoded, so it should be simple enough to decode using the CollectorPayload schema, though you obviously won’t get the validation or enrichment benefits by doing so.
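For illustration, decoding a raw message is essentially just Thrift deserialization into the generated CollectorPayload class. Here’s a minimal sketch, assuming the generated class from the collector-payload-1 schema is on your classpath under `com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1` (check the exact package and version against scala-common-enrich):

```scala
import org.apache.thrift.TDeserializer
// Generated Thrift class for the CollectorPayload schema (package name is an assumption)
import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload

object RawPayloadDecoder {
  // Decode the bytes of one raw Pub/Sub message into a CollectorPayload
  def decode(bytes: Array[Byte]): CollectorPayload = {
    val deserializer = new TDeserializer() // binary protocol by default
    val payload = new CollectorPayload()
    deserializer.deserialize(payload, bytes)
    payload
  }
}

// Hypothetical usage inside a Beam/Dataflow DoFn:
// val payload = RawPayloadDecoder.decode(pubsubMessage.getPayload)
// payload.getQuerystring contains the raw, still-unvalidated tracker parameters
```

From there you’d have to parse the querystring/body yourself, which is exactly the work the enrich job would otherwise do for you.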

Most of the time the pipeline is set up with collection + enrichment + (optional) storage targets. If you are reading off the enriched Pub/Sub topic, you’ll get events in a TSV format which can be transformed into JSON using one of the Analytics SDKs.
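With the Scala Analytics SDK that transformation looks roughly like the sketch below (assuming the SDK’s `Event.parse` / `toJson` API; adjust to the SDK and version you actually use):

```scala
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import io.circe.Json

// Parse one enriched TSV line into a typed Event, then render it as JSON
def enrichedTsvToJson(line: String): Either[String, Json] =
  Event.parse(line)
    .toEither
    .left.map(errors => errors.toString)           // collect parsing errors as a string
    .map(event => event.toJson(lossy = true))      // see the SDK docs for the `lossy` flag
```

The parsed `Event` is a typed case class, so you can also work with its fields directly instead of going through JSON.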


@mike awesome, thanks so much for this response. It helped me build a better mental model of how we should be thinking about the different steps in our Snowplow pipeline. Basically, the “enrichment” step should (generally) be considered a prerequisite for any data processing.

Seems like it probably isn’t worth trying to reimplement or rerun that deserialization myself, in case there are major changes to the raw data format or the enrichment process in the future. Might as well let Snowplow take care of both.

And sweet, I didn’t run into the Analytics SDKs during any of my searching yesterday, but those look like they’ll take care of getting enriched event data into useful in-memory structs.

Thanks again - really appreciate the help!