Snowplow EMR ETL runner failing at step [enrich] spark: Enrich Raw Events with a Spark exception

On recent runs of the Snowplow EMR ETL runner, we are hitting a Spark exception at the step [enrich] spark: Enrich Raw Events:

19/08/07 02:06:04 INFO Client: Application report for application_1565140764718_0002 (state: RUNNING)
19/08/07 02:06:05 INFO Client: Application report for application_1565140764718_0002 (state: FAILED)
19/08/07 02:06:05 INFO Client: 
	 client token: N/A
	 diagnostics: Application application_1565140764718_0002 failed 1 times due to ApplicationMaster for attempt appattempt_1565140764718_0002_000001 timed out. Failing the application.
	 ApplicationMaster host: N/A
	 ApplicationMaster RPC port: -1
	 queue: default
	 start time: 1565141138494
	 final status: FAILED
	 user: hadoop
19/08/07 02:06:05 INFO Client: Deleted staging directory hdfs://ip-172-23-3-75.ap-southeast-1.compute.internal:8020/user/hadoop/.sparkStaging/application_1565140764718_0002
Exception in thread "main" org.apache.spark.SparkException: Application application_1565140764718_0002 finished with failed status
	at org.apache.spark.deploy.yarn.Client.run(Client.scala:1104)
	at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1150)
	at org.apache.spark.deploy.yarn.Client.main(Client.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/08/07 02:06:05 INFO ShutdownHookManager: Shutdown hook called
19/08/07 02:06:05 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-af6e3dc9-ec33-4a2a-b358-49402e4bc844
Command exiting with ret '1'

What is the cause of this exception and how can we resolve it?

Hi @buddhi_weragoda,

Have you checked the application logs? They should contain more information about the failure. Have you tried rerunning the job from the enrich step?
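
If you have SSH access to the EMR master node, one way to pull those logs (a sketch, assuming the YARN CLI is available there and log aggregation is enabled) is to pass the application ID from your output to the yarn command:

yarn logs -applicationId application_1565140764718_0002 | less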

Hi @egor,

Yes, I have tried a few retries.

The application logs show 51,833 / 66,014 (5 failed).
All 5 failures are java.nio.file.FileAlreadyExistsException: ./ip_geo

19/08/07 04:27:14 INFO LzoRecordReader: input split: hdfs://ip-172-23-3-202.ap-southeast-1.compute.internal:8020/local/snowplow/raw-events/2019-08-05-49594175633372395629707031899141386142874544948782301186-49594175633372395629707031899141386142874544948782301186.lzo 0:2863
19/08/07 04:27:14 WARN BlockManager: Putting block rdd_3_20 failed due to an exception
19/08/07 04:27:14 WARN BlockManager: Block rdd_3_20 could not be removed as it was not found on disk or in memory
19/08/07 04:27:14 WARN BlockManager: Putting block rdd_3_39 failed due to an exception
19/08/07 04:27:14 WARN BlockManager: Block rdd_3_39 could not be removed as it was not found on disk or in memory
19/08/07 04:27:14 WARN BlockManager: Putting block rdd_3_48 failed due to an exception
19/08/07 04:27:14 WARN BlockManager: Block rdd_3_48 could not be removed as it was not found on disk or in memory
19/08/07 04:27:14 ERROR Executor: Exception in task 20.0 in stage 0.0 (TID 24)
java.nio.file.FileAlreadyExistsException: ./ip_geo
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixFileSystemProvider.createSymbolicLink(UnixFileSystemProvider.java:457)
	at java.nio.file.Files.createSymbolicLink(Files.java:1043)
	at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$$anonfun$createSymbolicLinks$2.apply(EnrichJob.scala:137)
	at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$$anonfun$createSymbolicLinks$2.apply(EnrichJob.scala:134)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$.createSymbolicLinks(EnrichJob.scala:134)
	at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$$anonfun$4.apply(EnrichJob.scala:189)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
19/08/07 04:27:14 ERROR Executor: Exception in task 48.0 in stage 0.0 (TID 27)
java.nio.file.FileAlreadyExistsException: ./ip_geo
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixFileSystemProvider.createSymbolicLink(UnixFileSystemProvider.java:457)
	at java.nio.file.Files.createSymbolicLink(Files.java:1043)
	at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$$anonfun$createSymbolicLinks$2.apply(EnrichJob.scala:137)
	at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$$anonfun$createSymbolicLinks$2.apply(EnrichJob.scala:134)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$.createSymbolicLinks(EnrichJob.scala:134)
	at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$$anonfun$4.apply(EnrichJob.scala:189)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
19/08/07 04:27:14 ERROR Executor: Exception in task 39.0 in stage 0.0 (TID 26)
java.nio.file.FileAlreadyExistsException: ./ip_geo
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixFileSystemProvider.createSymbolicLink(UnixFileSystemProvider.java:457)
	at java.nio.file.Files.createSymbolicLink(Files.java:1043)
	at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$$anonfun$createSymbolicLinks$2.apply(EnrichJob.scala:137)
	at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$$anonfun$createSymbolicLinks$2.apply(EnrichJob.scala:134)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$.createSymbolicLinks(EnrichJob.scala:134)
	at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$$anonfun$4.apply(EnrichJob.scala:189)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
19/08/07 04:27:14 INFO CoarseGrainedExecutorBackend: Got assigned task 38

I ran the command below to start the run, since it is the enrich step that fails:
snowplow-emr-etl-runner run -c config.yml -r resolver.json --targets targets --enrichments enrichments -f enrich

@egor

[two screenshots attached: pipeline/EMR configuration]

@buddhi_weragoda, what is the volume of the raw LZO files in your processing bucket (the total size of the files)?

Hi @ihor, @egor

Yes, this was due to the total size of the files; my cluster was not big enough. Changing the instance type to m4.2xlarge fixed my issue. Thanks a lot for everyone's replies. This setup is more than six times faster, but I need to check the cost :frowning:


@buddhi_weragoda, while switching to more powerful instances sped up the ETL process, it might still be possible to make further performance improvements and even downscale your cluster. This can be achieved by tweaking your Spark configuration so the job makes full use of the available resources.

Your pipeline configuration screenshot indicates you are using the default (basic) Spark configuration. That is fine for low-volume data, but in your case you would likely be better off with some extra Spark settings. You should be able to find guidance on tuning them in this forum.

You mentioned switching to m4.2xlarge instances, which is a general-purpose type. Spark jobs are memory-hungry, so memory-optimized r4 instances are usually a better option here.
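
To make that concrete, here is a rough sketch of what such settings can look like in the emr section of config.yml. All of the numbers are placeholders (they assume roughly 24 GiB of YARN memory on a single core node) and would need to be sized to your actual instances:

aws:
  emr:
    # ... keep your existing EMR settings (ami_version, region, jobflow, ...) as they are
    configuration:
      yarn-site:
        yarn.nodemanager.vmem-check-enabled: "false"   # stop YARN killing containers on virtual-memory checks
        yarn.nodemanager.resource.memory-mb: "24576"   # placeholder: memory YARN may allocate per core node
        yarn.scheduler.maximum-allocation-mb: "24576"  # placeholder: keep in line with the value above
      spark:
        maximizeResourceAllocation: "false"            # take manual control instead of EMR's automatic sizing
      spark-defaults:
        spark.dynamicAllocation.enabled: "false"
        spark.executor.instances: "2"                  # placeholder: number of executors
        spark.executor.cores: "2"
        spark.executor.memory: "9G"                    # placeholder: 2 x (9G + 1G overhead) + driver ~ 24 GiB
        spark.yarn.executor.memoryOverhead: "1024"
        spark.driver.memory: "3G"
        spark.driver.cores: "1"
        spark.yarn.driver.memoryOverhead: "1024"
        spark.default.parallelism: "8"                 # placeholder: a small multiple of the total executor cores

With dynamic allocation switched off and the executors sized explicitly, the job uses the cluster's memory predictably, which is usually what makes it possible to downscale the instances again.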