We need to filter out certain records from all data files in archive/raw, then write the resulting data back to S3. We are using PySpark, but we are not able to read the LZO files properly: we get ��X�L)��W�!q���e instead of the actual values. The commands we used to read the data are:
files = sc.newAPIHadoopFile(
    "s3://archive/raw/2018-07-01-01-00/2018-06-30-49579197474875581036433502924195635816725933726179000578-49579197474875581036433502924746905990470264003472916738.us-west-2.raw.lzo",
    "com.hadoop.mapreduce.LzoTextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text")
lzoRDD = files.map(lambda x: x[1])
# apply the filter function on lzoRDD, then write it back to S3:
lzoRDD.saveAsTextFile(path=sys.argv[2], compressionCodecClass="com.hadoop.compression.lzo.LzopCodec")
The resulting data set can no longer be correctly parsed by snowplow-emr-etl-runner. What are the problems with the input arguments to sc.newAPIHadoopFile()?
We use "collectors: format: thrift". What are the file format and encoding details of the archive/raw files?
Thanks for help,
Richard
Thank you Alex. I know that's not easy, especially for those of us new to the field; I haven't gotten much of a clue even after reading your code. But I have two specific questions about the log files in the raw/ folder:
What encoding do they use? We see some unrecognized special characters (they are a killer!) when using Spark to open those raw logs. Our Spark cluster defaults to UTF-8; do the unrecognized special characters mean the files are not encoded in UTF-8?
Our log files are in Thrift format. Is each record a [key, value] pair or a sequence of tab-separated strings?
@RichardJ Did you end up working out the answer to your encoding questions? I am also chasing answers to this.
I currently have a collector writing to a Kinesis stream, which then has Kinesis Firehose writing the records to S3. When attempting to open the files from S3 I am running into a number of encoding issues.
The raw collector payloads are Thrift-encoded. This guide to handling raw bad rows from the awesome Poplin (formerly Snowflake) team down under should be helpful.
I don't have much experience with handling raw data (fingers crossed I never need to, as I don't envy you this task), but I believe the format is the same as a bad-row payload, so that should get you most of the way there.
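In case a concrete illustration helps, here is a rough sketch of deserialising a single collector payload record in Python. Everything here is an assumption on my part rather than something from the guide: it assumes you have a local copy of the Snowplow collector-payload .thrift schema saved as collector-payload.thrift, that the struct in it is called CollectorPayload, that the records use the standard Thrift binary protocol, and that the thriftpy2 package is installed.

# Sketch only: decode one raw collector payload record with thriftpy2.
# collector-payload.thrift is an assumed local copy of the Snowplow schema;
# record_bytes holds the Thrift-serialised bytes of a single record.
import thriftpy2
from thriftpy2.utils import deserialize

collector_payload_thrift = thriftpy2.load(
    "collector-payload.thrift",              # path to the schema file (assumption)
    module_name="collector_payload_thrift",  # thriftpy2 requires the _thrift suffix
)

def decode_payload(record_bytes):
    payload = collector_payload_thrift.CollectorPayload()
    deserialize(payload, record_bytes)       # binary protocol by default
    return payload

# decode_payload(...) then gives you ordinary Python attributes
# (e.g. payload.querystring, payload.body, payload.userAgent) to filter on.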
Hi @jacobmaxkelsen,
If you are looking at the logs just after the collector, they won't be in JSON format; they are Thrift-encoded. In Python, if you read such a file as a binary file, you will see the lines as byte strings, but you can still read them.
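For example, something like this (just an illustration; the file name is a placeholder for a raw log copied down from S3 and LZO-decompressed first):

# Illustration only: open a raw collector log in binary mode.
# "2018-06-30-collector.raw" is a placeholder for a file copied from S3
# and already LZO-decompressed (e.g. with lzop -d).
with open("2018-06-30-collector.raw", "rb") as f:
    data = f.read()

# The content is Thrift-serialised bytes, not UTF-8 text, so printing it
# shows escape sequences rather than readable characters.
print(data[:200])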
However, if you read the data after enrichment, it is in proper TSV format, which can be read easily.
Snowplow also provides a couple of Analytics SDKs that you can use to convert the enriched TSV data into JSON after the enrichment phase.
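For example, with the Python Analytics SDK (a minimal sketch; enriched.tsv is a placeholder path for an enriched output file):

# Sketch: convert enriched TSV events to JSON with the Snowplow Python
# Analytics SDK (pip install snowplow_analytics_sdk).
import json
import snowplow_analytics_sdk.event_transformer as event_transformer

with open("enriched.tsv") as f:            # placeholder path to enriched output
    for line in f:
        event = event_transformer.transform(line.rstrip("\n"))  # returns a dict
        print(json.dumps(event))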
P.S. @Colm's comment makes a lot of sense for decoding the Thrift data.