I have set up a Scala Stream Collector and am currently using it with the STDOUT option.
I want to integrate the collector with an already running Kafka service. I know that a Snowplow-Kafka integration will be available in the future.
But is there currently any code or reference I can use to implement this for my use case?
Hey @PuneetBabbar - sure thing, there is an old branch here:
This is super-old in terms of the Scala Stream Collector and Kafka versions targeted. It would be worth updating this to the latest versions and opening a new PR for us so that we can accelerate the official release of Snowplow-Kafka (at which point you won’t have to operate a fork any more).
Will have a look into this, and open a PR as well.
Thanks @PuneetBabbar! Keep us posted as you get further along. Happy to have a Google Hangout too if there are design questions you want to go through together.
As a temporary solution you can use kafkacat, which lets you pipe from stdout to a Kafka instance.
This is something we got working in our preprod environment. Let me know if you need more details.
Collector App -> stdout | kafkacat -> kafka (raw topic)
kafkacat -> (raw topic) | enrich -> stdout | kafkacat -> kafka (enriched topic)
We also piped the bad streams out of enrich and the collector to a bad topic in Kafka.
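For anyone who wants to try this, here is a rough sketch of the two kafkacat wrappers that the pipeline above needs. It only uses kafkacat's producer (-P) and consumer (-C) modes with -b and -t. The broker address, the topic names and the helper names to_topic and from_topic are assumptions for illustration, not something taken from our setup.

```shell
#!/bin/sh
# Sketch only: broker address and helper names are assumptions.
BROKER="${BROKER:-localhost:9092}"
KAFKACAT="${KAFKACAT:-kafkacat}"

# Producer mode (-P): each line read from stdin becomes one message on topic $1.
to_topic() {
  "$KAFKACAT" -P -b "$BROKER" -t "$1"
}

# Consumer mode (-C): each message on topic $1 is written to stdout.
from_topic() {
  "$KAFKACAT" -C -b "$BROKER" -t "$1"
}
```

With these defined, the first stage would look something like `./snowplow-stream-collector-0.7.0 --config scala.stream.config | to_topic raw`, and the second stage would start with `from_topic raw` piped into enrich. How enrich is started depends on your version and config, so that part is left out here.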
Thanks @devsaik @alex
So, as per your directions, I was able to feed the STDOUT beacons from the Scala Stream Collector into a Kafka server, and was able to consume them with an Apache streaming application.
Now I'm facing an issue decoding and transforming these beacons from the Base64-encoded Thrift format.
Do you have any code sample that would help to decode them?
Or is there any Scala collector setting to send this data in plain JSON or TSV format?
Thanks a lot for your help.
Glad to hear you got it working! Scala Common Enrich is the Snowplow library which operates on the Base64-encoded Thrift format. Stream Enrich is the Snowplow stream-processing application which can process the Base64-encoded Thrift format as relayed in a Kinesis stream.
Our plans for Kafka support in Snowplow involve extending Stream Enrich to add support for reading from and writing to Kafka. Pull request welcome!
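On the decoding question, the outer layer at least can be checked from the shell. This is a minimal sketch, assuming each line coming out of the collector (or out of the Kafka topic) is one Base64-encoded record. The function names decode_record and record_sizes are made up for illustration. It only strips the Base64 layer and does not parse the Thrift bytes.

```shell
#!/bin/sh
# Sketch only: assumes one Base64-encoded record per input line.

# Decode a single Base64 record from stdin into raw bytes on stdout.
decode_record() {
  base64 --decode
}

# For every line on stdin, print the size in bytes of the decoded record.
record_sizes() {
  while IFS= read -r line; do
    printf '%s\n' "$line" | decode_record | wc -c | tr -d ' '
  done
}
```

Turning the decoded bytes into named fields still needs a Thrift deserializer and the Thrift schema the collector uses, which is the work Scala Common Enrich does.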
I am also trying to integrate Kafka with the Snowplow collector, but I couldn't find any information about it. Could you kindly guide me through the steps you followed?
Just follow these steps, which are what I did to test: install an instance of Kafka on the same server as the collector, then start ZooKeeper and Kafka.
Then just run the Snowplow stream collector in STDOUT mode, and pipe its output to Kafka.
nohup ~/kafka/bin/zookeeper-server-start.sh ~/kafka/config/zookeeper.properties &
nohup ~/kafka/bin/kafka-server-start.sh ~/kafka/config/server.properties &
Send the Snowplow stream to Kafka:
sudo ./snowplow-stream-collector-0.7.0 --config scala.stream.config | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic snowplow > /dev/null &
Verify that the events are reaching Kafka:
sudo ~/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic snowplow --from-beginning
It should work.