Our new Hadoop Event Recovery project (documentation) lets you fix up Snowplow bad rows and make them ready for reprocessing, by writing your own custom JavaScript to execute on each bad row.
While this is a powerful tool, using it can be quite involved. This tutorial walks you through one common use case for event recovery: where some of your events failed validation because you forgot to upload a particular schema. Let’s get started.
Refresher on Snowplow bad rows
Snowplow bad rows look like this:
{
  "line": "2015-06-09\t09:56:09\tNRT12\t831\t211.14.8.250\tGET...",
  "errors": [{
    "level": "error",
    "message": "Could not find schema with key iglu:com.acme/myevent/jsonschema/1-0-0 in any repository"
  }]
}
This is the structure that Hadoop Event Recovery will be working on.
Writing our function
The Hadoop Event Recovery jar will extract the “line” string (containing the original raw event) and an array of all the error messages which describe why the line failed validation, and pass them to your JavaScript function.
We need to write a JavaScript function called process which accepts two arguments: the raw line string and the error message array. The function should always return either null (signifying that the bad row should be ignored, not recovered) or a string which will be used as the new bad row. Your JavaScript can define other top-level functions besides process. We also provide several built-in JavaScript functions for you to call - for more detail check out the documentation.
Remember, we want to recover events where the failure message involved a missing schema. The JavaScript function looks like this:
function process(event, errors) {
    for (var i = 0; i < errors.length; i++) {
        if (! /Could not find schema with key/.test(errors[i])) {
            return null;
        }
    }
    return event;
}
We only want to reprocess events which failed solely because of the missing schema, so we return null (meaning that we won’t recover the event) if any of the error messages is not a missing-schema error. Otherwise we return the original raw line, which should now pass validation because we have uploaded the schema.
Next we use base64encode.org to encode our script:
ZnVuY3Rpb24gcHJvY2VzcyhldmVudCwgZXJyb3JzKSB7DQogICAgZm9yICh2YXIgaSA9IDA7IGkgPCBlcnJvcnMubGVuZ3RoOyBpKyspIHsNCiAgICAgICAgaWYgKCEgL0NvdWxkIG5vdCBmaW5kIHNjaGVtYSB3aXRoIGtleS8udGVzdChlcnJvcnNbaV0pKSB7DQogICAgICAgICAgICByZXR1cm4gbnVsbDsNCiAgICAgICAgfQ0KICAgIH0NCiAgICByZXR1cm4gZXZlbnQ7DQp9
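If you would rather not paste your script into a website, you can produce the encoding locally instead - a minimal sketch, assuming the function above is saved as process.js (the exact output may differ slightly from the string above depending on line endings and indentation, but any valid Base64 encoding of your script will work):

$ base64 -w 0 process.js

The -w 0 flag (GNU coreutils) disables line wrapping so the output can be pasted as a single argument; on macOS, plain base64 < process.js does the same.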
Running the Hadoop Event Recovery job
Now we're ready to run the job using the AWS CLI:
$ aws emr create-cluster --applications Name=Hadoop --ec2-attributes '{
"InstanceProfile":"EMR_EC2_DefaultRole",
"AvailabilityZone":"{{...}}",
"EmrManagedSlaveSecurityGroup":"{{...}}",
"EmrManagedMasterSecurityGroup":"{{...}}"
}' --service-role EMR_DefaultRole --enable-debugging --release-label emr-4.3.0 --log-uri 's3n://{{path to logs}}' --steps '[
{
"Args":[
"--src",
"s3n://{{my-output-bucket/enriched/bad}}/",
"--dest",
"hdfs:///local/monthly/",
"--groupBy",
".*(run)=2014.*",
"--targetSize",
"128",
"--outputCodec",
"lzo"
],
"Type":"CUSTOM_JAR",
"ActionOnFailure":"TERMINATE_CLUSTER",
"Jar":"/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
"Name":"Combine Months"
},
{
"Args":[
"com.snowplowanalytics.hadoop.scalding.SnowplowEventRecoveryJob",
"--hdfs",
"--input",
"hdfs:///local/monthly/*",
"--output",
"hdfs:///local/recovery/",
"--inputFormat",
"bad",
"--script",
"VHlwZSAob3IgcGFzdGUpIGhlcmUuLi5mdW5jdGlvbiBwcm9jZXNzKGV2ZW50LCBlcnJvcnMpIHsNCglmb3IgKHZhciBpID0gMDsgaSA8IGVycm9ycy5sZW5ndGg7IGkrKykgew0KCQlpZiAoISAvQ291bGQgbm90IGZpbmQgc2NoZW1hIHdpdGgga2V5Ly50ZXN0KGVycm9yc1tpXSkpIHsNCgkJCXJldHVybiBudWxsOw0KCQl9DQoJfQ0KCXJldHVybiBldmVudDsNCn0="
],
"Type":"CUSTOM_JAR",
"ActionOnFailure":"CONTINUE",
"Jar":"s3://snowplow-hosted-assets/3-enrich/hadoop-event-recovery/snowplow-hadoop-event-recovery-0.2.0.jar",
"Name":"Fix up bad rows"
},
{
"Args":[
"--src",
"hdfs:///local/recovery/",
"--dest",
"s3n://{{my-recovery-bucket/recovered}}"
],
"Type":"CUSTOM_JAR",
"ActionOnFailure":"TERMINATE_CLUSTER",
"Jar":"/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
"Name":"Back to S3"
}
]' --name 'MyCluster' --instance-groups '[
{
"InstanceCount":1,
"InstanceGroupType":"MASTER",
"InstanceType":"m1.medium",
"Name":"MASTER"
},
{
"InstanceCount":2,
"InstanceGroupType":"CORE",
"InstanceType":"m1.medium",
"Name":"CORE"
}
]'
There are a couple of things to note about this command. First, the placeholders in curly brackets should be replaced with actual S3 paths. Second, the --groupBy argument’s value of .*(run)=2014.* means that only bad rows from 2014 will be considered for recovery. For more information on how to control the range of input bad rows using the --groupBy argument, please see the wiki page.
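For example, assuming your bad rows folders follow the usual run=YYYY-MM-DD-hh-mm-ss naming, a tighter value such as .*(run)=2014-11.* would restrict recovery to runs from November 2014 only.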
Start the job and then keep an eye on it in the EMR console.
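If you prefer to monitor from the command line, the standard AWS CLI commands work just as well - a sketch, substituting the cluster ID that create-cluster printed:

$ aws emr describe-cluster --cluster-id j-XXXXXXXXXXXXX   # overall cluster status
$ aws emr list-steps --cluster-id j-XXXXXXXXXXXXX         # status of each of the three steps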
Before we re-process our recovered events
Assuming the job completes successfully, the fixed-up raw events are now available in s3://my-recovery-bucket/recovered, ready to be processed through the Snowplow batch pipeline.
Before we start, double-check that the missing schemas are now available from one of the Iglu registries in your resolver! Otherwise the recovered events will simply fail validation a second time.
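One quick way to verify this, assuming the schema lives in a static registry laid out like Iglu Central (swap in your own registry host and schema key):

$ curl http://{{your-iglu-registry}}/schemas/com.acme/myevent/jsonschema/1-0-0

A 200 response containing the JSON Schema means the enrichment job will be able to resolve it; a 404 means you still have some uploading to do.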
Next, we need to ensure that no other jobs are running - we don’t want a conflict between our recovery job and our regular batch pipeline runs. If you have the regular job running on a frequent cron, it’s a good idea to disable it for the duration. Don’t forget to re-enable it afterwards.
Processing the recovered events through Snowplow
We are now ready to kick off the batch pipeline.
Create a copy of your regular config.yml file, calling it config-recovery.yml or similar. You need to update the aws.s3.buckets.raw section of your EmrEtlRunner configuration file so it looks something like this:
raw:
  in:
    - s3://does-not-exist # The "in" section will be ignored
  processing: s3://{{my-recovery-bucket/recovered}}
Now, you can run EmrEtlRunner using your new config-recovery.yml and with the --skip staging option, since the data is already in the processing bucket. The reason for treating the recovery bucket as the processing location and skipping staging is explained in the documentation.
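The invocation looks something like this - a sketch only, since the binary name, resolver file and any other options should match however you normally run EmrEtlRunner:

$ ./snowplow-emr-etl-runner --config config-recovery.yml --resolver iglu_resolver.json --skip staging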
This pipeline run should complete fine. Make a note of the run= ID of the recovery run (as seen in the archive folders) so that you can distinguish it from regular pipeline runs in the future.
And that’s it! We’ve successfully recovered Snowplow events which ended up in bad rows due to a missing schema.
If you have a different event recovery challenge, do please create a new Discourse thread and we’ll be happy to brainstorm it there!