Should I run rdb_load only?

I set up the collector to push to a Kinesis stream, snowplow-good. Then I set up the enrichment to use Kinesis streams, reading from snowplow-good and writing to snowplow-enriched. I then configured the s3loader to read from snowplow-enriched and dump the shredded (right term?) files to s3://snowplow-collector/shredded/good. I see that s3loader is producing the files in TSV format, one event per line. All seems to be working as the documentation says.

Redshift is already set up following the documentation and I’m currently trying to get the emretlrunner to load the shredded s3 files into Redshift. The problem I am facing is that emretlrunner produces this message even though there are new files in the bucket:

java -Dorg.slf4j.simpleLogger.defaultLogLevel=debug -jar snowplow-emr-etl-runner run -c emretlrunner.yml -r /home/collector/config/iglu_resolver.json -t targets --skip staging_stream_enrich

“D, [2020-02-10T21:20:15.933053 #4193] DEBUG – : Initializing EMR jobflow
No logs to process: No Snowplow logs to process since last run”

I suspect that I need to limit the steps that emretlrunner is executing to only the rdb_load step. Is that correct? How do I do that?

my emretlrunner config is below. Please note that I added s3 buckets for raw and enriched because without them I would get a contract error. I don’t need them because I’m using streams for everything before the s3loader process.

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: xxxxxxx 
  secret_access_key: xxxxxxxx
  s3:
    region: us-east-2
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://snowplow-s3-to-redshift-log
      encrypted: false # Whether the buckets below are encrypted using server side encryption (SSE-S3)
      raw:
        in:                  # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
          - s3://snowplow-collector/raw/old #ADD HERE         # e.g. s3://my-old-collector-bucket
          - s3://snowplow-collector/raw/new #ADD HERE         # e.g. s3://my-new-collector-bucket
        processing: s3://snowplow-collector/raw/processing #ADD HERE
        archive: s3://snowplow-collector/raw/archive #ADD HERE    # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://snowplow-collector/enriched/good #ADD HERE       # e.g. s3://my-out-bucket/enriched/good
        bad: s3://snowplow-collector/enriched/bad #ADD HERE        # e.g. s3://my-out-bucket/enriched/bad
        errors: s3://snowplow-collector/enriched/errors #ADD HERE     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://snowplow-collector/enriched/archive #ADD HERE    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://snowplow-collector/shredded/good       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://snowplow-collector/shredded/bad        # e.g. s3://my-out-bucket/shredded/bad
        errors: #ADD HERE     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://snowplow-collector/shredded/archive    # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: us-east-2        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement: #ADD HERE     # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-xxxxx # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: collector-xxxxxx
    security_configuration: #ADD HERE # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: []           # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow ETL # Give your job a name
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      core_instance_bid: 0.015
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 100    # Gigabytes
        volume_type: "gp2"
        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info:        # Optional JSON string for selecting additional features
collectors:
  format: cloudfront # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  versions:
    spark_enrich: 1.18.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.1        # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    protocol: http
    port: 8080
    app_id: snowplow # e.g. snowplow
    collector: xxxxxxxxxx # e.g. d3rkrsqld9gmqf.cloudfront.net

Thanks for your help,
JB

@JBATX, this is not how it was meant to run. You need to shred the data first - you cannot just place the enriched data into the shredded bucket. Instead, you need to add an enriched:stream bucket to your EmrEtlRunner configuration file (see here) and have the streamed enriched data written to that bucket.

Once done, EmrEtlRunner will run in Stream Enrich mode.

thanks ihor,
I had the terminology mixed up - Let me clarify my new understanding and what I need to do now.

  1. Have s3loader put the enriched and formatted data into the snowplow-collector/enriched/good bucket not into snowplow-collector/shredded/good.
  2. Add enriched:stream as s3://snowplow-collector/enriched/good to the config.
  3. Run this emretlrunner command:

java -Dorg.slf4j.simpleLogger.defaultLogLevel=debug -jar snowplow-emr-etl-runner run -c emretlrunner.yml -r /home/collector/config/iglu_resolver.json -t targets

…I went ahead and ran the above after replacing and updating my config with the template you linked to.

I now get this error:
ERROR – : No run folders in [s3://snowplow-collector/enriched/good/] found

I see gzipped files produced by s3loader in that bucket directory. What am I doing wrong?

I discovered my error thanks to the last comment in this thread: “Confused about Stream Enrich -> S3Loader Step”.

I changed enriched:stream to s3://snowplow-collector/enriched/stream and changed s3loader to output there.

@JBATX, now it’s correct. The enriched:stream and enriched:good are two distinct buckets.
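
For reference, here is a minimal sketch of how the enriched section of the EmrEtlRunner config might look with the bucket names used in this thread. It assumes the layout of the Stream Enrich mode template linked above, with all other keys omitted; check the template for your release, since key names can differ between versions.

```yaml
aws:
  s3:
    buckets:
      enriched:
        # Where EmrEtlRunner stages enriched data as run folders (assumption: same path as in the config above)
        good: s3://snowplow-collector/enriched/good
        # Where enriched events are archived after loading
        archive: s3://snowplow-collector/enriched/archive
        # Where s3loader writes the streamed enriched files; must be a different location from enriched:good
        stream: s3://snowplow-collector/enriched/stream
```

With this layout, s3loader writes only to enriched:stream, and EmrEtlRunner moves those files into a run folder under enriched:good itself. Pointing both at the same location is what produces the "No run folders" error.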

great! thanks. I’ve managed to get through some more issues beyond that and I’m now seeing this error from EMR on the “[rdb_load] Load AWS Redshift enriched events storage Storage Target” step in syslog:

“2020-02-11 13:17:58,791 WARN com.amazonaws.profile.path.cred.CredentialsLegacyConfigLocationProvider (main): Found the legacy config profiles file at [/home/hadoop/.aws/config]. Please move it to the latest default location [~/.aws/credentials].”

This one in stdout:

“ERROR: Data loading error Problems with establishing DB connection
Amazon Error setting/closing connection: Connection timed out.
Following steps completed: [Discover]”

Is this an issue with the location of a credentials file on the EMR cluster that emretlrunner created or a permission error for the user in the config?

@JBATX, the AWS credentials warning is just that: a warning. You can ignore it.

The problem is with establishing the connection to Redshift. This post might help, https://github.com/snowplow/snowplow/wiki/setting-up-redshift.

It was an inbound rule issue. I fixed that, and the good news is that it now seems to be connecting to Redshift. I’m now seeing:

Invalid operation: relation “atomic.manifest” does not exist

I don’t believe creating atomic.manifest was in the setup docs. Did I miss something?

I’m using emretlrunner 117 rc8

UPDATE: I got off track somewhere - just found this: https://github.com/snowplow/snowplow/wiki/setting-up-redshift#db
…I had done everything else on that page except that.