We have some bad events caused by a missing schema and have not been able to recover them.
Our current architecture looks like this:
Scala Collector > Kinesis > S3 Collector > S3 > EmrEtlRunner > Redshift
We tried to recover the events using Hadoop Event Recovery. The EMR job took approximately 9 hours to run and moved 10 days' worth of bad records (~200 GB) to s3/recovered. The files appeared to be processed correctly and showed up in s3/recovered.
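For reference, this is roughly how we spot-checked that the recovered output was present (a quick boto3 sketch; the bucket and prefix names here are placeholders, not our real ones):

```python
# Rough spot-check of the recovered output in S3 (bucket/prefix are placeholders).
import boto3

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")

total_files = 0
total_bytes = 0
for page in paginator.paginate(Bucket="example-snowplow", Prefix="recovered/"):
    for obj in page.get("Contents", []):
        total_files += 1
        total_bytes += obj["Size"]

print(f"{total_files} files, {total_bytes / 1e9:.1f} GB under recovered/")
```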
However, when we ran EmrEtlRunner again with `--skip staging` and the raw processing location in config.yml pointed at s3/recovered, the job failed at the "Elasticity S3DistCp Step: Enriched HDFS -> S3" step with the following message:
```
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-10-0-4-101.eu-west-1.compute.internal:8020/tmp/e0bb72ce-4b9a-43d3-b341-19132d6f921c/files
```
It looks like the files were not enriched at all, so nothing was available for S3DistCp to copy. There were no bad records or errors in S3.
We also tried a Python script similar to the one described here:
https://discourse.snowplow.io/t/python-script-to-reprocess-bad-rows/216/2
Incidentally, this was much faster than Hadoop Event Recovery (a few minutes vs. 9 hours), and the processed files also looked fine. However, when we ran EmrEtlRunner, the job hung at the Enrich step for 12 hours. EMR monitoring showed 500 containers pending the whole time, suggesting the job made no progress at all, and HDFS usage stayed quite low (~3%). Again, there were no bad records or errors in S3.
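For context, this is a simplified sketch of what our script does (the bucket names, prefixes and schema filter below are placeholders, and it assumes the bad rows are newline-delimited JSON with `line` and `errors` fields):

```python
# Simplified sketch of our bad-row reprocessing script (all names are placeholders).
# It reads bad-row JSON files, keeps rows that failed only because of the missing
# schema, and writes the original "line" payloads back out for EmrEtlRunner to pick up.
import json

import boto3

s3 = boto3.resource("s3")
BAD_BUCKET = "example-snowplow"                # placeholder
BAD_PREFIX = "data-enriched/bad/"              # placeholder
OUT_BUCKET = "example-snowplow"                # placeholder
OUT_KEY = "recovered/part-00000"               # placeholder
SCHEMA_FILTER = "com.example/missing_schema"   # placeholder error-message match

recovered = []
for obj in s3.Bucket(BAD_BUCKET).objects.filter(Prefix=BAD_PREFIX):
    body = obj.get()["Body"].read().decode("utf-8")
    for raw in body.splitlines():
        if not raw.strip():
            continue
        bad_row = json.loads(raw)
        if SCHEMA_FILTER in json.dumps(bad_row.get("errors", [])):
            # "line" holds the original collector payload recorded in the bad row
            recovered.append(bad_row["line"])

s3.Object(OUT_BUCKET, OUT_KEY).put(Body="\n".join(recovered).encode("utf-8"))
```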
We cancelled the execution and are running out of alternatives. Do you have any idea what we should do to reprocess those bad records?
We also tried different instance settings for the EMR cluster.
This is our config file:
```
aws:
  access_key_id: {{ aws_access_key_id }}
  secret_access_key: {{ aws_secret_access_key }}
  s3:
    region: eu-west-1
    buckets:
      assets: s3://snowplow-hosted-assets
      jsonpath_assets: s3://{{ environment }}-snowplow/jsonpaths
      log: s3n://{{ environment }}-snowplow/etl-logs
      encrypted: false
      raw:
        in:
          - s3a://{{ environment }}-snowplow/in
        processing: s3a://{{ environment }}-snowplow/etl-processing
        archive: s3a://{{ environment }}-snowplow/archive-raw
      enriched:
        good: s3a://{{ environment }}-snowplow/data-enriched/good
        bad: s3a://{{ environment }}-snowplow/data-enriched/bad
        errors: s3a://{{ environment }}-snowplow/data-enriched/errors
        archive: s3a://{{ environment }}-snowplow/data-enriched/archive
      shredded:
        good: s3a://{{ environment }}-snowplow/data-shredded/good
        bad: s3a://{{ environment }}-snowplow/data-shredded/bad
        errors: s3a://{{ environment }}-snowplow/data-shredded/errors
        archive: s3a://{{ environment }}-snowplow/data-shredded/archive
  emr:
    ami_version: 5.9.0
    region: eu-west-1
    jobflow_role: role-{{ environment }}-platform-snowplow-ec2
    service_role: role-{{ environment }}-platform-snowplow-emr
    placement:
    ec2_subnet_id: {{ ec2_subnet_id }}
    ec2_key_name: {{ ec2_key_name }}
    bootstrap: []
    software:
      hbase:
      lingual:
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    jobflow:
      job_name: Snowplow ETL
      master_instance_type: m4.large
      core_instance_count: 3
      core_instance_type: r4.xlarge
      task_instance_count: 0
      task_instance_type: m4.large
      task_instance_bid: 0.020
      core_instance_ebs:
        volume_size: 200
        volume_type: "io1"
        volume_iops: 400
        ebs_optimized: false
    bootstrap_failure_tries: 3
collectors:
  format: thrift
enrich:
  versions:
    spark_enrich: 1.16.0
  continue_on_unexpected_error: false
  output_compression: NONE
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.1
    hadoop_elasticsearch: 0.1.0
monitoring:
  tags: {
    "Name": "emr-service-snowplowetl"
  }
  logging:
    level: INFO
  snowplow:
    method: get
    app_id: snowplow
    collector: snplw.co.uk
```