But when I run the Spark app, I get the error below:
Exception in thread "main" java.lang.NoClassDefFoundError: scalaz/Validation
    at SparkEnrich$.main(sparkenrich.scala:12)
    at SparkEnrich.main(sparkenrich.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: scalaz.Validation
I don’t see much documentation around implementing the SDK. Can someone please help me fix this, or provide sample code for using the SDK in a Spark Scala app?
My end goal is to transform the tab-delimited data from the enrich job into JSON format.
Sorry to hear you are having problems! It looks like at least one dependency (scalaz) is missing when you run your Spark Scala app. How are you running your app? Are you creating a fat jar which bundles all dependencies?
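In case it helps, here is a minimal sketch of an sbt-assembly setup that bundles your dependencies into a fat jar. The plugin and library versions below are illustrative, not prescriptive - adjust them to match your build:

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

// build.sbt
libraryDependencies ++= Seq(
  // Spark itself is on the cluster, so mark it "provided" to keep it out of the jar
  "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided",
  // The Kinesis connector and the Analytics SDK (which pulls in scalaz)
  // must be bundled, since they are not on the cluster
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.6.1",
  "com.snowplowanalytics" %% "snowplow-scala-analytics-sdk" % "0.1.0"
)

// Resolve duplicate files when merging the dependency jars
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}

Then run sbt assembly and pass the resulting -assembly jar to spark-submit.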
So I managed to pull in the Kinesis stream, and I am even able to store the raw beacons from Spark Streaming to an S3 path.
But now when I apply the transform function of the Analytics SDK, it gives an empty response; ideally it should return a JSON response.
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.snowplowanalytics.snowplow.analytics.scalasdk.json.EventTransformer

// Create one receiver per Kinesis shard
val kinesisStreams = (0 until numShards).map { i =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl, Seconds(10),
    InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_AND_DISK_2)
}

// Union all the streams
val unionStream = ssc.union(kinesisStreams)

// Convert each record from Array[Byte] to String and split on whitespace
val words = unionStream.flatMap(record => new String(record).split("\\s+"))

val prefix = "s3://cdpsnowplow/2016-07-25/"

words.foreachRDD(wordRdd => {
  if (!wordRdd.isEmpty) {
    // Transform each line to JSON, keeping only successful transformations.
    // Note: saveAsTextFile fails if the target path already exists, so each
    // batch needs a unique output path.
    val events = wordRdd.map(line => EventTransformer.transform(line)).flatMap(_.toOption)
    events.saveAsTextFile(prefix)
  }
})

// Start the streaming context and await termination
ssc.start()
ssc.awaitTermination()
Scala’s not my thing, but it could mean the transform failed for some reason. In my experience, transform() returns nothing for lines that failed to parse.
If that’s the case, then wordRdd.map(line => EventTransformer.transform(line)).filter(_.isFailure).flatMap(_.toOption).size() should return greater than 0.
edit - the above should be: wordRdd.map(line => EventTransformer.transform(line)).filter(_.isFailure).size()
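Expanded into a runnable snippet (just a sketch, reusing words and wordRdd from the code above; note that RDDs have count() rather than size()):

words.foreachRDD(wordRdd => {
  // Count the lines the SDK failed to transform; anything > 0 means parse failures
  val failures = wordRdd
    .map(line => EventTransformer.transform(line))
    .filter(_.isFailure)
  println(s"Failed transformations in this batch: ${failures.count()}")

  // Print a few failure messages to see why the lines were rejected
  failures.take(5).foreach(println)
})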
@Shin is right - the most likely explanation is that your lines are ending up on the Failure[] side of the transformation output (rather than as a Success[String]). You need to dig into the failure messages to find out what is going wrong…
Failure(NonEmptyList(Expected 131 fields, received 1 fields. This may be caused by attempting to use this SDK version on an older or newer version of Snowplow enriched events.))
Hey @PuneetBabbar - it doesn’t sound like you are feeding the transformer with Snowplow enriched events. The strings you are providing to the transformer contain no tabs, which doesn’t fit the Snowplow enriched event format at all.
Could you draw out your Kinesis pipeline using ASCII art?
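One guess based on the code you posted earlier (an assumption on my part, not something the failure message proves): splitting each record on \s+ also splits on tabs, which would break each enriched event into single fields before the transformer ever sees it. A sketch of decoding each Kinesis record as one whole line instead:

// Decode each Kinesis record as one complete enriched event (a TSV line),
// rather than splitting on whitespace, which destroys the tab structure
val lines = unionStream.map(record => new String(record, "UTF-8"))

lines.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    val events = rdd.map(line => EventTransformer.transform(line)).flatMap(_.toOption)
    events.saveAsTextFile(prefix)
  }
})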
I haven’t quite figured out what to do with bad rows coming from the enriched stream (we’re not in production yet), but for debugging I’m using one of the many “Kinesis tail” utilities to watch the bad event stream and dump the rows to the console.
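If you’d rather stay in Scala than reach for a separate utility, a bare-bones Kinesis tail is not much code with the AWS Java SDK. This is a rough sketch (the stream name is a placeholder, and it only reads the first shard):

import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.{GetRecordsRequest, GetShardIteratorRequest}
import scala.collection.JavaConverters._

val client = new AmazonKinesisClient()
val streamName = "snowplow-bad-events" // placeholder: your bad event stream

// Start tailing from the tip of the first shard
val shardId = client.describeStream(streamName)
  .getStreamDescription.getShards.asScala.head.getShardId
var iterator = client.getShardIterator(
  new GetShardIteratorRequest()
    .withStreamName(streamName)
    .withShardId(shardId)
    .withShardIteratorType("LATEST")
).getShardIterator

// Poll for new records and dump them to the console
while (true) {
  val result = client.getRecords(new GetRecordsRequest().withShardIterator(iterator))
  result.getRecords.asScala.foreach { r =>
    println(new String(r.getData.array, "UTF-8"))
  }
  iterator = result.getNextShardIterator
  Thread.sleep(1000)
}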