Error using StorageLoader to load data into Redshift

Hello,

My setup is similar to the one described in the post “Enriched event stream into Redshift using StorageLoader: Contract violation error”.

I am having issues with the StorageLoader in the following setup:
– StreamCollector > StreamEnrich > Kinesis LZO S3 Sink > StorageLoader > Redshift

The LZO-formatted enriched events are correctly landing in s3://snowplow/enriched-events (with GZIP compression) via StreamEnrich and kinesisS3SinkLZO, and Redshift is set up. However, I am having trouble loading the enriched events from S3 into Redshift with the StorageLoader from snowplow_emr_r87_chichen_itza_rc4.zip.

When I run the StorageLoader with the following command, I get the error below.
snowplow-storage-loader --config config.yml --include compupdate,vacuum --skip analyze

Loading Snowplow events and shredded types into Redshift database (Redshift cluster)...
Unexpected error: Cannot find atomic-events directory in shredded/good
uri:classloader:/storage-loader/lib/snowplow-storage-loader/redshift_loader.rb:74:in `load_events_and_shredded_types'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
uri:classloader:/storage-loader/bin/snowplow-storage-loader:54:in `block in (root)'
uri:classloader:/storage-loader/bin/snowplow-storage-loader:51:in `<main>'
org/jruby/RubyKernel.java:973:in `load'
uri:classloader:/META-INF/main.rb:1:in `<main>'
org/jruby/RubyKernel.java:955:in `require'
uri:classloader:/META-INF/main.rb:1:in `(root)'
uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in `<main>'

The “atomic-events” directory exists under both:
enriched:
  good: s3://snowplow/enriched-events
shredded:
  good: s3://snowplow/shredded/good

Other related questions:

  1. Since I’m using the Stream Collector and Stream Enrich, do I still need the config below in my config.yml?
     raw:
       in:                # Multiple in buckets are permitted
       processing:
       archive:           # e.g. s3://my-archive-bucket/raw
  2. Since I’m not using EMR, do I need any configuration under “emr”, “collectors” and “enrich”?
  3. Will the StorageLoader pick up the enriched events from enriched/good or from shredded/good? Also, since my events reach S3 through the “Kinesis LZO S3 Sink”, do I need the “bad”, “errors” and “archive” buckets under “enriched” and “shredded”?

My config.yml is:

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: XXX
  secret_access_key: XXX
  s3:
    region: us-west-2
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://snowplow/enrich-log
      raw:
        in:                  # Multiple in buckets are permitted
        processing: 
        archive:             # e.g. s3://my-archive-bucket/raw
      enriched:
        good:    s3://snowplow/enriched-events    #s3://snowplow/enriched/good       # e.g. s3://my-out-bucket/enriched/good
        bad:     s3://snowplow/enriched/bad        # e.g. s3://my-out-bucket/enriched/bad
        errors:  s3://snowplow/enriched/errors     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://snowplow/enriched/archive    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good:    s3://snowplow/shredded/good      # e.g. s3://my-out-bucket/shredded/good
        bad:     s3://snowplow/shredded/bad        # e.g. s3://my-out-bucket/shredded/bad
        errors:  s3://snowplow/shredded/errors  # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://snowplow/shredded/archive  # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 4.5.0
    region: ADD HERE        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement: ADD HERE     # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: ADD HERE # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: ADD HERE
    bootstrap: []           # Set this to specify custom boostrap actions. Leave empty otherwise
    software:
      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    additional_info:        # Optional JSON string for selecting additional features
collectors:
  format: cloudfront # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  job_name: Snowplow ETL # Give your job a name
  versions:
    hadoop_enrich: 1.8.0 # Version of the Hadoop Enrichment process
    hadoop_shred: 0.10.0 # Version of the Hadoop Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  download:
    folder: # Postgres-only config option. Where to store the downloaded files. Leave blank for Redshift
  targets:
    - name: "Redshift database"
      type: "redshift"
      host: "xxx.us-west-2.redshift.amazonaws.com:xxx" # The endpoint as shown in the Redshift console
      database: "dev" # Name of database
      port: xxxx # Default Redshift port
      ssl_mode: disable # One of disable (default), require, verify-ca or verify-full
      table: atomic.events
      username: "storageloader"
      password: "xxxx"
      maxerror: 1 # Stop loading on first error, or increase to permit more load errors
      comprows: 200000 # Default for a 1 XL node cluster. Not used unless --include compupdate specified
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    app_id: snowplow # e.g. snowplow
    collector: XXXXXXXXXXX.us-west-2.redshift.amazonaws.com # e.g. d3rkrsqld9gmqf.cloudfront.net

I’d greatly appreciate your help.

Thanks!

Hi @amitkhanal,

The correct configuration is

StreamCollector > Raw Stream >  Kinesis LZO S3 Sink > EmrEtlRunner > StorageLoader > Redshift

Note “Raw Stream” in place of “StreamEnrich”; the enrichment is done by EmrEtlRunner instead.

The error message “Cannot find atomic-events directory in shredded/good” is explained by the fact that StreamEnrich does not perform shredding; that functionality is part of the “batch” pipeline.
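
As a rough sketch of what that means for config.yml (the raw bucket names below are only placeholders, not taken from your setup): the raw section points EmrEtlRunner at the bucket the Kinesis LZO S3 Sink writes to, and the collector format is thrift, since the Scala Stream Collector emits Thrift records:

raw:
  in:
    - s3://snowplow/raw-events   # placeholder: output bucket of the Kinesis LZO S3 Sink
  processing: s3://snowplow/raw/processing
  archive: s3://snowplow/raw/archive
collectors:
  format: thrift                 # Thrift records from the Scala Stream Collector

Since EmrEtlRunner runs the enrichment and shredding on EMR in this setup, the emr, collectors and enrich sections of config.yml are all used.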

Please refer to the post How to setup a Lambda architecture for Snowplow for more details.

The other point is that the R87 version of the runners is still in development/testing. I would stick with R83 for now.

–Ihor


Thanks Ihor, that helped.

The setup document for enrich (https://github.com/snowplow/snowplow/wiki/Setting-up-Enrich) lists two enrich options, EmrEtlRunner and Stream Enrich. I chose Stream Enrich thinking either approach could be used.

The Lambda architecture document (How to setup a Lambda architecture for Snowplow) was very helpful. As per the “Snowplow approach to Lambda Architecture” section, enrichment can be set up as either real-time or batch. Both cannot be run simultaneously on the same raw stream, correct?

Also, is there any possibility of being able to sink the enriched Kinesis stream to S3 any time soon?
I saw Alex’s post in the link below but wasn’t sure if there have been any updates since then. Does the Kinesis LZO S3 Sink support reading from an “enriched” stream?

Amit

Hi @amitkhanal,

No, actually that is not correct. The idea behind the Lambda architecture is to have the best of both worlds, that is, the real-time and batch pipelines running at the same time (the Stream Collector and Raw Stream are common to both real-time and batch, but StreamEnrich is not, for now).
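
Roughly, the fan-out from the shared raw stream looks like this (only a sketch of the idea, not an exhaustive list of components):

Stream Collector > Raw Kinesis stream > Stream Enrich > enriched stream > real-time consumers   (speed layer)
Stream Collector > Raw Kinesis stream > Kinesis LZO S3 Sink > EmrEtlRunner > StorageLoader > Redshift   (batch layer)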

If you are only interested in batch processing, then there is no need for this extra complexity. Just set up a “pure” batch pipeline, that is:

Clojure or Cloudfront collector > EmrEtlRunner > StorageLoader > Redshift
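
Running the batch side then comes down to invoking the two apps one after the other, along these lines (the resolver file name here is just an example):

./snowplow-emr-etl-runner --config config.yml --resolver iglu_resolver.json
./snowplow-storage-loader --config config.yml --include compupdate,vacuum --skip analyze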

As for sinking the enriched stream to S3, I believe it is still on the roadmap. However, I cannot tell when it is due to be released.

Thanks for your help!

Good question: we are planning on testing this (the Kinesis S3 Sink reading from the enriched stream) in the next few weeks; we think it should work, but we are not sure yet…

I’ve had good results using Kinesis Firehose (despite its unavailability in most regions, occasional inconsistent behaviours and generally being a complete pain in the behind) to sink the enriched stream to an S3 bucket and then running the RDB shredder on that output.

I haven’t yet tried using the Snowplow kinesis-s3 sink, but I’m planning on doing that very soon and happy to report back.

Thanks @acgray. And yes @amitkhanal, we have now had good results using the vanilla Kinesis S3 Sink to store enriched events and then running the RDB Shredder on them.

Our documentation is lagging behind our experimentation, but the goal here is to move away from having to run a Lambda architecture just to sink Snowplow data into Redshift.