HeartSaVioR edited a comment on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1029748381


   I confirmed that StreamingAggregation has the same problem as the one 
described for stream-stream join in SPARK-24588.
   
   Test code:
   
   ```
     test("simple count, update mode, check plan") {
       val inputData = MemoryStream[Int]
   
       val aggregated =
         inputData.toDF()
           .select('value as 'a, 'value * 2 as 'b)
           .repartition('b)
           .groupBy('a, 'b)
           .agg(count("*"))
           .as[(Int, Int, Long)]
   
       testStream(aggregated, Update)(
         AddData(inputData, 3),
         CheckLastBatch((3, 6, 1)),
         AddData(inputData, 3, 2),
         CheckLastBatch((3, 6, 2), (2, 4, 1)),
         StopStream,
         StartStream(),
         AddData(inputData, 3, 2, 1),
         CheckLastBatch((3, 6, 3), (2, 4, 2), (1, 2, 1)),
         // By default we run in new tuple mode.
         AddData(inputData, 4, 4, 4, 4),
         CheckLastBatch((4, 8, 4)),
         Execute { query =>
           logWarning(s"DEBUG: ${query.lastExecution.executedPlan}")
         }
       )
     }
   ```
   
   Output:
   
   ```
   16:52:16.736 WARN org.apache.spark.sql.streaming.StreamingAggregationSuite: DEBUG: WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@61581663, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2099/0x0000000800fcc040@650b39f1
   +- *(4) HashAggregate(keys=[a#3, b#4], functions=[count(1)], output=[a#3, b#4, count(1)#11L])
      +- StateStoreSave [a#3, b#4], state info [ checkpoint = file:/tmp/streaming.metadata-62c5613a-04b1-4c6a-befe-044ca4b97407/state, runId = b8546016-958d-401f-bb2a-cc05e5a9a156, opId = 0, ver = 3, numPartitions = 5], Update, 0, 2
         +- *(3) HashAggregate(keys=[a#3, b#4], functions=[merge_count(1)], output=[a#3, b#4, count#78L])
            +- StateStoreRestore [a#3, b#4], state info [ checkpoint = file:/tmp/streaming.metadata-62c5613a-04b1-4c6a-befe-044ca4b97407/state, runId = b8546016-958d-401f-bb2a-cc05e5a9a156, opId = 0, ver = 3, numPartitions = 5], 2
               +- *(2) HashAggregate(keys=[a#3, b#4], functions=[merge_count(1)], output=[a#3, b#4, count#78L])
                  +- *(2) HashAggregate(keys=[a#3, b#4], functions=[partial_count(1)], output=[a#3, b#4, count#78L])
                     +- Exchange hashpartitioning(b#4, 5), REPARTITION_BY_COL, [id=#580]
                        +- *(1) Project [value#1 AS a#3, (value#1 * 2) AS b#4]
                           +- MicroBatchScan[value#1] MemoryStreamDataSource
   ```
   
   Note that only a single shuffle is performed, the one from 
`.repartition('b)`. It satisfies the required child distribution on `a` and `b` 
because the required distribution is `ClusteredDistribution`, which accepts 
hash partitioning on any subset of the clustering keys.
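   
   To make the subset rule concrete, here is a minimal sketch in plain Scala. 
The case classes and the `satisfies` function are simplified stand-ins written 
for this comment, not Spark's actual `HashPartitioning` or 
`ClusteredDistribution` classes, and the rule is my assumption of how the check 
behaves (it ignores the number of partitions and expression semantics).
   
   ```scala
   // Simplified model for illustration only; NOT Spark's real classes.
   object DistributionSketch {
     final case class HashPartitioning(columns: Seq[String], numPartitions: Int)
     final case class ClusteredDistribution(clustering: Seq[String])

     // Assumed rule: the partitioning is accepted when every column it hashes
     // on is one of the required clustering keys (a non-empty subset).
     def satisfies(p: HashPartitioning, d: ClusteredDistribution): Boolean =
       p.columns.nonEmpty && p.columns.forall(c => d.clustering.contains(c))

     def main(args: Array[String]): Unit = {
       val required = ClusteredDistribution(Seq("a", "b"))
       val candidates = Seq(Seq("b"), Seq("a"), Seq("a", "b"), Seq("b", "a"), Seq("c"))
       candidates.foreach { cols =>
         val ok = satisfies(HashPartitioning(cols, 5), required)
         println(s"hashpartitioning(${cols.mkString(", ")}, 5) satisfies: $ok")
       }
     }
   }
   ```
   
   Under this model, partitioning on `b` alone is accepted for keys `a` and 
`b`, which matches the plan above where no extra `Exchange` is added.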
   
   While this seems OK and produces correct output, we can modify the query in 
various ways that break it on subsequent runs: 1) remove `.repartition('b)`, 
2) replace it with `.repartition('a)`, 3) replace it with `.repartition('a, 'b)`, 
4) replace it with `.repartition('b, 'a)`. All of these cases satisfy 
`ClusteredDistribution("a", "b")` (if my understanding is correct) and do not 
trigger a shuffle with hash partitioning. (Strictly speaking, Spark should 
enforce hash partitioning on 'b alone for the lifetime of this query, since 
state has already been stored with that partitioning. That is another problem 
for which we have no solution yet.)
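   
   A rough sketch of why a changed partitioning breaks the query: the same 
grouping key can land in a different partition once the hashed columns change, 
so the task no longer finds the state written earlier. The hash below is a toy 
function chosen for illustration, not Spark's actual hash partitioner, and 
`partitionId` is a hypothetical helper.
   
   ```scala
   // Toy model for illustration only; NOT Spark's hash partitioning.
   object StatePartitionSketch {
     // Combine the hashed column values and map them to a partition id.
     def partitionId(values: Seq[Int], numPartitions: Int): Int = {
       val h = values.foldLeft(17)((acc, v) => 31 * acc + v)
       ((h % numPartitions) + numPartitions) % numPartitions // non-negative mod
     }

     def main(args: Array[String]): Unit = {
       val (a, b) = (3, 6) // the row (3, 6) from the test above
       val n = 5           // numPartitions in the plan above
       println(s"hashed on b      -> partition ${partitionId(Seq(b), n)}")
       println(s"hashed on a      -> partition ${partitionId(Seq(a), n)}")
       println(s"hashed on (a, b) -> partition ${partitionId(Seq(a, b), n)}")
     }
   }
   ```
   
   If the first run stored the count for key (3, 6) in the partition chosen by 
hashing `b`, a later run that hashes `a` looks in a different partition and 
starts that key from an empty state.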
   
   The problem exists in **all stateful operators** (otherwise this PR would 
have had to touch more places). Since HashClusteredDistribution was introduced 
in SPARK-21865 (2.3.0), Spark 2.3.0 and later have this problem.

