[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution

2022-02-07 Thread GitBox


HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1031237652


   I've taken the first step on this work - please review #35419





[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution

2022-02-04 Thread GitBox


HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1029748381


   I confirmed that streaming aggregation has the same problem as the stream-stream join issue described in SPARK-24588.
   
   Test code:
   
   ```
 test("simple count, update mode, check plan") {
   val inputData = MemoryStream[Int]
   
   val aggregated =
 inputData.toDF()
   .select('value as 'a, 'value * 2 as 'b)
   .repartition('b)
   .groupBy('a, 'b)
   .agg(count("*"))
   .as[(Int, Int, Long)]
   
   testStream(aggregated, Update)(
 AddData(inputData, 3),
 CheckLastBatch((3, 6, 1)),
 AddData(inputData, 3, 2),
 CheckLastBatch((3, 6, 2), (2, 4, 1)),
 StopStream,
 StartStream(),
 AddData(inputData, 3, 2, 1),
 CheckLastBatch((3, 6, 3), (2, 4, 2), (1, 2, 1)),
 // By default we run in new tuple mode.
 AddData(inputData, 4, 4, 4, 4),
 CheckLastBatch((4, 8, 4)),
 Execute { query =>
   logWarning(s"DEBUG: ${query.lastExecution.executedPlan}")
 }
   )
 }
   ```
   
   Output:
   
   ```
   16:52:16.736 WARN org.apache.spark.sql.streaming.StreamingAggregationSuite: DEBUG: WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@61581663, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2099/0x000800fcc040@650b39f1
   +- *(4) HashAggregate(keys=[a#3, b#4], functions=[count(1)], output=[a#3, b#4, count(1)#11L])
      +- StateStoreSave [a#3, b#4], state info [ checkpoint = file:/tmp/streaming.metadata-62c5613a-04b1-4c6a-befe-044ca4b97407/state, runId = b8546016-958d-401f-bb2a-cc05e5a9a156, opId = 0, ver = 3, numPartitions = 5], Update, 0, 2
         +- *(3) HashAggregate(keys=[a#3, b#4], functions=[merge_count(1)], output=[a#3, b#4, count#78L])
            +- StateStoreRestore [a#3, b#4], state info [ checkpoint = file:/tmp/streaming.metadata-62c5613a-04b1-4c6a-befe-044ca4b97407/state, runId = b8546016-958d-401f-bb2a-cc05e5a9a156, opId = 0, ver = 3, numPartitions = 5], 2
               +- *(2) HashAggregate(keys=[a#3, b#4], functions=[merge_count(1)], output=[a#3, b#4, count#78L])
                  +- *(2) HashAggregate(keys=[a#3, b#4], functions=[partial_count(1)], output=[a#3, b#4, count#78L])
                     +- Exchange hashpartitioning(b#4, 5), REPARTITION_BY_COL, [id=#580]
                        +- *(1) Project [value#1 AS a#3, (value#1 * 2) AS b#4]
                           +- MicroBatchScan[value#1] MemoryStreamDataSource
   ```
   
   Note that only a single shuffle was performed, via `.repartition('b)`, and it satisfies the child distribution on a & b since the required distribution is ClusteredDistribution.
   
   While this seems OK and produces correct output, we can modify the query in various ways that break it on a further run (all pass the distribution check, as the sketch below illustrates): 1) remove `.repartition('b)`, 2) replace it with `.repartition('a)`, 3) replace it with `.repartition('a, 'b)`, 4) replace it with `.repartition('b, 'a)`. All cases satisfy ClusteredDistribution("a", "b") and do not trigger a shuffle with hash partitioning.
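   To see why, here is a minimal sketch of the two satisfaction rules (plain Scala over column names, not the actual Catalyst code, which works on Expressions):
   
   ```
   // Hedged model: ClusteredDistribution is satisfied by a hash partitioning
   // over any non-empty subset of the clustering keys; the removed
   // HashClusteredDistribution demanded an exact, ordered match.
   def satisfiesClustered(partExprs: Seq[String], clustering: Seq[String]): Boolean =
     partExprs.nonEmpty && partExprs.forall(clustering.contains)

   def satisfiesHashClustered(partExprs: Seq[String], clustering: Seq[String]): Boolean =
     partExprs == clustering

   satisfiesClustered(Seq("b"), Seq("a", "b"))      // true  -> shuffle skipped
   satisfiesHashClustered(Seq("b"), Seq("a", "b"))  // false -> shuffle forced
   ```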
   
   The problem exists for **all stateful operators** (otherwise this PR would have had to touch more places). Since HashClusteredDistribution was introduced in SPARK-21865 (2.3.0), Spark 2.3.0+ is affected.





[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution

2022-01-28 Thread GitBox


HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1024660436


   If we feel the name HashClusteredDistribution is too generic to convey that it's tightly coupled with Spark's hash partitioning, it might not be a crazy idea to add a prefix to the class name to make that clear. (Not 100% sure we would want a generic version of HashClusteredDistribution later, though.)





[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution

2022-01-27 Thread GitBox


HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1023908402


   That said, I see other spots where stateful operators (other than stream-stream join) use ClusteredDistribution, so unfortunately this seems to be a long-standing issue.





[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution

2022-01-27 Thread GitBox


HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1023907445


   When we specify HashClusteredDistribution on a stateful operator, we rely on three major assumptions:
   
   1) HashClusteredDistribution creates HashPartitioning, and we will never change that in the future.
   2) We will never change the implementation of `partitionIdExpression` in HashPartitioning in the future.
   3) No partitioning except HashPartitioning can satisfy HashClusteredDistribution.
   
   (I think we'd better leave a code comment about the above to prevent changes to HashClusteredDistribution.)
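   For reference, assumption 2) boils down to the following contract (a sketch mirroring `HashPartitioning.partitionIdExpression` as I understand it; the imports assume Catalyst's package layout):
   
   ```
   import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, Murmur3Hash, Pmod}

   // The row's partition id is the Murmur3 hash of the key expressions, modulo
   // the number of partitions. Changing either piece would silently re-map
   // existing state keys to different partitions.
   def partitionIdExpression(expressions: Seq[Expression], numPartitions: Int): Expression =
     Pmod(new Murmur3Hash(expressions), Literal(numPartitions))
   ```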
   
   Let's say the child operator is range partitioned and we add a stateful operator with ClusteredDistribution as the required distribution. The range partitioning can satisfy ClusteredDistribution, but the physical partitioning of the child is totally different from the state's, which leads to a correctness issue (even silently).
   
   It seems DataSourcePartitioning doesn't allow the partitioning reported by a data source to satisfy HashClusteredDistribution - it only checks against ClusteredDistribution. This must not be changed unless the partitioning from the data source guarantees the same physical partitioning as Spark's internal hash partitioning, which we have no way to guarantee through the Partitioning interface.
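   In other words, the behavior we must preserve looks roughly like this (a simplified sketch, not the actual DataSourcePartitioning source):
   
   ```
   // Simplified model: a partitioning reported by a data source may satisfy
   // ClusteredDistribution (clustering columns line up), but must never satisfy
   // HashClusteredDistribution, because Spark cannot verify that the source
   // used the same physical hash function.
   sealed trait Distribution
   case class ClusteredDistribution(clustering: Seq[String]) extends Distribution
   case class HashClusteredDistribution(expressions: Seq[String]) extends Distribution

   def dataSourceSatisfies(required: Distribution, sourceClustering: Seq[String]): Boolean =
     required match {
       case ClusteredDistribution(clustering) => sourceClustering.forall(clustering.contains)
       case _: HashClusteredDistribution      => false
     }
   ```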





[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution

2022-01-27 Thread GitBox


HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1023854865


   I suspect we will make the same mistake unintentionally if we don't call it out explicitly. Please bring back a way for operators to explicitly require Spark's internal hash partitioning when specifying a required distribution.





[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution

2022-01-27 Thread GitBox


HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1023839698


   Yes, since we can't change the state partitioning, it must be treated as the first priority for stateful operators - even if we make some improvements here (keeping information about the physical partitioning of state) and feel safer storing state with the same partitioning as the source, the operator must still request the state's partitioning so that nothing breaks when the source changes.
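   Concretely, the shape I have in mind is something like the following (an illustrative sketch; the class name and fields are hypothetical, not a final API):
   
   ```
   // Hypothetical required distribution for a stateful operator: it pins both
   // the exact grouping keys and the exact number of state partitions, so no
   // merely "compatible" source partitioning can ever be substituted for it.
   case class StateRequiredDistribution(stateKeys: Seq[String], numStatePartitions: Int)

   def requiredChildDistribution(stateKeys: Seq[String], numStatePartitions: Int): StateRequiredDistribution =
     StateRequiredDistribution(stateKeys, numStatePartitions)
   ```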





[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution

2022-01-27 Thread GitBox


HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1023827145


   cc. @tdas @zsxwing @brkyvz Could you please help double-check my theory? Thanks in advance.





[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution

2022-01-27 Thread GitBox


HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1023813343


   > Avoid shuffle when joining DSv2(s) bucketed in the way different than 
Spark bucketing.
   
   This already assumes the DSv2 bucketing algorithm can differ from Spark's, and that Spark avoids the shuffle in this case. It is of course a great improvement in general, but in the streaming context, state partitioning is not considered here.
   
   Given that state partitioning is not flexible and we have yet to add metadata to state recording its partitioning, our major assumption is that state is partitioned by Spark's internal hash function, with the number of shuffle partitions as the number of partitions. If there is any case where that assumption can be broken, we must not (yet) allow that case for streaming queries. That said, there's room for improvement.
   
   > So I think if data source bucketed as same as join keys, stream-stream join should be able to avoid shuffle before this PR.
   > After this PR, the stream-stream join behavior is not changed, it should still be able to avoid shuffle.
   
   I meant that a shuffle must be performed for stream-stream join if the source doesn't follow Spark's internal hash function, to retain the major assumption. The same goes for other stateful operators. That said, we may have already broken these cases, since we didn't change those operators in this PR.
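   Stated as a predicate, the rule I'm arguing for is roughly the following (an illustrative sketch; the enum values are placeholders, not a real API):
   
   ```
   // A shuffle may be skipped only when the source's physical layout provably
   // equals Spark's internal hash partitioning: same hash function AND the
   // same partition count as the state.
   sealed trait HashFunc
   case object SparkMurmur3 extends HashFunc   // Spark's internal hash
   case object SourceDefined extends HashFunc  // any source-specific bucketing

   def canSkipShuffle(srcFunc: HashFunc, srcPartitions: Int, statePartitions: Int): Boolean =
     srcFunc == SparkMurmur3 && srcPartitions == statePartitions
   ```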





[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution

2022-01-27 Thread GitBox


HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1023715717


   Oh well... I may need to try hacking the file stream source to support bucketing (regardless of whether it works correctly) and check the physical plan.
   
   cc. @cloud-fan Could you please help triage whether the problem may have existed even before this PR? Does requiring HashClusteredDistribution "force" the use of Spark's internal hash function for the distribution?





[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution

2022-01-27 Thread GitBox


HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1023682745


   > Currently the rule only skips shuffle in join if both sides report the 
same distribution
   
   If there is any chance for both sides to change their distributions altogether, the rule will skip the shuffle in the join since both sides already have the same distribution - but a stream-stream join must also read from state, which may be partitioned differently, and that partitioning is not flexible.
   
   That said, state partitioning cannot be changed during the query's lifetime, at least for now. And we have no way to track the state's partitioning (we only pin the number of state partitions, by sticking to the configured number of "shuffle" partitions), so the safest option is for state to follow Spark's hash partitioning.
   
   The crux is the hash function - even if the source is partitioned/bucketed on the same criteria (columns), the hash function and the number of partitions must also match Spark's state for stateful operators. So, at least as of now, stateful operators cannot leverage the benefits of source-side distribution. There's room to improve this - e.g. let state follow the distribution of the inputs, but also store the distribution info in metadata so that the stateful operator can force the state's distribution (triggering a shuffle) if they differ in future runs, as sketched below.
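   A minimal sketch of that improvement idea (entirely hypothetical - no such metadata exists today):
   
   ```
   // Hypothetical metadata persisted alongside the state checkpoint, recording
   // how the state is physically partitioned.
   case class StatePartitioningInfo(hashFunction: String, keys: Seq[String], numPartitions: Int)

   // On a later run, any mismatch between the stored layout and the incoming
   // data's layout forces a shuffle into the state's partitioning.
   def mustReshuffle(stored: StatePartitioningInfo, incoming: StatePartitioningInfo): Boolean =
     stored != incoming
   ```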
   
   > with the first follow-up by @cloud-fan I think we've already restored the previous behavior.
   
   Could you please point me to the commit/PR for this?





[GitHub] [spark] HeartSaVioR commented on pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution

2022-01-26 Thread GitBox


HeartSaVioR commented on pull request #32875:
URL: https://github.com/apache/spark/pull/32875#issuecomment-1022894109


   @sunchao 
   
   Sorry for the post-review. I didn't know this PR might affect streaming queries and only realized it later.
   
   I discussed this change with @cloud-fan, and we are concerned about any possibility of skipping the shuffle on grouping keys in stateful operators, "including stream-stream join".
   
   In Structured Streaming, state is partitioned by the grouping keys using Spark's internal hash function, and the number of partitions is static. That said, if Spark does not respect the state's distribution for a stateful operator, it leads to a correctness problem.
   
   So please make sure the same key is co-located across three sides (left, right, state) in a stream-stream join. The same applies to the non-join case, e.g. aggregation against a bucketed table; other stateful operators have two sides (input, state). In short, state must be considered.
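   To make the co-location requirement concrete, a hedged sketch (numPartitions is the static "shuffle" partition count fixed at the query's first run):
   
   ```
   // For a key k, correctness requires that the left input, the right input,
   // and the state store all place k in the same partition:
   //   partition(left, k) == partition(right, k) == statePartition(hash(k), n)
   def statePartition(keyHash: Int, numPartitions: Int): Int =
     java.lang.Math.floorMod(keyHash, numPartitions)
   ```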
   

