I am getting the following error message when trying to run EmrEtlRunner (version 1.0.4):
Value guarded in: Snowplow::EmrEtlRunner::Cli::load_config
With Contract: Maybe, String, Bool => Maybe
At: uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:211
EmrEtlRunner at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:33
failure_callback at uri:classloader:/gems/contracts-0.11.0/lib/contracts.rb:154
call_with at uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:80
redefine_method at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138
process_options at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:199
get_args_config_enrichments_resolver at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:173
send_to at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43
call_with at uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76
redefine_method at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138
<main> at uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:37
load at org/jruby/RubyKernel.java:1009
<main> at uri:classloader:/META-INF/main.rb:1
require at org/jruby/RubyKernel.java:974
require at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:54
<main> at <script>:3
This is what I have in the config file:
aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: <%= ENV['AWS_ACCESS_KEY'] %>
  secret_access_key: <%= ENV['AWS_SECRET_KEY'] %>
  s3:
    region: us-east-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/logs/
      encrypted: false
      raw:
        in:
          - company-snowplow-raw-production
        processing: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/raw/processing/
        archive: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/raw/archive/
      enriched:
        good: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/good/ # e.g. s3://my-out-bucket/enriched/good
        archive: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/archive/ # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
        bad: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/bad/ # S3 Loader's output folder with enriched data. If present raw buckets will be discarded
        errors: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/errors/
        stream: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/stream/
      shredded:
        good: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/good/ # e.g. s3://my-out-bucket/shredded/good
        bad: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/rdb_loader/ # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/errors/ # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://company-shared-production/var/log/snowplow_tracker/runs/run=redshift_test/enriched/archive/ # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: us-east-1 # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole # Created using $ aws emr create-default-roles
    placement: # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-690b1543 # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: amplify-keypair
    security_configuration: # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow ETL # Give your job a name
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      core_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for on-demand core instances
      core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
        volume_size: 100 # Gigabytes
        volume_type: "gp2"
        volume_iops: 400 # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info: # Optional JSON string for selecting additional features
enrich:
  versions:
    spark_enrich: 1.18.0 # Version of the Spark Enrichment process
  output_compression: GZIP # Stream mode supports only GZIP
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.1 # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags:
    app: ETL # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  # snowplow:
  #   method: get
  #   app_id: redshift_loader # e.g. snowplow
  #   collector: # e.g. d3rkrsqld9gmqf.cloudfront.net
  #   protocol: http
  #   port: 80
@Farzin_Zaker, your configuration file is good and I had no problem parsing it. I think the problem is that your access_key_id and secret_access_key are not being substituted with the appropriate environment variables’ values.
As a proof of concept, can you substitute the variables with the actual values and see if you progress further?
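To rule this out before hardcoding anything, a quick shell check along these lines may help. This is just a minimal sketch, not official Snowplow tooling: the variable names AWS_ACCESS_KEY and AWS_SECRET_KEY are taken from the <%= ENV[...] %> references in your config, and the values shown are placeholders.
# Export the variables in the same shell that will launch EmrEtlRunner;
# otherwise the ERB-style templating in config.yml resolves them to empty strings.
export AWS_ACCESS_KEY='AKIA...your-access-key-id...'     # placeholder
export AWS_SECRET_KEY='...your-secret-access-key...'     # placeholder

# Confirm both are visible to child processes before invoking the runner.
env | grep -E '^AWS_(ACCESS|SECRET)_KEY='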
Hi @ihor, thanks for your reply. You are right: after fixing this and some other issues, I was able to get a few steps further, but now I am getting a ClassNotFoundException at the “[shred] spark: Shred Enriched Events” step:
20/10/14 22:59:22 ERROR ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/ExecutedWriteSummary
java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/ExecutedWriteSummary
at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.<init>(ShredJob.scala:146)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.<clinit>(ShredJob.scala)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob.main(ShredJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 8 more
20/10/14 22:59:22 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/ExecutedWriteSummary)
20/10/14 22:59:22 ERROR ApplicationMaster: Uncaught exception:
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:401)
at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:254)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:764)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:67)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:66)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:762)
at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:55)
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:47)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:244)
at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:653)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/ExecutedWriteSummary
at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.<init>(ShredJob.scala:146)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.<clinit>(ShredJob.scala)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob.main(ShredJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.ExecutedWriteSummary
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 8 more
I faced a similar issue at the “[enrich] spark: Enrich Raw Events” step, and changing the ami_version from 5.29.0 to 5.9.0 fixed it.
My EMR cluster got stuck at the shredding step and I have no idea how to fix it. Would you please help with this?
Here is my latest config file:
aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: <%= ENV['AWS_ACCESS_KEY'] %>
  secret_access_key: <%= ENV['AWS_SECRET_KEY'] %>
  s3:
    region: us-east-1
    buckets:
      assets: s3://company-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3n://company-shared-production/redshift/var/log/company_tracker/logs/
      encrypted: false
      raw:
        in:
          - s3://company-raw-production/
        processing: s3://company-shared-production/redshift/var/log/company_tracker/raw/processing/
        archive: s3://company-shared-production/redshift/var/log/company_tracker/raw/archive/
      enriched:
        good: s3://company-shared-production/redshift/var/log/company_tracker/enriched/good/ # e.g. s3://my-out-bucket/enriched/good
        archive: s3://company-shared-production/redshift/var/log/company_tracker/enriched/archive/ # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
        bad: s3://company-shared-production/redshift/var/log/company_tracker/enriched/bad/ # S3 Loader's output folder with enriched data. If present raw buckets will be discarded
        errors: s3://company-shared-production/redshift/var/log/company_tracker/enriched/errors/
        # stream: s3://company-shared-production/var/log/company_tracker/enriched/stream/
      shredded:
        good: s3://company-shared-production/redshift/var/log/company_tracker/shredded/good/ # e.g. s3://my-out-bucket/shredded/good
        bad: s3://company-shared-production/redshift/var/log/company_tracker/shredded/rdb_loader/ # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://company-shared-production/redshift/var/log/company_tracker/shredded/errors/ # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://company-shared-production/redshift/var/log/company_tracker/shredded/archive/ # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: us-east-1 # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole # Created using $ aws emr create-default-roles
    placement: # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-690b1543 # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: amplify-keypair
    security_configuration: # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: [] # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase: #"1.3.1" #"1.4.13" # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: #"1.1" # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: company to Redshift ETL # Give your job a name
      master_instance_type: m1.medium
      core_instance_count: 12
      core_instance_type: r3.2xlarge
      core_instance_bid: #0.015 # In USD. Adjust bid, or leave blank for on-demand core instances
      core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
        volume_size: 100 # Gigabytes
        volume_type: "gp2"
        volume_iops: 400 # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m4.large
      task_instance_bid: #0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark-defaults:
        maximizeResourceAllocation: "true"
    # additional_info: # Optional JSON string for selecting additional features
collectors:
  format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  versions:
    spark_enrich: 1.18.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: GZIP # Stream mode supports only GZIP
storage:
  versions:
    rdb_loader: 0.17.0
    rdb_shredder: 0.16.0 # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags:
    app: company ETL # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  # company:
  #   method: get
  #   app_id: redshift_loader # e.g. company
  #   collector: # e.g. d3rkrsqld9gmqf.cloudfront.net
  #   protocol: http
  #   port: 80
I’ve responded to that issue in the other thread, but I wanted to ask: is there any reason you’re setting up the older, batch-based Spark version of the pipeline?
We stopped maintaining the batch pipeline a while ago, and it’s no longer supported. I suspect that many of the issues you’ve hit are because of this (dependency clashes, for example). The stream pipeline is our main focus, and a lot of work has been done to automate things - for example, in the latest version the jsonpaths are handled for you.
I’m quite confident spark-enrich will still work, but it doesn’t surprise me that you’re hitting bumps in the road, and obviously over time it will become more and more out of date. I thought it was worth mentioning in case you just weren’t aware of this. (I know our docs can be confusing, so perhaps they misled you somewhere along the line.)
Thanks for your reply. We have a very old Snowplow pipeline in place that stores events in Snowflake. For some reason, we have to migrate from Snowflake to Redshift. Here is the current pipeline:
To change the storage engine, I was planning to modify the pipeline like this:
Raw S3 bucket -> Enrich -> Shred -> Load (Redshift)
I don’t want to change anything outside of the ETL pipeline, and I want to end up with a similar schema in the storage engine, in order to minimize the migration effort: there are a lot of recommendation services and BI tools that would otherwise need to be modified accordingly.
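For reference, the whole Raw S3 bucket -> Enrich -> Shred -> Load sequence is normally driven by a single EmrEtlRunner invocation, roughly like the sketch below. The option names are recalled from the batch-era releases and should be double-checked against the docs for your exact version; config.yml, resolver.json, enrichments/ and targets/ are placeholder paths, and it is the Redshift storage target JSON placed in targets/ that enables the load step.
# One EmrEtlRunner run covers staging, enrichment, shredding and
# (via the Redshift target JSON in targets/) loading.
./snowplow-emr-etl-runner run \
  --config config.yml \
  --resolver resolver.json \
  --enrichments enrichments/ \
  --targets targets/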
Unfortunately, I could not find out from the documentation which version of EmrEtlRunner is the latest.
It is also not clear to me which versions of the enricher, shredder, and loader are compatible with each version of EmrEtlRunner.
I also don’t know how I can switch from the raw “in” buckets to enriched streams. Can you point me to the right documentation explaining this?
I am also wondering whether the latest version supports the same raw data format that is currently being gathered in the S3 buckets.
You are right, I am lost between the old and new documentation and don’t know which one I should use.