Data during the validation step isn't decoded correctly

Hi guys,
So I have a Snowplow data source feeding into a Kinesis stream, and that stream is returning my ‘bad data’.

When I read this Kinesis stream in Python I get data of the following form:

"e":"ue","ue_px":"eyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvd1wvdW5zdHJ1Y3RfZXZlbnRcL2pzb25zY2hlbWFcLzEtMC0wIiwiZGF0YSI6eyJzY2hlbWEiOiJpZ2x1OmNvbS5idXN1dVwvcHJvZ3Jlc3NfZXZlbnRfbG9nXC9qc29uc2NoZW1hXC8xLTAtMCIsImRhdGEiOnsidWlkIjoiMTU5ODM5NjkiLCJsYW5ndWFnZV9sZWFybnQiOiJpdCIsImludGVyZmFjZV9sYW5ndWFnZSI6ImVuYyIsImNvbXBvbmVudF9pZCI6ImVudGl0eV8yXzJfMl85IiwiY29tcG9uZW50X3R5cGUiOiJudWxsIiwiY29tcG9uZW50X2NsYXNzIjoiZW50aXR5IiwiY2xpZW50X3ZlcnNpb24iOiJudWxsIiwicGFzc2VkIjoibnVsbCIsInNjb3JlIjowLCJtYXhfc2NvcmUiOjAsImVuZF90aW1lIjoxNTQ0OTY0MTc5Nzg5LCJzdGFydF90aW1lIjoxNTQ0OTYxOTMzOTEzLCJkdXJhdGlvbiI6MjI0NTg3NiwicGxhdGZvcm0iOiJ3ZWIiLCJldmVudCI6InZvY2FidWxhcnktYWN0aXZpdHkiLCJlbnZpcm9ubWVudCI6InByb2R1Y3Rpb24ifX19","tv":"php-0.3.0-rc1","p":"srv","eid":"1e3e9024-0130-11e9-b9e7-06a13e9f64b8"

Now it looks to me like this hasn’t decoded properly.

I’m using

import json
import time
import configparser

import boto3

config = configparser.ConfigParser()
config.read("conf2.ini")
AWS_access_key_id = config.get("AWS Creds", "AWS_access_key_id")
AWS_secret_access_key = config.get("AWS Creds", "AWS_secret_access_key")

my_stream_name = "rawdatabad"

kinesis_client = boto3.client(
    "kinesis",
    region_name="eu-west-1",
    aws_access_key_id=AWS_access_key_id,
    aws_secret_access_key=AWS_secret_access_key,
)

# Read from the first shard, starting at the oldest available record
response = kinesis_client.describe_stream(StreamName=my_stream_name)
my_shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]

shard_iterator = kinesis_client.get_shard_iterator(
    StreamName=my_stream_name, ShardId=my_shard_id, ShardIteratorType="TRIM_HORIZON"
)
my_shard_iterator = shard_iterator["ShardIterator"]

while True:
    record_response = kinesis_client.get_records(
        ShardIterator=my_shard_iterator, Limit=500
    )
    my_shard_iterator = record_response["NextShardIterator"]
    timestr = time.strftime("%Y%m%d-%H%M%S")

    if len(record_response["Records"]) > 0:
        record_length = len(record_response["Records"])
        for i in range(record_length):
            print(record_length - i, "left")
            # Each record is a JSON bad row with "line", "errors" and "failure_tstamp"
            json_loaded = json.loads(
                record_response["Records"][i]["Data"].decode("utf8")
            )
            # if "vocabulary-activity" in json_loaded["errors"][0]["message"]:

            with open(timestr + "logfile.txt", "a") as logfile:
                logfile.write(json.dumps(json_loaded) + "\n")
            print(json_loaded["failure_tstamp"], json_loaded["errors"])
            time.sleep(0.25)
            # print(record_response)

Now when I look at the data dump itself, I use base64.b64decode(record_response["Records"][i]["Data"]).decode('utf8', errors="ignore"), and that returns the dump we see above.

Does anyone have any insight into why this is the case? I’d love to be able to look at the events in more detail.

Hi @springcoil.

"e":"ue" tells you this event is an ‘unstructured’ (self-describing) event.

"tv" is the tracker version.

"p" is the platform.

"eid" is the event ID.

The actual event payload is in the "ue_px" field. If you base64-decode that, you’ll get a JSON with your com.busuu/progress_event_log/jsonschema/1-0-0 event.
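
For example (a minimal sketch, assuming you’ve parsed the snippet above into a Python dict called event - that name is just for illustration):

import base64
import json

# `event` is assumed to be a dict like the one above, with a "ue_px" key holding
# the base64url-encoded self-describing JSON.
ue_px = event["ue_px"]
ue_px += "=" * (-len(ue_px) % 4)  # re-add any stripped base64 padding

self_describing = json.loads(base64.urlsafe_b64decode(ue_px))
print(self_describing["data"]["schema"])  # iglu:com.busuu/progress_event_log/jsonschema/1-0-0
print(self_describing["data"]["data"])    # uid, score, duration, and so on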

How do I access that part of the dict? Just like a pythonic dict?

You can use our Python Analytics SDK to transform the enriched event into a JSON and from there it’s just like a pythonic dict.
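
Roughly like this (a sketch only - it assumes enriched_event_tsv is a single enriched event, i.e. one tab-separated line with the full 131 fields):

import snowplow_analytics_sdk.event_transformer

# `enriched_event_tsv` is assumed to be one enriched-event TSV line (131 fields).
event = snowplow_analytics_sdk.event_transformer.transform(enriched_event_tsv)
print(event["event_id"])  # once transformed, it behaves like a normal Python dict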

So I have something like this from kinesis b'{"line":"CwBkAAAADTUyLjIwOC4xNTguOTUKAMgAAAFoQagCzwsA0gAAAAVVVEYtOAsA3AAAABFzc2MtMC4`3LjAta2luZXNpcwsBQAAAACMvY29tLnNub3dwbG93YW5hbHl0aWNzLnNub3dwbG93L3RwMgsBVAAAAt97InNjaGVtYSI6ImlnbHU6Y29tLnNub3dwbG93YW5hbHl0aWNzLnNub3dwbG93XC9wYXlsb2FkX2RhdGFcL2pzb25zY2hlbWFcLzEtMC0yIiwiZGF0YSI6W3siZHRtIjoiMTU0NzI4OTc1NjAwMCIsImUiOiJ1ZSIsInVlX3B4IjoiZXlKelkyaGxiV0VpT2lKcFoyeDFPbU52YlM1emJtOTNjR3h2ZDJGdVlXeDVkR2xqY3k1emJtOTNjR3h2ZDF3dmRXNXpkSEoxWTNSZlpYWmxiblJjTDJwemIyNXpZMmhsYldGY0x6RXRNQzB3SWl3aVpHRjBZU0k2ZXlKelkyaGxiV0VpT2lKcFoyeDFPbU52YlM1aWRYTjFkVnd2ZFhObGNsOXdjbTltYVd4bFgyUmxkR0ZwYkY5amFHRnVaMlZmYkc5bmMxd3Zhbk52Ym5OamFHVnRZVnd2TVMwd0xUQWlMQ0prWVhSaElqcDdJblZwWkNJNk5UazFNVEF5TkRZc0ltRmpkR2x2YmlJNkluVndaR0YwWlNJc0ltdGxlU0k2SW5Od2IydGxibDlzWVc1bmRXRm5aU0lzSW5aaGJIVmxJanA3SW01aGRHbDJaU0k2V3lKd2RDSmRMQ0ppWldkcGJtNWxjaUk2VzEwc0ltbHVkR1Z5YldWa2FXRjBaU0k2VzEwc0ltRmtkbUZ1WTJWa0lqcGJYWDBzSW5ScGJXVnpkR0Z0Y0NJNk1UVTBOekk0T1RjMU5pd2lZM0psWVhSbFpDSTZNVFV6TlRVMU5qUTBOeXdpWlc1MmFYSnZibTFsYm5RaU9pSndjbTlrZFdOMGFXOXVJbjE5ZlE9PSIsInR2IjoicGhwLTAuMy4wLXJjMSIsInAiOiJzcnYiLCJ1aWQiOiI1OTUxMDI0NiIsImVpZCI6ImM2NmRjZjIyLTE2NTYtMTFlOS1hZDlmLTA2MDg4ZjhlYjM4YSJ9XX0PAV4LAAAACAAAABZIb3 N0OiBldmVudHMuYnVzdXUuY29tAAAAGEFjY2VwdDogYXBwbGljYXRpb24vanNvbgAAA C1Db250ZW50LVR5cGU6IGFwcGxpY2F0aW9uL2pzb247IGNoYXJzZXQ9VVRGLTgAAA AeWC1Gb3J3YXJkZWQtRm9yOiA1Mi4yMDguMTU4Ljk1AAAAFVgtRm9yd2FyZGVkLVBv cnQ6IDQ0MwAAABhYLUZvcndhcmRlZC1Qcm90bzogaHR0cHMAAAATQ29udGVudC1M ZW5ndGg6IDczNQAAABZDb25uZWN0aW9uOiBrZWVwLWFsaXZlCwFoAAAAH2FwcGxp Y2F0aW9uL2pzb247IGNoYXJzZXQ9dXRmLTgLAZAAAAAQZXZlbnRzLmJ1c3V1LmNvbQ sBmgAAACRhYmI4MmZiNC1kNDkyLTQ1MTMtOWQzMy04NzhkMTUzYmRlOGELemkAA ABBaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3Muc25vd3Bsb3cvQ29sbGVjdG9yUGF5b G9hZC90aHJpZnQvMS0wLTAA", "errors":[{"level":"error","message":"error: instance type (object) does not match any allowed primitive type (allowed: [\\"string\\"])\\n level: \\"error\\"\\n schema: {\\"loadingURI\\":\\"#\\",\\"pointer\\":\\"/properties/value\\"}\\n instance: {\\"pointer\\":\\"/value\\"}\\n` domain: \\"validation\\"\\n keyword: \\"type\\"\\n found: \\"object\\"\\n expected: [\\"string\\"]\\n"}], "failure_tstamp":"2019-01-12T10:42:37.390Z"}'

When I use type() I get bytes, so I know that this won’t work with snowplow_analytics_sdk.event_transformer.transform from the Snowplow Analytics SDK.
However I’m confused: how do I go from that byte string to a dict-like object, or to a TSV that the Snowplow Analytics SDK will be happy with? I know it’s close because the structure looks similar.

I’ve also observed the following error:

snowplow_analytics_sdk.event_transformer.transform(
    record_response["Records"][10]["Data"].decode("utf8", errors="ignore")
)
*** snowplow_analytics_sdk.snowplow_event_transformation_exception.SnowplowEventTransformationException: ['Expected 131 fields, received 1 fields.']

Hi @springcoil,

So what you’re working with here is a bad row - which means it’s not actually an enriched event yet. The SDK won’t be much help here as it’s designed to work with enriched/good data.

If you want to work with this data, consuming from the bad bucket isn’t the best way to go - the best route forward is to debug the issue and fix it so that the event gets fully processed. I can see from the error message:

"errors":[{"level":"error","message":"error: instance type (object) does not match any allowed primitive type (allowed: [\\"string\\"])\\n level: \\"error\\"\\n schema: {\\"loadingURI\\":\\"#\\",\\"pointer\\":\\"/properties/value\\"}\\n instance: {\\"pointer\\":\\"/value\\"}\\n` domain: \\"validation\\"\\n keyword: \\"type\\"\\n found: \\"object\\"\\n expected: [\\"string\\"]\\n"}]`

The value field is being sent as an object where it should be a string. So the best solution is either to bump the schema version so that this field accepts an object, or to fix the tracking so that the value is always a string.
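
Purely as an illustration (the field names here are hypothetical, not taken from your schema), the difference the validator is complaining about looks like this:

# Hypothetical payloads, only to illustrate the validation error above.
sent = {"key": "some_setting", "value": {"nested": "object"}}        # "value" is an object - fails
expected = {"key": "some_setting", "value": '{"nested": "object"}'}  # "value" is a string - passes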

Bad rows are mainly used for debugging. I’m not sure what your plans are for this data, but I’m clarifying as that might save you some effort.

If you’re keen to dig into them programmatically, or recover them, here are some resources that might be helpful.

The bad row ‘line’ is Thrift-encoded - our friends at Snowflake Analytics have written a great blog about handling that in Python here.
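
If you want to do it yourself in Python, here’s a rough, untested sketch. It assumes you’ve saved Snowplow’s CollectorPayload Thrift definition locally as collector-payload.thrift, installed thriftpy2, and that json_loaded is one bad row from the Kinesis loop earlier in the thread; the struct and field names below follow that Thrift definition:

import base64

import thriftpy2
from thriftpy2.utils import deserialize

# Assumed local copy of Snowplow's CollectorPayload Thrift definition.
collector_payload_thrift = thriftpy2.load(
    "collector-payload.thrift", module_name="collector_payload_thrift"
)

# The "line" field of a bad row is the base64-encoded, Thrift-serialised raw payload.
raw_line = base64.b64decode(json_loaded["line"])
payload = deserialize(collector_payload_thrift.CollectorPayload(), raw_line)

print(payload.path)  # e.g. /com.snowplowanalytics.snowplow/tp2
print(payload.body)  # the tracker payload JSON, which contains ue_px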

Within that, the custom data for the event will be in the ue_px field (for custom event data) or the cx field (for custom context data). Your first post has you nearly there - the ue_px field you see is base64-encoded, and when decoded it’ll be that event’s self-describing JSON. The last section of this blog lays out the format of that data and how to handle it (in SQL for BigQuery, but the format is the same so it shouldn’t be a stretch to translate).
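
Continuing the sketch above: once you have the deserialised payload, something like this gets you to the decoded ue_px for each event in it (again untested; events that aren’t self-describing events won’t have a ue_px field):

import base64
import json

# payload.body is the payload_data self-describing JSON; its "data" array holds
# one or more tracker events, each of which may carry a "ue_px" field.
tracker_payload = json.loads(payload.body)
for event in tracker_payload["data"]:
    ue_px = event.get("ue_px")
    if ue_px is None:
        continue
    ue_px += "=" * (-len(ue_px) % 4)  # re-add any stripped base64 padding
    print(json.loads(base64.urlsafe_b64decode(ue_px)))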

Hope that’s helpful.

Best,


We’d like to be able to analyse the bad rows in some way - maybe in SQL, say in Athena - largely so we can then start to fix the errors. If that makes sense?

Thanks for the clarification about the snowplow SDK.

Btw just for clarification, should there be no bad data at all? Like, is there a certain amount of ‘bad data rows’ in most production systems or should we have zero?

We’d like to be able to analyse the bad rows in some way - maybe in SQL, say in Athena.

That makes sense. In that case take a look at this blog (if you’re running a recent-version RT pipeline) and/or this one (otherwise).

If it’s for purposes of fixing the errors I recommend taking the approach of counting rows per error message, then manually decoding one or two of them (I use the Atom text editor, which has handy packages for decoding base64 and prettifying JSON, or you can use Snowflake Analytics’ Snowplow Inspector Chrome plugin, which has a bad row decoder in there too), then fixing. If the error message is the same, then the same field is corrupt for the same reason, so you just need to find the issue for one bad row in order to fix all of them.
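
If you’d rather do the counting in Python, a quick sketch (assuming bad_rows is any iterable of bad-row JSON strings, e.g. collected by the Kinesis loop earlier in the thread or read from files in the bad bucket):

import json
from collections import Counter

# `bad_rows` is assumed to be an iterable of bad-row JSON strings.
error_counts = Counter()
for raw in bad_rows:
    row = json.loads(raw)
    for error in row.get("errors", []):
        # The first line of the message is usually enough to group identical failures.
        error_counts[error["message"].splitlines()[0]] += 1

for message, count in error_counts.most_common():
    print(count, message)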

Btw just for clarification, should there be no bad data at all? Like, is there a certain amount of ‘bad data rows’ in most production systems or should we have zero?

There are going to be some bad rows, caused by random traffic hitting your collector - mostly bots - which can safely be ignored. These will be recognisable from errors like vendor with version... or Querystring is empty... etc.

Aside from these, ideally you have 0 bad rows, but sometimes there are edge cases that are so low in numbers that people decide not to bother with them. I always err on the side of having a very strict schema and being very robust in your tracking implementation - the idea is that validation forces you to ensure your data is high quality when it lands in the DB. A more permissive schema increases the chance of corrupt data.

It’s not always possible to have 0 bad rows however - if you don’t have control over the environment you’re tracking in or if your SP tags are in third party sites then you’re probably going to have some unavoidable ones.

In that case it’s a judgement call - generally about finding the best balance between a schema that’s as strict as possible and one that’s not so strict that enough data fails validation to impact your use of it.

Additionally it’s always a good idea to monitor bad rows - checking in regularly.

Hope that’s helpful!
