A key data processing stage in a Snowplow pipeline is event data modeling:
Event data modeling is the process of using business logic to aggregate over event-level data to produce ‘modeled’ data that is simpler for querying.
Typically, Snowplow users have relied on Amazon Redshift for their event data modeling: the Snowplow pipeline ingests enriched events into Redshift, and then a series of SQL scripts, perhaps orchestrated by SQL Runner, performs the aggregations and stores the results in new tables. A good example of this is Snowplow’s own web data model.
1. Challenges of SQL event data modeling with Redshift
Event data modeling in Redshift with SQL has worked well for many Snowplow users, particularly at small to medium event volumes, but there have been certain challenges, some around the use of SQL in general, and some around the use of Redshift itself:
1.1 Challenges of SQL
SQL is great for prototyping event data models, but it can be challenging to then put these models into production:
- SQL is difficult to unit test - leading to plenty of “debugging in production”
- SQL is difficult to modularise – leading to “copy-paste-itis”
- SQL is impossible to parameterise – you quickly end up with a user-specific fork of any given data model
1.2 Challenges of Redshift
Running data modeling processes in Amazon Redshift also comes with some challenges, particularly at very large event volumes:
- Loading all enriched events into Redshift and then running event data modeling processes can put a significant load on the Redshift database
- Redshift does not support elastically scaling compute independently of storage (unlike e.g. Snowflake), limiting options to tune the event data modeling performance
- Loading richly nested enriched event data into Redshift requires our shredding process, which is a costly operation in EMR and leads to complex SQL JOINs in Redshift
2. An alternative approach with Apache Spark
This tutorial presents one possible solution, using Apache Spark, Dataflow Runner and a Snowplow Analytics SDK to perform event data modeling.
This figure compares the dataflows for the Redshift- and Spark-based approaches:
Let’s get started with this tutorial, by setting out the event data modeling that we want to migrate to Spark.
3. Our example data model
Let’s imagine that we want to collect page views from a website, group visitors by country, count how many times a particular page was viewed from each country, and then store the aggregated results for further analysis or visualization.
Suppose we had this SQL query:
DROP TABLE IF EXISTS atomic.pageview_geo_summary;

CREATE TABLE atomic.pageview_geo_summary AS
SELECT
  DATE_TRUNC('day', derived_tstamp) AS report_date,
  page_url AS page,
  geo_country AS country,
  COUNT(*) AS num_views,
  MAX(etl_tstamp) AS last_etl_tstamp
FROM
  atomic.events
WHERE
  event = 'page_view'
GROUP BY
  report_date,
  page,
  country;
In our “canonical” batch pipeline, this event data model could be executed via SQL Runner inside a Factotum job, straight after successful EMR jobflow completion.
Each time this query is executed, it drops the pageview_geo_summary table and then recreates it from the updated data in Redshift. It could be made incremental to improve performance, but in this tutorial we are aiming for much greater performance and flexibility.
4. Building blocks of Spark data modeling
First we will introduce the building blocks of our Spark-based event data modeling.
4.1 Event transformation
The Snowplow Analytics SDKs are powerful tools to perform data-modeling in environments such as Apache Spark. They allow you to validate, access and extract canonical Snowplow properties from Snowplow enriched events.
As an example, you can transform an enriched event (in TSV line format) into a convenient JSON object with property names and pre-validated values.
Here’s an example, using the Snowplow Python Analytics SDK:
>>> import snowplow_analytics_sdk.event_transformer
>>> example_tsv = "demo\tweb\t2015-12-01T08:32:35.048Z\t2015-12-01T04:00:54.000Z\t2015-12-01T03:57:08.986Z\tpage_view\tf4b8dd3c-85ef-4c42-9207-11ef61b2a46\tco\tjs-2.5.0..."
>>> event = snowplow_analytics_sdk.event_transformer.transform(example_tsv)
>>> event['v_tracker']
'js-2.5.0'
The event_transformer converts the enriched event into a flattened JSON object, where each column is assigned to a predefined key from the canonical event model. The flatness of this object allows us to easily map the JSON object onto a table row.
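For orientation, here is a rough sketch of what the flattened output looks like for a page view event, showing only a handful of canonical properties (the values are made up for illustration):
# A rough sketch of the flattened JSON produced by the transformer
# (illustrative subset of canonical properties; values are made up)
flattened_event = {
    "app_id": "demo",
    "platform": "web",
    "collector_tstamp": "2015-12-01T04:00:54.000Z",
    "derived_tstamp": "2015-12-01T03:57:08.986Z",
    "event": "page_view",
    "v_tracker": "js-2.5.0",
    "page_url": "http://www.example.com/product/123",
    "geo_country": "US"
}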
4.2 Run manifests
Another feature we will make use of in this tutorial is Snowplow Run Manifests, as introduced in Scala Analytics SDK 0.2.0 and Python Analytics SDK 0.2.0.
Snowplow’s batch pipeline marks folders as processed by moving those folders between different locations on Amazon S3. But file moves are quite problematic:
- They are time-consuming
- They are network-intensive
- They are error-prone - a failure to move a file will cause the job to fail and require manual intervention
- They only support one use-case at a time - you can’t have two distinct jobs moving the same files at the same time
Therefore, we decided to introduce a new mechanism for optionally tracking a Snowplow pipeline’s progress: run manifests. Run manifests take the form of a simple AWS DynamoDB table, plus an API in the Analytics SDKs to help you read and update the manifest with processed folders.
Here’s a simple example of the manifests in action, checking for new unprocessed folders in the Snowplow enriched event archive, and then processing them using a user-provided process function:
from snowplow_analytics_sdk.run_manifests import list_runids, RunManifests

# Create wrapper to access AWS DynamoDB table
run_manifests = RunManifests(dynamodb_client, 'acme-snowplow-run-manifests')

# Iterate folders with enriched events
# run_id takes form of 'enriched/good/run=2017-07-17-22-40-30'
for run_id in list_runids(s3_client, 's3a://acme-enriched-archive/enriched/good'):
    # If folder is not yet processed - process and add to run manifest table
    if not run_manifests.contains(run_id):
        process(sc, run_id, sys.argv[1])
        run_manifests.add(run_id)
    else:
        pass
Run manifests provide a lightweight and reliable mechanism to ensure our pipeline neither processes a folder more than once nor misses unprocessed events. And the API provided by the Analytics SDKs lets you work with the manifest via three simple functions: list_runids, RunManifests.contains and RunManifests.add.
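One practical note: the DynamoDB table has to exist before the first run. Below is a minimal one-off setup sketch; it assumes the RunManifests.create() helper is available in your version of the Python Analytics SDK (if it is not, you can create the table manually in DynamoDB):
import boto3
from snowplow_analytics_sdk.run_manifests import RunManifests

# One-off setup: create the DynamoDB table backing the run manifest
# (assumes RunManifests.create() exists in your SDK version)
dynamodb_client = boto3.client('dynamodb', region_name='us-east-1')
run_manifests = RunManifests(dynamodb_client, 'acme-snowplow-run-manifests')
run_manifests.create()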
4.3 Spark DataFrames
Apache Spark is a powerful framework for distributed computation, providing us with three distinct APIs: DataFrames, Datasets and RDDs. These APIs share a lot of the same functionality - for this example we’re going to use the DataFrame API, which is straightforward to map onto existing SQL code.
First, we need to load all the enriched event JSONs from the enriched events folder into a Spark DataFrame, where each JSON key becomes a DataFrame column:
transformed = sc.textFile(enriched_folder).map(event_to_json)
df = spark.read.json(transformed)
Now, df contains our DataFrame, with a SQL-like schema derived on the fly, ready for queries to be run.
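As a quick sanity check, you can print the derived schema. The output below is only an illustrative excerpt; note that because the values arrive as JSON strings, timestamp fields are inferred as strings and cast later:
# Inspect the schema Spark inferred from the transformed JSON
df.printSchema()
# root
#  |-- app_id: string (nullable = true)
#  |-- derived_tstamp: string (nullable = true)
#  |-- etl_tstamp: string (nullable = true)
#  |-- event: string (nullable = true)
#  |-- geo_country: string (nullable = true)
#  |-- page_url: string (nullable = true)
#  ...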
The SQL query can easily be mapped to the DataFrame API with a few minor tweaks:
aggregated_events = (df
    .select(to_date(df.derived_tstamp).alias('report_date'),
            df.page_url.alias('page'),
            df.geo_country.alias('country'),
            df.etl_tstamp)
    .where(df.event == 'page_view')
    .groupBy('report_date', 'page', 'country')
    .agg(count("*").alias('num_views'),
         max("etl_tstamp").astype('timestamp').alias('last_etl_tstamp')))
To find out more about the DataFrame API, check out the Spark Programming Guide.
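If you prefer to keep the model in SQL, you can also register the DataFrame as a temporary view and run (almost) the original query through Spark SQL. Here is a minimal sketch of that alternative:
# Alternative: run the aggregation as Spark SQL over a temporary view
df.createOrReplaceTempView("events")

aggregated_events = spark.sql("""
    SELECT
        TO_DATE(derived_tstamp)            AS report_date,
        page_url                           AS page,
        geo_country                        AS country,
        COUNT(*)                           AS num_views,
        MAX(CAST(etl_tstamp AS TIMESTAMP)) AS last_etl_tstamp
    FROM events
    WHERE event = 'page_view'
    GROUP BY TO_DATE(derived_tstamp), page_url, geo_country
""")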
4.4 Writing data to PostgreSQL
When we perform event data modeling in Redshift, our aggregation outputs are typically written back into new tables in Redshift. With Spark, we can choose any database or blob storage to write our outputs to.
Given that our aggregated data volumes will not be significant, we can simply use Postgres for our aggregate store. Spark supports writing to relational databases via the Spark JDBC API:
url = "jdbc:postgresql://postgres.acme.com:5432/summaries"
properties = {'user': "snowplow-model",
              'password': password,
              'driver': "org.postgresql.Driver",
              'sslmode': "require"}

aggregated_events.write.jdbc(url, 'pageview_geo_summary', mode="append", properties=properties)
Having defined the DataFrame held in the aggregated_events variable, we can “dump” it to any JDBC-compatible database. Of course, we will need a pre-existing database (summaries in our case) plus a table (pageview_geo_summary) with a schema conforming to our SQL query:
CREATE TABLE "pageview_geo_summary" (
  "id"              SERIAL PRIMARY KEY,
  "report_date"     TIMESTAMP,
  "page"            VARCHAR(255),
  "country"         CHAR(2),
  "last_etl_tstamp" TIMESTAMP WITHOUT TIME ZONE,
  "num_views"       INTEGER,
  CONSTRAINT uniq_record UNIQUE(report_date, page, country, last_etl_tstamp)
);
The above DDL creates our table, which matches the output of our aggregation and has a defensive UNIQUE constraint to ensure that the same data cannot be loaded twice, even if the Spark job fails before correctly updating the manifest.
It’s worth highlighting an important difference between this data model and the original Redshift one. Redshift can group all page views into one day using the derived_tstamp, but the Spark job only reads from a single enriched event folder; two different enriched event folders can easily contain report_dates in common, which has two important consequences:
- We must include last_etl_tstamp in our UNIQUE constraint, otherwise it would be violated very quickly
- The ultimate reporting layer in Postgres will need to perform one more aggregation
Also, it’s very important to note that this approach works only for additive data points, such as total views per day, and not for non-additive data points, such as unique website visitors.
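To make the reporting-layer aggregation concrete, here is a minimal sketch that rolls the per-run partial counts up into one row per day, page and country. It uses psycopg2 with illustrative connection details, neither of which is part of the pipeline itself:
import psycopg2

# Roll up per-run partial counts into one row per (report_date, page, country).
# This works because num_views is additive across runs.
conn = psycopg2.connect(host="postgres.acme.com", port=5432, dbname="summaries",
                        user="snowplow-model", password="secret", sslmode="require")

with conn, conn.cursor() as cur:
    cur.execute("""
        SELECT report_date, page, country, SUM(num_views) AS num_views
        FROM pageview_geo_summary
        GROUP BY report_date, page, country
        ORDER BY report_date, page, country
    """)
    for report_date, page, country, num_views in cur.fetchall():
        print(report_date, page, country, num_views)

conn.close()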
5. Putting it all together: our Spark job
Putting all of the above together, we can write the following PySpark job:
import json
import argparse
import boto3
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import to_date, max, count
import snowplow_analytics_sdk.event_transformer
from snowplow_analytics_sdk.run_manifests import *
APP_NAME = "Snowplow Page Views Data Modeling"
DYNAMODB_RUN_MANIFESTS_TABLE = 'acme-run-manifests'
ENRICHED_EVENTS_ARCHIVE = 's3a://acme-enriched-archive/enriched/good/'
def process(sc, run_id, password):
# Transform enriched events to JSON
full_path = ENRICHED_EVENTS_ARCHIVE + run_id + '*'
transformed = sc.textFile(full_path).map(event_to_json)
# Import JSON to Spark Dataframe
df = spark.read.json(transformed)
# Port of SQL modeling step
pageview_geo = df.select(to_date(df.derived_tstamp).alias('report_date'),
df.page_url.alias('page'),
df.geo_country.alias('country'),
df.etl_tstamp)
.where(df.event == 'page_view')
.groupBy('report_date', 'page', 'country')
.agg(count("*").alias('num_views'), max("etl_tstamp").astype('timestamp').alias("last_etl_tstamp"))
# Connect to PSQL DB
url = "jdbc:postgresql://postgres.acme.com:5432/summaries"
properties = {'user' : "snowplow-model",
'password': password,
'driver': "org.postgresql.Driver",
"sslmode": "require"}
aggregated_events.write.jdbc(url, 'pageview_geo_summary', mode="append", properties=properties)
def event_to_json(line):
json = snowplow_analytics_sdk.event_transformer.transform(line)
return json.dumps(json)
if __name__ == "__main__":
# Initialize run manifests
session = boto3.Session()
s3_client = session.client('s3')
dynamodb_client = session.client('dynamodb', region_name='us-east-1')
run_manifests = RunManifests(dynamodb_client, DYNAMODB_RUN_MANIFESTS_TABLE)
# Initialize Spark job
conf = SparkConf().setAppName(APP_NAME).setMaster('local[*]')
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()
# Traversing unprocessed run folders in enriched archive
run_ids = list_runids(s3_client, ENRICHED_EVENTS_ARCHIVE)
for run_id in run_ids:
if not run_manifests.contains(run_id):
process(sc, run_id, sys.argv[1])
run_manifests.add(run_id)
else:
pass
This self-contained job can be saved to your S3 bucket as s3://acme-snowplow/jobs/page_views.py for later use in Elastic MapReduce.
6. Running our job: Dataflow Runner
The last ingredient in our data modeling implementation is Dataflow Runner, a Snowplow tool intended to replace EmrEtlRunner in the future, as per our RFC.
Dataflow Runner lets you create and orchestrate cloud computing clusters such as Elastic MapReduce. It’s a lightweight and highly configurable tool, without any Snowplow-specific logic.
You can download Dataflow Runner or learn more about it on the project’s wiki.
6.1 Dataflow Runner cluster configuration
Our configuration consists of two files. The first one is the cluster configuration, cluster.json:
{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
  "data": {
    "name": "Spark Pageview by country data-modeling",
    "logUri": "s3://acme-snowplow/logs/",
    "region": "us-east-1",
    "credentials": {
      "accessKeyId": "env",
      "secretAccessKey": "env"
    },
    "roles": {
      "jobflow": "EMR_EC2_DefaultRole",
      "service": "EMR_DefaultRole"
    },
    "ec2": {
      "amiVersion": "5.5.0",
      "location": {
        "vpc": {
          "subnetId": null
        }
      },
      "instances": {
        "master": {
          "type": "m3.xlarge"
        },
        "core": {
          "type": "c3.8xlarge",
          "count": 1
        },
        "task": {
          "type": "m1.medium",
          "count": 0,
          "bid": "0.015"
        }
      }
    },
    "tags": [ ],
    "bootstrapActionConfigs": [
      {
        "name": "Installing Python dependencies",
        "scriptBootstrapAction": {
          "path": "s3://acme-snowplow/scripts/snowplow-pyspark-bootstrap.sh",
          "args": []
        }
      }
    ],
    "configurations": [
      {
        "classification": "core-site",
        "properties": {
          "io.file.buffer.size": "65536"
        }
      },
      {
        "classification": "mapred-site",
        "properties": {
          "mapreduce.user.classpath.first": "true"
        }
      },
      {
        "classification": "yarn-site",
        "properties": {
          "yarn.resourcemanager.am.max-attempts": "1"
        }
      },
      {
        "classification": "spark",
        "properties": {
          "maximizeResourceAllocation": "true"
        }
      }
    ],
    "applications": [ "Hadoop", "Spark" ]
  }
}
To use this cluster configuration file yourself, just change the bucket name in logUri and update the bootstrapActionConfigs. Notice that the credentials are set to env, which means they must be provided via the AWS_ACCESS_KEY and AWS_SECRET_KEY environment variables; alternatively, you can hardcode them right in cluster.json.
6.2 PySpark bootstrap script
The “Installing Python dependencies” bootstrap action references a simple shell script, required to install dependencies (only the Analytics SDK in our case) on the Spark nodes:
#!/bin/sh
set -e
# Script to install necessary Python dependencies
sudo pip install snowplow_analytics_sdk==0.2.3
If your data-modeling step requires any other Python libraries, you should install them using this bootstrap script. The file needs to be uploaded to S3 as snowplow-pyspark-bootstrap.sh, at the path referenced in bootstrapActionConfigs above.
6.3 Dataflow Runner playbook
Our second Dataflow Runner configuration file, playbook.json, is responsible for running the actual data-modeling step:
{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-0",
  "data": {
    "region": "us-east-1",
    "credentials": {
      "accessKeyId": "env",
      "secretAccessKey": "env"
    },
    "steps": [
      {
        "type": "CUSTOM_JAR",
        "name": "PySpark load",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "command-runner.jar",
        "arguments": [
          "spark-submit",
          "--deploy-mode",
          "cluster",
          "--packages",
          "org.postgresql:postgresql:42.0.0.jre7",
          "s3://acme-snowplow/jobs/page_views.py",
          "{{.dbPassword}}"
        ]
      }
    ]
  }
}
This playbook tells Dataflow Runner to submit a jobflow step for our PySpark job to the EMR cluster.
6.4 Running the EMR jobflow via Dataflow Runner
After you have prepared your Dataflow Runner configurations and uploaded all the necessary files to S3, you can launch Dataflow Runner:
$ export AWS_ACCESS_KEY=ABCDEKIA
$ export AWS_SECRET_KEY=abcdefgh1234
$ DB_PASSWORD=secret
$ dataflow-runner run-transient --emr-config cluster.json --emr-playbook playbook.json --vars dbPassword,$DB_PASSWORD
We pass the database password as the first argument to the PySpark job to avoid storing sensitive credentials on S3; environment variables and the --vars option let you avoid hard-coding credentials into the configuration files.
Dataflow Runner can be scheduled via cron or even launched after the EmrEtlRunner step via Factotum.
7. Conclusion
Here is the overall process we have put together:
And that’s it! You now have all the necessary tools to perform event data modeling for your Snowplow event data using Apache Spark, storing the results in an inexpensive Postgres database for later analysis, visualization or processing.
Of course this pipeline is not limited to simple aggregations - with the full power of Apache Spark, you’re free to perform very complex event data modeling on Snowplow enriched events, all the way up to machine-learning-based approaches.