RFC: making the Snowplow pipeline real-time end-to-end and deprecating support for batch processing modules

As part of our drive to make the Snowplow community more collaborative and widen our network of open source contributors, we are regularly posting our proposals for new features, apps and libraries under the Request for Comments section of our forum. You are welcome to post your own proposals too!

Introduction

This RFC covers two related proposed goals:

  1. To make the entire Snowplow pipeline (i.e. every Snowplow microservice) real-time
  2. To deprecate support for batch versions of microservice components

Our assumption in writing this RFC is that the first goal is not controversial, but the second one might be. We look forward to hearing from the Snowplow community whether that assumption is correct.

A bit of history

Looking back, two things are very notable about the first version of Snowplow, launched way back in February 2012:

  1. The Snowplow pipeline was composed of discrete, independent microservices. (E.g. a collector, an enrichment module.)
  2. All of these microservices processed data in batches.

That each microservice processed data in batches is unsurprising: processing stream data in real-time was still very uncommon in 2012 - Kafka had only been open sourced in early 2011, and Amazon Kinesis would not launch until November 2013.

As real-time processing technologies, including unified logging and stream processing frameworks, evolved, we added real-time support to Snowplow by building real-time versions of our core components: in the first instance, a real-time collector (the Scala Stream Collector) and the Stream Enrich module. New Snowplow users could therefore decide whether they wanted real-time processing or not, and choose the appropriate collector and enrichment microservice accordingly.

This was just the start of a journey. Soon, we started building new microservices that were real-time only - the first was our Elasticsearch Loader.

Then, when we built a version of Snowplow to run natively on GCP, we built each new microservice for GCP so that it was real-time only.

Today, as we look across the Snowplow estate, we see:

  • Microservices that are available in multiple flavours: some real-time, some batch. (E.g. collectors, enrichers.)
  • Microservices that are batch-only (e.g. the Redshift and Snowflake loaders)
  • Microservices that are real-time only (e.g. the BigQuery Loader (although this can be run in a batch simulation mode), the Elasticsearch Loader, the Indicative Relay)

We believe that now is a good time to tidy up this estate, and evaluate where to focus our development efforts going forwards, to deliver the most for the Snowplow community.

Upgrading all our microservices to be runnable in real-time

With our core microservices available in batch and real-time flavours, Snowplow can be set up in a “Lambda Architecture” - as presented in Nathan Marz’s 2015 Big Data book. In this approach, batch-based processing is the reliable backbone of the big data pipeline, and real-time processing provides a “speed layer” for low-latency queries. It is common, for example, to stream Snowplow data into Elasticsearch to support real-time dashboards and APIs, and to load the same data in batch into Snowflake DB or Redshift, to support more sophisticated but less time-critical analysis.

The Lambda Architecture made sense in a world where stream processing was inherently less reliable than batch processing. That assumption is no longer true, and has not been for some time. Unified log technologies and stream processing frameworks have matured to the point where they can be depended on for business-critical processing of big data sets at low latencies. Given that, the case for building batch-based microservices is much weaker - which is why, when we ported Snowplow to run on GCP, we focused all our efforts on a real-time version.

Now we have the opportunity to look at the remaining microservices that only run in batch - notably our Redshift and Snowflake loaders - and port them to run in real-time. This is something we are looking to do between now and the end of the year.

Deprecating our batch technology

If it is possible to run Snowplow pipelines in real-time, end-to-end, are there still compelling reasons why someone would want to run them in batch? After all, the benefits of real-time are very real:

  1. The data is available at very low latency. This means that not only can real-time reports and dashboards be delivered, but the data can be made available for real-time data applications like personalisation engines, marketing automation, health monitoring etc.
  2. The real-time pipeline can be set up to autoscale - gracefully handling traffic fluctuations without human intervention. (In comparison, it is typical for batch users to need to reconfigure the size of their EMR clusters to handle shifting traffic patterns.)
  3. The real-time pipeline is more stable - especially at very high volumes. One of the issues we have found with companies running batch pipelines processing a billion+ events per day is that issues arising on the EMR cluster can cause a whole batch to fail. The job then needs to be rerun from scratch, during which time data “backs up”, requiring progressively larger (and more expensive) EMR clusters to clear the backlog.
  4. At any sort of scale, real-time pipelines are typically much more resource-efficient than batch pipelines: clusters don’t idle or spend time bootstrapping; stream topologies can be configured to scale so that each node is effectively utilised, with enough headroom to buy a bit of breathing space to spin up new nodes in the event of a traffic spike. As a result, at higher volumes we find real-time pipelines more cost-effective than their batch counterparts.

We think there are two reasons why some might prefer running the batch technology:

  1. Batch processes are typically easier to set up, manage and maintain. In the event of any issues, it is generally straightforward to reason about where a failure occurred and resume the pipeline appropriately, avoiding data loss or double processing of data.
  2. At low volumes in particular, batch processing may be more cost effective than real-time processing, which requires (at least in Snowplow’s case) the constant running of EC2 instances for the different microservices out of which a pipeline is composed.

Both of these issues are broader than batch vs stream processing, however:

  1. The Snowplow open source stack (both the real-time and batch versions) is too difficult to set up, manage and run on an ongoing basis.
  2. In both batch and real-time, Snowplow is architected for scale (using services like EMR on AWS and Dataflow on GCP), which makes it more expensive than it needs to be at low volumes.

We think we have some really great opportunities to reduce the cost of running Snowplow at low volumes and to make Snowplow easier to run:

  1. We want to focus our efforts on making our real-time open source pipeline even easier to set up, run and manage. This will be a key focus in 2020; in particular, we would love to get to a one-click install. (Let’s see how close we can get.)
  2. We want to focus on dramatically improving Snowplow Mini (e.g. by adding Postgres support) so that it is a much more cost-effective option for companies that want to do more advanced digital analytics but don’t require big data scale.

Given the above, we think that few people and companies would benefit from running Snowplow in batch mode, and that we would do better to concentrate our and the community’s engineering resources on our stream processing versions (and the other improvements highlighted above) than to continue supporting batch processing in Snowplow. It is for that reason that we would like to deprecate our batch technology.

Proposed next steps at Snowplow

Before we look at which microservices we will deprecate first, we should stress that we are committed to supporting in stream all the functionality currently supported in batch - that includes:

  • Enabling users to keep a persistent log of all the events ever recorded to disk (S3 or GCS)
  • Enabling users to reprocess their entire event data set, if they wish
  • Enabling users to debug, recover and reprocess “bad rows” (i.e. events that were not successfully processed)

The first two components we plan to deprecate support for are the Clojure Collector and Spark Enrich. We do not plan to release new versions of either of these microservices. That is particularly important to note for the Clojure Collector, because Amazon announced a few months ago that it is deprecating support for Tomcat 8 on Elastic Beanstalk, which the Clojure Collector uses under the hood. As we will not release an updated version of the Clojure Collector that removes this dependency, Clojure Collector users will need to migrate to running the Scala Stream Collector instead. We will shortly be posting guidance for users migrating from batch -> real-time, to help them go through the process successfully.

We intend to formally deprecate support when we release the new version of Snowplow with the refactored bad row format, which we hope to publish by the end of Q3. As part of this we will be releasing new versions of:

  • The Scala Stream Collector
  • Stream Enrich (which runs on AWS)
  • Beam Enrich (which runs on GCP)

all of which will emit bad rows (failed events) in a new, highly structured format.

As part of this release we will move the Clojure Collector and Spark Enrich out of the core Snowplow/Snowplow repo into the Snowplow Archive. We hope this will make it clear to users that these microservices are no longer supported by Snowplow, while keeping the full source code easily accessible for anyone who wants it. From this point we will no longer publish any releases of these components, including maintenance releases. We will take the same approach with the other microservices we deprecate support for.

In parallel we plan to start work on porting RDB Loader and the Snowflake Loader to run as stream processing jobs: expect an RFC with our proposed architecture shortly.

Once these have been published, we plan to deprecate support for EmrEtlRunner, as it will no longer be necessary to run batch pipelines as part of the Snowplow setup. The proposed sequence of events is illustrated below:

Recommended next steps for companies running batch components (esp. the Clojure Collector and Spark Enrich)

We will be publishing documentation walking users of the Clojure Collector and Spark Enrich through the process of upgrading to the Scala Stream Collector and Stream Enrich shortly, and will post links in the thread below.

We would love your feedback!

What do you think? We’d love your feedback below 🙂


YES!!! And running batch is expensive in terms of AWS services as well.

We have been eyeing a migration to GCP/RT for some time anyway. The Redshift pricing model is the primary driver. We’re at the point where our dense compute nodes only hold 8-9 months of data (which is more than enough for day-to-day use, but not when it comes to comparing YoY, etc.). Adding new reserved instances that expire at different times throughout the year only makes it harder to switch away from these nodes.

Our main reservations around the real-time pipelines have been:

  • More servers and time required to keep the microservices running(?) - Mint Metrics currently only does a few million events/day
  • Lack of cost visibility over GCP vs AWS vs self-hosted (Is GCP really cheaper?)
  • Lack of knowledge (on my part) around how the RT microservices work together (i.e. Could we run SSC + Kafka on our own metal, but stream into BigQuery?)
  • Migrating to GCP seen as a nuclear option - haven’t seen anyone migrating over from AWS batch, so not sure what to expect

Ultimately, we just need to learn more about the RT setup as it compares to Batch. Despite that, deprecating batch would give us the impetus to migrate to RT sooner rather than later.

What sort of timeline is the team thinking?

Not sure if it’s in the scope of this RFC, but it would be nice to make it easier to add data from S3 as Spectrum tables. Right now, this is a bit painful because of the way the data is stored in S3.

If I had a vote on deprecating batch ETL, I guess I would vote “Well OK, if we must”. I recognize that the folks who actually write the application have limited time they can spend, and I can see that Stream Collector is more broadly useful and in general The Way Forward.

But our current Snowplow installation is great – I hardly ever have to think about it. The client applications are all fine with a data refresh every 24 hours. And the EMR job is not terribly expensive. (Maybe we’re just in the sweet spot with about 800K daily events.)

Anyway, thanks for a great toolset. If the bus is going to stream processing, we’ll get on the bus.


Hey @robkingston! Great to hear you’ve been thinking of moving to GCP / RT. To talk to your reservations:

  • The RT infrastructure does use a lot of microservices, which means that it can be costly at low volumes, where each service is minimally specced but still overspecced for the given event volumes.
  • At low volumes GCP is more cost effective because of the BigQuery pricing model: you can end up spending much less on BigQuery than on Redshift. On the flipside, the minimum specification for the different Dataflow jobs is quite chunky, which means the pipeline on GCP is not much cheaper than AWS at low volumes.
  • If you want the data ending up in BigQuery, my strong recommendation would be to set up the end-to-end GCP pipeline using Pub/Sub. We have never even attempted to load BigQuery from Kafka.
  • There are different ways of migrating from batch -> RT and AWS -> GCP. Might be worth exploring on a separate thread. (Certainly we’re going to put together documentation to assist with the former.)

Timeline-wise we’d be looking at deprecating support for the Clojure Collector and Spark Enrich in the next 3-6 months.

Hope that helps!

Yali

Thanks for the feedback @wleftwich. Is there anything we can do to help you move from batch -> RT and persuade you to stay on the bus?

@rahulj51 out-of-the-box support for Spectrum is something that we’re looking to add after porting RDB Loader to run as a real-time process.

Awesome… that’s very helpful @yali.

Based on the docs, Google Pub/Sub doesn’t specify any limit on “outstanding messages” for pull delivery beyond the 7-day retention - in that sense, couldn’t you under-spec Beam Enrich for spikes in events and process accumulated events gradually throughout the day?

Even with Beam Enrich under-specced, you’ve got to run at least one streaming worker, which is around $70/month (per Dataflow job with minimum configuration in terms of vCPUs/memory). Depending on your configuration you’ll want to be running at least 2 Dataflow jobs (enrichment, BigQuery loader), though more if you want redundancy for enriched bad and failed BigQuery writes.

Depending on your event volumes (and provided you are reusing the Pub/Sub subscriptions) you could in theory spin up and terminate the streaming Dataflow jobs every few hours, which would probably save money, but you’d have to worry about backpressure.


From my perspective, equivalent real-time support for all supported Cloud providers sounds like a smart move.

Thanks @yali. I am definitely on the bus, just have to make the time to change over to RT. I’ll study the docs and holler for help when I get stuck.

Great to hear it’s still working well! We just configured the batch setup for AWS this month and continue to grow the analytics and research.

We are migrating our data warehousing to near real-time loading and analytics using Snowflake DW. A more cost-effective streaming process is aligned with our overall architecture. Our intent is to load Snowflake in “micro-batches” rather than “continuously” - I’m hoping that will be a feature.


Hi @robkingston, it’s possible to run SSC and stream into BQ directly from Kafka (we still use a similar approach), but for that you need an additional service that acts as a Kafka consumer, processes the messages and then pushes them to BQ via the streaming API. I’m not sure it would be much cheaper than going the Dataflow route, as it requires hosting a Kafka cluster and the other pieces that tie everything together, which takes some effort and monitoring “if something goes wrong”. I would only go that route if you have other use cases for a Kafka cluster and custom VMs on GCP (most likely Kubernetes) besides Snowplow data collection.
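
To give a feel for it, here is a minimal sketch of that kind of bridge service, assuming the kafka-python and google-cloud-bigquery libraries; the topic, project and table names are placeholders, and it skips the TSV-to-JSON transformation a real consumer of Snowplow enriched events would need:

    # Minimal sketch of a Kafka -> BigQuery bridge. Topic, project and table
    # names are placeholders; a real service would also transform Snowplow's
    # enriched TSV into JSON before inserting.
    import json

    from kafka import KafkaConsumer          # pip install kafka-python
    from google.cloud import bigquery        # pip install google-cloud-bigquery

    TABLE_ID = "my-project.snowplow.events"   # hypothetical destination table

    consumer = KafkaConsumer(
        "enriched-good",                      # hypothetical topic name
        bootstrap_servers=["kafka-1:9092"],
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        enable_auto_commit=False,
    )
    client = bigquery.Client()

    for message in consumer:
        # Push each event to BigQuery via the streaming insert API.
        errors = client.insert_rows_json(TABLE_ID, [message.value])
        if errors:
            print(f"BigQuery insert failed: {errors}")  # dead-letter in practice
        else:
            consumer.commit()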


“The Snowplow open source stack (both the real-time and batch versions) is too difficult to set up, manage and run on an ongoing basis.” This is very true. The docs are detailed and comprehensive, but also complicated and confusing.

Snowplow Mini is an awesome idea. You’ll get small companies migrating to your managed subscriptions as they scale.

Focus on easier setup:
That would be great, with a reduced set of options and more opinionated recommendations. We’ve spent a great deal of time setting up Terraform for the CloudFront Collector, Spark Enrich and EmrEtlRunner, and would appreciate some pre-existing Terraform templates for the stream-only pipeline. CloudFormation would probably also be appreciated by some. Even if the templates aren’t 100% there, they could at least be modified a bit to fit different VPC scenarios.

  • Autoscaling EC2:
    Why not something like AWS Fargate on ECS, or modern serverless container management like Kubernetes / AWS EKS? We’ve moved away from autoscaling EC2 because it’s slow to spin up instances, complicated to get the scaling rules right, and often unreliable - to the point that we were manually killing instances to force it to spin up new ones. Fargate is much nicer once all the (complicated) infrastructure config is set up. It’s been cheaper for us too, and it would also scale down better for your smaller customers.

Streaming RDB/Snowflake loader:
How will you make these stream-only? The AWS best-practice docs (which often aren’t) say that using COPY from S3 to lift files in parallel is the most performant way to ingest data. If the files are congruent with the sort & dist keys on their target tables, then minimal vacuuming is required.
In complete denial of that best practice, we use a custom ETL tool to do logical replication from Postgres to Redshift, and we found that issuing delete or delete+insert statements in transactions as literal SQL statements (e.g. insert into table… values ()) actually works just fine, especially when they are small and frequent. I’m interested to see what you end up with; I suspect that ingesting from Kinesis, dumping to S3 and then loading to Redshift is really just extra steps and complication, as long as back-pressure in Kinesis is dealt with.
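
For what it’s worth, a rough sketch of that delete+insert-in-a-transaction pattern using psycopg2 (table, column and connection details are made up for the example):

    # Rough illustration of the delete+insert-in-a-transaction pattern.
    # Table, columns and connection details are made up for the example.
    import psycopg2

    conn = psycopg2.connect(host="my-cluster.example.com", port=5439,
                            dbname="analytics", user="loader", password="...")

    rows = [("evt-1", "2019-08-01", 42), ("evt-2", "2019-08-01", 7)]

    with conn:                       # commits (or rolls back) the transaction
        with conn.cursor() as cur:
            cur.execute("DELETE FROM atomic.my_table WHERE event_id IN %s",
                        (tuple(r[0] for r in rows),))
            cur.executemany(
                "INSERT INTO atomic.my_table (event_id, event_date, value) "
                "VALUES (%s, %s, %s)",
                rows,
            )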

We currently use Redshift for our Snowplow data, but we have a small cluster and way too much Snowplow data, so we ended up using the AWS Glue catalog & Redshift Spectrum to ingest the data. From Redshift, we create an external database & schema, then within AWS Glue (actually specified and created using Terraform) we have the Snowplow events table DDL. The only complication is adding new partitions to that atomic events table as new data arrives. There are a few ways to do that, but we added it to Glue jobs using boto3. This setup gives us the Redshift Spectrum advantage of separating storage from compute (yes, Snowflake, BigQuery and Athena are way ahead of Redshift on that front). On the downside it means the reporting & analytics models (e.g. Looker, dbt) aren’t possible to use, but we have some custom AWS Glue jobs in place of that. I like Spark… but I am not a fan of AWS Glue’s weird wrapper over PySpark. It is hard to get working.
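
For anyone curious, the partition-adding step is roughly this kind of boto3 call (database, table and bucket names are placeholders, not our actual setup):

    # Register a new date partition in the Glue Data Catalog so that Redshift
    # Spectrum / Athena can see newly arrived data. Names are placeholders.
    import boto3

    glue = boto3.client("glue", region_name="eu-west-1")

    partition_date = "2019-08-01"
    glue.create_partition(
        DatabaseName="snowplow",
        TableName="atomic_events",
        PartitionInput={
            "Values": [partition_date],
            "StorageDescriptor": {
                "Location": f"s3://my-bucket/events/derived_date={partition_date}/",
                "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
                "SerdeInfo": {
                    "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
                },
            },
        },
    )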

Snowflake has the option of using Snowpipe to load data from S3 as well, using SQS events or the API directly, which is fast and much cheaper as it doesn’t require (the user) running a virtual warehouse. The idea of the Snowplow pipeline needing to run yet another EC2 autoscaling cluster with (I am guessing) a Scala app to load Snowflake might be necessary if it’s feeding directly from Kinesis or Dataflow, but if it is picking up from S3 then I wonder if this is overkill, when Snowpipe would mean less infrastructure to manage.
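
To illustrate the comparison, an auto-ingest Snowpipe is roughly this much setup (run here via the Snowflake Python connector; the stage, table and pipe names are placeholders, and the stage is assumed to already point at the enriched-events bucket):

    # Hedged sketch of an auto-ingest Snowpipe, created via the Snowflake
    # Python connector. Stage, table and pipe names are placeholders; S3 event
    # notifications still need wiring to the pipe's SQS channel.
    import snowflake.connector

    conn = snowflake.connector.connect(
        account="my_account", user="loader", password="...",
        warehouse="LOAD_WH", database="SNOWPLOW", schema="ATOMIC",
    )

    conn.cursor().execute("""
        CREATE PIPE IF NOT EXISTS atomic.events_pipe
          AUTO_INGEST = TRUE
        AS
          COPY INTO atomic.events
          FROM @atomic.enriched_stage
          FILE_FORMAT = (TYPE = 'JSON')
    """)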

I do this; it’s pretty straightforward to accomplish, but the CSV files are not performant, and new partition objects for the (AWS Glue catalog) tables need to be managed.

We use AWS Glue jobs to compact the files and convert them to Parquet; I also add a new column for partitioning, which amounts to derived_tstamp::date as derived_date.
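
Roughly, that conversion step might look like the following in plain PySpark (Glue’s wrapper adds its DynamicFrame layer on top); the paths are placeholders, and reading with a header is a simplification since real enriched TSVs have no header row and would need the atomic.events column names applied instead:

    # Rough shape of the compaction / Parquet conversion in plain PySpark.
    # Bucket paths are placeholders; header=true is a simplification - real
    # enriched TSVs have no header, so you would map the atomic.events column
    # names onto the DataFrame instead.
    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName("compact-enriched-events").getOrCreate()

    raw = (spark.read
           .option("sep", "\t")
           .option("header", "true")
           .csv("s3://my-bucket/enriched/raw/"))

    (raw
     .withColumn("derived_date", F.to_date(F.col("derived_tstamp")))
     .repartition("derived_date")         # compact many small files per day
     .write
     .mode("append")
     .partitionBy("derived_date")
     .parquet("s3://my-bucket/enriched/parquet/"))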

I have a raw events table (CSV) and then an events table (Parquet, date-partitioned).
The setup requires an external database & schema created in Redshift using an IAM role, and that’s all you need to do on the Redshift side. The rest is managed in the AWS Glue Catalog and then the Glue jobs.

My biggest concern is that you can’t write to Redshift Spectrum from Redshift. You can now write data using Athena though, so it would probably make more sense to run Athena SQL jobs for post-processing / analytics models rather than Redshift Spectrum.

Our EMR jobs run every 12 hours and they can range from taking 1 hour to taking 8 hours+, which is pretty awful. We do use page pings, which adds a lot of events, though. We also have quiet weekends, so the variation might not apply to others. The pipeline is pretty stable though: in the last 6 months it has stopped 3 times. Twice for EMR issues that I do not blame on Snowplow (mostly because EMR is a horrible implementation for running Spark) and once because the EmrEtlRunner EC2 instance was inexplicably stopped! I suspect AWS migrating the EC2 to new bare metal or some other shenanigans.

The broader solution is probably to run those jobs more regularly and look at how you provision them - not my area so I can’t say much about it.

However I do think it’s worth mentioning that if you’re concerned about page pings & processing volumes, the latest version of the JS tracker lets you use a callback to aggregate them client-side.


Hi @davehowell,

The idea is not to make them fully streaming, but to get rid of the overall batch architecture with an EMR cluster and separate steps, and instead to switch to something like:

  • A constantly running Transformer job, getting data from Kinesis, transforming it and sinking it to S3
  • A stateful, long-running Loader executing COPY and ALTER TABLE statements, which given the nature of COPY will be far from real-time, but faster, cheaper and easier to maintain

The Transformer would then send a message to tell the Loader to trigger loading. Overall I believe this architecture is very similar to Snowpipe, but generic enough to let us combine the Snowflake and RDB Loaders into a single family of applications in future (and support other similar OLAP databases!), and it lets us execute ALTER TABLE statements, which IIRC is not possible with Snowpipe.
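
To make that flow a bit more concrete, here is a very rough sketch of what the Loader side could look like, assuming SQS carries the “folder ready” messages and psycopg2 issues the COPY - both placeholders rather than the final design (the upcoming RFC will cover the real architecture):

    # Very rough sketch only - not the final design. Assumes SQS carries the
    # "folder ready" messages from the Transformer and psycopg2 runs the COPY.
    # Queue URL, IAM role and table names are placeholders.
    import json

    import boto3
    import psycopg2

    sqs = boto3.client("sqs", region_name="eu-west-1")
    QUEUE_URL = "https://sqs.eu-west-1.amazonaws.com/123456789012/loader-queue"

    conn = psycopg2.connect(host="my-cluster.example.com", port=5439,
                            dbname="analytics", user="loader", password="...")

    while True:
        resp = sqs.receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=1,
                                   WaitTimeSeconds=20)
        for msg in resp.get("Messages", []):
            body = json.loads(msg["Body"])      # e.g. {"s3_path": "s3://..."}
            with conn, conn.cursor() as cur:
                # Any ALTER TABLE for new columns would be issued here, then:
                cur.execute(
                    "COPY atomic.events FROM %s "
                    "IAM_ROLE 'arn:aws:iam::123456789012:role/loader' "
                    "FORMAT AS JSON 'auto'",
                    (body["s3_path"],),
                )
            sqs.delete_message(QueueUrl=QUEUE_URL,
                               ReceiptHandle=msg["ReceiptHandle"])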

There are many minor and major pros and cons to this approach, and every pipeline has a very different set of pain points, but the pros in general are:

  • No more S3DistCp
  • No more EmrEtlRunner
  • No resources wasted by Loader running on a big EMR cluster
  • No scaling-with-backlog problem
  • Reduced delay (not as much as we would like, but it should still be much faster)
  • No more Spark tuning