Snowplow Event Recovery

Hi,
I have a bunch of Snowplow bad events to process in an S3 enriched-bad bucket.
I would like someone's help in understanding how I can use the Snowplow Event Recovery jar for this scenario.

I started testing with an EMR Spark job using the jar s3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/snowplow-event-recovery-spark-0.1.0.jar, but it wasn't successful.

To give more detail on the bad events: they are caused by an invalid value in a context attached to one of our unstructured events. I am not quite sure how I should write the config JSON with the replace function for that context.

The following is the config.json.

Basically, I want to replace “off plan” with “off_plan” in this listing context:

{
  "schema": "iglu:com.snowplowanalytics.snowplow/recoveries/jsonschema/1-0-0",
  "data": [{
    "name": "ReplaceInBase64FieldInBody",
    "error": "instance value (\"off plan\") not found in enum (possible values: [\"off_plan\",\"completed\",null])\n  level: \"error\"\n schema: {\"loadingURI\":\"#\",\"pointer\":\"/properties/completion_status\"}\n instance: {\"pointer\":\"/completion_status\"}\n    domain: \"validation\"\n    keyword: \"enum\"\n    value: \"off plan\"\n    enum: [\"off_plan\",\"completed\",null]\n",
    "base64Field": "cx",
    "toReplace": "\"off plan\"",
    "replacement": "\"off plan\":\"off_plan\""
  }]
}

Kindly shed some light on how to write the config.json for my scenario correctly.

EMR Spark app details:

spark-submit --class com.snowplowanalytcs.snowplow.event.recovery.Main --master yarn --deploy-mode cluster s3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/snowplow-event-recovery-spark-0.1.0.jar --input s3://sp-dev-badevents/ --output s3://sp-dev-badevents/ --config ewogICJzY2hlbWEiOiAiaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3Muc25vd3Bsb3cvcmVjb3Zlcmllcy9qc29uc2NoZW1hLzEtMC0wIiwKICAiZGF0YSI6IFt7CiAgICAibmFtZSI6ICJSZXBsYWNlSW5CYXNlNjRGaWVsZEluQm9keSIsCiAgICAiZXJyb3IiOiAiaW5zdGFuY2UgdmFsdWUgKFwib2ZmIHBsYW5cIikgbm90IGZvdW5kIGluIGVudW0gKHBvc3NpYmxlIHZhbHVlczogW1wib2ZmX3BsYW5cIixcImNvbXBsZXRlZFwiLG51bGxdKVxuICBsZXZlbDogXCJlcnJvclwiXG4gc2NoZW1hOiB7XCJsb2FkaW5nVVJJXCI6XCIjXCIsXCJwb2ludGVyXCI6XCIvcHJvcGVydGllcy9jb21wbGV0aW9uX3N0YXR1c1wifVxuIGluc3RhbmNlOiB7XCJwb2ludGVyXCI6XCIvY29tcGxldGlvbl9zdGF0dXNcIn1cbiAgICBkb21haW46IFwidmFsaWRhdGlvblwiXG4gICAga2V5d29yZDogXCJlbnVtXCJcbiAgICB2YWx1ZTogXCJvZmYgcGxhblwiXG4gICAgZW51bTogW1wib2ZmX3BsYW5cIixcImNvbXBsZXRlZFwiLG51bGxdXG4iLAogICAgImJhc2U2NEZpZWxkIjogImN4IiwKICAgICJ0b1JlcGxhY2UiOiAiXCJvZmYgcGxhblwiIiwKICAgICJyZXBsYWNlbWVudCI6ICJcIm9mZiBwbGFuXCI6XCJvZmZfcGxhblwiIgogIH1dCn0=
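For reference, the --config value above is just the config.json Base64-encoded; assuming a GNU coreutils environment, it can be regenerated with something like:

    # Base64-encode the recovery config without line wrapping (GNU coreutils)
    base64 -w 0 config.json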

EMR is failing with java.lang.ClassNotFoundException: com.snowplowanalytcs.snowplow.event.recovery.Main

Can anyone (@alex, @anton) guide me with this?

Thanks in advance.

@Milan_Mathew, before we can give you a hand, could you please confirm what format your bad data is in? Starting from R118 we introduced the new bad row format. Depending on the format of your bad data, the instructions will be different.

Hi @ihor,
Thank you for your reply. To confirm, the bad data is in the older format with a line field; the following is the structure:
{"line":"...base64enoded data........","errors":[{"level":"error","message":"error: instance value (\"off plan\") not found in enum (possible values: [\"off_plan\",\"completed\",null])\n level: \"error\"\n schema: {\"loadingURI\":\"#\",\"pointer\":\"/properties/completion_status\"}\n instance: {\"pointer\":\"/completion_status\"}\n domain: \"validation\"\n keyword: \"enum\"\n value: \"off plan\"\n enum: [\"off_plan\",\"completed\",null]\n"}],"failure_tstamp":"2019-02-01T16:29:46.253Z"}

Thank you

@Milan_Mathew, I can see a few possible issues here:

  1. Region of the hosted assets bucket could be wrong (inaccessible)
  2. Your input and output are the same
  3. Your replacement doesn’t look correct

What region are you running your EMR cluster from? You might need to adjust the bucket name. For example, if you are in us-east-1 then the bucket name would be s3://snowplow-hosted-assets-us-east-1/...

You are taking the files from the same bucket you intend to load the recovered files into. How would you distinguish bad data from recovered data? I would advise using different buckets, ensuring the bad data is not deleted; if something goes wrong you still have your bad data to retry with.

You are replacing "off plan" with "off plan":"off_plan". I don’t think that was your intention. Instead, you could use

    "toReplace": "\"off plan\"",
    "replacement": "\"off_plan\""

@ihor,
Thank you for your response again. I checked all three points you mentioned.

> Region of the hosted assets bucket could be wrong (inaccessible)

I used the bucket that is accessible in our region (eu-west-1).

As for the other two points you mentioned, I have corrected them. Thanks again for pointing them out.

Anyway, the real issue was that the main class name I used had a spelling mistake. It's working fine now and I got the .lzo files.

Hi @ihor,
I have run into a new issue when trying to run EmrEtlRunner:
etl/snowplow-emr-etl-runner run --config etl/config.yml --resolver etl/resolver.json

The following is the config.yml:

aws:
  access_key_id: *************************
  secret_access_key: ****************************
  s3:
    region: eu-west-1
    buckets:
      assets: 's3://sp-dev-hostedassetsmirror-25zbzj9qbp5t'
      jsonpath_assets: 's3://sp-dev-iglurepo-3in6ep4wt6ld/jsonpaths'
      log: 's3://sp-dev-badevents-recovered/logs/2020-11-01/'
      encrypted: false
      raw:
        in:
          - 's3://sp-dev-badevents-recovered/2020-11-01/'
        processing: 's3://sp-dev-badevents-recovered/2020-11-01/processing'
        archive: 's3://sp-dev-archive-badevents-recovered/raw/'
      enriched:
        good: 's3://sp-dev-badevents-recovered/enriched/good/2020-11-01/'
        bad: 's3://sp-dev-badevents-recovered/enriched/bad/2020-11-01/'
        errors: 's3://sp-dev-badevents-recovered/enriched/errors/2020-11-01/'
        archive: 's3://sp-dev-archive-badevents-recovered/enriched/good/2020-11-01/'
      shredded:
        good: 's3://sp-dev-badevents-recovered/shredded/good/2020-11-01/'
        bad: 's3://sp-dev-badevents-recovered/shredded/bad/2020-11-01/'
        errors: 's3://sp-dev-badevents-recovered/shredded/errors/2020-11-01/'
        archive: 's3://sp-dev-archive-badevents-recovered/shredded/good/2020-11-01/'
  emr:
    ami_version: 5.9.0
    region: eu-west-1
    jobflow_role: EMR_EC2_DefaultRole
    service_role: EMR_DefaultRole
    placement: null
    ec2_subnet_id: null
    ec2_key_name: '--'
    bootstrap: []
    software:
      hbase: null
      lingual: null
    jobflow:
      job_name: Snowplow BadData ETL
      master_instance_type: m4.xlarge
      core_instance_count: 1
      core_instance_type: m4.xlarge
      core_instance_ebs:
        volume_size: 100
        volume_type: gp2
        volume_iops: null
        ebs_optimized: false
      task_instance_count: 2
      task_instance_type: m4.xlarge
      task_instance_bid: 0.999
    bootstrap_failure_tries: 2
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: '1'
      spark:
        maximizeResourceAllocation: 'true'
    additional_info: null
  collectors:
    format: thrift
  enrich:
    versions:
      spark_enrich: 1.14.0
    continue_on_unexpected_error: true
    output_compression: GZIP
  storage:
    versions:
      rdb_loader: 0.14.0
      rdb_shredder: 0.13.1
      hadoop_elasticsearch: 0.1.0
  monitoring:
    tags: {}
    logging: null
    level: DEBUG
  snowplow:
    method: get
    app_id: pf-dev-snowplow
    collector: c-dev.propertyfinder.ae

It is failing with the following error:
ERROR: org.jruby.embed.EvalFailedException: (ReturnContractError) Contract violation for return value:
Expected: #<Contracts::Maybe:0x333e01c6 @vals=[{:aws=>{:access_key_id=>String, :secret_access_key=>String, :s3=>{:region=>String, :buckets=>{:assets=>String, :jsonpath_assets=>#<Contracts::Maybe:0x2487b621 @vals=[String, nil]>, :log=>String, :raw=>#<Contracts::Maybe:0x504b4a97 @vals=[{:in=>#<Contracts::CollectionOf:0x3e79473d @contract=String, @collection_class=Array>, :processing=>String, :archive=>String}, nil]>, :enriched=>{:good=>String, :bad=>#<Contracts::Maybe:0x49741e80 @vals=[String, nil]>, :errors=>#<Contracts::Maybe:0x39acf187 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0xdd3e1e3 @vals=[String, nil]>, :stream=>#<Contracts::Maybe:0x7878459f @vals=[String, nil]>}, :shredded=>{:good=>String, :bad=>String, :errors=>#<Contracts::Maybe:0x4ef10d3b @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x749ffdc7 @vals=[String, nil]>}}}, :emr=>{:ami_version=>String, :region=>String, :jobflow_role=>String, :service_role=>String, :placement=>#<Contracts::Maybe:0x74ab8610 @vals=[String, nil]>, :ec2_subnet_id=>#<Contracts::Maybe:0x296949c8 @vals=[String, nil]>, :ec2_key_name=>String, :bootstrap=>#<Contracts::Maybe:0x729d1428 @vals=[#<Contracts::CollectionOf:0x257e8c43 @contract=String, @collection_class=Array>, nil]>, :software=>{:hbase=>#<Contracts::Maybe:0x3f0b5619 @vals=[String, nil]>, :lingual=>#<Contracts::Maybe:0x36ce9eaf @vals=[String, nil]>}, :jobflow=>{:job_name=>String, :master_instance_type=>String, :core_instance_count=>Contracts::Num, :core_instance_type=>String, :core_instance_ebs=>#<Contracts::Maybe:0xdc3eda6 @vals=[{:volume_size=>#<Proc:0x77587422@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:28 (lambda)>, :volume_type=>#<Proc:0x39eea4f6@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:27 (lambda)>, :volume_iops=>#<Contracts::Maybe:0x5c94d4b8 @vals=[#<Proc:0x77587422@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:28 (lambda)>, nil]>, :ebs_optimized=>#<Contracts::Maybe:0x308d8de8 @vals=[Contracts::Bool, nil]>}, nil]>, :task_instance_count=>Contracts::Num, :task_instance_type=>String, :task_instance_bid=>#<Contracts::Maybe:0x325236f5 @vals=[Contracts::Num, nil]>}, :additional_info=>#<Contracts::Maybe:0x73633230 @vals=[String, nil]>, :bootstrap_failure_tries=>Contracts::Num, :configuration=>#<Contracts::Maybe:0x69d2c460 @vals=[#<Contracts::HashOf:0x60be9fdf @key=Symbol, @value=#<Contracts::HashOf:0x7d816d32 @key=Symbol, @value=String>>, nil]>}}, :collectors=>#<Contracts::Maybe:0x3e984100 @vals=[{:format=>String}, nil]>, :enrich=>{:versions=>#<Contracts::Maybe:0x47d7e4b6 @vals=[{:spark_enrich=>String}, nil]>, :continue_on_unexpected_error=>#<Contracts::Maybe:0x49e92724 @vals=[Contracts::Bool, nil]>, :output_compression=>#<Proc:0x36e95b7b@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:26 (lambda)>}, :storage=>{:versions=>{:rdb_shredder=>String, :hadoop_elasticsearch=>String, :rdb_loader=>String}}, :monitoring=>{:tags=>#<Contracts::HashOf:0x58189132 @key=Symbol, @value=String>, :logging=>{:level=>String}, :snowplow=>#<Contracts::Maybe:0x6f9999f6 @vals=[{:method=>String, :collector=>String, :app_id=>String}, nil]>}}, nil]>,

Actual: {:aws=>{:access_key_id=>"", :secret_access_key=>"", :s3=>{:region=>“eu-west-1”, :buckets=>{:assets=>“s3://sp-dev-hostedassetsmirror-25zbzj9qbp5t”, :jsonpath_assets=>“s3://sp-dev-iglurepo-3in6ep4wt6ld/jsonpaths”, :log=>“s3://sp-dev-badevents-recovered/logs/2020-11-01/”, :encrypted=>false, :raw=>{:in=>[“s3://sp-dev-badevents-recovered/2020-11-01/”], :processing=>“s3://sp-dev-badevents-recovered/2020-11-01/processing”, :archive=>“s3://sp-dev-archive-badevents-recovered/raw/”}, :enriched=>{:good=>“s3://sp-dev-badevents-recovered/enriched/good/2020-11-01/”, :bad=>“s3://sp-dev-badevents-recovered/enriched/bad/2020-11-01/”, :errors=>“s3://sp-dev-badevents-recovered/enriched/errors/2020-11-01/”, :archive=>“s3://sp-dev-archive-badevents-recovered/enriched/good/2020-11-01/”}, :shredded=>{:good=>“s3://sp-dev-badevents-recovered/shredded/good/2020-11-01/”, :bad=>“s3://sp-dev-badevents-recovered/shredded/bad/2020-11-01/”, :errors=>“s3://sp-dev-badevents-recovered/shredded/errors/2020-11-01/”, :archive=>“s3://sp-dev-archive-badevents-recovered/shredded/good/2020-11-01/”}}}, :emr=>{:ami_version=>“5.9.0”, :region=>“eu-west-1”, :jobflow_role=>“EMR_EC2_DefaultRole”, :service_role=>“EMR_DefaultRole”, :placement=>nil, :ec2_subnet_id=>nil, :ec2_key_name=>"–", :bootstrap=>, :software=>{:hbase=>nil, :lingual=>nil}, :jobflow=>{:job_name=>“Snowplow BadData ETL”, :master_instance_type=>“m4.xlarge”, :core_instance_count=>1, :core_instance_type=>“m4.xlarge”, :core_instance_ebs=>{:volume_size=>100, :volume_type=>“gp2”, :volume_iops=>nil, :ebs_optimized=>false}, :task_instance_count=>2, :task_instance_type=>“m4.xlarge”, :task_instance_bid=>0.999}, :bootstrap_failure_tries=>2, :configuration=>{:“yarn-site”=>{:“yarn.resourcemanager.am.max-attempts”=>“1”}, :spark=>{:maximizeResourceAllocation=>“true”}}, :additional_info=>nil}, :collectors=>{:format=>“thrift”}, :enrich=>{:versions=>{:spark_enrich=>“1.14.0”}, :continue_on_unexpected_error=>true, :output_compression=>“GZIP”}, :storage=>{:versions=>{:rdb_loader=>“0.14.0”, :rdb_shredder=>“0.13.1”, :hadoop_elasticsearch=>“0.1.0”}}, :monitoring=>{:tags=>{}, :logging=>nil, :level=>“DEBUG”}, :snowplow=>{:method=>“get”, :app_id=>“pf-dev-snowplow”, :collector=>“c-dev.propertyfinder.ae”}}}
Value guarded in: Snowplow::EmrEtlRunner::Cli::load_config
With Contract: Maybe, String, Bool => Maybe
At: uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/cli.rb:202

I compared the actual and expected results and I could not figure out the real issue.

Can you please guide me on this? I would also like to know what the relevance of the snowplow block in this YAML is.
The app_id and collector values I specified are just names I made up (no real relevance); let me know if they have to be the real Snowplow collector name that we use for the real-time streaming pipeline.

Hi @Milan_Mathew!

Could you please let me know what version of EmrEtlRunner you are using? The configuration template might differ between versions. Looking at your config, though, it appears the issue is caused by wrong indentation.

Expected:

:logging=>{:level=>String}

So, instead of

    logging: null
    level: DEBUG

Could you please try

  logging:
    level: DEBUG

Hope this helps!

Hi @Yulia,
Thanks for the reply. Yes, I did change that indentation and tried again, but no luck; I am getting the same error again.

The following is the updated YAML:

aws:
  access_key_id: ******
  secret_access_key: ********
  s3:
    region: eu-west-1
    buckets:
      assets: s3://sp-dev-hostedassetsmirror-25zbzj9qbp5t # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: s3://sp-dev-iglurepo-3in6ep4wt6ld/jsonpaths # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://sp-dev-badevents-recovered/logs/2020-11-01/
      encrypted: false
      raw:
        in:                  # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
          #- ADD HERE         # e.g. s3://my-old-collector-bucket
          - s3://sp-dev-badevents-recovered/
        processing: s3://sp-dev-badevents-recovered/2020-11-01/
        archive: s3://sp-dev-archive-badevents-recovered/raw/    # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://sp-dev-badevents-recovered/enriched/good/2020-11-01/     # e.g. s3://my-out-bucket/enriched/good
        bad: s3://sp-dev-badevents-recovered/enriched/bad/2020-11-01/        # e.g. s3://my-out-bucket/enriched/bad
        errors: s3://sp-dev-badevents-recovered/enriched/errors/2020-11-01/     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://sp-dev-archive-badevents-recovered/enriched/good/2020-11-01/    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
        stream:
      shredded:
        good: s3://sp-dev-badevents-recovered/shredded/good/2020-11-01/       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://sp-dev-badevents-recovered/shredded/bad/2020-11-01/        # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://sp-dev-badevents-recovered/shredded/errors/2020-11-01/     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://sp-dev-archive-badevents-recovered/shredded/good/2020-11-01/
        # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
      consolidate_shredded_output: true
  emr:
    ami_version: 5.10.0
    region: eu-west-1        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles    bootstrap: []           # Set this to specify custom boostrap actions. Leave empty otherwise
    placement:
    ec2_subnet_id:
    ec2_key_name: pf-dsa-dev
    security_configuration:
    bootstrap: []
    software:
      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow BadData ETL # Give your job a name
      master_instance_type: m4.xlarge
      core_instance_count: 1
      core_instance_type: m4.xlarge
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 100    # Gigabytes
        volume_type: "gp2"
        volume_iops:     # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 2 # Increase to use spot instances
      task_instance_type: m4.xlarge
      task_instance_bid: 0.999 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    additional_info:
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
#      yarn-site:
#        yarn.resourcemanager.am.max-attempts: "1"
#      spark:
#        maximizeResourceAllocation: "true"
          # Optional JSON string for selecting additional features
  collectors:
    format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
  enrich:
    versions:
        spark_enrich: 1.17.0 # Version of the Spark Enrichment process
    continue_on_unexpected_error: true # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
    output_compression: GZIP # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
  storage:
    versions:
      rdb_shredder: 0.14.1        # Version of the Spark Shredding process
      hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
      rdb_loader: 0.15.0
  monitoring:
    tags: snowplow # Name-value pairs describing this job
    logging:
      level: DEBUG # You can optionally switch to INFO for production
    snowplow:
      method: get
      collector: c-dev.propertyfinder.ae # e.g. d3rkrsqld9gmqf.cloudfront.net
      app_id: pf-dev-snowplow # e.g. snowplow

I am using version r115_sigiriya and I am not able to find what the issue with this YAML is.

Hi @Milan_Mathew,

It looks like you are using an incorrect template for r115_sigiriya. Here is the correctly formed configuration file for the R115 release. You can check your current config.yml against it and adjust accordingly.

Hi @Jenni,
Thank you for the response. I will adjust and let you know. But would you be able to answer the question I asked earlier?

> What is the relevance of the snowplow block in this YAML? The app_id and collector values I specified are just names I made up (no real relevance); let me know if they have to be the real Snowplow collector name that we use for the real-time streaming pipeline.

Hi,
I corrected the template to align with the configuration in the link you provided, and it is still failing with the same error.

The following is the latest version of the config.yml I tried for r115_sigiriya:

aws:
  access_key_id: ***************
  secret_access_key: ******************
  s3:
    region: eu-west-1
    buckets:
      assets: s3://sp-dev-hostedassetsmirror-25zbzj9qbp5t # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: s3://sp-dev-iglurepo-3in6ep4wt6ld/jsonpaths # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://sp-dev-badevents-recovered/logs/2020-11-01/
      encrypted: false
      raw:
        in:                  # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
          #- ADD HERE         # e.g. s3://my-old-collector-bucket
          - s3://sp-dev-badevents-recovered/
        processing: s3://sp-dev-badevents-recovered/2020-11-01/
        archive: s3://sp-dev-archive-badevents-recovered/raw/    # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://sp-dev-badevents-recovered/enriched/good/2020-11-01/     # e.g. s3://my-out-bucket/enriched/good
        bad: s3://sp-dev-badevents-recovered/enriched/bad/2020-11-01/        # e.g. s3://my-out-bucket/enriched/bad
        errors: s3://sp-dev-badevents-recovered/enriched/errors/2020-11-01/     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://sp-dev-archive-badevents-recovered/enriched/good/2020-11-01/    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://sp-dev-badevents-recovered/shredded/good/2020-11-01/       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://sp-dev-badevents-recovered/shredded/bad/2020-11-01/        # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://sp-dev-badevents-recovered/shredded/errors/2020-11-01/     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://sp-dev-archive-badevents-recovered/shredded/good/2020-11-01/
        # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
      consolidate_shredded_output: false
  emr:
    ami_version: 5.9.0
    region: eu-west-1        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles    bootstrap: []           # Set this to specify custom boostrap actions. Leave empty otherwise
    placement:
    ec2_subnet_id:
    ec2_key_name: pf-dsa-dev
    security_configuration:
    bootstrap: []
    software:
      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow BadData ETL # Give your job a name
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 100    # Gigabytes
        volume_type: "gp2"
        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 2 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info:
  collectors:
    format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
  enrich:
    versions:
        spark_enrich: 1.17.0 # Version of the Spark Enrichment process
    continue_on_unexpected_error: true # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
    output_compression: GZIP # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
  storage:
    versions:
      rdb_loader: 0.14.0
      rdb_shredder: 0.13.1        # Version of the Spark Shredding process
      hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
  monitoring:
    tags: {}   # Name-value pairs describing this job
    logging:
      level: DEBUG # You can optionally switch to INFO for production
    snowplow:
      method: get
      protocol: http
      port: 80
      app_id: pf-dev-snowplow # e.g. snowplow
      collector: c-dev.propertyfinder.ae # e.g. d3rkrsqld9gmqf.cloudfront.net

Kindly check the expected output as well, since it looks a little different from the R115 YAML template I followed.

> Expected: #<Contracts::Maybe:0x240e61 @vals=[{:aws=>{:access_key_id=>String, :secret_access_key=>String, :s3=>{:region=>String, :buckets=>{:assets=>String, :jsonpath_assets=>#<Contracts::Maybe:0x5e643402 @vals=[String, nil]>, :log=>String, :encrypted=>Contracts::Bool, :raw=>#<Contracts::Maybe:0x7010c9e4 @vals=[{:in=>#<Contracts::CollectionOf:0x24d7c365 @collection_class=Array, @contract=String>, :processing=>String, :archive=>String}, nil]>, :enriched=>{:good=>String, :bad=>#<Contracts::Maybe:0x218e186b @vals=[String, nil]>, :errors=>#<Contracts::Maybe:0x612531a3 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x6b9fc5c7 @vals=[String, nil]>, :stream=>#<Contracts::Maybe:0x6b30ff23 @vals=[String, nil]>}, :shredded=>{:good=>String, :bad=>String, :errors=>#<Contracts::Maybe:0x7a689979 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x164db8f0 @vals=[String, nil]>}}, :consolidate_shredded_output=>Contracts::Bool}, :emr=>{:ami_version=>String, :region=>String, :jobflow_role=>String, :service_role=>String, :placement=>#<Contracts::Maybe:0x3dbf3bc @vals=[String, nil]>, :ec2_subnet_id=>#<Contracts::Maybe:0x3b850bb7 @vals=[String, nil]>, :ec2_key_name=>String, :security_configuration=>#<Contracts::Maybe:0x29693b1d @vals=[String, nil]>, :bootstrap=>#<Contracts::Maybe:0x44d6f9cf @vals=[#<Contracts::CollectionOf:0x4fe9fb65 @collection_class=Array, @contract=String>, nil]>, :software=>{:hbase=>#<Contracts::Maybe:0xfeab3ae @vals=[String, nil]>, :lingual=>#<Contracts::Maybe:0x64f2b1b4 @vals=[String, nil]>}, :jobflow=>{:job_name=>String, :master_instance_type=>String, :core_instance_count=>Contracts::Num, :core_instance_type=>String, :core_instance_ebs=>#<Contracts::Maybe:0x7505dcab @vals=[{:volume_size=>#<Proc:0x25b87e1b@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:41 (lambda)>, :volume_type=>#<Proc:0x623a891d@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:40 (lambda)>, :volume_iops=>#<Contracts::Maybe:0x7962a364 @vals=[#<Proc:0x25b87e1b@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:41 (lambda)>, nil]>, :ebs_optimized=>#<Contracts::Maybe:0x134ec0f3 @vals=[Contracts::Bool, nil]>}, nil]>, :task_instance_count=>Contracts::Num, :task_instance_type=>String, :task_instance_bid=>#<Contracts::Maybe:0x1f602930 @vals=[Contracts::Num, nil]>}, :additional_info=>#<Contracts::Maybe:0x1d4c6e32 @vals=[String, nil]>, :bootstrap_failure_tries=>Contracts::Num, :configuration=>#<Contracts::Maybe:0x71689ee2 @vals=[#<Contracts::HashOf:0x2b569858 @key=Symbol, @value=#<Contracts::HashOf:0x1b84d03d @key=Symbol, @value=String>>, nil]>}}, :collectors=>#<Contracts::Maybe:0x23dda7a3 @vals=[{:format=>String}, nil]>, :enrich=>{:versions=>#<Contracts::Maybe:0x3dbb7bb @vals=[{:spark_enrich=>String}, nil]>, :continue_on_unexpected_error=>#<Contracts::Maybe:0x6632eb19 @vals=[Contracts::Bool, nil]>, :output_compression=>#<Proc:0x4a778943@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:39 (lambda)>}, :storage=>{:versions=>{:rdb_shredder=>String, :hadoop_elasticsearch=>String, :rdb_loader=>String}}, :monitoring=>{:tags=>#<Contracts::HashOf:0xcc91fe3 @key=Symbol, @value=String>, :logging=>{:level=>String}, :snowplow=>#<Contracts::Maybe:0x16f62062 @vals=[{:method=>String, :collector=>String, :app_id=>String}, nil]>}}, nil]>,

The above is the expected output that comes along with the error when I run emr-etl-runner.

@Jenni @Yulia @ihor Can someone help me? I have been stuck on this for the past few days.

Thanks in advance.

@Milan_Mathew, I spotted one more incorrect indentation - you placed consolidate_shredded_output as a child of buckets. You need to take it out of buckets so that it is listed under s3 instead.

> What is the relevance of the snowplow block in this YAML? Whatever names I specified as app_id and collector are just names I defined (no relevance); let me know if it has to be the real Snowplow collector name that we use for the real-time streaming pipeline.

You do not have to send the ETL data to a collector, but if you do, it could be either the very same collector/pipeline as the one that processes your usual tracked data, or any other collector/pipeline you might have deployed. The app_id would distinguish this ETL-processing-related data from the actual data you track in your application(s).

Hi @ihor,
Thank you for spotting that. I corrected that too and tried again, but it is failing with the same error.
I am curious to understand why the expected output is a little different from the template I am using for this version, e.g. it has a stream field under enriched.

> aws:
>   access_key_id: ****************
>   secret_access_key: *******************
>   s3:
>     region: eu-west-1
>     buckets:
>       assets: s3://sp-dev-hostedassetsmirror-25zbzj9qbp5t # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
>       jsonpath_assets: s3://sp-dev-iglurepo-3in6ep4wt6ld/jsonpaths # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
>       log: s3://sp-dev-badevents-recovered/logs/2020-11-01/
>       encrypted: false
>       raw:
>         in:                  # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
>           #- ADD HERE         # e.g. s3://my-old-collector-bucket
>           - s3://sp-dev-badevents-recovered/
>         processing: s3://sp-dev-badevents-recovered/2020-11-01/
>         archive: s3://sp-dev-archive-badevents-recovered/raw/    # e.g. s3://my-archive-bucket/raw
>       enriched:
>         good: s3://sp-dev-badevents-recovered/enriched/good/2020-11-01/     # e.g. s3://my-out-bucket/enriched/good
>         bad: s3://sp-dev-badevents-recovered/enriched/bad/2020-11-01/        # e.g. s3://my-out-bucket/enriched/bad
>         errors: s3://sp-dev-badevents-recovered/enriched/errors/2020-11-01/     # Leave blank unless :continue_on_unexpected_error: set to true below
>         archive: s3://sp-dev-archive-badevents-recovered/enriched/good/2020-11-01/    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
>       shredded:
>         good: s3://sp-dev-badevents-recovered/shredded/good/2020-11-01/       # e.g. s3://my-out-bucket/shredded/good
>         bad: s3://sp-dev-badevents-recovered/shredded/bad/2020-11-01/        # e.g. s3://my-out-bucket/shredded/bad
>         errors: s3://sp-dev-badevents-recovered/shredded/errors/2020-11-01/     # Leave blank unless :continue_on_unexpected_error: set to true below
>         archive: s3://sp-dev-archive-badevents-recovered/shredded/good/2020-11-01/
>         # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
>     consolidate_shredded_output: false
>   emr:
>     ami_version: 5.9.0
>     region: eu-west-1        # Always set this
>     jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
>     service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles    bootstrap: []           # Set this to specify custom boostrap actions. Leave empty otherwise
>     placement:
>     ec2_subnet_id:
>     ec2_key_name: pf-dsa-dev
>     security_configuration:
>     bootstrap: []
>     software:
>       hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
>       lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
>     # Adjust your Hadoop cluster below
>     jobflow:
>       job_name: Snowplow BadData ETL # Give your job a name
>       master_instance_type: m1.medium
>       core_instance_count: 2
>       core_instance_type: m1.medium
>       core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
>         volume_size: 100    # Gigabytes
>         volume_type: "gp2"
>         volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
>         ebs_optimized: false # Optional. Will default to true
>       task_instance_count: 2 # Increase to use spot instances
>       task_instance_type: m1.medium
>       task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
>     bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
>     configuration:
>       yarn-site:
>         yarn.resourcemanager.am.max-attempts: "1"
>       spark:
>         maximizeResourceAllocation: "true"
>     additional_info:
>   collectors:
>     format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
>   enrich:
>     versions:
>         spark_enrich: 1.17.0 # Version of the Spark Enrichment process
>     continue_on_unexpected_error: true # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
>     output_compression: GZIP # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
>   storage:
>     versions:
>       rdb_loader: 0.14.0
>       rdb_shredder: 0.13.1        # Version of the Spark Shredding process
>       hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
>   monitoring:
>     tags: {}   # Name-value pairs describing this job
>     logging:
>       level: DEBUG # You can optionally switch to INFO for production
>     snowplow:
>       method: get
>       protocol: http
>       port: 80
>       app_id: pf-dev-snowplow # e.g. snowplow
>       collector: c-dev.propertyfinder.ae # e.g. d3rkrsqld9gmqf.cloudfront.net


Expected: #<Contracts::Maybe:0x240e61 @vals=[{:aws=>{:access_key_id=>String, :secret_access_key=>String, :s3=>{:region=>String, :buckets=>{:assets=>String, :jsonpath_assets=>#<Contracts::Maybe:0x5e643402 @vals=[String, nil]>, :log=>String, :encrypted=>Contracts::Bool, :raw=>#<Contracts::Maybe:0x7010c9e4 @vals=[{:in=>#<Contracts::CollectionOf:0x24d7c365 @collection_class=Array, @contract=String>, :processing=>String, :archive=>String}, nil]>, :enriched=>{:good=>String, :bad=>#<Contracts::Maybe:0x218e186b @vals=[String, nil]>, :errors=>#<Contracts::Maybe:0x612531a3 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x6b9fc5c7 @vals=[String, nil]>, :stream=>#<Contracts::Maybe:0x6b30ff23 @vals=[String, nil]>}, :shredded=>{:good=>String, :bad=>String, :errors=>#<Contracts::Maybe:0x7a689979 @vals=[String, nil]>, :archive=>#<Contracts::Maybe:0x164db8f0 @vals=[String, nil]>}}, :consolidate_shredded_output=>Contracts::Bool}, :emr=>{:ami_version=>String, :region=>String, :jobflow_role=>String, :service_role=>String, :placement=>#<Contracts::Maybe:0x3dbf3bc @vals=[String, nil]>, :ec2_subnet_id=>#<Contracts::Maybe:0x3b850bb7 @vals=[String, nil]>, :ec2_key_name=>String, :security_configuration=>#<Contracts::Maybe:0x29693b1d @vals=[String, nil]>, :bootstrap=>#<Contracts::Maybe:0x44d6f9cf @vals=[#<Contracts::CollectionOf:0x4fe9fb65 @collection_class=Array, @contract=String>, nil]>, :software=>{:hbase=>#<Contracts::Maybe:0xfeab3ae @vals=[String, nil]>, :lingual=>#<Contracts::Maybe:0x64f2b1b4 @vals=[String, nil]>}, :jobflow=>{:job_name=>String, :master_instance_type=>String, :core_instance_count=>Contracts::Num, :core_instance_type=>String, :core_instance_ebs=>#<Contracts::Maybe:0x7505dcab @vals=[{:volume_size=>#<Proc:0x25b87e1b@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:41 (lambda)>, :volume_type=>#<Proc:0x623a891d@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:40 (lambda)>, :volume_iops=>#<Contracts::Maybe:0x7962a364 @vals=[#<Proc:0x25b87e1b@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:41 (lambda)>, nil]>, :ebs_optimized=>#<Contracts::Maybe:0x134ec0f3 @vals=[Contracts::Bool, nil]>}, nil]>, :task_instance_count=>Contracts::Num, :task_instance_type=>String, :task_instance_bid=>#<Contracts::Maybe:0x1f602930 @vals=[Contracts::Num, nil]>}, :additional_info=>#<Contracts::Maybe:0x1d4c6e32 @vals=[String, nil]>, :bootstrap_failure_tries=>Contracts::Num, :configuration=>#<Contracts::Maybe:0x71689ee2 @vals=[#<Contracts::HashOf:0x2b569858 @key=Symbol, @value=#<Contracts::HashOf:0x1b84d03d @key=Symbol, @value=String>>, nil]>}}, :collectors=>#<Contracts::Maybe:0x23dda7a3 @vals=[{:format=>String}, nil]>, :enrich=>{:versions=>#<Contracts::Maybe:0x3dbb7bb @vals=[{:spark_enrich=>String}, nil]>, :continue_on_unexpected_error=>#<Contracts::Maybe:0x6632eb19 @vals=[Contracts::Bool, nil]>, :output_compression=>#<Proc:0x4a778943@uri:classloader:/emr-etl-runner/lib/snowplow-emr-etl-runner/contracts.rb:39 (lambda)>}, :storage=>{:versions=>{:rdb_shredder=>String, :hadoop_elasticsearch=>String, :rdb_loader=>String}}, :monitoring=>{:tags=>#<Contracts::HashOf:0xcc91fe3 @key=Symbol, @value=String>, :logging=>{:level=>String}, :snowplow=>#<Contracts::Maybe:0x16f62062 @vals=[{:method=>String, :collector=>String, :app_id=>String}, nil]>}}, nil]>,

@ihor does it matter?

@Milan_Mathew, could you please review the indentation starting from collectors and all the way down (enrich, storage, etc.)? All of those properties are listed under aws in your configuration file. They should be taken out of aws.

The enriched:stream bucket is used when you are running EmrEtlRunner in Stream Enrich mode, that is, when EmrEtlRunner consumes data already enriched in the real-time pipeline. If you are processing raw data (the result of recovery), you run EmrEtlRunner in standard mode (no enriched:stream bucket in the configuration file).

Do make sure you have your enrichment configuration files present (if any are used) and point to them with the -n option as per the usage docs.
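For example, assuming your enrichment JSONs live in a directory such as etl/enrichments (a hypothetical path, adjust to your setup), the run command would look something like:

    etl/snowplow-emr-etl-runner run --config etl/config.yml --resolver etl/resolver.json -n etl/enrichments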

Hi @ihor,
I rechecked the indentation and it looked fine to me. I used an online YAML validator to make sure about the indentation.

> aws:
>   access_key_id: *****************
>   secret_access_key: ******************
>   s3:
>     region: eu-west-1
>     buckets:
>       assets: 's3://sp-dev-hostedassetsmirror-25zbzj9qbp5t'
>       jsonpath_assets: 's3://sp-dev-iglurepo-3in6ep4wt6ld/jsonpaths'
>       log: 's3://sp-dev-badevents-recovered/logs/2020-11-01/'
>       encrypted: false
>       raw:
>         in:
>           - 's3://sp-dev-badevents-recovered/'
>         processing: 's3://sp-dev-badevents-recovered/2020-11-01/'
>         archive: 's3://sp-dev-archive-badevents-recovered/raw/'
>       enriched:
>         good: 's3://sp-dev-badevents-recovered/enriched/good/2020-11-01/'
>         bad: 's3://sp-dev-badevents-recovered/enriched/bad/2020-11-01/'
>         errors: 's3://sp-dev-badevents-recovered/enriched/errors/2020-11-01/'
>         archive: 's3://sp-dev-archive-badevents-recovered/enriched/good/2020-11-01/'
>       shredded:
>         good: 's3://sp-dev-badevents-recovered/shredded/good/2020-11-01/'
>         bad: 's3://sp-dev-badevents-recovered/shredded/bad/2020-11-01/'
>         errors: 's3://sp-dev-badevents-recovered/shredded/errors/2020-11-01/'
>         archive: 's3://sp-dev-archive-badevents-recovered/shredded/good/2020-11-01/'
>     consolidate_shredded_output: false
>   emr:
>     ami_version: 5.9.0
>     region: eu-west-1
>     jobflow_role: EMR_EC2_DefaultRole
>     service_role: EMR_DefaultRole
>     placement: null
>     ec2_subnet_id: null
>     ec2_key_name: pf-dsa-dev
>     security_configuration: null
>     bootstrap: []
>     software:
>       hbase: null
>       lingual: null
>     jobflow:
>       job_name: Snowplow BadData ETL
>       master_instance_type: m1.medium
>       core_instance_count: 2
>       core_instance_type: m1.medium
>       core_instance_ebs:
>         volume_size: 100
>         volume_type: gp2
>         volume_iops: 400
>         ebs_optimized: false
>       task_instance_count: 2
>       task_instance_type: m1.medium
>       task_instance_bid: 0.015
>     bootstrap_failure_tries: 3
>     configuration:
>       yarn-site:
>         yarn.resourcemanager.am.max-attempts: '1'
>       spark:
>         maximizeResourceAllocation: 'true'
>     additional_info: null
>   collectors:
>     format: thrift
>   enrich:
>     versions:
>       spark_enrich: 1.17.0
>     continue_on_unexpected_error: true
>     output_compression: GZIP
>   storage:
>     versions:
>       rdb_loader: 0.14.0
>       rdb_shredder: 0.13.1
>       hadoop_elasticsearch: 0.1.0
>   monitoring:
>     tags: {}
>     logging:
>       level: DEBUG
>     snowplow:
>       method: get
>       protocol: http
>       port: 80
>       app_id: pf-dev-snowplow
>       collector: c-dev.propertyfinder.ae

Is there anything else you think could be the problem?

I am running the following, skipping the staging step since I already have the .lzo files after recovery:

etl/snowplow-emr-etl-runner run --config etl/config.yml --resolver etl/resolver.json --skip staging

The resolver file is the following:

{
  "schema":"iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-0",
  "data":{
    "cacheSize":500,
    "repositories":[
      {
        "name":"Iglu Propertyfinder events",
        "priority":1,
        "vendorPrefixes":[
          "ae.propertyfinder"
        ],
        "connection":{
          "http":{
            "uri":"https://s3-eu-west-1.amazonaws.com/sp-dev-iglurepo-3in6ep4wt6ld"
          }
        }
      },
      {
        "name":"Iglu Central",
        "priority":0,
        "vendorPrefixes":[
          "com.snowplowanalytics"
        ],
        "connection":{
          "http":{
            "uri":"http://iglucentral.com"
          }
        }
      }
    ]
  }
}

I don't need the enrichments option currently.

@Jenni @Yulia

@Milan_Mathew, sorry, but you do have wrong indentation (at least in the config you posted here). I got it working once I fixed it. For example, if you remove the > from the config you will see that collectors, enrich, storage, and monitoring are not on the same level as aws, but they should be.
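In outline (values omitted), the top level of the file should look something like this, with collectors, enrich, storage and monitoring as siblings of aws rather than its children:

    aws:
      access_key_id: ...
      secret_access_key: ...
      s3:
        ...
      emr:
        ...
    collectors:
      format: thrift
    enrich:
      ...
    storage:
      ...
    monitoring:
      ...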

By the way, to post a code block, place your config between a pair of triple backticks (```).

@ihor,
Oh, that was totally my bad; I thought those belonged under aws. Yes, I corrected that and things are moving now.

Thanks a ton.