As part of our drive to make the Snowplow community more collaborative and widen our network of open source contributors, we will be regularly posting our proposals for new features, apps and libraries under the Request for Comments section of our forum. You are welcome to post your own proposals too!
A new bad row format
Introduction
Data quality has always been a huge focus for Snowplow. Being a non-lossy data pipeline is one of the central pieces of this data quality puzzle. Indeed, other pieces such as data validity (through our schema validation technology) or data richness (through our enrichments) connect directly with the non-lossiness piece.
In practice, for a Snowplow pipeline, non-lossiness means that when something goes wrong anywhere in the pipeline, the affected data is not discarded but parked as “bad” for later inspection, fixing and reprocessing. In Snowplow jargon, this bad data is called “bad rows”.
However, over the years, a tension has developed between how valuable this feature is and how annoying it can be to deal with. In particular, inspecting, fixing and reprocessing this bad data are currently harder than they should be. We believe tackling these issues starts with a new format for this bad data.
The community seems to share our views and enthusiasm on the subject: “Better bad row format” received the most votes in answer to the question “If you were Snowplow, how would you prioritize the following roadmap ideas?” in our recent 2018 open source community survey:
Of course, we have been aware of this major pain point for a while, and our journey towards a more intelligent bad row format actually started last November during a hackweek, when we got together to think, as a group, about data quality in a Snowplow pipeline. One of the central themes that came up was the bad row format, and a team got started on a proof of concept for a new format.
Since then, data quality has been the focus of the pipeline team, which has translated into several efforts:
- The Google Cloud Storage loader, which fits into the data availability and consistency pieces by letting you save your data, be it “bad” or “good”, to durable storage on GCP
- A standardized recovery process for real-time pipelines on AWS and GCP which is, at the time of writing, still in development: https://github.com/snowplow-incubator/snowplow-event-recovery
- The RFC you’re currently reading
Furthermore, data quality will continue to be our focus going into 2019 as we start working on the subject of this RFC.
Because of all these converging factors, we are eager to share our plans regarding a new bad row format!
Current state of affairs
At the moment, a bad row looks something like this:
{
  "errors": [
    {
      "level": "error",
      "message": "a message"
    }
  ],
  "line": "a collector payload",
  "failure_tstamp": "2018-12-17T15:54:44.493Z"
}
The current format comes up short in quite a few places.
First, if we look at the errors field, all errors are stored in the message field as flat strings, which are not machine-readable and are sometimes not even easy for a human to understand.
Moreover, the level field in each of those errors always has the "error" value, making it superfluous.
Next, if we look at the line field, its content is dependent on the collector you’re using:
- if you’re using the Clojure Collector, you will get a tab-separated collector payload
- if you’re using the Scala Stream Collector, you will get a base64-encoded, Thrift-serialized collector payload, which makes it virtually impossible to know what the payload actually looks like
In both cases, this lack of structure makes it difficult to know what actually happened when looking at a specific bad row.
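For the Scala Stream Collector case, for instance, just peeking inside a single bad row today means base64-decoding and Thrift-deserializing it by hand, along these lines (a rough sketch; the Thrift-generated CollectorPayload class is the one shipped with the collector, and its exact coordinates are assumed here):

import java.util.Base64
import org.apache.thrift.TDeserializer
// Thrift-generated model shipped with the collector (class coordinates assumed)
import com.snowplowanalytics.snowplow.CollectorPayload.thrift.model1.CollectorPayload

// "line" is the base64-encoded, Thrift-serialized "line" field of a bad row
def decodeLine(line: String): CollectorPayload = {
  val payload = new CollectorPayload()
  new TDeserializer().deserialize(payload, Base64.getDecoder.decode(line))
  payload // the Thrift-generated toString can then be used to inspect the fields
}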
Furthermore, if we look at the format holistically, it also becomes apparent that there is no inherent partitioning possible with this kind of data (except by date): all bad rows are mingled together and there is no way to tell different types of bad rows apart just by looking at them structurally. For example, a bad row resulting from a schema invalidation and another coming from an enrichment failure will both have the exact same structure despite having completely different semantics.
This makes it hard to investigate specific issues: you automatically get the full bad row firehose even when you are only interested in a single type of failure.
Finally, there is a set of more esoteric issues with the current format such as what we internally refer to as “POST bombs”.
This issue starts with a batch of events being sent as a single POST request. The problem occurs if one or more of these events result in a bad row: a bad row will be generated for each failing event, and each of those bad rows contains the entirety of the original payload (the whole batch of events), leading to a potentially huge multiplication factor, hence the term “bomb”.
In addition to this, POST bombs are a source of duplicate generation.
For example, if you were to send a POST request containing a batch of 100 events and two of those events resulted in bad rows, you would have two bad rows, each containing all 100 events. When reprocessing, you would reprocess those 100 events twice, so the 98 events that had already been processed successfully end up duplicated, in addition to wasting processing power.
In this RFC, we set out to address these shortcomings.
A new format
As it turns out, the bad row format can be made much more featureful than it currently is, and can go a long way towards solving these issues.
This becomes a lot clearer if we dive into the different points where bad rows are generated. In the following diagram, we can see a payload move through the different stages of a Snowplow pipeline and the places where something can go wrong and a bad row can be generated.
For each step, the diagram shows the coordinates of the bad row schema that a bad row will have to conform to if that pipeline step fails.
For example, let’s say you are running an AWS real-time Snowplow pipeline and a payload exceeding 1 MB hits your collector (1 MB being the maximum size of a Kinesis record). This payload will fail size validation and a bad row conforming to iglu:com.snowplowanalytics.snowplow.badrows/size_violation/jsonschema/1-0-0 will be generated.
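One way to picture this family of types is a small sealed hierarchy with one case per bad row schema. This is a minimal sketch only: the schema coordinates come from this RFC, while the constructor names and field types are placeholders.

sealed trait BadRow {
  def schema: String // Iglu coordinates of the bad row schema this type maps to
}
final case class SizeViolation(failure: String, payload: String, processor: String) extends BadRow {
  val schema = "iglu:com.snowplowanalytics.snowplow.badrows/size_violation/jsonschema/1-0-0"
}
final case class FormatViolation(failure: String, payload: String, processor: String) extends BadRow {
  val schema = "iglu:com.snowplowanalytics.snowplow.badrows/format_violation/jsonschema/1-0-0"
}
// ...one constructor each for tracker_protocol_violations, schema_violations,
// enrichment_failures and storage_loader_failure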
Now that we have a better grasp of where and why different bad rows are generated, we can dive into each type in turn.
Size validation
As we’ve just seen, a payload can exceed the maximum size allowed by the data streaming technology in use. This implies that the payload can’t be stored in its entirety, most likely because the payload body is too big. Allowing images to be uploaded through a form, for example, leads to this type of issue.
In this case, a bad row conforming to iglu:com.snowplowanalytics.snowplow.badrows/size_violation/jsonschema/1-0-0 will be generated, for example:
{
  "schema": "iglu:com.snowplowanalytics.snowplow.badrows/size_violation/jsonschema/1-0-0",
  "data": {
    "failure": {
      "timestamp": "2018-12-23T13:37:57.056Z",
      "maximumAllowedSizeBytes": 1048576,
      "actualSizeBytes": 1073741824
    },
    "payload": {
      "timestamp": "2018-12-24T13:37:57.056Z",
      "headers": {},
      "path": "/com.snowplowanalytics.snowplow/tp2",
      "truncatedQueryString": "a=12",
      "truncatedBody": "{}"
    },
    "processor": {
      "artifact": "scala-stream-collector",
      "platform": "kinesis",
      "version": "0.14.0"
    }
  }
}
If this check is successful, it means that we can safely store our whole payload in our real-time data storage technology of choice.
Keep in mind that this case is basically impossible to recover from as the truncation leads to data loss.
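To make this concrete, the check itself boils down to comparing the serialized payload size against the stream’s record limit. A minimal sketch, where the 1 MB constant is the Kinesis limit mentioned above and the truncation length is an arbitrary illustration:

val maxRecordBytes = 1048576 // Kinesis record limit: 1 MB

// Right: the payload fits and can be stored whole.
// Left: a truncated copy, to be embedded in a size_violation bad row.
def checkSize(payload: String): Either[String, String] = {
  val size = payload.getBytes("UTF-8").length
  if (size > maxRecordBytes) Left(payload.take(1000)) else Right(payload)
}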
Collector payload validation
Since collectors are web-facing, they can be reached by entities that are not Snowplow trackers or are not part of the webhooks we currently support. Those entities are more often than not robots trying to find some sort of vulnerability.
For such cases, we will generate bad rows according to the iglu:com.snowplowanalytics.snowplow.badrows/format_violation/jsonschema/1-0-0 schema:
{
  "schema": "iglu:com.snowplowanalytics.snowplow.badrows/format_violation/jsonschema/1-0-0",
  "data": {
    "failure": {
      "timestamp": "2018-12-23T13:37:57.056Z",
      "message": "Unrecognized collector format"
    },
    "payload": {
      "timestamp": "2018-12-24T13:37:57.056Z",
      "headers": {},
      "path": "/test",
      "queryString": "a=12",
      "body": "{}"
    },
    "processor": {
      "artifact": "stream-enrich",
      "platform": "kafka",
      "version": "0.19.1"
    }
  }
}
From this type of bad row, we can aggregate along different dimensions in order to find out, for example, which robots are most common.
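For instance, a quick rollup of the most frequently hit paths could look like this (a sketch using circe; it assumes one JSON bad row per line, read back from durable storage such as S3 or Google Cloud Storage):

import io.circe.parser.parse

// Count format_violation bad rows per collector path, most hit first.
def topRobotPaths(badRows: List[String]): List[(String, Int)] =
  badRows
    .flatMap(parse(_).toOption) // keep rows that parse as JSON
    .filter(_.hcursor.get[String]("schema").exists(_.contains("/format_violation/")))
    .flatMap(_.hcursor.downField("data").downField("payload").get[String]("path").toOption)
    .groupBy(identity)
    .map { case (path, hits) => path -> hits.size }
    .toList
    .sortBy(-_._2)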
If the payload manages to pass this step, we know it is Snowplow-related.
Tracker protocol validation
Moving along our Snowplow pipeline, we can check that this Snowplow-related payload conforms to our tracker protocol.
Failures at this step could be, for example, event IDs that are not valid UUIDs or contexts that are not valid JSON, generated by something Snowplow-aware, i.e. a faulty SDK (tracker or webhook integration).
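As a rough sketch of the kind of checks involved (field names follow the tracker protocol: eid is the event id, cx the base64-encoded contexts; URL-safe base64 is assumed here, and the real set of checks in enrich is broader than this):

import java.util.{Base64, UUID}
import scala.util.Try
import io.circe.parser.parse

// Does the "eid" field hold a valid UUID?
def validEventId(eid: String): Boolean =
  Try(UUID.fromString(eid)).isSuccess

// Does the "cx" field decode to valid JSON?
def validContexts(cx: String): Boolean =
  Try(new String(Base64.getUrlDecoder.decode(cx), "UTF-8")).toOption
    .exists(s => parse(s).isRight)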
If tracker protocol validation fails, we’ll be confronted with an iglu:com.snowplowanalytics.snowplow.badrows/tracker_protocol_violations/jsonschema/1-0-0 bad row:
{
  "schema": "iglu:com.snowplowanalytics.snowplow.badrows/tracker_protocol_violations/jsonschema/1-0-0",
  "data": {
    "failure": {
      "timestamp": "2018-12-23T13:37:57.056Z",
      "messages": [
        {"key": "eid", "value": "INVALID_UUID"},
        {"key": "cx", "value": "INVALID_PAYLOAD"}
      ]
    },
    "payload": {
      "timestamp": "2018-12-24T13:37:57.056Z",
      "headers": {},
      "path": "/test",
      "queryString": "a=12",
      "body": "{}"
    },
    "processor": {
      "artifact": "spark-enrich",
      "platform": "emr",
      "version": "1.16.0"
    }
  }
}
Making it through this step means that we know every part of the tracker protocol is valid, as well as, for example, how many self-describing entities are contained in the payload.
Schema validation
Next comes the topic readers will probably be most familiar with: schema validation failures. We go through every self-describing entity in the payload and check that it conforms to its schema. As such, there can be one or more schema validation failures per self-describing entity. These bad rows, produced by Iglu, will follow the iglu:com.snowplowanalytics.snowplow.badrows/schema_violations/jsonschema/1-0-0 schema:
{
  "schema": "iglu:com.snowplowanalytics.snowplow.badrows/schema_violations/jsonschema/1-0-0",
  "data": {
    "failure": {
      "timestamp": "2018-12-23T13:37:57.056Z",
      "messages": [
        {
          "schemaKey": "iglu:com.acme/signup/jsonschema/1-0-2",
          "instance": "unstructured_event",
          "pointer": "/personal/1/name",
          "property": "minLength",
          "message": "property name has length 2; minimum allowed 3",
          "igluRepository": "Iglu Central"
        },
        {
          "schemaKey": "iglu:com.acme/healthcheck/jsonschema/1-0-0",
          "instance": "contexts/2",
          "message": "schema not found",
          "igluRepository": "com.acme Iglu"
        }
      ]
    },
    "payload": {
      "timestamp": "2018-12-24T13:37:57.056Z",
      "headers": {},
      "path": "/test",
      "queryString": "a=12",
      "body": "{}"
    },
    "processor": {
      "artifact": "beam-enrich",
      "platform": "pubsub",
      "version": "0.1.0"
    }
  }
}
Success here means that we know every self-describing entity is valid.
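Because these messages are structured rather than flat strings, they can be decoded into types directly. A minimal sketch with circe, where the case class simply mirrors the example above (the optional fields account for the “schema not found” case; this is not a final spec):

import io.circe.Decoder
import io.circe.generic.semiauto.deriveDecoder

final case class SchemaViolationMessage(
  schemaKey: String,
  instance: String,
  pointer: Option[String],  // absent when the schema itself could not be found
  property: Option[String],
  message: String,
  igluRepository: String
)

object SchemaViolationMessage {
  implicit val decoder: Decoder[SchemaViolationMessage] = deriveDecoder
}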
Enrichments
Moving on to the enrichment phase, a failure here means that one or more enrichments have failed, and bad rows will conform to iglu:com.snowplowanalytics.snowplow.badrows/enrichment_failures/jsonschema/1-0-0:
{
  "schema": "iglu:com.snowplowanalytics.snowplow.badrows/enrichment_failures/jsonschema/1-0-0",
  "data": {
    "failure": {
      "timestamp": "2018-12-23T13:37:57.056Z",
      "messages": [
        {
          "enrichment": "iglu:com.snowplowanalytics.snowplow.enrichments/weather_enrichment/jsonschema/1-0-0",
          "message": "Timeout error"
        },
        {
          "enrichment": "iglu:com.snowplowanalytics.snowplow.enrichments/pii_enrichment/jsonschema/2-0-0",
          "message": "No $.users.id property found"
        }
      ]
    },
    "payload": {
      "timestamp": "2018-12-24T13:37:57.056Z",
      "headers": {},
      "path": "/test",
      "queryString": "a=12",
      "body": "{}"
    },
    "processor": {
      "artifact": "mini",
      "platform": "nsq",
      "version": "0.6.0"
    }
  }
}
After a successful enrichment phase, we will be in possession of a canonical event ready for loading.
Storage
Finally, we arrive at storage loading, where failures can be semantically very different: they range from connection issues with the target database to database user authorization issues.
Because of this heterogeneous nature, failures generated by storage components are nested:
{
  "schema": "iglu:com.snowplowanalytics.snowplow.badrows/storage_loader_failure/jsonschema/1-0-0",
  "data": {
    "failure": {
      "schema": "iglu:com.snowplowanalytics.snowplow.badrows/bigquery_bad_row/jsonschema/1-0-0",
      "data": {
        "timestamp": "2018-12-23T13:37:57.056Z",
        "column": "contexts_com_acme_event_1",
        "message": "Unrecognized type integer"
      }
    },
    "payload": {
      "enrichedEvent": "TSV enriched event",
      "id": "deadbeef-dead-beef-dead-beefdeadbeef"
    },
    "processor": {
      "artifact": "bigquery-loader",
      "version": "0.2.0"
    }
  }
}
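Because the nested failure is itself self-describing, its schema key alone tells you which loader produced it, so rows can be routed without guessing. A small sketch (only bigquery_bad_row appears in this RFC; the other schema name is hypothetical):

// Route a storage_loader_failure by the schema key of its nested failure.
def loaderOf(innerFailureSchema: String): String =
  innerFailureSchema match {
    case s if s.contains("/bigquery_bad_row/")      => "bigquery-loader"
    case s if s.contains("/elasticsearch_bad_row/") => "elasticsearch-loader" // hypothetical
    case _                                          => "unknown loader"
  }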
This final storage step ends our bad row journey!
A common collector payload format
By now, you will have noticed that the payloads contained in these bad rows are completely collector-independent and are not obfuscated by serialization or unusual formatting. This should help greatly during investigation and recovery, as serialization and formatting no longer get in the way.
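To make that concrete, the payload object embedded in the examples above could be modelled along these lines (a sketch only; the field names mirror the examples, the optionality of the last two fields is an assumption, and the final unified format is exactly what the first phase below sets out to define):

// A collector-independent payload, as embedded in the bad row examples above.
final case class Payload(
  timestamp: String,             // collector timestamp, ISO 8601
  headers: Map[String, String],
  path: String,
  queryString: Option[String],   // assumed optional (GET vs POST)
  body: Option[String]
)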
Phasing
Our current plan of action will start, as many projects at Snowplow do, with defining schemas for the different new types of bad rows, as well as a new collector payload format unifying the different collector formats (Scala Stream Collector, Clojure Collector, etc.).
Work will continue on the Iglu client, focusing on improving the processing message structure. This will, in turn, give the bad row format clearer and immutable error messages, facilitating rollups when analyzing bad data.
With those foundations laid down, we will then work towards integrating them into Scala Common Enrich, the library common to all our enrichment platforms. We will also take advantage of this opportunity to change the JSON and functional programming libraries (moving from json4s to circe and from scalaz to cats, respectively).
From there, we will be able to integrate this work into the different enrichment platforms (Stream Enrich, Beam Enrich and Spark Enrich) so that this change can reach all users (AWS real-time, GCP real-time and batch, respectively).
Moving further downstream, we will need to adjust the different components which let you store bad rows into durable storage. Those are the S3, Google Cloud Storage and Elasticsearch loaders.
Finally, the new bad row format workstream will end with a new recovery process that leverages the new bad row structure, allowing you, for example, to act on and recover certain types of bad rows while ignoring others.
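For example, selecting only schema violations for a recovery run becomes a simple filter on the self-describing schema key (a sketch; the recovery job around it is the work described above):

import io.circe.parser.parse

// True only for bad rows of the schema_violations type.
def isSchemaViolation(badRow: String): Boolean =
  parse(badRow).toOption
    .flatMap(_.hcursor.get[String]("schema").toOption)
    .exists(_.startsWith("iglu:com.snowplowanalytics.snowplow.badrows/schema_violations/"))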
Out of scope
One item that is currently out of scope, but which we want to work on further down the road, is recovery traceability. In essence, this process would attach metadata describing the recoveries that have been run on a particular event. This would allow people to inspect an event and know how many times, and for what purpose, it has been through recovery.
A process such as this one would unlock smarter and real-time recoveries.
Your feedback
With this approach, which combines much more fine-grained structure (through schemas) with the ability to tell bad rows apart (thanks to the different types established above), we aim to make bad data easier to investigate, to load into SQL-compatible systems, and to fix.
We’d love to hear what you think about the above proposal. Don’t hesitate to share ideas and discuss the proposed format.