Hi guys,
I am trying to recover bad events from the bad events bucket; the schema of the bad events is "schema violation", and I'm running the recovery with Spark:
Release label: emr-5.17.0, Hadoop distribution: Amazon 2.8.4, Applications: Spark 2.3.1, Zeppelin 0.7.3, Jar: s3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/snowplow-event-recovery-spark-0.3.1.jar
I am getting this error while running the EMR step. Can you help me with this?
> java.lang.NoClassDefFoundError: org/apache/spark/metrics/source/Source
> at com.snowplowanalytics.snowplow.event.recovery.Main$.$anonfun$main$9(Main.scala:96)
> at com.snowplowanalytics.snowplow.event.recovery.Main$.$anonfun$main$9$adapted(Main.scala:96)
> at scala.util.Either.map(Either.scala:353)
> at com.snowplowanalytics.snowplow.event.recovery.Main$.$anonfun$main$8(Main.scala:96)
> at cats.SemigroupalArityFunctions.$anonfun$map7$1(SemigroupalArityFunctions.scala:105)
> at cats.data.Validated.ap(Validated.scala:182)
> at cats.data.ValidatedApplicative.ap(Validated.scala:527)
> at cats.data.ValidatedApplicative.ap(Validated.scala:520)
> at cats.ComposedApply.$anonfun$ap$2(Composed.scala:33)
> at cats.Monad.$anonfun$map$1(Monad.scala:16)
> at cats.instances.Function0Instances$$anon$4.$anonfun$flatMap$1(function.scala:75)
> at cats.instances.Function0Instances$$anon$4.$anonfun$flatMap$1(function.scala:75)
> at com.monovore.decline.Parser.evalResult(Parser.scala:30)
> at com.monovore.decline.Parser.consumeAll(Parser.scala:107)
> at com.monovore.decline.Parser.apply(Parser.scala:19)
> at com.monovore.decline.Command.parse(opts.scala:18)
> at com.monovore.decline.effect.CommandIOApp.$anonfun$run$2(CommandIOApp.scala:42)
> at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:87)
> at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:355)
> at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:376)
> at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:316)
> at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
> at cats.effect.internals.PoolUtils$$anon$2$$anon$3.run(PoolUtils.scala:51)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: org.apache.spark.metrics.source.Source
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> ... 26 more
But if I remove the main class, I get the ClassNotFoundException mentioned above.
Can you share some thoughts on how to resolve this error?
Specs: Release label: emr-5.17.0, Hadoop distribution: Amazon 2.8.4, Applications: Spark 2.3.1, Zeppelin 0.7.3, Jar: s3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/snowplow-event-recovery-spark-0.3.1.jar. @ihor can you help me with this?
@colm Thanks for replying!
Here are the jars and versions I use:
The enricher version is the latest one (2.0.2): https://github.com/snowplow/enrich/releases/download/2.0.2/snowplow-stream-enrich-kinesis-2.0.2.jar
The recovery job is using this jar: s3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/snowplow-event-recovery-spark-0.3.1.jar
The type of recovery needed is schema violation:
iglu:com.snowplowanalytics.snowplow.badrows/schema_violations/jsonschema/2-0-0
The recovery schema is:
iglu:com.snowplowanalytics.snowplow/recoveries/jsonschema/3-0-0
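For reference, the recovery configuration for this combination is a self-describing JSON keyed by the bad row schema. A minimal sketch with a pass-through flow (empty conditions and steps) looks roughly like the following; the exact flow/step fields should be checked against the snowplow-event-recovery docs:

{
  "schema": "iglu:com.snowplowanalytics.snowplow/recoveries/jsonschema/3-0-0",
  "data": {
    "iglu:com.snowplowanalytics.snowplow.badrows/schema_violations/jsonschema/2-0-0": [
      {
        "name": "pass-through flow",
        "conditions": [],
        "steps": []
      }
    ]
  }
}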
I am using EMR steps to run the event recovery jobs. When running the step, the main class is sent along with the arguments (but I'm specifying it in a separate field, not inside the args), and that's when the error is thrown.
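For reference, the step submission via boto3 looks roughly like the sketch below when the jar is launched through spark-submit with the main class passed explicitly. The cluster ID, region and the trailing argument placeholder are illustrative, not the exact recovery CLI:

import boto3

emr = boto3.client('emr', region_name='us-east-1')  # region is an assumption
emr.add_job_flow_steps(
    JobFlowId='j-XXXXXXXXXXXXX',  # placeholder cluster id
    Steps=[{
        'Name': 'snowplow-event-recovery',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',  # lets EMR run spark-submit rather than a plain java main
            'Args': [
                'spark-submit', '--deploy-mode', 'cluster',
                '--class', 'com.snowplowanalytics.snowplow.event.recovery.Main',
                's3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/snowplow-event-recovery-spark-0.3.1.jar',
                # ...followed by the recovery job's own arguments (input/output/config/resolver)
            ],
        },
    }],
)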
I discovered that I may still be using the old format of bad rows (?), although I upgraded my setup. I guess I must have missed something? Not sure what is going on, but I've decided to move on; I'll get back to bad rows a bit later.
My bad rows format looks like this:
{"line":"CwBkAAAACjMuODMu[......]C8xLTAtMAA=","errors":[{"level":"error","message":"Evaluating JavaScript script threw an exception: [org.mozilla.javascript.WrappedException: Wrapped java.lang.IllegalArgumentException: Input byte array has incorrect ending byte at 68 (user-defined-script#1)]"}],"failure_tstamp":"2022-01-05T13:14:21.290Z"}
I managed to run this:
but I’m failing to reprocess the resulting data so far.
I usually have .lzo files coming straight out of the collector/Kinesis stream, but the recovery process I just ran simply dumped the value of line from the bad rows into files. So I now have files with one base64 event per line and I need to reprocess that. How to do that is still unclear to me, but I'm making progress. If you have any clue to help me, that would be amazing.
Regarding the more recent processes, I’ve tried:
Spark
On AWS EMR, I wasn't able to get it going. I was getting:
Then I tried a local Spark environment (although I'm new to this) but wasn't able to get LZO compression working. I kept running into this when running the job:
GPL NativeCodeLoader: Could not load native gpl library
Then I struggled way too long trying to get hadoop-lzo set up, but I feel like it's too old for the most recent Spark and Hadoop versions… or I'm just incompetent in Spark/Hadoop, which I am.
Flink
Couldn’t get anything happen as I was consistently getting this message:
Ambiguous option/flag: --input
I tried versions 0.1.0, 0.2.0 and 0.3.1 of the snowplow-event-recovery jar for both setups, but I was never able to make it work.
Now, due to this, I'm afraid that if I upgrade my Snowplow setup I won't be able to recover bad rows anymore, so it's still important for me to understand how to make all of this work. I'll probably come back to it soon and I'll be able to share more details.
I'm curious if there's a simple working example of bad rows reprocessing somewhere, like a bundle with dummy bad rows and the process to run against them. That would be a huge starting point for me. If it doesn't exist, I'll try to create my own and share it. No ETA on this though.
How complicated is the recovery that you need to do?
From what I recall, there's a bit of a challenge when recovering old-format bad rows from real-time pipelines. snowplow-event-recovery is designed to work with the new self-describing bad row format, and the much older hadoop-event-recovery, I believe (it's a while ago now), was only designed to work with the TSV format rather than the real-time Thrift format.
I haven’t done one of these recoveries for quite a while but when I did so I ended up writing some code to deserialise the row / perform the fix and then reserialise.
Yeah that’s exactly what it looks like I need to do.
The recovery itself is just replaying events; I don't need to update anything in the row. They failed the enrichment process because my custom JS enrichment was too strict. I've updated its code and now I just want to replay the events that failed.
After the hadoop-event-recovery, I’m left with records like this one:
which would bring me to this, if I were to base64 decode it:
(I’m leaving all the data, I don’t think there’s anything sensitive or that couldn’t be found publicly anyways).
Like you said, I understand that EmrEtlRunner expects Thrift records, but I'm not sure what the records I got are, and I was wondering if I could make it work by providing another input format. From what I recall, the Clojure collector format was also different from this (?).
It's not that much data; the records hadoop-event-recovery left me are less than 2 GB (13 files of about 100-150 MB). So really, any solution, even a local one, to transform those into something EmrEtlRunner could read would work wonders.
As I don't really know what formats the EMR job would accept, I'm just not sure how to proceed from there. Also, I'm not really familiar with Thrift, but I'm guessing I could serialize these recovered rows in Thrift if I'm able to transform them into the expected format first. Would you know what the Thrift records represent? Is it a CSV/TSV, or just the request log (like the full URL)?
Thanks a lot for jumping in! I’m taking all the help and ideas you have at this point!
By the way, if this should be in a new topic, I’m OK to open a new one for clarity, let me know.
The base64 (the line part of the bad row) you are getting is the base64-serialized Thrift record - so if you only need to replay, you could pump this directly back into Kinesis (the raw stream) and you shouldn't run into any issues - downstream processes won't differentiate it from any other records, as they are both in the correct serialised Thrift format.
The representation is something in between - it’s pretty much these fields.
Instead of pumping right back into the raw stream, I’ll create an alternative pipeline for recovery. Just so I get more traceability of what’s going on.
Ah, that makes sense! That's funny, I actually remember trying to play with that at some point, probably for the same reason as right now. I probably ended up replaying the raw archive :x
Ok I’m on it, I’ll let you know how it goes, thank you so much!
I'm using boto in Python to push to Kinesis. The library base64-encodes the data we pass to it, so since we actually want the raw Thrift records in the stream, I need to base64-decode each line first. I also have to decode each line as ASCII, since I'm reading binary lines (b'xxxxxx') from the file.
This is the decoding part:
import base64
import smart_open

records = []
for line in smart_open.smart_open('s3://path-to-recovered-file'):
    decoded = base64.b64decode(line.decode('ascii'))
    records.append(decoded)
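And the push itself is just boto3's put_records, roughly like this (the stream name, region and batch size are placeholders):

import uuid
import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')  # region is an assumption

# push the decoded Thrift payloads back to the (isolated) raw stream in small batches
for i in range(0, len(records), 100):
    batch = [{'Data': r, 'PartitionKey': str(uuid.uuid4())} for r in records[i:i + 100]]
    kinesis.put_records(StreamName='recovery-raw-stream', Records=batch)  # placeholder stream name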
I’m pushing the result to our isolated stream and as I go, I’m comparing the records in that isolated recovery stream with the records I’m getting in our production stream.
To me, what I’m getting as a result looks really identical to the records I’m seeing in our working raw stream.
Example of record pushed and available in the recovery stream:
Following a previous post of yours @mike that I found, I was able to deserialize the record, which also indicates that the format is correct.
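In case it helps anyone else, the sanity check was roughly this: deserializing one decoded payload with the Python thrift library and bindings generated from Snowplow's collector-payload-1 Thrift schema. The import path below is a placeholder (whatever your thrift compiler emits), and I'm assuming the default binary protocol:

from thrift.TSerialization import deserialize
from collector_payload.ttypes import CollectorPayload  # placeholder: generated from collector-payload-1.thrift

# if this populates fields such as path/userAgent, the bytes really are a valid Thrift collector payload
payload = deserialize(CollectorPayload(), records[0])  # default protocol factory is TBinaryProtocol
print(payload.path, payload.userAgent)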
So here I am, expecting things to fully work when I push the payload to Kinesis, but… I'm running a copy of my s3loader (v0.7.0 - it's old, I know, but it works great) locally and it doesn't work. I only replaced the input stream and the DynamoDB app name in the config.
It looks like the loader is seeing the records but isn't able to process them?
This is the log I’m getting:
java -jar snowplow-s3-loader-0.7.0.jar --config loader.conf
log4j:WARN No appenders could be found for logger (com.amazonaws.AmazonWebServiceClient).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[main] INFO com.snowplowanalytics.s3.loader.sinks.KinesisSink - Stream unsplash-snowplow-raw-loader-fail exists and is active
[main] INFO com.snowplowanalytics.s3.loader.S3Loader$ - Initializing sink with KinesisConnectorConfiguration: {regionName=us-east-1, s3Endpoint=https://s3.amazonaws.com, kinesisInputStream=xxx, maxRecords=500, connectorDestination=s3, bufferMillisecondsLimit=30000, bufferRecordCountLimit=500, s3Bucket=xxx, kinesisEndpoint=https://kinesis.us-east-1.amazonaws.com, appName=unsp_s3loader_recovery_raw, bufferByteSizeLimit=10000000, retryLimit=1, initialPositionInStream=LATEST}
[main] INFO com.snowplowanalytics.s3.loader.KinesisSourceExecutor - KinesisSourceExecutor worker created
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 67 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 67 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 60 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 106 records.
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 82 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 86 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 138 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 94 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 100 records.
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 124 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 168 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 108 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 214 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 179 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 147 records.
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 160 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 248 records.
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 193 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 183 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 176 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 244 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 231 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 306 records.
Looks like this unresolved issue:
I’ve hidden the stream and bucket names but they both exist.
So here I am, happy about the format thing but I can’t get the whole pipeline to work.
Not sure if you have any clue about this s3loader thing? Any thoughts on the format? It should be fine, right?
I'll probably come back to this tomorrow; I need some sleep.
The format may be fine - I'm not too sure about decoding as ASCII (UTF-8 would be the safer bet). Is the S3 loader emitting any records to the bad sink? If so, that should contain any errors, or anything it failed to partition to that Kinesis stream.
Oh wow, I didn’t even think about fixing this. Since I was seeing the “Flushing” logs, I assumed it was still logging by default to the console or something.
This is what I’m seeing after fixing the log4j thing:
ERROR 2022-01-06 21:58:52,198 0 com.hadoop.compression.lzo.GPLNativeCodeLoader [main] Could not load native gpl library
java.lang.UnsatisfiedLinkError: no gplcompression in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
at java.lang.Runtime.loadLibrary0(Runtime.java:871)
at java.lang.System.loadLibrary(System.java:1122)
at com.hadoop.compression.lzo.GPLNativeCodeLoader.<clinit>(GPLNativeCodeLoader.java:54)
at com.hadoop.compression.lzo.LzoCodec.<clinit>(LzoCodec.java:71)
at com.snowplowanalytics.s3.loader.serializers.LzoSerializer$.<init>(LzoSerializer.scala:37)
at com.snowplowanalytics.s3.loader.serializers.LzoSerializer$.<clinit>(LzoSerializer.scala)
at com.snowplowanalytics.s3.loader.S3Loader$.run(S3Loader.scala:49)
at com.snowplowanalytics.s3.loader.SinkApp$.main(SinkApp.scala:52)
at com.snowplowanalytics.s3.loader.SinkApp.main(SinkApp.scala)
ERROR 2022-01-06 21:58:52,201 3 com.hadoop.compression.lzo.LzoCodec [main] Cannot load native-lzo without native-hadoop
So this is exactly the same issue I was running into when I was trying to run the new snowplow-event-recovery on a local Spark.
> Note that building Hadoop 3.1.1/3.1.2/3.2.0 native code from source is broken
> on macOS. For 3.1.1/3.1.2, you need to manually backport YARN-8622. For 3.2.0,
> you need to backport both YARN-8622 and YARN-9487 in order to build native code.
So I decided to abandon the idea of working locally and just started a new s3loader on the EC2 instance that hosts the production one. Not sure why I didn't think about that before…
And well… it just works over there. I'm guessing the Java install is cleaner and more exhaustive on that box. I'm getting the LZO-encoded files ready for reprocessing.
I haven't tried processing them yet, but I'm sure it'll work (eh?).
I guess all this seems to indicate that I'd probably be better off moving away from EmrEtlRunner and towards Stream Enrich and the RDB Shredder.