Hi Team,
We are using the Snowplow Kafka enricher (Stream Enrich) in our Snowplow pipeline. The enricher is not able to keep up and is lagging behind the collector offsets by a huge margin.
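For context, we check the lag with the standard Kafka CLI tooling, roughly like this (broker address is illustrative; the consumer group ID comes from appName in the config below):

kafka-consumer-groups.sh --bootstrap-server <broker>:9092 --describe --group enricher-test

The LAG column for the collector-good partitions is where we see the backlog growing.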
Below is the enricher config we are using.
# Copyright (c) 2013-2020 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.
# This file (application.conf.example) contains a template with
# configuration options for Stream Enrich.
enrich {
  streams {
    in {
      # Stream/topic where the raw events to be enriched are located
      raw = "collector-good"
    }

    out {
      enriched = "enrich-good"
      bad = "enrich-bad"
      pii = "enrich-pii"
      partitionKey = ""
    }

    # Configuration shown is for Kafka, to use another uncomment the appropriate configuration
    # and comment out the other
    # To use stdin, comment or remove everything in the "enrich.streams.sourceSink" section except
    # "enabled" which should be set to "stdin".
    sourceSink {
      # Sources / sinks currently supported are:
      # 'kinesis' for reading Thrift-serialized records and writing enriched and bad events to a
      #   Kinesis stream
      # 'kafka' for reading / writing to a Kafka topic
      # 'nsq' for reading / writing to an NSQ topic
      # 'stdin' for reading from stdin and writing to stdout and stderr
      enabled = "kafka"

      # LATEST: most recent data.
      # TRIM_HORIZON: oldest available data.
      # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
      # Note: This only affects the first run of this application on a stream.
      # (pertinent to kinesis source type)
      # initialPosition = TRIM_HORIZON
      # initialPosition = ${?ENRICH_STREAMS_SOURCE_SINK_INITIAL_POSITION}

      # Needs to be specified when initial-position is "AT_TIMESTAMP".
      # Timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
      # Ex: "2017-05-17T10:00:00Z"
      # Note: Time needs to be specified in UTC.
      initialTimestamp = "{{initialTimestamp}}"
      initialTimestamp = ${?ENRICH_STREAMS_SOURCE_SINK_INITIAL_TIMESTAMP}

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 1000
        maxBackoff = 10000
      }

      # Or Kafka (Comment out for other types)
      #brokers = "localhost:9092"
      brokers = ""
      # Number of retries to perform before giving up on sending a record
      retries = 0

      # The Kafka producer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#producerconfigs
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" -> brokers
      # retries             -> retries
      # "buffer.memory"     -> buffer.byteLimit
      # "linger.ms"         -> buffer.timeLimit
      producerConf {
        acks = all
        "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
        "value.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
        #"batch.size" = "100000"
        "buffer.memory" = "18485760"
      }

      # The Kafka consumer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#consumerconfigs
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" -> brokers
      # "group.id"          -> appName
      consumerConf {
        # "enable.auto.commit" = true
        # "auto.commit.interval.ms" = 1000
        # "auto.offset.reset" = earliest
        # "session.timeout.ms" = 30000
        # "key.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer"
        # "value.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer"
        #"fetch.min.bytes" = "1042880"
        #"fetch.max.bytes" = "12428800"
        #"fetch.max.wait.ms" = "30000"
        "request.timeout.ms" = "60000"
        #"auto.commit.interval.ms" = "5000"
        #"max.partition.fetch.bytes" = "2042880"
        #"max.poll.records" = "10000"
        #"session.timeout.ms" = "60000"
      }
    }

    # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches recordLimit or
    # - the combined size of the stored records reaches byteLimit or
    # - the time in milliseconds since it was last emptied exceeds timeLimit when
    #   a new event enters the buffer
    buffer {
      byteLimit = 12428800
      recordLimit = 120000
      timeLimit = 1000
    }

    # Used for a DynamoDB table to maintain stream state.
    # Used as the Kafka consumer group ID.
    # Used as the Google PubSub subscription name.
    appName = "enricher-test"
  }
}
The collector-good topic has 15 partitions and the enrich-good topic also has 15 partitions; both have a replication factor of 2.
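For reference, the topics were created along these lines (broker address is illustrative):

kafka-topics.sh --bootstrap-server <broker>:9092 --create --topic collector-good --partitions 15 --replication-factor 2
kafka-topics.sh --bootstrap-server <broker>:9092 --create --topic enrich-good --partitions 15 --replication-factor 2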
The Kafka cluster has 3 nodes, each with 4 CPUs and 10 GB of RAM.
We receive approximately 10,000 events/second, and the enricher runs in autoscaling mode with a minimum of 5 and a maximum of 30 instances.
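Each enricher instance is launched roughly like this (jar version and file paths are illustrative):

java -jar snowplow-stream-enrich-kafka-<version>.jar --config /path/to/config.hocon --resolver file:/path/to/resolver.json --enrichments file:/path/to/enrichments/

All instances share the same config, so they join the same enricher-test consumer group and split the 15 collector-good partitions among themselves.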
If someone could help me with this, it would be really appreciated, as all our pipelines depend heavily on Snowplow.
Thanks
Karan