Hey!
For the past couple of days I’ve been hitting an issue that I haven’t been able to resolve so far. My EMR job fails during the shredding step (which already takes significantly longer than the enrichment step).
The issues I’m seeing in the logs relate to executors being lost because of closed connections or timeouts.
Here are a couple of examples I found in the logs:
18/02/03 14:40:03 WARN TransportChannelHandler: Exception in connection from /172.31.16.113:54972
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
18/02/03 14:40:03 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 5.
18/02/03 14:40:03 INFO DAGScheduler: Executor lost: 5 (epoch 1)
18/02/03 14:40:03 INFO BlockManagerMasterEndpoint: Trying to remove executor 5 from BlockManagerMaster.
18/02/03 14:40:03 INFO BlockManagerMaster: Removed 5 successfully in removeExecutor
18/02/03 14:40:03 ERROR YarnClusterScheduler: Lost executor 5 on ip-172-31-16-113.ec2.internal: Container container_1517648233216_0006_01_041190 exited from explicit termination request.
...
Or another one:
18/02/05 16:01:00 INFO TransportClientFactory: Successfully created connection to ip-172-31-26-187.ec2.internal/172.31.26.187:7337 after 1 ms (0 ms spent in bootstraps)
18/02/05 16:01:00 INFO BlockManager: Initialized BlockManager: BlockManagerId(13, ip-172-31-26-187.ec2.internal, 44825, None)
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2584
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2585
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2586
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2587
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2588
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2589
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2590
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2591
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2592
18/02/05 16:01:00 INFO Executor: Running task 380.2 in stage 3.0 (TID 2591)
18/02/05 16:01:00 INFO Executor: Running task 2.3 in stage 3.0 (TID 2584)
18/02/05 16:01:00 INFO Executor: Running task 117.1 in stage 3.0 (TID 2586)
18/02/05 16:01:00 INFO Executor: Running task 74.3 in stage 3.0 (TID 2585)
org.apache.spark.SparkException: Exception thrown in awaitResult
at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:538)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:567)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:567)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection from /172.31.19.107:44075 closed
at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128)
at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:109)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:257)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:691)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:455)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
18/02/05 16:09:51 INFO Executor: Executor killed task 116.1 in stage 3.0 (TID 2643)
18/02/05 16:09:51 INFO Executor: Executor killed task 8.3 in stage 3.0 (TID 2622)
18/02/05 16:09:51 INFO Executor: Executor killed task 89.3 in stage 3.0 (TID 2635)
18/02/05 16:09:51 INFO Executor: Executor killed task 105.2 in stage 3.0 (TID 2641)
18/02/05 16:09:51 INFO Executor: Executor killed task 16.3 in stage 3.0 (TID 2590)
....
I’ve tried different instance types and counts, both for the whole pipeline and for the shredding step alone (resume-from=shred), and it still fails. I’m running R92. My enriched volume is about 100 GB (800 MB per file on average, 1.5 GB max).
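For reference, the jobflow section of my config.yml currently looks roughly like this (the instance types and counts shown are just the last combination I tried, so treat them as placeholders):

emr:
  jobflow:
    # Placeholder values: just the most recent combination I tried
    master_instance_type: m4.large
    core_instance_count: 3
    core_instance_type: r4.xlarge
    task_instance_count: 0
    task_instance_type: m4.large
    task_instance_bid: 0.015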
Has anyone encountered this issue before? Does anyone have a hint on a config setting I could try changing?
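In case it helps the discussion, the settings I was thinking of experimenting with are Spark’s network timeout, the executor heartbeat interval and the YARN memory overhead, e.g. via EMR’s spark-defaults classification. The values below are guesses on my part, and I’m not sure yet how best to pass them through to the cluster that EmrEtlRunner spins up in R92:

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.network.timeout": "600s",
      "spark.executor.heartbeatInterval": "60s",
      "spark.yarn.executor.memoryOverhead": "1024"
    }
  }
]

As far as I understand, spark.executor.heartbeatInterval has to stay well below spark.network.timeout.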
PS: These errors seem to show up in successful jobs as well. But when the retries also fail, the resulting timeouts crash the job (at least that’s what I think is happening).
Thanks for your help!