Even if I do a simple count aggregation like the one below, I get the same error as https://issues.apache.org/jira/browse/SPARK-19268
Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"),
    "24 hours", "24 hours"), df1.col("AppName")).count();

On Wed, May 24, 2017 at 3:35 PM, kant kodali <kanth...@gmail.com> wrote:
> Hi All,
>
> I am using Spark 2.1.1 and running in Standalone mode using HDFS and
> Kafka.
>
> I am running into the same problem as
> https://issues.apache.org/jira/browse/SPARK-19268 with my app (not
> KafkaWordCount).
>
> Here is my sample code.
>
> *Here is how I create the ReadStream:*
>
> sparkSession.readStream()
>     .format("kafka")
>     .option("kafka.bootstrap.servers",
>         config.getString("kafka.consumer.settings.bootstrapServers"))
>     .option("subscribe",
>         config.getString("kafka.consumer.settings.topicName"))
>     .option("startingOffsets", "earliest")
>     .option("failOnDataLoss", "false")
>     .option("checkpointLocation", hdfsCheckPointDir)
>     .load();
>
> *The core logic:*
>
> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"),
>     client.getSchema()).as("payload"));
> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"),
>     "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
> StreamingQuery query = df1.writeStream().foreach(new KafkaSink())
>     .outputMode("update").start();
> query.awaitTermination();
>
> I can also provide any other information you may need.
>
> Thanks!
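For anyone puzzling over what window("Timestamp5", "24 hours", "24 hours") groups by: when the window duration equals the slide duration it is a tumbling window, and each row is assigned to a fixed epoch-aligned bucket. A minimal plain-Java sketch of that bucketing (no Spark dependency; the class and method names here are illustrative, not Spark API):

```java
import java.time.Instant;

public class TumblingWindow {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // Start of the 24-hour tumbling window containing the given event time
    // (epoch millis). Buckets are aligned to the Unix epoch, which is how
    // Spark aligns windows when no explicit start offset is given.
    static long windowStart(long tsMillis) {
        return tsMillis - Math.floorMod(tsMillis, DAY_MS);
    }

    public static void main(String[] args) {
        // An event at 15:35 UTC falls into the bucket that opened at midnight UTC.
        long ts = Instant.parse("2017-05-24T15:35:00Z").toEpochMilli();
        long start = windowStart(ts);
        System.out.println(Instant.ofEpochMilli(start)); // 2017-05-24T00:00:00Z
    }
}
```

All rows whose Timestamp5 lands in the same bucket (and share an AppName) are counted together, so the groupBy above produces one row per (day, AppName) pair per trigger.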