It seems to be impossible to run multiple Databricks Loaders from the same Transformation Queue, because SQS FIFO only allows one consumer per message group.
What is the reason behind the requirement for the SQS queue to be of type FIFO, and could it be changed so that we can run multiple Databricks Loaders to improve ingestion speed?
Below are references to the documentation stating this requirement:
Hi @BrianKjaerskov, I’ll comment first on your point about running multiple databricks loaders:
The RDB loader is designed for running just a single loader at a time, and we find this is the most efficient way to get data loaded.
The bottleneck for the loader is almost always the size of the cluster receiving the data. i.e. your Databricks compute cluster (or for other users, the Redshift compute cluster or Snowflake compute cluster). If you want to load events more quickly, then pick a larger Databricks compute cluster.
To explain why this is the case… The RDB Loader doesn’t do much “work” itself; all it does is issue the load statements to Databricks. All the hard work of loading is run on the Databricks compute cluster. If you run multiple loader apps, then you will find the loaders are competing to issue load statements to the compute cluster. This means the Databricks cluster is not able to dedicate its full resource to each load. (You might also run into problems with table locks – but I’m not 100% sure about this, I’ve never tried it).
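For context, here is a minimal sketch of the kind of load statement the loader issues. The bucket path, run folder and options are illustrative placeholders, not the loader's exact output:
-- Databricks does the heavy lifting here; the loader only submits the statement
COPY INTO snowplow.events
FROM 's3://my-transformed-bucket/run=2022-01-01-00-00-00/output=good/'
FILEFORMAT = PARQUET
COPY_OPTIONS ('mergeSchema' = 'true')
Running several loaders just means several of these statements competing for the same compute cluster.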
To answer your original question… I have never tried using a non-FIFO queue with the loader. We like the FIFO queue because it prioritises the oldest events first, to avoid problems of missing events if there’s a backlog.
I would be very interested to get your feedback on the single-loader thing. If you find the single loader is not working well for you then I would be interested to hear about your setup, e.g. event volume, size of a typical batch, Databricks cluster configuration.
Hi @istreeter
Thanks for your reply.
I am somewhat aware of how the Databricks Loader works and the limited work it has to do. However, my assumption was that by running multiple loaders we could have more worker nodes on our Databricks cluster loading data in parallel.
Since I submitted this post, we managed to solve our issue with the loader/Databricks cluster not being able to keep up with the number of messages delivered by the transformer, by increasing the transformer windowing time from 5 to 15 minutes. (I am wondering what the optimal windowing configuration is for high data volumes; is bigger better?)
Our current setup is:
Transformer:
1 instance
windowing: 15 minutes (was 5 at the time I submitted the issue)
other settings are default
Databricks setup:
1-8 Workers
61-488 GB Memory
8-64 Cores
1 Driver
61 GB Memory, 8 Cores
Where can I see the batch size?
@BrianKjaerskov One of the reasons for the increased performance with the longer window would be the number of parquet files behind the events table.
There is a diminishing return on the number of files behind the events table, so it might be a good idea to keep that in check.
You could try optimizing the events table with something like:
OPTIMIZE snowplow.events
WHERE collector_tstamp_date >= current_timestamp() - INTERVAL 86400 second
ZORDER BY collector_tstamp
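If you want to keep an eye on how many parquet files currently back the table, one way (a sketch, assuming a standard Delta table named as above) is:
DESCRIBE DETAIL snowplow.events
-- the numFiles column in the result shows how many data files back the table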
Are you sure that the COPY INTO statement that the loader issues doesn't distribute the load across workers? The events table is partitioned, so it would make sense for Databricks to split the load per partition. I haven't tested it myself, this is just speculation.
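To see which columns the table is actually partitioned by (a quick check, using the same table name as above), you could run:
SHOW CREATE TABLE snowplow.events
-- the PARTITIONED BY clause in the output shows the columns a load could be split on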
Hi @Pavel_Voropaev
Thanks for your thoughts on the windowing config and for sharing the optimize script, I have put it into a daily schedule on a Databricks Job cluster.
Regarding the COPY INTO statement: from what I can see in the Databricks console, there is only ever one worker running while the loader is running, so I don't think it distributes the load.
Hey @BrianKjaerskov, we’ve opened an internal investigation into this issue and I am working on it atm. Will keep you updated in this thread.
In the meantime, you could also run OPTIMIZE snowplow.manifest ZORDER BY base. It is a small table, so there is no time constraint on the statement, but leaving it unoptimized has been shown to slow the loads after ~5,000 batches.
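Putting the two together, a sketch of a daily maintenance job (the retention window and table names follow the examples above; adjust them to your setup):
-- compact the recently loaded partitions of the events table
OPTIMIZE snowplow.events
WHERE collector_tstamp_date >= current_timestamp() - INTERVAL 86400 second
ZORDER BY collector_tstamp;

-- keep the loader's manifest table compact as well
OPTIMIZE snowplow.manifest
ZORDER BY base;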
Hey @BrianKjaerskov, would you mind checking the Spark UI? I am curious how many Tasks the COPY INTO ..events.. stage produced.
In your case, I would expect to see many different Executor IDs. My example is from a single-node cluster: