oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor is a ForeachWriter, right? You cannot use SparkSession inside its process method, because process runs on the executors and SparkSession is only usable on the driver.
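To illustrate why a driver-side object can turn up as null on an executor, here is a minimal plain-Java sketch with no Spark dependency. It assumes the writer is shipped to executors via Java serialization and that the session's internal state is not carried along. FakeSession, FakeWriter and shipToExecutor are hypothetical stand-ins made up for this example; they are not Spark classes, and the real failure inside SparkSession may differ in detail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientSessionDemo {

    // Hypothetical stand-in for a driver-only object such as SparkSession.
    static final class FakeSession {
        String describe() {
            return "driver session";
        }
    }

    // Hypothetical stand-in for a ForeachWriter that holds on to the session.
    static final class FakeWriter implements Serializable {
        private static final long serialVersionUID = 1L;

        final String name;
        // transient: Java serialization skips this field, so a deserialized
        // copy sees null here.
        transient FakeSession session;

        FakeWriter(String name, FakeSession session) {
            this.name = name;
            this.session = session;
        }

        // Throws NullPointerException when session is null.
        String process() {
            return session.describe();
        }
    }

    // Serialize and deserialize the writer, mimicking (by assumption) how it
    // would be shipped from the driver to an executor.
    static FakeWriter shipToExecutor(FakeWriter writer) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(writer);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (FakeWriter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        FakeWriter onDriver = new FakeWriter("writer", new FakeSession());
        System.out.println("driver copy: " + onDriver.process());

        FakeWriter onExecutor = shipToExecutor(onDriver);
        System.out.println("session is null after shipping: "
                + (onExecutor.session == null));
        try {
            onExecutor.process();
        } catch (NullPointerException e) {
            System.out.println("executor copy: NullPointerException");
        }
    }
}
```

The ordinary field (name) survives the round trip while the transient one does not, which is consistent with a NullPointerException surfacing deep inside the session rather than at the point where the writer was created.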

Best Regards,
Ryan

On Fri, Oct 5, 2018 at 6:54 AM Kuttaiah Robin <kutta...@gmail.com> wrote:

> Hello,
>
> I have a spark streaming application which reads from Kafka based on the
> given schema.
>
> Dataset<Row> m_oKafkaEvents =
>     getSparkSession().readStream().format("kafka")
>         .option("kafka.bootstrap.servers", strKafkaAddress)
>         .option("assign", strSubscription)
>         .option("maxOffsetsPerTrigger", "100000")
>         .option("startingOffsets", "latest")
>         .option("failOnDataLoss", false)
>         .load()
>         .filter(strFilter)
>         .select(functions.from_json(functions.col("value").cast("string"),
>             schema).alias("events"))
>         .select("events.*");
>
> Now this dataset is grouped by one of the column(InstanceId) which is the
> key for us and then fed into flatMapGroupsWithState function. This function
> does some correlation.
>
> Dataset<InsightEventUpdate> sessionUpdates = m_oKafkaEvents.groupByKey(
>     new MapFunction<Row, String>() {
>       @Override public String call(Row event) {
>         return event.getAs("InstanceId");
>       }
>     }, Encoders.STRING())
>     .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
>         Encoders.bean(InsightEventInfo.class),
>         Encoders.bean(InsightEventUpdate.class),
>         GroupStateTimeout.ProcessingTimeTimeout());
>
> The output dataset is of type InsightEventUpdate which contains List of
> Spark Rows which is related to the InstanceId.
>
> Now I want to convert this back into of type Dataset<Row>. Basically I
> have List of rows.
>
> I tried
>
> sparkSession.createDataFrame(listOfRows, schema);
>
> this gives me
>
> java.lang.NullPointerException
>     at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
>     at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
>     at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
>     at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:376)
>     at oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor.process(ForeachFederatedEventProcessor.java:102)
>
> Can someone help me what is the way to go ahead?
>
> thanks
> Robin Kuttaiah