HeartSaVioR edited a comment on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1029748381
I confirmed that StreamingAggregation has the same problem as the stream-stream
join issue described in SPARK-24588.
Test code:
```
test("simple count, update mode, check plan") {
val inputData = MemoryStream[Int]
val aggregated =
inputData.toDF()
.select('value as 'a, 'value * 2 as 'b)
.repartition('b)
.groupBy('a, 'b)
.agg(count("*"))
.as[(Int, Int, Long)]
testStream(aggregated, Update)(
AddData(inputData, 3),
CheckLastBatch((3, 6, 1)),
AddData(inputData, 3, 2),
CheckLastBatch((3, 6, 2), (2, 4, 1)),
StopStream,
StartStream(),
AddData(inputData, 3, 2, 1),
CheckLastBatch((3, 6, 3), (2, 4, 2), (1, 2, 1)),
// By default we run in new tuple mode.
AddData(inputData, 4, 4, 4, 4),
CheckLastBatch((4, 8, 4)),
Execute { query =>
logWarning(s"DEBUG: ${query.lastExecution.executedPlan}")
}
)
}
```
Output:
```
16:52:16.736 WARN org.apache.spark.sql.streaming.StreamingAggregationSuite: DEBUG: WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@61581663, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2099/0x0000000800fcc040@650b39f1
+- *(4) HashAggregate(keys=[a#3, b#4], functions=[count(1)], output=[a#3, b#4, count(1)#11L])
   +- StateStoreSave [a#3, b#4], state info [ checkpoint = file:/tmp/streaming.metadata-62c5613a-04b1-4c6a-befe-044ca4b97407/state, runId = b8546016-958d-401f-bb2a-cc05e5a9a156, opId = 0, ver = 3, numPartitions = 5], Update, 0, 2
      +- *(3) HashAggregate(keys=[a#3, b#4], functions=[merge_count(1)], output=[a#3, b#4, count#78L])
         +- StateStoreRestore [a#3, b#4], state info [ checkpoint = file:/tmp/streaming.metadata-62c5613a-04b1-4c6a-befe-044ca4b97407/state, runId = b8546016-958d-401f-bb2a-cc05e5a9a156, opId = 0, ver = 3, numPartitions = 5], 2
            +- *(2) HashAggregate(keys=[a#3, b#4], functions=[merge_count(1)], output=[a#3, b#4, count#78L])
               +- *(2) HashAggregate(keys=[a#3, b#4], functions=[partial_count(1)], output=[a#3, b#4, count#78L])
                  +- Exchange hashpartitioning(b#4, 5), REPARTITION_BY_COL, [id=#580]
                     +- *(1) Project [value#1 AS a#3, (value#1 * 2) AS b#4]
                        +- MicroBatchScan[value#1] MemoryStreamDataSource
```
Note that only a single shuffle was performed, via `.repartition('b)`, and it
satisfies the child distribution on `a` & `b` since the required distribution
is `ClusteredDistribution`.
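To make that concrete, here is a minimal sketch against catalyst internals
(constructor signatures vary across Spark versions, so treat it as illustrative
rather than the exact planner code):
```
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning}
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()

// Partitioning produced by .repartition('b) in the plan above.
val existing = HashPartitioning(Seq(b), 5)

// Requirement declared by the stateful aggregation (before this PR).
val required = ClusteredDistribution(Seq(a, b))

// true: hashing by b alone clusters rows on a subset of {a, b},
// so EnsureRequirements does not insert another Exchange.
assert(existing.satisfies(required))
```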
While this seems OK and produces correct output, we can modify the query in
various ways that break it on a later run: 1) remove `.repartition('b)`,
2) replace it with `.repartition('a)`, 3) replace it with `.repartition('a, 'b)`,
4) replace it with `.repartition('b, 'a)`. Every one of these still satisfies
`ClusteredDistribution("a", "b")` and does not trigger a shuffle with hash
partitioning; see the sketch below. (Actually, Spark should enforce hash
partitioning on `'b` alone for the lifetime of this query, since state has
already been stored under that partitioning.)
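As one example, a later run against the same checkpoint could apply
modification 2) above. This is a hypothetical variant of the test code,
changing nothing except the repartition column:
```
// Hypothetical second run against the same checkpoint: only the
// repartition column changed, from 'b to 'a. ClusteredDistribution("a", "b")
// is still satisfied, so no extra shuffle is planned, and rows now land on
// different partitions than the ones the state was written with.
val aggregated =
  inputData.toDF()
    .select('value as 'a, 'value * 2 as 'b)
    .repartition('a)  // was .repartition('b) in the first run
    .groupBy('a, 'b)
    .agg(count("*"))
    .as[(Int, Int, Long)]
```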
The problem exists on **all stateful operators** (which is why this PR has to
touch multiple places). Since `HashClusteredDistribution` was introduced in
SPARK-21865 (2.3.0), Spark 2.3.0+ would have this problem.
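For contrast, a sketch of how the stricter `HashClusteredDistribution` behaves
(again assuming catalyst internals, in the Spark versions that still ship it):
it is only satisfied by `HashPartitioning` over exactly the same expressions,
which is the guarantee stateful operators actually need.
```
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{HashClusteredDistribution, HashPartitioning}
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()

// Hash-by-b satisfies ClusteredDistribution(a, b), but not the strict form:
// HashClusteredDistribution demands the exact key list, so this is false
// and EnsureRequirements would insert the missing Exchange.
assert(!HashPartitioning(Seq(b), 5).satisfies(HashClusteredDistribution(Seq(a, b))))
```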
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]