I am using the following pipeline.
ScalaStreamCollector → StreamEnricherToKinesis → S3SnowPlowLoader → S3 → EmrEtlRunner in Enrich Mode → DataLoader → Redshift
Here is my config file:
# Credentials can be hardcoded or set in environment variables
access_key_id: <%= ENV['AWS_SNOWPLOW_ACCESS_KEY'] %>
secret_access_key: <%= ENV['AWS_SNOWPLOW_SECRET_KEY'] %>
region: us-east-2
assets: s3n://snowplow-hosted-assets-us-east-2 # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
log: s3n://snowplow-logs.instacar.in/logs
encrypted: false # Whether the buckets below are enrcrypted using server side encryption (SSE-S3)
good: s3n://snowplow-databucket.instacar.in/good # e.g. s3://my-out-bucket/enriched/good
bad: s3n://snowplow-databucket.instacar.in/bad # e.g. s3://my-out-bucket/enriched/bad
errors: s3n://snowplow-databucket.instacar.in/errors # Leave blank unless :continue_on_unexpected_error: set to true below
archive: s3n://snowplow-databucket.instacar.in/archive #Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
stream: s3n://snowplow-enrichedrecords.instacar.in
good: s3n://snowplow-databucket.instacar.in/shredded/good # e.g. s3://my-out-bucket/shredded/good
bad: s3n://snowplow-databucket.instacar.in/shredded/bad # e.g. s3://my-out-bucket/shredded/bad
errors: s3n://snowplow-databucket.instacar.in/shredded/errors # Leave blank unless :continue_on_unexpected_error: set to true below
archive: s3n://snowplow-databucket.instacar.in/shredded/archive # Where to archive shredded events to,e.g. s3://my-archive-bucket/shredded
consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
ami_version: 5.9.0
region: us-east-2 # Always set this
jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
service_role: EMR_DefaultRole # Created using $ aws emr create-default-roles
placement: # Set this if not running in VPC. Leave blank otherwise
ec2_subnet_id: subnet-2e0b6f47 # Set this if running in VPC. Leave blank otherwise
ec2_key_name: snowplow1
security_configuration: # Specify your EMR security configuration if needed. Leave blank otherwise
bootstrap: [] # Set this to specify custom boostrap actions. Leave empty otherwise
hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
# Adjust your Hadoop cluster below
job_name: SnowplowETL # Give your job a name
master_instance_type: m4.2xlarge
core_instance_count: 2
core_instance_type: m4.2xlarge
core_instance_bid: 0.1
core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
volume_size: 100 # Gigabytes
volume_type: "gp2"
volume_iops: 400 # Optional. Will only be used if volume_type is "io1"
ebs_optimized: false # Optional. Will default to true
task_instance_count: 0 # Increase to use spot instances
task_instance_type: m1.medium
task_instance_bid: 0.1 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
yarn.nodemanager.vmem-check-enabled: "false"
yarn.nodemanager.resource.memory-mb: "27648"
yarn.scheduler.maximum-allocation-mb: "27648"
yarn.resourcemanager.am.max-attempts: "2"
maximizeResourceAllocation: "true"
spark.dynamicAllocation.enabled: "false"
spark.executor.instances: "2"
spark.yarn.executor.memoryOverhead: "1024"
spark.executor.memory: 8G
spark.executor.cores: "1"
spark.yarn.driver.memoryOverhead: "1024"
spark.driver.memory: 8G
spark.driver.cores: "1"
spark.default.parallelism: "8"
additional_info: # Optional JSON string for selecting additional features
format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
spark_enrich: 1.18.0 # Version of the Spark Enrichment process
continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
rdb_loader: 0.14.0
rdb_shredder: 0.13.1 # Version of the Spark Shredding process
hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
tags: {} # Name-value pairs describing this job
level: DEBUG # You can optionally switch to INFO for production
Here is my Redshift target file ( all usernames and pwds have been redacted ) :
"schema": "iglu:com.snowplowanalytics.snowplow.storage/redshift_config/jsonschema/2-1-0",
"data": {
"name": "Redshiftenriched events storage",
"host": "inscompanysnowflakecluster.c2nasstlico1.us-east-2.redshift.amazonaws.com",
"database": "snowplow",
"port": 5439,
"username": "myuser",
"password": "mypassword",
"schema": "atomic",
"compRows": 1000,
"sshTunnel": null,
"sslMode": "DISABLE",
"purpose": "ENRICHED_EVENTS",
"id": "56gh2265fs-dbc1-11ea-87d0-0242ac130003",
"compRows": 10000,
"maxError": 10,
"roleArn": "arn:aws:iam::605934722134:role/RedshiftLoadRole"
I see the archive folder in and records all tab separated.
Here is the archived folder with atomic events in S3. I checked RedshiftLogs, and i see a COPy to Redshift. But appears , nothing was copied since the source was empty.
src here is:
COPY atomic.events
's3://snowplow-databucket.instacar.in/shredded/good/run=2020-08-15-07-03-11/atomic-events/' CREDENTIALS '' REGION AS 'us-east-2' DELIMITER ' ' MAXERROR 2 EMPTYASNULL FILLRECORD TRUNCATECOLUMNS TIMEFORMAT 'auto' ACCEPTINVCHARS
It seems like the src bucket for s3 from where dataloader picks up records is empty. No able to figure out, where .