[spark] branch branch-3.3 updated: [SPARK-34863][SQL] Support complex types for Parquet vectorized reader

2022-04-01 Thread viirya
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 09d2b0e  [SPARK-34863][SQL] Support complex types for Parquet 
vectorized reader
09d2b0e is described below

commit 09d2b0e92ae423a329c02824cb554482c80aa44f
Author: Chao Sun 
AuthorDate: Fri Apr 1 19:10:11 2022 -0700

[SPARK-34863][SQL] Support complex types for Parquet vectorized reader

### What changes were proposed in this pull request?

This PR adds support for complex types (e.g., struct, array, map) for Spark's 
vectorized Parquet reader. In particular, it introduces the following changes:
1. Added a new class `ParquetColumnVector` which encapsulates all the 
information needed when reading a Parquet column, including the 
`ParquetColumn` for the Parquet column, the repetition & definition levels 
(only allocated for a leaf node of a complex type), as well as the reader for 
the column. It also contains the logic for assembling nested columnar 
batches by interpreting Parquet repetition & definition levels.
2. Changes are made in `VectorizedParquetRecordReader` to initialize a list 
of `ParquetColumnVector` for the columns read.
3. `VectorizedColumnReader` now also creates a reader for the repetition 
column. Depending on whether the maximum repetition level is 0, the batch read 
is now split into two code paths: `readBatch` versus `readBatchNested`.
4. Added logic to handle complex types in `VectorizedRleValuesReader`. For 
data types involving only struct or primitive types, it still goes through the 
old `readBatch` method, which now also saves definition levels into a vector 
for later assembly. Otherwise, for data types involving array or map, a 
separate code path `readBatchNested` is introduced to handle repetition levels.
This PR also introduces a new flag 
`spark.sql.parquet.enableNestedColumnVectorizedReader` which turns the feature 
on or off. By default it is on, to facilitate coverage from all the 
Parquet-related tests.
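
As a quick illustration (not part of the patch; the `spark` session, the path, 
and the column names below are assumptions), the flag can be toggled per session:

```scala
// Hedged sketch: enable the nested vectorized reader at runtime.
// `spark` is an existing SparkSession; path and columns are hypothetical.
spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "true")

val df = spark.read.parquet("/tmp/nested-data")
df.selectExpr("structCol.field", "arrayCol[0]").show()
```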

### Why are the changes needed?

Currently, whenever the read schema contains complex types, Spark falls back 
to the row-based reader in parquet-mr, which is much slower. As benchmarks 
show, adding support to the vectorized reader yields an average ~15x speedup 
when reading struct fields, and ~1.5x when reading arrays of struct and map.

### Does this PR introduce _any_ user-facing change?

With this PR, Spark should now support reading complex types in its 
vectorized Parquet reader. A new config 
`spark.sql.parquet.enableNestedColumnVectorizedReader` is introduced to turn 
the feature on or off.

### How was this patch tested?

Added new unit tests.

Closes #34659 from sunchao/SPARK-34863-new.

Authored-by: Chao Sun 
Signed-off-by: Liang-Chi Hsieh 
(cherry picked from commit deac8f950edb1d893fe4bf2cc7c4adbd29d1db22)
Signed-off-by: Liang-Chi Hsieh 
---
 .../org/apache/spark/sql/internal/SQLConf.scala|  11 +
 .../datasources/parquet/ParquetColumnVector.java   | 381 +++
 .../datasources/parquet/ParquetReadState.java  |  60 ++-
 .../parquet/SpecificParquetRecordReaderBase.java   |  15 +-
 .../parquet/VectorizedColumnReader.java|  84 +++--
 .../parquet/VectorizedParquetRecordReader.java | 160 +---
 .../parquet/VectorizedRleValuesReader.java | 413 ++---
 .../execution/vectorized/OnHeapColumnVector.java   |   2 +-
 .../execution/vectorized/WritableColumnVector.java |  48 ++-
 .../datasources/parquet/ParquetFileFormat.scala|   8 +-
 .../parquet/ParquetSchemaConverter.scala   |  17 +-
 .../datasources/parquet/ParquetUtils.scala |  27 +-
 .../v2/parquet/ParquetPartitionReaderFactory.scala |   4 +-
 .../sql-tests/results/explain-aqe.sql.out  |   3 +-
 .../resources/sql-tests/results/explain.sql.out|   3 +-
 .../datasources/FileBasedDataSourceTest.scala  |   9 +-
 .../sql/execution/datasources/orc/OrcTest.scala|   2 +
 .../datasources/orc/OrcV1SchemaPruningSuite.scala  |   2 +
 .../datasources/orc/OrcV2SchemaPruningSuite.scala  |   2 +
 .../parquet/ParquetColumnIndexSuite.scala  |  13 +
 .../parquet/ParquetFileFormatSuite.scala   |  37 ++
 .../datasources/parquet/ParquetIOSuite.scala   | 351 +
 .../parquet/ParquetSchemaPruningSuite.scala|   2 +
 .../datasources/parquet/ParquetTest.scala  |   2 +
 .../parquet/ParquetVectorizedSuite.scala   | 330 
 25 files changed, 1813 insertions(+), 173 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1bba8b6..5bf5992 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

[spark] branch master updated: [SPARK-34863][SQL] Support complex types for Parquet vectorized reader

2022-04-01 Thread viirya
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new deac8f9  [SPARK-34863][SQL] Support complex types for Parquet 
vectorized reader
deac8f9 is described below

commit deac8f950edb1d893fe4bf2cc7c4adbd29d1db22
Author: Chao Sun 
AuthorDate: Fri Apr 1 19:10:11 2022 -0700

[SPARK-34863][SQL] Support complex types for Parquet vectorized reader

### What changes were proposed in this pull request?

This PR adds support for complex types (e.g., struct, array, map) for Spark's 
vectorized Parquet reader. In particular, it introduces the following changes:
1. Added a new class `ParquetColumnVector` which encapsulates all the 
information needed when reading a Parquet column, including the 
`ParquetColumn` for the Parquet column, the repetition & definition levels 
(only allocated for a leaf node of a complex type), as well as the reader for 
the column. It also contains the logic for assembling nested columnar 
batches by interpreting Parquet repetition & definition levels.
2. Changes are made in `VectorizedParquetRecordReader` to initialize a list 
of `ParquetColumnVector` for the columns read.
3. `VectorizedColumnReader` now also creates a reader for the repetition 
column. Depending on whether the maximum repetition level is 0, the batch read 
is now split into two code paths: `readBatch` versus `readBatchNested`.
4. Added logic to handle complex types in `VectorizedRleValuesReader`. For 
data types involving only struct or primitive types, it still goes through the 
old `readBatch` method, which now also saves definition levels into a vector 
for later assembly. Otherwise, for data types involving array or map, a 
separate code path `readBatchNested` is introduced to handle repetition 
levels, as sketched below.
This PR also introduces a new flag 
`spark.sql.parquet.enableNestedColumnVectorizedReader` which turns the feature 
on or off. By default it is on, to facilitate coverage from all the 
Parquet-related tests.
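
For intuition on the level handling in steps 3-4, here is a hedged sketch 
(hypothetical path; the level pairs assume the standard encoding of an 
optional list with optional int32 elements, which may differ from what 
Spark's writer emits for non-nullable elements):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// For `a` stored as: optional group a (LIST) { repeated group list { optional int32 element } }
// max repetition level = 1, max definition level = 3, and values encode as:
//   [1, 2]    -> (r=0, d=3), (r=1, d=3)   // two present elements
//   null      -> (r=0, d=0)               // the list itself is null
//   []        -> (r=0, d=1)               // list present but empty
//   [null, 3] -> (r=0, d=2), (r=1, d=3)   // null element, then a present one
// Since max repetition level > 0, assembly goes through `readBatchNested`.
Seq(Seq(1, 2), null, Seq.empty[Int]).toDF("a")
  .write.mode("overwrite").parquet("/tmp/levels-demo")
spark.read.parquet("/tmp/levels-demo").show()
```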

### Why are the changes needed?

Currently, whenever the read schema contains complex types, Spark falls back 
to the row-based reader in parquet-mr, which is much slower. As benchmarks 
show, adding support to the vectorized reader yields an average ~15x speedup 
when reading struct fields, and ~1.5x when reading arrays of struct and map.

### Does this PR introduce _any_ user-facing change?

With this PR, Spark should now support reading complex types in its 
vectorized Parquet reader. A new config 
`spark.sql.parquet.enableNestedColumnVectorizedReader` is introduced to turn 
the feature on or off.

### How was this patch tested?

Added new unit tests.

Closes #34659 from sunchao/SPARK-34863-new.

Authored-by: Chao Sun 
Signed-off-by: Liang-Chi Hsieh 
---
 .../org/apache/spark/sql/internal/SQLConf.scala|  11 +
 .../datasources/parquet/ParquetColumnVector.java   | 381 +++
 .../datasources/parquet/ParquetReadState.java  |  60 ++-
 .../parquet/SpecificParquetRecordReaderBase.java   |  15 +-
 .../parquet/VectorizedColumnReader.java|  84 +++--
 .../parquet/VectorizedParquetRecordReader.java | 160 +---
 .../parquet/VectorizedRleValuesReader.java | 413 ++---
 .../execution/vectorized/OnHeapColumnVector.java   |   2 +-
 .../execution/vectorized/WritableColumnVector.java |  48 ++-
 .../datasources/parquet/ParquetFileFormat.scala|   8 +-
 .../parquet/ParquetSchemaConverter.scala   |  17 +-
 .../datasources/parquet/ParquetUtils.scala |  27 +-
 .../v2/parquet/ParquetPartitionReaderFactory.scala |   4 +-
 .../sql-tests/results/explain-aqe.sql.out  |   3 +-
 .../resources/sql-tests/results/explain.sql.out|   3 +-
 .../datasources/FileBasedDataSourceTest.scala  |   9 +-
 .../sql/execution/datasources/orc/OrcTest.scala|   2 +
 .../datasources/orc/OrcV1SchemaPruningSuite.scala  |   2 +
 .../datasources/orc/OrcV2SchemaPruningSuite.scala  |   2 +
 .../parquet/ParquetColumnIndexSuite.scala  |  13 +
 .../parquet/ParquetFileFormatSuite.scala   |  37 ++
 .../datasources/parquet/ParquetIOSuite.scala   | 351 +
 .../parquet/ParquetSchemaPruningSuite.scala|   2 +
 .../datasources/parquet/ParquetTest.scala  |   2 +
 .../parquet/ParquetVectorizedSuite.scala   | 330 
 25 files changed, 1813 insertions(+), 173 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9aad649..d268fd0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

[spark] branch master updated (2f8613f -> 0b6ea01)

2022-04-01 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 2f8613f  [SPARK-38684][SS] Fix correctness issue on stream-stream 
outer join with RocksDB state store provider
 add 0b6ea01  [SPARK-38620][WEBUI] Replace `value.formatted(formatString)` 
with `formatString.format(value)` to clean up compilation warning

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala  | 4 ++--
 .../src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala  | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
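
For context, a minimal sketch (made-up values) of the before/after pattern; 
`formatted` triggers a deprecation warning on recent Scala versions while 
`format` does not:

```scala
val value = 12.3456
val before = value.formatted("%.2f")   // old pattern: warns during compilation
val after  = "%.2f".format(value)      // replacement: warning-free equivalent
assert(before == after)                // both yield "12.35"
```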


[spark] branch branch-3.2 updated: [SPARK-38754][SQL][TEST][3.1] Using EquivalentExpressions getEquivalentExprs function instead of getExprState at SubexpressionEliminationSuite

2022-04-01 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 02a055a  [SPARK-38754][SQL][TEST][3.1] Using EquivalentExpressions 
getEquivalentExprs function instead of getExprState at 
SubexpressionEliminationSuite
02a055a is described below

commit 02a055a42de5597cd42c1c0d4470f0e769571dc3
Author: Dereck Li 
AuthorDate: Fri Apr 1 22:54:11 2022 +0800

[SPARK-38754][SQL][TEST][3.1] Using EquivalentExpressions 
getEquivalentExprs function instead of getExprState at 
SubexpressionEliminationSuite

### What changes were proposed in this pull request?

This PR uses the EquivalentExpressions getEquivalentExprs function instead of 
getExprState in SubexpressionEliminationSuite, and removes the cpus parameter.

### Why are the changes needed?

Fixes a build error.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

CI Tests

Closes #36033 from monkeyboy123/SPARK-38754.

Authored-by: Dereck Li 
Signed-off-by: Wenchen Fan 
(cherry picked from commit f677272d08de030ff9c4045ceec062168105b75c)
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
index b414019..72b39bb 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
@@ -401,7 +401,7 @@ class SubexpressionEliminationSuite extends SparkFunSuite with ExpressionEvalHel
   val equivalence = new EquivalentExpressions
   val expression = DynamicPruningExpression(Exists(LocalRelation()))
   equivalence.addExprTree(expression)
-  assert(equivalence.getExprState(expression).isEmpty)
+  assert(equivalence.getEquivalentExprs(expression).size == 0)
 } finally {
   TaskContext.unset()
 }


[spark] branch branch-3.1 updated: [SPARK-38754][SQL][TEST][3.1] Using EquivalentExpressions getEquivalentExprs function instead of getExprState at SubexpressionEliminationSuite

2022-04-01 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new f677272  [SPARK-38754][SQL][TEST][3.1] Using EquivalentExpressions 
getEquivalentExprs function instead of getExprState at 
SubexpressionEliminationSuite
f677272 is described below

commit f677272d08de030ff9c4045ceec062168105b75c
Author: Dereck Li 
AuthorDate: Fri Apr 1 22:54:11 2022 +0800

[SPARK-38754][SQL][TEST][3.1] Using EquivalentExpressions 
getEquivalentExprs function instead of getExprState at 
SubexpressionEliminationSuite

### What changes were proposed in this pull request?

This PR uses the EquivalentExpressions getEquivalentExprs function instead of 
getExprState in SubexpressionEliminationSuite, and removes the cpus parameter.

### Why are the changes needed?

Fixes a build error.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

CI Tests

Closes #36033 from monkeyboy123/SPARK-38754.

Authored-by: Dereck Li 
Signed-off-by: Wenchen Fan 
---
 .../sql/catalyst/expressions/SubexpressionEliminationSuite.scala  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
index 6071b4b..0900765 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
@@ -330,13 +330,13 @@ class SubexpressionEliminationSuite extends SparkFunSuite with ExpressionEvalHel
   test("SPARK-38333: PlanExpression expression should skip addExprTree 
function in Executor") {
 try {
   // suppose we are in executor
-  val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null, cpus = 0)
+  val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
   TaskContext.setTaskContext(context1)
 
   val equivalence = new EquivalentExpressions
   val expression = DynamicPruningExpression(Exists(LocalRelation()))
   equivalence.addExprTree(expression)
-  assert(equivalence.getExprState(expression).isEmpty)
+  assert(equivalence.getEquivalentExprs(expression).size == 0)
 } finally {
   TaskContext.unset()
 }


[spark] 02/02: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

2022-04-01 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 7e542f164a2f17501b153a2d7b8c53636a5bb9b8
Author: Jungtaek Lim 
AuthorDate: Fri Apr 1 18:21:48 2022 +0900

[SPARK-38684][SS] Fix correctness issue on stream-stream outer join with 
RocksDB state store provider

(Credit to alex-balikov for the inspiration of the root cause observation, 
and anishshri-db for looking into the issue together.)

This PR fixes the correctness issue on stream-stream outer join with the 
RocksDB state store provider, which can occur under certain conditions, like below:

* stream-stream time interval outer join
  * left outer join has an issue on the left side, right outer join has an 
issue on the right side, full outer join has an issue on both sides
* At batch N, non-late row(s) are produced on the problematic side
* At the same batch (batch N), some row(s) on the problematic side are 
evicted by the watermark condition

The root cause is the same as 
[SPARK-38320](https://issues.apache.org/jira/browse/SPARK-38320) - weak read 
consistency on iterators, especially with the RocksDB state store provider. 
(Quoting from SPARK-38320: The problem is due to the StateStore.iterator not 
reflecting StateStore changes made after its creation.)

More specifically, if processing the input rows updates the number of values 
for a grouping key, the update is not seen by 
SymmetricHashJoinStateManager.removeByValueCondition, and the method performs 
the eviction with an out-of-sync number of values.

Making matters worse, if the method then performs the eviction and updates the 
number of values for the grouping key, it "overwrites" the count, effectively 
dropping all rows inserted in the same batch.

The code blocks below are references for understanding the details of the issue.


https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L327-L339


https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L619-L627


https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L195-L201


https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L208-L223

This PR makes the outer iterators lazily evaluated, to ensure that all updates 
from processing input rows are reflected "before" the outer iterators are 
initialized.
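
A minimal sketch (plain Scala, not Spark's actual classes; `LazyIterator` is a 
made-up name) of the late-evaluation pattern the fix applies, deferring 
iterator creation into a thunk like `initIterFn`:

```scala
// Iterator creation is deferred until first access, so state updates made
// while draining the inner join output are visible to the outer iterator.
class LazyIterator[T](init: () => Iterator[T]) extends Iterator[T] {
  private lazy val underlying: Iterator[T] = init()
  override def hasNext: Boolean = underlying.hasNext
  override def next(): T = underlying.next()
}

val initIterFn = () => Iterator("evicted-row-1", "evicted-row-2")
val outerOutputIter = new LazyIterator(initIterFn)  // nothing evaluated yet
```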

The bug is described in the section above.

No.

New UT added.

Closes #36002 from HeartSaVioR/SPARK-38684.

Authored-by: Jungtaek Lim 
Signed-off-by: Jungtaek Lim 
(cherry picked from commit 2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7)
Signed-off-by: Jungtaek Lim 
---
 .../streaming/StreamingSymmetricHashJoinExec.scala | 81 --
 .../spark/sql/streaming/StreamingJoinSuite.scala   | 63 -
 2 files changed, 121 insertions(+), 23 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index 616ae08..30d87af 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -318,17 +318,22 @@ case class StreamingSymmetricHashJoinExec(
   }
 }
 
+val initIterFn = { () =>
+  val removedRowIter = leftSideJoiner.removeOldState()
+  removedRowIter.filterNot { kv =>
+stateFormatVersion match {
+  case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value))
+  case 2 => kv.matched
+  case _ => throwBadStateFormatVersionException()
+}
+  }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+}
+
 // NOTE: we need to make sure `outerOutputIter` is evaluated "after" exhausting all of
-// elements in `innerOutputIter`, because evaluation of `innerOutputIter` may update
-// the match flag which the logic for outer join is relying on.
-val removedRowIter = leftSideJoiner.removeOldState()
-val outerOutputIter = removedRowIter.filterNot { kv =>
-  stateFormatVersion match {
-case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value))

[spark] 01/02: [SPARK-38333][SQL][3.2][FOLLOWUP] fix compilation error

2022-04-01 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 6a4b2c227553a472522a4d3f02e389141593497d
Author: Jungtaek Lim 
AuthorDate: Fri Apr 1 19:07:21 2022 +0900

[SPARK-38333][SQL][3.2][FOLLOWUP] fix compilation error
---
 .../spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
index 4b353cd..b414019 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
@@ -395,7 +395,7 @@ class SubexpressionEliminationSuite extends SparkFunSuite with ExpressionEvalHel
   test("SPARK-38333: PlanExpression expression should skip addExprTree 
function in Executor") {
 try {
   // suppose we are in executor
-  val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null, cpus = 0)
+  val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
   TaskContext.setTaskContext(context1)
 
   val equivalence = new EquivalentExpressions


[spark] branch branch-3.2 updated (e78cca9 -> 7e542f1)

2022-04-01 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git.


from e78cca9  [SPARK-38333][SQL] PlanExpression expression should skip 
addExprTree function in Executor
 new 6a4b2c2  [SPARK-38333][SQL][3.2][FOLLOWUP] fix compilation error
 new 7e542f1  [SPARK-38684][SS] Fix correctness issue on stream-stream 
outer join with RocksDB state store provider

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../SubexpressionEliminationSuite.scala|  2 +-
 .../streaming/StreamingSymmetricHashJoinExec.scala | 81 --
 .../spark/sql/streaming/StreamingJoinSuite.scala   | 63 -
 3 files changed, 122 insertions(+), 24 deletions(-)


[spark] branch branch-3.3 updated: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

2022-04-01 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 8a072ef  [SPARK-38684][SS] Fix correctness issue on stream-stream 
outer join with RocksDB state store provider
8a072ef is described below

commit 8a072ef6badad69ef5cfdd656d0c068979f6ea76
Author: Jungtaek Lim 
AuthorDate: Fri Apr 1 18:21:48 2022 +0900

[SPARK-38684][SS] Fix correctness issue on stream-stream outer join with 
RocksDB state store provider

### What changes were proposed in this pull request?

(Credit to alex-balikov for the inspiration of the root cause observation, 
and anishshri-db for looking into the issue together.)

This PR fixes the correctness issue on stream-stream outer join with the 
RocksDB state store provider, which can occur under certain conditions, like below:

* stream-stream time interval outer join
  * left outer join has an issue on the left side, right outer join has an 
issue on the right side, full outer join has an issue on both sides
* At batch N, non-late row(s) are produced on the problematic side
* At the same batch (batch N), some row(s) on the problematic side are 
evicted by the watermark condition

The root cause is the same as 
[SPARK-38320](https://issues.apache.org/jira/browse/SPARK-38320) - weak read 
consistency on iterators, especially with the RocksDB state store provider. 
(Quoting from SPARK-38320: The problem is due to the StateStore.iterator not 
reflecting StateStore changes made after its creation.)

More specifically, if processing the input rows updates the number of values 
for a grouping key, the update is not seen by 
SymmetricHashJoinStateManager.removeByValueCondition, and the method performs 
the eviction with an out-of-sync number of values. A concrete sketch of this 
race follows the explanation below.

Making matters worse, if the method then performs the eviction and updates the 
number of values for the grouping key, it "overwrites" the count, effectively 
dropping all rows inserted in the same batch.

The code blocks below are references for understanding the details of the issue.


https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L327-L339


https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L619-L627


https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L195-L201


https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L208-L223

This PR makes the outer iterators lazily evaluated, to ensure that all updates 
from processing input rows are reflected "before" the outer iterators are 
initialized.
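
To make the failure mode concrete, a toy sketch (hypothetical in-memory state, 
not Spark's API) of the stale-read overwrite described above:

```scala
import scala.collection.mutable

// The eviction path captured the value count when its iterator was created;
// writing that stale count back drops a row inserted in the same batch.
val numValuesForKey = mutable.Map("key" -> 2L)
val countAtIterCreation = numValuesForKey("key")    // snapshot sees 2
numValuesForKey("key") += 1                         // batch N inserts a row: 3
numValuesForKey("key") = countAtIterCreation - 1    // eviction writes back 1
assert(numValuesForKey("key") == 1)                 // inserted row is lost
```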

### Why are the changes needed?

The bug is described in the section above.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT added.

Closes #36002 from HeartSaVioR/SPARK-38684.

Authored-by: Jungtaek Lim 
Signed-off-by: Jungtaek Lim 
(cherry picked from commit 2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7)
Signed-off-by: Jungtaek Lim 
---
 .../streaming/StreamingSymmetricHashJoinExec.scala | 81 --
 .../spark/sql/streaming/StreamingJoinSuite.scala   | 63 -
 2 files changed, 121 insertions(+), 23 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index 81888e0..aa888c1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -324,17 +324,22 @@ case class StreamingSymmetricHashJoinExec(
   }
 }
 
+val initIterFn = { () =>
+  val removedRowIter = leftSideJoiner.removeOldState()
+  removedRowIter.filterNot { kv =>
+stateFormatVersion match {
+  case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value))
+  case 2 => kv.matched
+  case _ => throwBadStateFormatVersionException()
+}
+  }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+}
+
 // NOTE: we need to make sure `outerOutputIter` is evaluated "after" exhausting all of

[spark] branch master updated: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

2022-04-01 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2f8613f  [SPARK-38684][SS] Fix correctness issue on stream-stream 
outer join with RocksDB state store provider
2f8613f is described below

commit 2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7
Author: Jungtaek Lim 
AuthorDate: Fri Apr 1 18:21:48 2022 +0900

[SPARK-38684][SS] Fix correctness issue on stream-stream outer join with 
RocksDB state store provider

### What changes were proposed in this pull request?

(Credit to alex-balikov for the inspiration of the root cause observation, 
and anishshri-db for looking into the issue together.)

This PR fixes the correctness issue on stream-stream outer join with the 
RocksDB state store provider, which can occur under certain conditions, like below:

* stream-stream time interval outer join
  * left outer join has an issue on the left side, right outer join has an 
issue on the right side, full outer join has an issue on both sides
* At batch N, non-late row(s) are produced on the problematic side
* At the same batch (batch N), some row(s) on the problematic side are 
evicted by the watermark condition

The root cause is the same as 
[SPARK-38320](https://issues.apache.org/jira/browse/SPARK-38320) - weak read 
consistency on iterators, especially with the RocksDB state store provider. 
(Quoting from SPARK-38320: The problem is due to the StateStore.iterator not 
reflecting StateStore changes made after its creation.)

More specifically, if processing the input rows updates the number of values 
for a grouping key, the update is not seen by 
SymmetricHashJoinStateManager.removeByValueCondition, and the method performs 
the eviction with an out-of-sync number of values.

Making matters worse, if the method then performs the eviction and updates the 
number of values for the grouping key, it "overwrites" the count, effectively 
dropping all rows inserted in the same batch.

The code blocks below are references for understanding the details of the issue.


https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L327-L339


https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L619-L627


https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L195-L201


https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L208-L223

This PR makes the outer iterators lazily evaluated, to ensure that all updates 
from processing input rows are reflected "before" the outer iterators are 
initialized.

### Why are the changes needed?

The bug is described in the section above.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT added.

Closes #36002 from HeartSaVioR/SPARK-38684.

Authored-by: Jungtaek Lim 
Signed-off-by: Jungtaek Lim 
---
 .../streaming/StreamingSymmetricHashJoinExec.scala | 81 --
 .../spark/sql/streaming/StreamingJoinSuite.scala   | 63 -
 2 files changed, 121 insertions(+), 23 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index 81888e0..aa888c1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -324,17 +324,22 @@ case class StreamingSymmetricHashJoinExec(
   }
 }
 
+val initIterFn = { () =>
+  val removedRowIter = leftSideJoiner.removeOldState()
+  removedRowIter.filterNot { kv =>
+stateFormatVersion match {
+  case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value))
+  case 2 => kv.matched
+  case _ => throwBadStateFormatVersionException()
+}
+  }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+}
+
 // NOTE: we need to make sure `outerOutputIter` is evaluated "after" exhausting all of
-// elements in `innerOutputIter`, because evaluation of `innerOutputIter` may update