We are using the Snowplow Kafka collector and enricher. The collector keeps up with traffic without any issues, even during peaks, but the enricher does not. We run the enricher on ECS with several tasks (20 for the moment). When traffic increases, we automatically scale the number of running tasks up to 50, but the CPU does not seem to be used as it should: it never rises above 30%. Are there any tips for making each enricher task more efficient and getting better throughput?
The topic it's reading from has 200 partitions and the topic it's writing to has 20, so this could indeed be part of the bottleneck.
Since the enricher reads from Kafka and writes to Kafka, I'm not really sure how the downstream applications could have any impact on the enricher lag?
No, nothing can be found in the logs.
The Enrich application is snowplow/stream-enrich-kafka:3.7.1.
We have the following enrichments: campaign_attribution, ip_lookup, referer_parser, yauaa.
# Copyright (c) 2013-2020 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.
# This file (application.conf.example) contains a template with
# configuration options for Stream Enrich.
ENRICH_STREAMS_SINK_MIN_BACKOFF=40
ENRICH_STREAMS_SINK_MAX_BACKOFF=200
ENRICH_STREAMS_BUFFER_BYTE_LIMIT=10000
ENRICH_STREAMS_BUFFER_RECORD_LIMIT=1000
ENRICH_STREAMS_BUFFER_TIME_LIMIT=2000
enrich {
streams {
in {
# Stream/topic where the raw events to be enriched are located
raw = "good_collector"
}
out {
# Stream/topic where the events that were successfully enriched will end up
enriched = "good_enriched"
# Stream/topic where the events that failed enrichment will be stored
bad = "bad_enriched"
# Stream/topic where the PII transformation events will end up
#pii = "datariver-pii-web-enricher-kinesis-stream-nonprod"
#pii = ${?ENRICH_STREAMS_OUT_PII}
# How the output stream/topic will be partitioned.
# Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
# user_ipaddress, domain_sessionid, user_fingerprint.
# Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
# possible partition keys correspond to.
# Otherwise, the partition key will be a random UUID.
# Note: Nsq does not make use of partition key.
partitionKey = "event_id"
#partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY}
}
# Configuration shown is for Kafka, to use another uncomment the appropriate configuration
# and comment out the other
# To use stdin, comment or remove everything in the "enrich.streams.sourceSink" section except
# "enabled" which should be set to "stdin".
sourceSink {
# Sources / sinks currently supported are:
# 'kinesis' for reading Thrift-serialized records and writing enriched and bad events to a
# Kinesis stream
# 'kafka' for reading / writing to a Kafka topic
# 'nsq' for reading / writing to a Nsq topic
# 'stdin' for reading from stdin and writing to stdout and stderr
enabled = "kafka"
# Region where the streams are located (AWS region, pertinent to kinesis sink/source type)
# region = ${?ENRICH_STREAMS_SOURCE_SINK_REGION}
backoffPolicy {
minBackoff = 40
maxBackoff = 200
}
aws {
accessKey = default
secretKey = default
}
# Or Kafka (Comment out for other types)
brokers = ${?BROKERS}
# Number of retries to perform before giving up on sending a record
retries = 0
# The kafka producer has a variety of possible configuration options defined at
# https://kafka.apache.org/documentation/#producerconfigs
# Some values are set to other values from this config by default:
# "bootstrap.servers" -> brokers
# retries -> retries
# "buffer.memory" -> buffer.byteLimit
# "linger.ms" -> buffer.timeLimit
producerConf {
"key.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
"value.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
"sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"sasl.mechanism" = "AWS_MSK_IAM"
"sasl.jaas.config" = "software.amazon.msk.auth.iam.IAMLoginModule required;"
"security.protocol" = "SASL_SSL"
"buffer.memory" = 10000000
}
# The kafka consumer has a variety of possible configuration options defined at
# https://kafka.apache.org/documentation/#consumerconfigs
# Some values are set to other values from this config by default:
# "bootstrap.servers" -> brokers
# "group.id" -> appName
consumerConf {
"enable.auto.commit" = true
"auto.commit.interval.ms" = 1000
"auto.offset.reset" = earliest
"session.timeout.ms" = 30000
"buffer.memory" = 10000000
"key.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer"
"value.deserializer" = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
"sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
"sasl.mechanism" = "AWS_MSK_IAM"
"sasl.jaas.config" = "software.amazon.msk.auth.iam.IAMLoginModule required;"
"security.protocol" = "SASL_SSL"
}
}
# After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches recordLimit or
# - the combined size of the stored records reaches byteLimit or
# - the time in milliseconds since it was last emptied exceeds timeLimit when
# a new event enters the buffer
buffer {
byteLimit = 1000000
recordLimit = 450
timeLimit = 3000
}
# Used for a DynamoDB table to maintain stream state.
# Used as the Kafka consumer group ID.
# Used as the Google PubSub subscription name.
appName = "enricher-kafka"
#appName = ${?ENRICH_STREAMS_APP_NAME}
}
}
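A side note on the config above: the ENRICH_STREAMS_* values at the top are defined but never referenced, so the backoffPolicy and buffer blocks run with their literal values (for example recordLimit = 450 rather than 1000). If the intent was to control those settings through the variables, the usual HOCON pattern is a literal default followed by an optional ${?...} substitution, which replaces the default only when the variable is set. A sketch showing only the two blocks, which would stay in their existing places inside the config:

```hocon
# backoffPolicy lives inside enrich.streams.sourceSink
backoffPolicy {
  # Literal default first; the optional substitution overrides it only if the variable is set
  minBackoff = 40
  minBackoff = ${?ENRICH_STREAMS_SINK_MIN_BACKOFF}
  maxBackoff = 200
  maxBackoff = ${?ENRICH_STREAMS_SINK_MAX_BACKOFF}
}

# buffer lives inside enrich.streams
buffer {
  byteLimit = 1000000
  byteLimit = ${?ENRICH_STREAMS_BUFFER_BYTE_LIMIT}
  recordLimit = 450
  recordLimit = ${?ENRICH_STREAMS_BUFFER_RECORD_LIMIT}
  timeLimit = 3000
  timeLimit = ${?ENRICH_STREAMS_BUFFER_TIME_LIMIT}
}
```

This is the same pattern the commented-out lines in the template hint at (for example `#appName = ${?ENRICH_STREAMS_APP_NAME}`), just with a literal default kept in front.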
So the most immediate thing that pops out at me is the disparity between the number of partitions you are reading from and the number you are writing into.
This feels like a natural bottleneck that you will hit during peak periods, when you simply cannot write data out fast enough. We don’t use MSK internally, but are there any metrics that show “write” saturation on the good_enriched topic?
Indeed, looking at this graph, we can see that writes to good_enriched always hit a cap, even during peak hours. Since we added more partitions to good_enriched, we have been able to go above that cap (last part of the graph), so thank you already for the help. I also read that the “acks=1” setting could help improve throughput, since the producer then only waits for the partition leader to acknowledge the message. Could you confirm that?
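To try this, a sketch of the change, assuming that producerConf entries are passed through to the Kafka producer as the comments in the config suggest, and that Stream Enrich does not override acks itself. With acks = 1, the partition leader acknowledges a write once it has appended it to its own log, without waiting for the followers. This shortens each produce request, but a record can be lost if the leader fails before the followers have replicated it. Whether it changes anything also depends on the default of the bundled Kafka client: clients before 3.0 already default to acks=1, while 3.0 and later default to all.

```hocon
producerConf {
  # Wait only for the partition leader's acknowledgement
  # (assumes producerConf is passed straight to the Kafka producer).
  # Trade-off: a record can be lost if the leader fails before followers replicate it.
  "acks" = "1"
  # ...keep the existing serializer, SASL and buffer.memory entries unchanged
}
```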
That version of Stream Enrich Kafka is sub-optimal: it does not allow for multi-threading, so it has poor CPU usage patterns. The solution, largely, is to move to the latest version of Enrich Kafka, which has solved these problems.
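As a rough starting point for that migration, here is a sketch of how the topics from the Stream Enrich config above might map onto an Enrich Kafka config file. The field names (topicName, bootstrapServers, consumerConf, producerConf) are assumed to follow the example configs of the Enrich Kafka 3.x series and should be checked against the official configuration reference for the version you deploy. The MSK IAM settings are copied from the config above and assume the image bundles the aws-msk-iam-auth library. Only the streams are shown; enrichments and the Iglu resolver are not covered here.

```hocon
# Hypothetical Enrich Kafka config sketch; verify every field name against the docs for your version.
{
  "input": {
    "topicName": "good_collector"
    "bootstrapServers": ${?BROKERS}
    # Assumed pass-through of Kafka consumer properties (MSK IAM auth copied from the old config)
    "consumerConf": {
      "auto.offset.reset": "earliest"
      "security.protocol": "SASL_SSL"
      "sasl.mechanism": "AWS_MSK_IAM"
      "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;"
      "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }
  }

  "output": {
    "good": {
      "topicName": "good_enriched"
      "bootstrapServers": ${?BROKERS}
      # Assumed pass-through of Kafka producer properties
      "producerConf": {
        "security.protocol": "SASL_SSL"
        "sasl.mechanism": "AWS_MSK_IAM"
        "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;"
        "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
      }
    }

    "bad": {
      "topicName": "bad_enriched"
      "bootstrapServers": ${?BROKERS}
      "producerConf": {
        "security.protocol": "SASL_SSL"
        "sasl.mechanism": "AWS_MSK_IAM"
        "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;"
        "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
      }
    }
  }
}
```

The keys containing dots are quoted, as in the original config, so that HOCON treats each one as a single Kafka property name rather than a nested path.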