Hi,
we are running RDB shredder 0.19.0 (s3://snowplow-hosted-assets-eu-central-1/4-storage/rdb-shredder/snowplow-rdb-shredder-0.19.0.jar) with the following emr.config:
```json
{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
  "data": {
    "name": "com.myapp",
    "logUri": "LOGURI",
    "region": "eu-west-1",
    "credentials": {
      "accessKeyId": "AWS_ACCESS_KEY_ID",
      "secretAccessKey": "AWS_SECRET_ACCESS_KEY"
    },
    "roles": {
      "jobflow": "EMR_EC2_DefaultRole",
      "service": "EMR_DefaultRole"
    },
    "ec2": {
      "amiVersion": "6.2.0",
      "instances": {
        "core": {
          "count": 1,
          "type": "r5.xlarge"
        },
        "master": {
          "ebsConfiguration": {
            "ebsBlockDeviceConfigs": [],
            "ebsOptimized": true
          },
          "type": "m4.large"
        },
        "task": {
          "bid": "0.015",
          "count": 0,
          "type": "m4.large"
        }
      },
      "keyName": "EMR_ECS_KEY_PAIR",
      "location": {
        "vpc": {
          "subnetId": "AWS_SUBNET_PUBLIC_ID"
        }
      }
    },
    "tags": [
      {
        "key": "client",
        "value": "com.myapp"
      },
      {
        "key": "job",
        "value": "main"
      }
    ],
    "bootstrapActionConfigs": [],
    "configurations": [
      {
        "classification": "spark",
        "configurations": [],
        "properties": {
          "maximizeResourceAllocation": "false"
        }
      },
      {
        "classification": "spark-defaults",
        "configurations": [],
        "properties": {
          "spark.default.parallelism": "8",
          "spark.driver.maxResultSize": "0",
          "spark.driver.cores": "1",
          "spark.driver.memory": "9G",
          "spark.dynamicAllocation.enabled": "false",
          "spark.executor.cores": "1",
          "spark.executor.instances": "2",
          "spark.executor.memory": "9G",
          "spark.yarn.driver.memoryOverhead": "1024",
          "spark.yarn.executor.memoryOverhead": "1024"
        }
      },
      {
        "classification": "yarn-site",
        "configurations": [],
        "properties": {
          "yarn.nodemanager.resource.memory-mb": "24576",
          "yarn.nodemanager.vmem-check-enabled": "false",
          "yarn.scheduler.maximum-allocation-mb": "24576"
        }
      }
    ],
    "applications": [ "Hadoop", "Spark" ]
  }
}
```
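As a rough sanity check (just a sketch, assuming the driver container is also placed on the single core node in YARN cluster mode), this is how the spark-defaults memory settings add up against the r5.xlarge core node:

```python
# Back-of-the-envelope check of the spark-defaults settings against the
# single r5.xlarge core node (all figures in MiB). Assumes the driver
# container also runs on the core node in YARN cluster mode.
yarn_node_memory = 24576                  # yarn.nodemanager.resource.memory-mb

driver = 9 * 1024 + 1024                  # spark.driver.memory + driver memoryOverhead
executors = 2 * (9 * 1024 + 1024)         # 2 executors x (executor memory + overhead)

print(f"requested {driver + executors} MiB vs {yarn_node_memory} MiB available to YARN")
# requested 30720 MiB vs 24576 MiB available to YARN
```

So we are not even sure the single core node can fit the driver plus both executors at the same time.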
Last week we changed our AWS credentials and forgot to update them in the shredder, so it stopped running for 12 hours. After updating the credentials, the shredder had to process a backlog of 12 hours of data in our loader bucket. We currently receive around 500k events per day, which is roughly 500 MB per day in our case, so the backlog that had to be shredded was around 250 MB.
However, the shredder was really slow on this job: it took up to 21 hours with the configuration above. We decided to work around it by bumping the EMR core instance type to r5.24xlarge. That cleared the blockage in the pipeline, but one of the bigger runs ended up with a _SUCCESS file and no shredding_complete.json in the corresponding shredded bucket folder. We were able to load that part of the data into Redshift manually by tweaking the shredding_complete JSON message for this run and sending it to SQS. Somewhat hacky.
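The manual load was essentially the sketch below. The queue URL, file name and the FIFO MessageGroupId are placeholders, and the message body itself was copied from a previous successful run and edited to point at the affected run folder:

```python
# Hypothetical sketch of the manual workaround (not the supported path):
# push an edited shredding_complete.json message onto the RDB loader's
# SQS queue so the loader picks up the run that only produced _SUCCESS.
import boto3

QUEUE_URL = "https://sqs.eu-west-1.amazonaws.com/123456789012/rdb-loader-queue.fifo"  # placeholder

# Body copied from a previous successful run and edited by hand.
with open("shredding_complete.json") as f:
    message_body = f.read()

sqs = boto3.client("sqs", region_name="eu-west-1")
sqs.send_message(
    QueueUrl=QUEUE_URL,
    MessageBody=message_body,
    MessageGroupId="rdb-loader",  # only needed if the queue is FIFO (our assumption)
)
```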
Now we have several questions:
- How well does our current pipeline with the above config scale? Currently the shredding job takes at least 20 minutes, so the data ends up in Redshift with a considerable delay. More importantly, we expect a lot more data in the upcoming months. How long will the shredder take once we have 1 million events per day or more? Can we enable something like EMR autoscaling? Should we move to RDB Loader 1.2.0 to overcome some of these issues in the future?
- We are very interested in the real-time Postgres Loader and might try it out in parallel soon. It is currently at 0.3.1. Do you recommend using it in production?
Best regards