Value guarded in: Snowplow::EmrEtlRunner::Cli::load_config

I am very new to Snowplow and YAML configs, so hopefully I am just making a minor mistake here.

I am using this EmrEtlRunner zip: snowplow_emr_r102_afontova_gora.zip

I am following these instructions: https://github.com/snowplow/snowplow/wiki/1-Installing-EmrEtlRunner
The one part I didn't set up was the storage part, because I didn't know where to put redshift.json (or the other target files, depending on what I used).

Thanks in advance; hopefully someone can help me with this.

When I run:
./snowplow-emr-etl-runner run -c config.yml -r iglu_resolver.json

This is my YAML config so far:

aws:

  # Credentials can be hardcoded or set in environment variables

  access_key_id: 'ggg'

  secret_access_key: 'g+g'

  s3:

    region: "us-east-2"

    buckets:

      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket

      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here

      log: s3://enrich-logs-riel-design

      raw:

        in: s3://enrich-in

        processing: s3://enrich-processing

        archive: s3://enrich-archive    # e.g. s3://my-archive-bucket/in

      enriched:

        good: s3://enrich-good       # e.g. s3://my-out-bucket/enriched/good

        bad: s3://enrich-bad        # e.g. s3://my-out-bucket/enriched/bad

        errors: s3://enrich-errors     # Leave blank unless continue_on_unexpected_error: set to true below

        archive: s3://enrich-archive-enriched    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched

      shredded:

        good: s3://enrich-shredded/good       # e.g. s3://my-out-bucket/shredded/good

        bad: s3://enrich-shredded/bad        # e.g. s3://my-out-bucket/shredded/bad

        errors: s3://enrich-shredded/errors     # Leave blank unless continue_on_unexpected_error: set to true below

        archive: s3://enrich-shredded/archive   # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded

  emr:

    job_name: Snowplow ETL # Give your job a name

    ami_version: 5.9.0      # Don't change this

    region: "us-east-2"      # Always set this

    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles

    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles

    placement:      # Set this if not running in VPC. Leave blank otherwise

    ec2_subnet_id: "subnet-a92da0e5" # Set this if running in VPC. Leave blank otherwise

    ec2_key_name: "snowplow-ec2"

    bootstrap: []           # Set this to specify custom bootstrap actions. Leave empty otherwise

    software:

      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.

      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.

    # Adjust your Spark cluster below

    jobflow:

      master_instance_type: m1.medium

      core_instance_count: 2

      core_instance_type: m1.medium

      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.

        volume_size: 100    # Gigabytes

        volume_type: "gp2"

        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"

        ebs_optimized: false # Optional. Will default to true

      task_instance_count: 0 # Increase to use spot instances

      task_instance_type: m1.medium

      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances

    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures

    additional_info:        # Optional JSON string for selecting additional features

collectors:

  format: cloudfront # Or 'clj-tomcat' for the Clojure Collector, or 'thrift' for Thrift records, or 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs

enrich:

  versions:

    spark_enrich: 1.10.0 # Version of the Spark Enrichment process

  continue_on_unexpected_error: false # Set to 'true' (and set out_errors: above) if you don't want any exceptions thrown from ETL

  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP

storage:

  versions:

    rdb_shredder: 0.13.0        # Version of the Relational Database Shredding process

    rdb_loader: 0.14.0          # Version of the Relational Database Loader app

    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process

monitoring:

  tags: {} # Name-value pairs describing this job

  logging:

    level: DEBUG # You can optionally switch to INFO for production

  snowplow:

    method: get

    app_id: "e-dpcpvtxxpi" # e.g. snowplow

    collector: riel-design-email-open.us-east-2.elasticbeanstalk.com # e.g. d3rkrsqld9gmqf.cloudfront.net

I get the following error:

ReturnContractError: Contract violation for return value:        Expected: #<Contracts::Maybe:0x7146c6ea @vals=[{:aws=>{:access_key_id=>String, :secret_access_key=>String, :s3=>{:region=>String, :buckets=>{:assets=>String, :jsonpath_assets=>#<Contracts::Maybe:0x410f53b2 @vals=[String, nil]>, :log=>String, :raw=>#<Contracts::Maybe:0x7210f559 @vals=[{:in=>#<Contracts::CollectionOf:0x61e0f9b9 @contract=String, @collection_class=Array>, :processing=>String, :archive=>String}, nil]>, :enriched=>{:good=>String, :bad=>#<Contracts::Maybe:0x41404aa2 @vals=[String, nil]>, :errors=>#<Contracts::Maybe:0x31e22365 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x41bdaa81 @vals=[String, nil]>, :stream=>#<Contracts::Maybe:0x59c08cf1 @vals=[String, nil]>}, :shredded=>{:good=>String, :bad=>String, :errors=>#<Contracts::Maybe:0x1a3c4b3e @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x4746fb8c @vals=[String, nil]>}}}, :emr=>{:ami_version=>String, :region=>String, :jobflow_role=>String, :service_role=>String, :placement=>#<Contracts::Maybe:0x470135da @vals=[String, nil]>, :ec2_subnet_id=>#<Contracts::Maybe:0x5f68eec6 @vals=[String, nil]>, :ec2_key_name=>String, :bootstrap=>#<Contracts::Maybe:0x6cb194f5 @vals=[#<Contracts::CollectionOf:0x6296e4bf @contract=String, @collection_class=Array>, nil]>, :software=>{:hbase=>#<Contracts::Maybe:0x50fa5938 @vals=[String, nil]>, :lingual=>#<Contracts::Maybe:0x58b5f7d2 @vals=[String, nil]>}, :jobflow=>{:job_name=>String, :master_instance_type=>String, :core_instance_count=>Contracts::Num, :core_instance_type=>String, :core_instance_ebs=>#<Contracts::Maybe:0x5814b4fb @vals=[{:volume_size=>#<Proc:0x5c459194@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:28 (lambda)>, :volume_type=>#<Proc:0x61d8a491@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:27 (lambda)>, :volume_iops=>#<Contracts::Maybe:0x6065bcb7 
@vals=[#<Proc:0x5c459194@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:28 (lambda)>, nil]>, :ebs_optimized=>#<Contracts::Maybe:0x71d78cac @vals=[Contracts::Bool, nil]>}, nil]>, :task_instance_count=>Contracts::Num, :task_instance_type=>String, :task_instance_bid=>#<Contracts::Maybe:0x18d22ecf @vals=[Contracts::Num, nil]>}, :additional_info=>#<Contracts::Maybe:0x31443680 @vals=[String, nil]>, :bootstrap_failure_tries=>Contracts::Num, :configuration=>#<Contracts::Maybe:0x878feb2 @vals=[#<Contracts::HashOf:0x8b1170f @key=Symbol, @value=#<Contracts::HashOf:0x69cb134 @key=Symbol, @value=String>>, nil]>}}, :collectors=>#<Contracts::Maybe:0x67a38 @vals=[{:format=>String}, nil]>, :enrich=>{:versions=>#<Contracts::Maybe:0x5b5b8730 @vals=[{:spark_enrich=>String}, nil]>, :continue_on_unexpected_error=>#<Contracts::Maybe:0x321c2a7 @vals=[Contracts::Bool, nil]>, :output_compression=>#<Proc:0x1bbc1b90@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:26 (lambda)>}, :storage=>{:versions=>{:rdb_shredder=>String, :hadoop_elasticsearch=>String, :rdb_loader=>String}}, :monitoring=>{:tags=>#<Contracts::HashOf:0x72ceafeb @key=Symbol, @value=String>, :logging=>{:level=>String}, :snowplow=>#<Contracts::Maybe:0x63d3c9dc @vals=[{:method=>String, :collector=>String, :app_id=>String}, nil]>}}, nil]>,        Actual: {:aws=>{:access_key_id=>"fdsaf", :secret_access_key=>"fdsafsa+dhdyuZDOcqZlji", :s3=>{:region=>"us-east-2", :buckets=>{:assets=>"s3://snowplow-hosted-assets", :jsonpath_assets=>nil, :log=>"s3://enrich-logs-riel-design", :raw=>{:in=>"s3://enrich-in", :processing=>"s3://enrich-processing", :archive=>"s3://enrich-archive"}, :enriched=>{:good=>"s3://enrich-good", :bad=>"s3://enrich-bad", :errors=>"s3://enrich-errors", :archive=>"s3://enrich-archive-enriched"}, :shredded=>{:good=>"s3://enrich-shredded/good", :bad=>"s3://enrich-shredded/bad", :errors=>"s3://enrich-shredded/errors", :archive=>"s3://enrich-shredded/archive"}}}, 
:emr=>{:job_name=>"Snowplow ETL", :ami_version=>"5.9.0", :region=>"us-east-2", :jobflow_role=>"EMR_EC2_DefaultRole", :service_role=>"EMR_DefaultRole", :placement=>nil, :ec2_subnet_id=>"subnet-a92da0e5", :ec2_key_name=>"snowplow-ec2", :bootstrap=>[], :software=>{:hbase=>nil, :lingual=>nil}, :jobflow=>{:master_instance_type=>"m1.medium", :core_instance_count=>2, :core_instance_type=>"m1.medium", :core_instance_ebs=>{:volume_size=>100, :volume_type=>"gp2", :volume_iops=>400, :ebs_optimized=>false}, :task_instance_count=>0, :task_instance_type=>"m1.medium", :task_instance_bid=>0.015}, :bootstrap_failure_tries=>3, :additional_info=>nil}}, :collectors=>{:format=>"cloudfront"}, :enrich=>{:versions=>{:spark_enrich=>"1.10.0"}, :continue_on_unexpected_error=>false, :output_compression=>"NONE"}, :storage=>{:versions=>{:rdb_shredder=>"0.13.0", :rdb_loader=>"0.14.0", :hadoop_elasticsearch=>"0.1.0"}}, :monitoring=>{:tags=>{}, :logging=>{:level=>"DEBUG"}, :snowplow=>{:method=>"get", :app_id=>"e-dpcpvtxxpi", :collector=>"riel-design-email-open.us-east-2.elasticbeanstalk.com"}}}        Value guarded in: Snowplow::EmrEtlRunner::Cli::load_config        With Contract: Maybe, String, Bool => Maybe        At: uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:202                     block in Contract at uri:classloader:/gems/contracts-0.11.0/lib/contracts.rb:45                      failure_callback at uri:classloader:/gems/contracts-0.11.0/lib/contracts.rb:154                             call_with at uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:80              block in redefine_method at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138                       process_options at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:191  get_args_config_enrichments_resolver at uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:167                               send_to at 
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43
                             call_with at uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76
              block in redefine_method at uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138                                <main> at uri:classloader:/emr-etl-runner/bin/snowplow-emr-etl-runner:37
                                  load at org/jruby/RubyKernel.java:979
                                <main> at uri:classloader:/META-INF/main.rb:1
                               require at org/jruby/RubyKernel.java:961
                                (root) at uri:classloader:/META-INF/main.rb:1
                                <main> at uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1
ERROR: org.jruby.embed.EvalFailedException: (ReturnContractError) Contract violation for return value:

@Anders_Kitson, here's an example of the correct configuration file format for the EmrEtlRunner version you are using (R102): https://github.com/snowplow/snowplow/blob/r102-afontova-gora/3-enrich/emr-etl-runner/config/config.yml.sample.

Specifically:

  1. Your raw:in bucket is expected to be an array of buckets
  2. The job_name has to be moved out of the immediate emr scope into emr:jobflow

That is:

raw:
    in:
    - s3://enrich-in

and

emr:
    ami_version: 5.9.0      # Don't change this
    . . .
    jobflow:
      job_name: Snowplow ETL # Give your job a name
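
The two requirements above can be sanity-checked before re-running the job. A minimal sketch (the relevant slice of the config is shown inline as a Python dict for illustration; in practice you would load your config.yml with a YAML parser):

```python
# Structural check for the two contract requirements above:
# raw:in must be a list of bucket strings, and job_name must live
# under emr:jobflow, not directly under emr.

config = {
    "aws": {
        "s3": {"buckets": {"raw": {"in": ["s3://enrich-in"]}}},
        "emr": {"jobflow": {"job_name": "Snowplow ETL"}},
    }
}

buckets = config["aws"]["s3"]["buckets"]
raw_in = buckets["raw"]["in"]
assert isinstance(raw_in, list) and all(isinstance(b, str) for b in raw_in), \
    "raw:in must be a YAML list of bucket strings"

jobflow = config["aws"]["emr"]["jobflow"]
assert "job_name" in jobflow, "job_name belongs under emr:jobflow, not emr:"
print("config shape OK")
```

This mirrors the contract in the error message, which expects `:in=>CollectionOf:String` (an array) and `:job_name` inside `:jobflow`.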

Do note that different EmrEtlRunner versions might have different config formats.

Thank you, I will give the new config a try and report back. @ihor, how do I know how many buckets are needed for the array, or is it just two, like the example?

OK, it seemed to work; now it says:
No logs to process: No Snowplow logs to process since last run
But there are logs in the Elastic Beanstalk collector, so I am not sure if it was working 100%. You did solve my error, though, so thank you so much.

@ihor, I don't see anything in my S3 buckets. Should I at this stage? Should there be initial test data in them?

Is my environment ID my app_id in the config.yml? I am just not sure why I get no logs.

@Anders_Kitson, that is just an example. There could be many sources of raw events; you might have more than one collector feeding data into the pipeline.

Have you set up any collector to catch the data you track? If so, you need to make sure the data the collector collects ends up in the "raw" bucket (unless you enrich data in a real-time pipeline).

If you set up the Clojure collector, the logs (raw events) are rotated to S3 by AWS once an hour; the timing is beyond your control.

Oh, so I need to have my collector output the data to the raw bucket; are there setup instructions for doing that? And by "just an example", do you mean I should see example data?

I think this is what I missed.

Yes, it is important to enable log rotation to S3.