There are two EC2 instances, one used as the collector and one as the enricher, and they use Kinesis. We are using the JS tracker to send events.
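For reference, the tracker is initialised roughly like this (a minimal TypeScript-flavoured sketch assuming the standard tag-based sp.js tracker; the collector hostname, app ID and tracker namespace below are placeholders, not the real values):

// Assumes the usual sp.js loader snippet has already defined the global queue function.
declare const snowplow: (...args: unknown[]) => void;

// Point the tracker at the collector defined in the config below (interface 0.0.0.0, port 8080).
// "collector.ourdomain.com:8080" and "my-app" are placeholders.
snowplow('newTracker', 'cf', 'collector.ourdomain.com:8080', {
  appId: 'my-app',
  cookieDomain: '.ourdomain.com',
  eventMethod: 'post'   // send events to the collector's POST endpoint
});
snowplow('trackPageView');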
The collector and enricher configs look like this:
collector {
# The collector runs as a web service specified on the following interface and port.
interface = "0.0.0.0"
port = "8080"
# Configure the P3P policy header.
p3p {
policyRef = "/w3c/p3p.xml"
CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
}
# Cross domain policy configuration.
# If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml
# route.
crossDomain {
enabled = true
# Domain that is granted access, *.acme.com will match http://acme.com and http://sub.acme.com
domain = "*.ourdomain.com"
# Whether to only grant access to HTTPS or both HTTPS and HTTP sources
secure = false
}
# The collector returns a cookie to clients for user identification
# with the following domain and expiration.
cookie {
enabled = true
expiration = "365 days" # e.g. "365 days"
# Network cookie name
name = "_some_name"
# The domain is optional and will make the cookie accessible to other
# applications on the domain. Comment out this line to tie cookies to
# the collector's full domain
# domain = "*.ourdomain.com"
}
# When enabled and the cookie specified above is missing, performs a redirect to itself to check
# if third-party cookies are blocked using the specified name. If they are indeed blocked,
# fallbackNetworkUserId is used instead of generating a new random one.
cookieBounce {
enabled = false
# The name of the request parameter which will be used on redirects checking that third-party
# cookies work.
name = "n3pc"
# Network user id to fallback to when third-party cookies are blocked.
fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
# Optionally, specify the name of the header containing the originating protocol for use in the
# bounce redirect location. Use this if behind a load balancer that performs SSL termination.
# The value of this header must be http or https. Example, if behind an AWS Classic ELB.
forwardedProtocolHeader = "X-Forwarded-Proto"
}
# When enabled, the redirect URL passed via the `u` query parameter is scanned for a placeholder
# token. All instances of that token are replaced with the network user ID. If the placeholder
# isn't specified, the default value is `${SP_NUID}`.
redirectMacro {
enabled = false
# Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
placeholder = "[TOKEN]"
}
streams {
# Events which have successfully been collected will be stored in the good stream/topic
good = "snowplow-events-stream"
# Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
bad = "snowplow-events-bad-stream"
# Whether to use the incoming event's ip as the partition key for the good stream/topic
# Note: Nsq does not make use of partition key.
useIpAddressAsPartitionKey = false
# Enable the chosen sink by uncommenting the appropriate configuration
sink {
# Choose between kinesis, googlepubsub, kafka, nsq, or stdout.
# To use stdout, comment or remove everything in the "collector.streams.sink" section except
# "enabled" which should be set to "stdout".
enabled = kinesis
# Region where the streams are located
region = "us-east-1"
# Thread pool size for Kinesis API requests
threadPoolSize = 10
# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey = env
secretKey = env
}
# Minimum and maximum backoff periods, in milliseconds
backoffPolicy {
minBackoff = 3000
maxBackoff = 600000
}
}
# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 4500000
recordLimit = 1000 # Not supported by Kafka; will be ignored
timeLimit = 60000
}
}
}
# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
loggers = ["akka.event.slf4j.Slf4jLogger"]
# akka-http is the server the Stream collector uses and has configurable options defined at
# http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
http.server {
# To obtain the hostname in the collector, the 'remote-address' header
# should be set. By default, this is disabled, and enabling it
# adds the 'Remote-Address' header to every request automatically.
remote-address-header = on
raw-request-uri-header = on
# Define the maximum request URI length (the default is 2048)
parsing {
max-uri-length = 32768
uri-parsing-mode = relaxed
}
}
}
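As a quick sanity check of the collector, hitting its GET pixel endpoint (/i) with a minimal page-view payload should return a 200 (a small sketch; the hostname is a placeholder for the collector instance's real address):

// Fire a minimal page-view at the collector's pixel endpoint and log the status.
// A 200 with a 1x1 GIF body means the collector accepted the event; it is then
// buffered and flushed to the "good" Kinesis stream per the buffer settings above.
fetch('http://collector.ourdomain.com:8080/i?e=pv&p=web&tv=js-2.9.0')
  .then(resp => console.log(resp.status))
  .catch(console.error);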
Enricher:
enrich {
streams {
in {
# Stream/topic where the raw events to be enriched are located
raw = "snowplow-events-stream"
}
out {
# Stream/topic where the events that were successfully enriched will end up
enriched = "snowplow-events-stream-enriched-good"
# Stream/topic where the event that failed enrichment will be stored
bad = "snowplow-events-stream-enriched-bad"
# How the output stream/topic will be partitioned.
# Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
# user_ipaddress, domain_sessionid, user_fingerprint.
# Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
# possible partition keys correspond to.
# Otherwise, the partition key will be a random UUID.
# Note: Nsq does not make use of partition key.
partitionKey = "event_id"
}
# The configuration shown below is for Kinesis; to use another source/sink, uncomment the
# appropriate section and comment out the Kinesis-specific settings.
# To use stdin, comment or remove everything in the "enrich.streams.sourceSink" section except
# "enabled" which should be set to "stdin".
sourceSink {
# Sources / sinks currently supported are:
# 'kinesis' for reading Thrift-serialized records and writing enriched and bad events to a
# Kinesis stream
# 'googlepubsub' for reading / writing to a Google PubSub topic
# 'kafka' for reading / writing to a Kafka topic
# 'nsq' for reading / writing to a Nsq topic
# 'stdin' for reading from stdin and writing to stdout and stderr
type = kinesis
enabled = kinesis
# Region where the streams are located (AWS region, pertinent to kinesis sink/source type)
region = "us-east-1"
# AWS credentials (pertinent to kinesis sink/source type)
# If both are set to 'default', use the default AWS credentials provider chain.
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use env variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey = iam
secretKey = iam
}
# Maximum number of records to get from Kinesis per call to GetRecords
maxRecords = 10000
# LATEST: most recent data.
# TRIM_HORIZON: oldest available data.
# "AT_TIMESTAMP": Start from the record at or after the specified timestamp
# Note: This only affects the first run of this application on a stream.
# (pertinent to kinesis source type)
initialPosition = TRIM_HORIZON
# Needs to be specified when initialPosition is "AT_TIMESTAMP".
# The timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
# Ex: "2017-05-17T10:00:00Z"
# Note: the time needs to be specified in UTC.
initialTimestamp = "2018-08-01T10:00:00Z"
# Minimum and maximum backoff periods, in milliseconds
backoffPolicy {
minBackoff = 3000
maxBackoff = 600000
}
# Or Google PubSub
#googleProjectId = my-project-id
## Size of the subscriber thread pool
#threadPoolSize = 4
## Minimum, maximum and total backoff periods, in milliseconds
## and multiplier between two backoffs
#backoffPolicy {
# minBackoff = {{enrichStreamsOutMinBackoff}}
# maxBackoff = {{enrichStreamsOutMaxBackoff}}
# totalBackoff = {{enrichStreamsOutTotalBackoff}} # must be >= 10000
# multiplier = {{enrichStreamsOutTotalBackoff}}
#}
# Or Kafka (commented out here since the Kinesis source/sink is enabled)
#brokers = "{{kafkaBrokers}}"
## Number of retries to perform before giving up on sending a record
#retries = 0
# Or NSQ
## Channel name for nsq source
## If more than one application is reading from the same NSQ topic at the same time,
## all of them must have the same channel name
#rawChannel = "{{nsqSourceChannelName}}"
## Host name for nsqd
#host = "{{nsqHost}}"
## TCP port for nsqd, 4150 by default
#port = {{nsqdPort}}
## Host name for lookupd
#lookupHost = "{{lookupHost}}"
## HTTP port for nsqlookupd, 4161 by default
#lookupPort = {{nsqlookupdPort}}
}
# After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches recordLimit or
# - the combined size of the stored records reaches byteLimit or
# - the time in milliseconds since it was last emptied exceeds timeLimit when
# a new event enters the buffer
buffer {
byteLimit = 45000
recordLimit = 200 # Not supported by Kafka; will be ignored
timeLimit = 10000
}
# Used for a DynamoDB table to maintain stream state.
# Used as the Kafka consumer group ID.
# Used as the Google PubSub subscription name.
appName = "snowplow-env"
}
}
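To check what actually lands in these streams, something like the following can read a few records back from the enriched stream (a rough sketch using the AWS SDK for JavaScript v3; only the stream name, region and TRIM_HORIZON come from the config above, the rest is illustrative):

import {
  KinesisClient,
  ListShardsCommand,
  GetShardIteratorCommand,
  GetRecordsCommand,
} from "@aws-sdk/client-kinesis";

// Read a handful of records from the enriched stream to confirm enrichment output.
async function peekEnrichedStream(): Promise<void> {
  const client = new KinesisClient({ region: "us-east-1" });
  const StreamName = "snowplow-events-stream-enriched-good";

  const { Shards } = await client.send(new ListShardsCommand({ StreamName }));
  const { ShardIterator } = await client.send(new GetShardIteratorCommand({
    StreamName,
    ShardId: Shards![0].ShardId!,
    ShardIteratorType: "TRIM_HORIZON", // oldest available data, as in the config
  }));
  const { Records } = await client.send(new GetRecordsCommand({ ShardIterator, Limit: 10 }));

  // Enriched events are tab-separated strings; the raw stream ("snowplow-events-stream")
  // holds Thrift-serialized payloads instead, so it is not readable the same way.
  Records?.forEach(r => console.log(Buffer.from(r.Data!).toString("utf8")));
}

peekEnrichedStream().catch(console.error);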