Hello,
I am currently trying to set up batch enrichment using EmrEtlRunner.
The staging process works fine and my logs are copied from the raw/in bucket to raw/processing.
Then the EMR cluster starts and apparently completes the Hadoop enrichment step, but the logs (see below) seem to indicate that nothing was processed.
Then, when the enriched events should be copied back to S3, I get the following error:
Exception in thread "main" java.lang.RuntimeException: Error running job
at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:720)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://<IP_ADDR:PORT>/tmp/03482dc5-7ec2-4884-9593-e196dd9fe755/files
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:352)
at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:901)
... 10 more
You can find some logs attached to this post and here is some more information about my configuration:
- I am using the Clojure collector, but we modified the logrotate configuration a bit to use a custom file name. I forked and edited the EmrEtlRunner project to add a file_pattern attribute in the collectors namespace, which allows me to copy logs with a different naming scheme from the raw/in bucket to raw/processing (see the sketch after this list).
- I do not have any custom enrichments set up yet.
Do you have any idea why this error could happen?
Something else I need to mention: if I try to run EmrEtlRunner with the --debug option, it fails because it cannot initialize debugging for Hadoop (the "Setup Hadoop Debugging" step fails immediately, see the second console output below).
Thank you in advance for your response.
Console output - No debug
./snowplow-emr-etl-runner --skip staging --enrichments ../emr-etl-runner/enrichments/ --config ../emr-etl-runner/conf/dev-conf.yml --resolver ../emr-etl-runner/iglu/iglu_resolver.json
D, [2016-10-26T15:46:07.864000 #11714] DEBUG -- : Initializing EMR jobflow
D, [2016-10-26T15:46:15.102000 #11714] DEBUG -- : EMR jobflow j-3HMEWSLYEE8VW started, waiting for jobflow to complete...
F, [2016-10-26T15:58:18.325000 #11714] FATAL -- :
Snowplow::EmrEtlRunner::EmrExecutionError (EMR jobflow j-3HMEWSLYEE8VW failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.
Snowplow ETL: TERMINATING [STEP_FAILURE] ~ elapsed time n/a [2016-10-26 15:55:05 +0200 - ]
- 1. Elasticity Scalding Step: Enrich Raw Events: COMPLETED ~ 00:01:50 [2016-10-26 15:55:12 +0200 - 2016-10-26 15:57:02 +0200]
- 2. Elasticity S3DistCp Step: Enriched HDFS -> S3: FAILED ~ 00:00:14 [2016-10-26 15:57:04 +0200 - 2016-10-26 15:57:19 +0200]
- 3. Elasticity S3DistCp Step: Shredded HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
- 4. Elasticity Scalding Step: Shred Enriched Events: CANCELLED ~ elapsed time n/a [ - ]
- 5. Elasticity S3DistCp Step: Enriched HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]):
uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:475:in `run'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/runner.rb:68:in `run'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:39:in `<main>'
org/jruby/RubyKernel.java:973:in `load'
uri:classloader:/META-INF/main.rb:1:in `<main>'
org/jruby/RubyKernel.java:955:in `require'
uri:classloader:/META-INF/main.rb:1:in `(root)'
uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in `<main>'
Console output - Debug
./snowplow-emr-etl-runner --debug --skip staging --enrichments ../emr-etl-runner/enrichments/ --config ../emr-etl-runner/conf/dev-conf.yml --resolver ../emr-etl-runner/iglu/iglu_resolver.json
D, [2016-10-26T16:01:53.883000 #12879] DEBUG -- : Initializing EMR jobflow
D, [2016-10-26T16:01:59.577000 #12879] DEBUG -- : EMR jobflow j-YK85XVK0207O started, waiting for jobflow to complete...
F, [2016-10-26T16:12:02.353000 #12879] FATAL -- :
Snowplow::EmrEtlRunner::EmrExecutionError (EMR jobflow j-YK85XVK0207O failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.
Snowplow ETL: TERMINATING [STEP_FAILURE] ~ elapsed time n/a [2016-10-26 16:10:33 +0200 - ]
- 1. Elasticity Setup Hadoop Debugging: FAILED ~ 00:00:00 [2016-10-26 16:10:33 +0200 - 2016-10-26 16:10:33 +0200]
- 2. Elasticity S3DistCp Step: Shredded HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
- 3. Elasticity Scalding Step: Shred Enriched Events: CANCELLED ~ elapsed time n/a [ - ]
- 4. Elasticity S3DistCp Step: Enriched HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]
- 5. Elasticity S3DistCp Step: Enriched HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
- 6. Elasticity Scalding Step: Enrich Raw Events: CANCELLED ~ elapsed time n/a [ - ]):
uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:475:in `run'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/runner.rb:68:in `run'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:39:in `<main>'
org/jruby/RubyKernel.java:973:in `load'
uri:classloader:/META-INF/main.rb:1:in `<main>'
org/jruby/RubyKernel.java:955:in `require'
uri:classloader:/META-INF/main.rb:1:in `(root)'
uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in `<main>'
Configuration file
aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: KEY_ID
  secret_access_key: KEY_SECRET
  s3:
    region: eu-west-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: s3://my_bucket/jsonpath/ # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://my_bucket/log/ # Bucket in which Amazon EMR will record processing information for this job run, including logging any errors
      raw:
        in: # Multiple in buckets are permitted
          - s3://my_bucket/raw/in/ # e.g. s3://my-in-bucket
          # - ADD HERE # Another bucket
        processing: s3://my_bucket/raw/processing/
        archive: s3://my_bucket/raw/archive/ # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://my_bucket/enriched/good/ # e.g. s3://my-out-bucket/enriched/good
        bad: s3://my_bucket/enriched/bad/ # e.g. s3://my-out-bucket/enriched/bad
        errors: #s3://my_bucket/enriched/errors # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://my_bucket/enriched/archive/ # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://my_bucket/shredded/good/ # e.g. s3://my-out-bucket/shredded/good
        bad: s3://my_bucket/shredded/bad/ # e.g. s3://my-out-bucket/shredded/bad
        errors: #s3://my_bucket/shredded/errors # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://my_bucket/shredded/archive/ # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 4.5.0
    region: eu-west-1 # Always set this
    jobflow_role: SnowplowEMRInstanceRole # Created using $ aws emr create-default-roles
    service_role: SnowplowEMRRole # Created using $ aws emr create-default-roles
    placement: eu-west-1a # Set this if NOT running in VPC. Leave blank otherwise
    ec2_subnet_id: # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: KEY_NAME
    bootstrap: [] # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    additional_info: # Optional JSON string for selecting additional features
collectors:
  format: clj-tomcat # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
  file_pattern: "my_event_log_pattern.*"
enrich:
  job_name: Snowplow ETL # Give your job a name
  versions:
    hadoop_enrich: 1.8.0 # Version of the Hadoop Enrichment process
    hadoop_shred: 0.9.0 # Version of the Hadoop Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: GZIP # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  download:
    folder: # Postgres-only config option. Where to store the downloaded files. Leave blank for Redshift
  targets:
    - name: "My Redshift database"
      type: redshift
      host: MY_REDSHIFT_HOST # The endpoint as shown in the Redshift console
      database: MY_REDSHIFT_DATABASE # Name of database
      port: MY_REDSHIFT_PORT # Default Redshift port
      ssl_mode: disable # One of disable (default), require, verify-ca or verify-full
      table: atomic.events
      username: MY_REDSHIFT_USR
      password: MY_REDSHIFT_PWD
      maxerror: 1 # Stop loading on first error, or increase to permit more load errors
      comprows: 200000 # Default for a 1 XL node cluster. Not used unless --include compupdate specified
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
Logs of the enrichment step
2016-10-24 13:44:54,119 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob): [com.snowplowanalytics....] parallel execution is enabled: true
2016-10-24 13:44:54,119 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob): [com.snowplowanalytics....] starting jobs: 3
2016-10-24 13:44:54,119 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob): [com.snowplowanalytics....] allocating threads: 3
2016-10-24 13:44:54,123 INFO cascading.flow.FlowStep (pool-5-thread-1): [com.snowplowanalytics....] starting step: (1/3)
2016-10-24 13:44:54,222 INFO org.apache.hadoop.yarn.client.RMProxy (pool-5-thread-1): Connecting to ResourceManager at <IP_ADDR>
2016-10-24 13:44:54,717 INFO org.apache.hadoop.yarn.client.RMProxy (pool-5-thread-1): Connecting to ResourceManager at <IP_ADDR>
2016-10-24 13:44:56,945 INFO com.hadoop.compression.lzo.GPLNativeCodeLoader (pool-5-thread-1): Loaded native gpl library
2016-10-24 13:44:56,949 INFO com.hadoop.compression.lzo.LzoCodec (pool-5-thread-1): Successfully loaded & initialized native-lzo library [hadoop-lzo rev 38958adfc6f8f191da8f76e5beafd1f11eaccab2]
2016-10-24 13:44:57,011 INFO com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem (pool-5-thread-1): listStatus s3://my_bucket/raw/processing with recursive false
2016-10-24 13:44:57,089 INFO org.apache.hadoop.mapred.FileInputFormat (pool-5-thread-1): Total input paths to process : 0
2016-10-24 13:44:57,232 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-5-thread-1): number of splits:0
2016-10-24 13:44:57,964 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-5-thread-1): Submitting tokens for job: job_1477316512805_0001
2016-10-24 13:44:58,842 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl (pool-5-thread-1): Submitted application application_1477316512805_0001
2016-10-24 13:44:58,969 INFO org.apache.hadoop.mapreduce.Job (pool-5-thread-1): The url to track the job: http://<IP_ADDR>/proxy/application_1477316512805_0001/
2016-10-24 13:44:58,971 INFO cascading.flow.FlowStep (pool-5-thread-1): [com.snowplowanalytics....] submitted hadoop job: job_1477316512805_0001
2016-10-24 13:44:58,971 INFO cascading.flow.FlowStep (pool-5-thread-1): [com.snowplowanalytics....] tracking url: http://<IP_ADDR>/proxy/application_1477316512805_0001/
2016-10-24 13:45:24,127 INFO org.apache.hadoop.mapred.ClientServiceDelegate (pool-5-thread-1): Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2016-10-24 13:45:24,323 INFO cascading.util.Update (UpdateRequestTimer): newer Cascading release available: 2.6.3
2016-10-24 13:45:25,665 INFO cascading.flow.FlowStep (pool-5-thread-2): [com.snowplowanalytics....] starting step: (2/3) ...d/run=2016-10-24-15-35-02
2016-10-24 13:45:25,665 INFO cascading.flow.FlowStep (pool-5-thread-3): [com.snowplowanalytics....] starting step: (3/3) .../snowplow/enriched-events
2016-10-24 13:45:25,756 INFO org.apache.hadoop.yarn.client.RMProxy (pool-5-thread-2): Connecting to ResourceManager at <IP_ADDR>
2016-10-24 13:45:25,863 INFO org.apache.hadoop.yarn.client.RMProxy (pool-5-thread-3): Connecting to ResourceManager at <IP_ADDR>
2016-10-24 13:45:25,918 INFO org.apache.hadoop.yarn.client.RMProxy (pool-5-thread-2): Connecting to ResourceManager at <IP_ADDR>
2016-10-24 13:45:25,991 INFO org.apache.hadoop.yarn.client.RMProxy (pool-5-thread-3): Connecting to ResourceManager at <IP_ADDR>
2016-10-24 13:45:29,719 INFO org.apache.hadoop.mapred.FileInputFormat (pool-5-thread-3): Total input paths to process : 0
2016-10-24 13:45:29,851 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-5-thread-3): number of splits:0
2016-10-24 13:45:30,016 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-5-thread-3): Submitting tokens for job: job_1477316512805_0002
2016-10-24 13:45:30,051 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl (pool-5-thread-3): Submitted application application_1477316512805_0002
2016-10-24 13:45:30,058 INFO org.apache.hadoop.mapreduce.Job (pool-5-thread-3): The url to track the job: http://<IP_ADDR>/proxy/application_1477316512805_0002/
2016-10-24 13:45:30,058 INFO cascading.flow.FlowStep (pool-5-thread-3): [com.snowplowanalytics....] submitted hadoop job: job_1477316512805_0002
2016-10-24 13:45:30,058 INFO cascading.flow.FlowStep (pool-5-thread-3): [com.snowplowanalytics....] tracking url: http://<IP_ADDR>/proxy/application_1477316512805_0002/
2016-10-24 13:45:30,280 INFO org.apache.hadoop.mapred.FileInputFormat (pool-5-thread-2): Total input paths to process : 0
2016-10-24 13:45:30,345 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-5-thread-2): number of splits:0
2016-10-24 13:45:30,430 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-5-thread-2): Submitting tokens for job: job_1477316512805_0003
2016-10-24 13:45:30,467 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl (pool-5-thread-2): Submitted application application_1477316512805_0003
2016-10-24 13:45:30,472 INFO org.apache.hadoop.mapreduce.Job (pool-5-thread-2): The url to track the job: http://<IP_ADDR>/proxy/application_1477316512805_0003/
2016-10-24 13:45:30,472 INFO cascading.flow.FlowStep (pool-5-thread-2): [com.snowplowanalytics....] submitted hadoop job: job_1477316512805_0003
2016-10-24 13:45:30,473 INFO cascading.flow.FlowStep (pool-5-thread-2): [com.snowplowanalytics....] tracking url: http://<IP_ADDR>/proxy/application_1477316512805_0003/
2016-10-24 13:45:50,083 INFO org.apache.hadoop.mapred.ClientServiceDelegate (pool-5-thread-3): Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2016-10-24 13:46:06,286 INFO cascading.tap.hadoop.util.Hadoop18TapUtil (flow com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob): deleting temp path s3://my_bucket/enriched/bad/run=2016-10-24-15-35-02/_temporary
2016-10-24 13:46:06,334 INFO cascading.tap.hadoop.util.Hadoop18TapUtil (flow com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob): deleting temp path hdfs:/local/snowplow/enriched-events/_temporary