Serialization error while using EtlPipeline.processEvents in Spark

Hi,

I’m trying to use the EtlPipeline.processEvents method in Spark Streaming to enrich events in real time. However, I get a “java.io.NotSerializableException” error. The problem seems to be related to the scalaz library: according to the serialization stack trace, the non-serializable field is val e: Validated[Option[Validated[NonEmptyList[ValidatedEnrichedEvent]]]], which is an instance of the scalaz.Failure class.
I tried using a Kryo wrapper in Spark to serialize the whole EtlPipeline.processEvents method, following the approach given here, but unfortunately I got the same error.
Can anyone help me?

Hello @fnozarian,

Could you please post the whole stack trace? I don’t think the problem is in scalaz.Validation, because Success and Failure are case classes, and Scala case classes are serializable by design. Also, why is e an instance of Failure? What is the exact error there?

UPD: The problem may lie not in the case classes but in the sealed hierarchy as a whole. Before scalaz 7.1 it was a plain sealed trait Validation[+E, +A]; now it is sealed abstract class Validation[+E, +A] extends Product with Serializable. On the other hand, we’re using 7.0 Validations in the Schema Guru Spark Job and it works fine.

Thank you very much for your prompt reply!
Here is my stack trace:

16/11/29 08:56:56 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.io.NotSerializableException: scalaz.NonEmptyListFunctions$$anon$4
    Serialization stack:
    	- object not serializable (class: scalaz.NonEmptyListFunctions$$anon$4, value: NonEmptyList(Could not extract geo-location from IP address [172.16.132.211]: [java.lang.ArrayIndexOutOfBoundsException: 9625938]))
    	- field (class: scalaz.Failure, name: e, type: class java.lang.Object)
    	- object (class scalaz.Failure, Failure(NonEmptyList(Could not extract geo-location from IP address [172.16.132.211]: [java.lang.ArrayIndexOutOfBoundsException: 9625938])))
    	- writeObject data (class: scala.collection.immutable.$colon$colon)
    	- object (class scala.collection.immutable.$colon$colon, List(Failure(NonEmptyList(Could not extract geo-location from IP address [172.16.132.211]: [java.lang.ArrayIndexOutOfBoundsException: 9625938]))))
    	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
    	- object (class scala.Tuple2, (442547a8-009b-47fe-af8c-7dea7d790f20,List(Failure(NonEmptyList(Could not extract geo-location from IP address [172.16.132.211]: [java.lang.ArrayIndexOutOfBoundsException: 9625938])))))
    	- element of array (index: 0)
    	- array (class [Lscala.Tuple2;, size 6)
    	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:265)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    	at java.lang.Thread.run(Thread.java:745)

@fnozarian it seems the problem is in scalaz.NonEmptyList, which is indeed not serializable. At least, it was not serializable up to some version.

The only thing I can advise here is to try including the latest compatible scalaz in your build, which is 7.0.9. Maybe they fixed it in that version.

It would also be good to know whether this happens only for failures, so could you please also try disabling the IP Lookup Enrichment? It throws an exception because it cannot determine the location of a local 172.* IP address.

UPD: Wrong artifact. Right one: https://mvnrepository.com/artifact/org.scalaz/scalaz-core_2.10/7.0.9
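
A side note on the trace above: a case class is only as serializable as the values it holds, which is why scalaz.Failure fails here even though it is a case class. Below is a minimal sketch of that effect outside Spark. It uses nothing from scalaz; Box and Opaque are hypothetical stand-ins for Failure and the NonEmptyList payload, and accepts is a made-up helper name.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-ins, not scalaz types.
final case class Box(e: Any)          // plays the role of scalaz.Failure(e)
final class Opaque(val msg: String)   // plays the role of a payload that is not Serializable

object SerializableProbe {
  // True if Java serialization accepts the value, false if it meets a non-serializable object.
  // The streams are in-memory only, so nothing needs to be closed here.
  def accepts(value: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    // The case class itself is Serializable, but its field is not: prints false.
    println(accepts(Box(new Opaque("geo lookup failed"))))
    // Same case class with a plain List[String] payload: prints true.
    println(accepts(Box(List("geo lookup failed"))))
  }
}
```

A probe like this can be pointed at a single result of processEvents to see which part of it Java serialization rejects, without running a whole Spark job.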

I added the latest scalaz core (7.0.9) and disabled the IP Lookup Enrichment, but it still doesn’t work. This time I got the serialization error for scalaz.Success:

16/11/29 11:18:39 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
Serialization stack:
	- object not serializable (class: com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent, value: com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent@7dc0d500)
	- field (class: scalaz.Success, name: a, type: class java.lang.Object)
	- object (class scalaz.Success, Success(com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent@7dc0d500))
	- writeObject data (class: scala.collection.immutable.$colon$colon)
	- object (class scala.collection.immutable.$colon$colon, List(Success(com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent@7dc0d500)))
	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
	- object (class scala.Tuple2, (442547a8-009b-47fe-af8c-7dea7d790f20,List(Success(com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent@7dc0d500))))
	- element of array (index: 0)
	- array (class [Lscala.Tuple2;, size 6)

UPD: @anton it seems that EnrichedEvent should be Serializable!
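
One possible way around this, sketched without Spark or scalaz: convert each result to plain serializable values inside the same transformation that calls processEvents, so that neither NonEmptyList nor EnrichedEvent has to leave the task. Everything below is an assumption for illustration: Event stands in for EnrichedEvent, Either stands in for Validation, and toLine is a hypothetical formatter.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-in for EnrichedEvent: a plain class with no Serializable marker.
final class Event(val eventId: String, val appId: String)

object SerializableResults {
  // Hypothetical formatter; a real job would emit whatever output format it needs.
  def toLine(e: Event): String = List(e.eventId, e.appId).mkString("\t")

  // Either stands in for scalaz Validation: Left = error messages, Right = enriched event.
  // After this step both sides hold only Strings and Lists of Strings.
  def toSerializable(
      results: List[Either[List[String], Event]]): List[Either[List[String], String]] =
    results.map {
      case Right(event) => Right(toLine(event))
      case Left(errors) => Left(errors)
    }

  // Java-serializes a value in memory; throws NotSerializableException if it cannot.
  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val raw: List[Either[List[String], Event]] =
      List(Right(new Event("e1", "web")), Left(List("Could not extract geo-location")))
    val safe = toSerializable(raw)
    println(safe)
    println(s"serialized ${serialize(safe).length} bytes without an exception")
  }
}
```

In a real job the failure side would need the same treatment, for example turning the NonEmptyList of messages into a plain List[String] before the result is returned.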