Hi everyone! I’m new to Snowplow and am working on the RDB Shredder piece of the AWS real-time pipeline.
When I run the Dataflow Runner job via ./dataflow-runner run-transient --emr-config dataflow-runner-cluster.json --emr-playbook playbook.json
the cluster starts up successfully and the S3DistCp step of the playbook succeeds. However, the second step fails, causing the cluster to terminate.
I’m not really sure what the problem is, and I’m at a bit of a loss: I believe I’ve used the standard template versions of the playbook and cluster files and only customized the fields called out in the documentation. For reference, all of my related config files are below, along with how I’ve been pulling the step logs. Please let me know if anything looks off, or if there are any known configuration issues that could cause the shredder step to fail.
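Before the configs: in case it helps with diagnosis, this is roughly how I’ve been trying to pull the failed step’s logs out of the cluster’s logUri (the cluster and step IDs below are placeholders):

# Find the terminated cluster and the failed step
aws emr list-clusters --terminated
aws emr list-steps --cluster-id j-XXXXXXXXXXXXX

# EMR should copy step logs under the logUri, i.e. s3://snowplow/logs/<cluster-id>/steps/<step-id>/
aws s3 ls s3://snowplow/logs/j-XXXXXXXXXXXXX/steps/s-XXXXXXXXXXXXX/
aws s3 cp s3://snowplow/logs/j-XXXXXXXXXXXXX/steps/s-XXXXXXXXXXXXX/stderr.gz - | gunzip

Since the shredder runs via spark-submit in cluster mode, I gather the real error may only show up in the YARN container logs under s3://snowplow/logs/<cluster-id>/containers/, so I’ve been looking there too, without much luck so far.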
Playbook.json:
{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
  "data": {
    "region": "us-east-1",
    "credentials": {
      "accessKeyId": "{{my access key}}",
      "secretAccessKey": "{{my secret key}}"
    },
    "steps": [
      {
        "type": "CUSTOM_JAR",
        "name": "S3DistCp enriched data archiving",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
        "arguments": [
          "--src", "s3://snowplow/enriched/",
          "--dest", "s3://snowplow/archive/enriched/run={{nowWithFormat "2006-01-02-15-04-05"}}/",
          "--s3Endpoint", "s3.us-east-1.amazonaws.com",
          "--srcPattern", ".*",
          "--outputCodec", "gz",
          "--deleteOnSuccess"
        ]
      },
      {
        "type": "CUSTOM_JAR",
        "name": "RDB Shredder",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "command-runner.jar",
        "arguments": [
          "spark-submit",
          "--class", "com.snowplowanalytics.snowplow.shredder.batch.Main",
          "--master", "yarn",
          "--deploy-mode", "cluster",
          "s3://snowplow-hosted-assets-us-east-1/4-storage/rdb-shredder/snowplow-rdb-shredder-1.0.0.jar",
          "--iglu-config", "{{base64File "./iglu_resolver.json"}}",
          "--config", "{{base64File "./rdb-loader-config.hocon"}}"
        ]
      }
    ],
    "tags": [ ]
  }
}
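For what it’s worth, my understanding is that Dataflow Runner renders the Go templates before submitting the steps, so the "2006-01-02-15-04-05" layout in nowWithFormat is Go’s reference time (i.e. --dest ends up as something like .../run=2021-04-20-14-05-31/), and the second step should effectively run something like this on the cluster (base64 blobs abbreviated):

spark-submit \
  --class com.snowplowanalytics.snowplow.shredder.batch.Main \
  --master yarn \
  --deploy-mode cluster \
  s3://snowplow-hosted-assets-us-east-1/4-storage/rdb-shredder/snowplow-rdb-shredder-1.0.0.jar \
  --iglu-config <base64 of iglu_resolver.json> \
  --config <base64 of rdb-loader-config.hocon>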
Cluster.json:
{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
  "data": {
    "name": "RDB Shredder",
    "logUri": "s3://snowplow/logs/",
    "region": "us-east-1",
    "credentials": {
      "accessKeyId": "{{my access key}}",
      "secretAccessKey": "{{my secret key}}"
    },
    "roles": {
      "jobflow": "EMR_EC2_DefaultRole",
      "service": "EMR_DefaultRole"
    },
    "ec2": {
      "amiVersion": "6.2.0",
      "keyName": "{{my key name}}",
      "location": {
        "vpc": {
          "subnetId": "{{my subnet ID}}"
        }
      },
      "instances": {
        "master": {
          "type": "m4.large",
          "ebsConfiguration": {
            "ebsOptimized": true,
            "ebsBlockDeviceConfigs": [ ]
          }
        },
        "core": {
          "type": "r4.xlarge",
          "count": 1
        },
        "task": {
          "type": "m4.large",
          "count": 0,
          "bid": "0.015"
        }
      }
    },
    "tags": [ ],
    "bootstrapActionConfigs": [ ],
    "configurations": [
      {
        "classification": "core-site",
        "properties": {
          "io.file.buffer.size": "65536"
        },
        "configurations": [ ]
      },
      {
        "classification": "yarn-site",
        "properties": {
          "yarn.nodemanager.resource.memory-mb": "57344",
          "yarn.scheduler.maximum-allocation-mb": "57344",
          "yarn.nodemanager.vmem-check-enabled": "false"
        },
        "configurations": [ ]
      },
      {
        "classification": "spark",
        "properties": {
          "maximizeResourceAllocation": "false"
        },
        "configurations": [ ]
      },
      {
        "classification": "spark-defaults",
        "properties": {
          "spark.executor.memory": "7G",
          "spark.driver.memory": "7G",
          "spark.driver.cores": "3",
          "spark.yarn.driver.memoryOverhead": "1024",
          "spark.default.parallelism": "24",
          "spark.executor.cores": "1",
          "spark.executor.instances": "6",
          "spark.yarn.executor.memoryOverhead": "1024",
          "spark.dynamicAllocation.enabled": "false"
        },
        "configurations": [ ]
      }
    ],
    "applications": [ "Hadoop", "Spark" ]
  }
}
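One thing I wasn’t sure about when adapting the template is whether the spark-defaults above actually fit on the single r4.xlarge core node. My rough math, in case it’s relevant:

6 executors x (7 GB heap + 1 GB overhead) = 48 GB
1 driver x (7 GB heap + 1 GB overhead) = 8 GB
total = 56 GB = 57344 MB (which matches yarn.nodemanager.resource.memory-mb)

whereas an r4.xlarge only has 30.5 GiB of RAM, so I may have copied settings sized for a bigger core node. Happy to be corrected if that’s not how YARN accounts for it.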
Standard iglu_resolver.json file:
{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      }
    ]
  }
}
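I haven’t changed anything in the resolver. I assume I could sanity-check that Iglu Central is reachable from inside the EMR subnet with something like this (the schema path is just an arbitrary example):

curl -s http://iglucentral.com/schemas/com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0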
rdb-loader-config.hocon:
{
  # Human-readable identifier, can be random
  "name": "SP-Redshift",
  # Machine-readable unique identifier, must be a UUID
  "id": "{{my uuid}}",
  # Data Lake (S3) region
  "region": "us-east-1",
  # SQS topic name used by Shredder and Loader to communicate
  "messageQueue": "{{my queue name}}",
  # Shredder-specific configs
  "shredder": {
    "type": "batch",
    # Path to enriched archive (must be populated separately with run=YYYY-MM-DD-hh-mm-ss directories)
    "input": "s3://snowplow/archive/enriched/",
    # Path to shredded output
    "output": {
      "path": "s3://snowplow/archive/shredded/",
      # Shredder output compression, GZIP or NONE
      "compression": "GZIP"
    }
  },
  # Schema-specific format settings (recommended to leave all three groups empty and use TSV as default)
  "formats": {
    # Format used by default (TSV or JSON)
    "default": "TSV",
    # Schemas to be shredded as JSONs, corresponding JSONPath files must be present. Automigrations will be disabled
    "json": [ ],
    # Schemas to be shredded as TSVs, presence of the schema on Iglu Server is necessary. Automigrations enabled
    "tsv": [ ],
    # Schemas that won't be loaded
    "skip": [ ]
  },
  # Warehouse connection details
  "storage": {
    # Database, redshift is the only acceptable option
    "type": "redshift",
    # Redshift hostname
    "host": "{{my redshift endpoint}}",
    # Database name
    "database": "{{database name}}",
    # Database port
    "port": 5439,
    # AWS Role ARN allowing Redshift to load data from S3
    "roleArn": "arn:aws:iam::490975147635:role/aws-service-role/redshift.amazonaws.com/AWSServiceRoleForRedshift",
    # DB schema name
    "schema": "atomic",
    # DB user with permissions to load data
    "username": "{{storage loader username}}",
    # DB password
    "password": "{{storage loader password}}",
    # Custom JDBC configuration
    "jdbc": {"ssl": true},
    # MAXERROR, amount of acceptable loading errors
    "maxError": 10
  },
  # Additional steps. analyze, vacuum and transit_load are valid values
  "steps": ["analyze"],
  # Observability and logging options
  "monitoring": {
    # Snowplow tracking (optional)
    "snowplow": null,
    # Sentry (optional)
    "sentry": null
  }
}
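In case it matters, this is roughly how I’ve been checking that the S3DistCp step actually produces run= folders under the shredder input, and that the SQS queue referenced above exists (names are the placeholders from the config):

aws s3 ls s3://snowplow/archive/enriched/
# expecting prefixes like run=2021-04-20-14-05-31/
aws sqs get-queue-url --queue-name {{my queue name}} --region us-east-1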