Hello,

I have a Spark Structured Streaming application that reads from Kafka and
parses each message using a given schema.

Dataset<Row> m_oKafkaEvents = getSparkSession().readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", strKafkaAddress)
            .option("assign", strSubscription)
            .option("maxOffsetsPerTrigger", "100000")
            .option("startingOffsets", "latest")
            .option("failOnDataLoss", false)
            .load()
            .filter(strFilter)
            // parse the Kafka value as JSON with the given schema, then flatten its fields
            .select(functions.from_json(functions.col("value").cast("string"), schema).alias("events"))
            .select("events.*");


This dataset is then grouped by one of its columns, InstanceId, which is our
key, and fed into flatMapGroupsWithState, whose function performs some
correlation:

Dataset<InsightEventUpdate> sessionUpdates = m_oKafkaEvents
    // key each event by its InstanceId column
    .groupByKey(
        new MapFunction<Row, String>() {
            @Override
            public String call(Row event) {
                return event.getAs("InstanceId");
            }
        }, Encoders.STRING())
    // correlate events per key, keeping InsightEventInfo as the per-key state
    .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
        Encoders.bean(InsightEventInfo.class),
        Encoders.bean(InsightEventUpdate.class),
        GroupStateTimeout.ProcessingTimeTimeout());


The output dataset is of type InsightEventUpdate, which contains a List of
Spark Rows related to the InstanceId.

Now I want to convert this back into a Dataset<Row>. Basically, I have a
List of Rows.

I tried:

sparkSession.createDataFrame(listOfRows, schema);

This gives me:

java.lang.NullPointerException
        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
        at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:376)
        at oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor.process(ForeachFederatedEventProcessor.java:102)

Can someone tell me the right way to go about this?
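One possible approach, sketched under assumptions: flatten the updates with Dataset.flatMap and a Row encoder built from the same schema, instead of calling createDataFrame. The stack trace shows createDataFrame being invoked from ForeachFederatedEventProcessor.process; a ForeachWriter runs on executors, where the SparkSession is not meant to be used, which is a likely cause of the NullPointerException. The sketch assumes the Spark 2.x/3.x Java API (RowEncoder.apply; newer releases also provide Encoders.row(schema)). EventUpdate, getRows(), toRows() and demo() are hypothetical names standing in for InsightEventUpdate and its real accessor, and the Java-serialization encoder is used only so this small batch demo can hold Row objects inside a class.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class FlattenUpdates {

    // Hypothetical stand-in for InsightEventUpdate: only its list of rows matters here.
    public static class EventUpdate implements Serializable {
        private final List<Row> rows;

        public EventUpdate(List<Row> rows) {
            this.rows = rows;
        }

        public List<Row> getRows() {
            return rows;
        }
    }

    // Flattens every update's rows into one Dataset<Row> with the given schema.
    // flatMap is an ordinary Dataset transformation, so it can also be applied to a
    // streaming Dataset, and the executor-side code never touches the SparkSession.
    public static Dataset<Row> toRows(Dataset<EventUpdate> updates, StructType schema) {
        return updates.flatMap(
                (FlatMapFunction<EventUpdate, Row>) update -> update.getRows().iterator(),
                RowEncoder.apply(schema)); // Row encoder for the output schema
    }

    // Small batch demo on a local session; returns the flattened rows.
    public static List<Row> demo() {
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("flatten-updates-sketch")
                .getOrCreate();
        try {
            StructType schema = new StructType()
                    .add("InstanceId", DataTypes.StringType)
                    .add("value", DataTypes.LongType);
            List<EventUpdate> updates = Arrays.asList(
                    new EventUpdate(Arrays.asList(RowFactory.create("a", 1L), RowFactory.create("a", 2L))),
                    new EventUpdate(Arrays.asList(RowFactory.create("b", 3L))));
            // Java serialization lets this sketch store Row objects inside EventUpdate.
            Dataset<EventUpdate> ds =
                    spark.createDataset(updates, Encoders.javaSerialization(EventUpdate.class));
            return toRows(ds, schema).collectAsList();
        } finally {
            spark.stop();
        }
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In the streaming job, the same flatMap call would be applied to sessionUpdates (using whatever accessor InsightEventUpdate actually exposes), and the resulting Dataset<Row> written out with writeStream, so no createDataFrame call is needed inside the ForeachWriter.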

Thanks,
Robin Kuttaiah
