Stream-Transformer-Kinesis crashes when there is a huge volume of events

I’m using the latest version of the stream transformer for Kinesis (snowplow/transformer-kinesis:4.2.1), running as a pod in an EKS Kubernetes cluster. Everything works great in the staging environment, where we have a small number of events, but when I deployed it to the production environment, where we average 1.5k+ events per minute, it starts normally and then after 10-15 seconds throws the error below. I suspected we might be hitting Kinesis throughput limits, so I used the FanOut option and configured enhanced fan-out for Kinesis (I can see the transformer registered as a consumer), and I also tried tripling the number of shards (to 16), but the error persists.

My config looks like this:

{
  "input": {
    "type": "kinesis",
    "appName": "{{ .Values.config.streamTransformer.appName }}",
    "streamName": "{{ .Values.config.streamTransformer.kinesisStream }}",
    "position": "LATEST",
    "region": "eu-central-1",
    "retrievalMode": {
      "type": "{{ .Values.config.streamTransformer.readType }}",
      "maxRecords": 10000
    }
  },
  "bufferSize": 3,
  "output": {
    "path": "s3://{{ .Values.config.streamTransformer.s3Bucket }}/events/stream-transformer/",
    "region": "eu-central-1"
  },
  "queue": {
    "type": "sqs",
    "queueName": "{{ .Values.config.streamTransformer.messageQueue }}",
    "region": "eu-central-1"
  },
  "formats": {
    "transformationType": "shred",
    "default": "TSV"
  },
  "windowing": "5 minutes",
  "validations": {
    "minimumTimestamp": "2022-01-01T11:00:00.00Z"
  }
}

The following is the error log:

2022-08-10 20:59:40	
{"log":"[ioapp-compute-1] INFO com.snowplowanalytics.snowplow.rdbloader.transformer.kinesis.sinks.generic.KeyedEnqueue - Pulling 425 elements for output=good/vendor=kis/name=sessionBasicsCon/format=tsv/model=1/\n","stream":"stderr","time":"2022-08-10T18:59:39.919600052Z"}
2022-08-10 20:59:40	
{"log":"[ioapp-compute-1] INFO com.snowplowanalytics.snowplow.rdbloader.transformer.kinesis.sinks.generic.KeyedEnqueue - Pulling 425 elements for output=good/vendor=kis/name=botDetectionCon/format=tsv/model=1/\n","stream":"stderr","time":"2022-08-10T18:59:39.921426408Z"}
2022-08-10 20:59:40	
{"log":"[ioapp-compute-1] INFO com.snowplowanalytics.snowplow.rdbloader.transformer.kinesis.sinks.generic.KeyedEnqueue - Pulling 42 elements for output=good/vendor=kis/name=wishCon2/format=tsv/model=1/\n","stream":"stderr","time":"2022-08-10T18:59:39.980226806Z"}
2022-08-10 20:59:40	
{"log":"[ioapp-compute-1] INFO com.snowplowanalytics.snowplow.rdbloader.transformer.kinesis.sinks.generic.KeyedEnqueue - Pulling 5 elements for output=good/vendor=kis/name=wishCon1/format=tsv/model=1/\n","stream":"stderr","time":"2022-08-10T18:59:39.983047735Z"}
2022-08-10 20:59:40	
{"log":"[ioapp-compute-1] INFO com.snowplowanalytics.snowplow.rdbloader.transformer.kinesis.sinks.generic.KeyedEnqueue - Pulling 5 elements for output=good/vendor=kis/name=sliderItemClick/format=tsv/model=1/\n","stream":"stderr","time":"2022-08-10T18:59:39.984757824Z"}
2022-08-10 20:59:40	
{"log":"[ioapp-compute-0] INFO com.snowplowanalytics.snowplow.rdbloader.transformer.kinesis.sinks.generic.KeyedEnqueue - Pulling 425 elements for output=good/vendor=kis/name=pageViewCon/format=tsv/model=1/\n","stream":"stderr","time":"2022-08-10T18:59:40.090735724Z"}
2022-08-10 20:59:40	
{"log":"[ioapp-compute-0] INFO com.snowplowanalytics.snowplow.rdbloader.transformer.kinesis.sinks.generic.KeyedEnqueue - Pulling 200 elements for output=good/vendor=kis/name=pageBasicsCon/format=tsv/model=1/\n","stream":"stderr","time":"2022-08-10T18:59:40.201350449Z"}
2022-08-10 20:59:56	
{"log":"[cw-metrics-publisher] WARN software.amazon.kinesis.metrics.CloudWatchMetricsPublisher - Could not publish 10 datums to CloudWatch\n","stream":"stderr","time":"2022-08-10T18:59:55.928589363Z"}
2022-08-10 20:59:56	
{"log":"java.util.concurrent.TimeoutException\n","stream":"stderr","time":"2022-08-10T18:59:55.928741389Z"}
2022-08-10 20:59:56	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.timedGet(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:55.928751057Z"}
2022-08-10 20:59:56	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:55.928756184Z"}
2022-08-10 20:59:56	
{"log":"\u0009at software.amazon.kinesis.metrics.CloudWatchMetricsPublisher.blockingExecute(CloudWatchMetricsPublisher.java:87)\n","stream":"stderr","time":"2022-08-10T18:59:55.928760927Z"}
2022-08-10 20:59:56	
{"log":"\u0009at software.amazon.kinesis.metrics.CloudWatchMetricsPublisher.publishMetrics(CloudWatchMetricsPublisher.java:74)\n","stream":"stderr","time":"2022-08-10T18:59:55.928765366Z"}
2022-08-10 20:59:56	
{"log":"\u0009at software.amazon.kinesis.metrics.CloudWatchPublisherRunnable.runOnce(CloudWatchPublisherRunnable.java:138)\n","stream":"stderr","time":"2022-08-10T18:59:55.928769617Z"}
2022-08-10 20:59:56	
{"log":"\u0009at software.amazon.kinesis.metrics.CloudWatchPublisherRunnable.run(CloudWatchPublisherRunnable.java:84)\n","stream":"stderr","time":"2022-08-10T18:59:55.92877407Z"}
2022-08-10 20:59:56	
{"log":"\u0009at java.base/java.lang.Thread.run(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:55.928778206Z"}
2022-08-10 20:59:57	
{"log":"[prefetch-cache-shardId-000000000013-0000] ERROR software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher - data-team-final:shardId-000000000013 :  Exception thrown while fetching records from Kinesis\n","stream":"stderr","time":"2022-08-10T18:59:57.899111808Z"}
2022-08-10 20:59:57	
{"log":"software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Java heap space\n","stream":"stderr","time":"2022-08-10T18:59:57.912981665Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)\n","stream":"stderr","time":"2022-08-10T18:59:57.913170839Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)\n","stream":"stderr","time":"2022-08-10T18:59:57.913185076Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)\n","stream":"stderr","time":"2022-08-10T18:59:57.913223255Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)\n","stream":"stderr","time":"2022-08-10T18:59:57.913240417Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)\n","stream":"stderr","time":"2022-08-10T18:59:57.91324436Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)\n","stream":"stderr","time":"2022-08-10T18:59:57.913247857Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913250999Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913254125Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913257097Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913260245Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:76)\n","stream":"stderr","time":"2022-08-10T18:59:57.913263442Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913266511Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913269355Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913334506Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913341437Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:104)\n","stream":"stderr","time":"2022-08-10T18:59:57.913345257Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913348866Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913352015Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913355402Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913358736Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:209)\n","stream":"stderr","time":"2022-08-10T18:59:57.913362249Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913365967Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913425689Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913431402Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913435049Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913444295Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.lang.Thread.run(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913448038Z"}
2022-08-10 20:59:57	
{"log":"Caused by: java.lang.OutOfMemoryError: Java heap space\n","stream":"stderr","time":"2022-08-10T18:59:57.913497175Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.thirdparty.jackson.dataformat.cbor.CBORParser._finishBytes(CBORParser.java:2489)\n","stream":"stderr","time":"2022-08-10T18:59:57.913527101Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.thirdparty.jackson.dataformat.cbor.CBORParser._finishToken(CBORParser.java:2119)\n","stream":"stderr","time":"2022-08-10T18:59:57.913531908Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.thirdparty.jackson.dataformat.cbor.CBORParser.getEmbeddedObject(CBORParser.java:1691)\n","stream":"stderr","time":"2022-08-10T18:59:57.913535504Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.protocols.jsoncore.JsonNodeParser.parseToken(JsonNodeParser.java:162)\n","stream":"stderr","time":"2022-08-10T18:59:57.913539117Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.protocols.jsoncore.JsonNodeParser.parseObject(JsonNodeParser.java:173)\n","stream":"stderr","time":"2022-08-10T18:59:57.913542765Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.protocols.jsoncore.JsonNodeParser.parseToken(JsonNodeParser.java:158)\n","stream":"stderr","time":"2022-08-10T18:59:57.913546431Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.protocols.jsoncore.JsonNodeParser.parseArray(JsonNodeParser.java:183)\n","stream":"stderr","time":"2022-08-10T18:59:57.913611497Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.protocols.jsoncore.JsonNodeParser.parseToken(JsonNodeParser.java:160)\n","stream":"stderr","time":"2022-08-10T18:59:57.913616477Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.protocols.jsoncore.JsonNodeParser.parseObject(JsonNodeParser.java:173)\n","stream":"stderr","time":"2022-08-10T18:59:57.913620261Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.protocols.jsoncore.JsonNodeParser.parseToken(JsonNodeParser.java:158)\n","stream":"stderr","time":"2022-08-10T18:59:57.913623744Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.protocols.jsoncore.JsonNodeParser.parse(JsonNodeParser.java:116)\n","stream":"stderr","time":"2022-08-10T18:59:57.913627304Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.protocols.jsoncore.JsonNodeParser.lambda$parse$0(JsonNodeParser.java:85)\n","stream":"stderr","time":"2022-08-10T18:59:57.913631065Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.protocols.jsoncore.JsonNodeParser$$Lambda$1898/0x0000000100bf5840.get(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913634594Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.utils.FunctionalUtils.lambda$safeSupplier$4(FunctionalUtils.java:108)\n","stream":"stderr","time":"2022-08-10T18:59:57.913638228Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.utils.FunctionalUtils$$Lambda$405/0x0000000100433040.get(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.91364183Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.utils.FunctionalUtils.invokeSafely(FunctionalUtils.java:136)\n","stream":"stderr","time":"2022-08-10T18:59:57.913645734Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.protocols.jsoncore.JsonNodeParser.parse(JsonNodeParser.java:82)\n","stream":"stderr","time":"2022-08-10T18:59:57.913649275Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.protocols.json.internal.unmarshall.JsonProtocolUnmarshaller.unmarshall(JsonProtocolUnmarshaller.java:189)\n","stream":"stderr","time":"2022-08-10T18:59:57.9137122Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.protocols.json.internal.unmarshall.JsonResponseHandler.handle(JsonResponseHandler.java:82)\n","stream":"stderr","time":"2022-08-10T18:59:57.913717041Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.protocols.json.internal.unmarshall.JsonResponseHandler.handle(JsonResponseHandler.java:36)\n","stream":"stderr","time":"2022-08-10T18:59:57.913720147Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.protocols.json.internal.unmarshall.AwsJsonResponseHandler.handle(AwsJsonResponseHandler.java:44)\n","stream":"stderr","time":"2022-08-10T18:59:57.913723729Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.core.http.MetricCollectingHttpResponseHandler.lambda$handle$0(MetricCollectingHttpResponseHandler.java:52)\n","stream":"stderr","time":"2022-08-10T18:59:57.913732211Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.core.http.MetricCollectingHttpResponseHandler$$Lambda$1886/0x0000000100bf3840.call(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913736148Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:64)\n","stream":"stderr","time":"2022-08-10T18:59:57.913739416Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.core.http.MetricCollectingHttpResponseHandler.handle(MetricCollectingHttpResponseHandler.java:52)\n","stream":"stderr","time":"2022-08-10T18:59:57.913743122Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$resultTransformationResponseHandler$7(BaseClientHandler.java:249)\n","stream":"stderr","time":"2022-08-10T18:59:57.913746697Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.core.internal.handler.BaseClientHandler$$Lambda$1715/0x0000000100ad6440.handle(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913750411Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler.lambda$prepare$0(AsyncResponseHandler.java:89)\n","stream":"stderr","time":"2022-08-10T18:59:57.913754157Z"}
2022-08-10 20:59:57	
{"log":"\u0009at software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler$$Lambda$1792/0x0000000100b00c40.apply(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913757714Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913806418Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913811348Z"}
2022-08-10 20:59:57	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)\n","stream":"stderr","time":"2022-08-10T18:59:57.913815028Z"}
2022-08-10 21:00:03	
{"log":"[pool-16-thread-1] INFO software.amazon.kinesis.coordinator.PeriodicShardSyncManager - WorkerId stream-transformer-969848cd4-74pn9:4bba6f0e-ed3c-436d-871e-849e42e2133f is leader, running the periodic shard sync task\n","stream":"stderr","time":"2022-08-10T19:00:03.483188428Z"}
2022-08-10 21:00:03	
{"log":"[pool-17-thread-1] INFO software.amazon.kinesis.leases.LeaseCleanupManager - Number of pending leases to clean before the scan : 0\n","stream":"stderr","time":"2022-08-10T19:00:03.838590103Z"}
2022-08-10 21:00:05	
{"log":"[pool-16-thread-1] INFO software.amazon.kinesis.coordinator.PeriodicShardSyncManager - Skipping shard sync for data-team-final due to the reason - Hash Ranges are complete for data-team-final\n","stream":"stderr","time":"2022-08-10T19:00:05.175763397Z"}
2022-08-10 21:00:06	
{"log":"[cats-effect-blocker-0] INFO software.amazon.kinesis.coordinator.DiagnosticEventLogger - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=4, largestPoolSize=5, maximumPoolSize=2147483647)\n","stream":"stderr","time":"2022-08-10T19:00:06.374170935Z"}
2022-08-10 21:00:17	
{"log":"[prefetch-cache-shardId-000000000011-0000] ERROR software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher - data-team-final:shardId-000000000011 :  Exception thrown while fetching records from Kinesis\n","stream":"stderr","time":"2022-08-10T19:00:16.921657215Z"}
2022-08-10 21:00:17	
{"log":"software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Java heap space\n","stream":"stderr","time":"2022-08-10T19:00:16.921838444Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)\n","stream":"stderr","time":"2022-08-10T19:00:16.923075039Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)\n","stream":"stderr","time":"2022-08-10T19:00:16.92309685Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)\n","stream":"stderr","time":"2022-08-10T19:00:16.923113788Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)\n","stream":"stderr","time":"2022-08-10T19:00:16.923118079Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)\n","stream":"stderr","time":"2022-08-10T19:00:16.92312177Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)\n","stream":"stderr","time":"2022-08-10T19:00:16.923125432Z"}
2022-08-10 21:00:17	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)\n","stream":"stderr","time":"2022-08-10T19:00:16.923129344Z"}
2022-08-10 21:00:17	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)\n","stream":"stderr","time":"2022-08-10T19:00:16.923133221Z"}
2022-08-10 21:00:17	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)\n","stream":"stderr","time":"2022-08-10T19:00:16.923136951Z"}
2022-08-10 21:00:17	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)\n","stream":"stderr","time":"2022-08-10T19:00:16.923140319Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:76)\n","stream":"stderr","time":"2022-08-10T19:00:16.923144124Z"}
2022-08-10 21:00:17	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)\n","stream":"stderr","time":"2022-08-10T19:00:16.923148063Z"}
2022-08-10 21:00:17	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)\n","stream":"stderr","time":"2022-08-10T19:00:16.923151542Z"}
2022-08-10 21:00:17	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)\n","stream":"stderr","time":"2022-08-10T19:00:16.923155016Z"}
2022-08-10 21:00:17	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)\n","stream":"stderr","time":"2022-08-10T19:00:16.923158294Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:104)\n","stream":"stderr","time":"2022-08-10T19:00:16.923161942Z"}
2022-08-10 21:00:17	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)\n","stream":"stderr","time":"2022-08-10T19:00:16.923165769Z"}
2022-08-10 21:00:17	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)\n","stream":"stderr","time":"2022-08-10T19:00:16.923169184Z"}
2022-08-10 21:00:17	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)\n","stream":"stderr","time":"2022-08-10T19:00:16.923173033Z"}
2022-08-10 21:00:17	
{"log":"\u0009at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)\n","stream":"stderr","time":"2022-08-10T19:00:16.923177785Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$WrappedErrorForwardingResponseHandler.onError(MakeAsyncHttpRequestStage.java:158)\n","stream":"stderr","time":"2022-08-10T19:00:16.923181497Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.lambda$notifyError$5(ResponseHandler.java:309)\n","stream":"stderr","time":"2022-08-10T19:00:16.923185117Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:181)\n","stream":"stderr","time":"2022-08-10T19:00:16.923188621Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access$500(ResponseHandler.java:71)\n","stream":"stderr","time":"2022-08-10T19:00:16.923192708Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.notifyError(ResponseHandler.java:307)\n","stream":"stderr","time":"2022-08-10T19:00:16.923199392Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.http.nio.netty.internal.utils.ExceptionHandlingUtils.tryCatch(ExceptionHandlingUtils.java:42)\n","stream":"stderr","time":"2022-08-10T19:00:16.92320297Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onNext(ResponseHandler.java:270)\n","stream":"stderr","time":"2022-08-10T19:00:16.923206427Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onNext(ResponseHandler.java:221)\n","stream":"stderr","time":"2022-08-10T19:00:16.923210501Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.publishMessage(HandlerPublisher.java:407)\n","stream":"stderr","time":"2022-08-10T19:00:16.923214162Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.channelRead(HandlerPublisher.java:383)\n","stream":"stderr","time":"2022-08-10T19:00:16.92321799Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n","stream":"stderr","time":"2022-08-10T19:00:16.923221601Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n","stream":"stderr","time":"2022-08-10T19:00:16.92322524Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n","stream":"stderr","time":"2022-08-10T19:00:16.92322892Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:228)\n","stream":"stderr","time":"2022-08-10T19:00:16.923232405Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:199)\n","stream":"stderr","time":"2022-08-10T19:00:16.923235885Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:173)\n","stream":"stderr","time":"2022-08-10T19:00:16.923239492Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n","stream":"stderr","time":"2022-08-10T19:00:16.923243449Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n","stream":"stderr","time":"2022-08-10T19:00:16.923246988Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n","stream":"stderr","time":"2022-08-10T19:00:16.923250653Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.http.nio.netty.internal.LastHttpContentHandler.channelRead(LastHttpContentHandler.java:43)\n","stream":"stderr","time":"2022-08-10T19:00:16.92325403Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n","stream":"stderr","time":"2022-08-10T19:00:16.923257495Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n","stream":"stderr","time":"2022-08-10T19:00:16.92326099Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n","stream":"stderr","time":"2022-08-10T19:00:16.923264299Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.onDataRead(Http2ToHttpInboundAdapter.java:84)\n","stream":"stderr","time":"2022-08-10T19:00:16.923267895Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.channelRead0(Http2ToHttpInboundAdapter.java:49)\n","stream":"stderr","time":"2022-08-10T19:00:16.92327169Z"}
2022-08-10 21:00:17	
{"log":"\u0009at software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.channelRead0(Http2ToHttpInboundAdapter.java:42)\n","stream":"stderr","time":"2022-08-10T19:00:16.923275434Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n","stream":"stderr","time":"2022-08-10T19:00:16.923281417Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n","stream":"stderr","time":"2022-08-10T19:00:16.923285085Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n","stream":"stderr","time":"2022-08-10T19:00:16.923288449Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n","stream":"stderr","time":"2022-08-10T19:00:16.923291794Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)\n","stream":"stderr","time":"2022-08-10T19:00:16.923295165Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n","stream":"stderr","time":"2022-08-10T19:00:17.341854847Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n","stream":"stderr","time":"2022-08-10T19:00:17.341920214Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n","stream":"stderr","time":"2022-08-10T19:00:17.341970646Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)\n","stream":"stderr","time":"2022-08-10T19:00:17.342050088Z"}
2022-08-10 21:00:17	
{"log":"\u0009at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n","stream":"stderr","time":"2022-08-10T19:00:17.769752198Z"}

Hi @ahid_002,
We also ran into an out-of-memory (heap space) error within our team with a similar setup (K8s). We raised this issue in Retry for RDB Stream Transformer - #2 by capchriscap and the Snowplow team is currently investigating. I’m actually quite happy that we are not the only ones facing this issue :stuck_out_tongue:
We also see the heap space error in our Enricher v3.2.x. Do you face the same issue there?
CC: @enes_aldemir

@capchriscap, @enes_aldemir, haha, I’m glad I’m not the only one either. But I have resolved the issue: I increased the pod resources so the pod has 6Gi of memory. You can do it as follows.

In deployment.yaml:

spec:
  containers:
    - name: {{ .Chart.Name }}
      image: {{ .Values.image.repository }}
      resources:
        requests:
          cpu: 2000m
          memory: 6Gi
        limits:
          cpu: 2000m
          memory: 6Gi

By default the JVM only uses about 25% of the memory allocated to the pod for its heap, so you have to set an environment variable to make the pod actually use the full resources assigned to it. You can do it in two ways:

  1. In the Dockerfile: ENV JAVA_TOOL_OPTIONS="-Xms6G -Xmx6G"
  2. In deployment.yaml:
spec:
  containers:
    - name: {{ .Chart.Name }}
      image: {{ .Values.image.repository }}
      env:
        - name: JAVA_TOOL_OPTIONS
          value: {{ .Values.config.streamTransformer.envValue }}
      imagePullPolicy: Always
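
For completeness, a minimal sketch of what that Helm value could look like in values.yaml (the key path is taken from the template above, the value mirrors option 1; on newer JVMs, percentage-based flags such as -XX:MaxRAMPercentage are an alternative to hard-coding the heap size):

# values.yaml (sketch; key path assumed to match the deployment template above)
config:
  streamTransformer:
    # size the heap to match the pod's 6Gi memory request/limit
    envValue: "-Xms6G -Xmx6G"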

Hope it helps for the enricher too. I did not face this issue there, because I was already using 6G of memory for the Enricher.

Ah, ok. When we tried this with 8GB of heap space, we still ran into the heap space error (only a little bit later). However, we are also processing 1.5k events per second.

@ahid_002 with the old enricher v2.x we only needed ~2GB of RAM, so I am a little concerned that upgrading will take far more resources. Is there a way to throttle it back to the old limits, @enes_aldemir? Otherwise upgrading the enricher would also increase our costs significantly :-/

@capchriscap did you try setting the environment variable so the JVM actually uses the maximum memory?
JAVA_TOOL_OPTIONS="-Xms2G -Xmx2G" in both the enricher and the transformer, while keeping the memory at 2GB, and see if it happens again?
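
In the deployment that would look roughly like this (a sketch combining the snippets above, with the value inlined instead of coming from Helm values):

spec:
  containers:
    - name: {{ .Chart.Name }}
      image: {{ .Values.image.repository }}
      env:
        - name: JAVA_TOOL_OPTIONS
          value: "-Xms2G -Xmx2G"   # heap sized to match the pod's memory limit below
      resources:
        requests:
          cpu: 2000m
          memory: 2Gi
        limits:
          cpu: 2000m
          memory: 2Gi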

Thanks for these hints, this also worked for us. We could even reduce the heap space to JAVA_TOOL_OPTIONS="-Xms1500M -Xmx1500M" with

resources:
  limits:
    cpu: 2
    memory: 3000Mi
  requests:
    cpu: 1
    memory: 3000Mi

and

input:
  bufferSize: "1"
  retrievalMode:
    type: Polling
    maxRecords: 256

This makes the RDB Stream Transformer robust enough that memory does not explode under high traffic, since the transformer is powerful enough to keep up even with the smaller batches.

The only thing I have noticed is that memory usage is still increasing from day to day (over the last 5 days):

The increase is not significant and could, in the worst case, be handled with an automated restart every x days, but I think it is worth mentioning.

With this setup, the RDB Transformer can process 150k events/min at peak with only 2 transformers. Going from 24 vCores and 96GB of RAM previously (Snowflake Loader) to 4 vCores and 8GB of RAM now is a great improvement. Thanks a lot to the Snowplow team! :slight_smile:

Regarding the Enricher, let’s see if we can also optimize the new Enricher to have the old performance :slight_smile:

@capchriscap thank you so much for the great suggestions to get it working at 1500M.
I wanted to ask about running multiple stream transformer pods. Does that need any special configuration, or can we simply set replicas: 2 in deployment.yaml?

These are great findings guys, thanks a lot for sharing!

@capchriscap may I ask if you tried to increase the max memory for Enrich too, similar to what you did with the Stream Transformer? Did you see any difference in performance?

@ahid_002 The only thing you need to do is use version 4.1.0 or greater; no config change is needed. In version 4.1.0 we introduced a change that adds random UUIDs to the names of the output run folders, which allows multiple Stream Transformer instances to run in parallel. You can find the detailed explanation of this change here.
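
So scaling out is just a matter of bumping the replica count in your Deployment, roughly like this (a sketch; field names follow the deployment.yaml snippets earlier in the thread):

spec:
  replicas: 2   # two transformer pods lease shards of the same Kinesis stream via the shared KCL app name
  template:
    spec:
      containers:
        - name: {{ .Chart.Name }}
          image: snowplow/transformer-kinesis:4.2.1   # any version >= 4.1.0 supports parallel instances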

@enes_aldemir that is so great, thank you so much. We are already using v4.2.

Will do.

The latest update is that the garbage collector seems to be causing the high memory load, as it does not clean up properly.

This is the original heap space increase:

whereas after switching to the serial garbage collector, the CPU usage for garbage collection is higher (and the throughput slightly lower), but the memory stabilizes:

This can also be seen in the total overview:

Our final configuration is now:

env:
- name: JAVA_TOOL_OPTIONS
  value: -Xms256m -Xmx2g -XX:+UseSerialGC -XX:ActiveProcessorCount=1
image: snowplow/transformer-kinesis:4.2.1
resources:
  limits:
    cpu: "1"
    memory: 2Gi
  requests:
    cpu: 800m
    memory: 1536Mi

With 2-3 concurrent transformers using this setup we handle ~2k events/sec. When scaling to a higher number, either the number of shards or the number of records polled per request (currently 256) needs to be increased; note that increasing the latter will also increase memory/CPU usage.

Hope this helps others deal with memory issues in the RDB transformer :slight_smile:

I will write again when we have statistics from the tests with the enricher, where I hope the behaviour is similar.

Best,

Christoph


@capchriscap thank you so much for this, really helpful. As we have a similar use case, I wanted to know if it’s possible to run multiple instances of rdb-loader-redshift as well, because it seems that running 2 stream transformers in parallel created a lot of load on the single rdb loader.

@ahid_002 Running multiple instances of rdb-loader should be okay in theory, but we haven’t tried it in practice. It would be great if you could share your findings with us if you try it.

Hi @ahid_002, I’m very interested in your observation that…

running 2 stream transformers in parallel created a lot of load on the single rdb loader.

How are you observing this load? Is it e.g. by CPU usage, or is it just based on the time taken to load events? If it’s the time taken to load events, then this might be a warehouse problem, not a loader problem.

There is a possibility that if you double the number of loaders, each single load might end up taking longer. This is because you will have loads running in parallel, so the warehouse compute gets shared between executing multiple load statements.

Or in other words… instead of increasing the number of loaders, you might first want to investigate increasing the amount of compute in your warehouse.

The loader is designed to be a very lightweight process using minimal resources. All the hard work is done by the warehouse. The loader issues statements sequentially, in the expectation that the warehouse is able to use parallelism to maximise resource usage within the warehouse’s compute cluster.

I will be fascinated to find out what you discover! Which warehouse do you use, by the way? I’m guessing from your config file that it’s Redshift?


@istreeter thank you very much for the great explanation. You are right, it seems more like a warehouse issue: after your comment I checked the metrics, and CPU usage and other resources stayed almost the same throughout the period, but the time taken to load data kept changing between working hours and night. It’s pretty obvious that during working hours there is a lot of load on the system, and that is why we see this delay in data loading. Thank you once again. @enes_aldemir right now I’m not going to try multiple instances, but if I do I will definitely update you.
