Collector RabbitMQ and Enrich RabbitMQ released

We’re excited to announce that we’ve added RabbitMQ support to the collector and to enrich.

Why?

We want to experiment with running a cloud-agnostic pipeline that we could deploy on any Kubernetes cluster (leveraging Helm Charts). This would mean that the same pipeline could run on AWS, on GCP, on Azure, on-premise or even locally. We’d love to hear what you think of this idea!

We already had Kafka assets that could serve this purpose, but it seems easier for us to run and maintain a RabbitMQ cluster in production than a Kafka one.

Experimental

It’s important to keep in mind that these two new assets are experimental, which means that we might decide to drop them in the future depending on the results of our experiments.

If anyone is interested in testing these assets, we’d be glad to receive feedback. Issues can be opened on the collector and enrich repos.

How to run?

Instructions to set up and configure Collector RabbitMQ can be found on our docs website.

Instructions to set up and configure Enrich RabbitMQ can be found on our docs website.
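
For a rough idea of what this looks like in practice, here is a minimal, hypothetical sketch of a collector config with a RabbitMQ sink. The overall collector.streams layout follows the usual collector config, but the RabbitMQ-specific keys and values below are illustrative placeholders rather than taken from the reference; please rely on the docs website for the actual options.

collector {
  interface = "0.0.0.0"
  port = 8080

  streams {
    # Stream/queue names are illustrative
    good = "raw"
    bad = "bad-1"

    sink {
      enabled = rabbitmq
      # Hypothetical connection settings; check the docs for the real key names
      host = "rabbitmq.internal.example.com"
      port = 5672
      username = ${?RABBITMQ_USERNAME}
      password = ${?RABBITMQ_PASSWORD}
    }
  }
}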

Hi @BenB ,

Great release!

How do you decide when to drop things that are Experimental or promote them to fully supported? Is there a way to see the current state of that discussion?

We are considering using this for a project, but don’t want to commit to it, build a bunch of things, then have support be dropped.

Thanks!

-Sam

Hi @skerr,

We are still testing a few different messaging frameworks, including RabbitMQ, to pick the one we could use for all our AWS, GCP and (in the future) Azure deployments. Unfortunately, at the moment it looks like we are unlikely to go with RabbitMQ because of its scalability model and maintenance complexity. This means we are likely to deprecate these modules in the near future :frowning:

Are you set on RabbitMQ, or would you be ok with a different framework? Using Snowbridge, you can stream enriched Snowplow data from Kinesis or Pub/Sub into Kafka, and that integration is not going away :slight_smile: Actually, we have a Product Office Hours session about that tomorrow in case you’d like to attend.

Hi @stanch,

Dare I ask, but is RabbitMQ definitely going to be deprecated (or already is)? I came across it recently and it would certainly be of value. I am, however, having a few config issues, so I wasn’t sure if I could trouble someone for some help if it is to be no more…

Hi @steve.gingell, we’re still planning to deprecate it, yes :frowning: May I ask what your use case is?

Implement data collection and enrichment on Azure.

So far, RabbitMQ and the Collector are running as containers in Azure Container Apps, with each message triggering an Azure Function (making use of the RabbitMQ binding).

Unfortunately, although the Collector is picking up page views, for example, and states that it is sending a Thrift record to the RabbitMQ server, no such record is created and the Azure Function is not being triggered.

It feels like I’m agonisingly close to getting it working; gut instinct says it could just be a config issue with the collector, but I’m not sure…

Is there a reason you are using RabbitMQ and not Kafka / EventHubs?

And are you planning to load the data into a warehouse / lake? (Which?) I appreciate that currently the loaders don’t support RabbitMQ or Kafka, but that might change in the near future :slight_smile:

I didn’t realise that the collector integrated with Event Hubs / Kafka; I had disregarded Kafka due to the anticipated high costs, but maybe I’m wrong… :slight_smile:

I had planned for the Azure Function to make use of the Rust Deltalake crate to write to Azure Data Lake Storage (Gen 2) or even the newly announced OneLake :slight_smile:

Hi @stanch,

Would you mind sending me the links to the Event Hubs implementation documentation? I have looked, but no joy - does it require Snowbridge?

Apologies if not mentioned specifically above, but a requirement for this project is 100% Azure - is this possible for the Collector and Enrich steps?

Thanks again for all of your help.

Hi @steve.gingell,

My understanding is that the Kafka assets (for Collector and Enrich) can be used with Event Hubs.
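
For context, Event Hubs exposes a Kafka-compatible endpoint on port 9093 of the namespace host, so the Kafka collector sink mainly needs its brokers pointed there, plus the SASL settings discussed further down this thread. A rough sketch, assuming the usual Kafka sink layout (the exact key names, and whether extra producer properties go through producerConf, should be checked against the collector’s Kafka configuration reference):

sink {
  enabled = kafka
  # Event Hubs' Kafka-compatible endpoint lives on port 9093 of the namespace host
  brokers = "<your-namespace>.servicebus.windows.net:9093"
  # Additional Kafka producer properties (e.g. the SASL settings shown below)
  producerConf {
    "security.protocol" = "SASL_SSL"
    "sasl.mechanism" = "PLAIN"
  }
}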

Hi @stanch,

I have been working through the Azure Event Hubs implementation and it is my understanding that the following properties would need to be added to the KafkaProducer for it to work:

// Event Hubs specific settings using a connection string
props.setProperty("security.protocol", "SASL_SSL")
props.setProperty("sasl.mechanism", "PLAIN")
props.setProperty("sasl.jaas.config", """org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<your_connection_string>";""")

// Event Hubs specific settings for OAuth
props.setProperty("security.protocol", "SASL_SSL")
props.setProperty("sasl.mechanism", "OAUTHBEARER")
props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
props.setProperty("sasl.login.callback.handler.class", "CustomAuthenticateCallbackHandler")

Are you aware of a successful implementation of Azure Event Hubs that was achieved without such changes?

Many thanks,
Steve

Hi @steve.gingell,

You can set these via producerConf / consumerConf, e.g. for Enrich: enrich/config.kafka.extended.hocon at 6a11d15a23e46e466b5641a482c477b04c4eac9a · snowplow/enrich · GitHub
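
In other words, rather than changing the KafkaProducer code, the same Event Hubs SASL properties from the snippet above can be supplied as plain entries in the config file. A minimal sketch for the Enrich Kafka output, following the layout of the extended example linked above (the topic, namespace and connection string values are placeholders):

"output": {
  "good": {
    "type": "Kafka"
    "topicName": "enriched"
    "bootstrapServers": "<your-namespace>.servicebus.windows.net:9093"
    "producerConf": {
      "security.protocol": "SASL_SSL"
      "sasl.mechanism": "PLAIN"
      "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<your_connection_string>\";"
    }
  }
}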

Hi @stanch,

Thanks for your response and help; almost there :slight_smile:

With the Enrich Event Hub producer working, do you know of a way of deserialising the payload to one of the following Azure natively supported formats: Avro, JSON, CSV (ideally JSON, as this opens up A LOT more doors)?

Would Snowbridge be an option?

Thanks again for your help and hopefully I won’t need to bother you after I have this last piece of the puzzle :slight_smile:

Many thanks,
Steve

Hi @steve.gingell,

Glad to hear it’s almost working!

Snowbridge could be an option, but I’m afraid you’d need to wait for the Kafka source support PR. After that, it’s just a matter of using the JSON transformation.

You could also use Benthos with a custom transformation (using jq or bloblang) that replicates what the Snowbridge transformation does (convert CSV to JSON and convert the stringified JSON columns to JSON).

Or… if you don’t mind waiting a few months, we are looking into getting our regular warehouse loader working on Azure and/or potentially loading into OneLake in Delta Parquet :wink: The advantage of using whatever loader we come up with here is that it will properly deal with schema evolution, i.e. if you change the schema of your data, the loader would automatically adjust the warehouse column or create a new one (we have this functionality today for AWS and GCP).

Hi @stanch ,

Thanks for this, Nick.

You’ve probably already guessed my next question: do you have an idea of timeframes? Ideally, I need to be up and running by the end of the month. Do you think that the Kafka source support PR will be closed by then?

And with regard to the warehouse loader working on Azure, when realistically would “a few months” be?

In the meantime, is there an easy way to pull the default event schemata, so that I can generate some dummy data?

Many thanks,
Steve

Hi @stanch ,

I should have asked, what formats can the Kafka messages take (e.g. CSV, JSON) when produced by the Enrich step?

Cheers,
Steve

Do you think that the Kafka source support PR will be closed by then?

It’s not high priority at the moment, but if someone were willing to implement the test described in this comment before either I or the original contributor get to it, that would speed things along!

In the meantime, is there an easy way to pull the default event schemata, so that I can generate some dummy data?

I’ll do you one better - we have an event generator project to create dummy data.

I think this answer will help you: Kafka Pipeline message format docs - #4 by stanch

This will depend on how other things in our roadmap pan out, but I don’t think it will happen this month. There is a good chance it happens during the summer though :slight_smile:

This is awesome, thanks @Colm - just what I’m looking for.

I’ve just tested it and although the following command appears to run without error, no events are being saved to file - any ideas? Appreciate any assistance you can offer.

sudo ./snowplow-event-generator --config ./snowplow-enrich-event-generator/config.hocon --output file:/snowplow-enrich-event-generator/kafka/my-events

Here are the contents of my config file:

{
  "seed": 1
  "payloadsTotal": 1000
  "withRaw": true
  "withEnrichedTsv": true
  "withEnrichedJson": true
  "compress": false
  "payloadsPerFile": 1000
  "eventPerPayloadMin": 1
  "eventPerPayloadMax": 1
  "duplicates": {
    "natProb": 0.0
    "synProb": 0.0
    "natTotal": 1
    "synTotal": 1
  }
  "timestamps": {
    "type": "Fixed"
    "at": "2022-02-01T01:01:01z"
  }
}