Error s3 loader 2.2.2

Hi all. We are having a problem with S3 Loader 2.2.2, which is processing enriched data. At a load of 12k rps, we encounter these errors:

Exception in thread "RecordProcessor-0055" java.lang.OutOfMemoryError: Java heap space
[2022-09-14 09:21:52.571+0000] [LeaseCoordinator-0001] [ERROR] com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator - Throwable encountered in lease taking thread
java.lang.OutOfMemoryError: Java heap space
[2022-09-13 10:53:21.548+0000] [RecordProcessor-0044] [ERROR] com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask - Caught exception:
java.lang.IllegalStateException: Connection pool shut down
	at org.apache.http.util.Asserts.check(Asserts.java:34)
	at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269)
	at jdk.internal.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:567)
	at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
	at com.amazonaws.http.conn.$Proxy0.requestConnection(Unknown Source)
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176)
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
	at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1343)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1154)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:811)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:779)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:753)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:713)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:695)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:539)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2980)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2947)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2936)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetShardIterator(AmazonKinesisClient.java:1436)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.getShardIterator(AmazonKinesisClient.java:1405)
	at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.getIterator(KinesisProxy.java:574)
	at com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator.getIterator(MetricsCollectingKinesisProxyDecorator.java:125)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.getIterator(KinesisDataFetcher.java:224)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.advanceIteratorTo(KinesisDataFetcher.java:200)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.initialize(KinesisDataFetcher.java:172)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask.call(InitializeTask.java:94)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:835)

Hi @Edward_Kim, this is a known limitation of the S3 Loader. It can only be resolved by scaling the loader horizontally or vertically (or, if possible, by assigning more heap to the application).

See: Eliminate possibility of OutOfMemory errors from scaling Kinesis Shards · Issue #250 · snowplow/snowplow-s3-loader · GitHub

The loader keeps a byte buffer of the same size for each shard, so given N shards and a buffer of X bytes per shard, you need to ensure that the cumulative size (N × X) cannot exceed the heap assigned to the application. If it does, you will get this OutOfMemoryError.
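To make the N × X rule concrete, here is a minimal sketch of the check. The function names, the numbers, and the `usable_fraction` safety margin are all assumptions for illustration; they are not taken from the loader or its configuration.

```python
MIB = 1024 * 1024


def worst_case_buffer_bytes(shards: int, buffer_bytes_per_shard: int) -> int:
    """Upper bound on buffered bytes if every shard's buffer fills at once (N * X)."""
    return shards * buffer_bytes_per_shard


def fits_in_heap(shards: int, buffer_bytes_per_shard: int, heap_bytes: int,
                 usable_fraction: float = 0.7) -> bool:
    """True if the worst-case buffers fit in the share of heap allowed for them.

    usable_fraction is an assumed margin: the rest of the heap is left for the
    Kinesis client, the AWS SDK, and everything else the JVM allocates.
    """
    return worst_case_buffer_bytes(shards, buffer_bytes_per_shard) <= heap_bytes * usable_fraction


# Hypothetical setup: one worker, 64 shards, 64 MiB buffer per shard, 2 GiB heap.
print(fits_in_heap(64, 64 * MIB, 2048 * MIB))  # False: 4096 MiB of buffers

# Same buffer and heap, but this worker only owns 16 shards.
print(fits_in_heap(16, 64 * MIB, 2048 * MIB))  # True: 1024 MiB of buffers
```

If the check fails, the options are the ones above: fewer shards per worker, a smaller per-shard buffer, or more heap.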

Hope this helps!


How do I configure the loader for horizontal scaling?

Hi @Shalini_Balakrishnan, you would increase the number of workers from its current value to a higher number, which spreads the Kinesis shards more evenly across them. For example, if you have a stream of 64 shards, you might assign 4 workers to it so that each worker only handles 16 shards' worth of work.
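As a rough sketch of that arithmetic, assuming the shards are balanced evenly across workers: the helper names, the 64 MiB buffer, and the 2 GiB heap below are hypothetical values, not loader settings.

```python
import math

MIB = 1024 * 1024


def shards_per_worker(total_shards: int, workers: int) -> int:
    """Most shards any one worker handles when shards are spread evenly."""
    return math.ceil(total_shards / workers)


def min_workers(total_shards: int, buffer_bytes_per_shard: int,
                heap_bytes_per_worker: int) -> int:
    """Fewest workers such that each worker's shard buffers fit in its heap.

    This ignores all other heap usage, so treat the result as a lower bound.
    """
    max_shards = heap_bytes_per_worker // buffer_bytes_per_shard
    if max_shards < 1:
        raise ValueError("a single shard buffer does not fit in the heap")
    return math.ceil(total_shards / max_shards)


print(shards_per_worker(64, 4))                # 16
print(min_workers(64, 64 * MIB, 2048 * MIB))   # 2
```

In practice you would run more workers than this lower bound, since the heap also has to hold everything other than the shard buffers.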

You can also set up CPU-based auto-scaling, but generally you will want to raise the base number of workers to something more stable. The S3 Loader doesn't need a lot of resources to work, so it might not scale reliably that way.
