EMR ETL stream_enrich mode

Hi Team,

I am confused around how to setup the stream enrich process in the emr etl runner. My current setup looks as follows:

  • Scala Stream Collector setup to receives events and write to kinesis stream “snowplow-events-raw”.
  • Stream Enrich to read from above kinesis stream and write good events to “snowplow-events-good” and bad events to “snowplow-events-bad” kinesis streams.
  • S3 Loader to read from “snowplow-events-good” and write the records to s3 bucket “collector-logs”.
  • EMR ETL runner to read the records from “collector-logs” bucket and write to Redshift table(atomic.events).

After setting this up, the final stage of the pipeline which should write records to Redshift is not working. Is there something I’m missing in the setup?

Thanks in advance!

@shubhamg2208, how did you implement the last step “EMR ETL runner to read the records from “collector-logs” bucket and write to Redshift table(atomic.events)”? How did you configure the targets to let pipeline know the data needs to be loaded to Redshift? What are the EMR logs (steps) when you run the pipeline?

Your pipeline configuration file has to include enriched/stream bucket pointing to collector-logs.

Hi @ihor,

My configuration for the EMR ETL Runner looks as follows:

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: <%= ENV['AWS_SNOWPLOW_ACCESS_KEY'] %>
  secret_access_key: <%= ENV['AWS_SNOWPLOW_SECRET_KEY'] %>
  s3:
    region: ap-south-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: s3://scrpbx-snwplw-hosted-assets/jsonpaths
      # https://stackoverflow.com/questions/10569455/differences-between-amazon-s3-and-s3n-in-hadoop
      log: s3n://logs/
      enriched:
        good: s3://output/enriched/good       # e.g. s3://my-out-bucket/enriched/good
        bad: s3://output/enriched/bad        # e.g. s3://my-out-bucket/enriched/bad
        stream: s3://collector-logs
      shredded:
        good: s3://output/shredded/good       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://output/shredded/bad        # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://output/shredded/error    # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://output/shredded/archive    # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 5.9.0
    region: ap-south-1        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    autoscaling_role: EMR_AutoScaling_DefaultRole
    placement:     # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-c9ed65a1 # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: test@test.com
    bootstrap: []           # Set this to specify custom boostrap actions. Leave empty otherwise
    software:
      hbase:    # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:  # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow Stream ETL # Give your job a name
      master_instance_type: c4.xlarge
      core_instance_count: 2
      core_instance_type: c4.xlarge
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 100    # Gigabytes
        volume_type: "gp2"
        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: c4.xlarge
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info:        # Optional JSON string for selecting additional features
enrich:
  versions:
    spark_enrich: 1.13.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.0        # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    app_id: snowplow # e.g. snowplow
    collector: collector.test.net # e.g. d3rkrsqld9gmqf.cloudfront.net

I’m currently running R104(Stoplesteinan) of the EMR ETL runner. It is executed with the command

AWS_SNOWPLOW_ACCESS_KEY=key AWS_SNOWPLOW_SECRET_KEY=secret snowplow-emr-etl-runner-r104 run -c emr-etl-runner-stream-enrich.config.yml --resolver iglu_resolver.json -t targets -n enrichments -f staging_stream_enrich

The targets folder contains the Redshift target with the required configuration. I’ve also added additional enrichments using the enrichment folder.

With the staging_stream_enrich mode, there is only one step displayed in EMR which is Elasticity S3DistCp Step: Shredded S3 -> S3 Shredded Archive. There are no error logs for this step, just the controller logs and syslog.

Controller logs:

