[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution
HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1031237652

I have just taken a first step on this work. Please review #35419.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution
HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1029748381

I confirmed that StreamingAggregation has the same problem as the stream-stream join problem described in SPARK-24588.

Test code:

```
test("simple count, update mode, check plan") {
  val inputData = MemoryStream[Int]

  val aggregated =
    inputData.toDF()
      .select('value as 'a, 'value * 2 as 'b)
      .repartition('b)
      .groupBy('a, 'b)
      .agg(count("*"))
      .as[(Int, Int, Long)]

  testStream(aggregated, Update)(
    AddData(inputData, 3),
    CheckLastBatch((3, 6, 1)),
    AddData(inputData, 3, 2),
    CheckLastBatch((3, 6, 2), (2, 4, 1)),
    StopStream,
    StartStream(),
    AddData(inputData, 3, 2, 1),
    CheckLastBatch((3, 6, 3), (2, 4, 2), (1, 2, 1)),
    // By default we run in new tuple mode.
    AddData(inputData, 4, 4, 4, 4),
    CheckLastBatch((4, 8, 4)),
    Execute { query =>
      logWarning(s"DEBUG: ${query.lastExecution.executedPlan}")
    }
  )
}
```

Output:

```
16:52:16.736 WARN org.apache.spark.sql.streaming.StreamingAggregationSuite: DEBUG: WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@61581663, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2099/0x000800fcc040@650b39f1
+- *(4) HashAggregate(keys=[a#3, b#4], functions=[count(1)], output=[a#3, b#4, count(1)#11L])
   +- StateStoreSave [a#3, b#4], state info [ checkpoint = file:/tmp/streaming.metadata-62c5613a-04b1-4c6a-befe-044ca4b97407/state, runId = b8546016-958d-401f-bb2a-cc05e5a9a156, opId = 0, ver = 3, numPartitions = 5], Update, 0, 2
      +- *(3) HashAggregate(keys=[a#3, b#4], functions=[merge_count(1)], output=[a#3, b#4, count#78L])
         +- StateStoreRestore [a#3, b#4], state info [ checkpoint = file:/tmp/streaming.metadata-62c5613a-04b1-4c6a-befe-044ca4b97407/state, runId = b8546016-958d-401f-bb2a-cc05e5a9a156, opId = 0, ver = 3, numPartitions = 5], 2
            +- *(2) HashAggregate(keys=[a#3, b#4], functions=[merge_count(1)], output=[a#3, b#4, count#78L])
               +- *(2) HashAggregate(keys=[a#3, b#4], functions=[partial_count(1)], output=[a#3, b#4, count#78L])
                  +- Exchange hashpartitioning(b#4, 5), REPARTITION_BY_COL, [id=#580]
                     +- *(1) Project [value#1 AS a#3, (value#1 * 2) AS b#4]
                        +- MicroBatchScan[value#1] MemoryStreamDataSource
```

Note that only a single shuffle was performed, via `.repartition('b)`, and it satisfies the child distribution on a & b because the required distribution is ClusteredDistribution.

While this seems OK and produces correct output, we can modify the query in various ways that break it in a later run:

1) remove `.repartition('b)`
2) replace it with `.repartition('a)`
3) replace it with `.repartition('a, 'b)`
4) replace it with `.repartition('b, 'a)`

All of these cases satisfy ClusteredDistribution("a", "b") and do not trigger a shuffle with hash partitioning.

The problem exists in **all stateful operators** (otherwise this PR would have had to touch more places). Since HashClusteredDistribution was introduced in SPARK-21865 (2.3.0), Spark 2.3.0+ would have this problem.
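To make the difference between the two requirements concrete, here is a minimal sketch in plain Scala. It is a toy model under stated assumptions, not Spark's planner code: the names `HashPartitioned`, `satisfiesClustered` and `satisfiesHashClustered` are hypothetical, expressions are modeled as plain strings, and the strict check is simplified to "same expressions in the same order with the same partition count".

```scala
// Toy model of a relaxed vs. a strict distribution requirement.
// All names here are hypothetical; this is not Spark's actual implementation.
object DistributionSketch {
  // A child plan that is hash partitioned on `exprs` into `numPartitions` partitions.
  final case class HashPartitioned(exprs: Seq[String], numPartitions: Int)

  // Relaxed requirement: rows with equal clustering keys must share a partition.
  // Hash partitioning on any non-empty subset of the keys is enough for that.
  def satisfiesClustered(child: HashPartitioned, keys: Seq[String]): Boolean =
    child.exprs.nonEmpty && child.exprs.forall(keys.contains)

  // Strict requirement (simplified): same expressions, same order, same partition count.
  def satisfiesHashClustered(
      child: HashPartitioned,
      keys: Seq[String],
      numPartitions: Int): Boolean =
    child.exprs == keys && child.numPartitions == numPartitions

  // The grouping keys of the aggregation and several repartition variants of the query.
  val keys: Seq[String] = Seq("a", "b")
  val variants: Seq[HashPartitioned] = Seq(
    HashPartitioned(Seq("b"), 5),
    HashPartitioned(Seq("a"), 5),
    HashPartitioned(Seq("a", "b"), 5),
    HashPartitioned(Seq("b", "a"), 5)
  )

  // Variants accepted without an extra shuffle under each requirement.
  val relaxed: Seq[HashPartitioned] = variants.filter(satisfiesClustered(_, keys))
  val strict: Seq[HashPartitioned] = variants.filter(satisfiesHashClustered(_, keys, 5))
}
```

In this model every variant passes the relaxed check, while only the one partitioned on exactly (a, b) passes the strict check, which is the gap the comment above describes for state that was written under one of the other layouts.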
[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution
HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1024660436

If we feel the name HashClusteredDistribution is too generic to convey that it is tightly coupled with Spark's hash partitioning, it might not be a crazy idea to add a prefix to the class name to make that clear. (I'm not 100% sure we would want a generic version of HashClusteredDistribution later, though.)
[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution
HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1023908402

That said, I see other spots that were missed: stateful operators other than stream-stream join use ClusteredDistribution, so unfortunately this seems to be a long-standing issue.
[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution
HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1023907445

When we specify HashClusteredDistribution on a stateful operator, we rely on three major assumptions:

1) HashClusteredDistribution creates HashPartitioning, and we will never change that.
2) We will never change the implementation of `partitionIdExpression` in HashPartitioning.
3) No partitioning other than HashPartitioning can satisfy HashClusteredDistribution.

(I think we had better leave a code comment stating the above, to prevent changes to HashClusteredDistribution that would break these assumptions.)

Say the child operator is range partitioned and we add a stateful operator with ClusteredDistribution as its required distribution. The range partitioning can satisfy ClusteredDistribution, but the physical partitioning of the child is then totally different from that of the state, which leads to a correctness issue (possibly a silent one).

It seems DataSourcePartitioning doesn't allow a data source's partitioning to satisfy HashClusteredDistribution; it only checks against ClusteredDistribution. This must not be changed unless the data source's partitioning guarantees the same physical partitioning as Spark's internal hash partitioning, and the Partitioning interface gives us no way to guarantee that.
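The range-partitioning scenario can be illustrated with a small, self-contained sketch. Everything in it is an assumption made for illustration: `hashPartition` is a stand-in hash partitioner (deliberately not Spark's real hash function), `rangePartition` is a stand-in range partitioner with made-up bounds, and keys are plain integers. Both functions cluster rows by key, yet they disagree on which partition a key belongs to.

```scala
// Toy illustration: two partitioners that both keep equal keys together,
// but place the same key in different partitions. Names are hypothetical.
object PartitionMismatchSketch {
  val numPartitions: Int = 5

  // Stand-in for the hash partitioner that (by assumption) laid out the state.
  // This is NOT Spark's hash function, only a deterministic placeholder.
  def hashPartition(key: Int): Int = Math.floorMod(key * 31 + 17, numPartitions)

  // Stand-in for a range partitioner on the input side: partition i holds
  // keys <= bounds(i); the last partition holds everything larger.
  val bounds: Seq[Int] = Seq(2, 4, 6, 8)
  def rangePartition(key: Int): Int = {
    val i = bounds.indexWhere(key <= _)
    if (i >= 0) i else bounds.length
  }

  // Keys whose input row arrives in a partition other than the one holding their state.
  val misrouted: Seq[Int] =
    (1 to 10).filter(k => hashPartition(k) != rangePartition(k))
}
```

Any key in `misrouted` would be processed by a task whose state store partition does not contain that key's earlier state, so the operator would silently start from empty state for it. That is the correctness issue described above, shown here without any Spark dependency.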
[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution
HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1023854865

I suspect we will make the same mistake again unintentionally if we don't call this out explicitly. Please bring back a way for operators to explicitly require Spark's internal hash partitioning when specifying their required distribution.
[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution
HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1023839698

Yes. Since we can't change the state partitioning, it must be treated as the first priority for stateful operators. Even if we make some improvements here (recording the physical partitioning of the state) and feel safer storing state with the same partitioning as the source, the operator must still request the state's partitioning so that nothing breaks when the source changes.
[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution
HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1023827145

cc. @tdas @zsxwing @brkyvz Could you please help double check my theory? Thanks in advance.
[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution
HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1023813343

> Avoid shuffle when joining DSv2(s) bucketed in the way different than Spark bucketing.

This already assumes that a DSv2 bucketing algorithm can differ from Spark's, and that Spark avoids the shuffle in that case. That is of course a great improvement in general, but it does not take state partitioning into account in the streaming context.

Given that state partitioning is not flexible and we have yet to record partitioning information in the state metadata, our major assumption is that state is partitioned using Spark's internal hash function, with the number of shuffle partitions as the number of partitions. Any case that can break this assumption is one we cannot yet allow for streaming queries. That said, there is room for improvement.

> So I think if data source bucketed as same as join keys, stream-stream join should be able to avoid shuffle before this PR.
> After this PR, the stream-stream join behavior is not changed, it should still be able to avoid shuffle.

I meant that, to retain this assumption, a shuffle must be performed for stream-stream join if the source doesn't follow Spark's internal hash function. The same goes for other stateful operators. That said, we may have already broken these cases, since we didn't change those operators in this PR.
[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution
HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1023715717

Oh well... I may need to try hacking the file stream source to support bucketing (regardless of whether it works correctly or not) and check the physical plan.

cc. @cloud-fan Could you please help triage whether the problem existed even before this PR? Would requiring HashClusteredDistribution "force" the use of Spark's internal hash function for the distribution?
[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution
HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1023682745

> Currently the rule only skips shuffle in join if both sides report the same distribution

If there is any chance that both sides change their distributions together, the rule will skip the shuffle in the join because they still share the same distribution. But stream-stream join also has to read from state, which may be partitioned differently, and that partitioning is not flexible.

State partitioning cannot be changed during the query's lifetime, at least for now. We also have no way to record how the state is partitioned (we only pin the number of state partitions, by fixing the config value for the number of "shuffle" partitions), so the safest option for state is to follow Spark's hash partitioning.

The crux is the hash function: even if the source is partitioned/bucketed on the same criteria (columns), the hash function and the number of partitions must also match those of Spark's state for stateful operators. So, at least as of now, stateful operators cannot benefit from source-side distribution. There is room to improve this, e.g. letting the state follow the distribution of the inputs while also storing the distribution info in metadata, so that the stateful operator forces the state distribution (triggering a shuffle) if the two differ in future runs.

> with the first follow-up by @cloud-fan I think we've already restored the previous behavior.

Could you please point me to the commit/PR for this?
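The improvement floated in the comment above (record the state's distribution in metadata and force a shuffle when the input no longer matches) could look roughly like the following sketch. It is purely hypothetical: `PartitioningInfo`, its fields, and `needsShuffle` are invented for illustration and do not correspond to any existing Spark class or checkpoint format.

```scala
// Hypothetical sketch of "store the distribution info in state metadata and
// shuffle when the input differs". None of these names exist in Spark.
object StateMetadataSketch {
  // How a set of rows is physically partitioned: which hash function was used,
  // on which key columns (in order), and into how many partitions.
  final case class PartitioningInfo(
      hashFunction: String,
      keys: Seq[String],
      numPartitions: Int)

  // The input can skip the shuffle only if its layout is known and is identical
  // to the layout recorded for the state; otherwise a shuffle must be forced.
  def needsShuffle(input: Option[PartitioningInfo], state: PartitioningInfo): Boolean =
    !input.contains(state)
}
```

Under this sketch, a source bucketed on the same columns but with a different hash function or a different partition count still triggers a shuffle, which matches the point that matching columns alone is not enough for stateful operators.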
[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution
HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1022894109

@sunchao Sorry for the post-hoc review. I didn't realize this PR might affect streaming queries and only noticed later.

I discussed this change with @cloud-fan, and we are concerned about any possibility of skipping the shuffle on grouping keys in stateful operators, "including stream-stream join".

In Structured Streaming, state is partitioned by grouping keys using Spark's internal hash function, and the number of partitions is static. So if Spark does not respect the state's distribution for a stateful operator, it leads to a correctness problem.

So please make sure the same key is co-located across three aspects (left, right, state) in stream-stream join. The same applies to non-join cases, e.g. aggregation over a bucketed table: other stateful operators have two aspects (key, state). In short, state must be considered.