Kafka to BigQuery/GCS loader

Hi Team,

While deploying snowplow-bigquery-streamloader, I realized that it currently only supports Pub/Sub as the input from the enrich stage. I am currently using Kafka for log streaming.

I think the other option that I might have is to use Dataflow or Spark to do the same.

Any thoughts on this?

Thanks
Jay

Hi @Jayant_Kumar,

Currently we use native streaming on each cloud:

  • Kinesis on AWS
  • Pub/Sub on GCP
  • Event Hubs (Kafka) on Azure

While it’s possible to run the pipeline end to end on Kafka, only components relevant for Azure currently support that. I’m afraid that excludes the BigQuery loader.

(As a side note, it should not be necessary to set up components one by one; the quick start guide launches everything already pre-wired to use Pub/Sub across the pipeline.)

Hi @stanch, thank you for your thoughts on this.
We have Confluent Cloud hosted plans and licenses for Kafka for the whole year. It would be a challenge to convince the teams to adopt Pub/Sub and give up the current license.
Also, our existing streaming and batch pipelines are built on the Kafka platform. It would be a big shift for them to migrate.

Additionally, I am setting this up on GKE. Given my familiarity with Kafka and its configs, I managed to do it in a week. Only now have I realized that there is this limitation.

While I can use Apache Spark to sink data to GCS and BigQuery, I am just worried that it may have some limitations or complexity when deserializing an enriched event into a DataFrame or Dataset.

Any thoughts on this? Perhaps this question deserves its own topic for a wider view.

I see, thanks for explaining the context.

I can think of a few options, although none of them is ideal:

  • Use Pub/Sub just between Enrich and BigQuery loader (extra cost on top of the existing Confluent cluster)
  • Contribute a Kafka source for the BigQuery loader — as an example, we’ve added one to RDB Loader recently (this requires solid Scala knowledge)
  • Wait for the upcoming Apache Iceberg support in the Lake loader, which would allow you to consume data from GCS via external tables (but this option is not real-time)

I would advise against writing a Spark job as you describe. Once you start using self-describing events with custom schemas, you will get data that changes shape over time. Our existing loader applications deal with this nicely by managing the columns in the warehouse to receive data in different schemas.

1 Like

Thank you for the suggestions. I think some of them can work.

  1. I have good programming experience with Scala, but only to a certain extent in FP. I can still give it a try and see whether I can contribute.

I am guessing that a lot of components would be shared with the existing loaders, so getting started shouldn’t be a big problem.

  2. I would need a little more clarity on the Spark Streaming part. We have multiple mission-critical real-time stream-processing pipelines built on Spark.

The current latency/freshness requirement for delivery to our online systems is within 30 seconds.

So even if I go ahead with the native loader, I would still need to process enriched events directly from Pub/Sub to maintain data freshness.

So if I have self-describing event schemas and derived POJOs, would I be able to deserialise them into a Dataset? I am just asking based on your experience; hope you don’t mind.

Thanks

I meant that we can’t wait for events to be delivered to the data warehouse or lake. That adds delay and complexity to processing those events downstream in real time.

Regardless of loading to a database like BigQuery, you can set up a separate consumer on the enriched stream to do real-time processing.

So even if I go ahead with the native loader, I would still need to process enriched events directly from Pub/Sub to maintain data freshness.

So if I have self-describing event schemas and derived POJOs, would I be able to deserialise them into a Dataset? I am just asking based on your experience; hope you don’t mind.

If by this you mean you’d like to deliver the data to real-time systems, not do something like loading to a database, then this is likely fairly straightforward (depending on the use case). The data will be in the enriched TSV format, which is a TSV with some JSON/JSON-array columns. You can use the analytics SDKs to transform that into JSON.
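
For illustration, here is a minimal sketch of that step with the Scala Analytics SDK; the function name is just for this example, and the input is assumed to be one enriched TSV line as read off the stream:

    import cats.data.Validated.{Valid, Invalid}
    import com.snowplowanalytics.snowplow.analytics.scalasdk.Event

    // Turn one enriched TSV line into a JSON string (or a parse error)
    def enrichedTsvToJson(tsvLine: String): Either[String, String] =
      Event.parse(tsvLine) match {
        case Valid(event)    => Right(event.toJson(lossy = true).noSpaces) // circe Json -> String
        case Invalid(errors) => Left(s"Could not parse enriched event: $errors")
      }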

Loading the data into an in-memory Spark DataFrame or similar is not difficult, but you need to be aware that the schemas for the self-describing JSON can change, and account for that in how you architect your job.
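
As a rough sketch of what that could look like (a batch read with a hypothetical GCS path; a Structured Streaming read from Kafka would be analogous):

    import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("enriched-to-df").getOrCreate()
    import spark.implicits._

    // Read raw enriched TSV lines (hypothetical bucket path)
    val rawLines = spark.read.textFile("gs://my-bucket/enriched/")

    // Parse each line with the analytics SDK; invalid lines are silently dropped here
    val jsonLines = rawLines.flatMap(line => Event.parse(line).toOption.map(_.toJson(lossy = true).noSpaces))

    // Schema inference will drift as the data evolves, so in practice you would
    // pin a schema derived from your Iglu schemas instead
    val df = spark.read.json(jsonLines)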

If your task is to load this data into a database, it is then much more complex, because the schema of the table you’re loading into is fixed, but the schema of the data evolves. So for getting the data into the database, we recommend using one of our loaders. If your use case has similar constraints or difficulties with evolving schemas, then the task will of course be much more challenging.

If you do need a loader, one option is to set up a Kafka pipeline and use Snowbridge to stream that data into a separate Pub/Sub topic, which has a BigQuery loader set up on it. This would duplicate the cost of the enriched stream, but it would give you the freedom to have your Kafka infrastructure power real-time use cases, while also loading to BQ without having to fork anything.

There’s absolutely no requirement to set up a loader at all though. We typically recommend doing so as the data tends to be very valuable for analytics use cases, but if your primary goal is to power real-time systems, you can do so without a database at all. 🙂

1 Like

Hi @Colm

Good to hear that. I think it’s something that is negotiable.

I have a couple of points to discuss. Because I am very new to this, I don’t have a complete understanding of how the TSV data is deserialised into JSON. I think the analytics SDK might use some generic Event type for deserialisation.

  1. Does that mean that even with self-describing schemas the analytics SDK will still work, and we can use it to parse custom events and entities into JSON without additional boilerplate, with the approach remaining the same?

I agree that schema evolution for databases is limited, and I assume the loader and mutator must be checking compatibility and converting values to compatible types.

  2. If we take the lakehouse route for analytics and use a streaming framework like Apache Spark with a lakehouse format like Iceberg, which does offer schema evolution and which BigQuery integrates with, would that be a good approach?

Assuming the enrichment stage makes sure that type changes are backward/forward compatible with the existing ones as new types evolve?

Loading the data into an in-memory Spark DataFrame or similar is not difficult, but you need to be aware that the schemas for the self-describing JSON can change, and account for that in how you architect your job.

I think this makes sense, and one way to handle it would be to generate some target types like POJOs or case classes, so whenever there is a change in the schema, this job adopts the new types. As long as the enrich stage ensures compatibility, this should be fine.

But I am not sure whether I can use these custom types with the analytics SDK you mentioned and deserialise the events in a similar fashion, or whether it would take multiple stages: first into the generic event, and then into the custom events and contexts.

We do favour object storage like GCS for supporting batch queries, as long as the query performance is comparable to the native BigQuery format, rather than loading the data completely into BigQuery.

It gives us better control over durability, integration and overall cost.

@Colm @stanch I would be really thankful if you could share some final thoughts on this. It will help me move in the best direction possible for now.

I have started a parallel topic for better visibility.

Link: Writing Custom Spark Lake Loader for Iceberg

Thank you!!

Hi Jayant, thanks for sparking an interesting discussion. Just to add that it is definitely possible to load from Kafka to BQ; we manage this using Spark as an intermediary, with the following workflow:

transform good enriched events -> process events -> load via Spark stream -> reprocess with Spark (depending on your use case) -> push Parquet files to GCS and load to BQ (just the standard BQ client lib)

This does incur additional overhead, but for our use case it works really well, and you can still do schema evolution by leveraging Iglu versions. In our case, we just merge minor versions by taking the latest one, which assumes only field additions are allowed.
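
For the last step, a minimal sketch of that load call with the standard BigQuery client library (the dataset, table and bucket names here are made up):

    import com.google.cloud.bigquery.{BigQueryOptions, FormatOptions, JobInfo, LoadJobConfiguration, TableId}

    val bigquery = BigQueryOptions.getDefaultInstance.getService

    // Hypothetical destination table and GCS location of the Parquet files written by Spark
    val tableId = TableId.of("analytics", "enriched_events")
    val gcsUri  = "gs://my-bucket/enriched-parquet/*.parquet"

    // Standard load job: Parquet in GCS -> BigQuery table
    val loadConfig = LoadJobConfiguration
      .newBuilder(tableId, gcsUri)
      .setFormatOptions(FormatOptions.parquet())
      .build()

    val job = bigquery.create(JobInfo.of(loadConfig))
    job.waitFor() // blocks until the load job completes (or surfaces the error)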

2 Likes

Hi @evaldas, thank you for the details.
Can you share a bit more detail on the first three stages you mentioned? That would help.
transform good enriched events -> process events -> load via Spark stream

The deserialization from TSV to JSON and leveraging the schema are things I am not very clear on.
Do you have any examples?

Hi @Jayant_Kumar, at a high level it looks like this:

    import org.apache.kafka.streams.processor.RecordContext
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.kstream.KStream
    import org.apache.kafka.streams.scala.serialization.Serdes._

    val builder: StreamsBuilder = new StreamsBuilder
    val enrichedMessages: KStream[Array[Byte], Array[Byte]] =
      builder.stream[Array[Byte], Array[Byte]](inputTopic)

    enrichedMessages
      .map { case (_, rawEvent) =>
        // parseMessage / transformMessage / serializeMessage are our own helpers
        val deserialized = Option(parseMessage(rawEvent))
        val transformed  = deserialized.flatMap(m => transformMessage(m))
        val serialized   = transformed.map(m => serializeMessage(m)).get
        (Array.empty[Byte], serialized)
      }
      // route the transformed events to the downstream real-time topic
      .to((_: Array[Byte], _: Array[Byte], _: RecordContext) => "realtime-events-topic")

    builder.build()

How you transform the TSV to JSON is up to you, but we just match each field as a (field_name, type, value) tuple for the base event, and we extract contexts as sub-fields using the schema name + version as an identifier.
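
A minimal sketch of that matching, assuming a hypothetical fieldNames list holding the canonical enriched-event column names in TSV order (app_id, platform, etl_tstamp, ...):

    import io.circe.Json

    // Pair every TSV column with its field name and keep non-empty values;
    // self-describing columns (contexts, unstruct_event) stay as raw JSON strings here
    def tsvToJson(line: String, fieldNames: List[String]): Json = {
      val values = line.split("\t", -1).toList
      val fields = fieldNames.zip(values).collect {
        case (name, value) if value.nonEmpty => name -> Json.fromString(value)
      }
      Json.obj(fields: _*)
    }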

Once you have valid JSON, you can use Iglu schemas to dynamically create Spark struct objects that match pretty well, and then just process those events and write them out as Parquet.
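
As a rough sketch of that mapping, assuming properties is the parsed "properties" object of an Iglu JSON Schema and only a few primitive types are handled:

    import io.circe.Json
    import org.apache.spark.sql.types._

    // Map JSON Schema property types to Spark types; anything unhandled falls back to string
    def schemaToStructType(properties: Map[String, Json]): StructType =
      StructType(properties.toSeq.map { case (name, prop) =>
        val sparkType = prop.hcursor.get[String]("type").getOrElse("string") match {
          case "integer" => LongType
          case "number"  => DoubleType
          case "boolean" => BooleanType
          case _         => StringType // strings, nullable unions, nested objects, arrays
        }
        StructField(name, sparkType, nullable = true)
      })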

1 Like

Hello @evaldas, thank you for the heads-up.

Let me try this snippet out and see if this works.