Hi Fellows,
My current pipeline is: Scala Collector Instance -> Kinesis -> Enrich Instance -> Kinesis -> S3 Loader Instance -> S3
I am now trying to copy the data in S3 to Redshift. However, no matter what I try, EmrEtlRunner fails with the error message below.
Please help! Thank you!!!
Here is the error result after executing:
ubuntu@ip-172-xx-xx-xxx:~$ java -Dorg.slf4j.simpleLogger.defaultLogLevel=debug -jar snowplow-emr-etl-runner run -c config/s3-to-redshift-config.yml -r config/iglu_resolver.json -t config/targets --skip staging_stream_enrich
uri:classloader:/gems/avro-1.8.1/lib/avro/schema.rb:350: warning: constant ::Fixnum is deprecated
ReturnContractError: Contract violation for return value:
Expected: #<Contracts::Maybe:0xf01fc6d @vals=[{:aws=>{:access_key_id=>String, :secret_access_key=>String, :s3=>{:region=>String, :buckets=>{:assets=>String, :jsonpath_assets=>#<Contracts::Maybe:0x6591f8ea @vals=[String, nil]>, :log=>String, :encrypted=>Contracts::Bool, :raw=>#<Contracts::Maybe:0x136fece2 @vals=[{:in=>#<Contracts::CollectionOf:0x77476fcf @collection_class=Array, @contract=String>, :processing=>String, :archive=>String}, nil]>, :enriched=>{:good=>String, :bad=>#<Contracts::Maybe:0x5dd747c1 @vals=[String, nil]>, :errors=>#<Contracts::Maybe:0x7aa3857b @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x37806be6 @vals=[String, nil]>, :stream=>#<Contracts::Maybe:0x1d2046bb @vals=[String, nil]>}, :shredded=>{:good=>String, :bad=>String, :errors=>#<Contracts::Maybe:0xfee881 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x1ff463bb @vals=[String, nil]>}}, :consolidate_shredded_output=>Contracts::Bool}, :emr=>{:ami_version=>String, :region=>String, :jobflow_role=>String, :service_role=>String, :placement=>#<Contracts::Maybe:0x4ca0b9b1 @vals=[String, nil]>, :ec2_subnet_id=>#<Contracts::Maybe:0x1be77a76 @vals=[String, nil]>, :ec2_key_name=>String, :security_configuration=>#<Contracts::Maybe:0x25f7cc38 @vals=[String, nil]>, :bootstrap=>#<Contracts::Maybe:0x53e28097 @vals=[#<Contracts::CollectionOf:0x7747cc1b @collection_class=Array, @contract=String>, nil]>, :software=>{:hbase=>#<Contracts::Maybe:0x2e1ba142 @vals=[String, nil]>, :lingual=>#<Contracts::Maybe:0x595ec862 @vals=[String, nil]>}, :jobflow=>{:job_name=>String, :master_instance_type=>String, :core_instance_count=>Contracts::Num, :core_instance_type=>String, :core_instance_ebs=>#<Contracts::Maybe:0x173b1af1 @vals=[{:volume_size=>#<Proc:0x2ce24a1a@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:41 (lambda)>, :volume_type=>#<Proc:0x26bce60d@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:40 (lambda)>, :volume_iops=>#<Contracts::Maybe:0x76eadc5a 
@vals=[#<Proc:0x2ce24a1a@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:41 (lambda)>, nil]>, :ebs_optimized=>#<Contracts::Maybe:0x20914835 @vals=[Contracts::Bool, nil]>}, nil]>, :task_instance_count=>Contracts::Num, :task_instance_type=>String, :task_instance_bid=>#<Contracts::Maybe:0x6f68756d @vals=[Contracts::Num, nil]>}, :additional_info=>#<Contracts::Maybe:0x5d9bb69b @vals=[String, nil]>, :bootstrap_failure_tries=>Contracts::Num, :configuration=>#<Contracts::Maybe:0x7ffcb232 @vals=[#<Contracts::HashOf:0x1dd76982 @key=Symbol, @value=#<Contracts::HashOf:0x7e76a66f @key=Symbol, @value=String>>, nil]>}}, :collectors=>#<Contracts::Maybe:0x5feb82b3 @vals=[{:format=>String}, nil]>, :enrich=>{:versions=>#<Contracts::Maybe:0x54e0f76f @vals=[{:spark_enrich=>String}, nil]>, :continue_on_unexpected_error=>#<Contracts::Maybe:0x7462ba4b @vals=[Contracts::Bool, nil]>, :output_compression=>#<Proc:0x2b058bfd@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:39 (lambda)>}, :storage=>{:versions=>{:rdb_shredder=>String, :hadoop_elasticsearch=>String, :rdb_loader=>String}}, :monitoring=>{:tags=>#<Contracts::HashOf:0x7ee64b53 @key=Symbol, @value=String>, :logging=>{:level=>String}, :snowplow=>#<Contracts::Maybe:0x26d73519 @vals=[{:method=>String, :collector=>String, :app_id=>String}, nil]>}}, nil]>,
Actual: {:aws=>{:access_key_id=>"redacted", :secret_access_key=>"redacted", :s3=>{:region=>"ap-southeast-1", :buckets=>{:assets=>"s3://snowplow-hosted-assets", :jsonpath_assets=>nil, :log=>"s3://snowplow-s3-to-redshift-log", :encrypted=>false, :enriched=>{:good=>nil, :archive=>"s3://snowplow-s3-to-redshift/archive", :stream=>"s3://snowplow-s3storage"}, :shredded=>{:good=>"s3://snowplow-s3-to-redshift/shredded/good", :bad=>"s3://snowplow-s3-to-redshift/shredded/bad", :errors=>"ADD HERE", :archive=>"s3://snowplow-s3-to-redshift/shredded/archive"}}, :consolidate_shredded_output=>false}, :emr=>{:ami_version=>"5.9.0", :region=>"ap-southeast-1", :jobflow_role=>"EMR_EC2_DefaultRole", :service_role=>"EMR_DefaultRole", :placement=>"ADD HERE", :ec2_subnet_id=>"subnet-6aa9a123", :ec2_key_name=>"conroychan2", :security_configuration=>nil, :bootstrap=>, :software=>{:hbase=>nil, :lingual=>nil}, :jobflow=>{:job_name=>"Data Load from S3 to Redshift", :master_instance_type=>"m1.medium", :core_instance_count=>2, :core_instance_type=>"m1.medium", :core_instance_ebs=>{:volume_size=>100, :volume_type=>"gp2", :volume_iops=>400, :ebs_optimized=>false}, :task_instance_count=>0, :task_instance_type=>"m1.medium", :task_instance_bid=>0.015}, :bootstrap_failure_tries=>3, :configuration=>{:"yarn-site"=>{:"yarn.resourcemanager.am.max-attempts"=>"1"}, :spark=>{:maximizeResourceAllocation=>"true"}}, :additional_info=>nil}}, :enrich=>{:versions=>{:spark_enrich=>"1.17.0"}, :output_compression=>"GZIP"}, :storage=>{:versions=>{:rdb_loader=>"0.14.0", :rdb_shredder=>"0.13.1", :hadoop_elasticsearch=>"0.1.0"}}, :monitoring=>{:tags=>{}, :logging=>{:level=>"DEBUG"}}}
Value guarded in: Snowplow::EmrEtlRunner::Cli::load_config
With Contract: Maybe, String, Bool => Maybe
At: uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:211
failure_callback at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:32
call_with at uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:80
block in redefine_method at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138
process_options at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:199
get_args_config_enrichments_resolver at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:173
send_to at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43
call_with at uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76
block in redefine_method at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138
at uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:37
load at org/jruby/RubyKernel.java:994
at uri:classloader:/META-INF/main.rb:1
require at org/jruby/RubyKernel.java:970
(root) at uri:classloader:/META-INF/main.rb:1
at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1
ERROR: org.jruby.embed.EvalFailedException: (ReturnContractError) Contract violation for return value:
Expected: #<Contracts::Maybe:0xf01fc6d @vals=[{:aws=>{:access_key_id=>String, :secret_access_key=>String, :s3=>{:region=>String, :buckets=>{:assets=>String, :jsonpath_assets=>#<Contracts::Maybe:0x6591f8ea @vals=[String, nil]>, :log=>String, :encrypted=>Contracts::Bool, :raw=>#<Contracts::Maybe:0x136fece2 @vals=[{:in=>#<Contracts::CollectionOf:0x77476fcf @collection_class=Array, @contract=String>, :processing=>String, :archive=>String}, nil]>, :enriched=>{:good=>String, :bad=>#<Contracts::Maybe:0x5dd747c1 @vals=[String, nil]>, :errors=>#<Contracts::Maybe:0x7aa3857b @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x37806be6 @vals=[String, nil]>, :stream=>#<Contracts::Maybe:0x1d2046bb @vals=[String, nil]>}, :shredded=>{:good=>String, :bad=>String, :errors=>#<Contracts::Maybe:0xfee881 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x1ff463bb @vals=[String, nil]>}}, :consolidate_shredded_output=>Contracts::Bool}, :emr=>{:ami_version=>String, :region=>String, :jobflow_role=>String, :service_role=>String, :placement=>#<Contracts::Maybe:0x4ca0b9b1 @vals=[String, nil]>, :ec2_subnet_id=>#<Contracts::Maybe:0x1be77a76 @vals=[String, nil]>, :ec2_key_name=>String, :security_configuration=>#<Contracts::Maybe:0x25f7cc38 @vals=[String, nil]>, :bootstrap=>#<Contracts::Maybe:0x53e28097 @vals=[#<Contracts::CollectionOf:0x7747cc1b @collection_class=Array, @contract=String>, nil]>, :software=>{:hbase=>#<Contracts::Maybe:0x2e1ba142 @vals=[String, nil]>, :lingual=>#<Contracts::Maybe:0x595ec862 @vals=[String, nil]>}, :jobflow=>{:job_name=>String, :master_instance_type=>String, :core_instance_count=>Contracts::Num, :core_instance_type=>String, :core_instance_ebs=>#<Contracts::Maybe:0x173b1af1 @vals=[{:volume_size=>#<Proc:0x2ce24a1a@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:41 (lambda)>, :volume_type=>#<Proc:0x26bce60d@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:40 (lambda)>, :volume_iops=>#<Contracts::Maybe:0x76eadc5a 
@vals=[#<Proc:0x2ce24a1a@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:41 (lambda)>, nil]>, :ebs_optimized=>#<Contracts::Maybe:0x20914835 @vals=[Contracts::Bool, nil]>}, nil]>, :task_instance_count=>Contracts::Num, :task_instance_type=>String, :task_instance_bid=>#<Contracts::Maybe:0x6f68756d @vals=[Contracts::Num, nil]>}, :additional_info=>#<Contracts::Maybe:0x5d9bb69b @vals=[String, nil]>, :bootstrap_failure_tries=>Contracts::Num, :configuration=>#<Contracts::Maybe:0x7ffcb232 @vals=[#<Contracts::HashOf:0x1dd76982 @key=Symbol, @value=#<Contracts::HashOf:0x7e76a66f @key=Symbol, @value=String>>, nil]>}}, :collectors=>#<Contracts::Maybe:0x5feb82b3 @vals=[{:format=>String}, nil]>, :enrich=>{:versions=>#<Contracts::Maybe:0x54e0f76f @vals=[{:spark_enrich=>String}, nil]>, :continue_on_unexpected_error=>#<Contracts::Maybe:0x7462ba4b @vals=[Contracts::Bool, nil]>, :output_compression=>#<Proc:0x2b058bfd@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:39 (lambda)>}, :storage=>{:versions=>{:rdb_shredder=>String, :hadoop_elasticsearch=>String, :rdb_loader=>String}}, :monitoring=>{:tags=>#<Contracts::HashOf:0x7ee64b53 @key=Symbol, @value=String>, :logging=>{:level=>String}, :snowplow=>#<Contracts::Maybe:0x26d73519 @vals=[{:method=>String, :collector=>String, :app_id=>String}, nil]>}}, nil]>,
Actual: {:aws=>{:access_key_id=>"redacted", :secret_access_key=>"redacted", :s3=>{:region=>"ap-southeast-1", :buckets=>{:assets=>"s3://snowplow-hosted-assets", :jsonpath_assets=>nil, :log=>"s3://snowplow-s3-to-redshift-log", :encrypted=>false, :enriched=>{:good=>nil, :archive=>"s3://snowplow-s3-to-redshift/archive", :stream=>"s3://snowplow-s3storage"}, :shredded=>{:good=>"s3://snowplow-s3-to-redshift/shredded/good", :bad=>"s3://snowplow-s3-to-redshift/shredded/bad", :errors=>"ADD HERE", :archive=>"s3://snowplow-s3-to-redshift/shredded/archive"}}, :consolidate_shredded_output=>false}, :emr=>{:ami_version=>"5.9.0", :region=>"ap-southeast-1", :jobflow_role=>"EMR_EC2_DefaultRole", :service_role=>"EMR_DefaultRole", :placement=>"ADD HERE", :ec2_subnet_id=>"subnet-6aa9a123", :ec2_key_name=>"conroychan2", :security_configuration=>nil, :bootstrap=>, :software=>{:hbase=>nil, :lingual=>nil}, :jobflow=>{:job_name=>"Data Load from S3 to Redshift", :master_instance_type=>"m1.medium", :core_instance_count=>2, :core_instance_type=>"m1.medium", :core_instance_ebs=>{:volume_size=>100, :volume_type=>"gp2", :volume_iops=>400, :ebs_optimized=>false}, :task_instance_count=>0, :task_instance_type=>"m1.medium", :task_instance_bid=>0.015}, :bootstrap_failure_tries=>3, :configuration=>{:"yarn-site"=>{:"yarn.resourcemanager.am.max-attempts"=>"1"}, :spark=>{:maximizeResourceAllocation=>"true"}}, :additional_info=>nil}}, :enrich=>{:versions=>{:spark_enrich=>"1.17.0"}, :output_compression=>"GZIP"}, :storage=>{:versions=>{:rdb_loader=>"0.14.0", :rdb_shredder=>"0.13.1", :hadoop_elasticsearch=>"0.1.0"}}, :monitoring=>{:tags=>{}, :logging=>{:level=>"DEBUG"}}}
Value guarded in: Snowplow::EmrEtlRunner::Cli::load_config
With Contract: Maybe, String, Bool => Maybe
At: uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:211
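To narrow down which key trips the contract, the Actual hash can be checked against the keys that the Expected output types as plain String rather than Maybe. Below is a rough Python sketch of that check. The path list is hand-copied from the Expected output above and is not exhaustive, the helper names are made up for illustration, and the dict is a trimmed copy of the Actual value.

```python
# Hypothetical helper, not part of EmrEtlRunner.
# Paths hand-copied from the "Expected" contract above: only keys typed as
# plain String (not Maybe). This list is an assumption and is not exhaustive.
REQUIRED_STRING_PATHS = [
    ("aws", "s3", "buckets", "assets"),
    ("aws", "s3", "buckets", "log"),
    ("aws", "s3", "buckets", "enriched", "good"),
    ("aws", "s3", "buckets", "shredded", "good"),
    ("aws", "s3", "buckets", "shredded", "bad"),
    ("aws", "emr", "ec2_key_name"),
]


def lookup(config, path):
    """Walk a nested dict along path; return None if any key is missing."""
    node = config
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def blank_required_fields(config):
    """Return dotted paths whose value is not a string (e.g. nil/None)."""
    return [
        ".".join(path)
        for path in REQUIRED_STRING_PATHS
        if not isinstance(lookup(config, path), str)
    ]


# Trimmed copy of the "Actual" value from the error, with nil written as None.
actual = {
    "aws": {
        "s3": {
            "buckets": {
                "assets": "s3://snowplow-hosted-assets",
                "log": "s3://snowplow-s3-to-redshift-log",
                "enriched": {
                    "good": None,
                    "archive": "s3://snowplow-s3-to-redshift/archive",
                    "stream": "s3://snowplow-s3storage",
                },
                "shredded": {
                    "good": "s3://snowplow-s3-to-redshift/shredded/good",
                    "bad": "s3://snowplow-s3-to-redshift/shredded/bad",
                },
            }
        },
        "emr": {"ec2_key_name": "conroychan2"},
    }
}

print(blank_required_fields(actual))  # ['aws.s3.buckets.enriched.good']
```

With the values above, the check flags only enriched.good, which is nil in the Actual value while the contract expects a String. Whether that is the only mismatch is not confirmed, since the path list covers just a handful of keys.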
Here is my config file:
aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: XXXXXXXXXXXX
  secret_access_key: ZZZZZZZZZZZZZZ
  s3:
    region: ap-southeast-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://snowplow-s3-to-redshift-log
      encrypted: false
      enriched:
        good: # e.g. s3://my-out-bucket/enriched/good
        archive: s3://snowplow-s3-to-redshift/archive # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
        stream: s3://snowplow-s3storage # S3 Loader's output folder with enriched data. If present raw buckets will be discarded
      shredded:
        good: s3://snowplow-s3-to-redshift/shredded/good # e.g. s3://my-out-bucket/shredded/good
        bad: s3://snowplow-s3-to-redshift/shredded/bad # e.g. s3://my-out-bucket/shredded/bad
        errors: ADD HERE # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://snowplow-s3-to-redshift/shredded/archive # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: ap-southeast-1 # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using aws emr create-default-roles
    service_role: EMR_DefaultRole # Created using aws emr create-default-roles
    placement: ADD HERE # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-6aa9a123 # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: conroychan2
    security_configuration: # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: # Set this to specify custom boostrap actions. Leave empty otherwise
    software:
      hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Data Load from S3 to Redshift
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
        volume_size: 100 # Gigabytes
        volume_type: "gp2"
        volume_iops: 400 # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info: # Optional JSON string for selecting additional features
enrich:
  versions:
    spark_enrich: 1.17.0 # Version of the Spark Enrichment process
  output_compression: GZIP # Stream mode supports only GZIP
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.1 # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
Here is the iglu_resolver.json:
{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      },
      {
        "name": "Iglu Central - GCP Mirror",
        "priority": 1,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://mirror01.iglucentral.com"
          }
        }
      }
    ]
  }
}
Here is the redshift.json:
{
  "schema": "iglu:com.snowplowanalytics.snowplow.storage/redshift_config/jsonschema/2-1-0",
  "data": {
    "name": "AWS Redshift enriched events storage",
    "host": "xxxxxxx.ap-southeast-1.redshift.amazonaws.com",
    "database": "snowplowstream",
    "port": 5439,
    "sslMode": "DISABLE",
    "username": "storageloader",
    "password": "xxxxxxxxx",
    "roleArn": "arn:aws:iam::xxxxxxxxxxxx:role/RedshiftS3Access",
    "schema": "atomic",
    "maxError": 1,
    "compRows": 20000,
    "sshTunnel": null,
    "purpose": "ENRICHED_EVENTS"
  }
}