RDB loader fails after load

Hello,

I’m using the EmrEtlRunner tool to load events into Redshift on a nightly schedule. Enrichment and shredding have been working fine, but lately the RDB loading step has begun failing with what looks like a Hadoop exception, causing the EMR cluster to hang.

The exception happens after shredding (so the shredded/good directory contains data), specifically on the “[rdb_load] Load AWS Redshift enriched events storage Storage Target” step. The logs from this step show that the RDB loader completes the consistency check, finishes loading the detected shredded/good data, reports “VACUUM queries skipped” and “ANALYZE transaction executed”, and only then hits the exception (intermittently - sometimes the job works!). Traceback:

Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
	at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
	at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
	at scala.concurrent.Await$.ready(package.scala:169)
	at com.snowplowanalytics.snowplow.scalatracker.emitters.id.RequestProcessor.sendSync(RequestProcessor.scala:162)
	at com.snowplowanalytics.snowplow.scalatracker.emitters.id.SyncEmitter.send(SyncEmitter.scala:39)
	at com.snowplowanalytics.snowplow.scalatracker.emitters.id.SyncEmitter.send(SyncEmitter.scala:31)
	at com.snowplowanalytics.snowplow.scalatracker.Tracker$$anonfun$send$1.apply(Tracker.scala:64)
	at com.snowplowanalytics.snowplow.scalatracker.Tracker$$anonfun$send$1.apply(Tracker.scala:64)
	at cats.data.NonEmptyList.traverse(NonEmptyList.scala:231)
	at com.snowplowanalytics.snowplow.scalatracker.Tracker.send(Tracker.scala:64)
	at com.snowplowanalytics.snowplow.scalatracker.Tracker.com$snowplowanalytics$snowplow$scalatracker$Tracker$$track(Tracker.scala:54)
	at com.snowplowanalytics.snowplow.scalatracker.Tracker$$anonfun$trackSelfDescribingEvent$1.apply(Tracker.scala:137)
	at com.snowplowanalytics.snowplow.scalatracker.Tracker$$anonfun$trackSelfDescribingEvent$1.apply(Tracker.scala:137)
	at cats.package$$anon$1.flatMap(package.scala:41)
	at cats.FlatMap$Ops$class.flatMap(FlatMap.scala:21)
	at cats.FlatMap$ToFlatMapOps$$anon$2.flatMap(FlatMap.scala:21)
	at com.snowplowanalytics.snowplow.scalatracker.Tracker.trackSelfDescribingEvent(Tracker.scala:137)
	at com.snowplowanalytics.snowplow.rdbloader.interpreters.implementations.TrackerInterpreter$.trackSuccess(TrackerInterpreter.scala:109)
	at com.snowplowanalytics.snowplow.rdbloader.interpreters.RealWorldInterpreter$$anon$1.apply(RealWorldInterpreter.scala:197)
	at com.snowplowanalytics.snowplow.rdbloader.interpreters.RealWorldInterpreter$$anon$1.apply(RealWorldInterpreter.scala:116)
	at cats.free.Free$$anonfun$foldMap$1.apply(Free.scala:155)
	at cats.free.Free$$anonfun$foldMap$1.apply(Free.scala:153)
	at cats.package$$anon$1.tailRecM(package.scala:43)
	at cats.free.Free.foldMap(Free.scala:153)
	at cats.free.Free$$anonfun$foldMap$1.apply(Free.scala:156)
	at cats.free.Free$$anonfun$foldMap$1.apply(Free.scala:153)
	at cats.package$$anon$1.tailRecM(package.scala:43)
	at cats.free.Free.foldMap(Free.scala:153)
	at com.snowplowanalytics.snowplow.rdbloader.Main$.run(Main.scala:69)
	at com.snowplowanalytics.snowplow.rdbloader.Main$.main(Main.scala:36)
	at com.snowplowanalytics.snowplow.rdbloader.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.util.RunJar.run(RunJar.java:239)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:153)

I naively copy/pasted this error into a search and, as suggested on Stack Overflow, increased the spark.sql.broadcastTimeout setting to 300, but this hasn’t fixed the issue.

Totally appreciate any tips/insights, thank you!

Hey @Frank_Fineis!

It looks like RDB Loader “fails” on what is essentially dogfooding of monitoring details back to Snowplow. When the Loader finishes its work, it reports back to the Snowplow collector you have configured in the monitoring.snowplow section of config.yml, and it seems that collector is not responding in time. Unless you specifically need this, it’s completely optional and doesn’t carry much useful information - my guess is that the monitoring.snowplow section was configured by mistake.
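For reference, that section of config.yml looks roughly like this (quoting from memory of the sample config, so the exact keys can differ between releases; the app_id and collector values below are placeholders). Removing or commenting out the snowplow subsection disables this reporting:

    monitoring:
      tags: {}
      logging:
        level: DEBUG
      snowplow:
        method: get
        protocol: http
        port: 80
        app_id: snowplow            # placeholder
        collector: <collector URL>  # the collector the Loader reports its own events to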

What confuses me the most is that it surfaces as an error, while it clearly should be just a warning.

I’ll investigate whether this is still the case in the latest 1.0.0 version, but very likely it is not. There have been a lot of changes since the version you’re using (presumably R32). An important detail is that RDB Loader no longer runs on the EMR cluster (and it has never been a Spark job, by the way, so setting that timeout wouldn’t help anyway). I think you might want to consider upgrading to the latest version.

Hey @anton, thanks for the reply!

Ah ok, interesting point about the monitoring. On the runs that have completed, I’ve noticed a message that appears precisely at the point where the failing jobs fail:

VACUUM queries skipped
ANALYZE transaction executed
Snowplow Tracker [2021-04-21T18:20:43.325Z]: Cannot deliver event to http://<collector URL>:80. Collector responded with 404
Completed successfully

So yeah… it definitely seems like there’s an issue with the monitoring config! I’ll try to either fix it (maybe point it at port 80 - the monitoring port is currently set for HTTPS) or just remove it altogether.
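Concretely, I think the fix is one of these two tweaks to the monitoring.snowplow block (the values are placeholders for our setup, and I’m not 100% sure of the exact keys our release expects):

    # Option 1: make the tracker match how the collector is actually served
    snowplow:
      method: get
      protocol: http              # or https, whichever the collector really listens on
      port: 80                    # or 443 if https
      app_id: snowplow            # placeholder
      collector: <collector URL>  # placeholder

    # Option 2: comment out / delete the whole snowplow subsection,
    # since the reporting is optional anyway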

And thanks for the clarification on Spark vs EMR - sorry, I treat a lot of the Snowplow ETL as a black box and am trying to understand it better. I’ll talk to our data engineer about moving to R35, thanks so much!