Raw Data from Kinesis Stream to S3 Contains NO Keys

I am running the Scala collector and the Scala/Kinesis enrichment process. I set my Kinesis stream to dump to S3 with Kinesis Firehose, but when I look at that data, there are no keys. My goal is to write a Lambda transformation function that runs with Firehose, but I’m lost as to how to know what the data is without any keys listed. Hoping this is not a complicated fix!

I’m not too sure what you mean by no keys. Anything that lands on S3 by nature has to have its own key (bucket / filename).

You probably already know this but the current Snowplow pipeline doesn’t use Firehose in any way and instead sinks data to S3 via its own sink.

When I said key, I meant a key as in a key-value pair. For example, I would like to transform the event into JSON.

So if I use the kinesis enrichment, how do I get that data into S3 as JSON properly?

If you’d like to convert the enriched event format (pre shredding) into a JSON payload you can use one of the Snowplow Analytics SDKs that will perform this mapping and data type conversion for you. As these are third party libraries it is possible to import these into a Lambda function directly if you’d like.


So I got the S3 Loader set up and running, and it’s giving me the same output I got from Firehose. Here it is with sensitive data removed:

xxx	web	2018-11-22 00:31:28.303	2018-11-22 00:31:26.460	2018-11-22 00:31:28.156	page_view	fc848ebc-ccb7-40a2-a32b-e255db223c26		cf	js-2.9.0	ssc-0.14.0-kinesis	stream-enrich-0.19.1-common-0.35.0		*MYIP*	2098309707	45f3c6ae-48fa-4d6f-8ac2-4219941e9469	1	c170174b-45eb-4435-b3e9-6981504b4f8d	US	*MYSTATE*	*MYCITY*	*MYZIP*	*MYLATITUDE*	*MYLONGITUDE*	*MYSTATE*					*MYSITEURL*			https	*MYSITEURL*	443	/hello.html																																										Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36						en-US	1	0	0	0	0	0	0	0	0	1	24	1920	920				America/New_York			1920	1080	UTF-8	1920	920								America/New_York				2018-11-22 00:31:28.159			{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-0-0","data":{"useragentFamily":"Chrome","useragentMajor":"70","useragentMinor":"0","useragentPatch":"3538","useragentVersion":"Chrome 70.0.3538","osFamily":"Windows","osMajor":"10","osMinor":null,"osPatch":null,"osPatchMinor":null,"osVersion":"Windows 10","deviceFamily":"Other"}},{"schema":"iglu:org.ietf/http_cookie/jsonschema/1-0-0","data":{"name":"aw","value":"c170174b-45eb-4435-b3e9-6981504b4f8d"}}]}	fd12e71a-9303-4b28-8f61-13de9bd3d4e1	2018-11-22 00:31:26.457	com.snowplowanalytics.snowplow	page_view	jsonschema	1-0-0	5af4e4f192216ec7268c3339f29a6829	

How am I supposed to know what all those fields are? The structured data at the end is from enrichments and it looks good, but what about the first three dates in the data set (2018-11-22 00:31:28.303, 2018-11-22 00:31:26.460, 2018-11-22 00:31:28.156), or those 1s and 0s in the middle? What about those unique codes (45f3c6ae-48fa-4d6f-8ac2-4219941e9469, c170174b-45eb-4435-b3e9-6981504b4f8d)?

edit: Are these the fields in order? https://github.com/snowplow/snowplow/wiki/canonical-event-model#application

@passmeabeck, those appear to be enriched events. The order of the properties can be seen in the Snowplow Analytics SDK code. Here’s the Python SDK: https://github.com/snowplow/snowplow-python-analytics-sdk/blob/master/snowplow_analytics_sdk/event_transformer.py#L57-L189. I think the 0s and 1s are boolean values (in particular the browser feature flags, br_...).
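To make the positional mapping concrete, here is a minimal sketch that labels only the first 19 tab-separated values of an enriched event. The field names follow the canonical event model; the list is deliberately partial, and `LEADING_FIELDS` and `label_leading_fields` are hypothetical names for illustration, not part of any SDK. The sample values are shortened placeholders.

```python
# First 19 enriched-event fields in positional order. This is a partial
# list for illustration only; the full format has well over 100 columns.
LEADING_FIELDS = [
    "app_id", "platform", "etl_tstamp", "collector_tstamp",
    "dvce_created_tstamp", "event", "event_id", "txn_id",
    "name_tracker", "v_tracker", "v_collector", "v_etl",
    "user_id", "user_ipaddress", "user_fingerprint",
    "domain_userid", "domain_sessionidx", "network_userid",
    "geo_country",
]


def label_leading_fields(tsv_line):
    """Pair the leading values of an enriched event with their field names.

    Only the newline is stripped: tabs must be kept, because empty
    columns still occupy a position. Empty values are left out of the result.
    """
    values = tsv_line.rstrip("\n").split("\t")
    return {
        name: value
        for name, value in zip(LEADING_FIELDS, values)
        if value != ""
    }


# Placeholder sample shaped like the start of the event posted above.
sample = "\t".join([
    "xxx", "web", "2018-11-22 00:31:28.303", "2018-11-22 00:31:26.460",
    "2018-11-22 00:31:28.156", "page_view", "event-id-placeholder", "",
    "cf", "js-2.9.0", "ssc-0.14.0-kinesis",
    "stream-enrich-0.19.1-common-0.35.0", "", "203.0.113.7", "2098309707",
    "domain-userid-placeholder", "1", "network-userid-placeholder", "US",
])
labeled = label_leading_fields(sample)
```

Read this way, the three leading timestamps are etl_tstamp, collector_tstamp and dvce_created_tstamp, and the two unique codes asked about are domain_userid and network_userid.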


I’m doubting my entire setup and workflow now. So I have the scala collector -> kinesis stream(good/bad) -> stream enrich -> kinesis stream(enriched good/bad) -> s3 loader

Is there a way to shred the data BEFORE it goes to S3? I don’t quite understand how it’s helpful to store the data without any keys in S3 - I read on that SDK doc that there are over 100 possible fields. It seems very chaotic to not have the ability to declare what the values mean within the streams/S3. How can I change my workflow to get this information into S3?

My goal is to be able to load into Dynamo/Redshift/Snowflake/etc from S3. S3 would essentially act as my data lake.

edit: I think I’ll end up retrying Kinesis Firehose with a custom lambda shredder I’ll have to make paired with that link above on the fields( https://github.com/snowplow/snowplow-python-analytics-sdk/blob/master/snowplow_analytics_sdk/event_transformer.py#L57-L189 )

Is there a better way though? I’m open to changing my pipeline.

What about just using the standard real time pipeline as is with Stream Enrich mode? The pipeline can load into Redshift and Snowflake (though not DynamoDB other than for deduplication purposes).

https://sea2.discourse-cdn.com/flex016/uploads/snowplowanalytics/optimized/1X/7667f678693f18518e86ac091d11df438a1cf349_1_690x448.png

In general keys / column headers aren’t stored in S3 or Kinesis if they have a fixed structure as in the case of enriched events (excluding self-describing events). Storing this as JSON in a Kinesis stream (or S3) when we know the structure would require more bytes per event and therefore require more storage / lower throughput / increased deserialisation cost.
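As a rough illustration of the size argument, the sketch below serialises the same four values once as a tab-separated line and once as a JSON object. The four field names are a small hand-picked subset, so the ratio here is only indicative and not a measurement of real enriched events.

```python
import json

# A tiny, hand-picked subset of fields; real events have many more columns.
names = ["app_id", "platform", "event", "geo_country"]
values = ["xxx", "web", "page_view", "US"]

# Positional encoding: values only, separated by tabs.
tsv_bytes = len("\t".join(values).encode("utf-8"))

# Keyed encoding: every value carries its field name, quotes and punctuation.
json_bytes = len(json.dumps(dict(zip(names, values))).encode("utf-8"))

print(f"TSV: {tsv_bytes} bytes, JSON: {json_bytes} bytes")
```

The JSON form repeats every key in every event, which is where the extra storage and deserialisation cost comes from.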

Mike and ihor, I just want to say you have both helped me here tremendously. Thank you. Using the Analytics SDK I was able to write a Lambda to transform the event into JSON. While I agree that it costs more in storage, it is useful because I can now use that bucket as my data lake and feed that information into the DB of my choosing at any time.
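For anyone landing here later, this is roughly the shape such a Firehose transformation Lambda takes. It is a sketch rather than the exact code used: `to_json_dict` is a hypothetical placeholder that labels only three fields so the example stays self-contained, and in a real function it would be replaced by the Analytics SDK's event transformer. The record format (`recordId`, `result`, base64-encoded `data`) is the standard Firehose data transformation contract.

```python
import base64
import json


def to_json_dict(tsv_line):
    """Hypothetical stand-in for the Analytics SDK transformation.

    Labels only the first three enriched-event fields; a real function
    would delegate to the SDK to map every field and convert types.
    """
    fields = tsv_line.rstrip("\n").split("\t")
    if len(fields) < 3:
        raise ValueError("not enough fields for an enriched event")
    return {
        "app_id": fields[0],
        "platform": fields[1],
        "etl_tstamp": fields[2],
    }


def handler(event, context):
    """Firehose transformation: enriched TSV in, newline-delimited JSON out."""
    output = []
    for record in event["records"]:
        try:
            line = base64.b64decode(record["data"]).decode("utf-8")
            payload = json.dumps(to_json_dict(line)) + "\n"
            output.append({
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(payload.encode("utf-8")).decode("utf-8"),
            })
        except (ValueError, UnicodeDecodeError):
            # Hand the original bytes back so Firehose can route the
            # record to its error output instead of losing it.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}
```

Appending a newline to each JSON payload keeps the objects on separate lines once Firehose concatenates records into a single S3 object, which makes the files easier to load downstream.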

I will take a look at Elasticsearch; that may be useful.
