Below is the code I have written. I am getting a NotSerializableException. How
can I handle this scenario?

kafkaStream.foreachRDD(rdd => {
  println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
  rdd.foreachPartition(partitionOfRecords => {
    partitionOfRecords.foreach(record => {
      // Write for CSV.
      if (true == true) {
        val structType = table.schema
        val csvFile = ssc.sparkContext.textFile(record.toString())
        val rowRDD = csvFile.map(x => getMappedRowFromCsvRecord(structType, x))
      }
    })
  })
})
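
A minimal sketch of one direction that is commonly suggested for this kind of
error, under stated assumptions. The body of foreachRDD runs on the driver,
whereas the functions passed to foreachPartition and foreach run on executors,
where the StreamingContext and SparkContext cannot be serialized or used. The
sketch assumes each record is the path of a CSV file and that a batch holds few
enough paths to collect to the driver. The names CsvStreamSketch, toRow, and
process are hypothetical, and toRow is only a stand-in for
getMappedRowFromCsvRecord.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.streaming.dstream.DStream

object CsvStreamSketch {

  // Hypothetical stand-in for getMappedRowFromCsvRecord: splits one CSV line
  // into string fields. Defined on an object so the closure below does not
  // capture a non-serializable enclosing instance.
  def toRow(schema: StructType, line: String): Row =
    Row.fromSeq(line.split(",", -1).toSeq)

  // Assumption: the stream carries file paths as strings.
  def process(paths: DStream[String], schema: StructType): Unit =
    paths.foreachRDD { rdd =>
      // This block runs on the driver, so the SparkContext is usable here.
      val sc = rdd.sparkContext

      // Bring the (assumed small) set of paths back to the driver.
      rdd.collect().foreach { path =>
        // Only `schema` (a serializable StructType) is captured by the closure.
        val rows: RDD[Row] = sc.textFile(path).map(line => toRow(schema, line))
        println(s"$path: ${rows.count()} rows")
      }
    }
}
```

The design choice here is to keep every call that creates an RDD on the driver
and to ship only plain, serializable values to the executors. If the records
are CSV lines rather than file paths, the collect step would not be needed and
the rows could be mapped directly from the batch RDD.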

-- 
Regards,
Nishant
