Upgrading your Snowplow batch pipeline on AWS to real-time
Background
The Snowplow Clojure collector, Cloudfront collectors and Spark Enrich are being deprecated. In this post we’ll explain the steps that should be taken by an open source user running those components to migrate to the real-time infrastructure.
Architecture overview
Cloudfront Collector (Batch)
Snowplow users running the Cloudfront collector will have a pipeline that looks as follows:
To break this diagram down slightly further, the following steps happen as the pipeline processes events:
- The Cloudfront collector receives requests from trackers containing events.
- The Cloudfront collector writes Raw Cloudfront logs to S3.
- Spark Enrich, instrumented by EmrEtlRunner, consumes the Raw Cloudfront logs and outputs Enriched data to S3.
- The Relational Database (RDB) Shredder, instrumented by EmrEtlRunner, then consumes the Enriched data on S3 and outputs Shredded data to S3.
- The RDB Loader, instrumented by EmrEtlRunner, consumes the Shredded data on S3 and delivers your data into Redshift.
N.B. Typically, EmrEtlRunner is used to spin up an EMR cluster that runs Spark Enrich, RDB Shredder and RDB Loader sequentially: it takes the raw logs published by the collector, processes them (including validating and enriching them) and writes them into Redshift in a single run.
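As a rough illustration of that sequential run, the three EMR steps can be modelled as a chain in which each step reads what the previous one wrote. This is a minimal Python sketch, not Snowplow code: the Step class, the check_chain and describe helpers and the location labels are all made up for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Step:
    name: str
    reads: str
    writes: str


# Step names follow the list above; the location labels are illustrative only.
BATCH_STEPS = [
    Step("Spark Enrich", "raw logs on S3", "enriched data on S3"),
    Step("RDB Shredder", "enriched data on S3", "shredded data on S3"),
    Step("RDB Loader", "shredded data on S3", "Redshift"),
]


def check_chain(steps):
    """Return True if every step reads exactly what the previous step wrote."""
    return all(prev.writes == nxt.reads for prev, nxt in zip(steps, steps[1:]))


def describe(steps):
    """Render the data flow as a single arrow-separated line."""
    return " -> ".join([steps[0].reads] + [step.writes for step in steps])


if __name__ == "__main__":
    print(check_chain(BATCH_STEPS))
    print(describe(BATCH_STEPS))
```

Swapping the first step's input (for example, to Raw data sunk from Kinesis) leaves the rest of the chain untouched, which is why the collector can be upgraded independently of the enrichment step.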
Clojure Collector (Batch)
Snowplow users running the Clojure collector have a similar architecture to those running the Cloudfront collector. The only difference is the collector itself.
Like the Cloudfront collector, the Clojure collector outputs raw logs to S3 and these are then used as an input for Spark Enrich:
- The Clojure collector, running on Elastic Beanstalk, receives requests from trackers containing events.
- The Clojure collector writes Raw Tomcat logs to S3.
- Spark Enrich, instrumented by EmrEtlRunner, consumes the Raw Tomcat logs and outputs Enriched data to S3.
- The Relational Database (RDB) Shredder, instrumented by EmrEtlRunner, then consumes the Enriched data on S3 and outputs Shredded data to S3.
- The RDB Loader, instrumented by EmrEtlRunner, consumes the Shredded data on S3 and delivers your data into Redshift.
Scala Stream Collector (Real-Time)
The real-time setup is a bit different: it uses our Scala Stream Collector and Kinesis.
- The Scala Stream collector, running on Auto Scaling EC2 Instances, receives requests from trackers containing events.
- The Scala Stream collector writes Raw data to a Kinesis Raw stream.
- Stream Enrich consumes the Kinesis Raw stream and outputs Enriched data to a Kinesis Enriched stream.
- The Relational Database (RDB) Shredder, instrumented by EmrEtlRunner, then consumes the Enriched data from the Kinesis Enriched stream (once the Snowplow S3 Loader has sunk it to S3) and outputs Shredded data to S3.
- The RDB Loader, instrumented by EmrEtlRunner, consumes the Shredded data on S3 and delivers your data into Redshift.
Writing Raw data to S3
With the above setup, one key difference is that you are no longer writing the Raw data to S3. Storing the Raw logs in S3 is a loss-averse strategy: it allows data to be replayed through the pipeline if a later pipeline stage fails. Additionally, by writing Raw logs to S3, you can continue using Spark Enrich with the Scala Stream collector whilst you work on upgrading to Stream Enrich.
To store the Raw logs in S3, you can utilise the snowplow-s3-loader to sink the collector payload data into S3. To save repetition, further information on setting up Snowplow S3 Loader can be found below.
Upgrading your pipeline
We recommend performing the upgrade as follows:
- Upgrading to the Scala Stream collector (either from the Cloudfront or Clojure collectors)
- Upgrading from Spark Enrich to Stream Enrich
With both of these steps complete, your Real Time Architecture (minus DB Loading) should look similar to this:
Upgrading your collector to the Scala Stream collector
It is possible to upgrade just your collector to the Scala Stream collector, whilst continuing to use Spark Enrich. Your pipeline will look as follows:
- The Scala Stream collector, running on Auto Scaling EC2 instances, receives requests from trackers containing events.
- The Scala Stream collector writes Raw data to a Kinesis Raw stream.
- The Snowplow S3 Loader consumes the Raw data from the Kinesis Raw stream and sinks the Raw data into S3.
- Spark Enrich, instrumented by EmrEtlRunner, consumes the Raw data in S3 and outputs Enriched data to S3.
- The Relational Database (RDB) Shredder, instrumented by EmrEtlRunner, then consumes the Enriched data on S3 and outputs Shredded data to S3.
- The RDB Loader, instrumented by EmrEtlRunner, consumes the Shredded data on S3 and delivers your data into Redshift.
To achieve this setup you will need to follow these steps:
- Set up a Kinesis Raw stream. This is where the Scala Stream collector will write data.
- Set up the Scala Stream collector. This should be set up as an Auto Scaling set of EC2 instances behind a Load Balancer. We recommend CPU-based scaling.
- Set up the S3 sink, using Snowplow S3 Loader, to load the Raw requests from Kinesis to S3.
- Update Spark Enrich to consume the Raw logs in S3.
The pipeline should now work end-to-end.
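The first two steps can be sketched in Python. This is a hedged example rather than an official setup script: it assumes you manage AWS through boto3, and the function names, default stream name, shard count, policy name and 60% CPU target are all placeholders. Target tracking is only one way to get CPU-based scaling. The Kinesis client is passed in, so the functions can be exercised without AWS credentials.

```python
def create_raw_stream(kinesis_client, stream_name="snowplow-raw", shard_count=1):
    """Create the Kinesis Raw stream and block until it is usable.

    kinesis_client is expected to behave like boto3.client("kinesis").
    The default name and shard count are placeholders, not recommendations.
    """
    kinesis_client.create_stream(StreamName=stream_name, ShardCount=shard_count)
    # boto3's "stream_exists" waiter polls until the stream reports ACTIVE.
    kinesis_client.get_waiter("stream_exists").wait(StreamName=stream_name)
    return stream_name


def cpu_scaling_policy(asg_name, target_cpu_percent=60.0):
    """Build keyword arguments for the Auto Scaling put_scaling_policy call.

    Uses a target tracking policy on average CPU utilisation.
    The 60% default is an assumed value; tune it for your own traffic.
    """
    return {
        "AutoScalingGroupName": asg_name,
        "PolicyName": f"{asg_name}-cpu-target",  # hypothetical naming scheme
        "PolicyType": "TargetTrackingScaling",
        "TargetTrackingConfiguration": {
            "PredefinedMetricSpecification": {
                "PredefinedMetricType": "ASGAverageCPUUtilization"
            },
            "TargetValue": float(target_cpu_percent),
        },
    }
```

With real credentials, the first function would be called as create_raw_stream(boto3.client("kinesis")), and the dictionary from the second would be passed as boto3.client("autoscaling").put_scaling_policy(**cpu_scaling_policy("my-collector-asg")), where "my-collector-asg" stands in for your own Auto Scaling group name.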
Upgrading from Spark Enrich to Stream Enrich
With your collector upgraded to the Scala Stream collector, the next stage is to add Stream Enrich to the real-time pipeline. This will allow your pipeline to consume the Raw data from the Raw Kinesis stream, and write it to a new Enriched Kinesis Stream. This will eliminate the Spark Enrich step being instrumented by EmrEtlRunner.
The steps required to upgrade are:
- Set up a Kinesis Enriched stream. This is where Stream Enrich will write Enriched data to.
- Set up Stream Enrich to run on a set of Auto Scaling EC2 instances.
- Much of the documentation on Auto Scaling EC2 instances for the Scala Stream Collector also applies here. Additionally, a Kinesis Client Library DynamoDB table is used to ensure workers get allocated shards from the stream - this DynamoDB table should also be auto scaling.
- Setting up Stream Enrich - Stream Enrich should be reading from the Raw Kinesis stream and writing to the new Enriched Kinesis stream.
- Set up an S3 sink, using Snowplow S3 Loader, to take the Enriched data on the Enriched Kinesis stream and write it to S3.
- Finally, change the EmrEtlRunner config so you are no longer running Spark Enrich and the enriched:stream: setting points to the Enriched S3 data (see: EmrEtlRunner Stream Enrich Mode).
- You should now be using the Enriched data from S3 and only running RDB Shredder and Loader, instrumented by EmrEtlRunner, to get it into Redshift.
The pipeline should now work end-to-end.
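The final config change can be illustrated with a small Python sketch that treats the EmrEtlRunner config as a nested dictionary. It assumes the enriched:stream: setting lives under aws, s3, buckets in your config file, so check that nesting against the sample config for your release. The helper name and the bucket paths are hypothetical.

```python
import copy


def use_stream_enriched_data(config, enriched_stream_path):
    """Return a copy of the config whose enriched:stream points at the S3 Loader output."""
    updated = copy.deepcopy(config)  # leave the caller's config untouched
    enriched = (
        updated.setdefault("aws", {})
        .setdefault("s3", {})
        .setdefault("buckets", {})
        .setdefault("enriched", {})
    )
    enriched["stream"] = enriched_stream_path
    return updated


# Hypothetical starting point; the bucket names are placeholders.
batch_config = {
    "aws": {
        "s3": {
            "buckets": {
                "enriched": {
                    "good": "s3://my-bucket/enriched/good",
                    "archive": "s3://my-bucket/enriched/archive",
                }
            }
        }
    }
}

stream_config = use_stream_enriched_data(
    batch_config, "s3://my-bucket/enriched/stream"
)
```

Returning a modified copy rather than editing in place keeps the original batch config available for comparison or rollback while you test the new mode.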
Optional extra: Stream the data into Elasticsearch
You can set up the Elasticsearch loader to consume the Enriched data from the Enriched Kinesis stream and write it into Elasticsearch in real-time. Instructions can be found here.