Analytics SDK Scala with Spark - DStream to streaming DataFrame

Hello,

I’m new to this, but I’m trying to figure out how to get a streaming DataFrame in Spark from Kafka input.

val spark = SparkSession
  .builder()
  .appName("Main Processor")
  .getOrCreate()

import spark.implicits._

val sdf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "enriched-good")
  .load()

val lines = sdf
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

val events = lines.map(line => Event.parse(line))
  .filter(_.isValid)
  .flatMap(_.toOption)
  .map(event => event.toJson(true).noSpaces)

The last part doesn’t make sense here, because it’s copied from my old code where events was a DStream.
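
For reference, here is roughly what I imagine the Structured Streaming version should look like. The Event case class with its parse and toJsonString below are made-up stand-ins just so the snippet is self-contained (the real parse and toJson come from our analytics SDK), and I’m not sure this is the right way to port the logic:

import org.apache.spark.sql.SparkSession

// Made-up stand-in for the SDK's Event type, only here so the snippet compiles.
// The real Event.parse / toJson come from the analytics SDK.
case class Event(id: String, payload: String) {
  def toJsonString: String = s"""{"id":"$id","payload":"$payload"}"""
}

object Event {
  // Placeholder parser: returns None for lines it cannot split
  def parse(line: String): Option[Event] =
    line.split(',') match {
      case Array(id, payload) => Some(Event(id, payload))
      case _                  => None
    }
}

object MainProcessor {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Main Processor")
      .getOrCreate()

    import spark.implicits._

    // Streaming DataFrame from Kafka (needs the spark-sql-kafka connector on the classpath)
    val sdf = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "enriched-good")
      .load()

    // Keep only the message value as a String -> typed Dataset[String]
    val lines = sdf.selectExpr("CAST(value AS STRING)").as[String]

    // Same idea as the old DStream chain, but on a Dataset:
    // flatMap over the Option drops lines that fail to parse
    val events = lines
      .flatMap(line => Event.parse(line))
      .map(event => event.toJsonString)

    // Write to the console just to see output; the real job would write elsewhere
    val query = events
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}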

Any idea how to achieve that?