Migrating the Snowplow batch jobs from Scalding to Spark

This is a Request for Comments on moving the Snowplow batch jobs from Twitter Scalding to Apache Spark, and associated efforts.

We welcome feedback on all of our RFCs! Please respond with your thoughts and comments in the thread below.

1. Background

When we started building the Snowplow batch pipeline 4.5 years ago, Apache Hadoop was the dominant dataflow framework, with first-class support on AWS via the Amazon Elastic MapReduce (EMR) service.

Because Hadoop’s MapReduce API was too low-level, we made the decision to use a framework on top of Hadoop; we chose Scalding, an idiomatic Scala API from Twitter, with Scalding itself sitting on top of Cascading, a Java framework for building ETL pipelines on Hadoop.

Fast forward to today, and we now have three jobs written in Scalding which constitute our batch-based event data pipeline:

  1. Scala Hadoop Enrich
  2. Scala Hadoop Shred
  3. Hadoop Elasticsearch Sink

These are all designed to run on EMR, although the amount of AWS-specific code in these jobs is in fact minimal.

We have a further job, Hadoop Event Recovery, which is not part of the core Snowplow flow but is used in an ad hoc fashion on EMR to recover raw events which failed prior processing.

All four of these jobs use the following versions:

  • Hadoop 2.4.1
  • Cascading 2.6.0 or 2.7.0
  • Scalding 0.11.2 or 0.15.0

2. Pain points

Our Scalding/Cascading/Hadoop stack has been extremely robust in production across many Snowplow customers and users, happily crunching through many billions of events each day. However, we have identified three major pain points with this stack, which are becoming more pronounced as time goes on:

2.1 Hadoop is excessively disk-bound

We soon learnt that reading lots of small files from S3 directly into Hadoop was a performance killer (see our blog post Dealing with Hadoop’s small files problem). So instead, we copied and compacted the files from S3 onto the transient EMR cluster’s local HDFS storage, ready for processing.

We then realised that writing out to S3 was similarly slow, so we again updated our jobs to write to HDFS first, and then copied the data back out to S3.

Finally, we realised that our jobs were effectively being run by Hadoop twice: once to generate the good output (enriched events), and a second time to generate the bad output (events failing validation). This was an artifact of the way Hadoop is designed around one output for a given set of inputs. So, once again we turned to disk, this time caching the intermediate output (essentially Either[Success, Failure]) to minimize the amount of work that had to be done twice.

By stealth, we had ended up with an extremely disk-intensive workload - especially compared to our Stream Enrich worker for the real-time pipeline, which merrily processes huge event volumes without touching disk.

This disk-intensive workload causes pain in a few ways:

  1. We believe that we are having to significantly overprovision EMR clusters relative to our jobs’ true memory, CPU and I/O requirements
  2. With larger users we see fairly regular job failures due to the cluster’s HDFS storage filling up
  3. We have to use on-demand EC2 instances in our EMR clusters, because we are dependent on those instances’ local storage

2.2 Code reuse between our batch and real-time pipelines is impossible

Attempts to build abstraction layers on top of Hadoop and real-time engines, such as Twitter’s own Summingbird, have not gained much traction, largely because they are trying to abstract at a semantic level over processing frameworks which are operationally very different.

As a result, at Snowplow we have ended up with essentially two parallel implementations of our pipeline:

  • Scala Hadoop Enrich vs Stream Enrich
  • Hadoop Elasticsearch Sink vs Kinesis Elasticsearch Sink
  • Scala Hadoop Shred plus StorageLoader have no equivalent in the real-time pipeline currently

We can somewhat minimize code duplication via careful extraction of shared libraries, but we are still left with a lot of separate apps to manage.

2.3 Slow pace of innovation in Hadoop

At this point Hadoop is a very mature technology - operationally predictable and reliable, but evolving very slowly. Similarly, Cascading and Scalding are effectively “done” at this point - mature technologies getting limited performance and correctness improvements.

While there is merit in using mature, battle-tested software, much more innovation seems to be happening around Apache Spark, Apache Flink and Apache Beam.

Recent R&D and thoughtpieces from those projects (such as “Apache Spark as a Compiler” from Spark, or “Time and Order in Streams” from Flink) show that these platforms are rapidly advancing the state of the art for event data pipelines like Snowplow to build on top of.

3. Enter Apache Spark

We have been working with Spark in various capacities at Snowplow for several years. Over that time we have seen the platform evolve rapidly, crucially gaining first-class support on Amazon EMR as well as on other dataflow fabrics such as Google Cloud Dataproc and Azure HDInsight.

Most importantly, Spark seems to address the three pain points set out above:

3.1 Excellent in-memory processing performance

There have been plenty of claims and counter-claims about Spark’s performance and reliability relative to Hadoop.

At Snowplow we have been rather cautious about these claims, believing that the best validation for us would be to actually port our jobs to Spark - but knowing that this itself would require committing serious resources on our side.

However, the evidence has been steadily mounting that a switch to Spark would reap serious performance benefits for Snowplow. I attended an excellent talk by Xavier Léauté from Metamarkets at Strata London - Metamarkets are an ad analytics business who process many billions of events each day; in his talk “Streaming analytics at 300 billion events per day with Kafka, Samza, and Druid”, Xavier explained that:

  • Metamarkets had been long-term users of Hadoop/Cascading/Scalding, but had moved all their workloads to Spark
  • They were extremely happy with the move
  • They no longer used HDFS for their event processing at all - instead their Spark jobs read from and wrote directly to S3, and this was highly performant for them
  • As a result, Metamarkets was able to switch to using cheaper spot instances for all Spark jobs

Combined with anecdotal evidence from other Spark users, there seems to finally be enough justification on performance grounds for going ahead with at least an exploratory port to Spark.
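
To make the contrast with the pain points in section 2.1 concrete, here is a minimal, hypothetical sketch of how a Spark version of the enrichment job could keep the good/bad split in memory rather than materializing it to HDFS - the enrich function, bucket paths and types below are placeholders, not the actual Snowplow code:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SparkEnrichSketch {
  // Placeholder standing in for the real Snowplow enrichment logic:
  // returns Right(enriched event) or Left(bad row)
  def enrich(rawLine: String): Either[String, String] = Right(rawLine)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("spark-enrich-sketch"))

    // Read raw events straight from S3 - no copy/compact onto local HDFS first
    val validated = sc.textFile("s3://raw-events-bucket/2016-09-01/*")
      .map(enrich)
      .cache() // the Either intermediate lives in cluster memory, not on disk

    // Both outputs come from the same cached RDD, so the input is read
    // and enriched only once
    validated.collect { case Right(good) => good }
      .saveAsTextFile("s3://enriched-bucket/good/")
    validated.collect { case Left(bad) => bad }
      .saveAsTextFile("s3://enriched-bucket/bad/")

    sc.stop()
  }
}
```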

3.2 Single codebase for batch and micro-batch

Spark Streaming is well known to be a micro-batch system rather than a true streaming system: essentially a receiver consumes the incoming event stream and discretizes it into small micro-batches ready for the actual Spark engine to process (see “Diving into Apache Spark Streaming’s Execution Model” for more information).

While this architecture has come under some criticism from proponents of competing frameworks, Spark’s approach is actually a good fit for cases where true streaming is not an option. A good example is loading databases like Redshift and Postgres, which for performance reasons need to be fed in batches, even if the source data arrives as a stream.

In the case of loading Redshift, rather than having one framework for loading Redshift from batch and another for loading it from stream, we could have a single Spark app, which works in two modes:

  1. A Spark configuration which reads a batch of enriched events from S3 and loads them into Redshift
  2. A Spark Streaming configuration which reads a micro-batch of enriched events from Kinesis and loads them into Redshift

So we have the potential for a single codebase to provide the same capability to both Snowplow pipelines. To be clear, this won’t be suitable for all Snowplow jobs, but it should reduce some duplication.
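
Purely as an illustration of the two modes sharing one codebase, a sketch along these lines - the loadIntoRedshift function, the stream and app names, paths and intervals are hypothetical placeholders, not a committed design:

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RedshiftLoaderSketch {
  // Placeholder: in reality this would shred the events and issue
  // Redshift COPY statements per table
  def loadIntoRedshift(enrichedEvents: RDD[String]): Unit = ()

  // Mode 1 - batch: load a run of enriched events already sitting in S3
  def runBatch(sc: SparkContext, enrichedPath: String): Unit =
    loadIntoRedshift(sc.textFile(enrichedPath))

  // Mode 2 - micro-batch: apply the same load function to each micro-batch
  // read from the enriched-event Kinesis stream
  def runStreaming(ssc: StreamingContext): Unit = {
    val stream = KinesisUtils.createStream(
      ssc, "redshift-loader", "enriched-good", // app and stream names are placeholders
      "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
      InitialPositionInStream.TRIM_HORIZON, Seconds(60),
      StorageLevel.MEMORY_AND_DISK_2)

    stream
      .map(bytes => new String(bytes, "UTF-8"))
      .foreachRDD(loadIntoRedshift _)

    ssc.start()
    ssc.awaitTermination()
  }
}
```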

3.3 Evolving fast

Spark is evolving quickly, having reached version 2.0.0 in July this year. The project is innovating on multiple fronts - not just the core Spark engine and Spark Streaming, but also GraphX for graph computation, MLlib for machine learning, and Spark Datasets for type-safe processing over structured data.

While the framework needs of our batch processing jobs are relatively straightforward today, having access to these more advanced capabilities in Spark for the future is hugely interesting.
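
As one example of what these capabilities open up, the Dataset API would let us work with enriched events as typed Scala objects rather than raw strings or tuples. A trivial sketch, where the EnrichedEvent case class and its fields are illustrative only and not the real Snowplow event model:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: a tiny stand-in for the real enriched event model
case class EnrichedEvent(appId: String, eventName: String, userId: String)

object DatasetSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("dataset-sketch").getOrCreate()
    import spark.implicits._

    val events = Seq(
      EnrichedEvent("web", "page_view", "u1"),
      EnrichedEvent("web", "add_to_cart", "u1"),
      EnrichedEvent("mobile", "page_view", "u2")
    ).toDS() // a typed Dataset[EnrichedEvent]

    // Type-safe transformations: field names and types are checked at compile time
    events
      .filter(_.eventName == "page_view")
      .groupByKey(_.appId)
      .count()
      .show()

    spark.stop()
  }
}
```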

4. A proposal for moving to Spark

4.1 Phase 1: porting our batch enrichment process to Spark

The most obvious place to start is with our batch enrichment process. As soon as we have re-implemented this in Spark as Scala Spark Enrich, we can run load tests using Avalanche to compare its performance against our existing Hadoop version.

If performance is close to, or better than, the Hadoop version, then we can proceed to the next phase.

4.2 Phase 2: porting our Redshift load process to Spark/Spark Streaming

Currently our Redshift load process involves running a job on Hadoop, called Scala Hadoop Shred, followed by StorageLoader, a JRuby app which runs on the server that orchestrates your pipeline.

Regardless of a potential port to Spark, we are planning on:

  1. Updating StorageLoader so that it is executed from the EMR cluster itself, rather than from your orchestration server. This should be more performant and more secure
  2. Rewriting StorageLoader in Scala. We want to deeply integrate StorageLoader with Iglu, for which we have an Iglu Scala Client

Once these are done, we can look at a full re-implementation of our Redshift load process using Spark:

  • We would merge Scala Hadoop Shred and StorageLoader (once rewritten in Scala) into a single app, Scala Redshift Sink
  • Using Spark, this app would support loading enriched events from S3 into Redshift as part of the batch pipeline
  • Using Spark Streaming, this app would also support loading enriched events from a Kinesis stream into Redshift, as part of our real-time pipeline

Once all this is done, we would have a single app which could be used as part of either our batch or our real-time pipeline.
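
Whichever mode the app runs in, the load step itself would remain a Redshift COPY from S3 issued over JDBC. A minimal sketch of that step, with connection details, table name, S3 path and COPY options all placeholders, and no claim that this matches the eventual StorageLoader rewrite:

```scala
import java.sql.DriverManager

object RedshiftCopySketch {
  def copyTable(jdbcUrl: String, user: String, password: String,
                table: String, s3Path: String, roleArn: String): Unit = {
    // Redshift speaks the Postgres wire protocol, so a plain JDBC connection works
    val conn = DriverManager.getConnection(jdbcUrl, user, password)
    try {
      val copySql =
        "COPY " + table + " FROM '" + s3Path + "' " +
        "CREDENTIALS 'aws_iam_role=" + roleArn + "' " +
        "DELIMITER '\\t' EMPTYASNULL TRUNCATECOLUMNS"
      conn.createStatement().execute(copySql)
    } finally {
      conn.close()
    }
  }
}

// Example invocation (all values are placeholders):
// RedshiftCopySketch.copyTable(
//   "jdbc:postgresql://my-cluster.abc123.us-east-1.redshift.amazonaws.com:5439/snowplow",
//   "loader", "secret", "atomic.events",
//   "s3://shredded-bucket/good/run=2016-09-01-12-00-00/atomic-events/",
//   "arn:aws:iam::123456789012:role/RedshiftLoadRole")
```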

4.3 Phase 3: porting our remaining apps

With this successfully completed, we could look at porting the remaining apps to Spark:

  • Hadoop Elasticsearch Sink
  • Hadoop Event Recovery

Although the business rationale for porting these two apps is less clear-cut, there could be benefits in standardizing on Spark for all of our batch apps, and indeed in these apps being usable via Spark Streaming in our real-time pipeline.

4.4 Out of scope

One piece of work which is out of scope for now is replacing our Stream Enrich component with a Spark Streaming configuration of the new Scala Spark Enrich.

Today, Stream Enrich is a standalone app built around the Kinesis Client Library; it has been around for several years, has a minimum of moving parts and is easy to reason about.

There are some valid grounds for not migrating this app to Spark:

  1. Stream Enrich is a simple worker app using the KCL. Spark Streaming also uses the KCL, but has a lot of additional Spark-idiomatic complexity to manage on top of this
  2. Stream Enrich’s buffer options can be adjusted so that it processes individual events one at a time - this is more flexible than the micro-batch configuration available with Spark Streaming (see the snippet below)

Given these points, a port to Spark Streaming would increase the complexity of Stream Enrich, while also decreasing that component’s flexibility.
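
To illustrate the second point: in Spark Streaming the smallest unit of work is the batch interval configured on the StreamingContext, so however small we make it, we are still processing micro-batches rather than individual events as Stream Enrich can today. A tiny, hypothetical snippet:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MicroBatchGranularity {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("micro-batch-granularity")

    // Every DStream attached to this context is processed in discrete
    // one-second micro-batches - there is no per-event processing mode
    val ssc = new StreamingContext(conf, Seconds(1))

    // ... attach a Kinesis DStream, register outputs, then call ssc.start() ...
  }
}
```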

REQUEST FOR COMMENTS

As always, please do share comments, feedback, criticisms, alternative ideas in the thread below.

This is super exciting stuff.

It comes at an ideal time, as EMR 5 has recently been released, which introduces better debugging support and, more importantly, includes the latest version of Spark, 2.0. We’ve had some great experiences using Spark on Snowplow data so far, so it’ll be great to see the performance improvements that have been made since 1.6.

Thanks for the feedback @mike!

On building for Spark 1.6 (using Scala 2.10 on EMR 4.x) versus Spark 2 (using Scala 2.11 on EMR 5.x) - I think we may have to start with Spark 1.6, because so much of Snowplow today is tied to Scala 2.10.

We are aware that Scala 2.10 is pretty old at this point - we have a ticket to address the move to Scala 2.11, #2824 Common: massive ecosystem bump, but this will take a lot of time to do properly.

I’d rather not make that ecosystem bump a dependency for the Spark work, so I reckon we start with Spark 1.6 / Scala 2.10 / EMR 4.x - unless there’s another option I’m missing?

I’d love to see shredding moved to Spark Streaming. At the moment we maintain the EMR batch jobs just so we can load into Redshift, and they are totally over-provisioned to prevent them failing when HDFS utilization hits 100%. It is a real pain point for us. It would be great to have Redshift updated closer to real-time as well.

Would love to see this! It’s a much better fit for our ecosystem!

Hey Snowplow team - I was wondering how this was going, and whether you have steps/issues/a repo tracking your work on this?

We are very interested in getting set up with Spark, and have been able to load Snowplow data into our Spark cluster following the other tutorials, but we also want to assist, if possible, in getting the batch enrichment process going. Happy to help - just didn’t want to duplicate work if you guys have already started.

Thanks!

Hey @13scoobie - I know a community member, Phil K, has been doing a lot of work on this port. Let me ping him and find out where he’s got to…

Sounds good - not sure if you guys (or Phil K) have a Trello board or GitHub repo with issues that we could open up to the community to hack on.

Sure thing @13scoobie - let me check with Phil and see if what he has can be put into a PR for further hacking on. I know he was working on Phase 1: porting our batch enrichment process to Spark.

It doesn’t make sense currently to work on Phase 2 as we’re doing so much active development on Hadoop Shred and StorageLoader in their current form - it’s a moving target.

Anyway let me chat to Phil and see what he says!

Thanks @alex! Looking forward to hearing back and contributing where I can.

Wow, that was fast! Phil has put together a PR of where he got to - @13scoobie, you can treat this as a jumping-off point:

https://github.com/snowplow/snowplow/pull/2984

Hey @13scoobie - did you have a chance to look at the PR - is it something you can pick up?

Hey @alex - I haven’t had a chance to look at it yet, but will update as soon as I take a look. Definitely exciting stuff - anxious to get Spark plugged in!

Alex,

I’m 100% in agreement with the project objective and would like to respond in two parts. First the boring part - my own attempts to minimize EMR exposure. Then the somewhat exciting part - what I think is within the realm of non-Spark possibilities.

How I put together a Snowplow processing pipeline in an attempt to avoid EMR in my rollout, and failed:
I used Elastic Beanstalk and AWS managed services to deploy a Snowplow application comprised of three environments:
a. Streaming collector service - elastic load balanced, with a publicly exposed Elastic IP address and the EC2 instances placed in a secure, private subnet. Built on the “Web server” EB model. In this instance the collector used is the streaming collector, which pushes incoming data into Amazon Kinesis streams (good/bad).
b. Iglu service - elastic load balanced, with a publicly exposed Elastic IP address and the EC2 instances placed in a secure, private subnet. Built on the “Web server” EB model with the RDS/Postgres option enabled and the DB replicated across multiple availability zones.
c. Streaming enrich service - an elastic load balanced (based on CPU usage) cluster with completely private placement. It reads from the collector’s good Kinesis stream and pushes to further Kinesis streams. To one of these streams I’ve attached a Lambda function that copies data from the Kinesis stream to a Kinesis Firehose delivery stream, which is configured to insert data directly into a Redshift table. I created the destination table as a hybrid of previous versions of atomic.events, in order to retain all those expensive-to-keep-around context/JSON payload fields recently stripped from the atomic.events table.

Results - collected data is in the Redshift database approximately 60 seconds later. All is good, BUT WHERE ARE THE CROWN JEWELS? I missed the shredded tables! And hence my failure - it would have been perfect if the shredded tables were loaded at the same time. Unfortunately the only way to load them is to use the EMR ETL. EMR ETL jobs are constantly failing, often in a way that requires careful manual intervention, preceded by a careful investigation - looking through the logs to determine what failed and where to recover from. I wrote Python scripts to move raw incoming data into the processing bucket/prefix more reliably and then skip the staging step. That gave me better operational stability, but it is still not stable - jobs still fail in other steps. I have to constantly be on watch and can hardly concentrate on other tasks, because watching the EMR cluster always takes my attention away from new endeavors. By the way, the EMR cluster takes 16 minutes to provision and 10 minutes to complete the job (26 minutes total), and AWS charges for a whole hour, times the number of nodes, times the number of CPU cores - that’s a lot of money for busy work! In my case I pay for 60 minutes even though I only use the resources for 10 minutes. Half of that is repeating the enrichment step I have already performed using the stream enricher. …Not cool.

So enough with the boring part - let’s examine the possibilities. The Apache Apex project, which is in fact DataTorrent open sourced, allows you to deploy streaming jobs to EMR in a whole new way. It operates on DAGs, with each DAG node having input and output ports that can be programmatically defined and connected at runtime if necessary. It uses YARN as an operating system to automatically scale applications within the Hadoop cluster resource pool. Side note: the people behind the project are a pleasure to speak to on the phone - the software developers are smart and the business developers are honest. So I was thinking: maybe we could port the EMR ETL to Apex/Malhar operators - specifically the shredder - and see how it behaves in a streaming environment, receiving enriched data on its input port and automatically adding an output port whenever new contexts are encountered, finally terminating in a database load (potentially preceded by running DDL corresponding to the new contexts, to create target tables and prevent the pipeline from stalling).
I know this is a deviation from the Spark plan, but I thought it was still worth discussing.

Sincerely,
David.

Thanks for sharing this super-detailed feedback @dashirov! It sounds like you have been on a really interesting journey.

A few thoughts:

  • You are completely right - custom approaches to drip-feeding Redshift, e.g. using Kinesis Firehose, miss out on the complex process of loading shredded events and contexts, which as you say are the “crown jewels” of your event stream
  • I am surprised by your EMR failure rates - they sound much higher than the rates we see across our customer base (observed failure rates inside EMR are <0.1%)
  • Your criticisms of transient clusters are valid - there is a lot of “ceremony” required to boot the cluster, and then you have the rounded-up pricing from Amazon

On porting the EMR ETL to Apache Apex - I think this is a really interesting idea.

As mentioned in the RFC, by the end of our proposed migration you will be able to drip-feed Redshift via our load code running in Spark Streaming, consuming from a Snowplow enriched event Kinesis stream (and eventually Kafka topic).

There is an open question as to what the deployment fabric for this job should be - should the Spark Streaming job be deployed to EMR, or should it use Apex? Or should we assume users will just “bring their own” fabric, such as Mesos, YARN or Kubernetes?

What do other Snowplowers think?

We have an exciting update for this RFC - Snowplow data engineer @BenFradet is now working full-time on this port of our core Scalding jobs to Spark. I’ll leave Ben to follow up on this thread with a little more detail on his work, including supported versions, links to the WIP branches, etc.

As @alex mentioned, we’re currently moving forward with the port of our existing jobs to Spark.

You can track our progress on the enrichment job in pull request #3077 and on the shredding job in pull request #3094.

We’re aiming for Spark 2.1.0, since it’s the latest version supported by EMR; and because EMR builds it against Scala 2.11, we’re also moving the couple of projects which were not yet cross-built against 2.11 (scala-forex and the Iglu Scala client, for example) to 2.11.
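
For anyone unfamiliar with cross-building, in sbt it is essentially a couple of settings plus the `+` prefix on tasks; a hypothetical build.sbt fragment (version numbers are illustrative only):

```scala
// build.sbt - illustrative version numbers
scalaVersion := "2.11.8"
crossScalaVersions := Seq("2.10.6", "2.11.8")
```

Running, say, `sbt +test` or `sbt +publishLocal` then builds and runs against each listed Scala version.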

Don’t hesitate to post here if you have remarks or comments.

Awesome. It’s exciting to hear that this will support the latest version of Spark.

I wonder if Spark SQL enrichments might be an option in the near future!

I am assuming you are done with this migration. What tips/learnings would you share for moving code written in Scalding to Spark?