Hello,
I’m new to Spark, but I’m trying to figure out how to get a streaming DataFrame (Structured Streaming) from Kafka input.
val spark = SparkSession
  .builder()
  .appName("Main Processor")
  .getOrCreate()
import spark.implicits._

val sdf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "enriched-good")
  .load()
// this result was never assigned in my first attempt, so nothing used it
val typed = sdf
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
val events = sdf
  .map(line => Event.parse(line))
  .filter(_.isValid)
  .flatMap(_.toOption)
  .map(event => event.toJson(true).noSpaces)
The last part doesn't compile: sdf is an untyped streaming DataFrame of Kafka rows, not a stream of String lines (this is Structured Streaming, so it isn't a DStream either), so map(line => Event.parse(line)) receives a Row rather than a String.
Any idea how to achieve this?
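From reading the docs, I think the typed path goes through .as[...] on the casted value column. Here is a minimal sketch of what I'm aiming for; Event.parse, isValid, toOption and toJson come from my own Event class (I'm assuming parse takes the Kafka value as a String and returns something Try-like), so does this look right?

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession
  .builder()
  .appName("Main Processor")
  .getOrCreate()
import spark.implicits._

// Read the Kafka topic as a streaming DataFrame (columns: key, value, topic, ...).
val sdf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "enriched-good")
  .load()

// Keep only the value column and give it a concrete type:
// this turns the untyped DataFrame into a Dataset[String].
val lines: Dataset[String] = sdf
  .selectExpr("CAST(value AS STRING)")
  .as[String]

// Now map/filter/flatMap operate on Strings, as in my attempt above.
// Event.parse / isValid / toOption / toJson are from my own code, not Spark.
val events: Dataset[String] = lines
  .map(line => Event.parse(line))
  .filter(_.isValid)
  .flatMap(_.toOption)
  .map(event => event.toJson(true).noSpaces)

// A streaming Dataset only runs once a sink is started;
// the console sink is convenient for checking the output locally.
val query = events.writeStream
  .format("console")
  .outputMode("append")
  .start()
query.awaitTermination()
```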