Hi,
I am using Structured Streaming for ETL. My source is a Kafka topic:
val data_stream = spark
  .readStream // constantly growing, unbounded DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sms_history")
  .option("startingOffsets", "earliest") // begin from the start of the topic
  .option("failOnDataLoss", "false")
  .load()
I transform this into a Dataset with the following schema:
root
|-- accountId: long (nullable = true)
|-- countryId: long (nullable = true)
|-- credits: double (nullable = true)
|-- deliveryStatus: string (nullable = true)
|-- senderId: string (nullable = true)
|-- sentStatus: string (nullable = true)
|-- source: integer (nullable = true)
|-- createdOn: timestamp (nullable = true)
|-- send_success_credits: double (nullable = true)
|-- send_error_credits: double (nullable = true)
|-- delivered_credits: double (nullable = true)
|-- invalid_sd_credits: double (nullable = true)
|-- undelivered_credits: double (nullable = true)
|-- unknown_credits: double (nullable = true)
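(For reference, this corresponds to a case class roughly like the one below; the name SmsRecord and the Option wrapping are my own, inferred from the nullable = true flags above.)

import java.sql.Timestamp

// Sketch only: name and Option types inferred from the printed schema
case class SmsRecord(
  accountId: Option[Long],
  countryId: Option[Long],
  credits: Option[Double],
  deliveryStatus: Option[String],
  senderId: Option[String],
  sentStatus: Option[String],
  source: Option[Int],
  createdOn: Option[Timestamp],
  send_success_credits: Option[Double],
  send_error_credits: Option[Double],
  delivered_credits: Option[Double],
  invalid_sd_credits: Option[Double],
  undelivered_credits: Option[Double],
  unknown_credits: Option[Double]
)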
Now I want to write this transformed stream to another Kafka topic. As a
temporary workaround I am using a UDF that takes all of these columns as
parameters and builds a JSON string, which I attach as a "value" column for
the Kafka sink (roughly as sketched below).
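For concreteness, here is a trimmed-down sketch of that workaround (only three
of the fourteen columns are shown; transformedStream, the output topic name,
and the checkpoint path are placeholders):

import org.apache.spark.sql.functions.udf
import spark.implicits._

// hand-build a JSON string from the individual columns
val toJson = udf((accountId: Long, credits: Double, deliveryStatus: String) =>
  s"""{"accountId":$accountId,"credits":$credits,"deliveryStatus":"$deliveryStatus"}""")

val query = transformedStream // the Dataset described above
  .withColumn("value", toJson($"accountId", $"credits", $"deliveryStatus"))
  .selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sms_history_transformed")
  .option("checkpointLocation", "/tmp/sms_history_checkpoint")
  .start()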
Is there an easier and cleaner way to do this?
Thanks,
Pankaj