I’ve jumped into Snowplow fresh and (for better or worse) decided we wanted to stream events straight to BigQuery. I’m using the JS tracker along with the Kinesis collector, Kinesis enrichment and the Kinesis S3 sink, and things are working well.
From reading the documentation about Iglu and unstructured events, I knew a little about how events get shredded into separate tables joined by a foreign key, and I didn’t fancy re-implementing all that logic, so I stuck to structured events only; all that would be left to do is map the enriched TSV file to a BigQuery schema and load it in.
I then realised that we’re using Google Enhanced Ecommerce tags, and they end up as unstructured events after enrichment, so now I’m wondering what options I have.
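For context, the straight TSV-to-BQ mapping being described could be sketched with the google-cloud-bigquery Java client along these lines. Column positions and the atomic/events naming are illustrative only and would need checking against the enriched event format for your Snowplow version:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.TableId;

import java.util.HashMap;
import java.util.Map;

public class TsvToBigQuerySketch {

    public static void insertEnrichedRow(String tsvLine) {
        String[] cols = tsvLine.split("\t", -1);

        // Map a handful of the enriched event's columns to BQ fields
        // (illustrative subset and positions - verify against your enriched event format)
        Map<String, Object> row = new HashMap<>();
        row.put("app_id", cols[0]);
        row.put("platform", cols[1]);
        row.put("collector_tstamp", cols[3]);
        row.put("event", cols[5]);
        row.put("event_id", cols[6]);

        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        bigquery.insertAll(InsertAllRequest.newBuilder(TableId.of("atomic", "events"))
            .addRow(row)
            .build());
    }
}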
Hey @Shin, this is a really interesting topic. We’ve spent some time looking at BigQuery so hopefully we can help give you some guidance here.
We’ve thought about BigQuery a fair bit, and it’s unlikely that we would port our Redshift shredding code to BigQuery, for two reasons:
1. We don’t believe BigQuery is as performant at mega-mega JOINs as Redshift
2. BigQuery supports tables with deeply nested columns, unlike Redshift
For these reasons, when we eventually add BigQuery support, we will likely mutate the atomic.events table dynamically, on a per-company basis, so that it contains (deeply nested) columns for each of the entities that a given company’s events can contain.
You are right - doing this in a real-time way that works for all users is a big engineering challenge (one which we haven’t prioritised yet). But you have an advantage, which is that you know up-front which entities you want to store in your atomic.events table: the Google Enhanced Ecommerce entities.
1. You define a BigQuery table which extends atomic.events with the Google Enhanced Ecommerce entities (a rough schema sketch follows below this list)
2. You create an updated Kinesis BigQuery Sink which reads the enriched events, looks for the Google Enhanced Ecommerce entities, and writes each event out to your custom atomic.events table
3. You open a PR back into Snowplow with your work - we won’t be able to merge it straight in, but it will help inform our thinking on how to generalize the solution for all users and entity types
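To illustrate step 1, a table extending a couple of core atomic.events columns with a nested Enhanced Ecommerce entity might be declared with the google-cloud-bigquery Java client roughly as follows. The field names are only a guess at what the entity column could look like, not a definitive layout:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;

public class ExtendedEventsTableSketch {

    public static void createTable() {
        // Core atomic.events columns (only a couple shown here)
        Field eventId = Field.of("event_id", LegacySQLTypeName.STRING);
        Field collectorTstamp = Field.of("collector_tstamp", LegacySQLTypeName.TIMESTAMP);

        // A nested RECORD holding the Enhanced Ecommerce action entity (illustrative fields)
        Field action = Field.newBuilder("action", LegacySQLTypeName.RECORD,
                Field.of("action", LegacySQLTypeName.STRING))
            .setMode(Field.Mode.NULLABLE)
            .build();

        Schema schema = Schema.of(eventId, collectorTstamp, action);

        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        bigquery.create(TableInfo.of(
            TableId.of("atomic", "events"),
            StandardTableDefinition.of(schema)));
    }
}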
Thanks Alex - I didn’t know about the BQ sink, and your point about deeply nested column support in BQ was a very good one; it helped me reach a final solution.
Rather than going down the Kinesis sink route, I had already written an AWS Lambda to read enriched events from S3, and I used the Scala SDK to transform the events. As you say, I’m only dealing with enhanced ecommerce events, so I know what the JSON keys in the flattened JSON are, which made writing the BQ schema ahead of time possible. I went a bit further and moved the values into fields that make a bit more sense:
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import java.util.HashMap;
import java.util.Iterator;

public class EnhancedEcommerceFlattener {

    public static String flattenGoogleEnhancedEcommerceRow(String json) {
        JsonParser jsonParser = new JsonParser();
        // Map each Enhanced Ecommerce context column to the nested field name it should get in BQ
        HashMap<String, String> ecommerceTypes = new HashMap<String, String>() {{
            put("contexts_com_google_analytics_enhanced_ecommerce_action_field_object_1", "action");
            put("contexts_com_google_analytics_enhanced_ecommerce_impression_field_object_1", "impression");
            put("contexts_com_google_analytics_enhanced_ecommerce_product_field_object_1", "product");
            put("contexts_com_google_analytics_enhanced_ecommerce_promo_field_object_1", "promo");
        }};
        Iterator<String> types = ecommerceTypes.keySet().iterator();
        JsonObject rootObject = jsonParser.parse(json).getAsJsonObject();
        // Move each of the ecommerce types from the contexts field under ecomm, where it will appear nested in BQ
        while (types.hasNext()) {
            String type = types.next();
            String prefix = ecommerceTypes.get(type);
            if (rootObject.has(type)) {
                JsonElement typeJson = rootObject.get(type);
                getEcommNode(rootObject).add(prefix, typeJson);
                rootObject.remove(type);
            }
        }
        // Flatten the action too
        String action = "unstruct_event_com_google_analytics_enhanced_ecommerce_action_1";
        if (rootObject.has(action)) {
            getEcommNode(rootObject).add("action_name", rootObject.getAsJsonObject(action).get("action"));
            rootObject.remove(action);
        }
        return rootObject.toString();
    }

    // Get (or lazily create) the ecomm node that holds the nested ecommerce values
    private static JsonObject getEcommNode(JsonObject rootObject) {
        if (!rootObject.has("ecomm")) {
            rootObject.add("ecomm", new JsonObject());
        }
        return rootObject.getAsJsonObject("ecomm");
    }
}
Then we get back a json that we can stream to BQ. We’re still in the testing phase but this is probably what we’ll stick to.
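For anyone following along, a minimal way to exercise the method above (assuming the enriched TSV line has already been turned into flattened JSON by the Analytics SDK, and that the method lives in the wrapper class shown earlier) is:

public class FlattenerExample {
    public static void main(String[] args) {
        // Stands in for the Analytics SDK's TSV-to-JSON output for a single enriched event
        String flattenedJson = "{"
            + "\"event_id\":\"42\","
            + "\"unstruct_event_com_google_analytics_enhanced_ecommerce_action_1\":{\"action\":\"purchase\"}"
            + "}";

        String bqReadyJson = EnhancedEcommerceFlattener.flattenGoogleEnhancedEcommerceRow(flattenedJson);

        // Prints {"event_id":"42","ecomm":{"action_name":"purchase"}}
        System.out.println(bqReadyJson);
    }
}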
One thing you may need to be mindful of when using the S3 trigger with Lambda is that Lambda employs at-least-once semantics. This means it’s possible to insert events from the same S3 file into BQ more than once (unless you’re deduplicating somehow before this).
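One partial mitigation worth noting - it’s only best-effort and only over a short window, so it doesn’t replace proper dedup - is that BigQuery’s streaming inserts accept an insertId per row for best-effort deduplication, and the Snowplow event_id is a natural candidate. A sketch with the google-cloud-bigquery Java client (table naming illustrative):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.TableId;

import java.util.Map;

public class DedupedInsertSketch {

    public static void insertRow(String eventId, Map<String, Object> row) {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        // Using the Snowplow event_id as the insertId gives best-effort dedup
        // if the same S3 file is processed by Lambda more than once
        bigquery.insertAll(InsertAllRequest.newBuilder(TableId.of("atomic", "events"))
            .addRow(InsertAllRequest.RowToInsert.of(eventId, row))
            .build());
    }
}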
Thanks @Shin, @mike, definitely an interesting discussion.
Right - given that you have a fixed event set, a one-time BQ schema was, as you say, possible.
But for the general use case, we have the challenge of mutating the atomic.events table dynamically to add in extra JSONs as they are observed within the event stream. It’s highly likely that we’ll need some kind of singleton process to do this.
For us, I suspect we are instead going to have to do this:
1. Check micro-batch for new JSONs
2. Modify table to include new JSONs
3. Load micro-batch into modified table
--------------------------------------
1. Check micro-batch for new JSONs
2. Modify table to include new JSONs
3. Load micro-batch into modified table
--------------------------------------
Repeat...
To put it another way, we can’t parallelize the queue of table modifications we will need to make…
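To make the "modify table to include new JSONs" step concrete, the mutation each micro-batch would have to apply - and which has to be serialised, per the above - looks roughly like this with the google-cloud-bigquery Java client. The new column is just a placeholder for whatever entity was observed:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;

import java.util.ArrayList;
import java.util.List;

public class AddColumnSketch {

    public static void addEntityColumn(Field newEntityColumn) {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        Table table = bigquery.getTable(TableId.of("atomic", "events"));

        // Rebuild the schema with the newly observed entity appended as an extra column
        Schema existing = table.getDefinition().getSchema();
        List<Field> fields = new ArrayList<>(existing.getFields());
        fields.add(newEntityColumn);

        bigquery.update(table.toBuilder()
            .setDefinition(StandardTableDefinition.of(Schema.of(fields.toArray(new Field[0]))))
            .build());
    }
}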
So that sounds like a showstopper to me. The exciting thing about BigQuery support in Snowplow will be when we dynamically adapt an atomic.events table on a per-installation basis, based on the specific entities (self-describing events, contexts) which are seen in the given event stream (@mike that’s what I mean by “dynamic extra JSON”)…
Doh - I stayed up late watching the Olympics and clearly wasn’t thinking straight
https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables could be another idea. So in each batch we generate a template table, creating it (and waiting) if it doesn’t already exist, then insert our data using the template as a suffix. Streaming data ends up in a table called baseName + templateSuffix, so there shouldn’t be a problem with conflicting tables - you get a new one for every new JSON/schema.
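In the Java client, the mechanics of that are a templateSuffix on the streaming insert - roughly like the below, where the suffix encoding of the schema name is just an example; the destination table is created on demand from the base table’s schema:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.TableId;

import java.util.Map;

public class TemplateTableSketch {

    public static void insertWithTemplate(Map<String, Object> row) {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        // Rows land in a table named <base table> + <suffix>,
        // e.g. events_enhanced_ecommerce_action_1
        bigquery.insertAll(InsertAllRequest.newBuilder(TableId.of("atomic", "events"))
            .setTemplateSuffix("_enhanced_ecommerce_action_1")
            .addRow(row)
            .build());
    }
}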
Another thing I’m thinking is that nothing in the event stream should be unexpected since everything has to be validated by Iglu further upstream. So we could generate everything upfront according to what’s in Iglu as part of a build/deployment. Admittedly not exciting and not very dynamic.
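If the pre-generation route were taken, fetching the schemas themselves is the easy part, since Iglu repositories serve them over HTTP at a fixed path. For example, against Iglu Central with Java 11’s HttpClient (converting the JSON Schema into a BQ table definition is the real work and isn’t shown):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class IgluFetchSketch {

    public static String fetchSchema(String vendor, String name, String version) throws Exception {
        // Iglu repositories expose schemas at /schemas/{vendor}/{name}/jsonschema/{version}
        String url = "http://iglucentral.com/schemas/" + vendor + "/" + name + "/jsonschema/" + version;
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        // One of the Enhanced Ecommerce entities used in the code above
        System.out.println(fetchSchema(
            "com.google.analytics.enhanced-ecommerce", "action_field_object", "1-0-0"));
    }
}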
Interesting idea, need to take a look at template tables!
That wouldn’t normally work - an event’s constituent entities can’t be guaranteed from a snapshot of a single Iglu repository at a point in time. All it takes is for a company to start collecting SendGrid webhooks (say) after this events table has been pre-defined and the table will no longer be sufficient…