Writing Custom Spark Lake Loader for Iceberg

Hello Folks!

I am currently working on writing a Spark-based lake loader for Iceberg that can load data from Kafka to GCS. I am taking some inspiration from the existing GCS loader, but I would like to learn from your experience about possible challenges and a rough roadmap.

One of the challenges I am facing is how to manage deserialization and schema evolution.

Deserialization

The Kafka messages will need to be deserialized into Spark DataFrames/Datasets before they can be loaded into Iceberg. I am considering two approaches:

Use a custom deserializer:
This would give me the most control and flexibility over the deserialization process.

  • One straightforward approach could be to simply convert the TSV output from enrichment into JSON before loading it into GCS as an Iceberg table.

  • The other approach would be to create some custom event objects for deserialization, which would be a bit more complex and would require changes to every schema.

Use a pre-built deserializer:
I think there is an existing Event class in the Analytics SDK that I can use to deserialize events into a JSON/Event Dataset in Spark.
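For illustration, here is a rough sketch of what I have in mind for the second option (not tested; it assumes the Snowplow Scala Analytics SDK is on the classpath, and the names are placeholders):

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import org.apache.spark.sql.{Dataset, SparkSession}

// Rough sketch: parse enriched TSV lines with the Analytics SDK and keep them
// as JSON strings. Invalid lines are silently dropped here; a real loader
// should route them to a bad-rows sink instead.
def tsvToJson(tsvLines: Dataset[String], spark: SparkSession): Dataset[String] = {
  import spark.implicits._
  tsvLines.flatMap { line =>
    Event.parse(line).toOption.map(_.toJson(lossy = true).noSpaces)
  }
}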

Schema Evolution

Iceberg supports schema evolution, which allows the schema of a table to change over time. This is important for handling changes to the Kafka message schema.
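For example, additive changes are metadata-only operations (a quick illustration via Spark SQL, assuming an Iceberg catalog named iceberg_catalog is already configured; the table and column names are placeholders):

// Add a nullable column to an existing Iceberg table.
// Existing data files are not rewritten.
spark.sql("ALTER TABLE iceberg_catalog.analytics.events ADD COLUMNS (page_referrer STRING)")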

I am considering two approaches for handling schema evolution:

  1. Use the Iceberg MergeWrite operation: This operation allows you to merge new data into an existing table, even if the schema of the new data is different from the schema of the existing table.
  2. Create a new Iceberg table partition for each schema version: This would be more complex to implement, but it would give me more control over how schema evolution is handled.

I am interested in hearing from others who have experience writing lake loaders or working with Iceberg schema evolution. What advice would you give me?

Discussion Questions:

  • What are the pros and cons of using a custom deserializer vs a pre-built deserializer based on the Analytics SDK? Are there any limitations with Event in the case of self-describing schemas?
  • What are the pros and cons of using the Iceberg MergeWrite operation vs creating a new Iceberg table for each schema version?
  • What other challenges have you faced when writing lake loaders or working with Iceberg schema evolution?
  • What advice would you give to someone who is new to writing lake loaders or working with Iceberg schema evolution?

More discussion here → Kafka to BigQuery/GCS loader

Hi @Jayant_Kumar

We faced some of the challenges you describe when we built the Snowplow Lake Loader, and I think we found a nice way to solve the problem that works for all pipelines and all schemas.

The Lake Loader works roughly like this:

  1. First it deserializes and parses the TSV output using the Snowplow Scala Analytics SDK.
  2. Next it inspects any self-describing event or self-describing context to discover which Iglu schemas have been used for this event.
  3. It fetches those schemas from Iglu using the iglu-scala-client.
  4. It uses the Iglu schemas to guide transforming the JSON objects into strictly typed Spark fields. This step is very important! We cannot naively convert JSON into strict Parquet types unless we have the Iglu schema to guide us.
  5. Finally, it writes the data frame to Iceberg using “append” mode with the “merge-schema” and “check-ordering” options (sketched below). These options allow schema evolution when the pipeline starts tracking new event types for the first time.

The end result is a single wide events table, which has strictly-typed nested columns for each self-describing entity or event. Because it uses Iglu to guide the transformation, it is intelligent enough to know whether a JSON numeric field should be a double or an int Parquet type, and whether a JSON string field should be a string, timestamp, or date Parquet type.
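To make step 5 concrete, the write looks roughly like this (an illustration rather than the Lake Loader's actual code; the table name is a placeholder):

import org.apache.spark.sql.DataFrame

// Append an already-transformed batch to the Iceberg events table, letting
// Iceberg add any new columns it finds in the incoming schema.
def appendBatch(df: DataFrame): Unit =
  df.write
    .format("iceberg")
    .mode("append")
    .option("merge-schema", "true")     // evolve the table schema when new fields appear
    .option("check-ordering", "false")  // don't require column order to match the table
    .saveAsTable("iceberg_catalog.analytics.events")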

If you want to see the relevant code, I suggest starting from this method and following the transformation logic down from there.

By the way, the Snowplow Lake Loader does have partial support for Iceberg with BigQuery/BigLake as a catalog. We did not announce it as a feature in the first release, because the experience is still a bit rough around the edges. You can see in this file how we configure Spark for Iceberg with BigQuery.

We are continuing to develop the Lake Loader because it is an important part of Snowplow’s capabilities going forwards. You can definitely expect announcements over the coming months about improved Iceberg support.

Hi @istreeter, thank you for the pointers. While it will take me some time to digest the Cats syntax, I have a question in mind.

Considering the fact that schemas are fetched from the Iglu schema registry, won't that be a bottleneck for large-scale event streaming pipelines? I mean, if there are multiple fan-out streaming pipelines downstream, each of them will have to fetch schemas from the Iglu registry for every event that passes through the system.

I am not sure if I am missing something. There could be workarounds like schema caching, etc., but I wanted to figure out if there are other alternatives.

Hi @Jayant_Kumar, you are absolutely right that those are things that need to be considered (and avoided!) to make it work efficiently.

The Lake Loader uses a couple of tricks to avoid the problem:

  1. It caches the schemas it fetches from the Iglu Server. This caching is handled automatically by the Iglu Scala Client: if you call resolver.lookupSchema(schemaKey) then the schema is returned from the cache if possible, or fetched from an Iglu Server if needed.
  2. It transforms events in batches, not one-by-one. So after parsing a set of Iglu schemas for a batch, it can use the same parsing result to transform all events in one go (see the sketch below).
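To illustrate the second point, the batching idea boils down to something like this (plain Scala, not the Lake Loader's actual code; the function parameters are placeholders):

// Resolve each distinct schema at most once per batch, then reuse the
// resolved schemas to transform every event in the batch.
def processBatch[E, S, Row](
  events: List[E],
  schemaKeysOf: E => Set[String],      // which Iglu schemas an event references
  lookupSchema: String => S,           // hits Iglu (or its cache) at most once per key
  transform: (E, Map[String, S]) => Row
): List[Row] = {
  val distinctKeys = events.flatMap(schemaKeysOf).toSet
  val schemas = distinctKeys.map(key => key -> lookupSchema(key)).toMap
  events.map(event => transform(event, schemas))
}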

I’ve found the Lake Loader can load >200,000 events per minute to the lake, when running on a single CPU. And if you give it more CPU then it runs even faster. Do you know what event volume your pipeline needs to handle?

This is good to know. Our legacy event tracking system handles 5k QPS on average with two 4-vCore CPUs and 8 GB of RAM. This volume grows by 0.5% every month.

So with that estimate, about 300k QPM.

Wouldn't it be nice if the enrich stage used some schema-rich format to ease downstream pipelines?

@istreeter Can I use the code as it is? I tried finding the JAR in Maven, but it seems it's not available at present. I could copy those files or place my Spark code in a forked repo.

Hi @istreeter
I think you are the owner of this code, and I need to ask you for a favor. I am not really familiar with the Cats and FS2 streaming libraries. I am still trying to learn them, but I find it hard.

Is it possible for you to help me convert this series of expressions into Spark-friendly code?

Something like dataFrame.forEachBatch(batch => function(itr(String)))
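In other words, something roughly like this (all names here are placeholders; loadBatch is exactly the part I cannot work out from the FS2 code):

import org.apache.spark.sql.Dataset

// Placeholder sketch: hand each Structured Streaming micro-batch of raw TSV
// lines to a function that parses and loads it.
def loadBatch(batch: Dataset[String], batchId: Long): Unit = {
  // parse TSV -> typed rows -> write out (this is what I need help with)
}

def start(rawStream: Dataset[String]): Unit =
  rawStream.writeStream
    .foreachBatch(loadBatch _)
    .start()
    .awaitTermination()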

These are the magical expressions that I am not able to fully understand.

Stream Expr1:

 private def processBatches[F[_]: Async: RegistryLookup](
    env: Environment[F],
    badProcessor: BadRowProcessor,
    ref: Ref[F, WindowState]
  ): Pipe[F, TokenedEvents, Nothing] =
    _.through(rememberTokens(ref))
      .through(incrementReceivedCount(env))
      .through(parseBytes(env, badProcessor))
      .through(sendAndDropFailedEvents(env))
      .through(batchUp(env.inMemBatchBytes))
      .through(sinkParsedBatch(env, badProcessor, ref))

Expr2:

  private def parseBytes[F[_]: Async](
    env: Environment[F],
    processor: BadRowProcessor
  ): Pipe[F, List[Array[Byte]], (List[BadRow], Batched)] =
    _.parEvalMapUnordered(env.cpuParallelism) { list =>
      list
        .traverse { bytes =>
          Applicative[F].pure {
            val stringified = new String(bytes, StandardCharsets.UTF_8)
            Event
              .parse(stringified)
              .map(event => Parsed(event, bytes.size, TabledEntity.forEvent(event)))
              .leftMap { failure =>
                val payload = BadRowRawPayload(stringified)
                BadRow.LoaderParsingError(processor, failure, payload)
              }
          }
        }
        .map(_.separate)
        .map { case (bad, parsed) =>
          val batched = parsed.foldLeft(Monoid[Batched].empty) {
            case (Batched(allEvents, allEntities, allBytes), Parsed(event, bytes, entities)) =>
              Batched(event :: allEvents, allEntities |+| entities, allBytes + bytes.toLong)
          }
          (bad, batched)
        }
    }

@istreeter Can you clarify this a bit more? I mean, do you recommend using the IcebergBigLake loader for BigQuery or not?

@istreeter This expression doesn’t compile with Scala 2.12. Is there a way to fix it?

    resolver: Resolver[F],
    schemaKey: SchemaKey
  ): EitherT[F, FailureDetails.LoaderIgluError, NonEmptyList[SchemaWithKey]] =
    for {
      schemaKeys <- EitherT(resolver.listSchemasLike(schemaKey))
                      .leftMap(resolverFetchBadRow(schemaKey.vendor, schemaKey.name, schemaKey.format, schemaKey.version.model))
                      .map(_.schemas)
      schemaKeys <- EitherT.rightT[F, FailureDetails.LoaderIgluError](schemaKeys.filter(_ < schemaKey))
      topSchema <- getSchema(resolver, schemaKey)
      lowerSchemas <- schemaKeys.filter(_ < schemaKey).traverse(getSchema(resolver, _))
    } yield NonEmptyList(topSchema, lowerSchemas)

Hi @Jayant_Kumar

Sorry I’ve let a few of your questions build up, so I will try to answer as many as I can…

Can I use the code as it is?

You certainly can! The code is licensed under The Snowplow Community License, which lets you use or modify the code, subject to some limitations of use.

The Lake Loader is distributed only as a docker image, which is why you cannot find anything in Maven. However, in the next release (0.2.0) we will be packaging it differently, so large parts of the code will be available as libraries in Maven, including the code we discussed that handles schema transformations.

Is it possible for you to help me convert this series of expressions into Spark-friendly code?

It would be a difficult task to modify it to work on a Spark platform, and I also do not think it's the right way to approach the problem: the existing Lake Loader is really close to having the functionality you need, and it performs really well (albeit without Spark). I think a better use of time is to adapt the Lake Loader in the small ways needed to get Iceberg support working.

Can you clarify this a bit more? I mean, do you recommend using the IcebergBigLake loader for BigQuery or not?

I can give you some pointers on how to experiment with it, and then you can decide for yourself if it is production-ready for your use case.

First of all, you need to build the docker image yourself, because the BigLake support was not included in the official 0.1.0 release. You can do this by checking out the code from GitHub and running:

sbt 'project gcpWithBiglake; Docker / publishLocal'

In your config file, the output.good section needs to look something like this:

output: {
  good: {
    type: "IcebergBigLake"
    catalog: "your_biglake_catalog"
    database: "your_biglake_database"
    table: "events"
    connection: "name_of_a_bigquery_connection"
    bqDataset: "name_of_a_bigquery_dataset"
    region: "your_gcp_region"
    location: "gs://your-snowplow-bucket/events"
  }
}

The connection parameter needs special attention. You need to configure a BigQuery connection, and the associated service account needs to have the IAM roles roles/biglake.viewer and roles/storage.objectViewer on your bucket.

Here's the difficult bit… if you have properly configured your connection with the correct permissions, then the Lake Loader will successfully create the Iceberg table when you first run the loader, and register the table in BigLake and BigQuery. But if you get the permissions wrong on the first attempt, it ends up in a stuck state where the BigLake catalog exists but the BigQuery table does not, and it never creates the BigQuery table on subsequent attempts.

The good news is, if you can successfully get past the step of creating the table, then after that the loader works very well. Querying the data in BigQuery works very well too.

This expression doesn't compile with Scala 2.12. Is there a way to fix it?

When we publish the code to Maven (mentioned above), it will support Scala 2.12 and 2.13.

I am very excited that you are interested in the Lake Loader, because I think it’s a really great tool with huge potential. Please understand that it is still an early release, which is why I cannot offer you everything immediately. Feature requests are very welcome!

@istreeter While I agree with some of your arguments, I am trying to figure out a way to transform raw events into Spark struct types for a wide variety of use cases.

I also agree that the goal of this library is not to provide tool-specific typings. It's my bad that I am finding it hard to translate the Cats Effect code into simpler Scala code.

I currently do not want to adopt any of the lake table formats, for the simple reason that I have low confidence in the BigQuery and other GCP integrations with them.

I would prefer something much simpler like Parquet as my storage format.

The ecosystem at our company is built largely around Spark and Flink streaming, so typing to DataFrames is unavoidable in our case.

So I will keep trying hard to develop the utility, even if it takes weeks or months, because otherwise that's going to be a blocker for Snowplow adoption here.

It would be helpful if you could provide a little more help in doing that, but I understand your limitations.

Thank you.

Hi @istreeter

I have a question about the BigLake setup. How does it inherit the credentials?

If I am running it on GKE, what user or service account credentials should I point it to, and how?

I mean, we are just pointing it to the connection, not to the credentials themselves.

@istreeter Apologies for so many questions.
I also see this Spark configuration in the code. Can you point me to the dependency JAR or its coordinates? I don't think it comes with the iceberg-spark runtime artifact.

.config("spark.sql.catalog.iceberg_catalog.catalog-impl", "org.apache.iceberg.gcp.biglake.BigLakeCatalog")

Hi @istreeter

I have managed to create an Iceberg BigLake table using the Lake Loader by following the steps mentioned above. As you said, it's very tricky to get the linking right during BigLake table creation itself; otherwise, the table is never linked to BigQuery.

The workaround would be to link this table with BigQuery explicitly by pointing to the metadata file in the warehouse. Though I have not tried it, it's worth trying.

I am yet to test a few features like schema evolution, access control, etc.

I have a couple of requests/suggestions.

@Simon_Rumble @istreeter

  • We should release the BigLake loader image to Docker Hub so the community can use the official image.
  • Second, modify the Spark caster and transformers so that they can be used more broadly with Apache Spark (batch and streaming).

Hi @Jayant_Kumar, it's great that you got it working. I will answer a few of your questions (even though it sounds like you solved them!) so that anyone else finding this thread can see the answers.

How does it inherit the credentials? If I am running it on GKE, what user or service account credentials should I point it to, and how?

The docker image lets you use any of GCP’s standard methods of authenticating. If you are running in GKE then you can configure the workload to use a service account. If you are running the docker image locally, then you can mount user credentials into the docker image and then set the standard GOOGLE_APPLICATION_CREDENTIALS environment variable.

I also see this Spark configuration in the code. Can you point me to the dependency JAR or its coordinates? I don't think it comes with the iceberg-spark runtime artifact.
.config("spark.sql.catalog.iceberg_catalog.catalog-impl", "org.apache.iceberg.gcp.biglake.BigLakeCatalog")

That class is provided by a JAR file that comes from GCP and is documented on this page. In the Lake Loader, the build step fetches the jar from GCS and includes it in the docker image. It’s in these lines of the build configuration. We have to use this strange method to include it in the build because the jar is not available in Maven.
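For reference, wiring that catalog implementation into a Spark session looks roughly like this (a sketch only: the catalog name and bucket are placeholders, the BigLake JAR must be on the classpath, and any further BigLake-specific properties should be taken from GCP's documentation):

import org.apache.spark.sql.SparkSession

// Sketch: register an Iceberg catalog that delegates to the BigLake catalog
// implementation, with a GCS warehouse location.
val spark = SparkSession.builder()
  .appName("iceberg-biglake-example")
  .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.iceberg_catalog.catalog-impl", "org.apache.iceberg.gcp.biglake.BigLakeCatalog")
  .config("spark.sql.catalog.iceberg_catalog.warehouse", "gs://your-snowplow-bucket/events")
  .getOrCreate()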

The workaround would be to link this table with BigQuery explicitly by pointing to the metadata file in the warehouse.

I have also been thinking about this workaround. It would be easier to set up for first-time users, but it has the disadvantage that you periodically need to update BigQuery to point at the newest metadata file. The advantage of using BigLake is that BigQuery always sees the latest data.


@istreeter Thank you for adding details.

Yes, updating the metadata for BigQuery is not very convenient. I will also think about it on my side to see what workarounds are possible.

Also, if you could please share any thoughts on the two suggestions in my last message, that would be great.