Kinesis stream enrich failing - TimeoutException

Hi
We are running the Docker version of Snowplow Stream Enrich 0.22 on AWS ECS. We have 2 shards on the source stream and 4 on the destination. There are 3 ECS tasks running.

We started seeing errors similar to those described here: Kinesis stream enrich failing - For engineers / AWS real-time pipeline - Discourse – Snowplow (snowplowanalytics.com). We have increased ECS memory and CPU, but that doesn’t seem to help.

  # Maximum number of records to get from Kinesis per call to GetRecords
  maxRecords = 10000

  # LATEST: most recent data.
  # TRIM_HORIZON: oldest available data.
  # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
  # Note: This only affects the first run of this application on a stream.
  initialPosition = TRIM_HORIZON

  # Needs to be specified when initialPosition is "AT_TIMESTAMP".
  # Timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
  # Ex: "2017-05-17T10:00:00Z"
  # Note: Time needs to be specified in UTC.
  initialTimestamp = "{{initialTimestamp}}"

  # Minimum and maximum backoff periods, in milliseconds
  backoffPolicy {
    minBackoff = 500
    maxBackoff = 10000
  }

}

# After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
# The buffer is emptied whenever:
# - the number of stored records reaches recordLimit or
# - the combined size of the stored records reaches byteLimit or
# - the time in milliseconds since it was last emptied exceeds timeLimit when
#   a new event enters the buffer
buffer {
  byteLimit = 4500000
  recordLimit = 500 # Not supported by Kafka; will be ignored
  timeLimit = 60000
}

The destination stream seems to be getting records, but they look like duplicates since the collector timestamp isn’t increasing. AWS Kinesis support didn’t see anything wrong from the stream’s perspective.

The logs:
[RecordProcessor-0000] ERROR com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Writing failed.
java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink.sendBatch(KinesisSink.scala:217)
at com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink$$anonfun$flush$1.apply(KinesisSink.scala:191)
at com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink$$anonfun$flush$1.apply(KinesisSink.scala:191)
at scala.collection.immutable.List.foreach(List.scala:392)
at com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink.flush(KinesisSink.scala:191)
at com.snowplowanalytics.snowplow.enrich.stream.sources.Source.enrichAndStoreEvents(Source.scala:255)
at com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource$RawEventProcessor.processRecordsWithRetries(KinesisSource.scala:189)
at com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource$RawEventProcessor.processRecords(KinesisSource.scala:180)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.V1ToV2RecordProcessorAdapter.processRecords(V1ToV2RecordProcessorAdapter.java:42)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.callProcessRecords(ProcessTask.java:221)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:176)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[RecordProcessor-0000] ERROR com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - + Retrying in 10000 milliseconds…
[RecordProcessor-0000] ERROR com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Writing failed.

Hi @nevakj,

First of all, please note that your version is 1.5 years old; the latest enrich version is 1.4.2. You can find the latest setup guide on our docs website.

Do you mean that you also see ProvisionedThroughputExceededException, or rather Java heap space errors?

Could you please share the same graphs as below for your enriched stream? You can find them in the Kinesis UI, in the Monitoring tab.


Hi Ben

I rather meant that we see the error java.util.concurrent.TimeoutException: Futures timed out after [10 seconds], which appeared in the other thread as well.

We realize that we are a lot behind in version and will try to update today. It’s just strange that the Docker image is the same and it has been working correctly so far.

To me it seems that Snowplow Enrich can successfully write to the enriched stream but errors out before acknowledging the success. Then it tries to resend the same hits over and over again. We see this downstream: the etl_tstamp of hits is increasing but the collector_tstamp is not.

Please find the graphs below. It’s odd that the values are highest at night, when traffic is the lowest.


We successfully updated stream-enrich-kinesis to the latest version, but the error persists:

[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Writing 500 records to Kinesis stream snowplow-enrich-good-prod
[RecordProcessor-0000] ERROR com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Writing failed.
java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
at scala.concurrent.Await$.$anonfun$result$1(package.scala:220)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
at scala.concurrent.Await$.result(package.scala:146)
at com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink.sendBatch(KinesisSink.scala:216)
at com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink.$anonfun$flush$1(KinesisSink.scala:190)
at com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink.$anonfun$flush$1$adapted(KinesisSink.scala:190)
at scala.collection.immutable.List.foreach(List.scala:392)
at com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink.flush(KinesisSink.scala:190)
at com.snowplowanalytics.snowplow.enrich.stream.sources.Source.enrichAndStoreEvents(Source.scala:287)
at com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource$RawEventProcessor.processRecordsWithRetries(KinesisSource.scala:221)
at com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource$RawEventProcessor.processRecords(KinesisSource.scala:214)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.V1ToV2RecordProcessorAdapter.processRecords(V1ToV2RecordProcessorAdapter.java:42)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.callProcessRecords(ProcessTask.java:221)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:176)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[RecordProcessor-0000] ERROR com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - + Retrying in 886 milliseconds…
[RecordProcessor-0000] ERROR com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Writing failed.

Decreasing the recordLimit from 500 to 100 seemed to help. Now we are able to see success messages in the logs. There are still some java.util.concurrent.TimeoutException errors, but at least the queue is now being processed.

Any ideas why this happens? Is the JVM failing?

    buffer {
      byteLimit = 4500000
      recordLimit = 100 # Not supported by Kafka; will be ignored
      timeLimit = 60000
    }

[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Successfully wrote 100 out of 100 records
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Writing 100 records to Kinesis stream snowplow-enrich-good-prod
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Current stream shard assignments: shardId-000000000001
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Sleeping …
[RecordProcessor-0000] ERROR com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Writing failed.
java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
at scala.concurrent.Await$.$anonfun$result$1(package.scala:220)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
at scala.concurrent.Await$.result(package.scala:146)
at com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink.sendBatch(KinesisSink.scala:216)
at com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink.$anonfun$flush$1(KinesisSink.scala:190)
at com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink.$anonfun$flush$1$adapted(KinesisSink.scala:190)
at scala.collection.immutable.List.foreach(List.scala:392)
at com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink.flush(KinesisSink.scala:190)
at com.snowplowanalytics.snowplow.enrich.stream.sources.Source.enrichAndStoreEvents(Source.scala:287)
at com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource$RawEventProcessor.processRecordsWithRetries(KinesisSource.scala:221)
at com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource$RawEventProcessor.processRecords(KinesisSource.scala:214)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.V1ToV2RecordProcessorAdapter.processRecords(V1ToV2RecordProcessorAdapter.java:42)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.callProcessRecords(ProcessTask.java:221)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:176)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
[RecordProcessor-0000] ERROR com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - + Retrying in 1129 milliseconds…
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Successfully wrote 100 out of 100 records
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Writing 100 records to Kinesis stream snowplow-enrich-good-prod
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Successfully wrote 100 out of 100 records
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Writing 100 records to Kinesis stream snowplow-enrich-good-prod
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Successfully wrote 100 out of 100 records
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Writing 100 records to Kinesis stream snowplow-enrich-good-prod

Thanks for the details @nevakj!

Great news!

It seems indeed that the PutRecords request is sent to Kinesis, but enrich times out before it can receive the response. I think there are 2 things to try:

  1. Decrease byteLimit (e.g. to 1000000). If this is the limit that is reached first, it will send smaller batches to Kinesis, thus reducing the time to receive the response.
  2. Increase maxBackoff (e.g. to 30000). A sketch of where these settings sit in the config is shown below.
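
For reference, here’s a rough sketch of where those two settings sit, following the structure of the config fragment you pasted (the values are only examples, adjust them to your volumes):

    # in the same block as maxRecords / backoffPolicy from your paste:
    backoffPolicy {
      minBackoff = 500
      maxBackoff = 30000     # e.g.; raised from 10000 to wait longer between retries
    }

    # the top-level buffer block from your paste:
    buffer {
      byteLimit = 1000000    # e.g.; lowered from 4500000 so each batch sent to Kinesis is smaller
      recordLimit = 100
      timeLimit = 60000
    }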

Thanks for the tips. Before making the changes you suggested, we still saw some errors and the processing wasn’t able to keep up with the stream. The stream is being processed, but very slowly.

Source collector stream:

Destination enrich stream:

I have now made the changes @BenB suggested and decreased the recordLimit from 100 to 50. Some errors still occur, but fewer.

I also increased the collector stream shards from 2 to 4, hoping to increase parallelism. One of our containers has been sleeping and not getting shard leases; I’m hoping the shard increase will help with this. Is my thinking correct?

Any more suggestions on how we could improve performance while keeping errors to a minimum?

Thanks a lot!

The issue has been resolved. The root cause was a network traffic spike in our AWS Transit VPC caused by another service. This made the firewall extremely slow between Snowplow Enrich in our private subnet and Kinesis Data Streams, and that slowness caused the timeout errors seen in the logs. When the firewall issue was fixed, the errors disappeared and the enrich process sped up rapidly, catching up with the source stream very quickly.

Even though Snowplow wasn’t the issue, it was a valuable lesson for us in adapting the Snowplow configuration to an abnormal situation. We were able to process most messages with 5+5 shards, maxRecords 500 and recordLimit 50, which kept most of the processors running despite some errors.
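
For anyone hitting the same issue, the relevant parts of our config during the incident looked roughly like this (only the keys we changed are shown; byteLimit per @BenB’s earlier suggestion):

    # Kinesis source/sink block
    maxRecords = 500       # down from 10000

    buffer {
      byteLimit = 1000000  # down from 4500000
      recordLimit = 50     # down from 500
      timeLimit = 60000
    }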

Now that the situation has stabilized, we will probably slowly increase the record counts again to reduce the number of calls to Kinesis.

Thanks for the help @BenB

That’s great news @nevakj!

Thanks for sharing your findings!

Cheers