Just for some context, here are our configs for the pipeline:
collector.conf
# Copyright (c) 2013-2018 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.
# This file (application.conf.example) contains a template with
# configuration options for the Scala Stream Collector.
#
# To use, copy this to 'application.conf' and modify the configuration options.
# 'collector' contains configuration options for the main Scala collector.
collector {
# The collector runs as a web service specified on the following interface and port.
interface = "0.0.0.0"
port = "8080"
# Configure the P3P policy header.
p3p {
policyRef = "/w3c/p3p.xml"
CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
}
# Cross domain policy configuration.
# If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml
# route.
crossDomain {
enabled = false
# Domain that is granted access, *.acme.com will match http://acme.com and http://sub.acme.com
domains = [ "*" ]
# Whether to only grant access to HTTPS or both HTTPS and HTTP sources
secure = true
}
# The collector returns a cookie to clients for user identification
# with the following domain and expiration.
cookie {
enabled = true
expiration = "365 days"
# Network cookie name
name = "piv-kenesis-collector"
# The domain is optional and will make the cookie accessible to other
# applications on the domain. Comment out this line to tie cookies to
# the collector's full domain
# domain = "{{cookieDomain}}"
}
doNotTrackCookie {
enabled = false
name = "" #{{doNotTrackCookieName}}
value = "" #{{doNotTrackCookieValue}}
}
# When enabled and the cookie specified above is missing, performs a redirect to itself to check
# if third-party cookies are blocked using the specified name. If they are indeed blocked,
# fallbackNetworkUserId is used instead of generating a new random one.
cookieBounce {
enabled = false
# The name of the request parameter which will be used on redirects checking that third-party
# cookies work.
name = "n3pc"
# Network user id to fallback to when third-party cookies are blocked.
fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
# Optionally, specify the name of the header containing the originating protocol for use in the
# bounce redirect location. Use this if behind a load balancer that performs SSL termination.
# The value of this header must be http or https. Example, if behind an AWS Classic ELB.
forwardedProtocolHeader = "X-Forwarded-Proto"
}
# When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
# token. All instances of that token are replaced with the network user id. If the placeholder
# isn't specified, the default value is `${SP_NUID}`.
redirectMacro {
enabled = false
# Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
placeholder = "[TOKEN]"
}
rootResponse {
enabled = false
statusCode = 302
# Optional, defaults to empty map
headers = {
Location = "https://127.0.0.1/",
X-Custom = "something"
}
#Optional, defaults to empty string
body = "302, redirecting"
}
streams {
# Events which have successfully been collected will be stored in the good stream/topic
good = stream-${AWS_REGION}-${PRODUCTION_ENV}-collector-to-enricher
# Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
bad = stream-${AWS_REGION}-${PRODUCTION_ENV}-bad-data-to-s3
# Whether to use the incoming event's ip as the partition key for the good stream/topic
# Note: Nsq does not make use of partition key.
useIpAddressAsPartitionKey = false
# Enable the chosen sink by uncommenting the appropriate configuration
sink {
# Choose between kinesis, googlepubsub, kafka, nsq, or stdout.
# To use stdout, comment or remove everything in the "collector.streams.sink" section except
# "enabled" which should be set to "stdout".
enabled = kinesis
# Region where the streams are located
region = ${AWS_REGION}
# Thread pool size for Kinesis API requests
threadPoolSize = 10
# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey = "default"
secretKey = "default"
}
# Minimum and maximum backoff periods, in milliseconds
backoffPolicy {
minBackoff = 1000 #{{minBackoffMillis}}
maxBackoff = 3000 #{{maxBackoffMillis}}
}
# Or Google Pubsub
#googleProjectId = ID
# Minimum, maximum and total backoff periods, in milliseconds
# and multiplier between two backoff
#backoffPolicy {
# minBackoff = {{minBackoffMillis}}
# maxBackoff = {{maxBackoffMillis}}
# totalBackoff = {{totalBackoffMillis}} # must be >= 10000
# multiplier = {{backoffMultiplier}}
#}
# Or Kafka
#brokers = "{{kafkaBrokers}}"
## Number of retries to perform before giving up on sending a record
#retries = 0
# Or NSQ
## Host name for nsqd
#host = "{{nsqHost}}"
## TCP port for nsqd, 4150 by default
#port = {{nsqdPort}}
}
# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit: 4000000
recordLimit: 500
timeLimit: 5000 #{{bufferTimeThreshold}}
}
}
}
# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
loggers = ["akka.event.slf4j.Slf4jLogger"]
# akka-http is the server the Stream collector uses and has configurable options defined at
# http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
http.server {
# To obtain the hostname in the collector, the 'remote-address' header
# should be set. By default, this is disabled, and enabling it
# adds the 'Remote-Address' header to every request automatically.
remote-address-header = on
raw-request-uri-header = on
# Define the maximum URI length (the default is 2048)
parsing {
max-uri-length = 32768
uri-parsing-mode = relaxed
}
}
}
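
A quick way to confirm events are actually landing in the collector's good stream is to pull a few raw records off it with boto3. This is a minimal sketch, not part of the pipeline: it assumes boto3 is installed, that the same AWS_REGION and PRODUCTION_ENV variables the config interpolates are exported, and it only reads the first shard.

import os
import boto3

# Rebuild the stream name the same way the collector config does
region = os.environ["AWS_REGION"]
env = os.environ["PRODUCTION_ENV"]
stream = f"stream-{region}-{env}-collector-to-enricher"

kinesis = boto3.client("kinesis", region_name=region)

# Take an iterator on the first shard and fetch a small batch of raw (Thrift) records
shard_id = kinesis.list_shards(StreamName=stream)["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName=stream,
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]
batch = kinesis.get_records(ShardIterator=iterator, Limit=10)

print(f"fetched {len(batch['Records'])} record(s) from {stream}")
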
enricher.conf
# Copyright (c) 2013-2018 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.
# This file (application.conf.example) contains a template with
# configuration options for Stream Enrich.
enrich {
streams {
in {
# Stream/topic where the raw events to be enriched are located
raw = stream-${AWS_REGION}-${PRODUCTION_ENV}-collector-to-enricher
}
out {
# Stream/topic where the events that were successfully enriched will end up
enriched = stream-${AWS_REGION}-${PRODUCTION_ENV}-enricher-to-s3
# Stream/topic where the event that failed enrichment will be stored
bad = stream-${AWS_REGION}-${PRODUCTION_ENV}-bad-data-to-s3
# Stream/topic where the pii transformation events will end up
# pii = "" # {{outPii}}
# How the output stream/topic will be partitioned.
# Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
# user_ipaddress, domain_sessionid, user_fingerprint.
# Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to see what the
# possible partition keys correspond to.
# Otherwise, the partition key will be a random UUID.
# Note: Nsq does not make use of partition key.
partitionKey = "user_id"
}
# Configuration shown is for Kinesis; to use another source/sink, uncomment the appropriate
# configuration and comment out the rest.
# To use stdin, comment or remove everything in the "enrich.streams.sourceSink" section except
# "enabled" which should be set to "stdin".
sourceSink {
# Sources / sinks currently supported are:
# 'kinesis' for reading Thrift-serialized records and writing enriched and bad events to a
# Kinesis stream
# 'googlepubsub' for reading / writing to a Google PubSub topic
# 'kafka' for reading / writing to a Kafka topic
# 'nsq' for reading / writing to a Nsq topic
# 'stdin' for reading from stdin and writing to stdout and stderr
enabled = kinesis
# Region where the streams are located (AWS region, pertinent to kinesis sink/source type)
region = ${AWS_REGION}
## Optional endpoint url configuration to override aws kinesis endpoints,
## this can be used to specify local endpoints when using localstack
# customEndpoint = {{kinesisEndpoint}}
# AWS credentials
# If both are set to 'default', use the default AWS credentials provider chain.
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use env variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey = default
secretKey = default
}
# Maximum number of records to get from Kinesis per call to GetRecords
maxRecords = 10000
# LATEST: most recent data.
# TRIM_HORIZON: oldest available data.
# "AT_TIMESTAMP": Start from the record at or after the specified timestamp
# Note: This only effects the first run of this application on a stream.
# (pertinent to kinesis source type)
initialPosition = TRIM_HORIZON
# Needs to be specified when initialPosition is "AT_TIMESTAMP".
# The timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
# Ex: "2017-05-17T10:00:00Z"
# Note: The time needs to be specified in UTC.
initialTimestamp = "2018-08-27T10:00:00Z"
# Minimum and maximum backoff periods, in milliseconds
backoffPolicy {
minBackoff = 1000
maxBackoff = 3000
}
# Or Google PubSub
#googleProjectId = my-project-id
## Size of the subscriber thread pool
#threadPoolSize = 4
## Minimum, maximum and total backoff periods, in milliseconds
## and multiplier between two backoffs
#backoffPolicy {
# minBackoff = {{enrichStreamsOutMinBackoff}}
# maxBackoff = {{enrichStreamsOutMaxBackoff}}
# totalBackoff = {{enrichStreamsOutTotalBackoff}} # must be >= 10000
# multiplier = {{enrichStreamsOutTotalBackoff}}
#}
# Or Kafka (Comment out for other types)
# brokers = "{{kafkaBrokers}}"
# Number of retries to perform before giving up on sending a record
# retries = 0
# Or NSQ
## Channel name for nsq source
## If more than one application is reading from the same NSQ topic at the same time,
## all of them must have the same channel name
#rawChannel = "{{nsqSourceChannelName}}"
## Host name for nsqd
#host = "{{nsqHost}}"
## TCP port for nsqd, 4150 by default
#port = {{nsqdPort}}
## Host name for lookupd
#lookupHost = "{{lookupHost}}"
## HTTP port for nsqlookupd, 4161 by default
#lookupPort = {{nsqlookupdPort}}
}
# After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches recordLimit or
# - the combined size of the stored records reaches byteLimit or
# - the time in milliseconds since it was last emptied exceeds timeLimit when
# a new event enters the buffer
buffer {
byteLimit: 4000000
recordLimit: 500 # Not supported by Kafka; will be ignored
timeLimit: 5000
}
# Used for a DynamoDB table to maintain stream state.
# Used as the Kafka consumer group ID.
# Used as the Google PubSub subscription name.
appName = kinesis-${AWS_REGION}-${PRODUCTION_ENV}-enricher-db
}
# Optional section for tracking endpoints
#monitoring {
# snowplow {
# collectorUri = "{{collectorUri}}"
# collectorPort = 80
# appId = {{enrichAppName}}
# method = GET
# }
#}
}
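
Since appName above doubles as the KCL lease table in DynamoDB, scanning that table is a quick way to see whether Stream Enrich is checkpointing. A minimal sketch, assuming boto3, the same AWS_REGION / PRODUCTION_ENV environment variables, and the default KCL lease schema (leaseKey / checkpoint / leaseOwner):

import os
import boto3

region = os.environ["AWS_REGION"]
env = os.environ["PRODUCTION_ENV"]
table = f"kinesis-{region}-{env}-enricher-db"  # matches the enricher's appName

ddb = boto3.client("dynamodb", region_name=region)
for item in ddb.scan(TableName=table, Limit=25)["Items"]:
    # leaseKey / checkpoint / leaseOwner are the usual KCL lease attributes (assumed default schema)
    print(
        item.get("leaseKey", {}).get("S"),
        item.get("checkpoint", {}).get("S"),
        item.get("leaseOwner", {}).get("S"),
    )
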
s3loader.conf
# Prod configuration for PIV s3 Loader
# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'nsq' for reading records from a NSQ topic
source = "kinesis"
# Sink is used for sending events for which processing failed.
# Sinks currently supported are:
# 'kinesis' for writing records to a Kinesis stream
# 'nsq' for writing records to a NSQ topic
sink = "kinesis"
# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey = "default"
secretKey = "default"
}
# Config for NSQ
nsq {
channelName = "{{dummy}}"
host = "{{nsqHost}}"
# TCP port for nsqd, 4150 by default
port = 4150
# Host name for lookupd
lookupHost = "{{lookupHost}}"
# HTTP port for nsqlookupd, 4161 by default
lookupPort = 4161
}
kinesis {
initialPosition = LATEST
initialTimestamp = AT_TIMESTAMP
# Maximum number of records to read per GetRecords call
# Chosen per Josh's recommendation at:
# https://discourse.snowplow.io/t/350k-rpm-of-throughput-with-stream-collector-kinesis/103
maxRecords = 500
region = ${AWS_REGION}
# "appName" is used for a DynamoDB table to maintain stream state.
appName = kinesis-${AWS_REGION}-${PRODUCTION_ENV}-s3loader-db
}
streams {
# Input stream name
inStreamName = stream-${AWS_REGION}-${PRODUCTION_ENV}-enricher-to-s3
# Stream for events for which the storage process fails
outStreamName = stream-${AWS_REGION}-${PRODUCTION_ENV}-bad-data-to-s3
# Events are accumulated in a buffer before being sent to S3.
# The buffer is emptied whenever:
# - the combined size of the stored records exceeds byteLimit or
# - the number of stored records exceeds recordLimit or
# - the time in milliseconds since it was last emptied exceeds timeLimit
buffer {
byteLimit = 45000 # Not supported by NSQ; will be ignored
recordLimit = 200
timeLimit = 10000 # Not supported by NSQ; will be ignored
}
}
s3 {
region = ${AWS_REGION}
bucket = piv-data-${AWS_REGION}-${PRODUCTION_ENV}-good/enriched
# Format is one of lzo or gzip
# Note that gzip can only be used for the enriched data stream.
format = "gzip"
# Maximum timeout that the application is allowed to fail for
maxTimeout = 1
}
# Optional section for tracking endpoints
#monitoring {
# snowplow{
# collectorUri = "{{collectorUri}}"
# collectorPort = 80
# appId = "{{appName}}"
# method = "{{method}}"
# }
#}
logging {
level: "INFO"
}
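
To check that the S3 loader is actually writing gzipped files, listing the enriched/ prefix of the good bucket is usually enough. A minimal sketch, assuming boto3 and the same environment variables used in the config:

import os
import boto3

region = os.environ["AWS_REGION"]
env = os.environ["PRODUCTION_ENV"]
bucket = f"piv-data-{region}-{env}-good"  # the loader writes under the enriched/ prefix

s3 = boto3.client("s3", region_name=region)
resp = s3.list_objects_v2(Bucket=bucket, Prefix="enriched/", MaxKeys=20)
for obj in resp.get("Contents", []):
    print(obj["LastModified"], obj["Size"], obj["Key"])
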
rs-load
{
"schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
"data": {
"region": "{{systemEnv "AWS_REGION"}}",
"credentials": {
"accessKeyId": "{{systemEnv "AWS_ACCESS_KEY_ID"}}",
"secretAccessKey": "{{systemEnv "AWS_SECRET_ACCESS_KEY"}}"
},
"steps": [
{
"type": "CUSTOM_JAR",
"name": "S3DistCp Step: Enriched events -> staging S3",
"actionOnFailure": "CANCEL_AND_WAIT",
"jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
"arguments": [
"--src","s3n://piv-data-{{systemEnv "AWS_REGION"}}-{{systemEnv "PRODUCTION_ENV"}}-good/enriched/",
"--dest","s3n://piv-data-{{systemEnv "AWS_REGION"}}-{{systemEnv "PRODUCTION_ENV"}}-good/staging/run={{timeWithFormat "1540322909" "2006-01-02-15-04-05"}}/",
"--deleteOnSuccess"
]
},
{
"type": "CUSTOM_JAR",
"name": "rdb shred step",
"actionOnFailure": "CANCEL_AND_WAIT",
"jar": "command-runner.jar",
"arguments": [
"spark-submit",
"--class", "com.snowplowanalytics.snowplow.storage.spark.ShredJob",
"--master", "yarn",
"--deploy-mode", "cluster",
"s3://snowplow-hosted-assets/4-storage/rdb-shredder/snowplow-rdb-shredder-0.14.0.jar",
"--iglu-config",
"{{base64File "/root/dataflow-runner_dir/configs/resolver.json"}}",
"--input-folder",
"s3n://piv-data-{{systemEnv "AWS_REGION"}}-{{systemEnv "PRODUCTION_ENV"}}-good/staging/run={{timeWithFormat "1540322909" "2006-01-02-15-04-05"}}/",
"--output-folder",
"s3n://piv-data-{{systemEnv "AWS_REGION"}}-{{systemEnv "PRODUCTION_ENV"}}-good/shredded/good/run={{timeWithFormat "1540322909" "2006-01-02-15-04-05"}}/",
"--bad-folder",
"s3n://piv-data-{{systemEnv "AWS_REGION"}}-{{systemEnv "PRODUCTION_ENV"}}-good/shredded/bad/run={{timeWithFormat "1540322909" "2006-01-02-15-04-05"}}/"
]
},
{
"type": "CUSTOM_JAR",
"name": "rdb load step",
"actionOnFailure": "CANCEL_AND_WAIT",
"jar": "s3://snowplow-hosted-assets/4-storage/rdb-loader/snowplow-rdb-loader-0.14.0.jar",
"arguments": [
"--config",
"{{base64File "/root/dataflow-runner_dir/configs/emr.yml"}}",
"--target",
"{{base64File "/root/dataflow-runner_dir/configs/targets/redshift.conf"}}",
"--resolver",
"{{base64File "/root/dataflow-runner_dir/configs/resolver.json"}}",
"--folder",
"s3n://piv-data-{{systemEnv "AWS_REGION"}}-{{systemEnv "PRODUCTION_ENV"}}-good/shredded/good/run={{timeWithFormat "1540322909" "2006-01-02-15-04-05"}}/",
"--logkey",
"s3n://piv-data-{{systemEnv "AWS_REGION"}}-{{systemEnv "PRODUCTION_ENV"}}-good/log/rdb-loader-{{timeWithFormat "1540322909" "2006-01-02-15-04-05"}}",
"--skip",
"analyze"
]
}
],
"tags": [
]
}
}
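
While the playbook above is running, the three EMR steps (S3DistCp staging, rdb shred, rdb load) can be watched from the console or with boto3. A minimal sketch, assuming AWS_REGION is exported; it simply lists every active cluster rather than filtering on the one dataflow-runner launched:

import os
import boto3

emr = boto3.client("emr", region_name=os.environ["AWS_REGION"])

# Print the state of every step on every active cluster
for cluster in emr.list_clusters(ClusterStates=["STARTING", "RUNNING", "WAITING"])["Clusters"]:
    for step in emr.list_steps(ClusterId=cluster["Id"])["Steps"]:
        print(cluster["Name"], step["Name"], step["Status"]["State"])
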