I’m soooo close.
I’m using boto in Python to push to Kinesis. The library base64-encodes the data we pass to it, and since we actually want the raw Thrift records in the stream, I need to base64-decode each line from the recovered file first. I also have to decode each line as ASCII, since I’m reading binary lines (b'xxxxxx').
This is the decoding part:
import base64
import smart_open

records = []
for line in smart_open.smart_open('s3://path-to-recovered-file'):
    decoded = base64.b64decode(line.decode('ascii'))  # lines come back as bytes
    records.append(decoded)
I’m pushing the result to our isolated stream and as I go, I’m comparing the records in that isolated recovery stream with the records I’m getting in our production stream.
To me, what I’m getting as a result looks identical to the records I’m seeing in our working raw stream.
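For completeness, the push side looks roughly like this (a simplified sketch with boto3; the stream name and partition key are placeholders, and I chunk the records because put_records accepts at most 500 per call):

import uuid
import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Push the decoded Thrift records to the isolated recovery stream.
# 'recovery-stream' is a placeholder name; put_records takes at most
# 500 records per call, hence the chunking.
for i in range(0, len(records), 500):
    batch = [
        {'Data': record, 'PartitionKey': str(uuid.uuid4())}
        for record in records[i:i + 500]
    ]
    kinesis.put_records(StreamName='recovery-stream', Records=batch)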
Example of a record pushed to and available in the recovery stream:
d
3.91.44.86
�~%��
�UTF-8
�/snowplow-stream-collector-kinesis-2.4.4-kinesis
,Ruby
@/i
J�e=se&se_ca=photo&se_ac=downloaded-photo&se_la=2138933&se_pr=MnwxNDI5OTZ8MHwxfGFsbHx8fHx8fHx8fDE2NDA5MTQ4NDA%3F&cx=eyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9jb250ZXh0cy9qc29uc2NoZW1hLzEtMC0xIiwiZGF0YSI6W3sic2NoZW1hIjoiaWdsdTpjb20udW5zcGxhc2gvYXBpX2FwcGxpY2F0aW9uL2pzb25zY2hlbWEvMS0wLTAiLCJkYXRhIjp7ImFwcGxpY2F0aW9uX2lkIjoxNDI5OTZ9fV19&dtm=1640914846152&p=srv&ip=51.161.92.178&tv=rb-0.6.1&aid=unsplash-api-production&eid=1c2ec4b8-3474-4b7b-9ad2-930d529815d4&stm=1640914846152^
imeout-Access: <function1>X-Forwarded-ForX-Forwarded-Proto: httpX-Forwarded-Port: 80Host9X-Amzn-Trace-Id: Root=1-61ce5f9e-0083113e18b5f239443462604Accept-Encoding: gzip, deflate;q=0.6, identity;q=0.3
Accept: */*User-Agent: Ruby#X-Newrelic-Id: UwMCUFZRGwYAUFNbBwY=dX-Newrelic-Transaction: PxRWAl5WWgsBUwBSAVBVBwVXFB8EBw8RVU4aA1sAAVALVA4AAVdSAAVXUUNKQQEGAFADVVNQFTs=
�collector.unsplash.com
�$48b22093-608c-47f1-adea-9f3bc82da443
ziAiglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0
Example of a record currently in our raw production stream:
d
3.91.44.86
�~%��
�UTF-8
�/snowplow-stream-collector-kinesis-2.4.4-kinesis
,Ruby
@/i
J�e=se&se_ca=photo&se_ac=downloaded-photo&se_la=2138933&se_pr=MnwxNDI5OTZ8MHwxfGFsbHx8fHx8fHx8fDE2NDA5MTQ4NDA%3F&cx=eyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9jb250ZXh0cy9qc29uc2NoZW1hLzEtMC0xIiwiZGF0YSI6W3sic2NoZW1hIjoiaWdsdTpjb20udW5zcGxhc2gvYXBpX2FwcGxpY2F0aW9uL2pzb25zY2hlbWEvMS0wLTAiLCJkYXRhIjp7ImFwcGxpY2F0aW9uX2lkIjoxNDI5OTZ9fV19&dtm=1640914846152&p=srv&ip=51.161.92.178&tv=rb-0.6.1&aid=unsplash-api-production&eid=1c2ec4b8-3474-4b7b-9ad2-930d529815d4&stm=1640914846152^
imeout-Access: <function1>X-Forwarded-ForX-Forwarded-Proto: httpX-Forwarded-Port: 80Host9X-Amzn-Trace-Id: Root=1-61ce5f9e-0083113e18b5f239443462604Accept-Encoding: gzip, deflate;q=0.6, identity;q=0.3
Accept: */*User-Agent: Ruby#X-Newrelic-Id: UwMCUFZRGwYAUFNbBwY=dX-Newrelic-Transaction: PxRWAl5WWgsBUwBSAVBVBwVXFB8EBw8RVU4aA1sAAVALVA4AAVdSAAVXUUNKQQEGAFADVVNQFTs=
�collector.unsplash.com
�$48b22093-608c-47f1-adea-9f3bc82da443
ziAiglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0
Following a previous post of yours that I found, @mike, I was able to deserialize the record, which also suggests the format is correct.
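For reference, my deserialization check was roughly along these lines, sketched with thriftpy2 and the collector-payload.thrift IDL from the snowplow repo (the local file path is an assumption, and the exact approach in your post may differ):

import thriftpy2
from thriftpy2.protocol import TBinaryProtocolFactory
from thriftpy2.utils import deserialize

# Load the CollectorPayload IDL (collector-payload.thrift from the snowplow/snowplow
# repo); the local path here is an assumption, point it at your copy of the file.
collector_payload_thrift = thriftpy2.load('collector-payload.thrift')

# The collector writes records with Thrift's default binary protocol, so decode
# one of the recovered records and eyeball the fields.
payload = deserialize(
    collector_payload_thrift.CollectorPayload(),
    records[0],
    TBinaryProtocolFactory(),
)
print(payload)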
So here I am, expecting things to fully work when I push the payload to Kinesis, but … I’m running a copy of my S3 Loader (v0.7.0; it’s old, I know, but it has been working great) locally and it doesn’t work. I replaced the input stream and the DynamoDB app name in the config, and that’s it.
It looks like the loader is seeing the records but isn’t able to process them.
This is the log I’m getting:
java -jar snowplow-s3-loader-0.7.0.jar --config loader.conf
log4j:WARN No appenders could be found for logger (com.amazonaws.AmazonWebServiceClient).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[main] INFO com.snowplowanalytics.s3.loader.sinks.KinesisSink - Stream unsplash-snowplow-raw-loader-fail exists and is active
[main] INFO com.snowplowanalytics.s3.loader.S3Loader$ - Initializing sink with KinesisConnectorConfiguration: {regionName=us-east-1, s3Endpoint=https://s3.amazonaws.com, kinesisInputStream=xxx, maxRecords=500, connectorDestination=s3, bufferMillisecondsLimit=30000, bufferRecordCountLimit=500, s3Bucket=xxx, kinesisEndpoint=https://kinesis.us-east-1.amazonaws.com, appName=unsp_s3loader_recovery_raw, bufferByteSizeLimit=10000000, retryLimit=1, initialPositionInStream=LATEST}
[main] INFO com.snowplowanalytics.s3.loader.KinesisSourceExecutor - KinesisSourceExecutor worker created
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 67 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 67 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 60 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 106 records.
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 82 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 86 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 138 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 94 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 100 records.
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 124 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 168 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 108 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 214 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 179 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 147 records.
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 160 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 248 records.
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 193 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 183 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 176 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 244 records.
[RecordProcessor-0003] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 231 records.
[RecordProcessor-0002] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 306 records.
Looks like this unresolved issue:
I’ve hidden the stream and bucket names (the xxx values in the log above), but they both exist.
So here I am, happy that the format checks out, but I can’t get the whole pipeline to work.
Do you have any idea what might be going on with the S3 Loader? And any thoughts on the format? It should be fine, right?
I’ll probably come back to this tomorrow, I need some sleep.