Handling large volumes of duplicated event_ids

Hi all,

We have been using Snowplow for a couple of months and it’s working great.

However, we are having some issues on and off with the shredding step in the batch pipeline. We are seeing enrich take <5 minutes, but shredding failing after > 2 hrs, as here:

Our solution to this is to break down the input into smaller batches and process those manually. This works but feels unnecessary since we are not having especially large volumes of data. The process that failed in the above screenshot was somewhere around 1-2 GB with about 1000 objects (500 if you consider that half the objects are lzo index files). We are currently running an EMR cluster of 4 x r3.2xlarge instances with manual configuration from here. This means we have a total of 244 GB of RAM in the core instances (and another 61 in the master). We run this 4 times per day where usually the EMR cluster takes 45 minutes to run and the (incremental) models take another 45-50 minutes.

The EMR utilisation looks like this for a failing run:

And the failure is memory-related, usually
Java OutOfMemoryError
Container exceeded virtual memory error.

We have traced the moment of failure to a specific line in the shredder code:
Line 357

Which groups by event_id and event_fingerprint.

Going deeper, we looked into our raw tracked events and did some analysis on them. There we found quite some traffic by bots (we are quite heavily indexed by Google and others). So we have event_ids showing up in ~500 events per 10 MB part file. It is a known issue (here and here) that bots are notorious for having poor random number generators. Not much to do about this fact unfortunately.

So, what I am interested in learning is how we can handle these events better? It feels like we are throwing way too much infrastructure at the problem (244 + 61 GB RAM in EMR for 1-2 GB input data still fails with OOM errors) and that we should be able to run this more efficiently. Or is the only solution to run the pipeline more frequently?

Hey Patrik,

You’re running into the typical skewed group by issue, a lot of your events end up being dealt with by the same executor and your shred step is being delayed because of this executor being the only straggler.

There are two approaches to this issue:

  • if you care about the events that have been generated by bots: you’ll need beefier executors, you can of course have way less of them since the performance of your job will be the performance of the executor which receives those events
  • if you don’t care about those events: you can filter them out through a js enrichment

Hope this helps.

Hey Ben,

Of course! You are completely right. I never thought of that.

So in order to beef up the executors, we would be either (a) keeping the cluster constant but with fewer executors or (b) keeping the number of executors constant but making the cluster larger?

I’ll take a look at that enrichment, sounds like the direction we will be interested in going.

Thanks a lot!

Your cluster seems very large enough for the data it’s currently handling so I would suggest just making the executors bigger but keeping the cluster size the same.