I set up the collector to push to a Kinesis stream, snowplow-good. Then I configured enrichment to use Kinesis streams, reading from snowplow-good and writing to snowplow-enriched. I then configured the S3 Loader to read from snowplow-enriched and dump the shredded (is that the right term?) files to s3://snowplow-collector/shredded/good. I can see that the S3 Loader is producing the files in TSV format, one event per line, so everything seems to be working as the documentation describes.
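For reference, this is roughly how I'm checking the loader output (the bucket path is the one configured below):

aws s3 ls s3://snowplow-collector/shredded/good/ --recursive
# each object listed here is a TSV file with one enriched event per line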
Redshift is already set up following the documentation, and I'm now trying to get EmrEtlRunner to load the shredded S3 files into Redshift. The problem I'm facing is that EmrEtlRunner produces the message below even though there are new files in the bucket:
java -Dorg.slf4j.simpleLogger.defaultLogLevel=debug -jar snowplow-emr-etl-runner run -c emretlrunner.yml -r /home/collector/config/iglu_resolver.json -t targets --skip staging_stream_enrich
D, [2020-02-10T21:20:15.933053 #4193] DEBUG -- : Initializing EMR jobflow
No logs to process: No Snowplow logs to process since last run
I suspect that I need to limit the steps EmrEtlRunner executes to only the rdb_load step. Is that correct? If so, how do I do that?
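For example, is something along these lines the right direction? This is just a guess on my part; I know EmrEtlRunner has --skip and --resume-from options, but I'm not sure which step names apply when everything upstream of the S3 Loader runs on streams.

java -Dorg.slf4j.simpleLogger.defaultLogLevel=debug -jar snowplow-emr-etl-runner run \
  -c emretlrunner.yml \
  -r /home/collector/config/iglu_resolver.json \
  -t targets \
  --resume-from rdb_load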
My emretlrunner config is below. Please note that I added S3 buckets for raw and enriched because without them I would get a contract error; I don't actually need them, since I'm using streams for everything before the S3 Loader step.
aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: xxxxxxx
  secret_access_key: xxxxxxxx
  s3:
    region: us-east-2
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://snowplow-s3-to-redshift-log
      encrypted: false # Whether the buckets below are encrypted using server side encryption (SSE-S3)
      raw:
        in: # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
          - s3://snowplow-collector/raw/old #ADD HERE # e.g. s3://my-old-collector-bucket
          - s3://snowplow-collector/raw/new #ADD HERE # e.g. s3://my-new-collector-bucket
        processing: s3://snowplow-collector/raw/processing #ADD HERE
        archive: s3://snowplow-collector/raw/archive #ADD HERE # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://snowplow-collector/enriched/good #ADD HERE # e.g. s3://my-out-bucket/enriched/good
        bad: s3://snowplow-collector/enriched/bad #ADD HERE # e.g. s3://my-out-bucket/enriched/bad
        errors: s3://snowplow-collector/enriched/errors #ADD HERE # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://snowplow-collector/enriched/archive #ADD HERE # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://snowplow-collector/shredded/good # e.g. s3://my-out-bucket/shredded/good
        bad: s3://snowplow-collector/shredded/bad # e.g. s3://my-out-bucket/shredded/bad
        errors: #ADD HERE # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://snowplow-collector/shredded/archive # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: us-east-2 # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole # Created using $ aws emr create-default-roles
    placement: #ADD HERE # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-xxxxx # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: collector-xxxxxx
    security_configuration: #ADD HERE # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: [] # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow ETL # Give your job a name
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      core_instance_bid: 0.015
      core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
        volume_size: 100 # Gigabytes
        volume_type: "gp2"
        volume_iops: 400 # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info: # Optional JSON string for selecting additional features
collectors:
  format: cloudfront # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  versions:
    spark_enrich: 1.18.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.1 # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    protocol: http
    port: 8080
    app_id: snowplow # e.g. snowplow
    collector: xxxxxxxxxx # e.g. d3rkrsqld9gmqf.cloudfront.net
Thanks for your help,
JB