Hello, I have a Spark Structured Streaming application that reads from Kafka using a given schema.
```java
Dataset<Row> m_oKafkaEvents = getSparkSession().readStream().format("kafka")
    .option("kafka.bootstrap.servers", strKafkaAddress)
    .option("assign", strSubscription)
    .option("maxOffsetsPerTrigger", "100000")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", false)
    .load()
    .filter(strFilter)
    .select(functions.from_json(functions.col("value").cast("string"), schema).alias("events"))
    .select("events.*");
```

This dataset is then grouped by one of its columns, InstanceId, which is the key for us, and fed into flatMapGroupsWithState. That function does some correlation:

```java
Dataset<InsightEventUpdate> sessionUpdates = m_oKafkaEvents.groupByKey(
    new MapFunction<Row, String>() {
      @Override
      public String call(Row event) {
        return event.getAs("InstanceId");
      }
    }, Encoders.STRING())
    .flatMapGroupsWithState(
        idstateUpdateFunction,
        OutputMode.Append(),
        Encoders.bean(InsightEventInfo.class),
        Encoders.bean(InsightEventUpdate.class),
        GroupStateTimeout.ProcessingTimeTimeout());
```

The output dataset is of type InsightEventUpdate, which contains a List of Spark Rows related to the InstanceId. I now want to convert this back into a Dataset<Row>. Basically, I have a List of Rows, and I tried:

```java
sparkSession.createDataFrame(listOfRows, schema);
```

but this gives me:

```
java.lang.NullPointerException
  at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
  at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
  at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:376)
  at oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor.process(ForeachFederatedEventProcessor.java:102)
```

Can someone help me with the way to go ahead?

Thanks,
Robin Kuttaiah
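P.S. In case it helps, here is the general shape of the state function we pass as idstateUpdateFunction — a minimal sketch only: the actual correlation logic is omitted, and the InsightEventInfo/InsightEventUpdate accessors (getRows/setRows/setInstanceId) are placeholder names, not our real bean fields.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.GroupState;

// Hypothetical sketch of idstateUpdateFunction's shape; the real
// correlation logic and the bean fields are placeholders.
public class IdStateUpdateFunction
    implements FlatMapGroupsWithStateFunction<String, Row, InsightEventInfo, InsightEventUpdate> {

  @Override
  public Iterator<InsightEventUpdate> call(String instanceId, Iterator<Row> events,
      GroupState<InsightEventInfo> state) {
    // Start from the existing state for this InstanceId, or a fresh one.
    InsightEventInfo info = state.exists() ? state.get() : new InsightEventInfo();

    // Accumulate the incoming rows; the real correlation happens here.
    List<Row> rows = info.getRows() == null ? new ArrayList<>() : info.getRows();
    while (events.hasNext()) {
      rows.add(events.next());
    }
    info.setRows(rows);
    state.update(info);

    // Emit one update carrying the rows correlated so far for this key.
    InsightEventUpdate update = new InsightEventUpdate();
    update.setInstanceId(instanceId);
    update.setRows(rows);
    List<InsightEventUpdate> out = new ArrayList<>();
    out.add(update);
    return out.iterator();
  }
}
```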