I'm not sure where to grab the step names or jar locations from my current running config. I am pulling from an S3 bucket written by the S3 Loader, which gzips the output of the Scala Stream Enrich.
@dbuscaglia, to convert from EmrEtlRunner (running in Stream Enrich mode as per your earlier topics) to DataflowRunner, your playbook could use the following jars:

- s3-dist-cp.jar from AWS (see their wiki). You can use this utility to move files between buckets (we use it in the latest versions of EmrEtlRunner)
- s3://snowplow-hosted-assets/4-storage/rdb-shredder/snowplow-rdb-shredder-0.13.0.jar to shred your data
- s3://snowplow-hosted-assets/4-storage/rdb-loader/snowplow-rdb-loader-0.14.0.jar to load your data into Redshift

Note: you might need to adjust the jar location according to the region the buckets are in. For example, if us-east-1 is used, the RDB Loader jar would be s3://snowplow-hosted-assets-us-east-1/4-storage/rdb-loader/snowplow-rdb-loader-0.14.0.jar. You might also want to keep an eye on the latest versions of the above apps.
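For context, these CUSTOM_JAR steps sit inside a standard Dataflow Runner playbook wrapper. A minimal skeleton could look roughly like the following (the region and credentials values are placeholders to adapt, and it is worth checking the current PlaybookConfig schema version in the Dataflow Runner docs):

{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
  "data": {
    "region": "us-east-1",
    "credentials": {
      "accessKeyId": "env",
      "secretAccessKey": "env"
    },
    "steps": [
    ],
    "tags": []
  }
}

The CUSTOM_JAR steps described below go into the steps array.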
Your DataflowRunner playbook would contain the following steps, replicating the EmrEtlRunner steps from the batch pipeline dataflow diagram (assuming run mode against a persistent EMR cluster):

- Stage files from the enriched:stream bucket to the enriched:good bucket (with the S3DistCp utility; see the sketch after this list)
- Shred files (to place shredded files into the shredded:good bucket)
- Load data to Redshift
- Archive enriched:good to the enriched:archive bucket (with the S3DistCp utility)
- Archive shredded:good to the shredded:archive bucket (with the S3DistCp utility)
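For illustration only, the first staging step could be a CUSTOM_JAR step wrapping S3DistCp, along these lines (the bucket names are placeholders for your own, the jar path is where recent EMR AMIs ship s3-dist-cp, and --deleteOnSuccess is optional):

{
  "type": "CUSTOM_JAR",
  "name": "S3DistCp: stage enriched events",
  "actionOnFailure": "CANCEL_AND_WAIT",
  "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
  "arguments": [
    "--src", "s3n://your-stream-bucket/enriched/",
    "--dest", "s3n://your-output-bucket/enriched/good/run={{timeWithFormat .epochTime \"2006-01-02-15-04-05\"}}/",
    "--deleteOnSuccess"
  ]
}

The two archive steps follow the same pattern with the source and destination buckets swapped accordingly.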
Thank you very much @ihor
Hi @ihor, I was wondering if you had any examples of how to provision the RDB Shredder. I found this example here, but it's for the Hadoop shred, which I assume is out of date.
@NikoGomez-PIV, the latest shredder jar is in s3://snowplow-hosted-assets/4-storage/rdb-shredder/snowplow-rdb-shredder-0.14.0.jar. Here’s an example of usage:
{
  "type": "CUSTOM_JAR",
  "name": "RDB Shredder: shred enriched events for Redshift",
  "actionOnFailure": "CANCEL_AND_WAIT",
  "jar": "command-runner.jar",
  "arguments": [
    "spark-submit",
    "--class", "com.snowplowanalytics.snowplow.storage.spark.ShredJob",
    "--master", "yarn",
    "--deploy-mode", "cluster",
    "s3://snowplow-hosted-assets/4-storage/rdb-shredder/snowplow-rdb-shredder-0.14.0.jar",
    "--iglu-config",
    "{{base64File \"/path_to/iglu_resolver.json\"}}",
    "--input-folder", "s3a://output-bucket/enriched/good/run={{timeWithFormat .epochTime \"2006-01-02-15-04-05\"}}/",
    "--output-folder", "s3a://output-bucket/shredded/good/run={{timeWithFormat .epochTime \"2006-01-02-15-04-05\"}}/",
    "--bad-folder", "s3a://output-bucket/shredded/bad/run={{timeWithFormat .epochTime \"2006-01-02-15-04-05\"}}/"
  ]
}
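One thing to keep in mind: because this step is submitted through command-runner.jar and spark-submit, the EMR cluster defined in your Dataflow Runner cluster config needs Spark installed alongside Hadoop.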
Thanks, you're the best!
Sorry @ihor, after shredding we’re stuck on RDB loading… is there an example config for this step too? We are confused, as the step is a Scala application, not a Hadoop or Spark job. This is our guess at the config:
{
  "type": "CUSTOM_JAR",
  "name": "rdb load step",
  "actionOnFailure": "CANCEL_AND_WAIT",
  "jar": "s3://snowplow-hosted-assets/4-storage/rdb-loader/snowplow-rdb-loader-0.14.0.jar",
  "arguments": [
    "--target",
    "{{base64File \"./configs/targets/redshift.conf\"}}",
    "--resolver",
    "{{base64File \"./configs/resolver.json\"}}",
    "--folder",
    "s3n://piv-stream-data-prod-bucket/shredded/good/run={{timeWithFormat .epochTime \"2006-01-02-15-04-05\"}}/",
    "--logkey",
    "s3n://piv-stream-data-prod-bucket/log/rdb-loader/"
  ]
}
@NikoGomez-PIV, the app options are described here: https://github.com/snowplow/snowplow/wiki/Relational-Database-Loader#3-usage. A typical step in the playbook would look like this:
{
  "type": "CUSTOM_JAR",
  "name": "Load to Redshift Storage Target",
  "actionOnFailure": "CANCEL_AND_WAIT",
  "jar": "s3://snowplow-hosted-assets/4-storage/rdb-loader/snowplow-rdb-loader-0.14.0.jar",
  "arguments": [
    "--config",
    "{{base64File \"path_to/config.yml\"}}",
    "--resolver",
    "{{base64File \"path_to/iglu_resolver.json\"}}",
    "--target",
    "{{base64File \"path_to/targets/redshift.json\"}}"
  ]
}
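Note that, unlike the shredder step, the loader jar is referenced directly rather than through command-runner.jar and spark-submit: RDB Loader is a plain JVM application, so no Spark wrapper is needed.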
Thanks again!