We’re tremendously excited to release Snowplow RDB Loader 1.0.0. This release marks stability of the new architecture featuring independent RDB Loader and adds a new experimental Stream Shredder component.
RDB Loader
In R35 we made RDB Loader an independent task, running outside of EMR cluster. Since then we were testing the new architecture and gathering community feedback. Among overall positive results we’ve also identified a few areas for improvement.
Manifest table
The most important improvement in 1.0.0 is double-loading protection. With microservice architecture and message queues, duplicates are a very common problem, even with tiny throughput. In order to keep the track of folders that have been loaded, in 1.0.0 we’ve introduced a new manifest
table. This table closely resembles an old one with the same name, which has been deprecated in R33. Unlike legacy table, the new one is designed with real-time architecture in mind and contains rich metadata about every loaded batch of data. But what is most important - RDB Loader coordinates loading with the manifest and no folder can be loaded twice.
Also, loading is happening in a transaction block, which means this is a true exactly-once delivery semantics and no folders can be loaded twice, unless its record manually deleted from the table.
Message acknowledgement
Another related change is more predictable SQS message acknowledgement.
In R35 we’ve made an attempt to be as precise as possible in acknowledgement strategy and deliberately left several scenarios where message remains unacknowledged after a loading failure.
It meant that a pipeline operator would have to fix the loading problem (such as incorrect schema) or manually delete the message in order to unblock loading. While at some cases this strategy remains preferable, overall it makes it hard to figure out if message has been acknowledged and needs to be sent again or the whole pipeline is blocked.
Our new strategy is simply trying to acknowledge the message in all circumstances.
If loading fails - an ERROR
message is being printed to stderr channel or Loader crashes, acking the message. If the failure is transient, a pipeline’s operator would have to send content of shredding_complete.json
file from corresponding folder to SQS queue.
Stream Shredder Beta
RDB Shredder (and its predecessor - Hadoop Shred job) since inception has been a batch job, designed to work on EMR cluster with bounded dataset from S3 or HDFS. This architecture has been battle-tested and served us well many years, but still has several flaws:
- Scalability - although everything in Hadoop ecosystem designed with scalability in mind, it also requires some expertise to configure an EMR cluster for a specific workload, especially if it needs to be done in real-time, responding to suddenly increased traffic
- Latency - if Spark Shredder has started processing a big batch of data - there’s no way to interrupt it and ask RDB Loader to load whatever has been processed. In other words, the pipeline is often blocked by a long-running shredding step.
- Small pipelines - not everyone needs to process terabytes of data per day and Spark brings a significant overhead for low-volume pipelines
Unlike existing Spark Shredder, the Stream Shredder reads data directly from enriched Kinesis stream and does not use Spark (neither EMR) - it’s a plain JVM application, like Stream Enrich or S3 Loader.
Reading directly from Kinesis means that the Shredder can bypass long and error-prone S3DistCp staging/archiving steps. Another benefit is that it doesn’t work with bounded dataset anymore and can “emit” shredded folders based only on specified frequency.
With frequency-based emission in Shredder, the pipeline loading frequency is limited only by Redshift capabilities - on low-volume test pipelines we managed to load data as often as every two minutes.
WARNING Despite RDB Loader as an architecture reached its maturity and Stream Shredder has same 1.0.0 version it is not considered production-ready application yet.
Known limitations
- Stream Shredder does not have deduplication logic (neither in-batch nor cross-batch)
- Stream Shredder hasn’t been tested in multi-node environment and we do anticipate race conditions when multiple KCL workers used
Other changes
The most important change since the last release is the new partitining scheme.
In R35 we got rid of atomic-events
, shredded-types
and shredded-tsv
folders and made paths consistent across all shredded types.
In 1.0.0 we made Shredder output (“good” and “bad”) consistent with rest of paritioning - archive does not have anymore two distinct good
and bad
paths, but only single output
path, where every folder contains both kinds of output. For example:
run=2021-03-29-15-40-30/
output=good/
vendor=com.snowplowanalytics.snowplow/
name=atomic/
format=tsv/
model=1/
vendor=com.acme/
name=link_click/
format=json/
model=1/
output=bad/
vendor=com.snowplowanalytics.snowplow/
name=loader_parsing_error/
format=json/
model=1/
shredding_complete.json
New versioning scheme
In R35 we unified versions of all components (Loader and Shredder) and used a single version - 0.18.0.
In this release we’re also dropping umbrella R-based versioning and just use a single version to refer to set of apps and every individual app.
Upgrade
Upgrade guide is available on our docs website: