Hi there,
Just started using Snowplow. Have got the Scala collector -> Kinesis -> S3
pipe working and sending events using our Javascript tracker; we are seeing .gz
files in the S3 bucket as expected.
Am now trying to get the enrichment part working. I am running the EmrETLRunner:
./snowplow-emr-etl-runner --config /etc/snowplow/emretlrunner.conf --resolver /etc/snowplow/resolver.conf
where the config file has the following contents:
aws:
# Credentials can be hardcoded or set in environment variables
access_key_id: <%= ENV['AWS_ACCESS_KEY_ID'] %>
secret_access_key: <%= ENV['AWS_SECRET_ACCESS_KEY'] %>
s3:
region: us-west-2
buckets:
assets: s3n://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
log: s3n://snowplow-kinesis-logs
raw:
in:
- s3n://snowplow-kinesis # Multiple in buckets are permitted
processing: s3n://snowplow-kinesis-processing/processed
archive: s3n://snowplow-kinesis-archive/raw # e.g. s3://my-archive-bucket/raw
enriched:
good: s3n://snowplow-kinesis-output/enriched/good # e.g. s3://my-out-bucket/enriched/good
bad: s3n://snowplow-kinesis-output/enriched/bad # e.g. s3://my-out-bucket/enriched/bad
errors: # Leave blank unless :continue_on_unexpected_error: set to true below
archive: s3n://snowplow-kinesis-archive/enriched # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
shredded:
good: s3n://snowplow-kinesis-output/shredded/good # e.g. s3://my-out-bucket/shredded/good
bad: s3n://snowplow-kinesis-output/shredded/bad # e.g. s3://my-out-bucket/shredded/bad
errors: # Leave blank unless :continue_on_unexpected_error: set to true below
archive: s3n://snowplow-kinesis-archive/shredded # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
emr:
ami_version: 4.5.0
region: us-west-2 # Always set this
jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
service_role: EMR_DefaultRole # Created using $ aws emr create-default-roles
placement: # Set this if not running in VPC. Leave blank otherwise
ec2_subnet_id: i45643454 # Set this if running in VPC. Leave blank otherwise
ec2_key_name: keyname # ec2_key_name here...
bootstrap: # Set this to specify custom boostrap actions. Leave empty otherwise
software:
hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
# Adjust your Hadoop cluster below
jobflow:
master_instance_type: m1.medium
core_instance_count: 2
core_instance_type: m3.xlarge
task_instance_count: 0 # Increase to use spot instances
task_instance_type: m1.medium
task_instance_bid: 0.05 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
additional_info: # Optional JSON string for selecting additional features
collectors:
format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/co
enrich:
job_name: Snowplow ETL # Give your job a name
versions:
hadoop_enrich: 1.7.0 # Version of the Hadoop Enrichment process
hadoop_shred: 0.9.0 # Version of the Hadoop Shredding process
hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
continue_on_unexpected_error: true # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
output_compression: GZIP # Compression onh Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
download:
folder: # Postgres-only config option. Where to store the downloaded files. Leave blank for Redshift
targets:
- name: "snowplow"
type: redshift
host: #
database: enriched # Name of database
port: 5439 # Default Redshift port
ssl_mode: disable # One of disable (default), require, verify-ca or verify-full
table: atomic.events
username: snowplow
password: #
maxerror: 1 # Stop loading on first error, or increase to permit more load errors
comprows: 200000 # Default for a 1 XL node cluster. Not used unless --include compupdate specified
monitoring:
tags: {} # Name-value pairs describing this job
logging:
level: DEBUG # You can optionally switch to INFO for production
When I run this, it correctly copies the Thrift files (assuming there are an even number of them…) to the snowplow-kinesis-processing/processed
bucket (i am generally just trying with a couple of small files) and spins up the EMR instance. It then seems to copy the events to the HDFS and Enrich them, but cannot output them back to S3:
Looking at the stderr file for the failing step (Elasticity S3DistCp Step: Enriched HDFS -> S3
):
Exception in thread "main" java.lang.RuntimeException: Error running job
at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
...
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-10-2-1-227.us-west-2.compute.internal:8020/tmp/63be916c-da0a-46e7-b4b2-1855a00424cd/files
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
I’m not sure what this means; does it mean it tried to copy something back but there was nothing to copy? If so and the enrichment didn’t generate anything, why is nothing in the enrichment/bad/ bucket?
All pointers to a solution much appreciated.
Brian