Making Redshift Use Merge Joins Instead of Slower Hash Joins

We are loading our Snowplow data into Redshift using the rdb-loader-redshift container.

The problem we have is that queries are quite slow when incorporating even the most basic joins.

For example, we presently only have about 50M rows in the events and nl_basjes_yauaa_context_1 tables, but the following basic query takes over 30 seconds to complete. (Our actual queries with a few joins in them take over 10 minutes, even with this minimal data set.)

SELECT e.event_id, yauaa.device_class
FROM events e
LEFT JOIN nl_basjes_yauaa_context_1 yauaa ON e.event_id = yauaa.root_id
ORDER BY e.collector_tstamp DESC
LIMIT 100;

In looking at the EXPLAIN output, I noticed that Redshift is doing a “hash join” in this query, which seems to be pretty much solely responsible for the slowness.

I noticed that by default, event_id (or root_id on the context table) is set as the Redshift distribution key and collector_tstamp (or root_tstamp) is set as the Redshift sort key. It seems like that setup should enable a merge join, and yet it’s not happening.
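For what it’s worth, here’s how I checked the keys. This assumes the tables live in the atomic schema (which is where our loader writes); pg_table_def only lists tables on the search_path:

SET search_path TO atomic;

SELECT "column", type, distkey, sortkey
FROM pg_table_def
WHERE tablename IN ('events', 'nl_basjes_yauaa_context_1');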

After discovering this Stack Overflow post, I tried running full vacuums on the two tables:

VACUUM FULL events TO 100 PERCENT;
VACUUM FULL nl_basjes_yauaa_context_1 TO 100 PERCENT;

But Redshift is still using a hash join, even immediately after running the vacuum, before inserting more data.

Based on the Redshift best practices, it seems a merge join would be possible if the join column were both the distribution key and the sort key, although that seems like it may have other undesirable consequences.

My question is: Is it possible to adjust the Redshift configuration in some way to use a merge join and speed up my queries? And if so, how could I configure Docker and Terraform to deploy that configuration?

Hi @WTravisH, are you able to try adding a second condition to your join, e.collector_tstamp = yauaa.root_tstamp, and see if that improves the performance? We tend to do this internally and it might make Redshift convert it to a merge join.
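Something like this, building on the query from your original post:

SELECT e.event_id, yauaa.device_class
FROM events e
LEFT JOIN nl_basjes_yauaa_context_1 yauaa
    ON e.event_id = yauaa.root_id
    AND e.collector_tstamp = yauaa.root_tstamp
ORDER BY e.collector_tstamp DESC
LIMIT 100;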

Keeping collector_tstamp as the sort key has huge performance benefits over time, as you’ll want to filter to more recent data or a specific period, so I’d recommend against changing the key. I would, however, recommend adding a filter on it wherever possible in your queries, particularly as your data history grows.

Thanks Ryan. Unfortunately that seems to have actually doubled the cost, even after a fresh vacuum. Still not doing a merge join.

Totally get your point about leaving the timestamp as the sort key, and obviously can’t join on the timestamp alone.

Any other ideas? Or is a hash join inevitable? Would it make sense to add an auto-increment key that could function as a sort and distribution key?

It’s been a while since I’ve done any serious work with Redshift SQL, so this might be outdated, but if my memory is correct, the table scans in question won’t be limited unless you specifically restrict on the sort key - LIMIT only restricts the number of results produced.

I think things should improve if you add a WHERE e.collector_tstamp > {some time} AND yauaa.root_tstamp > {some time} ... (one filter for each table involved).

Additionally, adding the tstamps to the join as Ryan suggests is also advisable, but for a different reason: the delivery semantics are at-least-once, so you can have duplicate events. Most dupes won’t have these tstamps duplicated as well, so the extra condition avoids a potential Cartesian explosion.
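Putting both of those suggestions together against the query in your original post, it would look roughly like this (the 7-day window is just an illustrative value - use whatever period you actually need):

SELECT e.event_id, yauaa.device_class
FROM events e
LEFT JOIN nl_basjes_yauaa_context_1 yauaa
    ON e.event_id = yauaa.root_id
    AND e.collector_tstamp = yauaa.root_tstamp
-- note: a WHERE filter on yauaa effectively turns the LEFT JOIN into an inner join;
-- move that condition into the ON clause if you need to keep unmatched events
WHERE e.collector_tstamp > DATEADD(day, -7, GETDATE())
  AND yauaa.root_tstamp > DATEADD(day, -7, GETDATE())
ORDER BY e.collector_tstamp DESC
LIMIT 100;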

A final word: in the overall workflow design, we recommend using incremental data modeling to produce derived tables, which is where most querying activity would happen. Snowplow gives you event-level data, which is awesome for lots of things, but regular querying isn’t one of them! This way you solve the efficiency issue only once.

Noted. Glad you mentioned the at-least-once delivery. I’ll make that change just for accuracy’s sake.

Redshift has materialized views, which could add another layer of data modeling without too much work. Refreshing the views will be slow, but querying should be quite fast.
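A minimal sketch of the sort of thing I mean (the view name and column list are just placeholders):

CREATE MATERIALIZED VIEW mv_events_device AS
SELECT e.event_id, e.collector_tstamp, yauaa.device_class
FROM events e
LEFT JOIN nl_basjes_yauaa_context_1 yauaa
    ON e.event_id = yauaa.root_id
    AND e.collector_tstamp = yauaa.root_tstamp;

-- re-run on whatever schedule we need
REFRESH MATERIALIZED VIEW mv_events_device;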

In our case, I don’t think we need much beyond that in terms of derived tables.

Still interested in whether there’s any way to force merge joins (if only to speed up materialized view generation), if anyone has ideas.

I don’t think you’ll be able to get a merge join without changing your sort key, but given you need collector_tstamp as the sort key anyway, this is unlikely to be worth it. After vacuuming, did you also run ANALYZE in your warehouse?
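If not, it’s just something like:

ANALYZE events;
ANALYZE nl_basjes_yauaa_context_1;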

If you run EXPLAIN SELECT ..., does the first line show a DS_DIST_NONE type hash join? This is the best one, as it basically means Redshift doesn’t need to redistribute your data to do the join. Hash joins aren’t intrinsically bad (unlike the much-feared nested loops), only when the data needs redistributing.

To Colm’s point, processing your data in an incremental way is going to be a HUGE benefit, even more so as your data grows: a materialized view will reprocess all the data each time you refresh the view, whereas an incremental approach would only process the new events since the last run.
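As a rough sketch of the difference (the derived table and column names are just placeholders), an incremental load only touches the new data on each run:

-- hypothetical derived table, loaded incrementally
INSERT INTO derived.events_device
SELECT e.event_id, e.collector_tstamp, yauaa.device_class
FROM events e
LEFT JOIN nl_basjes_yauaa_context_1 yauaa
    ON e.event_id = yauaa.root_id
    AND e.collector_tstamp = yauaa.root_tstamp
WHERE e.collector_tstamp > (SELECT MAX(collector_tstamp) FROM derived.events_device);
-- (a real implementation also needs to handle the first run and late-arriving events)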

Finally, just a word of warning whichever route you take: be careful about joining any contexts that may contain multiple entities (yauaa never will, but you may have some custom ones that do), as even after de-duplication this can still lead to a 1:many join.

Yes, it does show DS_DIST_NONE (not on the first line, but when it shows the join), so perhaps there’s no problem as long as we are always filtering by collector_tstamp.

If we get to the point where we need to add another layer with incremental updates, that definitely seems like the best approach. But materialized views will work for quite a while.

Here’s the explain output for the query in my original post, immediately after running a full vacuum and analyze:

XN Limit  (cost=1000031461801.64..1000031461801.89 rows=100 width=57)
  ->  XN Merge  (cost=1000031461801.64..1000031582361.57 rows=48223972 width=57)
        Merge Key: e.collector_tstamp
        ->  XN Network  (cost=1000031461801.64..1000031582361.57 rows=48223972 width=57)
              Send to leader
              ->  XN Sort  (cost=1000031461801.64..1000031582361.57 rows=48223972 width=57)
                    Sort Key: e.collector_tstamp
                    ->  XN Hash Right Join DS_DIST_NONE  (cost=602799.65..25307639.86 rows=48223972 width=57)
                          Hash Cond: ("outer".root_id = "inner".event_id)
                          ->  XN Seq Scan on nl_basjes_yauaa_context_1 yauaa  (cost=0.00..482239.72 rows=48223972 width=49)
                          ->  XN Hash  (cost=482239.72..482239.72 rows=48223972 width=48)
                                ->  XN Seq Scan on events e  (cost=0.00..482239.72 rows=48223972 width=48)

There’s a pretty large cost associated with an ORDER BY, even on a sort key field, especially across this many records. If possible I would leave the ORDER BY to any downstream queries on your view (which are hopefully using filters) rather than putting it in the view generation itself.

Don’t forget to set a distkey and sortkey on your materialized view so filters on it are also optimised!
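For example, something along these lines (keys chosen to mirror the base tables - adjust to your own query patterns):

CREATE MATERIALIZED VIEW mv_events_device
DISTKEY (event_id)
SORTKEY (collector_tstamp)
AS
SELECT e.event_id, e.collector_tstamp, yauaa.device_class
FROM events e
LEFT JOIN nl_basjes_yauaa_context_1 yauaa
    ON e.event_id = yauaa.root_id
    AND e.collector_tstamp = yauaa.root_tstamp;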

We have been working on a similar project to make merge joins happen. If you alter the sort key of all context tables to be (root_tstamp, event_id) and the events table to be (collector_tstamp, event_id), it does allow Redshift to do a merge join, as long as you do the recommended join using both the event ID and the timestamp.
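The alterations themselves are along these lines (expect them to take some time on large tables; I’ve used root_id for the context table here since that’s its join column in the standard loader schema - adjust if your tables differ):

ALTER TABLE events ALTER SORTKEY (collector_tstamp, event_id);
ALTER TABLE nl_basjes_yauaa_context_1 ALTER SORTKEY (root_tstamp, root_id);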

This still allows the tables to be sorted by time, and we have found that most queries are faster. (Redshift’s EXPLAIN reporting the first-row cost as 0 is a bit optimistic, though.)

Another thing we found is that some queries that take a small number of events and then join to a full context table can be very slow if the merge join is done on disk, while the same query as a hash join is fast because the hash of the smaller table can be built in memory.

The fix for this is to add a predicate on the tstamp to the tables if it is not there already. The predicate can be much wider than you would normally need with Redshift; we tested with a full year and it was OK, but anything longer went to disk.