We have created a Kinesis Firehose delivery stream that reads data from the collector good stream and stores it in S3. The intent of storing the data is:
a. to let us replay the data through EMR or enrich if there are issues with downstream logic changes;
b. to use the data in the non-prod environment to build and test changes to the pipeline.
Currently Firehose writes the data as gzip files; Firehose does not have an option to convert the output to LZO and generate an index.
I downloaded the lzop utility and pip-installed lzo-indexer, decompressed the gzip file, and ran it through the lzop and lzo-indexer utilities. When I tried to process the data using the Snowplow EMR job, it did not produce any good data in either the shredded or enriched output.
The easiest way to do this is to set up the Snowplow S3 Loader to point at your Kinesis stream and sink (as LZO) to a bucket of your choosing. Once you’ve set it up you can leave it running and it’ll take care of that process for you.
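As a rough sketch, the loader’s HOCON config would look something like the following. Field names follow the example config shipped with the S3 Loader; the stream names, bucket, and region below are placeholders, so check your loader version’s sample config for the exact set of fields.

```hocon
# Sketch of a Snowplow S3 Loader config -- placeholders throughout.
source = "kinesis"
sink   = "kinesis"   # where failed records go, not the S3 output

aws {
  accessKey = "iam"
  secretKey = "iam"
}

kinesis {
  in {
    streamName      = "collector-good"   # placeholder: your collector good stream
    initialPosition = "TRIM_HORIZON"
    maxRecords      = 10000
  }
  out {
    streamName = "s3-loader-bad"         # placeholder: bad-rows stream
  }
  region  = "us-east-1"
  appName = "s3-loader"                  # DynamoDB checkpoint table name
}

s3 {
  region = "us-east-1"
  bucket = "my-raw-archive"              # placeholder: your sink bucket
  format = "lzo"
  maxTimeout = 300000
}

buffer {
  byteLimit   = 1048576
  recordLimit = 500
  timeLimit   = 60000
}
```

With `format = "lzo"` the loader handles both the compression and the index files, which is exactly the part Firehose can’t do for you.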
If we have real-time enrichment running and there were issues with it that we wanted to fix, can we replay records directly from the S3 sink into the enrich Kinesis stream?
This likely depends on your use case.
I avoid replaying anything into the Kinesis streams (just my opinion) that hasn’t been processed by Stream Enrich. Part of this is for technical reasons (ending up with duplicates in the stream) and part of it is philosophical (downstream consumers might not want to see events that arrive out of order).
If you need to fix something, I’d opt for reprocessing the data through Spark Enrich as a batch job.
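Concretely, that means pointing the batch pipeline’s raw input at the bucket your S3 loader sinks to. In EmrEtlRunner’s `config.yml` the relevant fragment looks roughly like this (bucket names are placeholders, and this is only the bucket section, not a complete config):

```yaml
# Fragment of an EmrEtlRunner config.yml -- only the buckets
# relevant to reprocessing are shown; names are placeholders.
aws:
  s3:
    buckets:
      raw:
        in:
          - s3://my-raw-archive          # bucket the S3 loader writes to
        processing: s3://my-pipeline/processing
      enriched:
        good: s3://my-pipeline/enriched/good
        bad: s3://my-pipeline/enriched/bad
```

Running the batch job against that input regenerates the enriched output without pushing anything back into the live Kinesis streams.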
With respect to a non-prod environment I’d strongly recommend using Snowplow Mini to test any changes that you are making to the enrichment process. It’s often the quickest way to debug any data quality issues that you may run into at collection and enrichment.
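For example, you can fire test events at a Mini instance with a simple GET to the collector’s pixel endpoint, using standard tracker-protocol parameters. A minimal sketch in Python (the host name is a placeholder, and in practice you’d normally use one of the official trackers rather than hand-building hits):

```python
# Build a Snowplow tracker-protocol pageview hit for a Snowplow Mini
# instance. The /i pixel endpoint and the e/p/url parameters come from
# the Snowplow tracker protocol; the host below is a placeholder.
from urllib.parse import urlencode

MINI_HOST = "http://snowplow-mini.example.com"  # placeholder host


def pageview_url(page_url, app_id="test-app"):
    """Return a GET pixel URL that registers a pageview event."""
    params = {
        "e": "pv",           # event type: pageview
        "url": page_url,     # page URL being tracked
        "p": "web",          # platform
        "aid": app_id,       # application id
        "tv": "manual-0.1",  # free-form tracker version label
    }
    return f"{MINI_HOST}/i?{urlencode(params)}"


print(pageview_url("http://example.com/home"))
```

Sending a handful of hits like this and watching them land in Mini’s Elasticsearch is a quick sanity check before promoting an enrichment change to the real pipeline.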