Hi, I am using Structured Streaming for ETL.
val data_stream = spark
  .readStream                            // constantly expanding dataframe
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sms_history")
  .option("startingOffsets", "earliest") // begin from start of topic
  .option("failOnDataLoss", "false")
  .load()

I transform this into a Dataset with the following schema:

root
 |-- accountId: long (nullable = true)
 |-- countryId: long (nullable = true)
 |-- credits: double (nullable = true)
 |-- deliveryStatus: string (nullable = true)
 |-- senderId: string (nullable = true)
 |-- sentStatus: string (nullable = true)
 |-- source: integer (nullable = true)
 |-- createdOn: timestamp (nullable = true)
 |-- send_success_credits: double (nullable = true)
 |-- send_error_credits: double (nullable = true)
 |-- delivered_credits: double (nullable = true)
 |-- invalid_sd_credits: double (nullable = true)
 |-- undelivered_credits: double (nullable = true)
 |-- unknown_credits: double (nullable = true)

Now I want to write this transformed stream to another Kafka topic. As a temporary solution, I have used a UDF that accepts all of these columns as parameters and builds a JSON string, which I add as a "value" column for writing to Kafka (sketches of my current code follow below my signature).

Is there an easier and cleaner way to do the same?

Thanks,
Pankaj
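P.S. In case it helps, this is roughly how I build the typed Dataset from the raw Kafka stream, assuming the messages are JSON. The schema is abbreviated, the variable names are placeholders, and the *_credits columns are derived afterwards (details elided):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Placeholder schema matching the printSchema output above (abbreviated).
val smsSchema = new StructType()
  .add("accountId", LongType)
  .add("countryId", LongType)
  .add("credits", DoubleType)
  // ... remaining fields elided ...

val transformedStream = data_stream
  .selectExpr("CAST(value AS STRING) AS json")        // Kafka value arrives as binary
  .select(from_json(col("json"), smsSchema).as("data"))
  .select("data.*")                                   // flatten into top-level columns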
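And this is roughly the temporary UDF approach I described, abbreviated to three columns (the real UDF takes all fourteen). The UDF name, output topic, and checkpoint path are placeholders:

import org.apache.spark.sql.functions.{col, udf}

// Temporary UDF: hand-builds a JSON string from the individual columns.
val toJsonUdf = udf { (accountId: Long, countryId: Long, credits: Double) =>
  s"""{"accountId":$accountId,"countryId":$countryId,"credits":$credits}"""
}

val output = transformedStream
  .withColumn("value", toJsonUdf(col("accountId"), col("countryId"), col("credits")))
  .select("value")                                    // Kafka sink expects a "value" column

output.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sms_history_transformed")          // placeholder output topic
  .option("checkpointLocation", "/tmp/checkpoints/sms_history") // placeholder path
  .start()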