I have set up Snowplow in AWS EKS pods, with Kinesis streams in between. The issue I am facing while running a load test is that the S3 loader takes too long to load data into S3.
Collector Config
collector {
  interface = "0.0.0.0"
  port = 8080

  paths {
    "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
  }

  doNotTrackCookie {
    enabled = false
    # enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
    # name = {{doNotTrackCookieName}}
    name = collector-do-not-track-cookie
    # value = {{doNotTrackCookieValue}}
    value = collector-do-not-track-cookie-value
  }

  streams {
    good = ${collector_good}
    bad = ${collector_bad}

    sink {
      enabled = "kinesis"
      threadPoolSize = 10
      region = ${envregion}

      aws {
        accessKey = default
        secretKey = default
      }

      # Backoff for failed Kinesis writes, in milliseconds
      backoffPolicy {
        minBackoff = 3000
        maxBackoff = 600000
      }
    }

    # Flush to Kinesis when the first of these limits is hit
    buffer {
      byteLimit = 2097152   # 2 MB
      recordLimit = 500
      timeLimit = 5000      # 5 s
    }
  }

  telemetry {
    disable = true
  }
}

akka {
  loglevel = WARNING
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  http.server {
    remote-address-header = on
    raw-request-uri-header = on

    parsing {
      max-uri-length = 32768
      uri-parsing-mode = relaxed
      illegal-header-warnings = off
    }

    max-connections = 2048
  }

  coordinated-shutdown {
    run-by-jvm-shutdown-hook = off
  }
}
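For completeness: ${collector_good}, ${collector_bad} and ${envregion} are HOCON substitutions. Since they are not defined anywhere in the file, HOCON falls back to environment variables of the same name set on the pod. A minimal sketch of the equivalent in-file form with an optional env override (the stream name and the COLLECTOR_GOOD_STREAM variable are illustrations, not part of my deployment):

# Sketch only: default value plus optional env-var override.
# ${?VAR} substitutes only if the variable is actually set.
collector_good = "snowplow-collector-good"
collector_good = ${?COLLECTOR_GOOD_STREAM}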
Enricher Config
{
  "input": {
    "type": "Kinesis"

    # Optional. Name of the application which the KCL daemon should assume
    "appName": "snowplow-enrich-kinesis"

    # Name of the Kinesis stream to read from
    "streamName": ${collector_good}

    # Optional. Region where the Kinesis stream is located
    # This field is optional if it can be resolved with the AWS region provider chain.
    # It checks places like env variables, system properties and the AWS profile file.
    # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
    "region": ${envregion}

    # Optional, set the initial position to consume the Kinesis stream
    # Must be TRIM_HORIZON, LATEST or AT_TIMESTAMP
    # LATEST: most recent data.
    # TRIM_HORIZON: oldest available data.
    # AT_TIMESTAMP: start from the record at or after the specified timestamp
    "initialPosition": {
      "type": "TRIM_HORIZON"
    }
    # "initialPosition": {
    #   "type": "AT_TIMESTAMP"
    #   "timestamp": "2020-07-17T10:00:00Z" # Required for AT_TIMESTAMP
    # }

    # Optional, set the mode for retrieving records.
    "retrievalMode": {
      "type": "Polling"

      # Maximum size of a batch returned by a call to getRecords.
      # Records are checkpointed after a batch has been fully processed,
      # thus the smaller maxRecords, the more often records can be checkpointed
      # into DynamoDB, but possibly reducing the throughput.
      "maxRecords": 10000
    }
    # "retrievalMode": {
    #   "type": "FanOut"
    # }

    # Optional. Size of the internal buffer used when reading messages from Kinesis,
    # each buffer holding up to maxRecords from above
    "bufferSize": 3

    # Optional. Settings for backoff policy for checkpointing.
    # Records are checkpointed after all the records of the same chunk have been enriched
    "checkpointBackoff": {
      "minBackoff": 100 milliseconds
      "maxBackoff": 10 seconds
      "maxRetries": 10
    }

    # Optional, endpoint url configuration to override aws kinesis endpoints
    # Can be used to specify local endpoint when using localstack
    # "customEndpoint": "http://localhost:4566"

    # Optional, endpoint url configuration to override the aws DynamoDB endpoint for the Kinesis checkpoints lease table
    # Can be used to specify local endpoint when using localstack
    # "dynamodbCustomEndpoint": "http://dynamodb:us-east-1:180648733583:table/snowplow-enrich-kinesis:8000"

    # Optional, endpoint url configuration to override aws cloudwatch endpoint for metrics
    # Can be used to specify local endpoint when using localstack
    # "cloudwatchCustomEndpoint": "http://localhost:4582"
  }

  "output": {
    "good": {
      "streamName": ${enrich_good}

      # Optional. Limits the number of events in a single PutRecords request.
      # Several requests are made in parallel
      # Maximum allowed: 500
      "recordLimit": 500

      # Optional. Limits the number of bytes in a single PutRecords request,
      # including records and partition keys.
      # Several requests are made in parallel
      # Maximum allowed: 5 MB
      "byteLimit": 5242880
    }

    "bad": {
      "streamName": ${enrich_bad}
      "recordLimit": 500
    }
  }
}
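As the comments above note, smaller polling batches checkpoint to DynamoDB more often, at some cost to throughput. I have left maxRecords at 10000 for the load test; a variant with more frequent checkpointing would look like this (2000 is an illustrative value, not a documented recommendation):

"retrievalMode": {
  "type": "Polling"
  # Smaller batches mean more frequent checkpoints; value is a guess for illustration
  "maxRecords": 2000
}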
S3 Loader Config
{
  # Optional, but recommended
  "region": ${envregion},

  # Options are: RAW, ENRICHED_EVENTS, SELF_DESCRIBING
  # RAW simply sinks data 1:1
  # ENRICHED_EVENTS works with monitoring.statsd to report metrics (identical to RAW otherwise)
  # SELF_DESCRIBING partitions self-describing data (such as JSON) by its schema
  "purpose": "ENRICHED_EVENTS",

  # Input stream config
  "input": {
    # Kinesis Client Lib app name (corresponds to the DynamoDB table name)
    "appName": "snowplow-s3-loader-kinesis",
    # Kinesis stream name
    "streamName": ${enrich_good},
    # Options are: LATEST, TRIM_HORIZON, AT_TIMESTAMP
    "position": "LATEST",
    # Max batch size to pull from Kinesis
    "maxRecords": 10000
  },

  "output": {
    "s3": {
      # Full path to output data
      "path": ${s3path},
      # Partitioning format; Optional
      # Valid substitutions are {vendor}, {schema}, {format}, {model} for self-describing jsons
      # and {yy}, {mm}, {dd}, {hh} for year, month, day, hour
      # partitionFormat: "date={yy}-{mm}-{dd}"
      # Prefix for all file names; Optional
      "filenamePrefix": ${prefix},
      # Maximum timeout that the application is allowed to fail for, e.g. in case of an S3 outage
      "maxTimeout": 2000,
      # Output format; Options: GZIP, LZO
      "compression": "GZIP"
    },
    # Kinesis stream to output failures
    "bad": {
      "streamName": ${enrich_bad}
    }
  },

  # Flush control. The first limit the KCL worker hits will trigger flushing
  "buffer": {
    # Maximum bytes to read before flushing
    "byteLimit": 4096,
    # Maximum records to read before flushing
    "recordLimit": 500,
    # Maximum time between flushes, in milliseconds
    "timeLimit": 5000
  }
}
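Since the slowness shows up in the S3 loader: per the flush-control comment above, a flush fires as soon as any one of the three buffer limits is hit, and byteLimit 4096 means a flush roughly every 4 KB read, i.e. many tiny GZIP files, while the enricher upstream batches up to 5 MB per PutRecords request. A sketch of a larger buffer I am considering for the load test (values are my own guesses, not from the Snowplow docs):

"buffer": {
  # Illustrative values only: flush at 16 MB, 50000 records or 60 s,
  # whichever comes first, to produce fewer, larger files in S3
  "byteLimit": 16777216,
  "recordLimit": 50000,
  "timeLimit": 60000
}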