One of the features that makes Snowplow unique is that we report bad data: events that hit the collector but fail to be processed (e.g. because the event fails validation). This is valuable because it allows our users to:
- spot data tracking issues that emerge, often during testing, and address them at the source
- have a high degree of confidence that trends in the data reflect trends in the business, not data issues
- recover events that failed to be processed
Most Snowplow users load their bad rows into Elasticsearch. For more information on how to use Elasticsearch and Kibana to debug bad rows, or how to recover events, visit these tutorials:
- Debugging bad rows in Elasticsearch and Kibana
- Debugging bad rows in Elasticsearch using curl (without Kibana)
- Recovering events with a missing schema (documentation)
In this tutorial, we will show how to debug bad data using Spark. We recommend this approach when you need to investigate a large number of historical bad rows that weren’t loaded into Elasticsearch as part of the regular runs.
1. Debugging bad rows in Spark and Zeppelin on EMR
Apache Zeppelin is an open source project which allows users to create and share web-based notebooks for data exploration in Spark. We are big fans of notebooks at Snowplow because of their interactive and collaborative nature. Debugging bad rows is a good use case.
To get Zeppelin up and running, follow sections 1 to 3 in the following tutorial: Loading Snowplow events into Apache Spark and Zeppelin on EMR.
You should now have the Zeppelin UI open in a browser.
1.1 Loading bad rows from S3 into Spark
To load bad rows from S3 into Spark, create a new note and paste the following:
%spark
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<REPLACE_WITH_ACCESS_KEY>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<REPLACE_WITH_SECRET_ACCESS_KEY>")
val input = sc.textFile("s3n://snowplow-enrichment-output/enriched/bad/run=201{5-1[12],6-[0-3][0-9]}*")
You’ll need to replace the placeholders and enter the correct path. This particular path pattern will load 5 months’ worth of bad data, from November 2015 to March 2016 (I recommend estimating the size of the bad rows before attempting to load them all; a sketch for doing this follows the list below). The pattern rules are:
- * (matches 0 or more characters)
- ? (matches a single character)
- [ab] (character class)
- [^ab] (negated character class)
- [a-b] (character range)
- {a,b} (alternation)
- \c (escape character)
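A quick way to get that size estimate is to glob the files with the Hadoop FileSystem API and sum their lengths. This is a sketch rather than part of the original walkthrough: the bucket name is the same placeholder as above, and it assumes the bad rows sit as files directly under each run= directory.
import java.net.URI
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Reuses the S3 credentials set on sc.hadoopConfiguration earlier
val pattern = new Path("s3n://snowplow-enrichment-output/enriched/bad/run=2016-03*/*")
val fs = FileSystem.get(new URI("s3n://snowplow-enrichment-output"), sc.hadoopConfiguration)
val statuses = Option(fs.globStatus(pattern)).getOrElse(Array.empty[FileStatus])
val totalBytes = statuses.map(_.getLen).sum
println(f"Approximate size: ${totalBytes / 1e9}%.2f GB")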
Next, create a SQLContext to transform the RDD into a Spark DataFrame:
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df = sqlContext.read.json(input)
import org.apache.spark.sql.functions._
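Since this DataFrame will be queried repeatedly in the steps that follow, you may also want to cache it (optional, and a standard Spark call rather than part of the original walkthrough):
// Keep the parsed bad rows in memory across the exploratory queries below
df.cache()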
1.2 Exploring the bad rows
For those who are not familiar with the bad row format, start by printing the schema:
df.printSchema
root
 |-- errors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- level: string (nullable = true)
 |    |    |-- message: string (nullable = true)
 |-- failure_tstamp: string (nullable = true)
 |-- line: string (nullable = true)
The field we are most interested in is errors.message, as it tells us what caused the row to fail. The failure_tstamp lets us know when the bad row was generated, and line contains the original row (the one that failed to be processed).
Here is what a bad row looks like in its original JSON format:
{
  "line": "2016-03-02\t19:00:47\t-\t-\tXX.XXX.XX.XXX\tOPTIONS\tXX.XXX.XX.XXX\t/com.snowplowanalytics.snowplow/tp2\t200\thttps://www.website.com/\tMozilla%2F5.0+%28compatible%3B+Googlebot%2F2.1%3B+%2Bhttp%3A%2F%2Fwww.google.com%2Fbot.html%29\t&cv=clj-1.1.0-tom-0.2.0&nuid=-\t-\t-\t-\t-\t-",
  "errors": [{
    "level": "error",
    "message": "Unrecognized event [null]"
  }],
  "failure_tstamp": "2016-03-03T07:02:46.465Z"
}
To count the total number of bad rows, run:
df.count
We can also use the failure_tstamp to find out how this number varies over time:
df.
  select(to_date($"failure_tstamp").alias("date")).
  groupBy("date").
  agg(count("date").alias("count")).
  sort("date").
  show(100)
+----------+-----+
| date|count|
+----------+-----+
|2016-03-25| 1640|
|2016-03-26| 2728|
+----------+-----+
1.3 Excluding all known bad rows
One of the interesting things that jumps out when investigating bad rows is that there is a fair amount of data that did not originate from our trackers or webhooks. Instead, it is data from (often malicious) bots pinging the internet looking for vulnerabilities.
The 2 main examples are bad rows that contain the following error messages:
does not match (/)vendor/version(/) pattern nor is a legacy /i(ce.png) request
Unrecognized event [null]
The first are requests to paths the collector does not support. The second are OPTIONS requests that are made when the JavaScript tracker sends data via POST: the browser has to send an OPTIONS request before issuing the POST request with the actual data (this is a CORS requirement). We plan to stop logging OPTIONS requests in a future release.
Both kinds of bad rows can be ignored as none of them represent failed attempts to send actual data. We can filter them out and create a new DataFrame with all remaining bad rows:
val df_filtered = df.
  select($"errors.message"(0).alias("message"), $"failure_tstamp", $"line").
  filter(not($"message".contains("does not match (/)vendor/version(/) pattern nor is a legacy /i(ce.png) request"))).
  filter(not($"message".contains("Unrecognized event [null]")))
The remaining rows should all be genuine bad data, i.e. events that did originate from our trackers or webhooks, so we need to drill into what’s left to unpick the errors.
1.4 Diagnosing all remaining bad rows
The process of diagnosing the remaining bad rows is an iterative one:
- pick a common error message
- investigate the message and decide on a solution:
  - either resolve the issue (e.g. in the tracker) so no new bad rows are created
  - or ignore the error if it doesn’t require a fix
- filter all bad rows with that same message from the DataFrame (a pair of helper sketches for this follows the list)
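The sections below walk through one such iteration by hand. If you find yourself repeating the loop many times, a couple of small helpers along these lines can save typing (purely illustrative: the names are made up, and they assume a DataFrame with the message column created in section 1.3):
import org.apache.spark.sql.DataFrame

// Gauge how common an error is in a DataFrame of bad rows
def countError(badRows: DataFrame, fragment: String): Long =
  badRows.filter($"message".contains(fragment)).count

// Drop an error from the DataFrame once it has been diagnosed
def dropError(badRows: DataFrame, fragment: String): DataFrame =
  badRows.filter(not($"message".contains(fragment)))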
That said, it’s not trivial to aggregate error messages. Take these two examples of actual error messages:
Provided URI string [http://www.mycompany.com/?utm_source=affilinet&utm_medium=affiliate&utm_term=%MEDIANAME%&utm_term=581176] could not be parsed by Netaporter: [Malformed escape pair at index 77: http://www.mycompany.com/?utm_source=affilinet&utm_medium=affiliate&utm_term=%MEDIANAME%&utm_term=581176]
Provided URI string [http://www.company-shopping.com/redirect/0824602_r1663866/sg/4/s_id/8?origin=r14s&s_evar6=cebqhvgf-zbfnvdhr&s_evar7=pngrtbevr:%2Sirgrzragf_p1%2Syvatrevr-srzzr_p1354%2Syvatrevr-frkl-srzzr_p1379%2Sahvfrggr-frkl-srzzr_p1381] could not be parsed by Netaporter: [Malformed escape pair at index 126: http://www.company-shopping.com/redirect/0824602_r1663866/sg/4/s_id/8?origin=r14s&s_evar6=cebqhvgf-zbfnvdhr&s_evar7=pngrtbevr:%2Sirgrzragf_p1%2Syvatrevr-srzzr_p1354%2Syvatrevr-frkl-srzzr_p1379%2Sahvfrggr-frkl-srzzr_p1381]
If we were to aggregate the DataFrame on errors.message, these two rows wouldn’t be grouped together, even though the issue is the same: there was a problem parsing the URI (a non-url-encoded character in a query string value). We plan to improve how error messages are structured in a future release.
To see the 25 most common error messages, run:
df_filtered.
  groupBy("message").
  agg(count("message").alias("count")).
  sort(desc("count")).
  show(25)
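If you want messages like the two examples above to group together despite the embedded URIs, one possible workaround (not part of the original tutorial; the regular expression is an assumption about the message format, and regexp_replace requires Spark 1.5 or later) is to collapse the bracketed fragments before aggregating:
// Replace every bracketed fragment (e.g. the embedded URIs) with a placeholder,
// then aggregate on the normalised message instead
val df_messages = df_filtered.
  withColumn("message_class", regexp_replace($"message", "\\[[^\\]]*\\]", "[...]"))

df_messages.
  groupBy("message_class").
  agg(count("message_class").alias("count")).
  sort(desc("count")).
  show(25, false)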
Let’s pick a particular error message (could not be parsed by Netaporter) and see how often it occurs:
df_filtered.filter($"message".contains("could not be parsed by Netaporter")).count
df_filtered.
  filter($"message".contains("could not be parsed by Netaporter")).
  select(to_date($"failure_tstamp").alias("date")).
  groupBy("date").
  agg(count("date").alias("count")).
  sort("count").
  show(250)
We can also inspect the full error message or the original line (it won’t look nice):
df_filtered.
  filter($"message".contains("could not be parsed by Netaporter")).
  select($"message").
  show(1, false)

df_filtered.
  filter($"message".contains("could not be parsed by Netaporter")).
  select($"line").
  show(1, false)
Once the issue has been identified, we can remove all rows with this particular error message and start over with the reduced DataFrame (DataFrames are immutable, so we assign the result to a new value, here called df_filtered2):
val df_filtered2 = df_filtered.
  filter(not($"message".contains("could not be parsed by Netaporter")))
2. Debugging bad rows in Spark without Zeppelin
To run Spark without Zeppelin, replace %spark at the beginning of this tutorial with:
import org.apache.spark.{SparkContext, SparkConf}

val sc = {
  val conf = new SparkConf()
    .setAppName("badrows")
    .setMaster("local")
  new SparkContext(conf)
}
Spark can be run on EMR, Databricks, or even on your local machine if the number of bad rows is small enough.