Hi Pankaj,

What version of Spark are you using?

If you are using 2.4+ then there is a built-in function "to_json" which
converts the columns of your dataset to JSON format:
https://spark.apache.org/docs/2.4.0/api/sql/#to_json

Akshay Bhardwaj
+91-97111-33849


On Wed, Mar 6, 2019 at 10:29 PM Pankaj Wahane <pankajwah...@live.com> wrote:

> Hi,
>
> I am using structured streaming for ETL.
>
> val data_stream = spark
>   .readStream // constantly expanding dataframe
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "sms_history")
>   .option("startingOffsets", "earliest") // begin from start of topic
>   .option("failOnDataLoss", "false")
>   .load()
>
> I transform this into a Dataset with the following schema.
>
> root
>  |-- accountId: long (nullable = true)
>  |-- countryId: long (nullable = true)
>  |-- credits: double (nullable = true)
>  |-- deliveryStatus: string (nullable = true)
>  |-- senderId: string (nullable = true)
>  |-- sentStatus: string (nullable = true)
>  |-- source: integer (nullable = true)
>  |-- createdOn: timestamp (nullable = true)
>  |-- send_success_credits: double (nullable = true)
>  |-- send_error_credits: double (nullable = true)
>  |-- delivered_credits: double (nullable = true)
>  |-- invalid_sd_credits: double (nullable = true)
>  |-- undelivered_credits: double (nullable = true)
>  |-- unknown_credits: double (nullable = true)
>
> Now I want to write this transformed stream to another Kafka topic. I have
> temporarily used a UDF that accepts all these columns as parameters and
> creates a JSON string, adding a "value" column for writing to Kafka.
>
> Is there an easier and cleaner way to do the same?
>
> Thanks,
> Pankaj
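To make the suggestion concrete, here is a minimal, self-contained sketch of the to_json approach. It uses a small in-memory DataFrame with a few illustrative columns (accountId, deliveryStatus, credits) rather than the full streaming schema from the original message; struct(...) packs every column into one struct, and to_json serializes it into a single string column named "value", which is the column the Kafka sink reads.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct, to_json}

object ToJsonExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("to_json-example")
      .getOrCreate()
    import spark.implicits._

    // Illustrative rows; the real stream carries the full schema
    // from the original message.
    val df = Seq(
      (1L, "DELIVERED", 0.5),
      (2L, "PENDING", 1.0)
    ).toDF("accountId", "deliveryStatus", "credits")

    // Pack all columns into a struct and serialize it as one JSON
    // string column named "value" -- no hand-written UDF needed.
    val out = df.select(to_json(struct(df.columns.map(col): _*)).alias("value"))

    out.show(truncate = false)
    spark.stop()
  }
}
```

For the streaming case, the same select can be applied to the transformed stream just before writing, e.g. `data_stream.select(to_json(struct(...)).alias("value")).writeStream.format("kafka")...` (topic name and options per your setup).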