For some reason, each time a row is malformed the process grinds to a halt, leaving files in the “processing” folder.
This, in turn, causes all our subsequent runs to fail because the folder is not empty.
Lately this issue has been causing us a lot of work, as we need to move all the files from “processing” back to the “in” bucket and restart the ETL.
How can we configure the ETL to “skip” malformed rows, and send an email reporting how many rows were skipped?
After that, on the next run we receive:
Snowplow::EmrEtlRunner::DirectoryNotEmptyError (Should not stage files for enrichment, processing bucket s3://theculturetrip-snow-plow-processing/events/processing/ is not empty)
When the EMR process runs, any data that cannot be successfully processed is written to the ‘bad’ bucket. It is normal to have a number of these bad rows generated with each run. We have guides to using these bad rows to debug upstream issues here, here and here.
However, this process is non-blocking. The reason your EMR job is failing will not be related to the bad rows.
When the EMR job fails you should get an error message back from EmrEtlRunner. In addition, it should be possible to look in the AWS EMR console and see the error message there.
If you can share those error messages with us we’ll be in a better position to help you diagnose the root cause of the failures.
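For reference, the destination for these bad rows is the bad bucket you set under buckets: in the s3: section of your EmrEtlRunner config.yml; a minimal sketch, using the placeholder bucket names from the sample config:

  enriched:
    bad: s3://my-out-bucket/enriched/bad # rows that failed enrichment end up here
  shredded:
    bad: s3://my-out-bucket/shredded/bad # rows that failed shredding end up here

The job writes to these buckets and carries on; it does not stop just because rows landed there.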
aws:
  access_key_id: ----------------------
  secret_access_key: ----------------------
  s3:
    region: us-east-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://theculturetrip-snow-plow-logs
      raw:
        in: # Multiple in buckets are permitted
          - ---------------------- # e.g. s3://my-in-bucket
          # - s3://elasticbeanstalk-us-east-1-754733210272/resources/environments/logs/publish/e-yyubswy4pr/i-0daea088
        processing: ----------------------
        archive: ---------------------- # e.g. s3://my-archive-bucket/raw
      enriched:
        good: ---------------------- # e.g. s3://my-out-bucket/enriched/good
        bad: ---------------------- # e.g. s3://my-out-bucket/enriched/bad
        errors: # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: ---------------------- # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: ---------------------- # e.g. s3://my-out-bucket/shredded/good
        bad: ---------------------- # e.g. s3://my-out-bucket/shredded/bad
        errors: # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: ---------------------- # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 3.7.0 # Don't change this
    region: ---------------------- # Always set this
    jobflow_role: ---------------------- # Created using $ aws emr create-default-roles
    service_role: ---------------------- # Created using $ aws emr create-default-roles
    placement: ---------------------- # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: TheCultureTrip-Snow-Plow
    bootstrap: # Set this to specify custom boostrap actions. Leave empty otherwise
    software:
      hbase: # To launch on cluster, provide version, "0.92.0", keep quotes
      lingual: # To launch on cluster, provide version, "1.1", keep quotes
    # Adjust your Hadoop cluster below
    jobflow:
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
collectors:
  format: cloudfront # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  job_name: Snowplow ETL # Give your job a name
  versions:
    hadoop_enrich: 1.5.1 # Version of the Hadoop Enrichment process
    hadoop_shred: 0.7.0 # Version of the Hadoop Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: GZIP # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  download:
    folder: # Postgres-only config option. Where to store the downloaded files. Leave blank for Redshift
  targets:
    - name: ----------------------
      type: redshift
      host: ---------------------- # The endpoint as shown in the Redshift console
      database: snow_plow # Name of database
      port: 5439 # Default Redshift port
      ssl_mode: ---------------------- # One of disable (default), require, verify-ca or verify-full
      table: ----------------------
      username: ----------------------
      password: ----------------------
      maxerror: 1 # Stop loading on first error, or increase to permit more load errors
      comprows: 200000 # Default for a 1 XL node cluster. Not used unless --include compupdate specified
      es_nodes_wan_only: false # Set to true if using Amazon Elasticsearch Service
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    app_id: snowplow # e.g. snowplow
    collector: ---------------------- # e.g. d3rkrsqld9gmqf.cloudfront.net
You’re running an old version of Snowplow which uses an old EMR AMI (version 3.7.0).
We’ve found that newer AMIs are significantly less likely to fail on job steps, so my first recommendation would be for you to upgrade. The latest version of the batch enrichment uses version 4.5.0 of the EMR AMI.
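For illustration, the relevant change in config.yml would look like this; a minimal sketch, assuming the rest of the pipeline has been upgraded to match (see the guide linked below):

  emr:
    ami_version: 4.5.0 # was 3.7.0; newer AMIs are significantly less likely to fail on job steps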
My second recommendation would be not to use 2 x m1.medium core instances: it is better to run the job on a single larger core instance (e.g. an m1.large or m3.xlarge). Using an m1.medium instance for the master node is fine - this node does very little at all.
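Concretely, the jobflow: section could look something like this (a sketch - pick whichever core instance type suits your event volumes):

    jobflow:
      master_instance_type: m1.medium # fine for the master - it does very little
      core_instance_count: 1          # one larger core instance instead of 2 x m1.medium
      core_instance_type: m3.xlarge   # or m1.large for smaller volumes
      task_instance_count: 0          # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015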
You can consult our upgrade guide here: https://github.com/snowplow/snowplow/wiki/Upgrade-Guide. It’s r79 you want to get onto (r80 and r81 are updates to the real-time pipeline). Follow the steps to upgrade from your current version.
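Note that the upgrade also involves bumping the job versions in the enrich: section of config.yml; a sketch with placeholder values - take the exact version numbers from the upgrade guide for the release you move to:

  enrich:
    versions:
      hadoop_enrich: X.Y.Z # placeholder - use the version listed in the upgrade guide
      hadoop_shred: X.Y.Z  # placeholder - use the version listed in the upgrade guide
      hadoop_elasticsearch: 0.1.0 # as in your current config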
To me it looks like you haven’t replaced the old EmrEtlRunner with the newer (r77) one. You can find a link (and other instructions) in the Upgrade Guide @Yali provided.