I am trying to process my S3 logs into Redshift, but I am getting an error.
Error
Loading Snowplow events and shredded types into sp (Redshift cluster)…
Unexpected error: Cannot find atomic-events directory in shredded/good
uri:classloader:/storage-loader/lib/snowplow-storage-loader/redshift_loader.rb:74:in `load_events_and_shredded_types'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_reference.rb:43:in `send_to'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/call_with.rb:76:in `call_with'
uri:classloader:/gems/contracts-0.11.0/lib/contracts/method_handler.rb:138:in `block in redefine_method'
uri:classloader:/storage-loader/bin/snowplow-storage-loader:54:in `block in (root)'
uri:classloader:/storage-loader/bin/snowplow-storage-loader:51:in `'
org/jruby/RubyKernel.java:973:in `load'
uri:classloader:/META-INF/main.rb:1:in `'
org/jruby/RubyKernel.java:955:in `require'
uri:classloader:/META-INF/main.rb:1:in `(root)'
uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1:in `'
My setup: Scala Stream Collector > Stream Enrich > Kinesis > S3 > StorageLoader > Redshift
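In case it helps, this is roughly how I check what the StorageLoader actually sees under shredded/good (a minimal Python sketch, assuming boto3 is installed and AWS credentials are in the environment; the bucket and prefix are the placeholder values from my storage config further down):

import boto3

# List whatever is currently under shredded/good -- the loader error above
# complains it cannot find an atomic-events directory somewhere in here.
s3 = boto3.client("s3", region_name="us-west-2")
resp = s3.list_objects_v2(Bucket="s3-bucket-log", Prefix="shredded/good/", MaxKeys=50)
for obj in resp.get("Contents", []):
    print(obj["Key"])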
enricher.conf
enrich {
  source = "kinesis"
  sink = "kinesis"

  aws {
    access-key: "key"
    secret-key: "key"
  }

  kafka {
    brokers: "{{enrichKafkaBrokers}}"
  }

  streams {
    in: {
      raw: "good"
      maxRecords: 10000
      buffer: {
        byte-limit: 4500000
        record-limit: 500 # Not supported by Kafka; will be ignored
        time-limit: 60000
      }
    }

    out: {
      enriched: "enriched"
      bad: "bad"
      backoffPolicy: {
        minBackoff: 3000
        maxBackoff: 600000
      }
    }

    app-name: "enricher-app"
    initial-position = "TRIM_HORIZON"
    region: "us-west-2"
  }

  monitoring {
    snowplow {
      collector-uri: "xx.xx.xx.xx"
      collector-port: 80
      app-id: "collector-monitor"
      method: "GET"
    }
  }
}
snowplow-kinesis-s3-0.4.0 config:
sink {
  aws {
    access-key: "key"
    secret-key: "key"
  }

  kinesis {
    in {
      stream-name: "good"
      initial-position: "TRIM_HORIZON"
      max-records: "10000"
    }
    out {
      stream-name: "bad"
    }
    region: "us-west-2"
    app-name: "s3-sink-app"
  }

  s3 {
    region: "us-west-2"
    endpoint: "http://s3-us-west-2.s3.amazonaws.com"
    bucket: "bucket-name/logs"
    max-timeout: "300000"
    format: "lzo"
  }

  buffer {
    byte-limit: 4500000
    record-limit: 500 # Not supported by Kafka; will be ignored
    time-limit: 60000
  }

  logging {
    level: "error"
  }
}
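Similarly, this is the quick check I use to confirm the Kinesis S3 sink is actually landing LZO files in the bucket (again a sketch assuming boto3 and environment credentials; "bucket-name" and "logs/" are the placeholder values from the s3 section of the sink config above):

import boto3

# Confirm the S3 sink output (.lzo files) is arriving under bucket-name/logs
s3 = boto3.client("s3", region_name="us-west-2")
resp = s3.list_objects_v2(Bucket="bucket-name", Prefix="logs/", MaxKeys=20)
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])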
Storage Loader config:
aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: XXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXX
  s3:
    region: us-west-2
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://s3-bucket-log/slog/
      raw:
        in:
          - s3://s3-bucket-log/logs # e.g. s3://my-in-bucket
        processing: s3://s3-bucket-log/raw/processing
        archive: s3://s3-bucket-log/archive/raw # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://s3-bucket-log/enrich/good # e.g. s3://my-out-bucket/enriched/good
        bad: s3://s3-bucket-log/enrich/bad # e.g. s3://my-out-bucket/enriched/bad
        errors: s3://s3-bucket-log/enrich/errors # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://s3-bucket-log/enriched # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://s3-bucket-log/shredded/good # e.g. s3://my-out-bucket/shredded/good
        bad: s3://s3-bucket-log/shredded/bad # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://s3-bucket-log/shredded/errors # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://s3-bucket-log/archive/shredded # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 4.5.0
    region: us-west-2 # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole # Created using $ aws emr create-default-roles
    placement: # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: XXXXXXXXXXX # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: XXXXXXXXX
    bootstrap: # Set this to specify custom boostrap actions. Leave empty otherwise
    software:
      hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    additional_info: # Optional JSON string for selecting additional features
collectors:
  format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  job_name: snowplow-enrich # Give your job a name
  versions:
    hadoop_enrich: 1.8.0 # Version of the Hadoop Enrichment process
    hadoop_shred: 0.9.0 # Version of the Hadoop Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  download:
    folder: # Postgres-only config option. Where to store the downloaded files. Leave blank for Redshift
  targets:
    - name: "XXXX"
      type: redshift
      host: xxxxxxxxxxxxx.redshift.amazonaws.com # The endpoint as shown in the Redshift console
      database: XXXXXXXXX
      port: 5439
      ssl_mode: disable
      table: atomic.events
      username: XXXXXXXX
      password: XXXXXXXXX
      maxerror: 10 # Stop loading on first error, or increase to permit more load errors
      comprows: 200000 # Default for a 1 XL node cluster. Not used unless --include compupdate specified
monitoring:
  logging:
    level: DEBUG
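For completeness, this is the kind of minimal check I would use to rule out Redshift connectivity itself (Python sketch, assuming psycopg2 is installed; the host, database, and credentials are the placeholders from the target definition above):

import psycopg2

# Connect with the same settings as the storage target and confirm atomic.events is queryable
conn = psycopg2.connect(
    host="xxxxxxxxxxxxx.redshift.amazonaws.com",
    port=5439,
    dbname="XXXXXXXXX",
    user="XXXXXXXX",
    password="XXXXXXXXX",
    sslmode="disable",  # matches ssl_mode: disable in the target config
)
with conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM atomic.events;")
    print(cur.fetchone()[0])
conn.close()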