Yurii Oleynikov created SPARK-30634: ---------------------------------------
Summary: Delta Merge and Arbitrary Stateful Processing in Structured streaming (foreachBatch)
Key: SPARK-30634
URL: https://issues.apache.org/jira/browse/SPARK-30634
Project: Spark
Issue Type: Question
Components: Examples, Spark Core, Structured Streaming
Affects Versions: 2.4.3
Environment: Spark 2.4.3 (scala 2.11.12)
  Delta: 0.5.0
  Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
  OS: Ubuntu 18.04 LTS
Reporter: Yurii Oleynikov
Attachments: Capture1.PNG

Hi,

I've run into strange behaviour when combining Delta merge with arbitrary stateful processing in Structured Streaming.

I have an application that performs arbitrary stateful processing in Structured Streaming and uses Delta merge to update a Delta table. I noticed that the log statements inside my implementation of {{MapGroupsWithStateFunction}}/{{FlatMapGroupsWithStateFunction}} are printed twice per batch. While looking for the root cause, I also found that the number of state rows reported by Spark is doubled.

I thought there might be a bug in my code, so I went back to the {{JavaStructuredSessionization}} example from Apache Spark and changed it slightly. I still got the same result.

The problem happens only if I do not call {{batchDf.persist()}} inside {{foreachBatch}}.
{code:java}
StreamingQuery query = sessionUpdates
    .writeStream()
    .outputMode("update")
    .foreachBatch((VoidFunction2<Dataset<SessionUpdate>, Long>) (batchDf, v2) -> {
        // Without persisting batchDf, the following merge doubles the number of
        // Spark state rows and causes MapGroupsWithStateFunction to log twice.
        deltaTable.as("sessions").merge(batchDf.toDF().as("updates"), mergeExpr)
            .whenNotMatched().insertAll()
            .whenMatched()
            .updateAll()
            .execute();
    })
    .trigger(Trigger.ProcessingTime(10000))
    .queryName("ACME")
    .start();
{code}

According to [https://docs.databricks.com/_static/notebooks/merge-in-streaming.html] and the [Apache spark docs|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch], there seems to be no need to persist the dataset/dataframe inside {{foreachBatch}}.

Sample code from the Apache Spark examples with Delta: [JavaStructuredSessionization with Delta merge|https://github.com/yurkao/delta-merge-sss/blob/master/src/main/java/JavaStructuredSessionization.java]

I would appreciate your clarification.
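
A minimal plain-Java sketch of one possible explanation for the symptom described above. It does not use Spark or Delta. It assumes, without confirmation from this report, that the merge reads its source more than once, so a lazily evaluated batch is recomputed on each read unless it has been persisted. All names here ({{LazyBatchDemo}}, {{lazyBatch}}, {{persisted}}, {{mergeWithTwoPasses}}, {{callsDuring}}) are hypothetical stand-ins, not Spark or Delta APIs.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazyBatchDemo {
    // Counts how often the stand-in for the stateful function runs.
    static final AtomicInteger stateFunctionCalls = new AtomicInteger();

    // Stand-in for a lazily evaluated micro-batch: every read re-runs
    // the "stateful function" that produces the rows.
    static Supplier<List<String>> lazyBatch() {
        return () -> {
            stateFunctionCalls.incrementAndGet();
            return List.of("session-1", "session-2");
        };
    }

    // Stand-in for persist(): compute on first read, then serve the cached rows.
    static <T> Supplier<T> persisted(Supplier<T> source) {
        return new Supplier<T>() {
            private T cached;
            private boolean computed;

            @Override
            public synchronized T get() {
                if (!computed) {
                    cached = source.get();
                    computed = true;
                }
                return cached;
            }
        };
    }

    // ASSUMPTION: a merge that reads its source twice
    // (one pass to find matches, one pass to write the result).
    static int mergeWithTwoPasses(Supplier<List<String>> source) {
        int inspected = source.get().size(); // pass 1: find matching rows
        int written = source.get().size();   // pass 2: write updates and inserts
        return Math.min(inspected, written);
    }

    // Runs one merge and returns how many times the stateful function ran.
    static int callsDuring(Supplier<List<String>> batch) {
        stateFunctionCalls.set(0);
        mergeWithTwoPasses(batch);
        return stateFunctionCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("without persist: " + callsDuring(lazyBatch()));
        System.out.println("with persist: " + callsDuring(persisted(lazyBatch())));
    }
}
```

Under that assumption the stand-in function runs twice for the lazy batch and once for the persisted one, which matches the doubled log output and the observation that {{batchDf.persist()}} makes the problem disappear. Whether this is what actually happens inside Delta merge is the open question of this issue.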