In Snowplow R86 Petra we released DynamoDB-powered cross-batch natural deduplication, which effectively eliminated the problem of duplicates for the vast majority of Snowplow pipelines. Powerful as this new feature is, it also introduced some additional complexity around failed pipeline recovery, which Snowplow pipeline operators should be aware of.
In this post we will explain how to safely recover a Snowplow pipeline with cross-batch deduplication enabled, introducing a new Spark app we have written, Event Manifest Cleaner v0.1.0.
1. Recap on cross-batch event deduplication
Cross-batch event deduplication involves Spark Shred extracting two event properties which precisely identify an event (event_id and event_fingerprint), plus the etl_tstamp which identifies the pipeline run, and storing these three properties (the “deduplication triple”) in persistent DynamoDB storage. Together, these three properties let us build a simple and efficient algorithm that maintains state between loads and identifies duplicates using AWS DynamoDB. The algorithm works as follows:
- If an element with the same event_id and event_fingerprint but a different etl_tstamp exists in storage, then we have found a duplicate from an older run, and the event will be silently dropped from the Spark Shred output
- If an element with the same event_id, the same event_fingerprint and the same etl_tstamp exists in storage, then we are reprocessing the same enriched folder (due to a previous run failure), and the event should not be marked as a duplicate
- All other combinations, as well as the previous one (e.g. same event_id but a different event_fingerprint), have no effect on the Spark Shred output
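To make the algorithm concrete, here is a minimal Python sketch of such a conditional write using boto3. It is an illustration only, not the actual Spark Shred implementation: the attribute names (eventId, fingerprint, etl) are assumptions, and the table name simply reuses the snowplow-event-manifest example that appears later in this post:

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.client("dynamodb", region_name="us-east-1")

def is_cross_batch_duplicate(event_id, fingerprint, etl_tstamp):
    """Attempt the conditional write; return True if the event should be dropped."""
    try:
        dynamodb.put_item(
            TableName="snowplow-event-manifest",  # assumed table name
            Item={
                "eventId":     {"S": event_id},
                "fingerprint": {"S": fingerprint},
                "etl":         {"S": etl_tstamp},
            },
            # The write succeeds if no record with this (eventId, fingerprint)
            # key exists yet, or if the stored etl timestamp matches the
            # current run, i.e. we are re-processing the same enriched folder.
            ConditionExpression="attribute_not_exists(eventId) OR etl = :etl",
            ExpressionAttributeValues={":etl": {"S": etl_tstamp}},
        )
        return False   # first sighting of this triple, or same run: keep the event
    except ClientError as err:
        if err.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return True  # same event_id and fingerprint, different etl_tstamp: drop
        raise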
The following figure illustrates the different scenarios:
In the above, we see that the algorithm has an effect, in the form of dropping data, if and only if the same event_id, the same event_fingerprint and a different etl_tstamp were found in the DynamoDB table.
2. Impact on pipeline recovery
The problem with the above arises if you follow the standard recovery process, which involves deleting the enriched and shredded good data and reprocessing the raw events starting from the enrich step.
The issue is that the etl_tstamp is assigned when EmrEtlRunner launches the Spark Enrich job. Imagine the following scenario:
- Spark Shred fails while processing folder X, but manages to write some events to the DynamoDB table with etl_tstamp A
- The pipeline operator tries to reprocess the same folder X
- A new etl_tstamp B is assigned to all events in the new pipeline run
- Cross-batch deduplication drops the events whose event_ids and event_fingerprints are already in DynamoDB, but accompanied by etl_tstamp A
- These events have not made it into Redshift in either run
The following figure illustrates this:
3. Introducing Event Manifest Cleaner
To address this problem, we have released Event Manifest Cleaner, a Spark job that cleans up a particular ETL run’s events from the DynamoDB table.
Event Manifest Cleaner takes as input the path to the enriched folder that failed processing, and then deletes from DynamoDB those records which match the events found in the input folder. The interface of this Spark job closely resembles that of the Event Manifest Populator, but it deletes data from the manifest instead of inserting it.
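Under the hood this amounts to issuing a DeleteItem request for each deduplication triple found in the input. The Python sketch below illustrates the idea for a single enriched TSV file; the field positions and attribute names are assumptions made for the example, and the real Event Manifest Cleaner does this as a distributed Spark job over the whole folder:

import boto3

# Field positions in the enriched TSV; these indexes are assumptions for the
# sketch - check them against your pipeline's canonical event model.
ETL_TSTAMP_IDX = 2
EVENT_ID_IDX = 6
FINGERPRINT_IDX = 129

dynamodb = boto3.client("dynamodb", region_name="us-east-1")

def clean_manifest(enriched_tsv_path, etl_tstamp, table="snowplow-event-manifest"):
    """Delete the manifest records belonging to one enriched TSV file and one run."""
    with open(enriched_tsv_path) as f:
        for line in f:
            fields = line.rstrip("\n").split("\t")
            try:
                dynamodb.delete_item(
                    TableName=table,
                    Key={
                        "eventId":     {"S": fields[EVENT_ID_IDX]},
                        "fingerprint": {"S": fields[FINGERPRINT_IDX]},
                    },
                    # Only remove the record if it belongs to the failed run
                    ConditionExpression="etl = :etl",
                    ExpressionAttributeValues={":etl": {"S": etl_tstamp}},
                )
            except dynamodb.exceptions.ConditionalCheckFailedException:
                pass  # record belongs to a different run - leave it alone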
There are three scenarios we need to discuss:
- Spark Shred failed, and processing has been paused pending recovery
- Spark Shred failed, but the enriched folder has already been deleted
- Spark Shred failed, but further runs have already happened
3.1 Processing has been paused pending recovery
This is the least serious scenario: the pipeline failed at the shredding step, and the pipeline operator immediately realized that Spark Shred had already populated the manifest with some data.
The pipeline operator needs to run Event Manifest Cleaner, pointing it at the single remaining folder in the enriched.good path from config.yml:
$ python run.py run_emr s3://snowplow-acme/enriched/good/run=2017-08-17-22-10-30/ /path/to/dynamodb_config.json /path/to/iglu_resolver.json
Here, s3://snowplow-acme/enriched/good/run=2017-08-17-22-10-30/ is the full path to the enriched folder that failed to shred, and the other two arguments are the usual duplicate storage configuration and Iglu resolver configuration.
Here are the effects of running this:
After this, the pipeline can be resumed as per our Troubleshooting Guide.
3.2 Enriched folder has already been deleted
Spark Shred failed, but unfortunately the pipeline operator did not realize that they needed to take extra steps to recover it - instead, they simply deleted the folders from the enriched and shredded locations. The data was not loaded into Redshift either.
The deleted folder makes recovery problematic because there is now no single source of truth to provide to Event Manifest Cleaner - the enriched folder that was re-processed has a new etl_tstamp and therefore cannot be used.
In this case you can provide an additional --time option to Event Manifest Cleaner, which specifies the timestamp of the first, failed run (not the run with the same data that eventually succeeded). The timestamp can be retrieved from the EmrEtlRunner logs or the EMR console, and has exactly the same format as in the enriched folder names.
Here is an example:
$ python run.py run_emr s3://snowplow-acme/archive/enriched/run=2017-08-17-22-10-30/ /path/to/dynamodb_config.json /path/to/iglu_resolver.json --time 2017-08-17-22-48-00
Here, s3://snowplow-acme/archive/enriched/run=2017-08-17-22-10-30/ is the enriched folder that was successfully processed and archived to the archive/enriched S3 location. Event Manifest Cleaner will only use this data to retrieve the affected event_ids and event_fingerprints - the etl_tstamp will be taken from the --time 2017-08-17-22-48-00 argument.
As the next step, the pipeline operator will need to carry out the following (a sketch of the S3 operations is shown after the list):
- Delete the s3://snowplow-acme/archive/enriched/run=2017-08-17-22-10-30/ folder
- Delete the s3://snowplow-acme/archive/shredded/run=2017-08-17-22-10-30/ folder
- Place the s3://snowplow-acme/archive/enriched/run=2017-08-17-22-10-30/ folder into the processing location
- Resume the pipeline as per the Troubleshooting Guide
3.3 Further runs have already happened
Spark Shred failed, the pipeline operator did not realize that they needed to take extra steps to recover it, and they simply deleted the folders from the enriched and shredded locations. Then the pipeline was restarted and the data was loaded into Redshift. The problem is that this load remains incomplete, as part of the dataset was filtered out by the cross-batch deduplication algorithm, as explained in the problem description above.
This is the most complex scenario, and it can be discovered days or weeks after an incomplete dataset was loaded into the database. AWS provides several tools (explained below) to help you ensure that this scenario never happens, but if it is too late - well, Snowplow never deletes your data, and you should be able to recover it in Redshift:
- Pause your pipeline. We don’t want to make the state even more inconsistent
- Identify the timestamp of the failed run. This can be done through the EmrEtlRunner logs or the EMR console
- Identify the timestamp of the incomplete run. This is the recovery pipeline launched right after the failed one, processing the same dataset. It should be visible as a huge spike in “Conditional Write Failures” in the AWS DynamoDB console
- Convert this timestamp from the Snowplow format (e.g. 2017-07-20-18-47-06) to the Redshift format (e.g. 2017-07-20 18:47:06.874). You will have to check atomic.events in Redshift to retrieve the milliseconds portion of the timestamp
- Delete the data from all shredded Redshift tables:
DELETE FROM atomic.com_snowplowanalytics_snowplow_change_form_1 USING atomic.events WHERE com_snowplowanalytics_snowplow_change_form_1.root_id = events.event_id AND events.etl_tstamp = '2017-07-20 18:47:06.874';
- Delete atomic data from Redshift:
DELETE FROM atomic.events WHERE etl_tstamp = '2017-07-20 18:47:06.874';
- Launch the Manifest Cleaner:
$ python run.py run_emr s3://snowplow-acme/archive/enriched/run=2017-08-17-22-10-30/ /path/to/dynamodb_config.json /path/to/iglu_resolver.json --time 2017-08-17-22-48-00
- Run the pipeline against the raw data as per the Troubleshooting Guide.
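The timestamp conversion and the milliseconds lookup in the fourth step can be done with a few lines of Python. This is a sketch only, assuming a psycopg2 connection to Redshift with placeholder connection details:

from datetime import datetime
import psycopg2  # assumes psycopg2 is available for connecting to Redshift

def snowplow_to_redshift(snowplow_ts):
    """Convert e.g. '2017-07-20-18-47-06' to '2017-07-20 18:47:06'."""
    return datetime.strptime(snowplow_ts, "%Y-%m-%d-%H-%M-%S") \
                   .strftime("%Y-%m-%d %H:%M:%S")

# Retrieve the full etl_tstamp (including milliseconds) from atomic.events.
conn = psycopg2.connect(host="redshift.acme.internal", port=5439,  # placeholders
                        dbname="snowplow", user="operator", password="...")
with conn.cursor() as cur:
    cur.execute(
        "SELECT DISTINCT etl_tstamp FROM atomic.events "
        "WHERE date_trunc('second', etl_tstamp) = %s",
        (snowplow_to_redshift("2017-07-20-18-47-06"),),
    )
    print(cur.fetchall())  # e.g. [(datetime(2017, 7, 20, 18, 47, 6, 874000),)]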
4. Detecting incomplete loads
While this recovery process is something every pipeline operator should be aware of, there’s also a way to have more confidence that your data is never being silently lost due to cross-batch de-duplication issues.
When Spark Shred attempts to write a record to DynamoDB for an event that needs to be marked as a duplicate (whether it is a real duplicate or an event left over from a failed run), the write fails with a ConditionalCheckFailedException, which DynamoDB keeps track of and which you can monitor using CloudWatch alarms.
If you have 1,000 duplicates on a typical run and the number of conditional write failures suddenly spikes to 400,000 - you have a clear sign that almost 400,000 events have been mistakenly marked as duplicates.
To be notified about cases like this you can create a CloudWatch alarm:
$ aws cloudwatch put-metric-alarm \
--alarm-name DeduplicationInconsistentStateAlarm \
--alarm-description "Alarm when unexpectedly high amount of events marked as duplicates" \
--namespace AWS/DynamoDB \
--metric-name ConditionalCheckFailedRequests \
--dimensions Name=TableName,Value=snowplow-event-manifest \
--statistic Sum \
--threshold 10000 \
--comparison-operator GreaterThanThreshold \
--period 60 \
--unit Count \
--evaluation-periods 1 \
--alarm-actions arn:aws:sns:us-east-1:123456789012:deduplication-alarm
This creates a CloudWatch alarm for incidents where the total number of conditional check failures reaches 10,000 within one minute. There is no silver bullet for calculating the threshold, but in our experience 5% of the event volume of an average load is a reasonable value.
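If you suspect that a past run has already dropped data (as in scenario 3.3 above), the same metric can also be inspected retrospectively instead of waiting for an alarm. A minimal boto3 sketch, assuming the snowplow-event-manifest table from the example above:

from datetime import datetime, timedelta
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")

# Sum ConditionalCheckFailedRequests in 1-minute buckets around the suspect run.
start = datetime(2017, 7, 20, 18, 40)   # shortly before the recovery run
response = cloudwatch.get_metric_statistics(
    Namespace="AWS/DynamoDB",
    MetricName="ConditionalCheckFailedRequests",
    Dimensions=[{"Name": "TableName", "Value": "snowplow-event-manifest"}],
    StartTime=start,
    EndTime=start + timedelta(hours=2),
    Period=60,
    Statistics=["Sum"],
    Unit="Count",
)
for point in sorted(response["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], int(point["Sum"]))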
5. A warning
In our experience, the EmrEtlRunner logs are not always reliable enough to recognize a Shred job failure. In particular, we have occasionally encountered the following picture in the logs:
- 1. Elasticity Scalding Step: Enrich Raw Events: COMPLETED ~ 01:15:04 [2017-07-20 14:52:26 +0000 - 2017-07-20 16:07:31 +0000]
- 2. Elasticity S3DistCp Step: Enriched HDFS -> S3: COMPLETED ~ 00:01:50 [2017-07-20 16:07:33 +0000 - 2017-07-20 16:09:23 +0000]
- 3. Elasticity S3DistCp Step: Enriched HDFS _SUCCESS -> S3: FAILED ~ 00:00:40 [2017-07-20 16:09:25 +0000 - 2017-07-20 16:10:06 +0000]
- 4. Elasticity Scalding Step: Shred Enriched Events: CANCELLED ~ elapsed time n/a [2017-07-20 16:10:06 +0000 - ]
- 5. Elasticity S3DistCp Step: Raw S3 Staging -> S3 Archive: CANCELLED ~ elapsed time n/a [ - ]
- 6. Elasticity S3DistCp Step: Shredded HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
These logs imply that Spark Shred was cancelled and that therefore no data was written to DynamoDB. However, the DynamoDB console can reveal a different truth, with the Write Throughput chart showing that the job did in fact start writing to DynamoDB. This means that recovery steps must be taken.
Use the DynamoDB console or a CloudWatch alarm as the source of truth regarding data coming into the event manifest that powers cross-batch deduplication.
6. Conclusion
Cross-batch deduplication is a powerful and popular feature of the Snowplow pipeline - operating it safely requires some specific care and attention.
This post has introduced the various failure scenarios you might encounter, provided ways of monitoring the status of the manifest, and documented a new Event Manifest Cleaner Spark job which makes recovery much safer and more ergonomic.