2019-06-05T10:05:49.455Z INFO Ensure step 1 jar file /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar
2019-06-05T10:05:49.456Z INFO StepRunner: Created Runner for step 1
INFO startExec 'hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src s3://output/shredded/good/run=2018-09-12-20-00-21/ --dest s3://output/shredded/archive/run=2018-09-12-20-00-21/ --s3Endpoint s3-ap-south-1.amazonaws.com --deleteOnSuccess'
INFO Environment:
  PATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/sbin:/opt/aws/bin
  LESS_TERMCAP_md=[01;38;5;208m
  LESS_TERMCAP_me=[0m
  HISTCONTROL=ignoredups
  LESS_TERMCAP_mb=[01;31m
  AWS_AUTO_SCALING_HOME=/opt/aws/apitools/as
  UPSTART_JOB=rc
  LESS_TERMCAP_se=[0m
  HISTSIZE=1000
  HADOOP_ROOT_LOGGER=INFO,DRFA
  JAVA_HOME=/etc/alternatives/jre
  AWS_DEFAULT_REGION=ap-south-1
  AWS_ELB_HOME=/opt/aws/apitools/elb
  LESS_TERMCAP_us=[04;38;5;111m
  EC2_HOME=/opt/aws/apitools/ec2
  TERM=linux
  XFILESEARCHPATH=/usr/dt/app-defaults/%L/Dt
  runlevel=3
  LANG=en_US.UTF-8
  AWS_CLOUDWATCH_HOME=/opt/aws/apitools/mon
  MAIL=/var/spool/mail/hadoop
  LESS_TERMCAP_ue=[0m
  LOGNAME=hadoop
  PWD=/
  LANGSH_SOURCED=1
  HADOOP_CLIENT_OPTS=-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-3OM7XL0P3NKQ/tmp
  _=/etc/alternatives/jre/bin/java
  CONSOLETYPE=serial
  RUNLEVEL=3
  LESSOPEN=||/usr/bin/lesspipe.sh %s
  previous=N
  UPSTART_EVENTS=runlevel
  AWS_PATH=/opt/aws
  USER=hadoop
  UPSTART_INSTANCE=
  PREVLEVEL=N
  HADOOP_LOGFILE=syslog
  PYTHON_INSTALL_LAYOUT=amzn
  HOSTNAME=ip-172-31-50-196
  NLSPATH=/usr/dt/lib/nls/msg/%L/%N.cat
  HADOOP_LOG_DIR=/mnt/var/log/hadoop/steps/s-3OM7XL0P3NKQ
  EC2_AMITOOL_HOME=/opt/aws/amitools/ec2
  SHLVL=5
  HOME=/home/hadoop
  HADOOP_IDENT_STRING=hadoop
INFO redirectOutput to /mnt/var/log/hadoop/steps/s-3OM7XL0P3NKQ/stdout
INFO redirectError to /mnt/var/log/hadoop/steps/s-3OM7XL0P3NKQ/stderr
INFO Working dir /mnt/var/lib/hadoop/steps/s-3OM7XL0P3NKQ
INFO ProcessRunner started child process 8578 :
hadoop    8578  4184  0 10:05 ?        00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src s3://output/shredded/good/run=2018-09-12-20-00-21/ --dest s3://output/shredded/archive/run=2018-09-12-20-00-21/ --s3Endpoint s3-ap-south-1.amazonaws.com --deleteOnSuccess
2019-06-05T10:05:53.462Z INFO HadoopJarStepRunner.Runner: startRun() called for s-3OM7XL0P3NKQ Child Pid: 8578
INFO Synchronously wait child process to complete : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO waitProcessCompletion ended with exit code 0 : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO total process run time: 30 seconds
2019-06-05T10:06:21.722Z INFO Step created jobs: job_1559729030926_0001
2019-06-05T10:06:21.722Z INFO Step succeeded with exitCode 0 and took 30 seconds
 $@

Syslog:

2019-06-05 10:05:51,788 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Running with args: --src s3://output/shredded/good/run=2018-09-12-20-00-21/ --dest s3://output/shredded/archive/run=2018-09-12-20-00-21/ --s3Endpoint s3-ap-south-1.amazonaws.com --deleteOnSuccess 
2019-06-05 10:05:52,048 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): S3DistCp args: --src s3://output/shredded/good/run=2018-09-12-20-00-21/ --dest s3://output/shredded/archive/run=2018-09-12-20-00-21/ --s3Endpoint s3-ap-south-1.amazonaws.com --deleteOnSuccess 
2019-06-05 10:05:52,067 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Using output path 'hdfs:/tmp/a1ae3c19-b66f-4eac-9f0a-c2cd80144d47/output'
2019-06-05 10:05:54,021 WARN com.amazon.ws.emr.hadoop.fs.rolemapping.RoleMappings (main): Found no mappings configured with 'fs.s3.authorization.roleMapping', credentials resolution may not work as expected
2019-06-05 10:05:55,230 WARN com.amazonaws.profile.path.cred.CredentialsLegacyConfigLocationProvider (main): Found the legacy config profiles file at [/home/hadoop/.aws/config]. Please move it to the latest default location [~/.aws/credentials].
2019-06-05 10:05:55,288 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): DefaultAWSCredentialsProviderChain is used to create AmazonS3Client. KeyId: ASIA2LHO7ZSGIVSQCLND
2019-06-05 10:05:55,288 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): AmazonS3Client setEndpoint s3-ap-south-1.amazonaws.com
2019-06-05 10:05:55,422 INFO com.amazon.elasticmapreduce.s3distcp.FileInfoListing (main): Opening new file: hdfs:/tmp/a1ae3c19-b66f-4eac-9f0a-c2cd80144d47/files/1
2019-06-05 10:05:55,500 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Created 1 files to copy 9 files 
2019-06-05 10:05:56,241 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Reducer number: 3
2019-06-05 10:05:56,501 INFO org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl (main): Timeline service address: http://ip-172-31-50-196.ap-south-1.compute.internal:8188/ws/v1/timeline/
2019-06-05 10:05:56,508 INFO org.apache.hadoop.yarn.client.RMProxy (main): Connecting to ResourceManager at ip-172-31-50-196.ap-south-1.compute.internal/172.31.50.196:8032
2019-06-05 10:05:57,279 INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat (main): Total input paths to process : 1
2019-06-05 10:05:57,351 INFO org.apache.hadoop.mapreduce.JobSubmitter (main): number of splits:1
2019-06-05 10:05:57,931 INFO org.apache.hadoop.mapreduce.JobSubmitter (main): Submitting tokens for job: job_1559729030926_0001
2019-06-05 10:05:58,466 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl (main): Submitted application application_1559729030926_0001
2019-06-05 10:05:58,504 INFO org.apache.hadoop.mapreduce.Job (main): The url to track the job: http://ip-172-31-50-196.ap-south-1.compute.internal:20888/proxy/application_1559729030926_0001/
2019-06-05 10:05:58,505 INFO org.apache.hadoop.mapreduce.Job (main): Running job: job_1559729030926_0001
2019-06-05 10:06:04,583 INFO org.apache.hadoop.mapreduce.Job (main): Job job_1559729030926_0001 running in uber mode : false
2019-06-05 10:06:04,584 INFO org.apache.hadoop.mapreduce.Job (main):  map 0% reduce 0%
2019-06-05 10:06:10,625 INFO org.apache.hadoop.mapreduce.Job (main):  map 100% reduce 0%
2019-06-05 10:06:17,654 INFO org.apache.hadoop.mapreduce.Job (main):  map 100% reduce 33%
2019-06-05 10:06:18,658 INFO org.apache.hadoop.mapreduce.Job (main):  map 100% reduce 67%
2019-06-05 10:06:19,662 INFO org.apache.hadoop.mapreduce.Job (main):  map 100% reduce 100%
2019-06-05 10:06:20,673 INFO org.apache.hadoop.mapreduce.Job (main): Job job_1559729030926_0001 completed successfully
2019-06-05 10:06:20,748 INFO org.apache.hadoop.mapreduce.Job (main): Counters: 54
  File System Counters
    FILE: Number of bytes read=1133
    FILE: Number of bytes written=518685
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=3577
    HDFS: Number of bytes written=0
    HDFS: Number of read operations=13
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=6
    S3: Number of bytes read=0
    S3: Number of bytes written=0
    S3: Number of read operations=0
    S3: Number of large read operations=0
    S3: Number of write operations=0
  Job Counters 
    Launched map tasks=1
    Launched reduce tasks=3
    Data-local map tasks=1
    Total time spent by all maps in occupied slots (ms)=129976
    Total time spent by all reduces in occupied slots (ms)=1425952
    Total time spent by all map tasks (ms)=2954
    Total time spent by all reduce tasks (ms)=16204
    Total vcore-milliseconds taken by all map tasks=2954
    Total vcore-milliseconds taken by all reduce tasks=16204
    Total megabyte-milliseconds taken by all map tasks=4159232
    Total megabyte-milliseconds taken by all reduce tasks=45630464
  Map-Reduce Framework
    Map input records=9
    Map output records=9
    Map output bytes=4421
    Map output materialized bytes=1121
    Input split bytes=170
    Combine input records=0
    Combine output records=0
    Reduce input groups=9
    Reduce shuffle bytes=1121
    Reduce input records=9
    Reduce output records=0
    Spilled Records=18
    Shuffled Maps =3
    Failed Shuffles=0
    Merged Map outputs=3
    GC time elapsed (ms)=625
    CPU time spent (ms)=19230
    Physical memory (bytes) snapshot=1539936256
    Virtual memory (bytes) snapshot=16474624000
    Total committed heap usage (bytes)=1281359872
  Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
  File Input Format Counters 
    Bytes Read=3407
  File Output Format Counters 
    Bytes Written=0
2019-06-05 10:06:20,750 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Try to recursively delete hdfs:/tmp/a1ae3c19-b66f-4eac-9f0a-c2cd80144d47/tempspace

Also as a digression, would it be possible to the records from the raw kinesis stream to collector-logs s3 bucket and use EMR ETL directly? That way I can remove the Stream Enrich part in my pipeline.

@shubhamg2208, you should not be using flag -f unless you are trying to recover from the failure as per the guide here: https://github.com/snowplow/snowplow/wiki/Batch-pipeline-steps#recovery-steps-for-stream-enrich-mode.

By skipping staging_stream_enrich step you essentially skip staging files to enriched:good bucket. As that bucket is empty, there are no good files to shred and hence no data to load to Redshift.

You are also missing enriched:archive bucket in your configuration.

@ihor Yes I realized the configuration options I was using were wrong. I added the enriched:archive bucket in the configuration and changed my command to:

AWS_SNOWPLOW_ACCESS_KEY=key AWS_SNOWPLOW_SECRET_KEY=secret snowplow-emr-etl-runner-r104 run -c emr-etl-runner-stream-enrich.config.yml --resolver iglu_resolver.json -t targets -n enrichments

It seems to be working now! All the data is now available in Redshift. Thanks for your help! :smiley:

Can you please share your config file of EmrEtlRunner
I have completed till this Stream Enrich -> kinesis-s3 -> s3 bucket( folder enriched/good/ *.gz file )
I am stuck with EmrEtlRunner step.

aws:
  access_key_id: XXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXX
  s3:
    region: ap-south-1
    buckets:
      assets: s3://snowplow-hosted-assets 
      jsonpath_assets: 
      log: 
      encrypted: false
      enriched:
        good: 
        bad: 
        errors: 
        archive: s3://snowplow-kinesis-test/enriched/good/
      shredded:
        good: s3://snowplow-shredded-test/enrich/good
        bad: s3://snowplow-shredded-test/enrich/bad
        errors: s3://snowplow-shredded-test/enrich/errors
        archive: s3://snowplow-shredded-test/archive/enriched
    consolidate_shredded_output: false 
  emr:
    ami_version: 5.9.0
    region: ap-south-1
    jobflow_role: EMR_EC2_DefaultRole 
    service_role: EMR_DefaultRole 
    placement: 
    ec2_subnet_id: subnet-216t8f23
    ec2_key_name: test-snowplow.pem
    security_configuration: 
    bootstrap: []          
    software:
      hbase:                
      lingual:              
    jobflow:
      job_name: Snowplow Test # Give your job a name
      master_instance_type: t1.medium
      core_instance_count: 0
      core_instance_type: t1.medium
      task_instance_count: 0
      task_instance_type: m1.medium
      task_instance_bid: 
    bootstrap_failure_tries: 3
    configuration:
	    yarn-site:
		    yarn.resourcemanager.am.max-attempts: "1"
	    spark:
		    maximizeResourceAllocation: "true"
    additional_info:
collectors:
  format: thrift
enrich:
  versions:
    spark_enrich: 1.17.0 
  continue_on_unexpected_error: false 
  output_compression: NONE 
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.1        
    hadoop_elasticsearch: 0.1.0 
monitoring:
  tags: {} 
  logging:
    level: DEBUG

@anshratn1997, your configuration file is missing enriched:good and enriched:stream buckets. They are required to run EmrEtlRunner in Stream Enrich mode (see workflow diagram, step 1). The enriched:stream bucket should be the one you sink your stream enriched data to. The enriched:good bucket will play a similar role the processing bucket does when EmrEtlRunner runs in Standard mode.

@ihor
I have changed my config file for stream enrich mode.
Through running kinesis-s3 repo i have only (.gz files => s3://bucket-name/enriched/good/) in my s3 bucket.
should i put s3://bucket-name/enriched/good/ in enriched:stream or enriched:good or enriched:archive?
I have some more doubts ?
1-) which snowplow-emr-etl-runner version suitable for stream_enrich_mode
2-) Does EMRETL runner automatically create Amazon EMR cluster and kill itself when finished?
3-) If I provide target redshift.json directory , will it automatically create tables inside redshift db?

And please let me know for below config is correct or need to change anything.

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: XXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXX
  s3:
    region: ap-south-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: ADD HERE
      encrypted: false
      enriched:
        good: s3://kinesis-test/enrich/good/       # e.g. s3://my-out-bucket/enriched/good
        archive: s3://kinesis-test/archive/     # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
        stream: s3://kinesis-test/enriched/good/     # S3 Loader's output folder with enriched data. If present raw buckets will be discarded
      shredded:
        good: s3://shredded-test/enrich/good       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://shredded-test/enrich/bad        # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://shredded-test/enrich/errors     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://shredded-test/archive/enriched   # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: ap-south-1        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement:     # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-22bec8f4b # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: test.pem
    security_configuration:  # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: []           # Set this to specify custom boostrap actions. Leave empty otherwise
    software:
      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow ETL # Give your job a name
      master_instance_type: m4.large
      core_instance_count: 2
      core_instance_type: m4.large
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 100    # Gigabytes
        volume_type: "gp2"
        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m4.large
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info:        # Optional JSON string for selecting additional features
enrich:
  versions:
    spark_enrich: 1.17.0 # Version of the Spark Enrichment process
  output_compression: GZIP # Stream mode supports only GZIP
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.1        # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    app_id: ADD HERE # e.g. snowplow
    collector: ADD HERE # e.g. d3rkrsqld9gmqf.cloudfront.net
    protocol: http
    port: 80

As per my earlier comment “The enriched:stream bucket should be the one you sink your stream enriched data to”. This the I expect you to load your enriched data with S3 Loader into enriched:stream bucket. Using S3 Loder ensures the correct files format for further processing (shredding and loading to Redshift).

Stream Enrich mode was introduced with R102 and is available in all the following releases (the latest release as of today is R116).

Yes. It is also possible to use EmrEtlRunner on persistent EMR cluster with --use-persistent-jobflow option if needed/appropriate.

No, you would have to create the required tables manually. It is possible but that requires a much more complicated setup and we are testing it out now for our managed client’s pipelines.

I would use separate (distinct) buckets for streamed, batched processed and archive buckets.

Hi ihor, so EmrEtlRunner in stream_enrich_mode will not continuously? Does that mean if we want to keep running this, we have to schedule it just like Standard mode?

I kinda expect stream_enrich_mode will make it run continuously in streaming mode.

@aditya, you have the flexibility of running it with both transient and persistent clusters. However, EmrEtlRunner is designed for batch processing. Running it in Stream Enrich mode is an extended functionality to allow using data enriched in real-time. Currently, you cannot load data to Redshift in real-time.

To work with real-time data you need to read it off Kinesis Stream Enrich with Lambda and utilizing one of our Snowplow Analytics SDK designed to work with enriched Snowplow data.

@ihor

I am using this emretlrunner version snowplow_emr_r116_madara_rider.zip
I am getting this error no run folders found in s3://kinesis-test/enriched/good/ bucket
Can you please help me with correct file format that emretlrunner is looking for in bucket.

@aditya, you have the flexibility of running it with both transient and persistent clusters. However, EmrEtlRunner is designed for batch processing. Running it in Stream Enrich mode is an extended functionality to allow using data enriched in real-time. Currently, you cannot load data to Redshift in real-time.

To work with real-time data you need to read it off Kinesis Stream Enrich with Lambda and utilizing one of our Snowplow Analytics SDK designed to work with enriched Snowplow data.

I hope my understanding here is correct. So basically the lambda function is triggered everytime there is a stream into Enrich and insert that many records into PostgreSQL. That’s interesting method to explore but I would need to read up first the Snowplow Analytics SDK to know more and possibly come out with questions.

One question though: Why is this not described in snowplow’s github Wiki setup page? I am aware that the Snowplow Analytics SDK link up to github wiki page but I did not find this is being linked to in Setup Guide page (or maybe I missed something). This is a very useful suggestion to write realtime analytics result to data stores.

@anshratn1997, the run folder is created when the files are staged in Stream Enrich mode (step 1 of the Dataflow diagram) - files moved from enriched:stream to enriched:good by EmrEtlRunner. You need to make sure you do not skip that step, staging_stream_enrich.

We build a lot of various applications and in particular those that help to work with Snowplow events. Snowplow Analytics SDKs are just some of them. It’s up to you how you build your pipeline. Our standard real-time (lambda) architecture is depicted in these diagrams.