Count number of messages produced by GCSloader for collector

Hi,

I am building an automation framework for testing, so I am writing Java code to count the number of messages produced by the GCS loader for the collector, to make sure nothing is missing from the tracker.

Question: what is the simplest way to count the number of messages? I ask because I see the collector output format is Thrift.

Thanks,
Hanumanth

Hi @Hanumanth ,

What is the input of your test? Is it data on GCS?

If what you want to test is the collector, you don’t need the GCS loader; you can read the output of the collector directly from PubSub.

One Thrift payload == 1 tracker event, so all you need to do is count the payloads. If you need to do some parsing, you need to use something like this.

Hi @BenB ,

Basically, I need to test the messages flowing from the collector to BigQuery (the whole pipeline, by count). We are storing both good and bad events in GCS for future reference, so we are running a GCS loader for every component in order to keep a copy of the messages at each stage.

Attaching an example payload file from the collector: uat_gcsloader_collector_2021_11_03_01_output-2021-11-03T01_30_00.000Z-2021-11-03T01_35_00.000Z-pane-0-last-00000-of-00001.txt


Could you suggest the best way to count the payloads, looking at this?

Thanks,
Hanumanth

If you want to count the payloads, you can just count the number of lines in that file (assuming the payloads are line-delimited) or count the number of messages in the PubSub raw queue.
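Counting the lines of one output file, as described above, could be sketched like this in Java. This is a minimal illustration, not part of the GCS loader: the `PayloadCounter` class name is made up, and it assumes one line-delimited payload per line, as mentioned.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical helper: counts the line-delimited payloads in one
// GCS loader output file (one payload per line).
public class PayloadCounter {

    public static long countLines(Path file) {
        try (Stream<String> lines = Files.lines(file)) {
            return lines.count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a downloaded GCS loader output file.
        Path file = Files.createTempFile("collector-output", ".txt");
        Files.write(file, List.of("payload1", "payload2", "payload3"));
        System.out.println(countLines(file)); // prints 3
    }
}
```

For real runs you would point `countLines` at each file downloaded from the bucket and sum the results.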

If you want to count the number of events then as Ben mentioned you’ll need to decode the serialised Thrift record using the collector-payload schema and count the events that way (as one payload may contain 1 or more events).


Can we convert the Thrift output format into CSV using the below option? Is it available?

No, you can’t. If you want to parse Thrift, you need dedicated code for it (the code that I shared).

Why do you want to convert to CSV if all you need is the count? As Mike said, for that you only need to count the number of lines in your files.

Thank you @BenB for the clarification. So, similarly, we can count the enricher’s output as well and cross-validate the counts.

I mean it should be: number of lines in the collector’s good records = number of lines in the enricher’s good records + number of lines in the enricher’s bad records, since the enricher’s input is the collector’s processed events.
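That sanity check could be sketched in Java like this. The class and method names are hypothetical, and note the caveat that this equality only holds if each collector payload carries exactly one event:

```java
// Hypothetical count-validation helper for the pipeline test,
// assuming one event per collector payload.
public class CountCheck {

    // True when every collector payload is accounted for downstream:
    // collector good == enricher good + enricher bad.
    public static boolean isBalanced(long collectorGood,
                                     long enricherGood,
                                     long enricherBad) {
        return collectorGood == enricherGood + enricherBad;
    }

    public static void main(String[] args) {
        // Example counts, purely illustrative.
        System.out.println(isBalanced(1000, 990, 10)); // prints true
        System.out.println(isBalanced(1000, 990, 5));  // prints false
    }
}
```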

It depends on your tracking. 1 tracking request == 1 collector payload, but a collector payload can contain multiple actual events, depending on your tracking implementation. If that’s the case, then several enriched events and bad rows can correspond to one collector payload.

Hi @BenB ,

Thank you for the information. Just one more question: how does the GCS loader create the following directory? Is it based on current_timestamp or something else?

It’s based on the current date/time of the partition. If you’d like to change it, you can do so by modifying the --dateFormat option of the GCS loader.

Thanks, @mike. We thought it was based on collector_timestamp or some other timestamp captured in Snowplow events.

So, it is the current date at which the GCS loader is loading the data.

Yes - technically it is around the current timestamp: the filename is based on the UTC end time of the window, which is why you’ll see the minutes ending in 0 or 5 if your windows are 5 minutes long.
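Based on the file name shown earlier in this thread, the UTC window end time could be recovered like this. This is a sketch, not GCS loader code: `WindowEndParser` is a hypothetical helper, and it assumes the colons in the timestamps are replaced by underscores, as in the example name.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the window end time from a GCS loader
// output file name of the form seen in this thread.
public class WindowEndParser {

    // Timestamps in the file name use underscores in place of colons,
    // e.g. 2021-11-03T01_35_00.000Z
    private static final Pattern TS =
        Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}_\\d{2}_\\d{2}\\.\\d{3}Z");

    // Returns the second of the two timestamps, i.e. the UTC end of the window.
    public static Instant windowEnd(String fileName) {
        List<String> stamps = new ArrayList<>();
        Matcher m = TS.matcher(fileName);
        while (m.find()) {
            stamps.add(m.group());
        }
        if (stamps.size() < 2) {
            throw new IllegalArgumentException("No window timestamps in: " + fileName);
        }
        // Restore the colons before parsing as an ISO-8601 instant.
        return Instant.parse(stamps.get(1).replace('_', ':'));
    }

    public static void main(String[] args) {
        String name = "uat_gcsloader_collector_2021_11_03_01_output-"
            + "2021-11-03T01_30_00.000Z-2021-11-03T01_35_00.000Z-"
            + "pane-0-last-00000-of-00001.txt";
        System.out.println(windowEnd(name)); // prints 2021-11-03T01:35:00Z
    }
}
```

With the window end in hand, the test framework can group files by window and compare counts across components for the same window.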