Writing thrift from enriched bad rows

I have successfully deserialized an enriched/bad/ row using the python thrift library. This has created a Collector Payload.

I am now attempting to re-write the thrift file using the struct. I assume the order in which I write the fields matters. I am writing them in the same order they are presented in the struct:

struct CollectorPayload {
  31337: string schema

  // Required fields which are intrinsic properties of HTTP
  100: string ipAddress

  // Required fields which are Snowplow-specific
  200: i64 timestamp
  210: string encoding
  220: string collector

  // Optional fields which are intrinsic properties of HTTP
  300: optional string userAgent
  310: optional string refererUri
  320: optional string path
  330: optional string querystring
  340: optional string body
  350: optional list<string> headers
  360: optional string contentType

  // Optional fields which are Snowplow-specific
  400: optional string hostname
  410: optional string networkUserId
}

Should the order be different? I ask this since when I look at a binary file (as much of it as I can) from the raw/good/ bucket and one I am creating, the order is slightly different. If you could point me to the scala even where you are writing these thrift files that might shed some light.

For additional information the error messages I am getting in enriched bad after trying to run the thrift file I have created and converted to an lzo using lzop are these:

{"line":"ziAiglucomsnowplowanalyticssnowplow/CollectorPayload/thrift/1+0+0d2418615610","errors":[{"level":"error","message":"Error deserializing raw event: Cannot read. Remote side has closed. Tried to read 2 bytes, but only got 0 bytes. (This is often indicative of an internal error on the server side. Please check your server logs.)"}],"failure_tstamp":"2017-03-22T22:39:47.350Z"}
{"line":"Z/UTF+8ssc+070+kinesismMozilla/50WindowsNT100WOW64AppleWebKit/53736KHTMLlikeGeckoChrome/560292487Safari/537366http//wwwalexkaplanphotography/Weddings/iJuic=","errors":[{"level":"error","message":"Error deserializing raw event: Cannot read. Remote side has closed. Tried to read 2 bytes, but only got 1 bytes. (This is often indicative of an internal error on the server side. Please check your server logs.)"}],"failure_tstamp":"2017-03-22T22:39:47.705Z"}

Thank you.

Hi @dkeidel - the contents of /raw/good in S3 are not simply the raw Thrift events with an lzo encoding. They have been stored to S3 via Kinesis S3, using the Elephant Bird project’s wrapper:

The error you are getting is consistent with Hadoop Enrich looking for the Elephant Bird wrapper and not finding it…

@alex, thank you for your reply. It is great to know why I am getting that error, but also kind of a bummer that I might not be able to go the route I was taking over the last 2 weeks.

Please excuse my ignorance when it come to the Elephant Bird wrapper, but after doing some reading it is not clear to me if the thrift is wrapped by this Elephant Bird class, or if the class does the whole serialization process. Is it possible for me to take the thrift I have created and feed that into elephant bird somehow to get the desired result? I am doing all of this in python and unfortunately I have not been able to find an Elephant Bird solution in python. If I could save the thrift file I have and somehow use that as input for the elephant bird process maybe I can generate what is needed? Please let me know if any of that is possible or if I have hit a dead end when it come to reprocessing the enriched/bad/ rows (that are unfortunately in thrift format).

Thank you

Hi @dkeidel - I think you are going to struggle to do this from Python, but it’s relatively straightforward from the JVM. Here is the relevant file in Kinesis S3:

https://github.com/snowplow/kinesis-s3/blob/master/src/main/scala/com.snowplowanalytics.snowplow.storage.kinesis/s3/serializers/LzoSerializer.scala

Incidentally, we are speaking to a customer who is interested in sponsoring a “collector payload scrubber” which would I suspect cover your underlying requirement, but it’s not clear if that sponsorship will go ahead at this time.

@alex - we have a raw kinesis stream that Kinesis-S3 is reading from. Are these records in that stream raw thrift before they are grabbed by Kinesis-S3? I base64 decode them and they look like thrift.

If so couldn’t I just pump the thrift back into this kinesis stream?

Hey @dkeidel - yes that should work, the stream in question is just the Thrift records, and Kinesis S3 will correctly encapsulate them using the Elephant Bird wrapper before sinking to S3.

I’d do this in a staging environment first (not in prod) just to be super-sure you are generating valid Thrifts (otherwise you will corrupt your production S3 archive).

@alex, thanks for the reply. I have a development pipeline and Redshift cluster that we test everything with. I simply aws s3 cp the rows over and process them through that pipeline. As a first test of identity, I am going to read one of our kinesis streams, grab data from there, use that exact same record and put it back into the stream, and process that. Then if that is successful I am going to modify one of the records using the code I have to deserialize and serialize thrift and see if that is successful. Having a development pipline and separate Redshift cluster allows for a lot of fun experimentation :slight_smile:

1 Like

Hello @dkeidel and @alex - sorry for bumping this old topic but I am in the same position.

After a year, is there a recommended method for dealing with bad Thrift rows?
I am able to (de)serialize them, should I go the same way of feeding raw Thrift bytes into the Kinesis stream?

Thanks!

Hi @pranas - yes, that should still work.

@alex - we took some bad rows (custom API enrichment timeout, otherwise good), Base64 decoded lines to get raw Thrift and fed them (unmodified) into the Kinesis stream.

It went through the staging pipeline all the way to Redshift and looks good except that collector_tstamp values are all ‘1970-01-01 00:00:00’.
How can it be? Is it a version mismatch? I can parse Thrift entries manually and see that CollectorPayload.timestamp values look ok.

Replying to my own post. We actually used some aws-kinesis-agent tool to transfer Thrift data from S3 to Kinesis. Apparently, this messed up our collector_tstamp values. Writing to Kinesis directly from our Python script solved the issue…