Here’s the beginning of a Dataflow Runner playbook that should help you get started:
```
{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
  "data": {
    "region": "eu-west-1",
    "credentials": {
      "accessKeyId": "...",
      "secretAccessKey": "..."
    },
    "steps": [
      {
        "type": "CUSTOM_JAR",
        "name": "S3DistCp Step: Enriched events -> staging S3",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
        "arguments": [
          "--src", "s3n://path-to-kinesis-s3-sink-output/",
          "--dest", "s3n://my-working-bucket/enriched/good/run={{timeWithFormat .epochTime "2006-01-02-15-04-05"}}/",
          "--s3Endpoint", "s3-eu-west-1.amazonaws.com",
          "--deleteOnSuccess"
        ]
      },
      {
        "type": "CUSTOM_JAR",
        "name": "Hadoop Shred: shred enriched events for Redshift",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "s3://snowplow-hosted-assets/3-enrich/scala-hadoop-shred/snowplow-hadoop-shred-0.11.0-rc2.jar",
        "arguments": [
          "com.snowplowanalytics.snowplow.enrich.hadoop.ShredJob",
          "--hdfs",
          "--iglu_config",
          "{{base64File "/snowplow-config/iglu_resolver.json"}}",
          "--duplicate_storage_config",
          "{{base64File "/snowplow-config/targets/duplicate_dynamodb.json"}}",
          "--input_folder", "s3n://my-working-bucket/enriched/good/run={{timeWithFormat .epochTime "2006-01-02-15-04-05"}}/",
          "--output_folder", "s3n://my-working-bucket/shredded/good/run={{timeWithFormat .epochTime "2006-01-02-15-04-05"}}/",
          "--bad_rows_folder", "s3n://my-working-bucket/shredded/bad/run={{timeWithFormat .epochTime "2006-01-02-15-04-05"}}/"
        ]
      }
    ],
    "tags": []
  }
}
```
Invoked with:

```
--vars epochTime,`date +%s`
```
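For context, here's a rough sketch of how that `--vars` flag slots into a full Dataflow Runner command line. Treat this as illustrative only: the `run-transient` subcommand, the `./cluster.json` EMR cluster config, and the `./playbook.json` file name are my assumptions, not something taken from the playbook above.

```
# Sketch only: spin up a transient EMR cluster, run the playbook above, then tear the cluster down.
# ./cluster.json and ./playbook.json are assumed file names - substitute your own.
# `date +%s` is substituted into the playbook's {{.epochTime}} template variable,
# which timeWithFormat then renders as a run=YYYY-MM-DD-hh-mm-ss folder suffix.
./dataflow-runner run-transient \
  --emr-config ./cluster.json \
  --emr-playbook ./playbook.json \
  --vars epochTime,`date +%s`
```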