Databricks Loader Building Up Lag

Hello again!

We’ve added the Databricks Loader to our pipeline and have been running it alongside the stream transformer for a couple of months. Currently the windowing for both services is set to five minutes. We’d like to get that down even further, but five minutes is acceptable for now. However, we’re noticing that the lag, i.e. the length of time it takes the loader to grab the next SQS message and load the S3 folder, is getting incrementally longer. The data in Databricks is now nearly 15 hours behind from when it was collected, enriched, and stored in Kinesis. If we spawn new loaders within the same EC2 cluster, data gets duplicated and the lag is unaffected.

Are there any suggestions to reduce this lag?

I would recommend updating the loader to the latest version, or at least to 5.2.0, which introduced optimize schedules.

You can verify that it will improve your lag by running OPTIMIZE manually:

OPTIMIZE manifest ZORDER BY (base)
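
For reference, once you are on 5.2.0+ these runs can be scheduled from the loader config. A rough sketch of what that section might look like; the field names and the Quartz-style cron values here are assumptions from memory, so check the release notes for your version:

  "schedules": {
    # OPTIMIZE the events table once a day (example value only)
    "optimizeEvents": "0 0 0 ? * *",
    # OPTIMIZE the manifest table, offset so the two jobs do not overlap
    "optimizeManifest": "0 0 5 ? * *"
  }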

Increasing the window size would also help, as it allows the transformer to partition the parquet output more granularly, which in turn uses cluster resources more efficiently, since Spark spawns one task per partition.
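
For context, the window size is set in the stream transformer’s config. A minimal sketch, assuming the Kinesis stream transformer (other fields omitted; the value is just an example):

  # fragment of the stream transformer config
  "windowing": "10 minutes"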

Hello there! I should have included this in my original question: we are running 5.2.0 and are about to upgrade to 5.3.0. We had originally upgraded hoping it would decrease the lag, as you mentioned.

Is this increasing the window size for the loader, transformer, or both?

I assume that even if that does decrease the lag, we will always have a static lag equal to the length of the window, correct? For all the other loaders, the time from collection to the destination database has always been < 1 min. Is there a reason the Databricks option is so slow by comparison? Unless I’m reading the documentation wrong, we’re asking the transformer and loader to both batch by 10+ minute windows, leaving our data more than 20 minutes older than when it was originally collected.

Hey!

Is this increasing the window size for the loader, transformer, or both?

Transformer. The loader does not have an aggregation window. If it is not busy, it will start loading as soon as it gets an SQS message from the transformer upstream, which is almost instant. If it is busy, it will complete the current operation and then consume the next SQS message. Could you clarify which parameter you are referring to as the loader window?

we will always have a static lag equal to the length of the window, correct?

Correct.

Is there a reason the Databricks option is so slow by comparison?

There is no reason it should be slower.

  • Are you running the stream or batch version of the transformer?
  • Has your event volume increased recently?
  • In Databricks go to Compute → your cluster → Spark UI → find the COPY INTO command that the loader ran → check how many tasks it created.
  • In the same Spark UI, are there any stages that stand out as too long or too frequent? The issue could be with overall cluster performance (dbt or custom models) rather than with the loading process.
  • Check the log4j-active.log in Driver Logs for errors. It is generally very noisy. I would be happy to look at it if you can share the part around the time of the COPY command.

Also, in the Spark UI, see how long it took the loader to run the SELECT from the manifest table.

Could you clarify which parameter you are referring to as the loader window?

My bad, you’re right that the windowing is only on the transformer. We had a leftover variable that led me to believe there was windowing on the loader as well. Glad to know it’s on a first-come-first-served basis.

Are you running the stream or batch version of the transformer?

Stream transformer. This is connecting to our enriched Kinesis stream.

Has your event volume increased recently?

Yes, we’ve had traffic spike multiple times in recent weeks. I suspect this is when the lag began to build.

In Databricks go to Compute → your cluster → Spark UI → find the COPY INTO command that the loader ran → check how many tasks it created.

Looks like it’s only ever making 1/1 task. Looking at the logs over the last few hours, most tasks are completing fairly quickly, with the longest INSERT being around 67 ms. I certainly won’t rule out cluster performance, as this is a POC cluster, but I haven’t seen anything too alarming in the Spark UI.

Also, in the Spark UI, see how long it took the loader to run the SELECT from the manifest table

This is taking on average 0.2 seconds.

Check the log4j-active.log in Driver Logs for errors.

Working on this now. It’s a bit time-consuming to download the log files.

Follow-up question: We recently tried to scale up the number of Databricks loaders in our environment but found that they were all processing the same SQS messages and causing duplication of data in Databricks for the events table. Is that expected behavior or is there something misconfigured that would cause multiple loaders to process the same data?

Looks like it’s only ever making 1/1 task.

1 task is a problem; it could lead to 100x worse performance. This is how it should look in the Spark UI (except for the executor ID being the driver).

In my benchmarks, Databricks works best with 5x the number of cluster cores in parquet partitions; 10x is about the same, and anything higher has diminishing returns (though even 50x is still a lot faster than a single file).

I have cut a quickfix release, 5.4.0-rc3, for you, which lets you tune the number of events per parquet partition manually. Set "output": { "rowsPerFile": N } in the transformer config, where N is your average number of events per window divided by the number of Databricks cores, but not less than 1000. This is a quickfix, and the name or location of the parameter might change later; any change will be noted in the release notes.
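
To make that concrete, here is roughly how the addition would sit in the transformer config. This is a sketch under assumptions: the number below assumes about 500,000 events per window on a 16-core cluster, and your existing output settings (path, compression and so on) stay as they are:

  "output": {
    # ~500,000 events per window / 16 Databricks cores ≈ 31,250 (above the 1000 floor)
    "rowsPerFile": 31250
  }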

Note that I have not tested this release fully.

We recently tried to scale up the number of Databricks loaders in our environment but found that they were all processing the same SQS messages and causing duplication of data in Databricks for the events table. Is that expected behavior or is there something misconfigured that would cause multiple loaders to process the same data?

This is expected. The loader is not a load-bearing application; it doesn’t need scaling and was not designed for it. All the heavy lifting is done by the warehouse (Databricks in this instance).

Thank you for that quickfix! I’ve pulled it into our pipeline and have been testing it this morning. One thing I’ve noticed is that most of our files aren’t even reaching 1k events. It seems like we have a ton of tiny files that the loader/cluster just can’t catch up on. I’ve bumped up the windowing to try to cut down on the number of files, in hopes of catching up. I’m not sure if this is a miscalculation, but our loader is reporting the following:

INFO Load: 152 good events were loaded. It took minimum 247465 seconds and maximum 247760 seconds between the collector and warehouse for these events. It took 247759 seconds between the start of transformer and warehouse and 247458 seconds between the completion of transformer and warehouse

There’s no way this is taking ~68 hours.

It is important to narrow down whether the issue is with the transformer or with the loader/Databricks cluster.

A good indicator is the age of the oldest message in the loader’s SQS queue. An old message means a problem with loading, and based on the log line you provided it looks like this is the case. I would recommend increasing the window size, at least temporarily, to allow the loader to catch up. How many items are in the queue? What is the dynamic there: is the queue growing or shrinking, and at what pace?

If the oldest message is new, then the problem is with the transformer.

Looking at the queue itself, bumping up the windowing has allowed the number of visible messages in the queue to start trending downward. This seems to have no effect on the age of the oldest message in the queue, which continues to trend upward.


The backlog is too long, so the loader has not gotten to the longer windows yet (it is a FIFO queue), which is why the age trend is continuing for now. It is too early to tell whether increasing the window is working. I would expect to see changes in the age after 68 hours (your backlog) or longer, given the trend.

It is still worth investigating why loading takes over 5 min (your old aggregation window).
While a single file is bad, the loader’s log message showed only ~150 events, so it shouldn’t matter whether it is one file or 100.

  • Could you share a snippet of log4j-active.log?
  • And maybe the loader’s log to match it?

I would think that a 30 min interval should capture at least one full job.

Apologies for the late response, but the good news is that we managed to get back to baseline for our loader. I’ve even lowered the window back down to five minutes and haven’t seen any increase in lag! I’ll continue to monitor, but it seems like the hotfix of having 1k events per file is working nicely. Thank you so much for all your assistance!

One final question, do you know if there’s a point in the future when the Databricks loader will be able to load invalid data? Judging by the current message structure, the stream transformer never seems to report on “bad” or invalid data via the SQS messages.

Hey Katie,

That is excellent news. I am surprised that the event limit worked, given the low event volume: as per the loader message, ~150 events would still end up in a single file. Perhaps that window was not representative of the rest of the batches.

One final question, do you know if there’s a point in the future when the Databricks loader will be able to load invalid data?

The current strategy for dealing with bad data in Snowplow is event recovery, which modifies the data and reinserts it into the pipeline.

There are many sources of bad data in Snowplow: the collector, enrich, and the transformer itself. It also comes in many different categories, each requiring a different adjustment. How should an event that failed validation be loaded? It is almost guaranteed that typecasting in the transformer would fail.

A generalised recovery scenario is required for each bad row type. It is not an easy task, but we are working on it. Recovery is one of the main focuses of our team this year.

Some progress has been made already: as of 5.3.0, the transformer + loader will automatically recover data where schema evolution resulted in a type error.

@stanch might be in a better position to answer this question.

Hi there,

As Pavel said, we want to make working with (and eventually recovering) bad data easier. I also echo his point that many different types of bad data exist, and the fact that their formats are so heterogeneous complicates both tasks. So it’s a “maybe, but we are not sure when yet” from me.

If I may ask, how were you thinking to use the bad data in Databricks?

  • Run SQL queries to analyze and report on various types of failed events (similar to what’s described here)?
  • Run SQL queries to massage invalid events and load them as valid events directly into the atomic events table?
  • Something different?

Our biggest want for loading the invalid data is to generate reports on schema violations so that we can track down the appropriate teams and work on an implementation fix, as you mentioned in your first bullet point above. We currently have that functionality in Postgres and would like to move it to Databricks.

Being able to pull data out of the invalid set and insert it back into the events table would also be a nice feature, but I’m not sure how often it would get used.