Debugging bad rows in Spark and Zeppelin [tutorial]

One of the features that makes Snowplow unique is that we report bad data: events that hit the collector but fail to be processed (e.g. because the event fails validation). This is valuable because it allows our users to:

  • spot data tracking issues that emerge, often during testing, and address them at the source
  • have a high degree of confidence that trends in the data reflect trends in the business, not data issues
  • recover events that failed to be processed

Most Snowplow users load their bad rows into Elasticsearch. For more information on how to use Elasticsearch and Kibana to debug bad rows, or how to recover events, see the dedicated tutorials on this forum.

In this tutorial, we will show how to debug bad data using Spark. We recommend this approach when you need to investigate large numbers of historical bad rows that weren’t loaded into Elasticsearch as part of the regular runs.

1. Debugging bad rows in Spark and Zeppelin on EMR

Apache Zeppelin is an open source project which allows users to create and share web-based notebooks for data exploration in Spark. We are big fans of notebooks at Snowplow because of their interactive and collaborative nature. Debugging bad rows is a good use case.

To get Zeppelin up and running, follow sections 1 to 3 in the following tutorial: Loading Snowplow events into Apache Spark and Zeppelin on EMR.

You should now have the Zeppelin UI open in a browser.

1.1 Loading bad rows from S3 into Spark

To load bad rows from S3 into Spark, create a new note and paste the following:

%spark

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<REPLACE_WITH_ACCESS_KEY>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<REPLACE_WITH_SECRET_ACCESS_KEY>")

val input = sc.textFile("s3n://snowplow-enrichment-output/enriched/bad/run=201{5-1[12],6-[0-3][0-9]}*")

You’ll need to replace the placeholders and enter the correct path. This particular pattern will load 5 months’ worth of bad data, from November 2015 to March 2016 (I recommend estimating the size of the bad rows before attempting to load them all). The globbing rules are listed below, with a couple of narrower example patterns sketched after the list:

  • * (matches zero or more characters)
  • ? (matches a single character)
  • [ab] (character class)
  • [^ab] (negated character class)
  • [a-b] (character range)
  • {a,b} (alternation)
  • \c (escape character)
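
For example, if you only need a subset of the data, a narrower pattern keeps the job small. Here are a couple of illustrative patterns (the bucket and run names are placeholders, adjust them to your own pipeline):

// A single day of bad rows
val oneDay = sc.textFile("s3n://snowplow-enrichment-output/enriched/bad/run=2016-03-02*")

// Two specific months (March and April 2016), using {a,b} alternation
val twoMonths = sc.textFile("s3n://snowplow-enrichment-output/enriched/bad/run=2016-0{3,4}*")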

Next, create a SQLContext to transform the RDD into a Spark DataFrame:

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)

import sqlContext.implicits._
val df = sqlContext.read.json(input)

import org.apache.spark.sql.functions._
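
Since we will run a number of queries against this DataFrame, it can be worth caching it up front (optional, and assuming the data fits comfortably in memory):

// Optional: keep the parsed bad rows in memory across the queries below
df.cache()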

1.2 Exploring the bad rows

If you are not familiar with the bad row format, start by printing the schema.

df.printSchema
root
 |-- errors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- level: string (nullable = true)
 |    |    |-- message: string (nullable = true)
 |-- failure_tstamp: string (nullable = true)
 |-- line: string (nullable = true)

The field we are most interested in is errors.message, as it tells us what caused the row to fail. The failure_tstamp field lets us know when the bad row was generated, and line contains the original row (the one that failed to be processed).

Here is what a bad row looks like in its original JSON format:

{
  "line": "2016-03-02\t19:00:47\t-\t-\tXX.XXX.XX.XXX\tOPTIONS\tXX.XXX.XX.XXX\t/com.snowplowanalytics.snowplow/tp2\t200\thttps://www.website.com/\tMozilla%2F5.0+%28compatible%3B+Googlebot%2F2.1%3B+%2Bhttp%3A%2F%2Fwww.google.com%2Fbot.html%29\t&cv=clj-1.1.0-tom-0.2.0&nuid=-\t-\t-\t-\t-\t-",
  "errors": [{
    "level": "error",
    "message": "Unrecognized event [null]"
  }],
  "failure_tstamp": "2016-03-03T07:02:46.465Z"
}
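
If you want to pull an example like the one above straight out of the DataFrame, you can re-serialise a few rows back to JSON (note that field order may differ slightly from the raw files):

// Print a few bad rows back out in JSON form for inspection
df.toJSON.take(3).foreach(println)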

To count the total number of bad rows, run:

df.count

We can also use the failure_tstamp to find out how this number varies over time.

df.
  select(to_date($"failure_tstamp").alias("date")).
  groupBy("date").
  agg(count("date").alias("count")).
  sort("date").
  show(100)
+----------+-----+
|      date|count|
+----------+-----+
|2016-03-25| 1640|
|2016-03-26| 2728|
+----------+-----+

1.3 Excluding all known bad rows

One of the interesting things that jumps out when investigating bad rows is that there is a fair amount of data that did not originate from our trackers or webhooks. Instead, it is data from (often malicious) bots pinging the internet looking for vulnerabilities.

The 2 main examples are bad rows that contain the following error messages:

does not match (/)vendor/version(/) pattern nor is a legacy /i(ce.png) request
Unrecognized event [null]

The first are requests to paths the collector does not support. The second are OPTIONS requests made when the JavaScript tracker sends data via POST: the browser has to send an OPTIONS request before issuing the POST request with the actual data (this is a CORS requirement). We plan to stop logging OPTIONS requests in a future release.
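
Before filtering them out, it can be useful to quantify how much of the bad data these two categories account for, for example:

// How many bad rows fall into each of the two "known noise" categories?
val msg = $"errors.message"(0)
df.filter(msg.contains("Unrecognized event [null]")).count
df.filter(msg.contains("does not match (/)vendor/version(/) pattern nor is a legacy /i(ce.png) request")).count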

Both kinds of bad rows can be ignored as none of them represent failed attempts to send actual data. We can filter them out and create a new DataFrame with all remaining bad rows:

val df_filtered = df.
  select($"errors.message"(0).alias("message"), $"failure_tstamp", $"line").
  filter(not($"message".contains("does not match (/)vendor/version(/) pattern nor is a legacy /i(ce.png) request"))).
  filter(not($"message".contains("Unrecognized event [null]")))

The remaining rows should all be genuine bad data, i.e. events that did originate from our trackers or webhooks, so we need to drill into what’s left to unpick the errors.

1.4 Diagnosing all remaining bad rows

The process of diagnosing the remaining bad rows is an iterative one:

  • pick a common error message
  • investigate the message and decide on a solution:
    • either resolve the issue (e.g. in the tracker) so no new bad rows are created
    • or ignore the error if it doesn’t require a fix
  • filter all bad rows with that same message from the DataFrame (see the small helper sketched after this list)
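
To make that last step less repetitive, you could define a small helper. This is a hypothetical convenience function, assuming the imports from section 1.1 are in scope and a DataFrame with the flattened message column created in section 1.3:

// Drop every row whose message contains the given substring,
// returning a reduced DataFrame to feed into the next iteration
def excludeMessage(df: org.apache.spark.sql.DataFrame, substring: String) =
  df.filter(not($"message".contains(substring)))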

That said, it’s not trivial to aggregate error messages. Take these two examples of actual error messages:

Provided URI string [http://www.mycompany.com/?utm_source=affilinet&utm_medium=affiliate&utm_term=%MEDIANAME%&utm_term=581176] could not be parsed by Netaporter: [Malformed escape pair at index 77: http://www.mycompany.com/?utm_source=affilinet&utm_medium=affiliate&utm_term=%MEDIANAME%&utm_term=581176]
Provided URI string [http://www.company-shopping.com/redirect/0824602_r1663866/sg/4/s_id/8?origin=r14s&s_evar6=cebqhvgf-zbfnvdhr&s_evar7=pngrtbevr:%2Sirgrzragf_p1%2Syvatrevr-srzzr_p1354%2Syvatrevr-frkl-srzzr_p1379%2Sahvfrggr-frkl-srzzr_p1381] could not be parsed by Netaporter: [Malformed escape pair at index 126: http://www.company-shopping.com/redirect/0824602_r1663866/sg/4/s_id/8?origin=r14s&s_evar6=cebqhvgf-zbfnvdhr&s_evar7=pngrtbevr:%2Sirgrzragf_p1%2Syvatrevr-srzzr_p1354%2Syvatrevr-frkl-srzzr_p1379%2Sahvfrggr-frkl-srzzr_p1381]

If we were to aggregate the DataFrame on errors.message, these two rows wouldn’t be grouped together, even though the issue is the same: there was a problem parsing the URI (a non-URL-encoded character in a query string value). We plan to improve how error messages are structured in a future release.

To see the 25 most common error messages, run:

df_filtered.
  groupBy("message").
  agg(count("message").alias("count")).
  sort(desc("count")).
  show(25)
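
Because of the URI problem described above, this raw grouping can split what is really the same error across many rows. One rough workaround (a sketch, not a built-in feature) is to normalise the messages before grouping, for example by collapsing everything between square brackets:

// Replace the bracketed URIs/details in each message with a placeholder,
// then count the normalised messages
val normalised = df_filtered.
  withColumn("message_group", regexp_replace($"message", "\\[[^\\]]*\\]", "[...]")).
  groupBy("message_group").
  agg(count("message_group").alias("count")).
  sort(desc("count"))

normalised.show(25, false)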

Let’s pick a particular error message (could not be parsed by Netaporter) and see how often it occurs:

df_filtered.filter($"message".contains("could not be parsed by Netaporter")).count
df_filtered.
  filter($"message".contains("could not be parsed by Netaporter")).
  select(to_date($"failure_tstamp").alias("date")).
  groupBy("date").
  agg(count("date").alias("count")).
  sort("count").
  show(250)

We can also inspect the full error message or the original line (it won’t look nice):

df_filtered.
  filter($"message".contains("could not be parsed by Netaporter")).
  select($"message").
  show(1, false)
df_filtered.
  filter($"message".contains("could not be parsed by Netaporter")).
  select($"line").
  show(1, false)

Once the issue has been identified, we can remove all rows with this particular error message from our DataFrame and start over. Since DataFrames are immutable, we assign the result to a new value:

val df_filtered2 = df_filtered.
  filter(not($"message".contains("could not be parsed by Netaporter")))
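
Then re-run the aggregation from the start of this section against the reduced DataFrame to pick the next most common message to investigate:

df_filtered2.
  groupBy("message").
  agg(count("message").alias("count")).
  sort(desc("count")).
  show(25)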

2. Debugging bad rows in Spark without Zeppelin

To run Spark without Zeppelin, replace %spark at the beginning of this tutorial with:

import org.apache.spark.{SparkContext, SparkConf}

val sc = {
  val conf = new SparkConf()
    .setAppName("badrows")
    .setMaster("local")
  new SparkContext(conf)
}

Spark can be run on EMR, on Databricks, or even locally on your own machine if the number of bad rows is small enough.


As Christophe mentions above, it’s possible to use Spark to analyze bad rows without EMR or Zeppelin.

A very quick and easy alternative (especially for smaller bad data sets) is to download the bad rows locally (e.g. using the AWS CLI) and then parse them in Spark locally. Assuming you’re on a Mac:

1. Install the AWS CLI

$ brew install awscli

2. Install Spark

$ brew install apache-spark

3. Download the bad rows you want to inspect locally

$ aws s3 cp s3://snowplow-company-enrichment-output/enriched/bad/run=2016-08-10-06-17-56/ run=2016-08-10-06-17-56/ --include "*" --recursive

Update the bucket reference above to the S3 path of your bad rows.

Navigate into the directory with your bad rows:

$ cd run=2016-08-10-06-17-56

4. Launch the spark shell

$ spark-shell
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_51)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
16/08/10 08:59:03 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/08/10 08:59:03 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/08/10 08:59:06 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/08/10 08:59:06 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/08/10 08:59:08 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/08/10 08:59:08 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/08/10 08:59:11 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/08/10 08:59:11 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
SQL context available as sqlContext.

5. Load the bad rows into a data frame

scala> val badRowsFiles = sc.textFile("part-*")
badRowsFiles: org.apache.spark.rdd.RDD[String] = part-* MapPartitionsRDD[3] at textFile at <console>:27

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4eab4474

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df = sqlContext.read.json(badRowsFiles)
df: org.apache.spark.sql.DataFrame = [errors: array<struct<level:string,message:string>>, failure_tstamp: string, line: string]

scala> df.printSchema
root
 |-- errors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- level: string (nullable = true)
 |    |    |-- message: string (nullable = true)
 |-- failure_tstamp: string (nullable = true)
 |-- line: string (nullable = true)


scala> df.count
res2: Long = 33085

You can now debug the bad rows exactly as described by Christophe above!
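
For example, to get straight to the most common genuine error messages in the local shell, you can apply the same filtering and aggregation as in sections 1.3 and 1.4 above (a sketch, reusing the names defined in this session; dfFiltered is just an illustrative name):

scala> val dfFiltered = df.
     |   select($"errors.message"(0).alias("message"), $"failure_tstamp", $"line").
     |   filter(not($"message".contains("does not match (/)vendor/version(/) pattern nor is a legacy /i(ce.png) request"))).
     |   filter(not($"message".contains("Unrecognized event [null]")))

scala> dfFiltered.groupBy("message").agg(count("message").alias("count")).sort(desc("count")).show(25)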