Enricher CPU utilization

Hello,

We are using the Snowplow Kafka collector/enricher. The collector has no trouble keeping up with traffic, even during peaks, but the enricher does. We run the enricher on ECS with several tasks (20 at the moment). When traffic rises we automatically scale the number of running tasks up to 50, but the CPU does not seem to be used as it should: it never rises above 30%. Are there any tips to make each enricher task more efficient / get better throughput?
[screenshot: enricher CPU utilization graph, 2024-06-18]


Thank you for the help.

Hi @Alexandre5602, a few questions to help figure this out:

  1. How many partitions does the topic you are consuming from have?
  2. Have you checked for any downstream bottlenecks?
  3. Anything showing up in the logs to give any clues as to performance?
  4. Which version of the application are you using?
  5. Which Enrichments do you have enabled?
  6. Can you also share the full config file of the Enrich service?

Hi Josh,

  1. The topic it's reading from has 200 partitions and the topic it's writing to has 20; this could indeed partly cause the bottleneck.
  2. Since the enricher reads from Kafka and writes to Kafka, I'm not really sure how the downstream applications could have any impact on the enricher lag?
  3. No, nothing can be found in the logs.
  4. The Enrich application is: snowplow/stream-enrich-kafka:3.7.1
  5. We have the following enrichments: campaign_attribution, ip_lookup, referer_parser, yauaa.
# Copyright (c) 2013-2020 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (application.conf.example) contains a template with
# configuration options for Stream Enrich.
ENRICH_STREAMS_SINK_MIN_BACKOFF=40
ENRICH_STREAMS_SINK_MAX_BACKOFF=200
ENRICH_STREAMS_BUFFER_BYTE_LIMIT=10000
ENRICH_STREAMS_BUFFER_RECORD_LIMIT=1000
ENRICH_STREAMS_BUFFER_TIME_LIMIT=2000
enrich {

  streams {

    in {
      # Stream/topic where the raw events to be enriched are located
      raw = "good_collector"
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = "good_enriched"
      # Stream/topic where the event that failed enrichment will be stored
      bad = "bad_enriched"
      # Stream/topic where the pii transformation events will end up
      #pii = "datariver-pii-web-enricher-kinesis-stream-nonprod"
      #pii = ${?ENRICH_STREAMS_OUT_PII}

      # How the output stream/topic will be partitioned.
      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
      # user_ipaddress, domain_sessionid, user_fingerprint.
      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
      # possible partition keys correspond to.
      # Otherwise, the partition key will be a random UUID.
      # Note: Nsq does not make use of partition key.
      partitionKey = "event_id"
      #partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY}
    }

    # Configuration shown is for Kafka, to use another uncomment the appropriate configuration
    # and comment out the other
    # To use stdin, comment or remove everything in the "enrich.streams.sourceSink" section except
    # "enabled" which should be set to "stdin".
    sourceSink {
      # Sources / sinks currently supported are:
      # 'kinesis' for reading Thrift-serialized records and writing enriched and bad events to a
      # Kinesis stream
      # 'kafka' for reading / writing to a Kafka topic
      # 'nsq' for reading / writing to a Nsq topic
      # 'stdin' for reading from stdin and writing to stdout and stderr
      enabled = "kafka"

      # Region where the streams are located (AWS region, pertinent to kinesis sink/source type)
      # region = ${?ENRICH_STREAMS_SOURCE_SINK_REGION}
      backoffPolicy {
        minBackoff = 40
        maxBackoff = 200
      }

      aws {
        accessKey = default
        secretKey = default
      }

      # Or Kafka (Comment out for other types)
      brokers = ${?BROKERS}
      # Number of retries to perform before giving up on sending a record
      retries = 0
      # The kafka producer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#producerconfigs
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" -> brokers
      # retries             -> retries
      # "buffer.memory"     -> buffer.byteLimit
      # "linger.ms"         -> buffer.timeLimit
      producerConf {
        "key.serializer"     = "org.apache.kafka.common.serialization.StringSerializer"
        "value.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
        "sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
        "sasl.mechanism" = "AWS_MSK_IAM"
        "sasl.jaas.config" = "software.amazon.msk.auth.iam.IAMLoginModule required;"
        "security.protocol" = "SASL_SSL"
        "buffer.memory" = 10000000
      }
      # The kafka consumer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#consumerconfigs
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" -> brokers
      # "group.id"          -> appName
      consumerConf {
        "enable.auto.commit" = true
        "auto.commit.interval.ms" = 1000
        "auto.offset.reset"  = earliest
        "session.timeout.ms" = 30000
        "buffer.memory" = 10000000
        "key.deserializer"   = "org.apache.kafka.common.serialization.StringDeserializer"
        "value.deserializer" = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
        "sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
        "sasl.mechanism" = "AWS_MSK_IAM"
        "sasl.jaas.config" = "software.amazon.msk.auth.iam.IAMLoginModule required;"
        "security.protocol" = "SASL_SSL"
      }

    }

    # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches recordLimit or
    # - the combined size of the stored records reaches byteLimit or
    # - the time in milliseconds since it was last emptied exceeds timeLimit when
    #   a new event enters the buffer
    buffer {

      byteLimit = 1000000
      recordLimit = 450
      timeLimit = 3000
    }

    # Used for a DynamoDB table to maintain stream state.
    # Used as the Kafka consumer group ID.
    # Used as the Google PubSub subscription name.
    appName = "enricher-kafka"
    #appName = ${?ENRICH_STREAMS_APP_NAME}
  }
}

Thanks for the help!

So the most immediate thing that pops out at me is the disparity between the number of partitions you are reading from (200) and the number you are writing out to (20).

This feels like a natural bottleneck that you will hit during peak periods: per-partition write throughput is bounded, so a small output topic can cap total throughput no matter how many enricher tasks you run. We don't use MSK internally, but are there any metrics showing "write" saturation on the good_enriched topic?
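
If writes are indeed saturating then, alongside adding partitions, the standard Kafka producer batching settings can sometimes raise per-task write throughput. A sketch, assuming producerConf entries are passed straight through to the Kafka producer (as the comments in your config indicate); the values are illustrative, not recommendations, and would need testing against your workload:

producerConf {
  # ... existing serializer / SASL settings ...
  "compression.type" = "lz4"   # compress batches; gzip/snappy/lz4/zstd are valid values
  "batch.size"       = 65536   # max bytes per per-partition batch (Kafka's default is 16384)
}

Note that linger.ms and buffer.memory are already derived from this config's buffer settings, so those are better tuned via buffer.timeLimit and buffer.byteLimit.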

Indeed, looking at this graph we can see that writing to good_enriched always capped out, even during peak hours, but since we added more partitions to good_enriched we have been able to go above that cap (last part of the graph), so thank you for the help already. I also read that the "acks=1" setting could maybe help improve the throughput, since the producer then only waits for the leader of the partition to acknowledge the message; could you confirm that? I assume it would just be passed through producerConf, as in the sketch below.
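
A minimal sketch, assuming stream-enrich-kafka forwards arbitrary producer settings through producerConf (as the comments in the config above suggest):

producerConf {
  # ... existing serializer / SASL settings ...
  # "acks" controls how many replicas must acknowledge each write:
  # "0" = don't wait, "1" = partition leader only, "all" = every in-sync replica.
  # "1" trades durability (a leader failure can lose acknowledged events)
  # for lower latency and higher throughput.
  "acks" = "1"
}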

Hi @Alexandre5602, I mentioned this to the team and they directed me to this old thread (Scaling Snowplow Kafka Enricher).

That version of Stream Enrich Kafka is sub-optimal, as it does not allow for multi-threading and therefore has poor CPU usage patterns. The solution is, largely, to move to the latest version of Enrich Kafka, which has solved these problems.
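
For reference, the newer enrich-kafka application uses a different configuration layout from Stream Enrich. A minimal sketch of the input/output wiring, with field names as I understand them from the current reference docs (double-check against the docs for the exact version you deploy; topic names here simply mirror the ones above):

{
  "input": {
    "topicName": "good_collector"
    "bootstrapServers": "..."
    # consumerConf: the same pass-through Kafka consumer settings as before
  }
  "output": {
    "good": {
      "topicName": "good_enriched"
      "bootstrapServers": "..."
    }
    "bad": {
      "topicName": "bad_enriched"
      "bootstrapServers": "..."
    }
  }
}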