Scaling the Kinesis enricher for high loads

We are running Scala Stream Collector -> Kinesis stream -> Scala Stream Enrich -> Kinesis stream, with the collectors and the enrichers deployed as separate Elastic Beanstalk applications. The load is roughly 20k requests per second (rps). The enrichers are not keeping up with this load, and their throughput is very low.
The enricher config is as follows:
threadPoolSize = 100
maxRecords = 100
buffer {
  byteLimit = 5000000
  recordLimit = 100
  timeLimit = 2000
}
We are running 60 shards (over-provisioned) on both the source and sink streams, and we are seeing a tremendous rise in the Kinesis MillisBehindLatest (average) metric.
Any suggestions or recommendations for improving enricher performance so it comes closer to the collectors? I understand it won’t be 1:1 given the work the enricher has to do, but this is far too slow.
Thanks in advance.

Hi @kaushikbhanu, the most likely issue is the fairly low values of both the recordLimit and maxRecords settings, which cause excessive gets from and puts to the Kinesis streams.

I would recommend raising maxRecords from 100 to at least 1000, if not 10000. This setting caps how many records are fetched from Kinesis in each read, so a low value means many more reads for the same volume of data, and the time spent on all those extra gets can contribute to the slowdown.
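A rough bound shows why a small maxRecords matters. Assume the load is spread evenly over the S shards, and write R for the incoming record rate and m for maxRecords; each shard then needs at least (R/S)/m reads per second just to keep up. Kinesis allows at most 5 GetRecords calls per shard per second, and that limit is shared by all applications polling the same stream. Using the figures from the original post (R = 20,000, S = 60):

```latex
\begin{aligned}
\frac{R}{S} &= \frac{20\,000}{60} \approx 333\ \text{records/s per shard} \\[4pt]
\text{reads/s per shard} &\ge \frac{R/S}{m} \approx \frac{333}{100} \approx 3.3 \quad (m = 100) \\[4pt]
\text{reads/s per shard} &\ge \frac{R/S}{m} \approx \frac{333}{1000} \approx 0.33 \quad (m = 1000)
\end{aligned}
```

With m = 100 each shard needs more than 3 reads per second, uncomfortably close to the 5-per-second ceiling before any retries or other consumers are counted; with m = 1000 a read roughly every 3 seconds would suffice. Even load across shards is a simplifying assumption here.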

It should also be safe to raise recordLimit from 100 to around 500, which again reduces how often you send data to Kinesis and cuts down on network traffic.
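A minimal sketch of the settings after these two changes, using only the keys from the original post. The enclosing blocks of the real Stream Enrich config file are left out because their layout depends on the version in use, and the values are the ones suggested above rather than tuned figures. For context, a single Kinesis GetRecords call returns at most 10,000 records and a single PutRecords call accepts at most 500 records.

```hocon
# Hypothetical excerpt: only the keys discussed in this thread are shown;
# the surrounding blocks of the actual config file are omitted.
threadPoolSize = 100

# Records requested per read from the source stream (was 100).
# 10000 is also the most a single Kinesis GetRecords call can return.
maxRecords = 10000

buffer {
  byteLimit = 5000000   # flush once this many bytes are buffered
  recordLimit = 500     # flush once this many records are buffered (was 100)
  timeLimit = 2000      # flush after this many milliseconds regardless
}
```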

Other causes of throttling can include:

  1. Lots of bad data with an under-provisioned “bad” Kinesis stream. You need to be able to PUT to both the “good” and “bad” streams equally; if either of them is under-provisioned, the application will slow down and fall behind.
  2. Lots of bad data from POST requests. Many “bad” records within a POST request can cause an explosion in network traffic, because for each bad event in the request a full copy of the payload is sent to the bad stream. This multiplication factor can bring a real-time pipeline to its knees very quickly.
  3. An under-provisioned DynamoDB table for Stream Enrich to checkpoint against. If the checkpoint table has very low write throughput, it can prevent the KCL from advancing at the pace it needs to.
  4. The server may simply have gone bad. Occasionally an EC2 instance malfunctions, and a reboot or replacement of the server is needed to get things humming again.
  5. The server is under-provisioned for the load. What are your CPU and memory statistics? If the server is running at 100% CPU, it may not be able to go any faster; have you tried scaling up the processing cluster?
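The multiplication effect in point 2 can be put into numbers. Suppose a single POST request of size P contains b bad events; if a full copy of the payload goes to the bad stream for each one, that request produces B_bad bytes of bad-stream traffic. The values P = 50 KB and b = 40 are hypothetical, chosen only to illustrate the scale:

```latex
\begin{aligned}
B_{\text{bad}} &= b \cdot P \\
&= 40 \times 50\ \text{KB} = 2\,000\ \text{KB} \approx 2\ \text{MB}
\end{aligned}
```

A single 50 KB request thus turns into roughly 2 MB of writes, while one Kinesis shard accepts at most 1 MB/s of writes, so even one such request per second would overrun a single-shard bad stream.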

> I understand it won’t be a 1:1 given the tasks enricher has to do but this is way too slow.

You will always need more compute for enrichment than for collection, as the enricher actually has to do a lot of work validating events and then applying enrichments. The collector, by comparison, does very little, so provisioning equivalent compute for both services will never work unless you have very low volume!

Hope this helps!


Thanks @josh, that's good feedback.
I have adjusted maxRecords and recordLimit.
I am running the enricher as an Elastic Beanstalk worker environment with 10 m4.2xlarge instances. We are currently running a sustained load test; the instances are at ~50% CPU and all are in a healthy state. We are not sending any bad records, so 1) and 2) do not apply here yet (confirmed with the Kinesis metrics).
DynamoDB is provisioned at 100 capacity units, autoscaling to 1000 if required.
The shard count is 60 for both sink and source, which is fairly high for 20k rps.

We are still getting nowhere close to 20k rps from the enricher. :frowning:

Hmm, that is strange. Have you checked the application log output for any clues?

No, nothing weird there; the logs look fine, and there are no Kinesis read/write exceptions either.

Is it possible that you built up a large backlog on a smaller number of shards before scaling up to 60? The KCL has to finish processing the closed parent shards before it can start on their active child shards, so until the parent shards have been fully drained it won’t be able to use the higher bandwidth.

No, we started with 60 shards.

Hi @kaushikbhanu, then this might be a server configuration issue. How are the Stream Enrich processes currently configured? How much memory is allocated to the process? What are your maxfile (open files) settings on the server? What is the memory usage on the server (are you swapping to disk)?

@josh We have Stream Enrich running in an Elastic Beanstalk environment on 10 m4.2xlarge instances (which is already too high). We have already raised maxRecords to 5000. We are not swapping to disk; we have 500 GB provisioned and we are rotating logs.

@josh … this is my question: if I want the enricher to process 100k rps, how would I achieve that?

Hi @kaushikbhanu, Amazon's online calculator estimates that a sustained 100k RPS with an average payload size of 5 KB would need 489 shards per stream to handle that amount of throughput.

As each shard can only sustain 1 MB per second of write throughput, this does make sense: that is a huge amount of traffic!
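The 489 figure can be reproduced by hand from the per-shard write limits of Kinesis, 1 MB/s or 1,000 records/s, whichever is reached first. The sketch below treats 1 MB as 1,024 KB, which is an assumption about how the calculator rounds, and uses the 100k RPS and 5 KB average from above:

```latex
\begin{aligned}
\text{bytes: } & \frac{100\,000 \times 5\ \text{KB/s}}{1\,024\ \text{KB/s per shard}} = \frac{500\,000}{1\,024} \approx 488.3 \;\Rightarrow\; 489\ \text{shards} \\[4pt]
\text{records: } & \frac{100\,000\ \text{records/s}}{1\,000\ \text{records/s per shard}} = 100\ \text{shards}
\end{aligned}
```

The byte limit is the binding one here, so payload size rather than event count drives the shard estimate.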

From what you are saying, my best guess is that the bottleneck is in Kinesis rather than in your EC2 setup: that many servers should be able to handle a considerable amount of traffic, but something is holding them back. As mentioned earlier, it could also be a server misconfiguration where the application does not have enough access to the host VM's resources to work harder. We do not run this stack on Elastic Beanstalk internally, so we cannot provide much help in that regard (we use EC2 Auto Scaling groups that publish application logs directly to CloudWatch).

If you cannot see any issues with any of the applications, and there is no contention on the streams from exceeded read or write throughput, there is no other reason why it would be falling behind in processing.

@josh What we are seeing is that CPU usage is only ~50% on these instances. We tried increasing the shard count to 200, even though we are receiving only 30k rps right now, but even with these changes we still don’t see any improvement in record processing. Here is the config we are running:

threadPoolSize = 500
maxRecords = 10000
buffer {
  byteLimit = 5000000
  recordLimit = 1000
  timeLimit = 2000
}

Any recommendations?