Hi all,

I'm using Spark 2.4.4 to read streaming data from Kafka and write it to Kudu 1.7.0. My code looks like this:
    val kuduContext = new KuduContext("master:7051", spark.sparkContext)

    val console = cnew.select("*").as[CstoreNew]
      .writeStream
      .option("checkpointLocation", "/tmp/t3/")
      .trigger(Trigger.Once())
      .foreach(new ForeachWriter[CstoreNew] {
        override def open(partitionId: Long, version: Long): Boolean = {
          true
        }
        override def process(value: CstoreNew): Unit = {
          val spark = SparkSessionSingleton.getInstance(sparkConf)
          val valueDF = Seq(value).toDF()  // <-- fails here
          kuduContext.upsertRows(valueDF, "impala::test.cstore_bury_event_data")
        }
        override def close(errorOrNull: Throwable): Unit = { }
      })

    val query = console.start()
    query.awaitTermination()

When execution reaches `val valueDF = Seq(value).toDF()` I get this error:

    Caused by: java.lang.NullPointerException
        at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:228)
        at com.gaojihealth.spark.kafkaconsumer.CstoreNew2KUDU$$anon$1.process(CstoreNew2KUDU.scala:122)
        ...

Lines 227-229 of SQLImplicits.scala are:

    227:  implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
    228:    DatasetHolder(_sqlContext.createDataset(s))
    229:  }

Can anyone give me some help?

2019-11-25
lk_spark
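For what it's worth, I also wonder whether `foreachBatch` (available since Spark 2.4) would sidestep the problem, since it hands each micro-batch to a function as a ready-made Dataset instead of building one row by row inside `ForeachWriter.process`. An untested sketch, reusing the same `cnew`, `kuduContext`, and table name as above:

```scala
// Untested sketch: write each micro-batch to Kudu via foreachBatch,
// so no DataFrame has to be created inside ForeachWriter.process().
val query = cnew.select("*").as[CstoreNew]
  .writeStream
  .option("checkpointLocation", "/tmp/t3/")
  .trigger(Trigger.Once())
  .foreachBatch { (batchDS: org.apache.spark.sql.Dataset[CstoreNew], batchId: Long) =>
    // batchDS is already a distributed Dataset for this micro-batch;
    // upsert the whole batch in one call instead of one row at a time.
    kuduContext.upsertRows(batchDS.toDF(), "impala::test.cstore_bury_event_data")
  }
  .start()

query.awaitTermination()
```

Would that be a reasonable approach, or is there a way to make the `ForeachWriter` version work?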