We are building a real-time pipeline with Kinesis, and at the moment we want to work with the enriched data on the stream.
We were hoping to use a simple Ruby script, in an AWS Lambda, to transform the enriched base64 data that comes from the Kinesis stream into JSON, as @grzegorzewald has mentioned. But in order to do that we need to deserialize the Thrift data, which needs something like an enriched_payload.thrift, and we haven’t found anything like that.
And there’s one more issue. In our tests we deserialized some data locally using collector-payload.thrift, following something like what is shown here: (Decoding real-time bad records (Thrift) [tutorial]), but in Ruby, and it looks like this:
which works! But it depends on the gem thrift-base64, which depends on the gem thrift, which has a system dependency, and because of that we didn’t manage to make it work on AWS Lambda.
So… Is there an enrich.thrift to use? Has anyone made a Ruby Lambda that reads the stream? Is it a bad idea to work with the Kinesis enriched stream without Lambda (for example, a container reading the stream)?
PS: I know that there is the Python-Analytics-SDK and that I could use an Elasticsearch sink; I’m looking for alternatives.
The good news is that if you want to read off the enriched stream (in Kinesis), each record will be a base64-encoded, tab-separated record rather than a Thrift record. The raw stream that the collector outputs is Thrift-serialized, but the enricher that consumes it outputs TSV (in which some columns are JSON).
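As a rough illustration of what that means in Ruby, here is a minimal sketch using only the standard library (so no thrift gem or system dependency). It assumes a single layer of base64 around a UTF-8 TSV line; the helper names `decode_enriched` and `parse_json_column` are made up for this example, and the column positions used in the sample are hypothetical, so check the real positions against the enriched event field list.

```ruby
require 'base64'
require 'json'

# Decode one base64-encoded enriched record into an array of TSV columns.
# Assumption: the payload is a single UTF-8 TSV line with one layer of base64.
# The -1 limit keeps trailing empty columns, which String#split drops by default.
def decode_enriched(base64_data)
  Base64.decode64(base64_data).force_encoding('UTF-8').chomp.split("\t", -1)
end

# Parse a column that is expected to hold JSON; empty columns become nil.
def parse_json_column(value)
  return nil if value.nil? || value.empty?

  JSON.parse(value)
end

# Hypothetical sample record: five columns, the fourth one holding JSON.
sample = Base64.strict_encode64(['myapp', 'web', '', '{"k":1}', ''].join("\t"))
columns = decode_enriched(sample)
puts columns[0]                            # first column as plain text
puts parse_json_column(columns[3]).inspect # JSON column as a Ruby Hash
```

Mapping column positions to field names is the part the Analytics SDKs do for you, so this sketch only covers the decoding step.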
Both the Scala and Python Analytics SDKs will allow you to transform and convert this enriched format, which can make things significantly easier - so if it’s possible for you to use them I’d recommend that.
If that isn’t an option you can also apply your own custom logic in a Lambda function with a Kinesis trigger (this will take care of some of the semantics of reading/batching records for you) or you can also read off the Kinesis enriched stream using any other client. If you do decide to read off the stream directly I’d strongly encourage using the Ruby (or other language) KCL library that AWS provides as this significantly simplifies some of the logic you’d otherwise have to implement around record processing and checkpointing.
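For the Lambda route, a handler could look roughly like the sketch below. With a Kinesis trigger, Lambda passes a batch under `event['Records']`, with each payload base64-encoded in `record['kinesis']['data']`. The helper name `decode_batch` and the returned hash are hypothetical, and the sketch assumes each payload is one enriched TSV line; what you do with the decoded rows is up to you.

```ruby
require 'base64'

# Turn a Kinesis-triggered Lambda event into an array of TSV column arrays.
# Assumption: each record's payload is one enriched TSV line in UTF-8.
def decode_batch(event)
  event['Records'].map do |record|
    payload = Base64.decode64(record['kinesis']['data']).force_encoding('UTF-8')
    payload.chomp.split("\t", -1) # -1 keeps trailing empty columns
  end
end

# Lambda entry point (configured as "<file name>.handler").
def handler(event:, context:)
  rows = decode_batch(event)
  # Placeholder: replace with your own transformation or forwarding logic.
  { 'processed' => rows.length }
end
```

Because this uses only the standard library, it avoids the native dependency that made the thrift gem hard to package for Lambda.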