Bad Event Recovery Failing!

Hi guys,
I am trying to recover bad events from the bad events bucket. The bad events are of the "schema violation" type, and I'm running the Spark recovery job with:
Release label: emr-5.17.0
Hadoop distribution: Amazon 2.8.4
Applications: Spark 2.3.1, Zeppelin 0.7.3
Jar: s3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/snowplow-event-recovery-spark-0.3.1.jar
I am getting this error while running the EMR step. Can you guys help me with this?

> java.lang.NoClassDefFoundError: org/apache/spark/metrics/source/Source
> 	at com.snowplowanalytics.snowplow.event.recovery.Main$.$anonfun$main$9(Main.scala:96)
> 	at com.snowplowanalytics.snowplow.event.recovery.Main$.$anonfun$main$9$adapted(Main.scala:96)
> 	at scala.util.Either.map(Either.scala:353)
> 	at com.snowplowanalytics.snowplow.event.recovery.Main$.$anonfun$main$8(Main.scala:96)
> 	at cats.SemigroupalArityFunctions.$anonfun$map7$1(SemigroupalArityFunctions.scala:105)
> 	at cats.data.Validated.ap(Validated.scala:182)
> 	at cats.data.ValidatedApplicative.ap(Validated.scala:527)
> 	at cats.data.ValidatedApplicative.ap(Validated.scala:520)
> 	at cats.ComposedApply.$anonfun$ap$2(Composed.scala:33)
> 	at cats.Monad.$anonfun$map$1(Monad.scala:16)
> 	at cats.instances.Function0Instances$$anon$4.$anonfun$flatMap$1(function.scala:75)
> 	at cats.instances.Function0Instances$$anon$4.$anonfun$flatMap$1(function.scala:75)
> 	at com.monovore.decline.Parser.evalResult(Parser.scala:30)
> 	at com.monovore.decline.Parser.consumeAll(Parser.scala:107)
> 	at com.monovore.decline.Parser.apply(Parser.scala:19)
> 	at com.monovore.decline.Command.parse(opts.scala:18)
> 	at com.monovore.decline.effect.CommandIOApp.$anonfun$run$2(CommandIOApp.scala:42)
> 	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:87)
> 	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:355)
> 	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:376)
> 	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:316)
> 	at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
> 	at cats.effect.internals.PoolUtils$$anon$2$$anon$3.run(PoolUtils.scala:51)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: org.apache.spark.metrics.source.Source
> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> 	... 26 more

This is my job config

{
  "schema": "iglu:com.snowplowanalytics.snowplow/recoveries/jsonschema/3-0-0",
  "data": {
    "iglu:com.snowplowanalytics.snowplow.badrows/schema_violations/jsonschema/2-0-0": [
      {
        "name": "mainFlow",
        "conditions": [],
        "steps": [
          {
            "op": "Cast",
            "path": "$.payload.enriched.unstruct_event.data.data.submitted_input",
            "from": "Numeric",
            "to": "String"
          }
        ]
      }
    ]
  }
}
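The --config value passed to the recovery job is just this JSON base64-encoded (and --resolver is the Iglu resolver JSON encoded the same way); a minimal snippet to produce it, assuming the config is saved as recovery_config.json:

import base64

with open('recovery_config.json', 'rb') as f:
  print(base64.b64encode(f.read()).decode('ascii'))  # paste this as the --config value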

Thanks in advance!

Hey guys,
My command is:

aws emr add-steps --cluster-id j-MMMMNNNN --steps "Name=snowplow-event-recovery, Type=CUSTOM_JAR,Jar=s3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/snowplow-event-recovery-spark-0.2.0.jar,MainClass=com.snowplowanalytics.snowplow.event.recovery.Main,
 Args=[--input,s3://abcd,--region,us-east-1,--output,s3://efgh,
 --failedOutput,s3://ijkl,--unrecoverableOutput,s3://mnop,--config, base64-string,
 --resolver,base64-string], ActionOnFailure=CONTINUE"

When I run this command, I'm getting this error:

Unexpected argument: com.snowplowanalytics.snowplow.event.recovery.Main

Usage: snowplow-event-recovery-job --input <string> --output <string> [--failedOutput <string>] [--unrecoverableOutput <string>] --region <string> [--batchSize <integer>] --resolver <string> --config <string>

Snowplow event recovery job

Options and flags:
    --help
        Display this help text.
    --input <string>
        Input S3 path
    --output <string>
        Output Kinesis topic
    --failedOutput <string>
        Unrecovered (bad row) output S3 path. Defaults to `input`
    --unrecoverableOutput <string>
        Unrecoverable (bad row) output S3 path. Defaults failedOutput/unrecoverable` or `input/unrecoverable`
    --region <string>
        Kinesis region
    --batchSize <integer>
        Kinesis batch size
    --resolver <string>
        Iglu resolver configuration
    --config <string>
        Base64 config with schema com.snowplowanalytics.snowplow/recovery_config/jsonschema/1-0-0

But if I remove the main class, I get the class-not-found exception that I mentioned above.
Can you share some thoughts on how to resolve this error?
Specs:
Release label: emr-5.17.0
Hadoop distribution: Amazon 2.8.4
Applications: Spark 2.3.1, Zeppelin 0.7.3
Jar: s3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/snowplow-event-recovery-spark-0.3.1.jar
@ihor can you help me with this?

@colm can you help me with this issue?

@Prasanth_Rosario, 2 questions:

1. What version of enrich are you using?
2. Which event recovery job are you using? (A link to the repo or the docs will do - whatever you are working from.)

@colm Thanks for replying!
Here are the jars and versions which I use.
The enricher version is the latest one, 2.0.2:
https://github.com/snowplow/enrich/releases/download/2.0.2/snowplow-stream-enrich-kinesis-2.0.2.jar

The recovery job is using this jar
s3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/snowplow-event-recovery-spark-0.3.1.jar

The type of recovery needed is a schema violation:
"iglu:com.snowplowanalytics.snowplow.badrows/schema_violations/jsonschema/2-0-0"

The recovery schema is:
"iglu:com.snowplowanalytics.snowplow/recoveries/jsonschema/3-0-0"

I can’t see what’s going wrong here, but this isn’t my area of expertise. I’ll see if anyone who knows a bit more about it is free to take a look.

Sure, thanks @Colm!

I am using EMR steps to run the event recovery jobs. While running the step, the main class is being passed along with the arguments (even though I specify it in the separate MainClass field, not inside Args), and that is why the error is being thrown!

Hi Prasanth,

Looks like the schema should be 4-0-0, rather than 3-0-0. Also we use AMI version 6.0.0 but I’m not sure how much of an impact that has.

Hi Colm,

I have made those changes as well, but I’m still getting the same errors!

Hey @Prasanth_Rosario, did you find a solution for this?

I’m running into the same issues. I can’t make the Event Recovery work.

Hi @Timmycarbone ,

Which of the ways from this page do you use to run the recovery, and what is your error?

Hey! Thanks for the reply.

I discovered that I may still be using the old format of bad rows (?), although I upgraded my setup. I guess I must have missed something? Not sure what is going on but I’ve decided to move on, I’ll get back to bad rows a bit later.

My bad rows format looks like this:

{"line":"CwBkAAAACjMuODMu[......]C8xLTAtMAA=","errors":[{"level":"error","message":"Evaluating JavaScript script threw an exception: [org.mozilla.javascript.WrappedException: Wrapped java.lang.IllegalArgumentException: Input byte array has incorrect ending byte at 68 (user-defined-script#1)]"}],"failure_tstamp":"2022-01-05T13:14:21.290Z"}

I managed to run this:

but I’m failing to reprocess the resulting data so far.
I usually have .lzo files coming straight out of the collector/Kinesis stream, but the recovery process I just ran simply dumped the value of the line field from the bad rows into files. So I now have files with one base64 event per line, and I need to reprocess them. How to do that is still unclear to me, but I'm making progress. If you have any clue to help me, that would be amazing.

Regarding the more recent processes, I’ve tried:

  • Spark

On AWS EMR, I wasn’t able to get the thing going. I was getting

Unexpected argument: com.snowplowanalytics.snowplow.event.recovery.Main

or, when I didn’t supply the above in MainClass, I was getting:

java.lang.NoClassDefFoundError: org/apache/spark/metrics/source/Source
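My working theory (unverified) is that a CUSTOM_JAR step runs the jar with plain java, so the Spark classes aren't on the classpath, which would explain the NoClassDefFoundError; submitting the jar as a Spark step through spark-submit might avoid both errors. A rough boto3 sketch of what I mean - the cluster id, paths and Base64 strings are placeholders, and I haven't verified that this works:

import boto3

emr = boto3.client('emr', region_name='us-east-1')

emr.add_job_flow_steps(
  JobFlowId='j-XXXXXXXX',  # placeholder cluster id
  Steps=[{
    'Name': 'snowplow-event-recovery',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
      # command-runner.jar lets the step invoke spark-submit on the cluster
      'Jar': 'command-runner.jar',
      'Args': [
        'spark-submit',
        '--deploy-mode', 'cluster',
        '--class', 'com.snowplowanalytics.snowplow.event.recovery.Main',
        's3://snowplow-hosted-assets/3-enrich/snowplow-event-recovery/snowplow-event-recovery-spark-0.3.1.jar',
        '--input', 's3://path-to-bad-rows',      # placeholder
        '--output', 'raw-kinesis-stream-name',   # placeholder (the usage text says this is a Kinesis topic)
        '--region', 'us-east-1',
        '--config', 'BASE64_CONFIG',             # placeholder
        '--resolver', 'BASE64_RESOLVER',         # placeholder
      ],
    },
  }],
)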

Then I tried in a local Spark environment (although I'm new to this) but wasn't able to get the LZO compression working. I kept running into this when running the job:

GPL NativeCodeLoader: Could not load native gpl library

Then I struggled way too long trying to get hadoop-lzo set up, but I feel like it's too old for the most recent Spark and Hadoop versions… or I'm just incompetent with Spark/Hadoop, which I am.

  • Flink

Couldn't get anything to happen, as I was consistently getting this message:

Ambiguous option/flag: --input

I tried versions 0.1.0, 0.2.0 and 0.3.1 of the snowplow-event-recovery jar for both setups; I was never able to make it work.

Now, due to this, I'm afraid that if I upgrade my Snowplow setup I won't be able to recover bad rows anymore, so it's still important for me to understand how to make all of this work. I'll probably come back to it soon and will be able to share more details.

I’m curious if there’s a simple working example of bad rows reprocessing somewhere, like a bundle with dummy bad rows and the process to run against that. That would be a huge starting point for me. If it doesn’t exist, I’ll try to create my own and I’ll share it. No ETA on this though :stuck_out_tongue:

How complicated is the recovery that you need to do?

From what I recall there's a bit of a challenge when recovering old-format bad rows from real-time pipelines. snowplow-event-recovery is designed to work with the new self-describing bad row format, and the much older hadoop-event-recovery, I believe (it's a while ago now), was only designed to work with the TSV format rather than the realtime Thrift format.

I haven’t done one of these recoveries for quite a while but when I did so I ended up writing some code to deserialise the row / perform the fix and then reserialise.

Yeah that’s exactly what it looks like I need to do.

The recovery in itself is just replaying events; I don't need to update anything in the rows. They failed the enrichment process because my custom JS enrichment was too strict. I've updated its code and now I just want to replay the events that failed.

After running hadoop-event-recovery, I'm left with records like this one:

CwBkAAAADjU0LjE3NS4yMjMuMTc0CgDIAAABfhCYds4LANIAAAAFVVRGLTgLANwAAAAvc25vd3Bsb3ctc3RyZWFtLWNvbGxlY3Rvci1raW5lc2lzLTIuNC40LWtpbmVzaXMLASwAAAAEUnVieQsBQAAAAAIvaQsBSgAAAg9lPXNlJnNlX2NhPXBob3RvJnNlX2FjPWRvd25sb2FkZWQtcGhvdG8mc2VfbGE9MjA4OTMwNyZzZV9wcj1Nbnd4TWpBM2ZEQjhNWHh6WldGeVkyaDhPWHg4YjJabWFXTmxKVEl3Wm5WdWZId3dmSHg4ZkRFMk16azFPRFV6T1RJZm9yY2UlM0R0cnVlJmN4PWV5SnpZMmhsYldFaU9pSnBaMngxT21OdmJTNXpibTkzY0d4dmQyRnVZV3g1ZEdsamN5NXpibTkzY0d4dmR5OWpiMjUwWlhoMGN5OXFjMjl1YzJOb1pXMWhMekV0TUMweElpd2laR0YwWVNJNlczc2ljMk5vWlcxaElqb2lhV2RzZFRwamIyMHVkVzV6Y0d4aGMyZ3ZZWEJwWDJGd2NHeHBZMkYwYVc5dUwycHpiMjV6WTJobGJXRXZNUzB3TFRBaUxDSmtZWFJoSWpwN0ltRndjR3hwWTJGMGFXOXVYMmxrSWpveE1qQTNmWDFkZlElM0QlM0QmZHRtPTE2NDA5NTU5MzQ0MDQmcD1zcnYmaXA9NTQuMjE4LjQ2LjEwMyZ0dj1yYi0wLjYuMSZhaWQ9dW5zcGxhc2gtYXBpLXByb2R1Y3Rpb24mZWlkPTIyZDAxYzNiLTlmYjctNDlkMS1iNmMyLTE2ODAzM2Q4Y2FlOSZzdG09MTY0MDk1NTkzNDQwNQ8BXgsAAAALAAAAG1RpbWVvdXQtQWNjZXNzOiA8ZnVuY3Rpb24xPgAAAA9YLUZvcndhcmRlZC1Gb3IAAAAXWC1Gb3J3YXJkZWQtUHJvdG86IGh0dHAAAAAUWC1Gb3J3YXJkZWQtUG9ydDogODAAAAAESG9zdAAAADlYLUFtem4tVHJhY2UtSWQ6IFJvb3Q9MS02MWNmMDAxZS0zZjYwY2IzZTMzNjExMWZkMWVlNjJjYWYAAAA0QWNjZXB0LUVuY29kaW5nOiBnemlwLCBkZWZsYXRlO3E9MC42LCBpZGVudGl0eTtxPTAuMwAAAAtBY2NlcHQ6ICovKgAAABBVc2VyLUFnZW50OiBSdWJ5AAAAI1gtTmV3cmVsaWMtSWQ6IFV3TUNVRlpSR3dZQVVGTmJCd1k9AAAAZFgtTmV3cmVsaWMtVHJhbnNhY3Rpb246IFB4UVBBd0phRGdWUlZsTlhWRlVIQWx3Q0ZCOEVCdzhSVlU0YVdscGNEUVFGQkF0VEJBSlhVZ0FPQkVOS1FRRUdBRkFEVlZOUUZUcz0LAZAAAAAWY29sbGVjdG9yLnVuc3BsYXNoLmNvbQsBmgAAACRjYjU4NGIxZC1kN2ZjLTRmOGItOWZiMC1iYzc4YWFlMWMxNDkLemkAAABBaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3Muc25vd3Bsb3cvQ29sbGVjdG9yUGF5bG9hZC90aHJpZnQvMS0wLTAA

which would bring me to this, if I were to base64 decode it:
(I’m leaving all the data, I don’t think there’s anything sensitive or that couldn’t be found publicly anyways).

�d���54.175.223.174
���~v����UTF-8����/snowplow-stream-collector-kinesis-2.4.4-kinesis,���Ruby@���/iJ��e=se&se_ca=photo&se_ac=downloaded-photo&se_la=2089307&se_pr=MnwxMjA3fDB8MXxzZWFyY2h8OXx8b2ZmaWNlJTIwZnVufHwwfHx8fDE2Mzk1ODUzOTIforce%3Dtrue&cx=eyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9jb250ZXh0cy9qc29uc2NoZW1hLzEtMC0xIiwiZGF0YSI6W3sic2NoZW1hIjoiaWdsdTpjb20udW5zcGxhc2gvYXBpX2FwcGxpY2F0aW9uL2pzb25zY2hlbWEvMS0wLTAiLCJkYXRhIjp7ImFwcGxpY2F0aW9uX2lkIjoxMjA3fX1dfQ%3D%3D&dtm=1640955934404&p=srv&ip=54.218.46.103&tv=rb-0.6.1&aid=unsplash-api-production&eid=22d01c3b-9fb7-49d1-b6c2-168033d8cae9&stm=1640955934405^������eTimeout-Access: <function1>���X-Forwarded-For���X-Forwarded-Proto: http���X-Forwarded-Port: 80���Host���9X-Amzn-Trace-Id: Root=1-61cf001e-3f60cb3e336111fd1ee62caf���4Accept-Encoding: gzip, deflate;q=0.6, identity;q=0.3���Accept: */*���User-Agent: Ruby���#X-Newrelic-Id: UwMCUFZRGwYAUFNbBwY=���dX-Newrelic-Transaction: PxQPAwJaDgVRVlNXVFUHAlwCFB8EBw8RVU4aWlpcDQQFBAtTBAJXUgAOBENKQQEGAFADVVNQFTs=���collector.unsplash.com���$cb584b1d-d7fc-4f8b-9fb0-bc78aae1c149zi���Aiglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0�

Like you said, I understand that the new EmrEtlRunner expects Thrift records, but I'm not sure what the records I got are, and I was wondering if I could make it work by providing another input format. From what I recall, the Clojure collector format was also different from this (?).

It's not that much data; the records hadoop-event-recovery left me are less than 2 GB (13 files of about 100-150 MB). So really, any solution, even a local one, to transform those into something that the EmrEtlRunner could read would work wonders.

As I don't really know what formats the EMR job would accept, I'm just not sure how to proceed from there. Also, I'm not really familiar with Thrift, but I'm guessing that I could serialize these recovered rows in Thrift if I'm able to transform them into the expected format first. Would you know what the Thrift records represent? Is it a CSV/TSV, or is it just the request log (like the full URL)?

Thanks a lot for jumping in! I’m taking all the help and ideas you have at this point!

By the way, if this should be in a new topic, I’m OK to open a new one for clarity, let me know.

The base64 (the line part of the bad row) you are getting is the base64-serialized Thrift record - so if you only need to replay, you could pump this directly back into Kinesis (the raw stream) and you shouldn't run into any issues - downstream processes won't differentiate it from any other records, as both are in the correct serialised Thrift format.

The representation is something in between - it’s pretty much these fields.

Yep, the Clojure format was tab delimited.

Oh wow ok! That’d be awesome.

Instead of pumping right back into the raw stream, I’ll create an alternative pipeline for recovery. Just so I get more traceability of what’s going on.

Ah that makes sense! That’s funny I actually remember trying to play with that at some point. Probably for the same reason as right now. I probably ended up replaying the raw archive :x

Ok I’m on it, I’ll let you know how it goes, thank you so much!

Yep - this is absolutely worth doing just for isolation. Hope it goes well but let me know if you get stuck!

I’m soooo close.

I'm using boto in Python to push to Kinesis. The library base64-encodes the data we pass to it, so since we actually want the raw Thrift records in the stream, I need to decode each base64 line first. I also have to decode each line as ASCII, since I'm reading them as bytes (b'xxxxxx').

This is the decoding part:

import base64
import smart_open
records = []
for line in smart_open.smart_open('s3://path-to-recovered-file'):
  decoded = base64.b64decode(line.decode('ascii'))
  records.append(decoded)
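The push side is roughly this - the stream name is a placeholder, put_records caps out at 500 records per call, and the random-UUID partition key is just my choice:

import uuid
import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# push the decoded Thrift payloads (the records list from above) to the isolated raw stream
for i in range(0, len(records), 500):
  batch = [{'Data': record, 'PartitionKey': str(uuid.uuid4())} for record in records[i:i + 500]]
  response = kinesis.put_records(StreamName='recovery-raw-stream', Records=batch)
  if response['FailedRecordCount'] > 0:
    print('%d records failed in the batch starting at %d' % (response['FailedRecordCount'], i))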

I’m pushing the result to our isolated stream and as I go, I’m comparing the records in that isolated recovery stream with the records I’m getting in our production stream.

To me, what I’m getting as a result looks really identical to the records I’m seeing in our working raw stream.

Example of record pushed and available in the recovery stream:

d
3.91.44.86
�~%��
     �UTF-8
           �/snowplow-stream-collector-kinesis-2.4.4-kinesis
                                                            ,Ruby
                                                                 @/i
                                                                    J�e=se&se_ca=photo&se_ac=downloaded-photo&se_la=2138933&se_pr=MnwxNDI5OTZ8MHwxfGFsbHx8fHx8fHx8fDE2NDA5MTQ4NDA%3F&cx=eyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9jb250ZXh0cy9qc29uc2NoZW1hLzEtMC0xIiwiZGF0YSI6W3sic2NoZW1hIjoiaWdsdTpjb20udW5zcGxhc2gvYXBpX2FwcGxpY2F0aW9uL2pzb25zY2hlbWEvMS0wLTAiLCJkYXRhIjp7ImFwcGxpY2F0aW9uX2lkIjoxNDI5OTZ9fV19&dtm=1640914846152&p=srv&ip=51.161.92.178&tv=rb-0.6.1&aid=unsplash-api-production&eid=1c2ec4b8-3474-4b7b-9ad2-930d529815d4&stm=1640914846152^

                                                                                                 imeout-Access: <function1>X-Forwarded-ForX-Forwarded-Proto: httpX-Forwarded-Port: 80Host9X-Amzn-Trace-Id: Root=1-61ce5f9e-0083113e18b5f239443462604Accept-Encoding: gzip, deflate;q=0.6, identity;q=0.3
                                                              Accept: */*User-Agent: Ruby#X-Newrelic-Id: UwMCUFZRGwYAUFNbBwY=dX-Newrelic-Transaction: PxRWAl5WWgsBUwBSAVBVBwVXFB8EBw8RVU4aA1sAAVALVA4AAVdSAAVXUUNKQQEGAFADVVNQFTs=
                                                                                                             �collector.unsplash.com
               �$48b22093-608c-47f1-adea-9f3bc82da443
                                                     ziAiglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0


Example of a record currently in our raw production stream:

d
3.91.44.86
�~%��
     �UTF-8
           �/snowplow-stream-collector-kinesis-2.4.4-kinesis
                                                            ,Ruby
                                                                 @/i
                                                                    J�e=se&se_ca=photo&se_ac=downloaded-photo&se_la=2138933&se_pr=MnwxNDI5OTZ8MHwxfGFsbHx8fHx8fHx8fDE2NDA5MTQ4NDA%3F&cx=eyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9jb250ZXh0cy9qc29uc2NoZW1hLzEtMC0xIiwiZGF0YSI6W3sic2NoZW1hIjoiaWdsdTpjb20udW5zcGxhc2gvYXBpX2FwcGxpY2F0aW9uL2pzb25zY2hlbWEvMS0wLTAiLCJkYXRhIjp7ImFwcGxpY2F0aW9uX2lkIjoxNDI5OTZ9fV19&dtm=1640914846152&p=srv&ip=51.161.92.178&tv=rb-0.6.1&aid=unsplash-api-production&eid=1c2ec4b8-3474-4b7b-9ad2-930d529815d4&stm=1640914846152^

                                                                                                 imeout-Access: <function1>X-Forwarded-ForX-Forwarded-Proto: httpX-Forwarded-Port: 80Host9X-Amzn-Trace-Id: Root=1-61ce5f9e-0083113e18b5f239443462604Accept-Encoding: gzip, deflate;q=0.6, identity;q=0.3
                                                              Accept: */*User-Agent: Ruby#X-Newrelic-Id: UwMCUFZRGwYAUFNbBwY=dX-Newrelic-Transaction: PxRWAl5WWgsBUwBSAVBVBwVXFB8EBw8RVU4aA1sAAVALVA4AAVdSAAVXUUNKQQEGAFADVVNQFTs=
                                                                                                             �collector.unsplash.com
               �$48b22093-608c-47f1-adea-9f3bc82da443
                                                     ziAiglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0


Following a previous post of yours @mike that I found, I was able to deserialize the record, which also indicates that the format is correct.
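The check was along these lines - a thriftpy2 sketch, assuming the CollectorPayload 1-0-0 Thrift IDL from the Snowplow collector sources is saved locally as collector-payload.thrift:

import thriftpy2
from thriftpy2.utils import deserialize

# load the CollectorPayload struct definition from the IDL file
collector_payload_thrift = thriftpy2.load('collector-payload.thrift', module_name='collector_payload_thrift')

# records[0] is one of the base64-decoded payloads from earlier; the collector
# serializes with Thrift's binary protocol, which is the default factory here
payload = deserialize(collector_payload_thrift.CollectorPayload(), records[0])
print(payload)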

So here I am, expecting things to fully work when I push the payload to Kinesis but… I'm running a copy of my S3 loader (v0.7.0 - it's old, I know, but it's working great) locally and it doesn't work. I only replaced the input stream and the DynamoDB app name in the config.

It looks like the loader is seeing the records but isn't able to process them?

This is the log I’m getting:

java -jar snowplow-s3-loader-0.7.0.jar --config loader.conf
log4j:WARN No appenders could be found for logger (com.amazonaws.AmazonWebServiceClient).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[main] INFO com.snowplowanalytics.s3.loader.sinks.KinesisSink - Stream unsplash-snowplow-raw-loader-fail exists and is active
[main] INFO com.snowplowanalytics.s3.loader.S3Loader$ - Initializing sink with KinesisConnectorConfiguration: {regionName=us-east-1, s3Endpoint=https://s3.amazonaws.com, kinesisInputStream=xxx, maxRecords=500, connectorDestination=s3, bufferMillisecondsLimit=30000, bufferRecordCountLimit=500, s3Bucket=xxx, kinesisEndpoint=https://kinesis.us-east-1.amazonaws.com, appName=unsp_s3loader_recovery_raw, bufferByteSizeLimit=10000000, retryLimit=1, initialPositionInStream=LATEST}
[main] INFO com.snowplowanalytics.s3.loader.KinesisSourceExecutor - KinesisSourceExecutor worker created
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 67 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 67 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 60 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 106 records.
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 82 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 86 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 138 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 94 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 100 records.
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 124 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 168 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 108 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 214 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 179 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 147 records.
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 160 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 248 records.
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 193 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 183 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 176 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 244 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 231 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 306 records.

Looks like this unresolved issue:

I’ve hidden the stream and bucket names but they both exist.
So here I am: happy about the format, but I can't get the whole pipeline to work.

Not sure if you have any clue about this S3 loader thing? Any thoughts about the format? It should be fine, right?
I’ll probably come back to this tomorrow, I need some sleep :slight_smile:

:sweat:

The format may be fine - I’m not too sure about decoding as ascii (utf-8 would be the safer bet). Is the S3 loader emitting any records to the bad sink? If so this should contain any errors or anything that it failed to partition to that Kinesis stream.

Good news below !!

Oh wow, I didn’t even think about fixing this. Since I was seeing the “Flushing” logs, I assumed it was still logging by default to the console or something.
This is what I’m seeing after fixing the log4j thing:

ERROR	2022-01-06 21:58:52,198	0	com.hadoop.compression.lzo.GPLNativeCodeLoader	[main]	Could not load native gpl library
java.lang.UnsatisfiedLinkError: no gplcompression in java.library.path
	at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
	at java.lang.Runtime.loadLibrary0(Runtime.java:871)
	at java.lang.System.loadLibrary(System.java:1122)
	at com.hadoop.compression.lzo.GPLNativeCodeLoader.<clinit>(GPLNativeCodeLoader.java:54)
	at com.hadoop.compression.lzo.LzoCodec.<clinit>(LzoCodec.java:71)
	at com.snowplowanalytics.s3.loader.serializers.LzoSerializer$.<init>(LzoSerializer.scala:37)
	at com.snowplowanalytics.s3.loader.serializers.LzoSerializer$.<clinit>(LzoSerializer.scala)
	at com.snowplowanalytics.s3.loader.S3Loader$.run(S3Loader.scala:49)
	at com.snowplowanalytics.s3.loader.SinkApp$.main(SinkApp.scala:52)
	at com.snowplowanalytics.s3.loader.SinkApp.main(SinkApp.scala)
ERROR	2022-01-06 21:58:52,201	3	com.hadoop.compression.lzo.LzoCodec	[main]	Cannot load native-lzo without native-hadoop

So this is exactly the same issue I was running into when I was trying to run the new snowplow-event-recovery on a local Spark.

I fought it: tried to install Hadoop and its native libraries, tried to compile it, etc. … too much of a pain, really. Hadoop won't compile on Big Sur.
From https://github.com/apache/hadoop/blob/trunk/BUILDING.txt:

Note that building Hadoop 3.1.1/3.1.2/3.2.0 native code from source is broken
on macOS. For 3.1.1/3.1.2, you need to manually backport YARN-8622. For 3.2.0,
you need to backport both YARN-8622 and YARN-9487 in order to build native code.

So I decided to abandon the idea of working locally, and I just started a new S3 loader on the EC2 instance that hosts the production one. Not sure why I didn't think of that before…

And well… it just works over there. I'm guessing the Java install is cleaner and more complete. I'm getting the LZO-encoded files, ready for reprocessing.

Haven’t tried processing them yet but I’m sure it’ll work (eh? :slight_smile: ).

I guess all this seems to indicate that I'd probably be better off moving away from EmrEtlRunner and towards Stream Enrich and the RDB Shredder.

Thanks a ton for your help, @mike!
