Snowplow collector throws InterruptedException

Hi there! I’m running the Snowplow collector in k8s. When one of the collector pods is terminated, I start getting 502 errors with the following stack trace. Could you advise whether this could be caused by `executorService.awaitTermination(10000, MILLISECONDS)` being hardcoded to 10 seconds, and whether that timeout should be increased?

[Thread-1] WARN com.snowplowanalytics.snowplow.collectors.scalastream.KinesisCollector$ - Received shutdown signal
[Thread-1] WARN com.snowplowanalytics.snowplow.collectors.scalastream.KinesisCollector$ - setting health endpoint to unhealthy
[Thread-1] WARN com.snowplowanalytics.snowplow.collectors.scalastream.KinesisCollector$ - Sleeping for 10 seconds
[pool-1-thread-4] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Successfully wrote 480 out of 480 records to Kinesis stream snowplow_raw_good.
[scala-stream-collector-akka.actor.default-dispatcher-12] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Writing 480 Thrift records to Kinesis stream snowplow_raw_good.
[pool-1-thread-10] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Successfully wrote 480 out of 480 records to Kinesis stream snowplow_raw_good.
[scala-stream-collector-akka.actor.default-dispatcher-14] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Writing 480 Thrift records to Kinesis stream snowplow_raw_good.
[Thread-1] WARN com.snowplowanalytics.snowplow.collectors.scalastream.KinesisCollector$ - Initiating http server termination
[pool-1-thread-7] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Successfully wrote 480 out of 480 records to Kinesis stream snowplow_raw_good.
[scala-stream-collector-akka.actor.default-dispatcher-8] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Writing 480 Thrift records to Kinesis stream snowplow_raw_good.
[Thread-1] WARN com.snowplowanalytics.snowplow.collectors.scalastream.KinesisCollector$ - Server terminated
[scala-stream-collector-akka.actor.default-dispatcher-16] WARN com.snowplowanalytics.snowplow.collectors.scalastream.KinesisCollector$ - Initiating good sink shutdown
[scala-stream-collector-akka.actor.default-dispatcher-16] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Writing 22 Thrift records to Kinesis stream snowplow_raw_good.
[scala-stream-collector-akka.actor.default-dispatcher-18] WARN com.snowplowanalytics.snowplow.collectors.scalastream.KinesisCollector$ - Initiating bad sink shutdown
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@75aa31ef[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@26a3fae7[Wrapped task = scala.concurrent.impl.CallbackRunnable@6a44b6a4]] rejected from java.util.concurrent.ScheduledThreadPoolExecutor@3b406ee5[Shutting down, pool size = 10, active threads = 2, queued tasks = 2, completed tasks = 94]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(Unknown Source)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.schedule(Unknown Source)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.execute(Unknown Source)
	at scala.concurrent.impl.ExecutionContextImpl$$anon$4.execute(ExecutionContextImpl.scala:138)

Hi @Serhii_Dimchenko, if you’re getting 502s when a collector shuts down, it’s probably because your load balancer is still sending HTTP requests to the pod after it has started terminating. To avoid the 502s, you need to make sure the pod keeps serving requests for longer than it takes the load balancer to re-route traffic to a different pod. Two configuration changes can help with this (a rough sketch of both follows the list):

  • Set the collector config option preTerminationPeriod (described here) to a much larger value, so the collector keeps servicing requests for longer after receiving the SIGTERM.
  • Set the Kubernetes pod config option terminationGracePeriodSeconds (described here) so Kubernetes gives the collector pod plenty of time to stay alive after receiving the signal.
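As a rough illustration only (the values below are placeholders, not recommendations, and the exact keys should be checked against the reference docs for your collector version), the two settings might look something like this:

```
# Collector HOCON config (sketch): keep serving traffic for a while after SIGTERM.
# Key name taken from the graceful-shutdown options described above; value is an example.
collector {
  preTerminationPeriod = 30 seconds
}
```

```yaml
# Kubernetes Deployment pod spec (sketch)
spec:
  template:
    spec:
      # Time Kubernetes waits after SIGTERM before sending SIGKILL to the pod.
      terminationGracePeriodSeconds: 60
```

The main point is the relationship between the two: terminationGracePeriodSeconds should comfortably exceed preTerminationPeriod plus however long the Kinesis sinks need to flush, otherwise Kubernetes will SIGKILL the pod while it is still draining.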

If it helps, I wrote a bit about configuring graceful shutdown in the collector 2.5.0 release notes.
