Yurii Oleynikov created SPARK-30634:
---------------------------------------

             Summary: Delta Merge and Arbitrary Stateful Processing in Structured Streaming (foreachBatch)
                 Key: SPARK-30634
                 URL: https://issues.apache.org/jira/browse/SPARK-30634
             Project: Spark
          Issue Type: Question
          Components: Examples, Spark Core, Structured Streaming
    Affects Versions: 2.4.3
         Environment: Spark 2.4.3 (Scala 2.11.12)

Delta: 0.5.0

Java(TM) SE Runtime Environment (build 1.8.0_91-b14)

OS: Ubuntu 18.04 LTS

 
            Reporter: Yurii Oleynikov
         Attachments: Capture1.PNG

Hi, I've faced strange behaviour with Delta merge and Arbitrary Stateful Processing in Structured Streaming.

I have an application that performs Arbitrary Stateful Processing in Structured Streaming and uses {{DeltaTable.merge}} to update a Delta table.

 

I've noticed that logs inside my implementation of {{MapGroupsWithStateFunction}}/{{FlatMapGroupsWithStateFunction}} are output twice.
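For context, the stateful step in my application is modelled on the {{JavaStructuredSessionization}} example referenced below. A simplified sketch (the function and bean names here are illustrative, not my exact code):
{code:java}
// Sketch of the stateful step; stateUpdateFunc is a
// MapGroupsWithStateFunction<String, Event, SessionInfo, SessionUpdate>
// that logs once per invocation.
Dataset<SessionUpdate> sessionUpdates = events
        .groupByKey((MapFunction<Event, String>) Event::getSessionId, Encoders.STRING())
        .mapGroupsWithState(
                stateUpdateFunc,
                Encoders.bean(SessionInfo.class),    // state encoder
                Encoders.bean(SessionUpdate.class),  // output encoder
                GroupStateTimeout.ProcessingTimeTimeout());
{code}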

While looking for the root cause, I also found that the number of state rows reported by Spark doubles.
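For reference, a minimal way to read that counter, assuming {{query}} is the {{StreamingQuery}} started in the snippet below:
{code:java}
// Inspect the state-operator metrics from the last completed trigger.
// lastProgress() may return null before the first trigger completes.
StreamingQueryProgress progress = query.lastProgress();
if (progress != null) {
    for (StateOperatorProgress op : progress.stateOperators()) {
        System.out.println("state rows: " + op.numRowsTotal());
    }
}
{code}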

I thought there might be a bug in my code, so I went back to the {{JavaStructuredSessionization}} example from Apache Spark and changed it a bit. I still got the same result.

The problem happens only if I do not call {{batchDf.persist()}} inside {{foreachBatch}} (see the persisted variant after the snippet below).
{code:java}
StreamingQuery query = sessionUpdates
        .writeStream()
        .outputMode("update")
        .foreachBatch((VoidFunction2<Dataset<SessionUpdate>, Long>) (batchDf, v2) -> {
            // The following doubles the number of Spark state rows and causes
            // MapGroupsWithStateFunction to log twice without persisting.
            deltaTable.as("sessions").merge(batchDf.toDF().as("updates"), mergeExpr)
                    .whenNotMatched().insertAll()
                    .whenMatched().updateAll()
                    .execute();
        })
        .trigger(Trigger.ProcessingTime(10000))
        .queryName("ACME")
        .start();
{code}
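For comparison, the variant that behaves as expected differs only in the {{foreachBatch}} body (persist before the merge, unpersist after). Presumably the cache keeps the micro-batch from being recomputed if the merge reads its source more than once, though I have not confirmed that:
{code:java}
// With batchDf persisted, MapGroupsWithStateFunction logs once and the
// reported state-row count matches expectations.
.foreachBatch((VoidFunction2<Dataset<SessionUpdate>, Long>) (batchDf, v2) -> {
    batchDf.persist();
    deltaTable.as("sessions").merge(batchDf.toDF().as("updates"), mergeExpr)
            .whenNotMatched().insertAll()
            .whenMatched().updateAll()
            .execute();
    batchDf.unpersist();
})
{code}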
According to [https://docs.databricks.com/_static/notebooks/merge-in-streaming.html] and the [Apache Spark docs|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch], there seems to be no need to persist the dataset/DataFrame inside {{foreachBatch}}.

Sample code from the Apache Spark examples with Delta: [JavaStructuredSessionization with Delta merge|https://github.com/yurkao/delta-merge-sss/blob/master/src/main/java/JavaStructuredSessionization.java]

I'd appreciate your clarification.

 


