I had an issue with my schema and ended up with bad rows in shredded/bad. I am following the instructions here https://snowplowanalytics.com/blog/2019/01/16/snowplow-event-recovery-0.1.0-released/ to recover these rows for reprocessing. I have been able to successfully set up the EMR cluster and run the jar.
An example of the errors on one of the bad rows can be seen here
When I attempt to start the Spark job with the following command on the EMR cluster:
spark-submit --deploy-mode cluster --class com.snowplowanalytics.snowplow.event.recovery.Main --master yarn --conf spark.hadoop.validateOutputSpecs=false s3://recover-jars/snowplow-event-recovery-spark-0.1.1.jar --input s3://bad-shredded-test --output s3://test-recover-2/ --config ew0KICAic2NoZW1hIjogImlnbHU6Y29tLnNub3dwbG93YW5hbHl0aWNzLnNub3dwbG93L3JlY292ZXJpZXMvanNvbnNjaGVtYS8xLTAtMCIsDQogICJkYXRhIjogWw0KICAgIHsNCiAgICAgICJuYW1lIjogIlBhc3NUaHJvdWdoIiwNCiAgICAgICJlcnJvciI6ICJvYmplY3QgaGFzIG1pc3NpbmcgcmVxdWlyZWQgcHJvcGVydGllcyAoW1wiaXNBZG1pblwiXSkiDQogICAgfQ0KICBdDQp9DQo=
The base64-encoded recovery config is as follows:
{
  "schema": "iglu:com.snowplowanalytics.snowplow/recoveries/jsonschema/1-0-0",
  "data": [
    {
      "name": "PassThrough",
      "error": "object has missing required properties ([\"isAdmin\"])"
    }
  ]
}
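For reference, a config like this can be encoded with the JDK's basic encoder (a minimal sketch on my part, not the tool's own tooling; recovery.json is just a hypothetical file name for the config above). The basic encoder does no line wrapping, which matters because the matching basic decoder rejects anything outside the base64 alphabet, including CR/LF:

import java.nio.file.{Files, Paths}
import java.util.Base64

object EncodeConfig {
  def main(args: Array[String]): Unit = {
    // Read the JSON config and emit it as a single-line base64 string.
    // Base64.getEncoder never inserts line breaks, so the output is safe
    // to paste straight into the --config argument.
    val bytes = Files.readAllBytes(Paths.get("recovery.json"))
    println(Base64.getEncoder.encodeToString(bytes))
  }
}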
The application fails, and when I look at the logs I see java.io.IOException: File already exists: s3://test-recover-2/part-r-00000.lzo. I believe the initial cause is this, though:
19/08/20 04:21:19 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-172-31-29-145.us-east-2.compute.internal, executor 1): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Illegal base64 character 2d
at java.util.Base64$Decoder.decode0(Base64.java:714)
at java.util.Base64$Decoder.decode(Base64.java:526)
at java.util.Base64$Decoder.decode(Base64.java:549)
at com.snowplowanalytics.snowplow.event.recovery.utils$$anonfun$1.apply(utils.scala:34)
at com.snowplowanalytics.snowplow.event.recovery.utils$$anonfun$1.apply(utils.scala:33)
at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:52)
at com.snowplowanalytics.snowplow.event.recovery.model$BadRow.mutateCollectorPayload(model.scala:56)
at com.snowplowanalytics.snowplow.event.recovery.RecoveryJob$$anonfun$9.apply(RecoveryJob.scala:92)
at com.snowplowanalytics.snowplow.event.recovery.RecoveryJob$$anonfun$9.apply(RecoveryJob.scala:92)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:125)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1414)
at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
... 8 more
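For what it's worth, 0x2d is the hex code for '-', which is not in the basic base64 alphabet, so the decoder is choking on a string that contains a hyphen. The same exception can be reproduced with the plain JDK decoder (my own minimal repro, not code from the recovery tool):

import java.util.Base64

object Repro {
  def main(args: Array[String]): Unit = {
    // '-' (0x2d) is invalid for the basic decoder, so this throws
    // java.lang.IllegalArgumentException: Illegal base64 character 2d
    Base64.getDecoder.decode("2019-08-20")
    ()
  }
}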
If I change the config to something like
{
  "schema": "iglu:com.snowplowanalytics.snowplow/recoveries/jsonschema/1-0-0",
  "data": []
}
The job succeeds, but all I get is a bunch of .lzo files that are empty when decompressed.
I have dug through the recovery code and cannot figure out where it is attempting to decode an invalid base64 string.
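To try to narrow it down, I wrote a blunt local check that runs a candidate payload through the same JDK decoder the stack trace points at (my own sketch; I am guessing at which string the job actually feeds to the decoder):

import java.util.Base64
import scala.util.{Failure, Success, Try}

object CheckPayload {
  // Report whether the basic JDK decoder accepts a candidate string,
  // and echo the decoder's own error message when it does not.
  def check(s: String): Unit =
    Try(Base64.getDecoder.decode(s)) match {
      case Success(bytes) => println(s"ok, ${bytes.length} bytes")
      case Failure(e)     => println(s"rejected: ${e.getMessage}")
    }

  def main(args: Array[String]): Unit =
    args.foreach(check)
}

Every payload string I pull out of my bad rows gets rejected this way, so I am clearly misunderstanding what the job expects to decode.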