Hi,
I’m trying to load events into Redshift from a Kinesis stream using the lambda architecture.
I use snowplow-s3-loader-0.6.0.jar to sink the stream to S3, then run snowplow-emr-etl-runner (latest version) with this command:
./snowplow-emr-etl-runner run -c config.yml -r /home/centos/storage-loader/iglu_resolver.json --debug -t /home/centos/storage-loader/targets -n /home/centos/storage-loader/enrichments
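(For context: the S3 loader is what writes the raw Kinesis records to s3://sp-archive-acc/stream before EmrEtlRunner picks them up. I start it like this, where s3-loader.conf is just a placeholder name for my local HOCON config file:)

    java -jar snowplow-s3-loader-0.6.0.jar --config s3-loader.conf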
Then I get this error:
Snowplow::EmrEtlRunner::EmrExecutionError (EMR jobflow j-2VE5O4DFZOUGR failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.
Snowplow_ETL_Lzo: TERMINATING [STEP_FAILURE] ~ elapsed time n/a [2019-04-25 13:29:52 UTC - ]
- Elasticity Setup Hadoop Debugging: COMPLETED ~ 00:00:04 [2019-04-25 13:29:54 UTC - 2019-04-25 13:29:58 UTC]
- [staging] s3-dist-cp: Raw s3://sp-archive-acc/stream/ -> Raw Staging S3: COMPLETED ~ 00:00:40 [2019-04-25 13:30:00 UTC - 2019-04-25 13:30:41 UTC]
- [enrich] s3-dist-cp: Raw S3 -> Raw HDFS: COMPLETED ~ 00:00:40 [2019-04-25 13:30:43 UTC - 2019-04-25 13:31:23 UTC]
- [enrich] spark: Enrich Raw Events: COMPLETED ~ 00:00:58 [2019-04-25 13:31:23 UTC - 2019-04-25 13:32:21 UTC]
- [enrich] spark: Enriched HDFS -> S3: FAILED ~ 00:00:06 [2019-04-25 13:32:21 UTC - 2019-04-25 13:32:28 UTC]
- [enrich] spark: Enriched HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]
- [shred] spark: Shred Enriched Events: CANCELLED ~ elapsed time n/a [ - ]
- [cleanup] Empty Raw HDFS: CANCELLED ~ elapsed time n/a [ - ]
- [shred] s3-dist-cp: Shredded atomic events HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
- [shred] s3-dist-cp: Shredded HDFS _SUCCESS -> S3: CANCELLED ~ elapsed time n/a [ - ]
- [shred] s3-dist-cp: Shredded types HDFS -> S3: CANCELLED ~ elapsed time n/a [ - ]
- [archive_raw] s3-dist-cp: Raw Staging S3 -> Raw Archive S3: CANCELLED ~ elapsed time n/a [ - ]
- [archive_shredded] s3-dist-cp: Shredded S3 -> Shredded Archive S3: CANCELLED ~ elapsed time n/a [ - ]
- [archive_enriched] s3-dist-cp: Enriched S3 -> Enriched Archive S3: CANCELLED ~ elapsed time n/a [ - ]
- [rdb_load] Load AWS Redshift enriched events storage Storage Target: CANCELLED ~ elapsed time n/a [ - ]):
uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:783:in `run'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/runner.rb:138:in `run'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:41:in `<main>'
org/jruby/RubyKernel.java:994:in `load'
uri:classloader:/META-INF/main.rb:1:in `<main>'
org/jruby/RubyKernel.java:970:in `require'
uri:classloader:/META-INF/main.rb:1:in `(root)'
uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in `<main>'
At first glance it seems like EMR can’t write to S3, but there are events in the processing bucket s3://sp-archive-acc/raw/processing,
so I think EMR can in fact both read from and write to S3.
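(For reference, the staged and enriched paths can be listed with the AWS CLI; the bucket paths below are the ones from my config:)

    aws s3 ls s3://sp-archive-acc/raw/processing/ --recursive | head
    aws s3 ls s3://sp-rs-acc/enr/good/ --recursive | head

The first listing shows the staged raw files; the second shows whether anything was ever written under the enriched good path.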
This is my config.yml file:

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: XXXXX
  secret_access_key: XXXXX
  s3:
    region: us-east-2
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://sp-archive-acc/logs
      encrypted: false # Whether the buckets below are encrypted using server side encryption (SSE-S3)
      raw:
        in: # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
          - s3://sp-archive-acc/stream # e.g. s3://my-old-collector-bucket
        processing: s3://sp-archive-acc/raw/processing
        archive: s3://sp-archive-acc/raw/archive # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://sp-rs-acc/enr/good # e.g. s3://my-out-bucket/enriched/good
        bad: s3://sp-rs-acc/enr/bad # e.g. s3://my-out-bucket/enriched/bad
        errors: # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://sp-rs-acc/enr/archive # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://sp-archive-acc/shredded/good # e.g. s3://my-out-bucket/shredded/good
        bad: s3://sp-archive-acc/shredded/bad # e.g. s3://my-out-bucket/shredded/bad
        errors: continue_on_unexpected_error # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://sp-archive-acc/shredded/archive # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: true # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: us-east-2 # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using aws emr create-default-roles
    service_role: EMR_DefaultRole # Created using aws emr create-default-roles
    placement: # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-a9f19bd3 # subnet-80dfe3e8 Set this if running in VPC. Leave blank otherwise
    ec2_key_name: snowplow00
    security_configuration: # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow_ETL_Lzo # Give your job a name
      master_instance_type: i2.xlarge
      core_instance_count: 2
      core_instance_type: i2.xlarge
      core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
        volume_size: 100 # Gigabytes
        volume_type: "gp2"
        volume_iops: 400 # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: i2.xlarge
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info: # Optional JSON string for selecting additional features
collectors:
  format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs
enrich:
  versions:
    spark_enrich: 1.17.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: GZIP # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.1 # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {name: "data-pipeline-enrichment"} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    protocol: http
    port: 80
    app_id: snowplow # e.g. snowplow
    collector: d3rkrsqld9gmqf.cloudfront.net # e.g. d3rkrsqld9gmqf.cloudfront.net
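Since pasting into the forum tends to flatten YAML indentation, here is a quick check that the file parses as intended (assuming Ruby is available on the box; any YAML parser would do):

    ruby -ryaml -e "puts YAML.load_file('config.yml').keys"

If the structure is intact, this prints the five top-level sections: aws, collectors, enrich, storage, monitoring.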
And my iglu_resolver.json:

{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      },
      {
        "name": "Iglu Central - GCP Mirror",
        "priority": 1,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://mirror01.iglucentral.com"
          }
        }
      }
    ]
  }
}
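One note on the resolver: if the file on disk was ever saved with typographic quotes (like the ones the forum rendered in my original paste), it would no longer be valid JSON. A quick validity check, assuming jq is installed:

    jq . iglu_resolver.json > /dev/null && echo 'resolver JSON is valid'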
This is the Amazon EMR error log:
Exception in thread "main" java.lang.RuntimeException: Error running job
at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:705)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-172-31-28-186.us-east-2.compute.internal:8020/tmp/98d7c334-9beb-48ca-a5f1-4727b35cd3b1/files
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:352)
at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:901)
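If I read this right, the missing path is the temporary HDFS file manifest that s3-dist-cp builds for the failed Enriched HDFS -> S3 step, and (as far as I understand) this typically happens when the source directory it listed was empty. While the cluster is still up, the enriched output in HDFS can be inspected from the master node (the /local/snowplow path is an assumption based on EmrEtlRunner defaults; the /tmp path is the one from the log above):

    hdfs dfs -ls /tmp/98d7c334-9beb-48ca-a5f1-4727b35cd3b1/
    hdfs dfs -ls -R hdfs:///local/snowplow/ | head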
Any help would be appreciated!