On recent runs of the Snowplow ETL we are encountering an exception at the step [enrich] spark: Enrich Raw Events. The run fails with:
19/08/07 02:06:04 INFO Client: Application report for application_1565140764718_0002 (state: RUNNING)
19/08/07 02:06:05 INFO Client: Application report for application_1565140764718_0002 (state: FAILED)
19/08/07 02:06:05 INFO Client:
client token: N/A
diagnostics: Application application_1565140764718_0002 failed 1 times due to ApplicationMaster for attempt appattempt_1565140764718_0002_000001 timed out. Failing the application.
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1565141138494
final status: FAILED
user: hadoop
19/08/07 02:06:05 INFO Client: Deleted staging directory hdfs://ip-172-23-3-75.ap-southeast-1.compute.internal:8020/user/hadoop/.sparkStaging/application_1565140764718_0002
Exception in thread "main" org.apache.spark.SparkException: Application application_1565140764718_0002 finished with failed status
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1104)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1150)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/08/07 02:06:05 INFO ShutdownHookManager: Shutdown hook called
19/08/07 02:06:05 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-af6e3dc9-ec33-4a2a-b358-49402e4bc844
Command exiting with ret '1'
What is the cause of this exception and how can we resolve it?
From the application logs: 51,833 of 66,014 tasks completed, 5 failed, and all 5 failures are java.nio.file.FileAlreadyExistsException: ./ip_geo:
19/08/07 04:27:14 INFO LzoRecordReader: input split: hdfs://ip-172-23-3-202.ap-southeast-1.compute.internal:8020/local/snowplow/raw-events/2019-08-05-49594175633372395629707031899141386142874544948782301186-49594175633372395629707031899141386142874544948782301186.lzo 0:2863
19/08/07 04:27:14 WARN BlockManager: Putting block rdd_3_20 failed due to an exception
19/08/07 04:27:14 WARN BlockManager: Block rdd_3_20 could not be removed as it was not found on disk or in memory
19/08/07 04:27:14 WARN BlockManager: Putting block rdd_3_39 failed due to an exception
19/08/07 04:27:14 WARN BlockManager: Block rdd_3_39 could not be removed as it was not found on disk or in memory
19/08/07 04:27:14 WARN BlockManager: Putting block rdd_3_48 failed due to an exception
19/08/07 04:27:14 WARN BlockManager: Block rdd_3_48 could not be removed as it was not found on disk or in memory
19/08/07 04:27:14 ERROR Executor: Exception in task 20.0 in stage 0.0 (TID 24)
java.nio.file.FileAlreadyExistsException: ./ip_geo
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.createSymbolicLink(UnixFileSystemProvider.java:457)
at java.nio.file.Files.createSymbolicLink(Files.java:1043)
at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$$anonfun$createSymbolicLinks$2.apply(EnrichJob.scala:137)
at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$$anonfun$createSymbolicLinks$2.apply(EnrichJob.scala:134)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$.createSymbolicLinks(EnrichJob.scala:134)
at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$$anonfun$4.apply(EnrichJob.scala:189)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
19/08/07 04:27:14 ERROR Executor: Exception in task 48.0 in stage 0.0 (TID 27)
java.nio.file.FileAlreadyExistsException: ./ip_geo
(stack trace identical to task 20.0 above)
19/08/07 04:27:14 ERROR Executor: Exception in task 39.0 in stage 0.0 (TID 26)
java.nio.file.FileAlreadyExistsException: ./ip_geo
(stack trace identical to task 20.0 above)
19/08/07 04:27:14 INFO CoarseGrainedExecutorBackend: Got assigned task 38
Because it is the enrich step that fails, I ran the command below to start the run from that step:
snowplow-emr-etl-runner run -c config.yml -r resolver.json --targets targets --enrichments enrichments -f enrich
Yes, this was due to the total size of the files; my cluster was not large enough. Changing the instance type to m4.2xlarge fixed the issue. Thanks a lot for everyone's replies. The new setup is more than six times faster, but I still need to check the cost.
@buddhi_weragoda, while using more powerful instances sped up the ETL process, further performance improvements might still be possible, and you may even be able to downscale your cluster. You can achieve this by tweaking your Spark configuration so the job actually uses the resources the cluster provides.
Your pipeline configuration screenshot indicates you are using the default (basic) Spark configuration, which is fine for low data volumes. In your case, though, it looks like you would be better off with some extra Spark settings; you should be able to find guidance on this forum, and a sketch of the relevant section of config.yml follows.
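For reference, this is the general shape such settings take in the aws.emr section of EmrEtlRunner's config.yml. The property names are standard Spark/YARN/EMR settings, but treat the numbers as illustrative assumptions only; you would need to derive them from your actual node type and data volume:

aws:
  emr:
    configuration:
      yarn-site:
        yarn.nodemanager.vmem-check-enabled: "false"
      spark:
        maximizeResourceAllocation: "false"
      spark-defaults:
        spark.dynamicAllocation.enabled: "false"
        spark.executor.instances: "6"            # assumption: executors per node x core node count
        spark.executor.cores: "1"
        spark.executor.memory: "7G"              # assumption: sized to leave headroom for YARN overhead
        spark.yarn.executor.memoryOverhead: "1024"
        spark.driver.memory: "7G"
        spark.yarn.driver.memoryOverhead: "1024"
        spark.default.parallelism: "24"          # assumption: roughly 4x total executor cores

With dynamic allocation off and explicit executor sizing, Spark uses the whole cluster predictably instead of falling back to the conservative EMR defaults.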
You also mentioned switching to m4.2xlarge instances, which is a general-purpose type. Spark jobs are memory-hungry, so memory-optimized r4 instances are a better fit here.
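If you do try r4 instances, the change is confined to the jobflow section of config.yml. Again only a sketch; the instance count and size below are assumptions to adapt to your volume:

aws:
  emr:
    jobflow:
      master_instance_type: m4.large
      core_instance_count: 2          # assumption: scale with daily raw event volume
      core_instance_type: r4.xlarge   # memory-optimized replacement for m4.2xlarge

If you resize the core nodes, remember to revisit the executor memory settings above so they still fit within the YARN memory available on each node.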