Executors lost and disconnecting in EMR

Hey!

For the past couple of days, I’ve been hitting an issue I haven’t been able to resolve. My EMR job fails during the shredding step (which already takes significantly longer than the enrichment step).

The issues I’m seeing in the logs relate to executors being lost because of closed connections or timeouts.

Examples I can find:

18/02/03 14:40:03 WARN TransportChannelHandler: Exception in connection from /172.31.16.113:54972
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	at java.lang.Thread.run(Thread.java:748)
18/02/03 14:40:03 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 5.
18/02/03 14:40:03 INFO DAGScheduler: Executor lost: 5 (epoch 1)
18/02/03 14:40:03 INFO BlockManagerMasterEndpoint: Trying to remove executor 5 from BlockManagerMaster.
18/02/03 14:40:03 INFO BlockManagerMaster: Removed 5 successfully in removeExecutor
18/02/03 14:40:03 ERROR YarnClusterScheduler: Lost executor 5 on ip-172-31-16-113.ec2.internal: Container container_1517648233216_0006_01_041190 exited from explicit termination request.
...

Or another one:

18/02/05 16:01:00 INFO TransportClientFactory: Successfully created connection to ip-172-31-26-187.ec2.internal/172.31.26.187:7337 after 1 ms (0 ms spent in bootstraps)
18/02/05 16:01:00 INFO BlockManager: Initialized BlockManager: BlockManagerId(13, ip-172-31-26-187.ec2.internal, 44825, None)
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2584
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2585
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2586
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2587
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2588
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2589
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2590
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2591
18/02/05 16:01:00 INFO CoarseGrainedExecutorBackend: Got assigned task 2592
18/02/05 16:01:00 INFO Executor: Running task 380.2 in stage 3.0 (TID 2591)
18/02/05 16:01:00 INFO Executor: Running task 2.3 in stage 3.0 (TID 2584)
18/02/05 16:01:00 INFO Executor: Running task 117.1 in stage 3.0 (TID 2586)
18/02/05 16:01:00 INFO Executor: Running task 74.3 in stage 3.0 (TID 2585)
org.apache.spark.SparkException: Exception thrown in awaitResult
	at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
	at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
	at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:538)
	at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:567)
	at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
	at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
	at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:567)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection from /172.31.19.107:44075 closed
	at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128)
	at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:109)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
	at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:257)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
	at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:691)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:455)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	... 1 more
18/02/05 16:09:51 INFO Executor: Executor killed task 116.1 in stage 3.0 (TID 2643)
18/02/05 16:09:51 INFO Executor: Executor killed task 8.3 in stage 3.0 (TID 2622)
18/02/05 16:09:51 INFO Executor: Executor killed task 89.3 in stage 3.0 (TID 2635)
18/02/05 16:09:51 INFO Executor: Executor killed task 105.2 in stage 3.0 (TID 2641)
18/02/05 16:09:51 INFO Executor: Executor killed task 16.3 in stage 3.0 (TID 2590)
....

I’ve tried different instance types and counts, both for the whole pipeline and for the shredding step alone (resume-from=shred), and it still fails. I’m running R92. I’d estimate my enriched volume at about 100 GB (800 MB per file on average, 1.5 GB max).

Has anyone encountered this issue already? Does anyone have a hint about a config setting I could try changing?

PS: It looks like the issue also shows up in successful jobs. But when the retries fail as well, it triggers timeouts and crashes the job (at least that’s what I think is happening).

Thanks for your help!

Hi @Timmycarbone.

Is that the size of the uncompressed files? Do you use any Spark configuration options?

If that is the uncompressed size, you can try the following configuration:

      master_instance_type: "m4.xlarge"
      core_instance_count: 3
      core_instance_type: "r4.8xlarge"
      core_instance_ebs:        
        volume_size: 320        
        volume_type: "gp2"
        ebs_optimized: true     
    ...
    bootstrap_failure_tries: 3
    configuration:
      yarn-site:
        yarn.nodemanager.vmem-check-enabled: "false"
        yarn.nodemanager.resource.memory-mb: "245760"
        yarn.scheduler.maximum-allocation-mb: "245760"
      spark:
        maximizeResourceAllocation: "false"
      spark-defaults:
        spark.dynamicAllocation.enabled: "false"
        spark.executor.instances: "44"
        spark.yarn.executor.memoryOverhead: "3072"
        spark.executor.memory: "13G" 
        spark.executor.cores: "2"
        spark.yarn.driver.memoryOverhead: "3072"
        spark.driver.memory: "13G"
        spark.driver.cores: "2"
        spark.default.parallelism: "352"

Also, if you use the Clojure collector, you can upgrade the pipeline to R97, as it significantly speeds up the Spark Enrich job (link).
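
As a rough sanity check on how the numbers in the configuration above fit together, here is a small Python sketch. It only does arithmetic on the values from the config. It assumes the driver runs in its own YARN container of the same size as an executor (the `YarnClusterScheduler` lines in the logs above suggest cluster deploy mode), and the variable names are made up for illustration.

```python
# Values copied from the configuration above.
CORE_NODES = 3                    # core_instance_count
YARN_MB_PER_NODE = 245760         # yarn.nodemanager.resource.memory-mb
EXECUTOR_MEMORY_MB = 13 * 1024    # spark.executor.memory: "13G"
EXECUTOR_OVERHEAD_MB = 3072       # spark.yarn.executor.memoryOverhead
EXECUTOR_CORES = 2                # spark.executor.cores
DRIVER_MEMORY_MB = 13 * 1024      # spark.driver.memory: "13G"
DRIVER_OVERHEAD_MB = 3072         # spark.yarn.driver.memoryOverhead
EXECUTOR_INSTANCES = 44           # spark.executor.instances
DEFAULT_PARALLELISM = 352         # spark.default.parallelism

# Each YARN container needs the JVM heap plus the off-heap overhead.
executor_container_mb = EXECUTOR_MEMORY_MB + EXECUTOR_OVERHEAD_MB   # 16384
driver_container_mb = DRIVER_MEMORY_MB + DRIVER_OVERHEAD_MB         # 16384

# How many such containers fit on one node and on the whole cluster.
containers_per_node = YARN_MB_PER_NODE // executor_container_mb     # 15
total_containers = containers_per_node * CORE_NODES                 # 45

# Assumption: one container slot is taken by the driver.
executors_after_driver = total_containers - 1                       # 44

# Total memory requested versus what YARN can hand out.
requested_mb = EXECUTOR_INSTANCES * executor_container_mb + driver_container_mb
cluster_mb = CORE_NODES * YARN_MB_PER_NODE

# Partitions per executor core implied by spark.default.parallelism.
tasks_per_core = DEFAULT_PARALLELISM // (EXECUTOR_INSTANCES * EXECUTOR_CORES)

print("container size (MB):", executor_container_mb)
print("containers per node:", containers_per_node)
print("executors left after the driver:", executors_after_driver)
print("requested / available (MB):", requested_mb, "/", cluster_mb)
print("partitions per core:", tasks_per_core)
```

Under these assumptions the 44 executors plus the driver use exactly the memory YARN is allowed to allocate, and the parallelism works out to 4 partitions per executor core. If you change the instance type or count, the same arithmetic can be redone with the new values.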

Hope this helps.


Thanks a lot for the suggested configuration. I’ll try it out tomorrow.
Re-running somehow worked this time …

I’ve upgraded to the latest version as well.

I don’t use any Spark configuration options (except the maximizeResourceAllocation that was set to “true”), so this helps a lot, thanks again!

I used your suggested settings and switched my environment to use a VPC, and it’s pure gold now. Runs also went down from 4-5 hours to 1 hour. Thanks a lot!

I have another issue now but it’s unrelated. Will search for a solution or post another topic.

Thanks thanks thanks!

Hi @Timmycarbone,

Good news!

You may also find this article quite useful for Spark tuning. It describes the process and, more importantly, includes a spreadsheet you can use if you decide to optimize or change the configuration.

Would like to weigh in to upvote this issue.

I encountered the same issue as the thread starter when processing 4.7 GB of logs using 3 m4.large instances (1 master, 3 core). The job fails at the shredding stage.

The YARN executors are being lost because they exceed the memory limit. That seems to lead to the “Connection reset by peer” errors, and after several such failures (about 4 in my case), Spark aborts the job due to too many task failures.

Here is the complete error message, if anybody is interested:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 6.0 failed 4 times, most recent failure: Lost task 2.3 in stage 6.0 (TID 490, ip-172-31-21-201.ap-southeast-1.compute.internal, executor 32): ExecutorLostFailure (executor 32 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.1 GB of 5.1 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
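
For context on the message above: on YARN, each executor container is sized as the executor heap plus `spark.yarn.executor.memoryOverhead`, and in Spark 2.x the overhead defaults to 10% of the executor memory with a floor of 384 MB. The sketch below illustrates that sizing rule; the function names are made up for illustration, and it does not model YARN rounding the request up to its allocation increment.

```python
def default_overhead_mb(executor_memory_mb, factor=0.10, minimum_mb=384):
    """Default off-heap overhead when memoryOverhead is not set explicitly."""
    return max(minimum_mb, int(executor_memory_mb * factor))


def yarn_container_mb(executor_memory_mb, overhead_mb=None):
    """Memory requested from YARN for one executor: heap plus overhead."""
    if overhead_mb is None:
        overhead_mb = default_overhead_mb(executor_memory_mb)
    return executor_memory_mb + overhead_mb


# Example with the 13G executors from the configuration earlier in the thread.
heap_mb = 13 * 1024
print("default overhead (MB):", default_overhead_mb(heap_mb))
print("container with default overhead (MB):", yarn_container_mb(heap_mb))
print("container with 3072 MB overhead (MB):", yarn_container_mb(heap_mb, 3072))
```

When a container is killed for exceeding its physical memory limit, raising the overhead (as the message suggests) or lowering the number of cores per executor are the usual first things to try, but the new container size still has to fit within what YARN can allocate on each node.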

The enrichment step took 39 minutes and the shredding step took more than 1 hour 20 minutes.

What works for me is upgrading the instances to m4.xlarge, which reduces the shredding step to about 40 minutes. I haven’t tried doubling the instance count to see whether that makes a difference. Can anybody say whether doubling the instance count is better than upgrading the instance type? The cost should be the same, I believe, since 6 x m4.large costs the same as 3 x m4.xlarge.

@aditya, it is important to adjust the Spark configuration to the number and type of instances in the EMR cluster. Just changing the instance type might not make efficient use of the cluster.

Noted. TBH I don’t yet know enough about Spark clusters to experiment with different configurations. Changing the instance type is the quickest way to solve my problem at the moment.

What standard Spark configuration do you use for EMR ETL?

Or, alternatively, is the page below a good reference?
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html

@aditya, a good guide we ourselves refer to was mentioned above in this thread. Here it is again: http://c2fo.io/c2fo/spark/aws/emr/2016/07/06/apache-spark-config-cheatsheet.
