Scaling Snowplow Kafka Enricher

I can’t seem to understand how the Kafka Enricher scales.

I’m running version 2.0.5 of stream-enrich-kafka in docker-compose, on an AWS c5n.4xlarge instance running Amazon Linux 2. Docker version 20.10.13, build a224086; docker-compose version 1.29.2, build unknown.

Scenario:

  • We had a giant backlog of messages on our collector good stream (the stream the enricher consumes from). The topic only had 4 partitions (with two replicas), and CPU was low on the enricher instance (< 10%). CPU was also low on the Kafka cluster.
  • Using AWS MSK (managed Kafka) metrics, we saw that the enricher was processing around 425 messages a second.

We increased the instance from 2 to 16 vCPUs to see what effect that had.

  • No increase in messages processed. That was expected given the CPU metrics, but it also told us that the EC2 instance was likely not network-bound either.

We figured we didn’t have enough partitions to process the messages fast enough, so we created a new topic with 40 partitions and pointed the same 16 vCPU enricher instance at it.

  • It is still processing basically the same number of messages (~480 per second, or 160 per broker per second). This was shocking. CPU on the instance is just a few percentage points higher (I assume from iterating over the extra partitions). The only good thing is that message processing is now spread more evenly across the three brokers (likely because going from 4 to 40 partitions eliminated hot partitions on certain brokers, since 4 isn’t divisible by 3).
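
(For throughput we were mostly reading the MSK dashboards, but for anyone following along, checking the backlog and consumer lag directly with the stock Kafka CLI tools looks roughly like this; the group name matches the appName in the config further down, and the broker address is a placeholder.)

  kafka-consumer-groups.sh \
    --bootstrap-server <broker-1>:9092 \
    --describe \
    --group snowplow2

The per-partition LAG column is what we were watching to confirm the enricher wasn’t keeping up.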

Questions:

  1. Does the Snowplow Enricher use threads? I was under the impression it spun up a thread per CPU or something like that, in which case I’d expect a 16 vCPU enricher’s throughput to increase dramatically when processing 40 partitions instead of 4.

  2. Any other ideas about what might be limiting things? I tried increasing the instance size to get more network bandwidth, but hit the same limits. I have not tried increasing the Kafka node size to see if more network there would help, but I’d honestly be shocked if that were the issue.

  3. I know I can use acks = 1 rather than acks = all to possibly speed things up, but I’d like to understand this behavior first (a sketch of how I’d set that via producerConf is after the config below). I’ve used Kafka before, and this behavior doesn’t make sense…it should scale close to linearly as you increase the number of partitions, and this isn’t even close (10x the partitions led to a 13% increase in message processing).

  4. I also know I should be running the enricher across many containers for redundancy, but again, I want to understand the scaling of a single instance before spreading out across many.

SETTINGS
docker-compose.yml:

  snowplow-stream-enrich:
    image: snowplow/stream-enrich-kafka:2.0.5
    command: [
      "--config", "/snowplow/enricher.hocon",
      "--resolver", "file:/snowplow/resolver.json",
      "--enrichments", "file:/snowplow/enrichments/"
    ]
    volumes:
      - ./snowplow:/snowplow
    environment:
      - "SP_JAVA_OPTS=-Xms512m -Xmx512m"

enricher.hocon

enrich {

  streams {

    in {
      # Stream/topic where the raw events to be enriched are located
      raw = sp_collector_good_events2
      raw = ${?ENRICH_STREAMS_IN_RAW}
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = sp_enricher_good_events2
      enriched = ${?ENRICH_STREAMS_OUT_ENRICHED}
      # Stream/topic where the event that failed enrichment will be stored
      bad = sp_enricher_bad_events2
      bad = ${?ENRICH_STREAMS_OUT_BAD}
      # Stream/topic where the pii transformation events will end up
      pii = ${?ENRICH_STREAMS_OUT_PII}

      # How the output stream/topic will be partitioned.
      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
      # user_ipaddress, domain_sessionid, user_fingerprint.
      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
      # possible partition keys correspond to.
      # Otherwise, the partition key will be a random UUID.
      partitionKey = domain_userid
      partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY}
    }

    sourceSink {
      # Sources / sinks currently supported are:
      # 'kafka' for reading / writing to a Kafka topic
      enabled =  kafka
      enabled =  ${?ENRICH_STREAMS_SOURCE_SINK_ENABLED}

      # Or Kafka (Comment out for other types)
      brokers = "<REDACTED: list of 3 brokers with 9092 ports>"
      # Number of retries to perform before giving up on sending a record
      retries = 0
      # The kafka producer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#producerconfigs
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" -> brokers
      # retries             -> retries
      # "buffer.memory"     -> buffer.byteLimit
      # "linger.ms"         -> buffer.timeLimit
      #producerConf {
      #  acks = all
      #  "key.serializer"     = "org.apache.kafka.common.serialization.StringSerializer"
      #  "value.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
      #}
      # The kafka consumer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#consumerconfigs
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" -> brokers
      # "group.id"          -> appName
      #consumerConf {
      #  "enable.auto.commit" = true
      #  "auto.commit.interval.ms" = 1000
      #  "auto.offset.reset"  = earliest
      #  "session.timeout.ms" = 30000
      #  "key.deserializer"   = "org.apache.kafka.common.serialization.StringDeserializer"
      #  "value.deserializer" = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      #}

    }

    # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
    # The buffer is emptied whenever:
    # - the number of stored records reaches recordLimit or
    # - the combined size of the stored records reaches byteLimit or
    # - the time in milliseconds since it was last emptied exceeds timeLimit when
    #   a new event enters the buffer
    buffer {
      byteLimit = 1000000
      byteLimit = ${?ENRICH_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 0 # Not supported by Kafka; will be ignored
      recordLimit = ${?ENRICH_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = 30000
      timeLimit = ${?ENRICH_STREAMS_BUFFER_TIME_LIMIT}
    }

    # Used as the Kafka consumer group ID.
    appName = "snowplow2"
    appName = ${?ENRICH_STREAMS_APP_NAME}
  }
}
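
(Related to question 3 above: if I do end up testing acks = 1, my understanding is that it goes in the currently commented-out producerConf block inside sourceSink, roughly like this. Untested sketch only.)

      producerConf {
        acks = "1"
        "key.serializer"     = "org.apache.kafka.common.serialization.StringSerializer"
        "value.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
      }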

Thanks!
Patrick

I went back to 4 partitions and added another server. So now two containers, running on two servers, with two partitions assigned to each.

The number of messages doubled.

All this data seems to indicate that either my Docker settings are wrong or stream-enrich is not multi-threaded. Can someone confirm this please?

I’m using the default settings in Docker and docker-compose, which should mean unlimited CPUs on Linux. Here is the relevant output from docker inspect on the container:

            "CpusetCpus": "",
            "CpusetMems": "",

So I have no CPU limits set.
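
(For reference, that output came from running plain docker inspect against the container and grepping for the CPU-related HostConfig fields; the container name is whatever compose assigned, so adjust accordingly.)

  docker inspect <enricher-container> | grep -iE 'cpu'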

Hi @pcb, I have not worked much with the Kafka variant of stream-enrich, but from looking at this code here I’m fairly sure it is not multi-threaded. My best suggestion is to scale horizontally instead of vertically; in other words, run multiple instances of the app, each on a small node with 1 or 2 CPUs.
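
Since every container shares the same appName (and therefore the same Kafka consumer group), Kafka will rebalance the partitions across however many enricher containers are running. Something like this against your existing compose file should be enough to fan out on a single box (untested sketch; the service name is taken from your snippet above, and it only works because the service doesn’t publish any host ports):

  docker-compose up -d --scale snowplow-stream-enrich=4

The same idea repeated across several small instances gives you the horizontal scaling.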

There is some good news though! In the next couple of months we will be working on a completely new implementation of enrich for Kafka. It will use the same core as our recent enrich apps for Kinesis and Pub/Sub, and it will be designed from the ground up to use every available thread as efficiently as possible. We will announce it here on Discourse when it’s ready.


Awesome thanks.

Yeah, I spun up a second container on each of the two servers, for a total of 4 containers for 4 partitions. The number of messages processed doubled again.

So the metrics definitely back up what you are saying.

I’ll scale horizontally, thanks a bunch!
Patrick
