Replacing the Clojure Collector

This is a Request for Comments (RFC) for replacing Snowplow’s Clojure Collector with an alternative event collection approach for the Snowplow batch pipeline.

We welcome feedback on all of our RFCs! Please respond with your thoughts and comments in the thread below.

Why we created the Clojure Collector

We created the Clojure Collector in January 2013 as an alternative to our CloudFront pixel-based collector. The Snowplow community liked the CloudFront Collector for its ease of setup, essentially unlimited horizontal scalability and zero maintenance. However, over time the functional limitations of the CloudFront Collector had become clear, including:

  1. The inability to set a tracking cookie on the collector endpoint
  2. The lack of POST support, restricting us to sending only one event at a time via GET

We had also started to run up against some non-functional challenges, including:

  • Unexpected changes to the underlying CloudFront access log format breaking our Hadoop Enrich process
  • Unexplained delays in CloudFront logs arriving in S3

For these reasons, we elected to design and build an alternative collector for the Snowplow batch pipeline.

It was important that the new collector support a tracking cookie, support POST, and use a logging format that we could control. However, it was also important that the new collector be very scalable and easy to set up and upgrade. From these requirements the Clojure Collector was born.

The Clojure Collector architecture

The Clojure Collector has an unusual architecture, designed to meet the above functional requirements whilst also aiming to minimize the collector’s support burden.

We designed the Clojure Collector (“CC” for the rest of this RFC) around running in Elastic Beanstalk to make deployment and operation easier. At the time, the Tomcat application server was the standard deployment target for Beanstalk web applications.

Whilst evaluating Beanstalk with Tomcat, we noticed an interesting thing: Beanstalk had built-in support for rotating the Tomcat server logs to S3 once an hour. We also saw that the Tomcat server hosting your Beanstalk application was highly configurable, even down to the logfile format.

From this we came up with an unorthodox two-part design for the CC:

  1. We would write a simple Tomcat servlet responsible for setting the collector cookie and any other required behaviors in the request-response loop (sketched below)
  2. We would replace the standard Tomcat access logging with a customised format of our own for incoming Snowplow events (both GET and POST)

This design is set out in this diagram:
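
To make part 1 of this design concrete, here is an illustrative Scala sketch of the servlet’s responsibility (the real component was written in Clojure, and the cookie name and lifetime below are placeholders, not the CC’s actual values): set the collector cookie and respond with a transparent pixel, leaving the recording of the event itself to Tomcat’s access logging.

```scala
import javax.servlet.http.{Cookie, HttpServlet, HttpServletRequest, HttpServletResponse}

// Illustrative only: the real servlet was written in Clojure, and the cookie
// name ("sp") and one-year lifetime here are placeholders, not the CC's values.
class PixelServlet extends HttpServlet {

  // A 1x1 transparent GIF to send back to the tracker
  private val pixel: Array[Byte] = java.util.Base64.getDecoder.decode(
    "R0lGODlhAQABAIAAAAAAAP///yH5BAEAAAAALAAAAAABAAEAAAIBRAA7")

  override def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
    val cookie = new Cookie("sp", java.util.UUID.randomUUID.toString)
    cookie.setMaxAge(365 * 24 * 60 * 60) // placeholder lifetime
    resp.addCookie(cookie)
    resp.setContentType("image/gif")
    resp.getOutputStream.write(pixel)
  }
}
```

The key point is that the servlet never writes events anywhere itself - capturing the querystring or POST body is left entirely to the customised Tomcat logging described in part 2.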

While the design was technically complex, it was operationally simple: we were leaning on Beanstalk itself to do our log rotation of raw Snowplow events to S3.

We chose to write the Servlet component in Clojure as a pilot for that technology. The pilot was a partial success: we never re-wrote the CC in another language, but we never wrote another Snowplow sub-system in Clojure.

The innovative design seems to have worked: the CC is today the most popular Snowplow collector for both the open source community and our Managed Service customers. And on Discourse, the CC accounts for a relatively low share of support queries relative to our other sub-systems.

However, in the years since its launch some limitations have emerged with the CC which have led us towards replacing this collector. These limitations are set out in the next section.

Rationale for replacing the Clojure Collector

The first reason for replacing the Clojure Collector is around the logging to S3. By outsourcing this logging to Beanstalk, we have given up a lot of flexibility.

For example, we cannot change the logging frequency from once an hour to perhaps every quarter hour, which would be ideal for frequent pipeline runs. We cannot change the logfile compression to something which is more performant in a Hadoop/Spark environment, like splittable LZO. We cannot address infosec requirements such as encrypting the event data at rest.

While the hourly logging approach is predictable, it means that our raw event logs sit on unbacked-up servers for up to an hour before they are saved off to S3. This adds some risk into our pipeline’s “at least once” guarantees.

The second challenge is around the operational control (or lack thereof) that we have over Elastic Beanstalk. Beanstalk is designed as an abstraction layer on top of EC2, Autoscaling Groups and Elastic Load Balancers.

While this can be great for newcomers to AWS, at a certain point this creates more problems than it solves:

  • We have seen new Beanstalk solution stacks for Tomcat with the log rotation to S3 broken
  • We have seen boxes get stuck at 75-80% CPU, failing to rotate Snowplow event logs but never triggering Beanstalk’s health check
  • We have tried and failed to add various monitoring solutions to the CC instances without impacting log rotation

The third challenge has been around duplication of development effort across the Clojure Collector and the newer Scala Stream Collector (or SSC).

We built the SSC from the ground-up to write out Snowplow raw events to various stream and unified log technologies, including Amazon Kinesis, stdout and Apache Kafka (as of Snowplow R85).

The challenge is that as new requirements emerge for our collectors, they now have to be implemented in two different places (the CC and the SSC), in two different languages (Clojure and Scala).

For all these reasons, it makes sense to phase out the Clojure Collector and replace it with an alternative. We’ll cover an alternative approach that works today, rule out another approach, and then propose a new design for the community to comment on.

Alternative 1: using the Scala Stream Collector for the batch pipeline

It is possible today to use the SSC to feed events into the batch pipeline. Essentially you would set up a cut-down version of the Snowplow lambda architecture, composed of just:

  • The SSC, feeding a Kinesis stream
  • Our Kinesis S3 project, reading from the Kinesis stream and writing to S3

You then simply configure your EmrEtlRunner’s config.yml to use the Kinesis S3 bucket as the aws:s3:raw:in property and set the collectors:format property to thrift.

This is set out below:

This approach has some merit: with the right configs for the SSC and Kinesis S3, we can ensure that events are quickly forwarded from the collector instances to the comparatively durable Kinesis stream, helping deliver our “at least once” processing mandate.

With this approach we can also consolidate on one collector codebase, the SSC, and have complete control over how we deploy the components using AWS Elastic Load Balancers and Autoscaling Groups.

However there are some limitations to this approach:

  • It’s much more complex to set up than the Clojure Collector
  • It requires S3, Kinesis and DynamoDB (for Kinesis checkpointing) all to be available for successful event capture, so it is vulnerable to a service outage
  • To handle changes in event volumes you now have to scale three separate components (the collectors, the Kinesis stream’s shards and the Kinesis S3 sinks)

Given these limitations, it’s worth considering alternatives.

Alternative 2: using API Gateway and Kinesis Firehose

What about a “serverless” alternative to the Clojure Collector, built from Amazon API Gateway and Kinesis Firehose?

This approach is set out below:

By using these very high-level AWS services, we would be exchanging some control for automatic scaling and simple setup - it’s a tempting exchange.

However there are some challenges:

  • We would be back to maintaining two collectors: one for Kafka and this one for Kinesis
  • At the time of writing, API Gateway and Kinesis Firehose are not available in all AWS regions (source)
  • It’s not clear that the CC’s support for cookies and link redirects would be supported by API Gateway
  • Kinesis Firehose’s options for writing Kinesis records to S3 are extremely limited - we don’t believe that it would be possible to write out a Hadoop-optimised format (similar to the output of the Kinesis S3 project) to S3

Our view is that these challenges collectively rule out this option.

Alternative 3: coupling an S3 sink to each SSC instance

An alternative we have been considering is to configure a new collector instance which contains:

  1. The SSC
  2. A local message queue, probably NSQ
  3. An NSQ to S3 sink (not yet written)

The SSC would write all incoming events to a queue in the local NSQ instance, and the NSQ to S3 sink app would be responsible for writing batches of these events back out to S3.

This design is set out below:
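
To make the role of the not-yet-written NSQ to S3 sink more concrete, here is a rough, hypothetical Scala sketch of its core loop. The NSQ client wiring is omitted, and the bucket name, batch size and key scheme are invented purely for illustration:

```scala
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of an NSQ to S3 sink: batch raw events from the local
// nsqd and periodically write each batch to S3 as a single object.
object NsqToS3Sink {
  private val s3 = AmazonS3ClientBuilder.defaultClient()
  private val buffer = ArrayBuffer.empty[String]
  private val maxBatchSize = 10000 // illustrative threshold

  // This would be registered as the message handler of an NSQ consumer
  // subscribed to the local nsqd's raw-events topic (client wiring omitted)
  def handleMessage(event: Array[Byte]): Unit = synchronized {
    buffer += new String(event, StandardCharsets.UTF_8)
    if (buffer.size >= maxBatchSize) flush()
  }

  // Write the current batch to S3 as one object, then clear the buffer
  def flush(): Unit = synchronized {
    if (buffer.nonEmpty) {
      val key = s"raw/${System.currentTimeMillis}.log" // illustrative key scheme
      s3.putObject("my-raw-events-bucket", key, buffer.mkString("\n"))
      buffer.clear()
    }
  }
}
```

A real implementation would also flush on a timer (say every 15 minutes, per the discussion of S3 write frequency below) and only acknowledge messages back to NSQ once the corresponding S3 write had succeeded, to protect the “at least once” guarantee.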

There are some advantages to this approach:

  • It’s truly horizontally-scalable, with a shared-nothing approach: just add and remove instances from a single Autoscaling Group to handle changes in event volume
  • It doesn’t introduce dependencies on additional or immature AWS services - it just needs EC2 and S3
  • It’s easy to deploy and configure: we could provide a single AMI in Amazon’s AMI Marketplace, along with instructions on setting this up in an Autoscaling Group with an Elastic Load Balancer in front
  • We are back to using a single codebase for event collection, the Scala Stream Collector - and in any case we plan on adding NSQ support to the SSC for Snowplow Mini to use

The main weakness of this approach is that events will sit on the collector for some time (perhaps 15 minutes) before being saved to S3, and a server outage during that window will cause event loss. To some extent we can mitigate this by:

  • Reducing the likelihood of a server outage, for example by adding some highly pessimistic health checking into each collector, taking it out of the Autoscaling Group (to drain and shut down) in the event of a possible problem (see the sketch after this list)
  • Reducing the impact of a server outage, for example by putting the NSQ data onto a persistent EBS volume and having a simple tool for recovering the not-yet-stored events from a failed collector’s EBS volume
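
As a sketch of what highly pessimistic health checking might look like, the hypothetical snippet below exposes a /health endpoint for the ELB that only reports healthy if the local nsqd answers its /ping endpoint quickly (4151 is nsqd’s default HTTP port; the 8081 port and 500ms timeouts are arbitrary choices for illustration):

```scala
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import java.net.{HttpURLConnection, InetSocketAddress, URL}

object CollectorHealthCheck {

  // Pessimistic check: anything other than a fast 200 from nsqd counts as unhealthy
  private def nsqdIsHealthy: Boolean =
    try {
      val conn = new URL("http://127.0.0.1:4151/ping")
        .openConnection().asInstanceOf[HttpURLConnection]
      conn.setConnectTimeout(500)
      conn.setReadTimeout(500)
      conn.getResponseCode == 200
    } catch { case _: Exception => false }

  def main(args: Array[String]): Unit = {
    val server = HttpServer.create(new InetSocketAddress(8081), 0)
    server.createContext("/health", new HttpHandler {
      def handle(exchange: HttpExchange): Unit = {
        val status = if (nsqdIsHealthy) 200 else 503
        exchange.sendResponseHeaders(status, -1) // -1 = no response body
        exchange.close()
      }
    })
    server.start()
  }
}
```

The ELB health check would point at /health, so a collector whose local queue is wedged gets drained out of the Autoscaling Group automatically; further checks (disk space, queue depth via nsqd’s /stats endpoint) could be added in the same way.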

There is another option we could consider, which is to increase the S3 write frequency to perhaps every minute.

This would generate many more files - with 5 collectors each writing one file a minute, you would be facing 7,200 files a day, which would run headlong into Hadoop’s small-files problem. But this would be a good opportunity to introduce a file consolidation process, for example reducing each hour’s 300 files down to a single file in splittable, indexed LZO format.
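
As a sketch of what that consolidation process could look like, here is a minimal Spark job in Scala that coalesces an hour’s worth of small raw files into a single LZO-compressed file. The paths are placeholders, the raw files are assumed to be line-oriented, and building the LZO index would be a separate step (for example with hadoop-lzo’s distributed indexer):

```scala
import org.apache.spark.sql.SparkSession

object RawLogConsolidation {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("RawLogConsolidation").getOrCreate()
    val hour = "2017/09/13/14" // placeholder partition to consolidate

    // Read the ~300 small files for the hour and rewrite them as one LZO file
    spark.sparkContext
      .textFile(s"s3://my-raw-bucket/in/$hour/*")
      .coalesce(1)
      .saveAsTextFile(
        s"s3://my-raw-bucket/consolidated/$hour",
        classOf[com.hadoop.compression.lzo.LzopCodec]
      )

    spark.stop()
  }
}
```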

REQUEST FOR COMMENTS

This is our most open-ended RFC to date - your comments in this thread will have a real influence on the approach that we end up taking to replace the Clojure Collector.

And this RFC will affect a large proportion of Snowplow users over time - so please do give us your feedback! We’d love to hear your views on the alternative approaches set out above, plus of course any other designs that we might have missed.


Hi Snowplowers, thanks for sharing your views in this RFC!

I think that there’s another weakness in the 3rd approach, which is the introduction of another building block (NSQ) in the Snowplow toolbox. I think that keeping Kinesis has more advantages:

  • It makes it easier for users wanting to move from the batch to the real-time pipeline (or the other way around), both during setup and when debugging problems.
  • Having a common setup for the batch and real-time pipelines will make it easier for people to help each other in the Discourse forums and to fix bugs.

Maybe alternatives 1 and 3 could be merged, with the SSC and the Kinesis S3 processes running on the same machine. They could also be provided in an AMI. In that case, to scale the solution we would only have to change the number of instances in the autoscaling group and the number of shards.

Thanks again for the openness!

Hey @danisola!

Great comments. A few thoughts:

the introduction of another building block (NSQ) in the Snowplow toolbox

To be clear - we plan on adding NSQ support for Snowplow anyway, to power Snowplow Mini. Snowplow Mini has outgrown our use of Unix named pipes - we need a proper queue running on the box, and NSQ looks to be a good fit for this.

It makes it easier for users wanting to move from the batch to the real-time pipeline (or the other way around). This is during the setup and when debugging problems.

This is a great point.

Maybe the alternatives 1 and 3 could be merged, and have the SSC and the Kinesis S3 processes running on the same machine.

I think this is a credible Option 4 that we should add to the discussion. The main challenge is that it increases the support complexity of the batch pipeline. If I am running this new collector and I am missing events in S3, what’s to blame? Here are some additional failure states:

  • Kinesis outage
  • DynamoDB outage
  • Underprovisioned Kinesis stream
  • Underprovisioned DynamoDB table
  • Too few Kinesis S3 processes for the size of the Kinesis stream

Of course an NSQ-based collector could malfunction too, but because this would occur at the level of an individual server (shared-nothing), it should be possible to add these failure states into the health check, and then the AWS ELB can handle them automatically.

I agree, alternatives 1 & 4 are a bit overkill for batch pipeline users.

By the way, I forgot to mention that we use the real-time pipeline but also store the events permanently in S3 with a solution similar to ‘Kinesis S3’. I guess this is a requirement that most users of the real-time pipeline have.

Alternatives 1 & 4 solve that implicitly. If alternative 3 is chosen, will ‘Kinesis S3’ still be supported and maintained? Another option would be to have the collector write to both Kinesis and NSQ, but that could introduce discrepancies between the two if something goes wrong.

Of course yes - a Snowplow real-time pipeline user won’t use the new batch collector - they will continue to use the “raw” SSC, populating the Kinesis event stream, feeding Kinesis S3, Stream Enrich etc.

It’s an open question whether the new NSQ S3 sink would be an all-new project, or whether it would be an extension to the existing Kinesis S3 codebase (maybe that project gets renamed “Stream Archiver” or similar).

Makes sense! What format do you store the events in? Is your primary use case (re-)processing the events in Spark, or replaying them back into a new Kinesis stream?

Could you sink to a remote NSQ topic (in addition to a local topic)?

I like the idea of using NSQ over including any additional AWS specific components in the pipeline - any component that is married to AWS makes it harder to migrate onto other platforms.

Sure - we could make this an option. Then we are inching towards a formal NSQ version of the pipeline, which some people might prefer over Kafka, despite its limitations (NSQ is a message queue not a unified log).

This was another factor in our thinking. A batch collector using NSQ might be fairly easy to migrate to Azure, GCP etc (because you would just add Azure Blob Storage, Google Cloud Storage etc as additional back-ends to the NSQ S3 sink).


What format do you store the events in?

We currently store them as access logs because historically we used the CloudFront collector. We might revisit that in order to be able to process POST requests, but in the short term we won’t migrate them to the Thrift schemas.

By the way, do you have any changes planned for the collector payload? Maybe moving them to Avro?

Is your primary use case (re-)processing the events in Spark, or replaying them back into a new Kinesis stream?

We reprocess them with our Spark Enricher that uses scala-common-enrich and that can be run in batch or streaming mode.

Makes sense @danisola -

We’ve thought about it but there’s not a lot of upside in the change - Thrift is very compact, it has tons of language support (we’ve found the Avro support in Golang and Scala to be … disappointing), and the collector payload schema is relatively static so Avro’s schema evolution niceties don’t really come into play.

Well, Alternative 1 sounds good from an architectural point of view, but in my opinion, having a Kafka queue instead of Kinesis could be another option. I am saying this because I feel that overall the Snowplow team wants to make it easier to move out of AWS. Having Kinesis might be an easy start, but it adds another proprietary component to the pipeline.

Alternative 2 sounds easy, but it is too dependent on AWS technology and limits the platform; I feel that if we go with this approach we will have to rewrite the collector again at some point.

Alternative 3 sounds promising, but I think adding NSQ introduces one more component to the existing toolset. Though I agree that we should use the technology/tools best suited for the job, I feel that writing logs is best handled by Kafka. This also has cost implications: Kafka can run really well on magnetic spinning disks because it appends to logs. I don’t have a lot of experience running NSQ, but from its architecture it seems like an in-memory queue, which means we would mostly need bigger machines with lots of RAM to scale things up. That might be required for building a real-time pipeline, but those who are fine with a delay of one hour (the batch pipeline) would also have to go with bigger machines, which would increase their costs too.

And a big +1 to “But this would be a good opportunity to introduce a file consolidation process, for example reducing each hour’s 300 files down to a single file in splittable, indexed LZO format.” This would make the pipeline more independent and would give us more control over how, and at what pace, we write logs.

Alternative 1: Sounds like a reliable solution that also helps unlock a near real-time pipeline. Setup is more complicated, but could be simplified with some tooling. (When it comes to DynamoDB, AFAIR, it is all handled by the KCL, provided permissions are set correctly.)

Alternative 2: Don’t have much experience with those services, but it sounds like it’s not as flexible as the other proposals.

Alternative 3: Easy setup, but the biggest drawback, as you already mentioned, is the possibility of losing events when the chaos monkey strikes. We have a similar setup (Autoscaling, 3 instances with the SSC, an ELB) and in the last year instances were terminated about 4 times.
To minimise the risk it would need to flush events to S3 every minute or so, and recovery from EBS would need to be documented (that adds a bit of complexity to the solution IMO).

Alternative 4: SSC and the S3 sink on the same instances. Very similar to Alternative 1. KCL apps supposedly scale fine based on the number of shards, and even if there’s only one shard, the processes on the other instances would work as a fallback.

I also think that in the case of Alternatives 3 and 4 the consolidation process would be required, and in general it’s a good addition. We already have something like that in place to create daily logs that we then use for reprocessing purposes.

Considering all the alternatives, number 3 looks the most reasonable. I think it should also be considered what percentage of pipelines in use are batch vs real-time. I would guess that anyone using batch (us included) wants to migrate to real-time at some point anyway, as long as it’s not too painful.

As already mentioned, decoupling from AWS infrastructure is very beneficial going forward, as it would allow broader Snowplow adoption.

@yali @alex do you have a rough timeline for a replacement for the Clojure Collector? Not looking to hold you to anything, but I just want a sense of whether it is 1 month or 6-12 months.

Hi @johan - it’s a fair question but we can’t share public timelines on the implementation of RFCs.

I’m digging up a rather old thread here, but this seemed like the most relevant place. Reviewing this RFC, it looks like we’ve made a lot of progress towards these alternatives.

We recently deployed Alternative (1), and it’s worked pretty well. We did have a couple of surprises that could maybe lead to enhancements in the future.

  1. The SSC requires significantly more compute resources than the CC. We have some very spiky workloads, which have required at least 2-4x the number of instances with the SSC compared to the CC. Basically, the CPU becomes pegged, which I assume is due to processing the incoming JSON into Thrift.

  2. Although I’d much prefer a binary structured format like Thrift, it’s not well supported by the AWS tools that have been built on top of Kinesis (like Kinesis Analytics).

We’re still trying to figure out how best to deal with (1), but adding the two together, I was wondering if it would make sense to migrate (or have the option to migrate) the Thrift encoding step from the SSC to the S3 Loader. Obviously, Stream Enrich depends on the existing behavior, but being able to change this may move some heavy processing to a location that is less time sensitive (as the data is already in Kinesis) and enable more tooling options in AWS.

Thanks for sharing @rbolkey - glad Alternative (1) is working well for you.

We have been making some good progress on Alternative (3):

We’ve already added NSQ support to the S3 Loader per:

https://snowplowanalytics.com/blog/2017/09/13/snowplow-s3-loader-0.6.0-released/

You can follow progress adding NSQ support to the SSC and Stream Enrich here:

I didn’t fully follow your arguments against Thrift:

Thrift, it’s not well supported by the AWS tools that have been built upon Kinesis (like Kinesis Analytics) … enable more tooling options in AWS

It’s true that AWS Kinesis tools don’t play nicely with Thrift, but what’s your use case for working directly with the raw collector payloads? Processing them with Stream Enrich (decoding, validation, enrichment, etc) is always our first step.

move some heavy processing to a location that is less time sensitive

We have to serialize the collector payload using some schema technology. If it’s not binary like Thrift, it’s probably going to be something like JSON, which will be significantly slower.

@alex not arguing against Thrift in any strong way.

(1) Performance: That comment was based on our initial experience with the SSC over the past week. The CC was able to handle much more load than the SSC. I had concerns that the heavy CPU usage was due to data serialization, but it appears to be the result of us not provisioning enough capacity in Kinesis (we only had 8 shards … we may need something more like 48 for our peak capacity) and the resulting constant retry attempts.

(2) Kinesis Tools: Not a hardened use case. After setting up the collector, Kinesis Analytics was the quickest way for us to see that everything looked good in the pipe and that we were getting what we expected to see.


Thanks for clarifying @rbolkey - all makes sense…