Hi,
I am attempting to set up Snowplow using the Cloudfront Collector and send events to a Redshift database, but I am receiving this error while running the EmrEtlRunner:
Exception in thread "main" java.lang.RuntimeException: Error running job
    at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
    at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:720)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
    at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-10-13-1-73.ec2.internal:8020/tmp/bc831f91-0969-4863-a3bd-5932a9632619/files
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
    at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:352)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
    at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:901)
My .yml configuration is as follows:
aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: "*********"
  secret_access_key: "*************"
  s3:
    region: us-east-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://snowplow-emr-etl/logs
      raw:
        in: # Multiple in buckets are permitted
          - s3://snowplow-logs-collector # e.g. s3://my-in-bucket
        processing: s3://etc/processing
        archive: s3://etc/raw # e.g. s3://my-archive-bucket/in
      enriched:
        good: s3://etc/enriched/good # e.g. s3://my-out-bucket/enriched/good
        bad: s3://etc/enriched/bad # e.g. s3://my-out-bucket/enriched/bad
        errors: s3://etc/enriched/errors # Leave blank unless continue_on_unexpected_error: set to true below
        archive: s3://etc/enriched/archive # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://etc/shredded/good # e.g. s3://my-out-bucket/shredded/good
        bad: s3://etc/shredded/bad # e.g. s3://my-out-bucket/shredded/bad
        errors: # Leave blank unless continue_on_unexpected_error: set to true below
        archive: s3:/etc/shredded/archive # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 4.5.0 # Don't change this
    region: us-east-1 # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole # Created using $ aws emr create-default-roles
    placement: # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: ****** # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: *****
    bootstrap: [] # Set this to specify custom boostrap actions. Leave empty otherwise
    software:
      hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    additional_info: # Optional JSON string for selecting additional features
collectors:
  format: cloudfront # Or 'clj-tomcat' for the Clojure Collector, or 'thrift' for Thrift records, or 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs
enrich:
  job_name: GB Snowplow ETL # Give your job a name
  versions:
    hadoop_enrich: 1.7.0 # Version of the Hadoop Enrichment process
    hadoop_shred: 0.9.0 # Version of the Hadoop Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
  continue_on_unexpected_error: false # Set to 'true' (and set out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: GZIP # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  download:
    folder: # Postgres-only config option. Where to store the downloaded files. Leave blank for Redshift
  targets:
    - name: "Snowplow-Cerebro"
      type: redshift
      host: ****.us-east-1.redshift.amazonaws.com:5439 # The endpoint as shown in the Redshift console
      database: snowplow # Name of database
      port: 5439 # Default Redshift port
      table: atomic.events
      username: ********
      password: ********
      maxerror: 1 # Stop loading on first error, or increase to permit more load errors
      comprows: 200000 # Default for a 1 XL node cluster. Not used unless --include compupdate specified
      ssl_mode: disable
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    app_id: gb-snowplow # e.g. snowplow
    collector: ***.cloudfront.net # e.g. d3rkrsqld9gmqf.cloudfront.net
Things were working earlier with a different Redshift cluster, but I had to create a new one because the previous cluster was in a private subnet and could not be reached from a client to view or query the database. I have updated the host, port, database, username, and password to match the new cluster, but I am still getting the error.
Any help would be great, thanks in advance!