Making SQL data models incremental to improve performance [tutorial]

Loading events into a data warehouse is almost never a goal in and of itself. The companies that take full advantage of their Snowplow data all have a data modeling step that transforms and aggregates events into a set of derived tables, which are then made available to end users in the business.

Most Snowplow users load their data into Redshift and implement their data models in SQL (using either SQL Runner or a BI tool such as Looker). These data models often work like this: each time new data is loaded into Redshift, the derived tables are dropped (or truncated) and rebuilt using all historical data.

This approach has several benefits. For instance, it makes it possible to iterate on the data model without having to update the existing derived tables. However, because the time it takes to build the derived tables goes up with the number of events in Redshift, there will be a point when rebuilding the tables from scratch each time will start to take too long.

One solution is to move the data models out of Redshift and rewrite them to run on Spark instead. We have written about this before and it’s something we will keep exploring. This tutorial instead focuses on improving the performance in Redshift – in particular on how to update a set of derived tables rather than rebuild them from scratch with each run.

1. The example data model

We’ll use a simple SQL data model to illustrate how one would go about updating a derived table rather than rebuilding it with each run. The data model itself is simple, but the queries that make it incremental also work for a more complex data model.

Let’s create a simple sessions table. Each time the Snowplow pipeline runs, we create a new sessions table using all events in atomic.events, drop the old table, and rename the new table:

CREATE SCHEMA IF NOT EXISTS derived;

CREATE TABLE derived.sessions_new
  DISTKEY(session_id)
  SORTKEY(tstamp)
AS (
  SELECT
    domain_sessionid AS session_id,
    MIN(derived_tstamp) AS tstamp
  FROM atomic.events
  GROUP BY 1
);

DROP TABLE derived.sessions;
ALTER TABLE derived.sessions_new RENAME TO sessions;

In the next section, we’ll make this example model incremental.

2. Moving to incremental updates

2.1 Preparation

Rebuild the sessions table one last time

The sessions table needs to exist before we can start updating it, so build the table from scratch one last time.

Create a manifest

Next, create a table (derived.etl_tstamps) that will be used to keep track of which events have been processed. We use the etl_tstamp for this.

CREATE SCHEMA IF NOT EXISTS scratch;

CREATE TABLE derived.etl_tstamps (etl_tstamp timestamp encode lzo) DISTSTYLE ALL;

INSERT INTO derived.etl_tstamps (SELECT DISTINCT etl_tstamp FROM atomic.events);

CREATE TABLE scratch.sessions (LIKE derived.sessions);
CREATE TABLE scratch.etl_tstamps (LIKE derived.etl_tstamps);
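
Steps 2a and 2b below also insert into scratch.session_id and scratch.event_id, so create those tables as well. A minimal sketch (the column types are assumptions; match them to domain_sessionid and event_id in your atomic.events table):

CREATE TABLE scratch.session_id (id varchar(128) encode lzo) DISTSTYLE ALL;
CREATE TABLE scratch.event_id (id char(36) encode lzo) DISTSTYLE ALL;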

2.2 Incremental updates

The incremental update happens in a number of steps.

Step 0: truncate the tables in scratch

TRUNCATE scratch.etl_tstamps;
TRUNCATE scratch.session_id;
TRUNCATE scratch.event_id;
TRUNCATE scratch.sessions;

ANALYZE scratch.etl_tstamps;
ANALYZE scratch.session_id;
ANALYZE scratch.event_id;
ANALYZE scratch.sessions;

Step 1: select all ETL timestamps that are not in the manifest

INSERT INTO scratch.etl_tstamps (

  WITH recent_etl_tstamps AS ( -- return all recent ETL timestamps

    SELECT
      DISTINCT etl_tstamp
    FROM atomic.events
    WHERE collector_tstamp > DATEADD(week, -1, CURRENT_DATE) -- restrict table scan to the last week (SORTKEY)
    ORDER BY 1

  )

  -- return all ETL timestamps that are not in the manifest (i.e. that have NOT been processed)

  SELECT
    DISTINCT etl_tstamp
  FROM recent_etl_tstamps
  WHERE etl_tstamp NOT IN (SELECT etl_tstamp FROM derived.etl_tstamps ORDER BY 1)
  ORDER BY 1

);

Note that we filter on collector_tstamp; the same filter will be used in the subsequent queries as well. Because the atomic tables are sorted on collector_tstamp, Redshift can skip almost all blocks (the events in them are older than 1 week) and return the results much faster.

The default in this case is 1 week, because that window:

  • is restrictive enough to speed things up
  • provides enough margin if things break

If either the pipeline or the SQL breaks for some reason, the problem will need to be resolved within one week, or batches that still need to be processed will be excluded (the filter can of course be widened if that were to happen).
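
For example, if the last successful run was almost a month ago, the filter in step 1 (and in the steps that follow) could temporarily be widened along these lines; the 4-week value is just an illustration:

SELECT
  DISTINCT etl_tstamp
FROM atomic.events
WHERE collector_tstamp > DATEADD(week, -4, CURRENT_DATE) -- temporarily widened from 1 week to 4
ORDER BY 1;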

Step 2a: create a list of session IDs

Select all session IDs that have at least one event in the batch (or batches) that we want to process.

INSERT INTO scratch.session_id (

  SELECT
    DISTINCT domain_sessionid AS id
  FROM atomic.events
  WHERE collector_tstamp > DATEADD(week, -1, CURRENT_DATE) -- restrict table scan to the last week (SORTKEY)
    AND etl_tstamp IN (SELECT etl_tstamp FROM scratch.etl_tstamps ORDER BY 1)
  ORDER BY 1

);

Step 2b: create a list of event IDs

In some cases, it’s easier if we can restrict on the event ID, so we don’t need to join to atomic.events to get the session ID (e.g. in the unstructured event or context tables).

INSERT INTO scratch.event_id (

  -- XX% are within 48 hours
  -- YY% are within the last week so we don't look at older events

  SELECT
    event_id AS id
  FROM atomic.events
  WHERE collector_tstamp > DATEADD(week, -1, CURRENT_DATE) -- restrict table scan to the last week (SORTKEY)
    AND domain_sessionid IN (SELECT id FROM scratch.session_id ORDER BY 1)
  ORDER BY 1

);
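
For example, a shredded unstructured event or context table could then be restricted on root_id and root_tstamp directly, without joining back to atomic.events (atomic.com_acme_my_context_1 is a hypothetical table name):

SELECT
  COUNT(*)
FROM atomic.com_acme_my_context_1 -- hypothetical context table
WHERE root_tstamp > DATEADD(week, -1, CURRENT_DATE) -- restrict table scan to the last week (SORTKEY)
  AND root_id IN (SELECT id FROM scratch.event_id);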

There are a couple of important things to note here.

First, some sessions will have events that were processed in earlier runs. This will happen when a session was still active when the pipeline last ran, or if not all events had arrived yet. These sessions already have a row in derived.sessions, which we will drop just before we update the table. To prevent mistakes, the model recomputes sessions in full (i.e. using all events with that session ID, not just the events that haven’t been processed before), but only those sessions that have at least one unprocessed event.

However, we also restrict on collector_tstamp, which limits how far we can look back in time. This will introduce a small number of mistakes – for instance if a session lasts for more than 1 week, or if an event arrives more than 1 week late – but it can also speed up the query by 2 orders of magnitude.

We recommend running a quick analysis on the distribution to determine what is acceptable. Based on what we have seen with sessions, more than 99.99% of sessions fall entirely within the 1 week range.
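
A query along these lines could be used for that analysis (the 7-day threshold mirrors the filter above):

WITH session_bounds AS ( -- first and last event per session

  SELECT
    domain_sessionid,
    MIN(collector_tstamp) AS first_tstamp,
    MAX(collector_tstamp) AS last_tstamp
  FROM atomic.events
  GROUP BY 1

)

SELECT
  SUM(CASE WHEN DATEDIFF(day, first_tstamp, last_tstamp) < 7 THEN 1 ELSE 0 END)::FLOAT / COUNT(*) AS fraction_within_one_week
FROM session_bounds;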

Step 3: aggregate the sessions

INSERT INTO scratch.sessions (

  SELECT
    domain_sessionid AS session_id,
    MIN(derived_tstamp) AS tstamp
  FROM atomic.events
  WHERE collector_tstamp > DATEADD(week, -1, CURRENT_DATE) -- restrict table scan to the last week (SORTKEY)
    AND domain_sessionid IN (SELECT id FROM scratch.session_id ORDER BY 1) -- restrict to session IDs we need to recalculate
  GROUP BY 1
  ORDER BY 1

);

Step 4: commit into the derived table

DELETE FROM derived.sessions WHERE session_id IN (SELECT id FROM scratch.session_id ORDER BY 1);
ALTER TABLE derived.sessions APPEND FROM scratch.sessions;

...

ALTER TABLE derived.etl_tstamps APPEND FROM scratch.etl_tstamps;

Because we do a DELETE FROM, the sessions table will need to be vacuumed.
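
For example (note that VACUUM cannot run inside a transaction block):

VACUUM derived.sessions;
ANALYZE derived.sessions;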

2.3 Final notes

The restrictions on collector_tstamp (and root_tstamp) are, to a large extent, what keep the queries fast. However, they place some constraints on which tables can be updated like this. It works well for a sessions table because sessions never last for weeks at a time. It’s a bit different if the aggregation happens across longer timeframes (e.g. at the visitor level). In that case, it’s still possible to move the model to incremental updates, but the table that is updated will have to be a bit different. For example, in the case of visitors, the table might have one row per visitor per day rather than one row per visitor.
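
For illustration, a visitor-per-day table could look something like this (a sketch only; the table and column names are made up for this example):

CREATE TABLE derived.visitors_by_day -- hypothetical table name
  DISTKEY(visitor_id)
  SORTKEY(visit_date)
AS (
  SELECT
    domain_userid AS visitor_id,
    DATE_TRUNC('day', derived_tstamp) AS visit_date,
    COUNT(*) AS events
  FROM atomic.events
  GROUP BY 1, 2
);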

2.4 Questions or feedback

Let us know below if we made any mistakes, missed anything, or if you have any feedback or questions!

Make sure to also check out this related post:

http://discourse.snowplow.io/t/tracking-how-sql-data-models-perform-tutorial/330


Hey @christophe!

This makes a lot of sense - thanks for sharing. How up to date is the current web-incremental model?

Do you recommend using it as-is, or building the models ourselves with the concepts explained in this post?

Thanks!

Hi @bernardosrulzon,

I’d recommend building the model yourself with the concepts explained here. The example on GitHub is outdated and will be deleted with the next Snowplow release.

Christophe

Hey @christophe this is great.

For this part

CREATE TABLE derived.sessions_new (LIKE derived.sessions);

INSERT INTO derived.sessions_new (
  SELECT
    domain_sessionid AS session_id,
    MIN(derived_tstamp) AS tstamp
  FROM atomic.events
  GROUP BY 1
);

DROP TABLE derived.sessions;
ALTER TABLE derived.sessions_new RENAME TO sessions; 

Would you recommend running this inside a single transaction? If so, are there any other transaction boundaries you would recommend for this general framework?

@christophe all of the other snippets in these examples show the statements for creating these tables along with their sort and dist keys. Do the session_id and event_id also need to be created somewhere in this process? Should we drop/create/populate the table in one statement?

This statement throws errors when run. There are no session_id or tstamp fields in the table we are creating.

Hi @ryanrozich - those are all good questions!

I’d run as little as possible in a transaction (it does add some overhead).

If something goes wrong with the first or second step, no damage is done and the solution is either to re-run or fix the problem and re-run, depending on what went wrong. I’ll update the post and add DROP TABLE IF EXISTS derived.sessions_new before the CREATE TABLE statement to handle that edge case.

If the third or fourth step fails, we cannot re-run without recreating the sessions table, because CREATE TABLE derived.sessions_new (LIKE derived.sessions) requires it to exist.

There are 2 solutions:

  1. Run steps 3 and 4 in a single transaction;
  2. Remove CREATE TABLE ... LIKE and use CREATE TABLE ... AS instead.
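
For the first option, that could look something like this:

BEGIN;

DROP TABLE derived.sessions;
ALTER TABLE derived.sessions_new RENAME TO sessions;

COMMIT;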

Redshift didn’t use to add compression encodings when creating a table using CREATE TABLE ... AS, but that changed with a recent update. Redshift now uses LZO compression wherever possible, so there’s no longer a benefit in running the CREATE and INSERT as separate steps.

Now that Redshift does add compression encodings, I’d replace INSERT INTO with CREATE TABLE scratch.session_id AS (...) and use DISTSTYLE ALL.
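
A sketch of what that could look like for step 2a:

DROP TABLE IF EXISTS scratch.session_id;

CREATE TABLE scratch.session_id
  DISTSTYLE ALL
AS (
  SELECT DISTINCT domain_sessionid AS id
  FROM atomic.events
  WHERE collector_tstamp > DATEADD(week, -1, CURRENT_DATE)
    AND etl_tstamp IN (SELECT etl_tstamp FROM scratch.etl_tstamps)
);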

Ah - that’s a mistake! It should be DISTSTYLE ALL. I’ll change that too.

Thanks for the feedback,

Christophe

Hi @christophe as part of our incremental updates, we do this VACUUM step after the commit, and it is one of the longest steps in the process. We are currently only operating on 2-3 weeks of data and I’m wondering whether the time to vacuum our tables will increase as the tables get bigger over time.

Do you have any suggestions on dealing with this? Specifically:

(a) If we run incremental modeling every 1-2 hours, do you suggest having this vacuum step on the critical path of the modeling process so that it happens on every run? Or, would it be better to do this less frequently and as a separate process that runs every 1-2 days?

(b) Do you suggest doing a VACUUM FULL as part of this or another type of VACUUM (like DELETE ONLY)

Hi @ryanrozich,

A couple of recommendations:

1. Remove the vacuum step from the critical path

As a general rule, remove as many steps from the critical path as possible and have them run as standalone pipelines / DAGs: that way, any issues with those steps won’t impact the critical path of processing data and making that data available.

2. Run a separate Redshift-defrag process

The folks at AWS provide a handy Analyze & Vacuum Utility. This can be set up to run regularly, e.g. during evenings or weekends, to keep your Redshift tables well maintained. It is more intelligent than running a plain VACUUM statement because it only vacuums a table when it exceeds a certain set of fragmentation thresholds.

We’ve found that if we run the utility frequently (e.g. daily) it runs relatively quickly, so it doesn’t adversely impact other database operations.

Does that help?

This thread is already a little bit older, but I hope this method is still valid. I want to ask: how would the proposed approach work with multiple playbooks? We would either need multiple etl_tstamps tables, or we would change the table to have (etl_tstamp, playbook_name) columns, so that each playbook has its own etl_tstamps and we know which playbook has already consumed which etl_tstamps, right?

Example for the new etl_tstamp table:

etl_tstamp   playbook_name
2017-01-01   users
2017-01-02   users
2017-01-02   shopping_cart
2017-01-03   customer

Are there other suggestions?

Hi @tclass,

A bit late, but I’d indeed keep different manifests. How did you end up doing it?

No problem :slight_smile: We went with the (etl_tstamp, playbook_name) table and it works without a problem.