Exception occurring while attempting to recover bad shredded events

I had an issue with my schema and ended up with bad rows showing up in shredded/bad. I am following the instructions here https://snowplowanalytics.com/blog/2019/01/16/snowplow-event-recovery-0.1.0-released/ to recover these rows for reprocessing. I have been able to successfully set up the EMR instance and run the jar.

An example of the errors on one of the bad rows can be seen here

When I attempt to start the Spark job with the following command on the EMR cluster

spark-submit --deploy-mode cluster --class com.snowplowanalytics.snowplow.event.recovery.Main --master yarn --conf spark.hadoop.validateOutputSpecs=false s3://recover-jars/snowplow-event-recovery-spark-0.1.1.jar --input s3://bad-shredded-test --output s3://test-recover-2/ --config ew0KICAic2NoZW1hIjogImlnbHU6Y29tLnNub3dwbG93YW5hbHl0aWNzLnNub3dwbG93L3JlY292ZXJpZXMvanNvbnNjaGVtYS8xLTAtMCIsDQogICJkYXRhIjogWw0KICAgIHsNCiAgICAgICJuYW1lIjogIlBhc3NUaHJvdWdoIiwNCiAgICAgICJlcnJvciI6ICJvYmplY3QgaGFzIG1pc3NpbmcgcmVxdWlyZWQgcHJvcGVydGllcyAoW1wiaXNBZG1pblwiXSkiDQogICAgfQ0KICBdDQp9DQo=

Decoded, the recovery config is as follows

{
  "schema": "iglu:com.snowplowanalytics.snowplow/recoveries/jsonschema/1-0-0",
  "data": [
    {
      "name": "PassThrough",
      "error": "object has missing required properties ([\"isAdmin\"])"
    }
  ]
}

The application fails, and when looking at the logs I see java.io.IOException: File already exists:s3://test-recover-2/part-r-00000.lzo

However, I believe the initial cause is this:

19/08/20 04:21:19 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-172-31-29-145.us-east-2.compute.internal, executor 1): org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Illegal base64 character 2d
        at java.util.Base64$Decoder.decode0(Base64.java:714)
        at java.util.Base64$Decoder.decode(Base64.java:526)
        at java.util.Base64$Decoder.decode(Base64.java:549)
        at com.snowplowanalytics.snowplow.event.recovery.utils$$anonfun$1.apply(utils.scala:34)
        at com.snowplowanalytics.snowplow.event.recovery.utils$$anonfun$1.apply(utils.scala:33)
        at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:52)
        at com.snowplowanalytics.snowplow.event.recovery.model$BadRow.mutateCollectorPayload(model.scala:56)
        at com.snowplowanalytics.snowplow.event.recovery.RecoveryJob$$anonfun$9.apply(RecoveryJob.scala:92)
        at com.snowplowanalytics.snowplow.event.recovery.RecoveryJob$$anonfun$9.apply(RecoveryJob.scala:92)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:125)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1414)
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
        ... 8 more

If I change the config to something like

{
  "schema": "iglu:com.snowplowanalytics.snowplow/recoveries/jsonschema/1-0-0",
  "data": []
}

The job succeeds, but all I get is a bunch of .lzo files that are empty when decompressed.

I have dug through the recovery code and cannot seem to figure out where it is attempting to decode an invalid base64 string.
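
To rule out the --config argument itself, here is a quick round-trip check (a minimal sketch using java.util.Base64, with the full value from the spark-submit command above); it decodes back to the JSON shown earlier without error, so the failing decode does not appear to be the config:

import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64

// The full --config value passed to spark-submit above.
val encodedConfig =
  "ew0KICAic2NoZW1hIjogImlnbHU6Y29tLnNub3dwbG93YW5hbHl0aWNzLnNub3dwbG93L3JlY292ZXJpZXMvanNvbnNjaGVtYS8xLTAtMCIsDQogICJkYXRhIjogWw0KICAgIHsNCiAgICAgICJuYW1lIjogIlBhc3NUaHJvdWdoIiwNCiAgICAgICJlcnJvciI6ICJvYmplY3QgaGFzIG1pc3NpbmcgcmVxdWlyZWQgcHJvcGVydGllcyAoW1wiaXNBZG1pblwiXSkiDQogICAgfQ0KICBdDQp9DQo="

// Prints the recovery config JSON, confirming the argument is valid base64.
println(new String(Base64.getDecoder.decode(encodedConfig), UTF_8))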

@Dmitriy_Tarasevich, Snowplow Event Recovery won’t work on shredded data. It’s meant to recover bad events generated by the real-time pipeline (events rejected at the enrichment stage).
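
That is also where the "Illegal base64 character" comes from: the job base64-decodes each bad row's line field when rebuilding the collector payload (the utils.scala frame in your stack trace), but in a shredded bad row that line is a tab-separated enriched event rather than a base64-encoded payload. A minimal sketch (the event fragment below is made up) reproduces the same exception:

import java.util.Base64

// A made-up fragment of a tab-separated enriched event, as found in shredded bad rows.
val shreddedLine = "my-app\tweb\t2019-08-20 04:21:19.000\t..."

// '-' (0x2d) is not in the base64 alphabet, so this throws
// java.lang.IllegalArgumentException: Illegal base64 character 2d
Base64.getDecoder.decode(shreddedLine)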

I assume you are running the real-time pipeline. For the batch pipeline you would need a different recovery app, as covered in Running Hadoop Event Recovery with Dataflow Runner [tutorial].

It appears you are trying to recover a custom event that is missing a required field. The PassThrough strategy you have chosen is not correct unless you have also modified your JSON schema so that it no longer requires that field. My understanding is that you would want to insert the missing property into the event, presumably with a default value, so you likely need to choose a different strategy for this task.
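
For illustration only, if the event data in your real-time bad rows sits in the base64-encoded ue_px field of the payload body, a replacement-style config along the lines of the strategies described in the 0.1.0 release post might look roughly like this; the field being matched ("userId") and the default value are placeholders you would have to adapt to your own payload:

{
  "schema": "iglu:com.snowplowanalytics.snowplow/recoveries/jsonschema/1-0-0",
  "data": [
    {
      "name": "ReplaceInBase64FieldInBody",
      "error": "object has missing required properties ([\"isAdmin\"])",
      "base64Field": "ue_px",
      "toReplace": "\"userId\":",
      "replacement": "\"isAdmin\":false,\"userId\":"
    }
  ]
}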