[spark] branch branch-3.3 updated: [SPARK-34863][SQL] Support complex types for Parquet vectorized reader
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 09d2b0e  [SPARK-34863][SQL] Support complex types for Parquet vectorized reader
09d2b0e is described below

commit 09d2b0e92ae423a329c02824cb554482c80aa44f
Author: Chao Sun
AuthorDate: Fri Apr 1 19:10:11 2022 -0700

    [SPARK-34863][SQL] Support complex types for Parquet vectorized reader

    ### What changes were proposed in this pull request?

    This PR adds support for complex types (e.g., list, map, array) to Spark's vectorized Parquet reader. In particular, it introduces the following changes:

    1. Added a new class, `ParquetColumnVector`, which encapsulates all the information needed when reading a Parquet column: the `ParquetColumn` for the column, the repetition & definition levels (allocated only for leaf nodes of a complex type), and the reader for the column. It also contains the logic for assembling nested columnar batches by interpreting Parquet repetition & definition levels.
    2. `VectorizedParquetRecordReader` is changed to initialize a list of `ParquetColumnVector` for the columns being read.
    3. `VectorizedColumnReader` now also creates a reader for the repetition-level column. Depending on whether the maximum repetition level is 0, the batch read is split into two code paths: `readBatch` versus `readBatchNested`.
    4. Added logic to handle complex types in `VectorizedRleValuesReader`. For data types involving only structs or primitive types, it still uses the old `readBatch` method, which now also saves definition levels into a vector for later assembly. Otherwise, for data types involving arrays or maps, a separate code path, `readBatchNested`, is introduced to handle repetition levels.

    This PR also introduces a new flag, `spark.sql.parquet.enableNestedColumnVectorizedReader`, which turns the feature on or off. By default it is on, to exercise all the Parquet-related test coverage.

    ### Why are the changes needed?

    Currently, whenever the read schema contains complex types, Spark falls back to the row-based reader in parquet-mr, which is much slower. As the benchmark shows, adding support to the vectorized reader yields an average speedup of ~15x when reading struct fields, and ~1.5x when reading arrays of structs and maps.

    ### Does this PR introduce _any_ user-facing change?

    With this PR, Spark supports reading complex types in its vectorized Parquet reader. A new config, `spark.sql.parquet.enableNestedColumnVectorizedReader`, is introduced to turn the feature on or off.

    ### How was this patch tested?

    Added new unit tests.

    Closes #34659 from sunchao/SPARK-34863-new.
    Authored-by: Chao Sun
    Signed-off-by: Liang-Chi Hsieh
    (cherry picked from commit deac8f950edb1d893fe4bf2cc7c4adbd29d1db22)
    Signed-off-by: Liang-Chi Hsieh
---
 .../org/apache/spark/sql/internal/SQLConf.scala       |  11 +
 .../datasources/parquet/ParquetColumnVector.java      | 381 +++
 .../datasources/parquet/ParquetReadState.java         |  60 ++-
 .../parquet/SpecificParquetRecordReaderBase.java      |  15 +-
 .../parquet/VectorizedColumnReader.java               |  84 +++--
 .../parquet/VectorizedParquetRecordReader.java        | 160 +---
 .../parquet/VectorizedRleValuesReader.java            | 413 ++---
 .../execution/vectorized/OnHeapColumnVector.java      |   2 +-
 .../execution/vectorized/WritableColumnVector.java    |  48 ++-
 .../datasources/parquet/ParquetFileFormat.scala       |   8 +-
 .../parquet/ParquetSchemaConverter.scala              |  17 +-
 .../datasources/parquet/ParquetUtils.scala            |  27 +-
 .../v2/parquet/ParquetPartitionReaderFactory.scala    |   4 +-
 .../sql-tests/results/explain-aqe.sql.out             |   3 +-
 .../resources/sql-tests/results/explain.sql.out       |   3 +-
 .../datasources/FileBasedDataSourceTest.scala         |   9 +-
 .../sql/execution/datasources/orc/OrcTest.scala       |   2 +
 .../datasources/orc/OrcV1SchemaPruningSuite.scala     |   2 +
 .../datasources/orc/OrcV2SchemaPruningSuite.scala     |   2 +
 .../parquet/ParquetColumnIndexSuite.scala             |  13 +
 .../parquet/ParquetFileFormatSuite.scala              |  37 ++
 .../datasources/parquet/ParquetIOSuite.scala          | 351 +
 .../parquet/ParquetSchemaPruningSuite.scala           |   2 +
 .../datasources/parquet/ParquetTest.scala             |   2 +
 .../parquet/ParquetVectorizedSuite.scala              | 330
 25 files changed, 1813 insertions(+), 173 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1bba8b6..5bf5992 100644
--- a/sql/catalyst/sr
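For orientation, here is a minimal usage sketch (not part of the commit) showing how the new code path would be exercised from user code; the output path `/tmp/nested_parquet_example` and the ad-hoc nested schema are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

object NestedVectorizedReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("nested-parquet-vectorized-read")
      .master("local[*]")
      // Both flags default to true with this change; set explicitly for clarity.
      .config("spark.sql.parquet.enableVectorizedReader", "true")
      .config("spark.sql.parquet.enableNestedColumnVectorizedReader", "true")
      .getOrCreate()
    import spark.implicits._

    // Write a small file with a struct column containing a string and an array.
    val df = Seq((1, ("a", Seq(1, 2, 3))), (2, ("b", Seq(4, 5)))).toDF("id", "nested")
    df.write.mode("overwrite").parquet("/tmp/nested_parquet_example")

    // Reading it back with the flag enabled goes through the vectorized reader
    // instead of falling back to the row-based parquet-mr reader.
    val readBack = spark.read.parquet("/tmp/nested_parquet_example")
    readBack.selectExpr("id", "nested._1", "nested._2[0]").show()

    spark.stop()
  }
}
```

Setting `spark.sql.parquet.enableNestedColumnVectorizedReader=false` would restore the previous fallback behavior for nested columns.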
[spark] branch master updated: [SPARK-34863][SQL] Support complex types for Parquet vectorized reader
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new deac8f9  [SPARK-34863][SQL] Support complex types for Parquet vectorized reader
deac8f9 is described below

commit deac8f950edb1d893fe4bf2cc7c4adbd29d1db22
Author: Chao Sun
AuthorDate: Fri Apr 1 19:10:11 2022 -0700

    [SPARK-34863][SQL] Support complex types for Parquet vectorized reader

    ### What changes were proposed in this pull request?

    This PR adds support for complex types (e.g., list, map, array) to Spark's vectorized Parquet reader. In particular, it introduces the following changes:

    1. Added a new class, `ParquetColumnVector`, which encapsulates all the information needed when reading a Parquet column: the `ParquetColumn` for the column, the repetition & definition levels (allocated only for leaf nodes of a complex type), and the reader for the column. It also contains the logic for assembling nested columnar batches by interpreting Parquet repetition & definition levels.
    2. `VectorizedParquetRecordReader` is changed to initialize a list of `ParquetColumnVector` for the columns being read.
    3. `VectorizedColumnReader` now also creates a reader for the repetition-level column. Depending on whether the maximum repetition level is 0, the batch read is split into two code paths: `readBatch` versus `readBatchNested`.
    4. Added logic to handle complex types in `VectorizedRleValuesReader`. For data types involving only structs or primitive types, it still uses the old `readBatch` method, which now also saves definition levels into a vector for later assembly. Otherwise, for data types involving arrays or maps, a separate code path, `readBatchNested`, is introduced to handle repetition levels.

    This PR also introduces a new flag, `spark.sql.parquet.enableNestedColumnVectorizedReader`, which turns the feature on or off. By default it is on, to exercise all the Parquet-related test coverage.

    ### Why are the changes needed?

    Currently, whenever the read schema contains complex types, Spark falls back to the row-based reader in parquet-mr, which is much slower. As the benchmark shows, adding support to the vectorized reader yields an average speedup of ~15x when reading struct fields, and ~1.5x when reading arrays of structs and maps.

    ### Does this PR introduce _any_ user-facing change?

    With this PR, Spark supports reading complex types in its vectorized Parquet reader. A new config, `spark.sql.parquet.enableNestedColumnVectorizedReader`, is introduced to turn the feature on or off.

    ### How was this patch tested?

    Added new unit tests.

    Closes #34659 from sunchao/SPARK-34863-new.
    Authored-by: Chao Sun
    Signed-off-by: Liang-Chi Hsieh
---
 .../org/apache/spark/sql/internal/SQLConf.scala       |  11 +
 .../datasources/parquet/ParquetColumnVector.java      | 381 +++
 .../datasources/parquet/ParquetReadState.java         |  60 ++-
 .../parquet/SpecificParquetRecordReaderBase.java      |  15 +-
 .../parquet/VectorizedColumnReader.java               |  84 +++--
 .../parquet/VectorizedParquetRecordReader.java        | 160 +---
 .../parquet/VectorizedRleValuesReader.java            | 413 ++---
 .../execution/vectorized/OnHeapColumnVector.java      |   2 +-
 .../execution/vectorized/WritableColumnVector.java    |  48 ++-
 .../datasources/parquet/ParquetFileFormat.scala       |   8 +-
 .../parquet/ParquetSchemaConverter.scala              |  17 +-
 .../datasources/parquet/ParquetUtils.scala            |  27 +-
 .../v2/parquet/ParquetPartitionReaderFactory.scala    |   4 +-
 .../sql-tests/results/explain-aqe.sql.out             |   3 +-
 .../resources/sql-tests/results/explain.sql.out       |   3 +-
 .../datasources/FileBasedDataSourceTest.scala         |   9 +-
 .../sql/execution/datasources/orc/OrcTest.scala       |   2 +
 .../datasources/orc/OrcV1SchemaPruningSuite.scala     |   2 +
 .../datasources/orc/OrcV2SchemaPruningSuite.scala     |   2 +
 .../parquet/ParquetColumnIndexSuite.scala             |  13 +
 .../parquet/ParquetFileFormatSuite.scala              |  37 ++
 .../datasources/parquet/ParquetIOSuite.scala          | 351 +
 .../parquet/ParquetSchemaPruningSuite.scala           |   2 +
 .../datasources/parquet/ParquetTest.scala             |   2 +
 .../parquet/ParquetVectorizedSuite.scala              | 330
 25 files changed, 1813 insertions(+), 173 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9aad649..d268fd0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/inter
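As a conceptual aid (not the actual Spark implementation), the sketch below shows how repetition levels alone are enough to rebuild array boundaries for a single repeated leaf column, which is the essence of the nested assembly path. It assumes a maximum repetition level of 1 and no nulls; the real `readBatchNested` path also consults definition levels to handle nulls and empty lists.

```scala
object RepetitionLevelAssembly {
  /** Group flat leaf values back into per-record arrays using repetition levels. */
  def assemble(values: Seq[Int], repetitionLevels: Seq[Int]): Seq[Seq[Int]] = {
    require(values.length == repetitionLevels.length, "one repetition level per value")
    val records = scala.collection.mutable.ArrayBuffer.empty[Seq[Int]]
    val current = scala.collection.mutable.ArrayBuffer.empty[Int]
    values.zip(repetitionLevels).foreach { case (value, repLevel) =>
      if (repLevel == 0 && current.nonEmpty) { // repetition level 0 starts a new record
        records += current.toSeq
        current.clear()
      }
      current += value
    }
    if (current.nonEmpty) records += current.toSeq
    records.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Three records: [1, 2], [3], [4, 5, 6]
    val values = Seq(1, 2, 3, 4, 5, 6)
    val repLevels = Seq(0, 1, 0, 0, 1, 1)
    println(assemble(values, repLevels)) // List(List(1, 2), List(3), List(4, 5, 6))
  }
}
```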
[spark] branch master updated (2f8613f -> 0b6ea01)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 2f8613f  [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider
     add 0b6ea01  [SPARK-38620][WEBUI] Replace `value.formatted(formatString)` with `formatString.format(value)` to clean up compilation warning

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala | 4 ++--
 .../src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
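For reference, the substitution in isolation (a hedged sketch with an illustrative value, not the actual UI code): `value.formatted(formatString)` relies on the enriched `formatted` method that triggers the compilation warning, while `formatString.format(value)` produces the same string without it.

```scala
object FormattedCleanupExample {
  def main(args: Array[String]): Unit = {
    val rate: Double = 0.875

    // Old style, which emits the compilation warning this change cleans up:
    //   val pct = (rate * 100).formatted("%.1f%%")
    // New style, as used by the change:
    val pct = "%.1f%%".format(rate * 100)

    println(pct) // 87.5%
  }
}
```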
[spark] branch branch-3.2 updated: [SPARK-38754][SQL][TEST][3.1] Using EquivalentExpressions getEquivalentExprs function instead of getExprState at SubexpressionEliminationSuite
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 02a055a  [SPARK-38754][SQL][TEST][3.1] Using EquivalentExpressions getEquivalentExprs function instead of getExprState at SubexpressionEliminationSuite
02a055a is described below

commit 02a055a42de5597cd42c1c0d4470f0e769571dc3
Author: Dereck Li
AuthorDate: Fri Apr 1 22:54:11 2022 +0800

    [SPARK-38754][SQL][TEST][3.1] Using EquivalentExpressions getEquivalentExprs function instead of getExprState at SubexpressionEliminationSuite

    ### What changes were proposed in this pull request?

    This PR uses the EquivalentExpressions getEquivalentExprs function instead of getExprState in SubexpressionEliminationSuite, and removes the cpus parameter.

    ### Why are the changes needed?

    Fixes a build error.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    CI tests.

    Closes #36033 from monkeyboy123/SPARK-38754.

    Authored-by: Dereck Li
    Signed-off-by: Wenchen Fan
    (cherry picked from commit f677272d08de030ff9c4045ceec062168105b75c)
    Signed-off-by: Wenchen Fan
---
 .../spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
index b414019..72b39bb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
@@ -401,7 +401,7 @@ class SubexpressionEliminationSuite extends SparkFunSuite with ExpressionEvalHel
       val equivalence = new EquivalentExpressions
       val expression = DynamicPruningExpression(Exists(LocalRelation()))
       equivalence.addExprTree(expression)
-      assert(equivalence.getExprState(expression).isEmpty)
+      assert(equivalence.getEquivalentExprs(expression).size == 0)
     } finally {
       TaskContext.unset()
     }
[spark] branch branch-3.1 updated: [SPARK-38754][SQL][TEST][3.1] Using EquivalentExpressions getEquivalentExprs function instead of getExprState at SubexpressionEliminationSuite
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new f677272  [SPARK-38754][SQL][TEST][3.1] Using EquivalentExpressions getEquivalentExprs function instead of getExprState at SubexpressionEliminationSuite
f677272 is described below

commit f677272d08de030ff9c4045ceec062168105b75c
Author: Dereck Li
AuthorDate: Fri Apr 1 22:54:11 2022 +0800

    [SPARK-38754][SQL][TEST][3.1] Using EquivalentExpressions getEquivalentExprs function instead of getExprState at SubexpressionEliminationSuite

    ### What changes were proposed in this pull request?

    This PR uses the EquivalentExpressions getEquivalentExprs function instead of getExprState in SubexpressionEliminationSuite, and removes the cpus parameter.

    ### Why are the changes needed?

    Fixes a build error.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    CI tests.

    Closes #36033 from monkeyboy123/SPARK-38754.

    Authored-by: Dereck Li
    Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/expressions/SubexpressionEliminationSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
index 6071b4b..0900765 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
@@ -330,13 +330,13 @@ class SubexpressionEliminationSuite extends SparkFunSuite with ExpressionEvalHel
   test("SPARK-38333: PlanExpression expression should skip addExprTree function in Executor") {
     try {
       // suppose we are in executor
-      val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null, cpus = 0)
+      val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
       TaskContext.setTaskContext(context1)

       val equivalence = new EquivalentExpressions
       val expression = DynamicPruningExpression(Exists(LocalRelation()))
       equivalence.addExprTree(expression)
-      assert(equivalence.getExprState(expression).isEmpty)
+      assert(equivalence.getEquivalentExprs(expression).size == 0)
     } finally {
       TaskContext.unset()
     }
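For context, a rough sketch of the internal Catalyst API these tests exercise (shown only for orientation; `EquivalentExpressions` is internal, its lookup method is `getEquivalentExprs` on branch-3.1/3.2 and `getExprState` on newer branches, which is exactly what this backport adjusts, and the expression used here is an arbitrary stand-in):

```scala
import org.apache.spark.sql.catalyst.expressions.{Add, EquivalentExpressions, Literal}

object EquivalentExpressionsSketch {
  def main(args: Array[String]): Unit = {
    val equivalence = new EquivalentExpressions
    val expr = Add(Literal(1), Literal(2))

    // Registering the same tree twice marks it as a common subexpression.
    equivalence.addExprTree(expr)
    equivalence.addExprTree(expr)

    // branch-3.1/3.2 API; newer branches expose getExprState instead.
    println(equivalence.getEquivalentExprs(expr).size) // recorded occurrences of the subtree
  }
}
```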
[spark] 02/02: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 7e542f164a2f17501b153a2d7b8c53636a5bb9b8
Author: Jungtaek Lim
AuthorDate: Fri Apr 1 18:21:48 2022 +0900

    [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

    ### What changes were proposed in this pull request?

    (Credit to alex-balikov for the inspiration of the root cause observation, and anishshri-db for looking into the issue together.)

    This PR fixes a correctness issue on stream-stream outer joins with the RocksDB state store provider, which can occur under certain conditions, like below:

    * stream-stream time interval outer join
    * a left outer join has the issue on the left side, a right outer join on the right side, and a full outer join on both sides
    * at batch N, non-late row(s) are produced on the problematic side
    * in the same batch (batch N), some row(s) on the problematic side are evicted by the watermark condition

    The root cause is the same as [SPARK-38320](https://issues.apache.org/jira/browse/SPARK-38320): weak read consistency on the iterator, especially with the RocksDB state store provider. (Quoting from SPARK-38320: The problem is due to the StateStore.iterator not reflecting StateStore changes made after its creation.)

    More specifically, if updates performed while processing input rows change the number of values for a grouping key, the update is not seen in SymmetricHashJoinStateManager.removeByValueCondition, and the method performs the eviction with an out-of-sync count of values. Making it worse, if the method performs the eviction and updates the number of values for the grouping key, it "overwrites" the count, effectively dropping all rows inserted in the same batch.

    The code blocks below are references for understanding the details of the issue.

    https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L327-L339
    https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L619-L627
    https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L195-L201
    https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L208-L223

    This PR makes the outer iterators lazily evaluated, ensuring that all updates from processing input rows are reflected "before" the outer iterators are initialized.

    ### Why are the changes needed?

    The bug is described in the section above.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    New UT added.

    Closes #36002 from HeartSaVioR/SPARK-38684.
    Authored-by: Jungtaek Lim
    Signed-off-by: Jungtaek Lim
    (cherry picked from commit 2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7)
    Signed-off-by: Jungtaek Lim
---
 .../streaming/StreamingSymmetricHashJoinExec.scala  | 81 --
 .../spark/sql/streaming/StreamingJoinSuite.scala    | 63 -
 2 files changed, 121 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index 616ae08..30d87af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -318,17 +318,22 @@ case class StreamingSymmetricHashJoinExec(
       }
     }

+    val initIterFn = { () =>
+      val removedRowIter = leftSideJoiner.removeOldState()
+      removedRowIter.filterNot { kv =>
+        stateFormatVersion match {
+          case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value))
+          case 2 => kv.matched
+          case _ => throwBadStateFormatVersionException()
+        }
+      }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+    }
+
     // NOTE: we need to make sure `outerOutputIter` is evaluated "after" exhausting all of
-    // elements in `innerOutputIter`, because evaluation of `innerOutputIter` may update
-    // the match flag which the logic for outer join is relying on.
-    val removedRowIter = leftSideJoiner.removeOldState()
-    val outerOutputIter = removedRowIter.filterNot { kv =>
-      stateFormatVersion match {
-        case 1 => matchesWithRightSideStat
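For context, a hedged sketch (not taken from the patch) of the kind of query affected: a stream-stream time-interval left outer join with watermarks on both sides, run with the RocksDB state store provider. The rate sources, column names, and console sink are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

object StreamStreamOuterJoinExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("stream-stream-outer-join")
      .master("local[*]")
      // Use the RocksDB state store provider, where the stale-iterator issue was observed.
      .config("spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
      .getOrCreate()

    // Two rate sources standing in for real streams.
    val left = spark.readStream.format("rate").option("rowsPerSecond", "10").load()
      .selectExpr("value AS leftId", "timestamp AS leftTime")
      .withWatermark("leftTime", "10 seconds")
    val right = spark.readStream.format("rate").option("rowsPerSecond", "10").load()
      .selectExpr("value AS rightId", "timestamp AS rightTime")
      .withWatermark("rightTime", "10 seconds")

    // The time-interval condition makes state eviction watermark-driven, which is
    // the setup where the problem could surface before this fix.
    val joined = left.join(
      right,
      expr("leftId = rightId AND rightTime BETWEEN leftTime AND leftTime + interval 5 seconds"),
      "leftOuter")

    val query = joined.writeStream.format("console").start()
    query.awaitTermination()
  }
}
```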
[spark] 01/02: [SPARK-38333][SQL][3.2][FOLLOWUP] fix compilation error
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 6a4b2c227553a472522a4d3f02e389141593497d
Author: Jungtaek Lim
AuthorDate: Fri Apr 1 19:07:21 2022 +0900

    [SPARK-38333][SQL][3.2][FOLLOWUP] fix compilation error
---
 .../spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
index 4b353cd..b414019 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala
@@ -395,7 +395,7 @@ class SubexpressionEliminationSuite extends SparkFunSuite with ExpressionEvalHel
   test("SPARK-38333: PlanExpression expression should skip addExprTree function in Executor") {
     try {
       // suppose we are in executor
-      val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null, cpus = 0)
+      val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
       TaskContext.setTaskContext(context1)

       val equivalence = new EquivalentExpressions
[spark] branch branch-3.2 updated (e78cca9 -> 7e542f1)
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from e78cca9  [SPARK-38333][SQL] PlanExpression expression should skip addExprTree function in Executor
     new 6a4b2c2  [SPARK-38333][SQL][3.2][FOLLOWUP] fix compilation error
     new 7e542f1  [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails.

The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../SubexpressionEliminationSuite.scala             |  2 +-
 .../streaming/StreamingSymmetricHashJoinExec.scala  | 81 --
 .../spark/sql/streaming/StreamingJoinSuite.scala    | 63 -
 3 files changed, 122 insertions(+), 24 deletions(-)
[spark] branch branch-3.3 updated: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 8a072ef  [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider
8a072ef is described below

commit 8a072ef6badad69ef5cfdd656d0c068979f6ea76
Author: Jungtaek Lim
AuthorDate: Fri Apr 1 18:21:48 2022 +0900

    [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

    ### What changes were proposed in this pull request?

    (Credit to alex-balikov for the inspiration of the root cause observation, and anishshri-db for looking into the issue together.)

    This PR fixes a correctness issue on stream-stream outer joins with the RocksDB state store provider, which can occur under certain conditions, like below:

    * stream-stream time interval outer join
    * a left outer join has the issue on the left side, a right outer join on the right side, and a full outer join on both sides
    * at batch N, non-late row(s) are produced on the problematic side
    * in the same batch (batch N), some row(s) on the problematic side are evicted by the watermark condition

    The root cause is the same as [SPARK-38320](https://issues.apache.org/jira/browse/SPARK-38320): weak read consistency on the iterator, especially with the RocksDB state store provider. (Quoting from SPARK-38320: The problem is due to the StateStore.iterator not reflecting StateStore changes made after its creation.)

    More specifically, if updates performed while processing input rows change the number of values for a grouping key, the update is not seen in SymmetricHashJoinStateManager.removeByValueCondition, and the method performs the eviction with an out-of-sync count of values. Making it worse, if the method performs the eviction and updates the number of values for the grouping key, it "overwrites" the count, effectively dropping all rows inserted in the same batch.

    The code blocks below are references for understanding the details of the issue.

    https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L327-L339
    https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L619-L627
    https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L195-L201
    https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L208-L223

    This PR makes the outer iterators lazily evaluated, ensuring that all updates from processing input rows are reflected "before" the outer iterators are initialized.

    ### Why are the changes needed?

    The bug is described in the section above.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    New UT added.

    Closes #36002 from HeartSaVioR/SPARK-38684.
    Authored-by: Jungtaek Lim
    Signed-off-by: Jungtaek Lim
    (cherry picked from commit 2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7)
    Signed-off-by: Jungtaek Lim
---
 .../streaming/StreamingSymmetricHashJoinExec.scala  | 81 --
 .../spark/sql/streaming/StreamingJoinSuite.scala    | 63 -
 2 files changed, 121 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index 81888e0..aa888c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -324,17 +324,22 @@ case class StreamingSymmetricHashJoinExec(
       }
     }

+    val initIterFn = { () =>
+      val removedRowIter = leftSideJoiner.removeOldState()
+      removedRowIter.filterNot { kv =>
+        stateFormatVersion match {
+          case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value))
+          case 2 => kv.matched
+          case _ => throwBadStateFormatVersionException()
+        }
+      }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+    }
+
     // NOTE: we need to make sure `outerOu
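For context, a simplified standalone sketch of the core idea behind the fix, mirroring the `initIterFn` pattern in the diff without being the actual Spark code: the outer-output iterator is constructed lazily, so it only reads state after the inner output has been fully consumed and every state update is visible.

```scala
object LazyOuterIteratorSketch {
  /** An iterator that defers building the underlying iterator until first access. */
  final class LazilyInitializedIterator[T](init: () => Iterator[T]) extends Iterator[T] {
    private lazy val underlying: Iterator[T] = init()
    override def hasNext: Boolean = underlying.hasNext
    override def next(): T = underlying.next()
  }

  def main(args: Array[String]): Unit = {
    val state = scala.collection.mutable.ArrayBuffer.empty[Int]

    // "Inner" processing mutates state as it is consumed (iterators are lazy).
    val innerOutput = Iterator(1, 2, 3).map { v => state += v; v }

    // The "outer" iterator must be built only after innerOutput is exhausted,
    // otherwise it would observe a stale snapshot of `state`.
    val outerOutput = new LazilyInitializedIterator(() => state.iterator.map(_ * 10))

    // Concatenation drains innerOutput first; outerOutput is initialized afterwards,
    // so it sees all three state updates.
    println((innerOutput ++ outerOutput).toList) // List(1, 2, 3, 10, 20, 30)
  }
}
```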
[spark] branch master updated: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 2f8613f  [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider
2f8613f is described below

commit 2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7
Author: Jungtaek Lim
AuthorDate: Fri Apr 1 18:21:48 2022 +0900

    [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

    ### What changes were proposed in this pull request?

    (Credit to alex-balikov for the inspiration of the root cause observation, and anishshri-db for looking into the issue together.)

    This PR fixes a correctness issue on stream-stream outer joins with the RocksDB state store provider, which can occur under certain conditions, like below:

    * stream-stream time interval outer join
    * a left outer join has the issue on the left side, a right outer join on the right side, and a full outer join on both sides
    * at batch N, non-late row(s) are produced on the problematic side
    * in the same batch (batch N), some row(s) on the problematic side are evicted by the watermark condition

    The root cause is the same as [SPARK-38320](https://issues.apache.org/jira/browse/SPARK-38320): weak read consistency on the iterator, especially with the RocksDB state store provider. (Quoting from SPARK-38320: The problem is due to the StateStore.iterator not reflecting StateStore changes made after its creation.)

    More specifically, if updates performed while processing input rows change the number of values for a grouping key, the update is not seen in SymmetricHashJoinStateManager.removeByValueCondition, and the method performs the eviction with an out-of-sync count of values. Making it worse, if the method performs the eviction and updates the number of values for the grouping key, it "overwrites" the count, effectively dropping all rows inserted in the same batch.

    The code blocks below are references for understanding the details of the issue.

    https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L327-L339
    https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L619-L627
    https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L195-L201
    https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L208-L223

    This PR makes the outer iterators lazily evaluated, ensuring that all updates from processing input rows are reflected "before" the outer iterators are initialized.

    ### Why are the changes needed?

    The bug is described in the section above.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    New UT added.

    Closes #36002 from HeartSaVioR/SPARK-38684.
    Authored-by: Jungtaek Lim
    Signed-off-by: Jungtaek Lim
---
 .../streaming/StreamingSymmetricHashJoinExec.scala  | 81 --
 .../spark/sql/streaming/StreamingJoinSuite.scala    | 63 -
 2 files changed, 121 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index 81888e0..aa888c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -324,17 +324,22 @@ case class StreamingSymmetricHashJoinExec(
       }
     }

+    val initIterFn = { () =>
+      val removedRowIter = leftSideJoiner.removeOldState()
+      removedRowIter.filterNot { kv =>
+        stateFormatVersion match {
+          case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value))
+          case 2 => kv.matched
+          case _ => throwBadStateFormatVersionException()
+        }
+      }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+    }
+
     // NOTE: we need to make sure `outerOutputIter` is evaluated "after" exhausting all of
-    // elements in `innerOutputIter`, because evaluation of