[jira] [Resolved] (SPARK-39771) If spark.default.parallelism is unset, RDD defaultPartitioner may pick a value that is too large to successfully run
[ https://issues.apache.org/jira/browse/SPARK-39771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li resolved SPARK-39771. - Fix Version/s: 4.0.0 Resolution: Fixed Issue resolved by pull request 45266 [https://github.com/apache/spark/pull/45266] > If spark.default.parallelism is unset, RDD defaultPartitioner may pick a > value that is too large to successfully run > > > Key: SPARK-39771 > URL: https://issues.apache.org/jira/browse/SPARK-39771 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 1.0.0 >Reporter: Josh Rosen >Priority: Major > Labels: pull-request-available > Fix For: 4.0.0 > > > [According to its > docs|https://github.com/apache/spark/blob/899f6c90eb2de5b46a36710a131d7417010ce4b3/core/src/main/scala/org/apache/spark/Partitioner.scala#L45-L65], > {{Partitioner.defaultPartitioner}} will use the maximum number of RDD > partitions as its partition count when {{spark.default.parallelism}} is not > set. If that number of upstream partitions is very large then this can result > in shuffles where {{numMappers * numReducers = numMappers^2}}, which can > cause various problems that prevent the job from successfully running. > To help users identify when they have run into this problem, I think we > should add warning logs to Spark. > As an example of the problem, let's say that I have an RDD with 100,000 > partitions and then do a {{reduceByKey}} on it without specifying an explicit > partitioner or partition count. In this case, Spark will plan a reduce stage > with 100,000 partitions: > {code:java} > scala> sc.parallelize(1 to 100000, 100000).map(x => (x, x)).reduceByKey(_ + > _).toDebugString > res7: String = > (100000) ShuffledRDD[21] at reduceByKey at <console>:25 [] >+-(100000) MapPartitionsRDD[20] at map at <console>:25 [] > | ParallelCollectionRDD[19] at parallelize at <console>:25 [] > {code} > This results in the creation of 10 billion shuffle blocks, so if this job > _does_ run it is likely to be extremely slow. 
However, it's more likely that > the driver will crash when serializing map output statuses: if we were able > to use one bit per mapper / reducer pair (which is probably overly optimistic > in terms of compressibility) then the map statuses would be ~1.25 gigabytes > (and the actual size is probably much larger)! > I don't think that users are likely to intentionally wind up in this > scenario: it's more likely that either (a) their job depends on > {{spark.default.parallelism}} being set but it was run on an environment > lacking a value for that config, or (b) their input data significantly grew > in size. These scenarios may be rare, but they can be frustrating to debug > (especially if a failure occurs midway through a long-running job). > I think we should do something to handle this scenario. > A good starting point might be for {{Partitioner.defaultPartitioner}} to log > a warning when the default partition size exceeds some threshold. > In addition, I think it might be a good idea to log a similar warning in > {{MapOutputTrackerMaster}} right before we start trying to serialize map > statuses: in a real-world situation where this problem cropped up, the map > stage ran successfully but the driver crashed when serializing map statuses. > Putting a warning about partition counts here makes it more likely that users > will spot that error in the logs and be able to identify the source of the > problem (compared to a warning that appears much earlier in the job and > therefore much farther from the likely site of a crash). -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
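The ~1.25 gigabyte figure above can be sanity-checked with a few lines outside Spark. This is a hypothetical back-of-the-envelope sketch (the function names and the warning threshold are illustrative, not anything Spark defines):

```python
# Estimate map output status size for a shuffle with
# numMappers * numReducers blocks, at an (optimistic) one bit per
# mapper/reducer pair.

def map_status_size_bytes(num_mappers: int, num_reducers: int) -> float:
    """One bit per (mapper, reducer) block, converted to bytes."""
    return num_mappers * num_reducers / 8

def should_warn(default_partitions: int, threshold: int = 10_000) -> bool:
    """Sketch of the proposed warning in Partitioner.defaultPartitioner:
    log when the defaulted partition count exceeds some threshold.
    (The threshold here is made up for illustration.)"""
    return default_partitions > threshold

# 100,000 x 100,000 = 10 billion shuffle blocks -> ~1.25e9 bytes of map
# statuses even under this optimistic one-bit encoding.
size = map_status_size_bytes(100_000, 100_000)
```

The actual serialized map statuses would be larger than this lower bound, which is the point of the report: the driver can crash before the reduce stage even starts.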
[jira] [Updated] (SPARK-45082) Review and fix issues in API docs
[ https://issues.apache.org/jira/browse/SPARK-45082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-45082: Fix Version/s: (was: 3.1.1) > Review and fix issues in API docs > - > > Key: SPARK-45082 > URL: https://issues.apache.org/jira/browse/SPARK-45082 > Project: Spark > Issue Type: Improvement > Components: Documentation >Affects Versions: 3.5.0 >Reporter: Yuanjian Li >Assignee: Yuanjian Li >Priority: Major > > Compare the 3.4 API doc with the 3.5 RC3 cut. Fix the following issues: > * Remove the leaking class/object in API doc -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45082) Review and fix issues in API docs
[ https://issues.apache.org/jira/browse/SPARK-45082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-45082: Description: Compare the 3.4 API doc with the 3.5 RC3 cut. Fix the following issues: * Remove the leaking class/object in API doc was: Compare the 3.1.1 API doc with the latest release version 3.0.1. Fix the following issues: * Add missing `Since` annotation for new APIs * Remove the leaking class/object in API doc > Review and fix issues in API docs > - > > Key: SPARK-45082 > URL: https://issues.apache.org/jira/browse/SPARK-45082 > Project: Spark > Issue Type: Improvement > Components: Documentation >Affects Versions: 3.5.0 >Reporter: Yuanjian Li >Assignee: Yuanjian Li >Priority: Major > Fix For: 3.1.1 > > > Compare the 3.4 API doc with the 3.5 RC3 cut. Fix the following issues: > * Remove the leaking class/object in API doc -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45082) Review and fix issues in API docs
Yuanjian Li created SPARK-45082: --- Summary: Review and fix issues in API docs Key: SPARK-45082 URL: https://issues.apache.org/jira/browse/SPARK-45082 Project: Spark Issue Type: Improvement Components: Documentation Affects Versions: 3.1.1 Reporter: Yuanjian Li Assignee: Yuanjian Li Fix For: 3.1.1 Compare the 3.1.1 API doc with the latest release version 3.0.1. Fix the following issues: * Add missing `Since` annotation for new APIs * Remove the leaking class/object in API doc -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45082) Review and fix issues in API docs
[ https://issues.apache.org/jira/browse/SPARK-45082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-45082: Affects Version/s: 3.5.0 (was: 3.1.1) > Review and fix issues in API docs > - > > Key: SPARK-45082 > URL: https://issues.apache.org/jira/browse/SPARK-45082 > Project: Spark > Issue Type: Improvement > Components: Documentation >Affects Versions: 3.5.0 >Reporter: Yuanjian Li >Assignee: Yuanjian Li >Priority: Major > Fix For: 3.1.1 > > > Compare the 3.1.1 API doc with the latest release version 3.0.1. Fix the > following issues: > * Add missing `Since` annotation for new APIs > * Remove the leaking class/object in API doc -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44471) Add Github action test job for branch-3.5
Yuanjian Li created SPARK-44471: --- Summary: Add Github action test job for branch-3.5 Key: SPARK-44471 URL: https://issues.apache.org/jira/browse/SPARK-44471 Project: Spark Issue Type: Task Components: Project Infra Affects Versions: 3.5.0 Reporter: Yuanjian Li Assignee: Yuanjian Li Fix For: 3.5.0 -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44470) Setting version to 4.0.0-SNAPSHOT
[ https://issues.apache.org/jira/browse/SPARK-44470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-44470: Description: Start to prepare Apache Spark 4.0.0 and the published snapshot version should not conflict with branch-3.5. (was: Start to prepare Apache Spark 3.5.0 and the published snapshot version should not conflict with branch-3.4.) > Setting version to 4.0.0-SNAPSHOT > - > > Key: SPARK-44470 > URL: https://issues.apache.org/jira/browse/SPARK-44470 > Project: Spark > Issue Type: Task > Components: Build >Affects Versions: 3.5.0 >Reporter: Yuanjian Li >Assignee: Yuanjian Li >Priority: Major > Fix For: 3.5.0 > > > Start to prepare Apache Spark 4.0.0 and the published snapshot version should > not conflict with branch-3.5. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-44470) Setting version to 4.0.0-SNAPSHOT
[ https://issues.apache.org/jira/browse/SPARK-44470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li reassigned SPARK-44470: --- Assignee: Yuanjian Li (was: Xinrong Meng) > Setting version to 4.0.0-SNAPSHOT > - > > Key: SPARK-44470 > URL: https://issues.apache.org/jira/browse/SPARK-44470 > Project: Spark > Issue Type: Task > Components: Build >Affects Versions: 3.5.0 >Reporter: Yuanjian Li >Assignee: Yuanjian Li >Priority: Major > Fix For: 3.5.0 > > > Start to prepare Apache Spark 3.5.0 and the published snapshot version should > not conflict with branch-3.4. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44470) Setting version to 4.0.0-SNAPSHOT
[ https://issues.apache.org/jira/browse/SPARK-44470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-44470: Parent: (was: SPARK-42523) Issue Type: Task (was: Sub-task) > Setting version to 4.0.0-SNAPSHOT > - > > Key: SPARK-44470 > URL: https://issues.apache.org/jira/browse/SPARK-44470 > Project: Spark > Issue Type: Task > Components: Build >Affects Versions: 3.5.0 >Reporter: Yuanjian Li >Assignee: Xinrong Meng >Priority: Major > Fix For: 3.5.0 > > > Start to prepare Apache Spark 3.5.0 and the published snapshot version should > not conflict with branch-3.4. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44470) Setting version to 4.0.0-SNAPSHOT
Yuanjian Li created SPARK-44470: --- Summary: Setting version to 4.0.0-SNAPSHOT Key: SPARK-44470 URL: https://issues.apache.org/jira/browse/SPARK-44470 Project: Spark Issue Type: Sub-task Components: Build Affects Versions: 3.5.0 Reporter: Yuanjian Li Assignee: Xinrong Meng Fix For: 3.5.0 Start to prepare Apache Spark 3.5.0 and the published snapshot version should not conflict with branch-3.4. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-39371) Review and fix issues in Scala/Java API docs of Core module
Yuanjian Li created SPARK-39371: --- Summary: Review and fix issues in Scala/Java API docs of Core module Key: SPARK-39371 URL: https://issues.apache.org/jira/browse/SPARK-39371 Project: Spark Issue Type: Task Components: Spark Core Affects Versions: 3.3.0 Reporter: Yuanjian Li -- This message was sent by Atlassian Jira (v8.20.7#820007) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-38204) All state operators are at a risk of inconsistency between state partitioning and operator partitioning
[ https://issues.apache.org/jira/browse/SPARK-38204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li resolved SPARK-38204. - Fix Version/s: 3.3.0 Resolution: Fixed Issue resolved by pull request 35673 [https://github.com/apache/spark/pull/35673] > All state operators are at a risk of inconsistency between state partitioning > and operator partitioning > --- > > Key: SPARK-38204 > URL: https://issues.apache.org/jira/browse/SPARK-38204 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.3, 2.3.4, 2.4.8, 3.0.3, 3.1.2, 3.2.1, 3.3.0 >Reporter: Jungtaek Lim >Assignee: Jungtaek Lim >Priority: Blocker > Labels: correctness > Fix For: 3.3.0 > > > Except stream-stream join, all stateful operators use ClusteredDistribution > as a requirement of child distribution. > ClusteredDistribution is a very relaxed one - any output partitioning can > satisfy the distribution if the partitioning can ensure all tuples having > same grouping keys are placed in same partition. > To illustrate with an example, suppose we do streaming aggregation like the code below: > {code:java} > df > .withWatermark("timestamp", "30 minutes") > .groupBy("group1", "group2", window("timestamp", "10 minutes")) > .agg(count("*")) {code} > In the code, streaming aggregation operator will be involved in physical > plan, which would have ClusteredDistribution("group1", "group2", "window"). 
> The problem is, various output partitionings can satisfy this distribution: > * RangePartitioning > ** This accepts exact and subset of the grouping key, with any order of keys > (combination), with any sort order (asc/desc) > * HashPartitioning > ** This accepts exact and subset of the grouping key, with any order of keys > (combination) > * (upcoming Spark 3.3.0+) DataSourcePartitioning > ** output partitioning provided by data source will be able to satisfy > ClusteredDistribution, which will make things worse (assuming data source can > provide different output partitioning relatively easily) > e.g. even if we only consider HashPartitioning, HashPartitioning("group1"), > HashPartitioning("group2"), HashPartitioning("group1", "group2"), > HashPartitioning("group2", "group1"), HashPartitioning("group1", "group2", > "window"), etc. can all satisfy it. > The requirement of state partitioning is much more strict, since we should > not change the partitioning once it is partitioned and built. *It should > ensure that all tuples having same grouping keys are placed in same partition > (same partition ID) across query lifetime.* > *The mismatch in distribution requirements between ClusteredDistribution and > state partitioning silently leads to correctness issues.* > For example, let's assume we have a streaming query like below: > {code:java} > df > .withWatermark("timestamp", "30 minutes") > .repartition("group2") > .groupBy("group1", "group2", window("timestamp", "10 minutes")) > .agg(count("*")) {code} > repartition("group2") satisfies ClusteredDistribution("group1", "group2", > "window"), so Spark won't introduce additional shuffle there, and state > partitioning would be HashPartitioning("group2"). 
> We run this query for a while, then stop the query and change the manual > partitioning like below: > {code:java} > df > .withWatermark("timestamp", "30 minutes") > .repartition("group1") > .groupBy("group1", "group2", window("timestamp", "10 minutes")) > .agg(count("*")) {code} > repartition("group1") also satisfies ClusteredDistribution("group1", > "group2", "window"), so Spark won't introduce additional shuffle there. That > said, child output partitioning of the streaming aggregation operator would be > HashPartitioning("group1"), whereas state partitioning is > HashPartitioning("group2"). > [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovery-semantics-after-changes-in-a-streaming-query] > In the SS guide doc we enumerate the unsupported modifications of the query > during the lifetime of a streaming query, but there is no mention of this. > Making this worse, Spark doesn't store any information on state partitioning > (that said, there is no way to validate), so *Spark simply allows this change > and silently introduces a correctness issue while the streaming query runs as if > nothing is wrong.* The only way to detect the issue is from the result > of the query. > We have no idea whether end users already suffer from this in their queries > or not. *The only way to look into it is to list all state rows, apply the > hash function with the expected grouping keys, and confirm all rows are in the > exact partition ID where they should be.* If it turns out to be broken, we will > have to have a tool to "re"-partition the state correctly, or in the worst case, > have to ask users to throw out the checkpoint and reprocess. >
[jira] [Assigned] (SPARK-38204) All state operators are at a risk of inconsistency between state partitioning and operator partitioning
[ https://issues.apache.org/jira/browse/SPARK-38204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li reassigned SPARK-38204: --- Assignee: Jungtaek Lim > All state operators are at a risk of inconsistency between state partitioning > and operator partitioning > --- > > Key: SPARK-38204 > URL: https://issues.apache.org/jira/browse/SPARK-38204 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.3, 2.3.4, 2.4.8, 3.0.3, 3.1.2, 3.2.1, 3.3.0 >Reporter: Jungtaek Lim >Assignee: Jungtaek Lim >Priority: Blocker > Labels: correctness > > Except stream-stream join, all stateful operators use ClusteredDistribution > as a requirement of child distribution. > ClusteredDistribution is a very relaxed one - any output partitioning can > satisfy the distribution if the partitioning can ensure all tuples having > same grouping keys are placed in same partition. > To illustrate with an example, suppose we do streaming aggregation like the code below: > {code:java} > df > .withWatermark("timestamp", "30 minutes") > .groupBy("group1", "group2", window("timestamp", "10 minutes")) > .agg(count("*")) {code} > In the code, streaming aggregation operator will be involved in physical > plan, which would have ClusteredDistribution("group1", "group2", "window"). > The problem is, various output partitionings can satisfy this distribution: > * RangePartitioning > ** This accepts exact and subset of the grouping key, with any order of keys > (combination), with any sort order (asc/desc) > * HashPartitioning > ** This accepts exact and subset of the grouping key, with any order of keys > (combination) > * (upcoming Spark 3.3.0+) DataSourcePartitioning > ** output partitioning provided by data source will be able to satisfy > ClusteredDistribution, which will make things worse (assuming data source can > provide different output partitioning relatively easily) > e.g. 
even if we only consider HashPartitioning, HashPartitioning("group1"), > HashPartitioning("group2"), HashPartitioning("group1", "group2"), > HashPartitioning("group2", "group1"), HashPartitioning("group1", "group2", > "window"), etc. can all satisfy it. > The requirement of state partitioning is much more strict, since we should > not change the partitioning once it is partitioned and built. *It should > ensure that all tuples having same grouping keys are placed in same partition > (same partition ID) across query lifetime.* > *The mismatch in distribution requirements between ClusteredDistribution and > state partitioning silently leads to correctness issues.* > For example, let's assume we have a streaming query like below: > {code:java} > df > .withWatermark("timestamp", "30 minutes") > .repartition("group2") > .groupBy("group1", "group2", window("timestamp", "10 minutes")) > .agg(count("*")) {code} > repartition("group2") satisfies ClusteredDistribution("group1", "group2", > "window"), so Spark won't introduce additional shuffle there, and state > partitioning would be HashPartitioning("group2"). > We run this query for a while, then stop the query and change the manual > partitioning like below: > {code:java} > df > .withWatermark("timestamp", "30 minutes") > .repartition("group1") > .groupBy("group1", "group2", window("timestamp", "10 minutes")) > .agg(count("*")) {code} > repartition("group1") also satisfies ClusteredDistribution("group1", > "group2", "window"), so Spark won't introduce additional shuffle there. That > said, child output partitioning of the streaming aggregation operator would be > HashPartitioning("group1"), whereas state partitioning is > HashPartitioning("group2"). > [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovery-semantics-after-changes-in-a-streaming-query] > In the SS guide doc we enumerate the unsupported modifications of the query > during the lifetime of a streaming query, but there is no mention of this. 
> Making this worse, Spark doesn't store any information on state partitioning > (that said, there is no way to validate), so *Spark simply allows this change > and silently introduces a correctness issue while the streaming query runs as if > nothing is wrong.* The only way to detect the issue is from the result > of the query. > We have no idea whether end users already suffer from this in their queries > or not. *The only way to look into it is to list all state rows, apply the > hash function with the expected grouping keys, and confirm all rows are in the > exact partition ID where they should be.* If it turns out to be broken, we will > have to have a tool to "re"-partition the state correctly, or in the worst case, > have to ask users to throw out the checkpoint and reprocess. >
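The failure mode can be demonstrated with a toy hash partitioner outside Spark. This is an illustrative sketch: `crc32` stands in for Spark's hash function, and the column and partition-count choices are made up. Hashing a different subset of the grouping keys routes the same row to a different partition ID, which is exactly how state rows end up in the "wrong" partition after the `repartition` change:

```python
import zlib

def partition_id(row: dict, cols: tuple, num_partitions: int) -> int:
    """Toy stand-in for HashPartitioning(cols): hash only the listed
    columns and map the hash to a partition ID."""
    key = "|".join(str(row[c]) for c in cols)
    return zlib.crc32(key.encode()) % num_partitions

rows = [{"group1": f"a{i}", "group2": f"x{i}"} for i in range(100)]

# State built while the child partitioning was HashPartitioning("group2") ...
before = [partition_id(r, ("group2",), 200) for r in rows]
# ... then the query is restarted with repartition("group1"):
after = [partition_id(r, ("group1",), 200) for r in rows]

# Both partitionings satisfy ClusteredDistribution("group1", "group2",
# "window"), yet they disagree on where a given row lives.
moved = sum(b != a for b, a in zip(before, after))
```

Any row whose `before` and `after` IDs differ will silently read and write state that belongs to a different key group after the restart.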
[jira] [Assigned] (SPARK-37970) Introduce a new interface on streaming data source to notify the latest seen offset
[ https://issues.apache.org/jira/browse/SPARK-37970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li reassigned SPARK-37970: --- Assignee: Jungtaek Lim > Introduce a new interface on streaming data source to notify the latest seen > offset > --- > > Key: SPARK-37970 > URL: https://issues.apache.org/jira/browse/SPARK-37970 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.3.0 >Reporter: Jungtaek Lim >Assignee: Jungtaek Lim >Priority: Major > > We identified cases where it would be handy and useful for a streaming data source > to know the latest seen offset when the query restarts. One useful case is enabling > the data source to track the offset by itself, for external storage that does not > expose any API to provide the latest available offset. > We will propose a new interface on the streaming data source, which tells > Spark to provide the latest seen offset whenever the query is restarted. > For the first start of the query, the initial offset of the data source > should be retrieved by calling initialOffset. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-37970) Introduce a new interface on streaming data source to notify the latest seen offset
[ https://issues.apache.org/jira/browse/SPARK-37970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li resolved SPARK-37970. - Fix Version/s: 3.3.0 Resolution: Fixed Issue resolved by pull request 35259 [https://github.com/apache/spark/pull/35259] > Introduce a new interface on streaming data source to notify the latest seen > offset > --- > > Key: SPARK-37970 > URL: https://issues.apache.org/jira/browse/SPARK-37970 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.3.0 >Reporter: Jungtaek Lim >Assignee: Jungtaek Lim >Priority: Major > Fix For: 3.3.0 > > > We identified cases where it would be handy and useful for a streaming data source > to know the latest seen offset when the query restarts. One useful case is enabling > the data source to track the offset by itself, for external storage that does not > expose any API to provide the latest available offset. > We will propose a new interface on the streaming data source, which tells > Spark to provide the latest seen offset whenever the query is restarted. > For the first start of the query, the initial offset of the data source > should be retrieved by calling initialOffset. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
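The shape of such a callback can be sketched in Python. The class and method names below are illustrative stand-ins, not the actual Spark API introduced by this ticket:

```python
class SelfTrackingSource:
    """Sketch of a streaming source that tracks its own offset, for
    external storage that exposes no 'latest available offset' API."""

    def __init__(self):
        self._latest = None  # nothing seen yet

    def initial_offset(self) -> int:
        # First start of the query: no prior progress, begin at 0.
        return 0

    def set_latest_seen_offset(self, offset: int) -> None:
        # Called by the engine on restart with the latest offset it has
        # seen, so the source can resume its own tracking from there.
        self._latest = offset

    def latest_offset(self) -> int:
        # Advance our self-tracked offset by one "unit" of new data.
        base = self._latest if self._latest is not None else self.initial_offset()
        self._latest = base + 1
        return self._latest

# Fresh query: no prior state. Restarted query: the engine notifies the
# source of the last offset it saw before the restart.
fresh = SelfTrackingSource()
restarted = SelfTrackingSource()
restarted.set_latest_seen_offset(41)
```

The point of the design is that the restarted source never has to re-derive its position from the external system; the engine hands back what it already recorded in the offset log.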
[jira] [Resolved] (SPARK-36649) Support Trigger.AvailableNow on Kafka data source
[ https://issues.apache.org/jira/browse/SPARK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li resolved SPARK-36649. - Fix Version/s: 3.3.0 Resolution: Fixed Issue resolved by pull request 35238 [https://github.com/apache/spark/pull/35238] > Support Trigger.AvailableNow on Kafka data source > - > > Key: SPARK-36649 > URL: https://issues.apache.org/jira/browse/SPARK-36649 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.3.0 >Reporter: Jungtaek Lim >Priority: Major > Fix For: 3.3.0 > > > SPARK-36533 introduces a new trigger, Trigger.AvailableNow, but only > introduces the new functionality to the file stream source. Given that the Kafka > data source is one of the major data sources used in streaming queries, > we should make the Kafka data source support this as well. -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
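Trigger.AvailableNow's contract is: process everything available at query start, possibly across several bounded micro-batches, then stop. That contract can be sketched as follows (illustrative only; the names are made up and this is not the Kafka connector's actual code):

```python
def plan_available_now(latest_offset: int, start_offset: int, max_per_batch: int):
    """Snapshot the end offset once at query start, then plan bounded
    micro-batches up to it; the query stops when the end is reached."""
    batches = []
    cur = start_offset
    while cur < latest_offset:
        nxt = min(cur + max_per_batch, latest_offset)
        batches.append((cur, nxt))
        cur = nxt
    return batches

# Data arriving after start is NOT picked up: the end offset was
# snapshotted when the query started. Three bounded batches cover [0, 10).
plan = plan_available_now(latest_offset=10, start_offset=0, max_per_batch=4)
```

Unlike the one-shot `Trigger.Once`, splitting the backlog into bounded batches keeps per-batch rate limits (such as Kafka's max offsets per trigger) effective while still draining everything that was available at start.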
[jira] [Commented] (SPARK-36041) Introduce the RocksDBStateStoreProvider in the programming guide
[ https://issues.apache.org/jira/browse/SPARK-36041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17395839#comment-17395839 ] Yuanjian Li commented on SPARK-36041: - [~Gengliang.Wang] Thanks for reminding, PR submitted. > Introduce the RocksDBStateStoreProvider in the programming guide > > > Key: SPARK-36041 > URL: https://issues.apache.org/jira/browse/SPARK-36041 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 3.2.0 >Reporter: Yuanjian Li >Priority: Blocker > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34198) Add RocksDB StateStore implementation
[ https://issues.apache.org/jira/browse/SPARK-34198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17395822#comment-17395822 ] Yuanjian Li commented on SPARK-34198: - [~Gengliang.Wang] Thanks for reminding. I'll submit the document PR now. > Add RocksDB StateStore implementation > - > > Key: SPARK-34198 > URL: https://issues.apache.org/jira/browse/SPARK-34198 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.2.0 >Reporter: L. C. Hsieh >Priority: Major > > Currently Spark SS only has one built-in StateStore implementation, > HDFSBackedStateStore, which uses an in-memory map to store state rows. As > there are more and more streaming applications, some of them require > large state in stateful operations such as streaming aggregation and join. > Several other major streaming frameworks already use RocksDB for state > management, so it is proven to be a good choice for large state usage. But > Spark SS still lacks a built-in state store for this requirement. > We would like to explore the possibility of adding a RocksDB-based StateStore to > Spark SS. > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-36347) Upgrade the RocksDB version to 6.20.3
Yuanjian Li created SPARK-36347: --- Summary: Upgrade the RocksDB version to 6.20.3 Key: SPARK-36347 URL: https://issues.apache.org/jira/browse/SPARK-36347 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 3.2.0 Reporter: Yuanjian Li As discussed in [https://github.com/apache/spark/pull/32928/files#r654049392], after confirming the compatibility, we can use a newer RocksDB version for the state store implementation. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-10816) EventTime based sessionization (session window)
[ https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17384586#comment-17384586 ] Yuanjian Li commented on SPARK-10816: - Thrilled to see this issue finally resolved! Thank you all! [~kabhwan] [~viirya] > EventTime based sessionization (session window) > --- > > Key: SPARK-10816 > URL: https://issues.apache.org/jira/browse/SPARK-10816 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Reporter: Reynold Xin >Assignee: Jungtaek Lim >Priority: Major > Fix For: 3.2.0 > > Attachments: SPARK-10816 Support session window natively.pdf, Session > Window Support For Structure Streaming.pdf > > > Currently structured streaming supports two kinds of windows: tumbling window > and sliding window. Another useful window function is the session window, which > is not supported by SS. > Unlike time windows (tumbling window and sliding window), a session window > doesn't have a static window begin and end time. Session window creation > depends on a defined session gap, which can be static or dynamic. > For a static session gap, events that fall within a certain period of > time (the gap) of each other are considered one session window. A session window closes when > it does not receive events for the gap. For a dynamic gap, the gap can > change from event to event. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
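The static-gap semantics described above can be sketched in a few lines (an illustrative sketch, not Spark's implementation): an event within `gap` of the previous event extends the current session, and a session's window ends `gap` after its last event.

```python
def session_windows(event_times, gap):
    """Merge sorted event times into sessions with a static gap; return
    (window_start, window_end) pairs, where end = last event + gap."""
    sessions = []
    for t in sorted(event_times):
        if sessions and t - sessions[-1][1] <= gap:
            sessions[-1][1] = t          # within gap: extend the session
        else:
            sessions.append([t, t])      # gap exceeded: open a new session
    return [(first, last + gap) for first, last in sessions]

# Events at 0 and 5 fall within a 10-unit gap of each other; 20 does not,
# so it opens a second session.
windows = session_windows([0, 5, 20], gap=10)
```

This is why session windows, unlike tumbling and sliding windows, cannot be computed from the timestamp alone: the window boundaries depend on which other events arrived.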
[jira] [Created] (SPARK-36041) Introduce the RocksDBStateStoreProvider in the programming guide
Yuanjian Li created SPARK-36041: --- Summary: Introduce the RocksDBStateStoreProvider in the programming guide Key: SPARK-36041 URL: https://issues.apache.org/jira/browse/SPARK-36041 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 3.2.0 Reporter: Yuanjian Li -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-35988) The implementation for RocksDBStateStoreProvider
Yuanjian Li created SPARK-35988: --- Summary: The implementation for RocksDBStateStoreProvider Key: SPARK-35988 URL: https://issues.apache.org/jira/browse/SPARK-35988 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 3.2.0 Reporter: Yuanjian Li Add the implementation for the RocksDBStateStoreProvider. It's a subclass of StateStoreProvider that leverages the functionality implemented in the RocksDB instance. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34198) Add RocksDB StateStore implementation
[ https://issues.apache.org/jira/browse/SPARK-34198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364361#comment-17364361 ] Yuanjian Li commented on SPARK-34198: - Based on the discussion in the community, I just changed the title and description for this Jira. All the major sub-tasks and PRs have been submitted. > Add RocksDB StateStore implementation > - > > Key: SPARK-34198 > URL: https://issues.apache.org/jira/browse/SPARK-34198 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.2.0 >Reporter: L. C. Hsieh >Priority: Major > > Currently Spark SS only has one built-in StateStore implementation > HDFSBackedStateStore. Actually it uses in-memory map to store state rows. As > there are more and more streaming applications, some of them requires to use > large state in stateful operations such as streaming aggregation and join. > Several other major streaming frameworks already use RocksDB for state > management. So it is proven to be good choice for large state usage. But > Spark SS still lacks of a built-in state store for the requirement. > We would like to explore the possibility to add RocksDB-based StateStore into > Spark SS. > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34198) Add RocksDB StateStore implementation
[ https://issues.apache.org/jira/browse/SPARK-34198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-34198: Summary: Add RocksDB StateStore implementation (was: Add RocksDB StateStore as external module) > Add RocksDB StateStore implementation > - > > Key: SPARK-34198 > URL: https://issues.apache.org/jira/browse/SPARK-34198 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.2.0 >Reporter: L. C. Hsieh >Priority: Major > > Currently Spark SS only has one built-in StateStore implementation > HDFSBackedStateStore. Actually it uses in-memory map to store state rows. As > there are more and more streaming applications, some of them requires to use > large state in stateful operations such as streaming aggregation and join. > Several other major streaming frameworks already use RocksDB for state > management. So it is proven to be good choice for large state usage. But > Spark SS still lacks of a built-in state store for the requirement. > We would like to explore the possibility to add RocksDB-based StateStore into > Spark SS. > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34198) Add RocksDB StateStore as external module
[ https://issues.apache.org/jira/browse/SPARK-34198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-34198: Description: Currently Spark SS only has one built-in StateStore implementation HDFSBackedStateStore. Actually it uses in-memory map to store state rows. As there are more and more streaming applications, some of them requires to use large state in stateful operations such as streaming aggregation and join. Several other major streaming frameworks already use RocksDB for state management. So it is proven to be good choice for large state usage. But Spark SS still lacks of a built-in state store for the requirement. We would like to explore the possibility to add RocksDB-based StateStore into Spark SS. was: Currently Spark SS only has one built-in StateStore implementation HDFSBackedStateStore. Actually it uses in-memory map to store state rows. As there are more and more streaming applications, some of them requires to use large state in stateful operations such as streaming aggregation and join. Several other major streaming frameworks already use RocksDB for state management. So it is proven to be good choice for large state usage. But Spark SS still lacks of a built-in state store for the requirement. We would like to explore the possibility to add RocksDB-based StateStore into Spark SS. For the concern about adding RocksDB as a direct dependency, our plan is to add this StateStore as an external module first. > Add RocksDB StateStore as external module > - > > Key: SPARK-34198 > URL: https://issues.apache.org/jira/browse/SPARK-34198 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.2.0 >Reporter: L. C. Hsieh >Priority: Major > > Currently Spark SS only has one built-in StateStore implementation > HDFSBackedStateStore. Actually it uses in-memory map to store state rows. 
As > there are more and more streaming applications, some of them requires to use > large state in stateful operations such as streaming aggregation and join. > Several other major streaming frameworks already use RocksDB for state > management. So it is proven to be good choice for large state usage. But > Spark SS still lacks of a built-in state store for the requirement. > We would like to explore the possibility to add RocksDB-based StateStore into > Spark SS. >
[jira] [Created] (SPARK-35788) Metrics support for RocksDB instance
Yuanjian Li created SPARK-35788: --- Summary: Metrics support for RocksDB instance Key: SPARK-35788 URL: https://issues.apache.org/jira/browse/SPARK-35788 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 3.2.0 Reporter: Yuanjian Li Add more metrics for the RocksDB instance, transformed from the native stats reported by RocksDB.
[jira] [Created] (SPARK-35785) Cleanup support for RocksDB instance
Yuanjian Li created SPARK-35785: --- Summary: Cleanup support for RocksDB instance Key: SPARK-35785 URL: https://issues.apache.org/jira/browse/SPARK-35785 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 3.2.0 Reporter: Yuanjian Li Add the functionality of cleaning up files of old versions for the RocksDB instance and RocksDBFileManager.
[jira] [Created] (SPARK-35784) Implementation for RocksDB instance
Yuanjian Li created SPARK-35784: --- Summary: Implementation for RocksDB instance Key: SPARK-35784 URL: https://issues.apache.org/jira/browse/SPARK-35784 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 3.2.0 Reporter: Yuanjian Li The implementation of the RocksDB instance used in the RocksDB state store. It acts as a handler for the native RocksDB instance and the RocksDBFileManager.
[jira] [Created] (SPARK-35628) RocksDBFileManager - load checkpoint from DFS
Yuanjian Li created SPARK-35628: --- Summary: RocksDBFileManager - load checkpoint from DFS Key: SPARK-35628 URL: https://issues.apache.org/jira/browse/SPARK-35628 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 3.2.0 Reporter: Yuanjian Li The implementation for the load path of the checkpoint data.
[jira] [Created] (SPARK-35436) RocksDBFileManager - save checkpoint to DFS
Yuanjian Li created SPARK-35436: --- Summary: RocksDBFileManager - save checkpoint to DFS Key: SPARK-35436 URL: https://issues.apache.org/jira/browse/SPARK-35436 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 3.2.0 Reporter: Yuanjian Li The implementation for the save operation of RocksDBFileManager.
[jira] [Commented] (SPARK-34198) Add RocksDB StateStore as external module
[ https://issues.apache.org/jira/browse/SPARK-34198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17329276#comment-17329276 ] Yuanjian Li commented on SPARK-34198: - [~viirya] [~kabhwan] Makes sense, I should bring it to the community to confirm and list the reasons. I thought it was an implementation detail rather than a decision. > Add RocksDB StateStore as external module > - > > Key: SPARK-34198 > URL: https://issues.apache.org/jira/browse/SPARK-34198 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.2.0 >Reporter: L. C. Hsieh >Priority: Major > > Currently Spark SS only has one built-in StateStore implementation > HDFSBackedStateStore. Actually it uses in-memory map to store state rows. As > there are more and more streaming applications, some of them requires to use > large state in stateful operations such as streaming aggregation and join. > Several other major streaming frameworks already use RocksDB for state > management. So it is proven to be good choice for large state usage. But > Spark SS still lacks of a built-in state store for the requirement. > We would like to explore the possibility to add RocksDB-based StateStore into > Spark SS. For the concern about adding RocksDB as a direct dependency, our > plan is to add this StateStore as an external module first.
[jira] [Commented] (SPARK-34198) Add RocksDB StateStore as external module
[ https://issues.apache.org/jira/browse/SPARK-34198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17326427#comment-17326427 ] Yuanjian Li commented on SPARK-34198: - [~viirya] Since the RocksDBStateStore solves the major drawbacks of the current HDFS-based one, I think it's a better choice to add it directly as a built-in RocksDBStateStoreProvider. It's also convenient for end users to choose it directly. If you agree, I will change the description and the title for this ticket. > Add RocksDB StateStore as external module > - > > Key: SPARK-34198 > URL: https://issues.apache.org/jira/browse/SPARK-34198 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.2.0 >Reporter: L. C. Hsieh >Priority: Major > > Currently Spark SS only has one built-in StateStore implementation > HDFSBackedStateStore. Actually it uses in-memory map to store state rows. As > there are more and more streaming applications, some of them requires to use > large state in stateful operations such as streaming aggregation and join. > Several other major streaming frameworks already use RocksDB for state > management. So it is proven to be good choice for large state usage. But > Spark SS still lacks of a built-in state store for the requirement. > We would like to explore the possibility to add RocksDB-based StateStore into > Spark SS. For the concern about adding RocksDB as a direct dependency, our > plan is to add this StateStore as an external module first.
[jira] [Updated] (SPARK-35172) The implementation of RocksDBCheckpointMetadata
[ https://issues.apache.org/jira/browse/SPARK-35172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-35172: Summary: The implementation of RocksDBCheckpointMetadata (was: Implementation for RocksDBCheckpointMetadata) > The implementation of RocksDBCheckpointMetadata > --- > > Key: SPARK-35172 > URL: https://issues.apache.org/jira/browse/SPARK-35172 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 3.2.0 >Reporter: Yuanjian Li >Priority: Major > > The RocksDBCheckpointMetadata persists the metadata for each committed batch > in JSON format. The object contains all RocksDB file names and the number of > total keys. > The metadata binds closely with the directory structure of > RocksDBFileManager, as described in the design doc - [Directory Structure and > Format for Files stored in > DFS|https://docs.google.com/document/d/10wVGaUorgPt4iVe4phunAcjU924fa3-_Kf29-2nxH6Y/edit#heading=h.zgvw85ijoz2] -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-35172) Implementation for RocksDBCheckpointMetadata
[ https://issues.apache.org/jira/browse/SPARK-35172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-35172: Description: The RocksDBCheckpointMetadata persists the metadata for each committed batch in JSON format. The object contains all RocksDB file names and the number of total keys. The metadata binds closely with the directory structure of RocksDBFileManager, as described in the design doc - [Directory Structure and Format for Files stored in DFS|https://docs.google.com/document/d/10wVGaUorgPt4iVe4phunAcjU924fa3-_Kf29-2nxH6Y/edit#heading=h.zgvw85ijoz2] was: The RocksDBCheckpointMetadata persists the metadata for each committed batch in JSON format. The schema for the object contains all RocksDB file names and the number of total keys. The metadata binds closely with the directory structure of RocksDBFileManager, as described in the design doc - [Directory Structure and Format for Files stored in DFS|https://docs.google.com/document/d/10wVGaUorgPt4iVe4phunAcjU924fa3-_Kf29-2nxH6Y/edit#heading=h.zgvw85ijoz2] > Implementation for RocksDBCheckpointMetadata > > > Key: SPARK-35172 > URL: https://issues.apache.org/jira/browse/SPARK-35172 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 3.2.0 >Reporter: Yuanjian Li >Priority: Major > > The RocksDBCheckpointMetadata persists the metadata for each committed batch > in JSON format. The object contains all RocksDB file names and the number of > total keys. > The metadata binds closely with the directory structure of > RocksDBFileManager, as described in the design doc - [Directory Structure and > Format for Files stored in > DFS|https://docs.google.com/document/d/10wVGaUorgPt4iVe4phunAcjU924fa3-_Kf29-2nxH6Y/edit#heading=h.zgvw85ijoz2] -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-35172) Implementation for RocksDBCheckpointMetadata
Yuanjian Li created SPARK-35172: --- Summary: Implementation for RocksDBCheckpointMetadata Key: SPARK-35172 URL: https://issues.apache.org/jira/browse/SPARK-35172 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 3.2.0 Reporter: Yuanjian Li The RocksDBCheckpointMetadata persists the metadata for each committed batch in JSON format. The schema for the object contains all RocksDB file names and the number of total keys. The metadata binds closely with the directory structure of RocksDBFileManager, as described in the design doc - [Directory Structure and Format for Files stored in DFS|https://docs.google.com/document/d/10wVGaUorgPt4iVe4phunAcjU924fa3-_Kf29-2nxH6Y/edit#heading=h.zgvw85ijoz2]
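To make the description concrete: per the ticket, each committed batch persists a JSON object listing the RocksDB file names and the total key count. An illustration only (these field names are assumptions for the sketch, not the committed schema, which is defined in the linked design doc):

```json
{
  "sstFiles": [
    { "localFileName": "000009.sst", "dfsFileName": "000009-uuid.sst", "sizeBytes": 1048576 }
  ],
  "numKeys": 54321
}
```

The point of the structure is that a later load of the checkpoint can map each locally expected SST file back to its copy in DFS and verify the state's key count.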
[jira] [Created] (SPARK-35171) Declare the markdown package as a dependency of the SparkR package
Yuanjian Li created SPARK-35171: --- Summary: Declare the markdown package as a dependency of the SparkR package Key: SPARK-35171 URL: https://issues.apache.org/jira/browse/SPARK-35171 Project: Spark Issue Type: Improvement Components: R Affects Versions: 3.1.0 Reporter: Yuanjian Li If pandoc is not installed locally, the make-distribution packaging fails with the following message: {quote} --- re-building ‘sparkr-vignettes.Rmd’ using rmarkdown Warning in engine$weave(file, quiet = quiet, encoding = enc) : Pandoc (>= 1.12.3) not available. Falling back to R Markdown v1. Error: processing vignette 'sparkr-vignettes.Rmd' failed with diagnostics: The 'markdown' package should be declared as a dependency of the 'SparkR' package (e.g., in the 'Suggests' field of DESCRIPTION), because the latter contains vignette(s) built with the 'markdown' package. Please see https://github.com/yihui/knitr/issues/1864 for more information. --- failed re-building ‘sparkr-vignettes.Rmd’ {quote}
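The fix the error message suggests is to list markdown in the `Suggests` field of SparkR's `DESCRIPTION` file. A sketch of the relevant stanza (the surrounding entries are illustrative; only the `markdown` line is the change this ticket asks for):

```
Suggests:
    knitr,
    rmarkdown,
    markdown,
    testthat
```

`Suggests` (rather than `Imports`) is appropriate here because the package is only needed when building vignettes, not at runtime.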
[jira] [Updated] (SPARK-34526) Skip checking glob path in FileStreamSink.hasMetadata
[ https://issues.apache.org/jira/browse/SPARK-34526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-34526: Description: When checking the path in {{FileStreamSink.hasMetadata}}, we should ignore the error and assume the user wants to read a batch output. This is to keep the original behavior of ignoring the error. (was: Some users may use a very long glob path to read and `isDirectory` may fail when the path is too long. We should ignore the error when the path is a glob path since the file streaming sink doesn’t support glob paths.) > Skip checking glob path in FileStreamSink.hasMetadata > - > > Key: SPARK-34526 > URL: https://issues.apache.org/jira/browse/SPARK-34526 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 3.1.0 >Reporter: Yuanjian Li >Priority: Major > > When checking the path in {{FileStreamSink.hasMetadata}}, we should ignore > the error and assume the user wants to read a batch output. This is to keep > the original behavior of ignoring the error. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34198) Add RocksDB StateStore as external module
[ https://issues.apache.org/jira/browse/SPARK-34198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17325031#comment-17325031 ] Yuanjian Li commented on SPARK-34198: - Thanks all for joining the discussion! Databricks has decided to donate its commercial RocksDBStateStore implementation. I just linked the design doc for this. The code work will start this week. Please help review the design and PRs. Hope we can have this great feature in the 3.2 release! > Add RocksDB StateStore as external module > - > > Key: SPARK-34198 > URL: https://issues.apache.org/jira/browse/SPARK-34198 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 3.2.0 >Reporter: L. C. Hsieh >Priority: Major > > Currently Spark SS only has one built-in StateStore implementation > HDFSBackedStateStore. Actually it uses in-memory map to store state rows. As > there are more and more streaming applications, some of them requires to use > large state in stateful operations such as streaming aggregation and join. > Several other major streaming frameworks already use RocksDB for state > management. So it is proven to be good choice for large state usage. But > Spark SS still lacks of a built-in state store for the requirement. > We would like to explore the possibility to add RocksDB-based StateStore into > Spark SS. For the concern about adding RocksDB as a direct dependency, our > plan is to add this StateStore as an external module first.
[jira] [Created] (SPARK-34871) Move checkpoint resolving logic to the rule ResolveWriteToStream
Yuanjian Li created SPARK-34871: --- Summary: Move checkpoint resolving logic to the rule ResolveWriteToStream Key: SPARK-34871 URL: https://issues.apache.org/jira/browse/SPARK-34871 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 3.2.0 Reporter: Yuanjian Li After SPARK-34748, we have the rule ResolveWriteToStream, which contains the analysis logic for resolving stream write plans. Based on it, we can further move the checkpoint location resolving work into the rule.
[jira] [Updated] (SPARK-34526) Skip checking glob path in FileStreamSink.hasMetadata
[ https://issues.apache.org/jira/browse/SPARK-34526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-34526: Summary: Skip checking glob path in FileStreamSink.hasMetadata (was: Add a flag to skip checking file sink format and handle glob path) > Skip checking glob path in FileStreamSink.hasMetadata > - > > Key: SPARK-34526 > URL: https://issues.apache.org/jira/browse/SPARK-34526 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 3.1.0 >Reporter: Yuanjian Li >Priority: Major > > This ticket fixes the following issues related to file sink format checking > together: > * Some users may use a very long glob path to read and `isDirectory` may > fail when the path is too long. We should ignore the error when the path is a > glob path since the file streaming sink doesn’t support glob paths. > * Checking whether a directory is outputted by File Streaming Sink may fail > for various issues happening in the storage. We should add a flag to allow > users to disable the checking logic and read the directory as a batch output. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34526) Skip checking glob path in FileStreamSink.hasMetadata
[ https://issues.apache.org/jira/browse/SPARK-34526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-34526: Description: Some users may use a very long glob path to read and `isDirectory` may fail when the path is too long. We should ignore the error when the path is a glob path since the file streaming sink doesn’t support glob paths. (was: This ticket fixes the following issues related to file sink format checking together: * Some users may use a very long glob path to read and `isDirectory` may fail when the path is too long. We should ignore the error when the path is a glob path since the file streaming sink doesn’t support glob paths. * Checking whether a directory is outputted by File Streaming Sink may fail for various issues happening in the storage. We should add a flag to allow users to disable the checking logic and read the directory as a batch output.) > Skip checking glob path in FileStreamSink.hasMetadata > - > > Key: SPARK-34526 > URL: https://issues.apache.org/jira/browse/SPARK-34526 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 3.1.0 >Reporter: Yuanjian Li >Priority: Major > > Some users may use a very long glob path to read and `isDirectory` may fail > when the path is too long. We should ignore the error when the path is a glob > path since the file streaming sink doesn’t support glob paths. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
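The description above can be sketched as a small guard: skip the expensive `isDirectory` check whenever the user-supplied path contains glob metacharacters, since the file streaming sink never writes to a glob path. This is a hypothetical illustration, not the actual Spark patch; the object and method names are invented for the sketch:

```scala
// Hypothetical sketch of the guard described in SPARK-34526.
object GlobPathCheck {
  // Metacharacters recognized by Hadoop-style glob patterns.
  private val globChars: Set[Char] = Set('*', '?', '[', ']', '{', '}')

  // If this returns true, FileStreamSink.hasMetadata can skip the
  // isDirectory probe entirely and treat the path as a batch output.
  def looksLikeGlob(path: String): Boolean = path.exists(globChars.contains)
}
```

With such a guard, a very long glob path never reaches the `isDirectory` call that was failing, which matches the behavior the final description asks for.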
[jira] [Created] (SPARK-34748) Create a rule of the analysis logic for streaming write
Yuanjian Li created SPARK-34748: --- Summary: Create a rule of the analysis logic for streaming write Key: SPARK-34748 URL: https://issues.apache.org/jira/browse/SPARK-34748 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 3.2.0 Reporter: Yuanjian Li Currently, the analysis logic for streaming write is mixed into StreamingQueryManager. Creating a specific analyzer rule and separate logical plans should be helpful for further extension.
[jira] [Updated] (SPARK-34526) Add a flag to skip checking file sink format and handle glob path
[ https://issues.apache.org/jira/browse/SPARK-34526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-34526: Description: This ticket fixes the following issues related to file sink format checking together: * Some users may use a very long glob path to read and `isDirectory` may fail when the path is too long. We should ignore the error when the path is a glob path since the file streaming sink doesn’t support glob paths. * Checking whether a directory is outputted by File Streaming Sink may fail for various issues happening in the storage. We should add a flag to allow users to disable the checking logic and read the directory as a batch output. was: This ticket fixes the following issues related to file sink format checking together: * Some users may use a very long glob path to read and `isDirectory`{{}} may fail when the path is too long. We should ignore the error when the path is a glob path since file streaming sink doesn’t support glob paths. * Checking whether a directory is outputted by File Streaming Sink may fail for various issues happening in the storage. We should add a flag to allow users to disable it. > Add a flag to skip checking file sink format and handle glob path > - > > Key: SPARK-34526 > URL: https://issues.apache.org/jira/browse/SPARK-34526 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 3.1.0 >Reporter: Yuanjian Li >Priority: Major > > This ticket fixes the following issues related to file sink format checking > together: > * Some users may use a very long glob path to read and `isDirectory` may > fail when the path is too long. We should ignore the error when the path is a > glob path since the file streaming sink doesn’t support glob paths. > * Checking whether a directory is outputted by File Streaming Sink may fail > for various issues happening in the storage. 
We should add a flag to allow > users to disable the checking logic and read the directory as a batch output.
[jira] [Created] (SPARK-34526) Add a flag to skip checking file sink format and handle glob path
Yuanjian Li created SPARK-34526: --- Summary: Add a flag to skip checking file sink format and handle glob path Key: SPARK-34526 URL: https://issues.apache.org/jira/browse/SPARK-34526 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 3.1.0 Reporter: Yuanjian Li This ticket fixes the following issues related to file sink format checking together: * Some users may use a very long glob path to read and `isDirectory` may fail when the path is too long. We should ignore the error when the path is a glob path since the file streaming sink doesn’t support glob paths. * Checking whether a directory is outputted by File Streaming Sink may fail for various issues happening in the storage. We should add a flag to allow users to disable it.
[jira] [Commented] (SPARK-10816) EventTime based sessionization (session window)
[ https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17287585#comment-17287585 ] Yuanjian Li commented on SPARK-10816: - Great thanks for your heads up! [~viirya] [~kabhwan] {quote}Now that there're two committers from different teams finding the feature as useful, looks like we could try pushing this out again. {quote} Big +1. Really excited to revive this feature with you. I'll also take some time to reload the old context soon. {quote}Probably the code size is different because the design is actually quite different {quote} That's right. From my rough investigation, the main differences are listed below: * State store format design: As Shixiong described in [this comment|https://issues.apache.org/jira/browse/SPARK-10816?focusedCommentId=16645370=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16645370], my approach is easy to implement but does not scale well in the case of non-numeric aggregates. * The structure of the physical plan node: Jungtaek's approach leverages the aggregation iterator. My approach reuses the way of `WindowExec`. About authorship, really appreciate your trust [~kabhwan]! I can help with confirming with the co-authors. Compared with other issues, I think this should be the easiest one and can be discussed at the end. :) > EventTime based sessionization (session window) > --- > > Key: SPARK-10816 > URL: https://issues.apache.org/jira/browse/SPARK-10816 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Reporter: Reynold Xin >Priority: Major > Attachments: SPARK-10816 Support session window natively.pdf, Session > Window Support For Structure Streaming.pdf > > > Currently structured streaming supports two kinds of windows: tumbling window > and sliding window. Another useful window function is session window. Which > is not supported by SS.
> Unlike time window (tumbling window and sliding window), session window > doesn't have static window begin and end time. Session window creation > depends on defined session gap which can be static or dynamic. > For static session gap, the events which are falling in a certain period of > time (gap) are considered as a session window. A session window closes when > it does not receive events for the gap. For dynamic gap, the gap could be > changed from event to event.
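The static-gap sessionization described above eventually landed in Spark 3.2 as the `session_window` function. A rough usage sketch (the `events` DataFrame and its `eventTime`/`userId` columns are assumptions for illustration):

```scala
import org.apache.spark.sql.functions.{col, session_window}

// Group events into per-user sessions that close after a 5-minute
// gap with no new events; the watermark bounds how late data may arrive.
val sessionized = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(session_window(col("eventTime"), "5 minutes"), col("userId"))
  .count()
```

Unlike `window(col("eventTime"), "5 minutes")`, each session's start and end are determined by the data itself rather than by fixed boundaries, which is exactly the distinction the quoted description draws.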
[jira] [Commented] (SPARK-33638) Full support of V2 table creation in Structured Streaming writer path
[ https://issues.apache.org/jira/browse/SPARK-33638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273283#comment-17273283 ] Yuanjian Li commented on SPARK-33638: - Ah yes, it should be 3.2. I'll work on this after the 3.1 release. > Full support of V2 table creation in Structured Streaming writer path > - > > Key: SPARK-33638 > URL: https://issues.apache.org/jira/browse/SPARK-33638 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.2.0 >Reporter: Yuanjian Li >Priority: Blocker > > Currently, we want to add support of creating if not exists in > DataStreamWriter.toTable API. Since the file format in streaming doesn't > support DSv2 for now, the current implementation mainly focuses on V1 > support. We need more work to do for the full support of V2 table creation. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-34235) Make spark.sql.hive as a private package
Yuanjian Li created SPARK-34235: --- Summary: Make spark.sql.hive as a private package Key: SPARK-34235 URL: https://issues.apache.org/jira/browse/SPARK-34235 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.1.1 Reporter: Yuanjian Li Following the comment https://github.com/apache/spark/pull/31271#discussion_r562598983, we need to make spark.sql.hive a private package and remove it from the API documentation.
[jira] [Created] (SPARK-34185) Review and fix issues in API docs
Yuanjian Li created SPARK-34185: --- Summary: Review and fix issues in API docs Key: SPARK-34185 URL: https://issues.apache.org/jira/browse/SPARK-34185 Project: Spark Issue Type: Improvement Components: docs Affects Versions: 3.1.1 Reporter: Yuanjian Li Compare the 3.1.1 API doc with the latest release version 3.0.1. Fix the following issues: * Add missing `Since` annotation for new APIs * Remove the leaking class/object in API doc
[jira] [Commented] (SPARK-33915) Allow json expression to be pushable column
[ https://issues.apache.org/jira/browse/SPARK-33915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17256840#comment-17256840 ] Yuanjian Li commented on SPARK-33915: - +1 for the second one if it can pass all the tests. Feel free to submit a PR. I think it's an essential improvement. Also cc [~cloud_fan] > Allow json expression to be pushable column > --- > > Key: SPARK-33915 > URL: https://issues.apache.org/jira/browse/SPARK-33915 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.1 >Reporter: Ted Yu >Priority: Major > > Currently PushableColumnBase provides no support for json / jsonb > expressions. > Example of a json expression: > {code} > get_json_object(phone, '$.code') = '1200' > {code} > If a non-string literal is part of the expression, the presence of cast() > would complicate the situation. > The implication is that an implementation of SupportsPushDownFilters doesn't > have a chance to perform pushdown even if the third-party DB engine supports > json expression pushdown. > This issue is for discussion and implementation of Spark core changes which > would allow a json expression to be recognized as a pushable column.
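To illustrate the limitation being discussed, here is a hedged sketch (not from the ticket; the column name and data values are illustrative): a filter built on a JSON-extracting expression is evaluated by Spark itself rather than offered to the source, because only plain column references are recognized as pushable.

{code:java}
// Illustrative sketch only; assumes an active SparkSession `spark`.
import spark.implicits._

val df = Seq("""{"code": "1200"}""", """{"code": "1300"}""").toDF("phone")

// The predicate below uses get_json_object, which is not a plain column
// reference, so it is not offered to a SupportsPushDownFilters source as a
// pushable filter; Spark evaluates it after the scan instead.
df.filter("get_json_object(phone, '$.code') = '1200'").show()
{code}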
[jira] [Commented] (SPARK-33659) Document the current behavior for DataStreamWriter.toTable API
[ https://issues.apache.org/jira/browse/SPARK-33659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243795#comment-17243795 ] Yuanjian Li commented on SPARK-33659: - I'm working on this. > Document the current behavior for DataStreamWriter.toTable API > -- > > Key: SPARK-33659 > URL: https://issues.apache.org/jira/browse/SPARK-33659 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.1.0 >Reporter: Yuanjian Li >Priority: Blocker > > Follow up work for SPARK-33577 and need to be done before the 3.1 release. As > we didn't have full support for the V2 table created in the API, the > following documentation work is needed: > * figure out the effects when configurations are (provider/partitionBy) > conflicting with existing table, and document in javadoc of {{toTable}}. I > think you'll need to make a matrix and describe which takes effect (table vs > input) - creating table vs table exists, DSv1 vs DSv2 (4 different situations > should be all documented). > * document the lack of functionality on creating v2 table in javadoc of > {{toTable}}, and guide that they should ensure table is created in prior to > avoid the behavior unintended/insufficient table is being created. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-33659) Document the current behavior for DataStreamWriter.toTable API
Yuanjian Li created SPARK-33659: --- Summary: Document the current behavior for DataStreamWriter.toTable API Key: SPARK-33659 URL: https://issues.apache.org/jira/browse/SPARK-33659 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 3.1.0 Reporter: Yuanjian Li Follow up work for SPARK-33577 and need to be done before the 3.1 release. As we didn't have full support for the V2 table created in the API, the following documentation work is needed: * figure out the effects when configurations are (provider/partitionBy) conflicting with existing table, and document in javadoc of {{toTable}}. I think you'll need to make a matrix and describe which takes effect (table vs input) - creating table vs table exists, DSv1 vs DSv2 (4 different situations should be all documented). * document the lack of functionality on creating v2 table in javadoc of {{toTable}}, and guide that they should ensure table is created in prior to avoid the behavior unintended/insufficient table is being created. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-33638) Full support of V2 table creation in DataStreamWriter.toTable API
[ https://issues.apache.org/jira/browse/SPARK-33638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-33638: Priority: Blocker (was: Major) > Full support of V2 table creation in DataStreamWriter.toTable API > - > > Key: SPARK-33638 > URL: https://issues.apache.org/jira/browse/SPARK-33638 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.1.0 >Reporter: Yuanjian Li >Priority: Blocker > > Currently, we want to add support of creating if not exists in > DataStreamWriter.toTable API. Since the file format in streaming doesn't > support DSv2 for now, the current implementation mainly focuses on V1 > support. We need more work to do for the full support of V2 table creation. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-33638) Full support of V2 table creation in DataStreamWriter.toTable API
[ https://issues.apache.org/jira/browse/SPARK-33638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-33638: Description: Currently, we want to add support of creating if not exists in DataStreamWriter.toTable API. Since the file format in streaming doesn't support DSv2 for now, the current implementation mainly focuses on V1 support. We need more work to do for the full support of V2 table creation. (was: Currently, we add support of creating if not exists in DataStreamWriter.toTable API. Since the file format in streaming doesn't support DSv2 for now, the current implementation mainly focuses on V1 support. We need more work to do for the full support of V2 table creation.) > Full support of V2 table creation in DataStreamWriter.toTable API > - > > Key: SPARK-33638 > URL: https://issues.apache.org/jira/browse/SPARK-33638 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.1.0 >Reporter: Yuanjian Li >Priority: Major > > Currently, we want to add support of creating if not exists in > DataStreamWriter.toTable API. Since the file format in streaming doesn't > support DSv2 for now, the current implementation mainly focuses on V1 > support. We need more work to do for the full support of V2 table creation. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-33638) Full support of V2 table creation in DataStreamWriter.toTable API
[ https://issues.apache.org/jira/browse/SPARK-33638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243070#comment-17243070 ] Yuanjian Li commented on SPARK-33638: - [https://github.com/apache/spark/pull/30521#discussion_r533869095] For the partition column checking, unlike the V1 API, we can't pass the partition info directly for a V2 table now. To make the behavior consistent between V1 and V2, there seem to be two ways we can consider: * Make it possible to pass the partition column info to DSv2 * Check for partition column conflicts in the API directly for both V1/V2 > Full support of V2 table creation in DataStreamWriter.toTable API > - > > Key: SPARK-33638 > URL: https://issues.apache.org/jira/browse/SPARK-33638 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.1.0 >Reporter: Yuanjian Li >Priority: Major > > Currently, we add support of creating if not exists in > DataStreamWriter.toTable API. Since the file format in streaming doesn't > support DSv2 for now, the current implementation mainly focuses on V1 > support. We need more work to do for the full support of V2 table creation.
[jira] [Commented] (SPARK-33638) Full support of V2 table creation in DataStreamWriter.toTable API
[ https://issues.apache.org/jira/browse/SPARK-33638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17242898#comment-17242898 ] Yuanjian Li commented on SPARK-33638: - There is already some discussion in the PR comments: [https://github.com/apache/spark/pull/30521] Since the file format in streaming doesn't support DSv2 for now, the current implementation mainly focuses on V1 support (per the [comment|https://github.com/apache/spark/pull/30521#issuecomment-737488404]). Also, to provide a convenient API, we added support for creating a table if not exists in the API, which also focuses on the V1 table (per the [comment|https://github.com/apache/spark/pull/30521#issuecomment-736948942]). But as the initial rationale for the stream table API described in the [comment|https://github.com/apache/spark/pull/30521#issuecomment-737515816], the table API should support a DSv2 table, which supports streaming writes. To unblock the current V1 support and add the new API, we created this ticket to keep the V2 support on track. > Full support of V2 table creation in DataStreamWriter.toTable API > - > > Key: SPARK-33638 > URL: https://issues.apache.org/jira/browse/SPARK-33638 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.1.0 >Reporter: Yuanjian Li >Priority: Major > > Currently, we add support of creating if not exists in > DataStreamWriter.toTable API. Since the file format in streaming doesn't > support DSv2 for now, the current implementation mainly focuses on V1 > support. We need more work to do for the full support of V2 table creation.
[jira] [Created] (SPARK-33638) Full support of V2 table creation in DataStreamWriter.toTable API
Yuanjian Li created SPARK-33638: --- Summary: Full support of V2 table creation in DataStreamWriter.toTable API Key: SPARK-33638 URL: https://issues.apache.org/jira/browse/SPARK-33638 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 3.1.0 Reporter: Yuanjian Li Currently, we add support of creating if not exists in DataStreamWriter.toTable API. Since the file format in streaming doesn't support DSv2 for now, the current implementation mainly focuses on V1 support. We need more work to do for the full support of V2 table creation.
[jira] [Updated] (SPARK-33577) Add support for V1Table in stream writer table API
[ https://issues.apache.org/jira/browse/SPARK-33577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-33577: Description: After SPARK-32896, we have table API for stream writer but only support DataSource v2 tables. Here we add the following enhancements: * Create non-existing tables by default * Support both managed and external V1Tables was: After SPARK-32896, we have table API for stream writer but only support DataSource v2 tables. Here we add the following supports: * Create non-existing tables by default * Support both managed and external V1Tables > Add support for V1Table in stream writer table API > -- > > Key: SPARK-33577 > URL: https://issues.apache.org/jira/browse/SPARK-33577 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > After SPARK-32896, we have table API for stream writer but only support > DataSource v2 tables. Here we add the following enhancements: > * Create non-existing tables by default > * Support both managed and external V1Tables -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-33577) Add support for V1Table in stream writer table API
Yuanjian Li created SPARK-33577: --- Summary: Add support for V1Table in stream writer table API Key: SPARK-33577 URL: https://issues.apache.org/jira/browse/SPARK-33577 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 3.0.0 Reporter: Yuanjian Li After SPARK-32896, we have table API for stream writer but only support DataSource v2 tables. Here we add the following supports: * Create non-existing tables by default * Support both managed and external V1Tables -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
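As a sketch of the API surface this ticket extends (assuming Spark 3.1+, where DataStreamWriter.toTable is available; the table name, checkpoint path, and rate-source settings are illustrative, not from the ticket):

{code:java}
// Illustrative sketch; assumes an active SparkSession `spark`.
val stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

// With V1Table support, toTable can target a managed or external V1 table,
// creating the table if it does not already exist.
val query = stream.writeStream
  .option("checkpointLocation", "/tmp/ckpt_v1table")
  .toTable("my_v1_table")
{code}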
[jira] [Updated] (SPARK-33244) Unify the code paths for spark.table and spark.read.table
[ https://issues.apache.org/jira/browse/SPARK-33244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-33244: Description: * The code paths of `spark.table` and `spark.read.table` should be the same. This behavior is broke in SPARK-32592 since we need to respect options in `spark.read.table` API. * Add comments for `{{spark.table`}} to emphasize it also support streaming temp view reading was: * The code paths of `spark.table` and `spark.read.table` should be the same. This behavior is broke in SPARK-32592 since we need to respect options in `spark.read.table` API. * Add comment for `{{spark.table`}} to emphasize it also support streaming temp view reading > Unify the code paths for spark.table and spark.read.table > - > > Key: SPARK-33244 > URL: https://issues.apache.org/jira/browse/SPARK-33244 > Project: Spark > Issue Type: Improvement > Components: SQL, Structured Streaming >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > * The code paths of `spark.table` and `spark.read.table` should be the same. > This behavior is broke in SPARK-32592 since we need to respect options in > `spark.read.table` API. > * Add comments for `{{spark.table`}} to emphasize it also support streaming > temp view reading -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-33244) Unify the code paths for spark.table and spark.read.table
[ https://issues.apache.org/jira/browse/SPARK-33244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-33244: Description: * The code paths of `spark.table` and `spark.read.table` should be the same. This behavior is broke in SPARK-32592 since we need to respect options in `spark.read.table` API. * Add comment for `{{spark.table`}} to emphasize it also support streaming temp view reading was:The code paths of `spark.table` and `spark.read.table` should be the same. This behavior is broke in SPARK-32592 since we need to respect options in `spark.read.table` API. > Unify the code paths for spark.table and spark.read.table > - > > Key: SPARK-33244 > URL: https://issues.apache.org/jira/browse/SPARK-33244 > Project: Spark > Issue Type: Improvement > Components: SQL, Structured Streaming >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > * The code paths of `spark.table` and `spark.read.table` should be the same. > This behavior is broke in SPARK-32592 since we need to respect options in > `spark.read.table` API. > * Add comment for `{{spark.table`}} to emphasize it also support streaming > temp view reading -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-33244) Unify the code paths for spark.table and spark.read.table
[ https://issues.apache.org/jira/browse/SPARK-33244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-33244: Description: The code paths of `spark.table` and `spark.read.table` should be the same. This behavior is broke in SPARK-32592 since we need to respect options in `spark.read.table` API. (was: * Block reading streaming temp view via `spark.table` API * The code paths of `spark.table` and `spark.read.table` should be the same. This behavior is broke in SPARK-32592 since we need to respect options in `spark.read.table` API.) > Unify the code paths for spark.table and spark.read.table > - > > Key: SPARK-33244 > URL: https://issues.apache.org/jira/browse/SPARK-33244 > Project: Spark > Issue Type: Improvement > Components: SQL, Structured Streaming >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > The code paths of `spark.table` and `spark.read.table` should be the same. > This behavior is broke in SPARK-32592 since we need to respect options in > `spark.read.table` API. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-33244) Unify the code paths for spark.table and spark.read.table
[ https://issues.apache.org/jira/browse/SPARK-33244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-33244: Summary: Unify the code paths for spark.table and spark.read.table (was: Block reading streaming temp view via `spark.table` API) > Unify the code paths for spark.table and spark.read.table > - > > Key: SPARK-33244 > URL: https://issues.apache.org/jira/browse/SPARK-33244 > Project: Spark > Issue Type: Improvement > Components: SQL, Structured Streaming >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > * Block reading streaming temp view via `spark.table` API > * The code paths of `spark.table` and `spark.read.table` should be the same. > This behavior is broke in SPARK-32592 since we need to respect options in > `spark.read.table` API. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-33244) Block reading streaming temp view via `spark.table` API
[ https://issues.apache.org/jira/browse/SPARK-33244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-33244: Description: * Block reading streaming temp view via `spark.table` API * The code paths of `spark.table` and `spark.read.table` should be the same. This behavior is broke in SPARK-32592 since we need to respect options in `spark.read.table` API. was: The code paths of `spark.table` and `spark.read.table` should be the same. This behavior is broke in SPARK-32592 since we need to respect options in `spark.read.table` API. > Block reading streaming temp view via `spark.table` API > --- > > Key: SPARK-33244 > URL: https://issues.apache.org/jira/browse/SPARK-33244 > Project: Spark > Issue Type: Improvement > Components: SQL, Structured Streaming >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > * Block reading streaming temp view via `spark.table` API > * The code paths of `spark.table` and `spark.read.table` should be the same. > This behavior is broke in SPARK-32592 since we need to respect options in > `spark.read.table` API. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-33244) Block reading streaming temp view via `spark.table` API
[ https://issues.apache.org/jira/browse/SPARK-33244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-33244: Description: The code paths of `spark.table` and `spark.read.table` should be the same. This behavior is broke in SPARK-32592 since we need to respect options in `spark.read.table` API. was:The code paths of `spark.table` and `spark.read.table` should be the same. This behavior is broke in SPARK-32592 since we need to respect options in `spark.read.table` API. > Block reading streaming temp view via `spark.table` API > --- > > Key: SPARK-33244 > URL: https://issues.apache.org/jira/browse/SPARK-33244 > Project: Spark > Issue Type: Improvement > Components: SQL, Structured Streaming >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > > The code paths of `spark.table` and `spark.read.table` should be the same. > This behavior is broke in SPARK-32592 since we need to respect options in > `spark.read.table` API. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-33244) Block reading streaming temp view via `spark.table` API
[ https://issues.apache.org/jira/browse/SPARK-33244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-33244: Summary: Block reading streaming temp view via `spark.table` API (was: Unify the code paths for spark.table and spark.read.table) > Block reading streaming temp view via `spark.table` API > --- > > Key: SPARK-33244 > URL: https://issues.apache.org/jira/browse/SPARK-33244 > Project: Spark > Issue Type: Improvement > Components: SQL, Structured Streaming >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > The code paths of `spark.table` and `spark.read.table` should be the same. > This behavior is broke in SPARK-32592 since we need to respect options in > `spark.read.table` API. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-33244) Unify the code paths for spark.table and spark.read.table
[ https://issues.apache.org/jira/browse/SPARK-33244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-33244: Component/s: Structured Streaming > Unify the code paths for spark.table and spark.read.table > - > > Key: SPARK-33244 > URL: https://issues.apache.org/jira/browse/SPARK-33244 > Project: Spark > Issue Type: Improvement > Components: SQL, Structured Streaming >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > The code paths of `spark.table` and `spark.read.table` should be the same. > This behavior is broke in SPARK-32592 since we need to respect options in > `spark.read.table` API. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-33244) Unify the code paths for spark.table and spark.read.table
Yuanjian Li created SPARK-33244: --- Summary: Unify the code paths for spark.table and spark.read.table Key: SPARK-33244 URL: https://issues.apache.org/jira/browse/SPARK-33244 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Yuanjian Li The code paths of `spark.table` and `spark.read.table` should be the same. This behavior was broken in SPARK-32592 since we need to respect options in the `spark.read.table` API.
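The two entry points in question can be sketched as follows (a hedged illustration; the table name and option are examples, not from the ticket):

{code:java}
// Illustrative sketch; assumes an active SparkSession `spark`.
// Both calls should resolve the table through the same code path.
val t1 = spark.table("my_table")

// The reader variant additionally has to respect options set on the reader,
// which is the behavior SPARK-32592 introduced and that caused the paths
// to diverge.
val t2 = spark.read.option("mergeSchema", "true").table("my_table")
{code}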
[jira] [Created] (SPARK-32885) Add DataStreamReader.table API
Yuanjian Li created SPARK-32885: --- Summary: Add DataStreamReader.table API Key: SPARK-32885 URL: https://issues.apache.org/jira/browse/SPARK-32885 Project: Spark Issue Type: New Feature Components: Structured Streaming Affects Versions: 3.1.0 Reporter: Yuanjian Li This ticket aims to add a new `table` API in DataStreamReader, similar to the table API in DataFrameReader. Users can directly use this API to get a streaming DataFrame on a table. Below is a simple example: Application 1 for initializing and starting the streaming job: {code:java} val path = "/home/yuanjian.li/runtime/to_be_deleted" val tblName = "my_table" // Write some data to `my_table` spark.range(3).write.format("parquet").option("path", path).saveAsTable(tblName) // Read the table as a streaming source, write result to destination directory val table = spark.readStream.table(tblName) table.writeStream.format("parquet").option("checkpointLocation", "/home/yuanjian.li/runtime/to_be_deleted_ck").start("/home/yuanjian.li/runtime/to_be_deleted_2") {code} Application 2 for appending new data: {code:java} // Append new data into the path spark.range(5).write.format("parquet").option("path", "/home/yuanjian.li/runtime/to_be_deleted").mode("append").save(){code} Check result: {code:java} // The destination directory should contain all written data spark.read.parquet("/home/yuanjian.li/runtime/to_be_deleted_2").show() {code}
[jira] [Created] (SPARK-32844) Make `DataFrameReader.table` take the specified options for datasource v1
Yuanjian Li created SPARK-32844: --- Summary: Make `DataFrameReader.table` take the specified options for datasource v1 Key: SPARK-32844 URL: https://issues.apache.org/jira/browse/SPARK-32844 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.1.0 Reporter: Yuanjian Li As with the work in SPARK-32592, we need to keep the same behavior in datasource v1 of taking the specified options for the DataFrameReader.table API.
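A hedged sketch of the intended behavior (the table name and option are illustrative, not from the ticket): options set on the reader should also take effect when the table is backed by a v1 source.

{code:java}
// Illustrative sketch; assumes an active SparkSession `spark`.
// `my_csv_table` is a v1 (non-DSv2) table created with the csv source.
spark.range(3).write.format("csv").saveAsTable("my_csv_table")

// After this change, the specified option should reach the v1 relation as
// well, matching the DSv2 behavior from SPARK-32592.
val df = spark.read.option("header", "false").table("my_csv_table")
{code}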
[jira] [Created] (SPARK-32782) Refactor StreamingRelationV2 and move it to catalyst
Yuanjian Li created SPARK-32782: --- Summary: Refactor StreamingRelationV2 and move it to catalyst Key: SPARK-32782 URL: https://issues.apache.org/jira/browse/SPARK-32782 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 3.1.0 Reporter: Yuanjian Li Currently, StreamingRelationV2 is bound to TableProvider. To make it more flexible and more extensible, it should be moved to the catalyst module and bound to the Table interface.
[jira] [Updated] (SPARK-32456) Check the Distinct by assuming it as Aggregate for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-32456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-32456: Description: We want to fix 2 things here: 1. Give better error message for Distinct related operations in append mode that doesn't have a watermark Check the following example: {code:java} val s1 = spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s1") val s2 = spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s2") val unionRes = spark.sql("select s1.value , s1.timestamp from s1 union select s2.value, s2.timestamp from s2") unionResult.writeStream.option("checkpointLocation", ${pathA}).start(${pathB}){code} We'll get the following confusing exception: {code:java} java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:529) at scala.None$.get(Option.scala:527) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:346) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561) at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:112) ... {code} The union clause in SQL has the requirement of deduplication, the parser will generate {{Distinct(Union)}} and the optimizer rule {{ReplaceDistinctWithAggregate}} will change it to {{Aggregate(Union)}}. So the root cause here is the checking logic for Aggregate is missing for Distinct. Actually it happens for all Distinct related operations in Structured Streaming, e.g {code:java} val df = spark.readStream.format("rate").load() df.createOrReplaceTempView("deduptest") val distinct = spark.sql("select distinct value from deduptest") distinct.writeStream.option("checkpointLocation", ${pathA}).start(${pathB}){code} 2. Make {{Distinct}} in complete mode runnable. 
The distinct in complete mode will throw the exception: {quote} {{Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;}} {quote} was: Check the following example: {code:java} val s1 = spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s1") val s2 = spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s2") val unionRes = spark.sql("select s1.value , s1.timestamp from s1 union select s2.value, s2.timestamp from s2") unionResult.writeStream.option("checkpointLocation", ${pathA}).start(${pathB}){code} We'll get the following confusing exception: {code:java} java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:529) at scala.None$.get(Option.scala:527) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:346) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561) at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:112) ... {code} The union clause in SQL has the requirement of deduplication, the parser will generate {{Distinct(Union)}} and the optimizer rule {{ReplaceDistinctWithAggregate}} will change it to {{Aggregate(Union)}}. So the root cause here is the checking logic for Aggregate is missing for Distinct. 
Actually it happens for all Distinct related operations in Structured Streaming, e.g {code:java} val df = spark.readStream.format("rate").load() df.createOrReplaceTempView("deduptest") val distinct = spark.sql("select distinct value from deduptest") distinct.writeStream.option("checkpointLocation", ${pathA}).start(${pathB}){code} > Check the Distinct by assuming it as Aggregate for Structured Streaming > --- > > Key: SPARK-32456 > URL: https://issues.apache.org/jira/browse/SPARK-32456 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > We want to fix 2 things here: > 1. Give better error message for Distinct related operations in append mode > that doesn't have a watermark > Check the following example: > {code:java} > val s1 = spark.readStream.format("rate").option("rowsPerSecond", > 1).load().createOrReplaceTempView("s1") > val s2 = spark.readStream.format("rate").option("rowsPerSecond", > 1).load().createOrReplaceTempView("s2") > val unionRes = spark.sql("select s1.value , s1.timestamp from s1 union select > s2.value, s2.timestamp from s2") > unionResult.writeStream.option("checkpointLocation", >
[jira] [Updated] (SPARK-32456) Check the Distinct by assuming it as Aggregate for Structured Streaming
[ https://issues.apache.org/jira/browse/SPARK-32456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-32456: Summary: Check the Distinct by assuming it as Aggregate for Structured Streaming (was: Give better error message for Distinct related operations in append mode without watermark) > Check the Distinct by assuming it as Aggregate for Structured Streaming > --- > > Key: SPARK-32456 > URL: https://issues.apache.org/jira/browse/SPARK-32456 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > Check the following example: > > {code:java} > val s1 = spark.readStream.format("rate").option("rowsPerSecond", > 1).load().createOrReplaceTempView("s1") > val s2 = spark.readStream.format("rate").option("rowsPerSecond", > 1).load().createOrReplaceTempView("s2") > val unionRes = spark.sql("select s1.value , s1.timestamp from s1 union select > s2.value, s2.timestamp from s2") > unionResult.writeStream.option("checkpointLocation", > ${pathA}).start(${pathB}){code} > We'll get the following confusing exception: > {code:java} > java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:529) > at scala.None$.get(Option.scala:527) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:346) > at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561) > at > org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:112) > ... > {code} > The union clause in SQL has the requirement of deduplication, the parser will > generate {{Distinct(Union)}} and the optimizer rule > {{ReplaceDistinctWithAggregate}} will change it to {{Aggregate(Union)}}. So > the root cause here is the checking logic for Aggregate is missing for > Distinct. 
> > Actually it happens for all Distinct related operations in Structured > Streaming, e.g > {code:java} > val df = spark.readStream.format("rate").load() > df.createOrReplaceTempView("deduptest") > val distinct = spark.sql("select distinct value from deduptest") > distinct.writeStream.option("checkpointLocation", > ${pathA}).start(${pathB}){code} > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-32456) Give better error message for Distinct related operations in append mode without watermark
[ https://issues.apache.org/jira/browse/SPARK-32456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-32456: Description: Check the following example: {code:java} val s1 = spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s1") val s2 = spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s2") val unionRes = spark.sql("select s1.value , s1.timestamp from s1 union select s2.value, s2.timestamp from s2") unionResult.writeStream.option("checkpointLocation", ${pathA}).start(${pathB}){code} We'll get the following confusing exception: {code:java} java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:529) at scala.None$.get(Option.scala:527) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:346) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561) at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:112) ... {code} The union clause in SQL has the requirement of deduplication, the parser will generate {{Distinct(Union)}} and the optimizer rule {{ReplaceDistinctWithAggregate}} will change it to {{Aggregate(Union)}}. So the root cause here is the checking logic for Aggregate is missing for Distinct. 
Actually it happens for all Distinct related operations in Structured Streaming, e.g {code:java} val df = spark.readStream.format("rate").load() df.createOrReplaceTempView("deduptest") val distinct = spark.sql("select distinct value from deduptest") distinct.writeStream.option("checkpointLocation", ${pathA}).start(${pathB}){code} was: Check the following example: {code:java} val s1 = spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s1") val s2 = spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s2") val unionRes = spark.sql("select s1.value , s1.timestamp from s1 union select s2.value, s2.timestamp from s2") unionResult.writeStream.option("checkpointLocation", ${pathA}).start(${pathB}) {code} We'll get the following confusing exception: {code:java} java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:529) at scala.None$.get(Option.scala:527) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:346) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561) at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:112) ... {code} The union clause in SQL has the requirement of deduplication, the parser will generate {{Distinct(Union)}} and the optimizer rule {{ReplaceDistinctWithAggregate}} will change it to {{Aggregate(Union)}}. So the root cause here is the checking logic for Aggregate is missing for Distinct. 
> Give better error message for Distinct related operations in append mode > without watermark > -- > > Key: SPARK-32456 > URL: https://issues.apache.org/jira/browse/SPARK-32456 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > Check the following example: > > {code:java} > val s1 = spark.readStream.format("rate").option("rowsPerSecond", > 1).load().createOrReplaceTempView("s1") > val s2 = spark.readStream.format("rate").option("rowsPerSecond", > 1).load().createOrReplaceTempView("s2") > val unionRes = spark.sql("select s1.value , s1.timestamp from s1 union select > s2.value, s2.timestamp from s2") > unionResult.writeStream.option("checkpointLocation", > ${pathA}).start(${pathB}){code} > We'll get the following confusing exception: > {code:java} > java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:529) > at scala.None$.get(Option.scala:527) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:346) > at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561) > at > org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:112) > ... > {code} > The union clause in SQL has the requirement of deduplication, the parser will > generate {{Distinct(Union)}} and the optimizer rule > {{ReplaceDistinctWithAggregate}} will change it to
[jira] [Updated] (SPARK-32456) Give better error message for Distinct related operations in append mode without watermark
[ https://issues.apache.org/jira/browse/SPARK-32456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-32456: Summary: Give better error message for Distinct related operations in append mode without watermark (was: Give better error message for union streams in append mode that don't have a watermark) > Give better error message for Distinct related operations in append mode > without watermark > -- > > Key: SPARK-32456 > URL: https://issues.apache.org/jira/browse/SPARK-32456 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > Check the following example: > > {code:java} > val s1 = spark.readStream.format("rate").option("rowsPerSecond", > 1).load().createOrReplaceTempView("s1") > val s2 = spark.readStream.format("rate").option("rowsPerSecond", > 1).load().createOrReplaceTempView("s2") > val unionRes = spark.sql("select s1.value , s1.timestamp from s1 union select > s2.value, s2.timestamp from s2") > unionResult.writeStream.option("checkpointLocation", ${pathA}).start(${pathB}) > {code} > > We'll get the following confusing exception: > {code:java} > java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:529) > at scala.None$.get(Option.scala:527) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:346) > at > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561) > at > org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:112) > ... > {code} > The union clause in SQL has the requirement of deduplication, the parser will > generate {{Distinct(Union)}} and the optimizer rule > {{ReplaceDistinctWithAggregate}} will change it to {{Aggregate(Union)}}. 
So > the root cause here is that the checking logic for Aggregate is missing for > Distinct. 
[jira] [Created] (SPARK-32456) Give better error message for union streams in append mode that don't have a watermark
Yuanjian Li created SPARK-32456: --- Summary: Give better error message for union streams in append mode that don't have a watermark Key: SPARK-32456 URL: https://issues.apache.org/jira/browse/SPARK-32456 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 3.0.0 Reporter: Yuanjian Li Check the following example: {code:java} val s1 = spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s1") val s2 = spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s2") val unionRes = spark.sql("select s1.value, s1.timestamp from s1 union select s2.value, s2.timestamp from s2") unionRes.writeStream.option("checkpointLocation", ${pathA}).start(${pathB}) {code} We'll get the following confusing exception: {code:java} java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:529) at scala.None$.get(Option.scala:527) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:346) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561) at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:112) ... {code} The UNION clause in SQL requires deduplication, so the parser will generate {{Distinct(Union)}} and the optimizer rule {{ReplaceDistinctWithAggregate}} will change it to {{Aggregate(Union)}}. So the root cause here is that the checking logic for Aggregate is missing for Distinct.
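The rewrite above rests on a simple equivalence: deduplicating rows is the same as grouping by all columns and keeping one row per group, which is why {{Distinct(Union)}} becomes {{Aggregate(Union)}}. The Java sketch below is purely illustrative (it is not Spark's actual {{ReplaceDistinctWithAggregate}} rule) and models rows as plain strings:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;

public class DistinctAsAggregate {
    // Deduplicate by grouping on the entire row and keeping one row per group --
    // the same shape as rewriting Distinct(child) into Aggregate(allColumns, child).
    static <T> List<T> distinctViaAggregate(List<T> rows) {
        return new ArrayList<>(rows.stream()
                .collect(Collectors.toMap(r -> r, r -> r, (first, dup) -> first, LinkedHashMap::new))
                .keySet());
    }

    public static void main(String[] args) {
        // Rows produced by a UNION ALL of two sources; UNION must deduplicate them.
        List<String> rows = List.of("1,a", "2,b", "1,a");

        // Plain deduplication (what Distinct means)...
        List<String> viaDistinct = rows.stream().distinct().collect(Collectors.toList());

        // ...matches grouping by all columns (what the optimizer rewrites it to).
        System.out.println(viaDistinct.equals(distinctViaAggregate(rows))); // prints "true"
    }
}
```

Because the two plans are equivalent, any check the streaming planner applies to Aggregate (e.g. the output-mode validation quoted above) must also fire for Distinct.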
[jira] [Resolved] (SPARK-32386) Fix temp view leaking in Structured Streaming tests
[ https://issues.apache.org/jira/browse/SPARK-32386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li resolved SPARK-32386. - Resolution: Won't Fix In various suites, we need this temp view for checking results, e.g. KafkaDontFailOnDataLossSuite. > Fix temp view leaking in Structured Streaming tests > --- > > Key: SPARK-32386 > URL: https://issues.apache.org/jira/browse/SPARK-32386 > Project: Spark > Issue Type: Bug > Components: Structured Streaming, Tests >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Minor >
[jira] [Created] (SPARK-32389) Add all hive.execution suites in the parallel test group
Yuanjian Li created SPARK-32389: --- Summary: Add all hive.execution suites in the parallel test group Key: SPARK-32389 URL: https://issues.apache.org/jira/browse/SPARK-32389 Project: Spark Issue Type: Improvement Components: Tests Affects Versions: 3.0.0 Reporter: Yuanjian Li Similar to SPARK-27460, we add an extra parallel test group for all `hive.execution` suites to reduce the Jenkins testing time.
[jira] [Created] (SPARK-32386) Fix temp view leaking in Structured Streaming tests
Yuanjian Li created SPARK-32386: --- Summary: Fix temp view leaking in Structured Streaming tests Key: SPARK-32386 URL: https://issues.apache.org/jira/browse/SPARK-32386 Project: Spark Issue Type: Bug Components: Tests Affects Versions: 3.0.0 Reporter: Yuanjian Li
[jira] [Updated] (SPARK-32386) Fix temp view leaking in Structured Streaming tests
[ https://issues.apache.org/jira/browse/SPARK-32386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-32386: Component/s: Structured Streaming > Fix temp view leaking in Structured Streaming tests > --- > > Key: SPARK-32386 > URL: https://issues.apache.org/jira/browse/SPARK-32386 > Project: Spark > Issue Type: Bug > Components: Structured Streaming, Tests >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Minor > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-32115) Incorrect results for SUBSTRING when overflow
[ https://issues.apache.org/jira/browse/SPARK-32115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148409#comment-17148409 ] Yuanjian Li commented on SPARK-32115: - Thank you for verifying! [~dongjoon] > Incorrect results for SUBSTRING when overflow > - > > Key: SPARK-32115 > URL: https://issues.apache.org/jira/browse/SPARK-32115 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.3, 2.0.2, 2.1.3, 2.2.3, 2.3.4, 2.4.6, 3.0.0 >Reporter: Yuanjian Li >Assignee: Yuanjian Li >Priority: Blocker > Labels: correctness > Fix For: 2.4.7, 3.0.1, 3.1.0 > > > SQL query SELECT SUBSTRING("abc", -1207959552, -1207959552) incorrectly > returns "abc" against expected output of "". > This is a result of integer overflow in addition > [https://github.com/apache/spark/blob/8c44d744631516a5cdaf63406e69a9dd11e5b878/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java#L345] -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-32115) Incorrect results for SUBSTRING when overflow
[ https://issues.apache.org/jira/browse/SPARK-32115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-32115: Description: SQL query SELECT SUBSTRING("abc", -1207959552, -1207959552) incorrectly returns "abc" against expected output of "". This is a result of integer overflow in addition [https://github.com/apache/spark/blob/8c44d744631516a5cdaf63406e69a9dd11e5b878/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java#L345] was: SQL query SELECT SUBSTRING("abc", -1207959552, -1207959552) in Spark incorrectly returns "abc" against expected output of "". This is a result of integer overflow in addition [https://github.com/apache/spark/blob/8c44d744631516a5cdaf63406e69a9dd11e5b878/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java#L345] > Incorrect results for SUBSTRING when overflow > - > > Key: SPARK-32115 > URL: https://issues.apache.org/jira/browse/SPARK-32115 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > SQL query SELECT SUBSTRING("abc", -1207959552, -1207959552) incorrectly > returns "abc" against expected output of "". > This is a result of integer overflow in addition > [https://github.com/apache/spark/blob/8c44d744631516a5cdaf63406e69a9dd11e5b878/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java#L345] -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-32115) Incorrect results for SUBSTRING when overflow
Yuanjian Li created SPARK-32115: --- Summary: Incorrect results for SUBSTRING when overflow Key: SPARK-32115 URL: https://issues.apache.org/jira/browse/SPARK-32115 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.0.0 Reporter: Yuanjian Li SQL query SELECT SUBSTRING("abc", -1207959552, -1207959552) in Spark incorrectly returns "abc" instead of the expected output "". This is a result of integer overflow in the addition at [https://github.com/apache/spark/blob/8c44d744631516a5cdaf63406e69a9dd11e5b878/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java#L345]
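The overflow can be reproduced in plain Java; the helper names below are illustrative, not the actual UTF8String methods. With 32-bit arithmetic, pos + length for these arguments wraps around to a large positive end index, so the substring range covers the whole string instead of clamping to empty:

```java
public class SubstringOverflow {
    // Buggy variant: 32-bit addition silently wraps on overflow.
    static int intEnd(int pos, int length) {
        return pos + length;
    }

    // Safe variant: widen to 64 bits before adding, so the true value survives.
    static long longEnd(int pos, int length) {
        return (long) pos + (long) length;
    }

    public static void main(String[] args) {
        int pos = -1207959552, len = -1207959552;
        System.out.println(intEnd(pos, len));  // 1879048192 -> looks like a valid positive index
        System.out.println(longEnd(pos, len)); // -2415919104 -> correctly negative, i.e. empty result
    }
}
```

A fix along these lines (64-bit intermediate arithmetic, or Math.addExact with explicit overflow handling) lets the bounds check see the true negative end index and return "".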
[jira] [Updated] (SPARK-31905) Add compatibility tests for streaming state store format
[ https://issues.apache.org/jira/browse/SPARK-31905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-31905: Summary: Add compatibility tests for streaming state store format (was: Add compatibility tests for streaming aggregation state store format) > Add compatibility tests for streaming state store format > > > Key: SPARK-31905 > URL: https://issues.apache.org/jira/browse/SPARK-31905 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > After SPARK-31894, we have a validation checking for the streaming state > store. It's better to add integrated tests in the PR builder as soon as the > breaking changes introduced. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-31894) Introduce UnsafeRow format validation for streaming state store
[ https://issues.apache.org/jira/browse/SPARK-31894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17135431#comment-17135431 ] Yuanjian Li commented on SPARK-31894: - This issue is blocked by SPARK-31990, see more details in https://github.com/apache/spark/pull/28707#issuecomment-643916110. > Introduce UnsafeRow format validation for streaming state store > --- > > Key: SPARK-31894 > URL: https://issues.apache.org/jira/browse/SPARK-31894 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 3.1.0 >Reporter: Yuanjian Li >Priority: Major > > Currently, Structured Streaming directly puts the UnsafeRow into StateStore > without any schema validation. It's a dangerous behavior when users reuse > the checkpoint file during migration. Any changes or bug fixes related to the > aggregate function may cause random exceptions, or even a wrong answer, e.g. > SPARK-28067. > Here we introduce an UnsafeRow format validation for the state store.
[jira] [Commented] (SPARK-31990) Streaming's state store compatibility is broken
[ https://issues.apache.org/jira/browse/SPARK-31990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17135428#comment-17135428 ] Yuanjian Li commented on SPARK-31990: - [~maropu] Thanks for the quick fix! [~dongjoon] This issue is found when investigating SPARK-31894, and it's also a blocker issue for it. I added a detailed analysis in SPARK-31894's [PR comments|https://github.com/apache/spark/pull/28707#issuecomment-643916110]. I think the simple revert fix in [https://github.com/apache/spark/pull/28830] can be merged without tests. I'll add a new integrated test in SPARK-31905. > Streaming's state store compatibility is broken > --- > > Key: SPARK-31990 > URL: https://issues.apache.org/jira/browse/SPARK-31990 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 3.0.0 >Reporter: Xiao Li >Priority: Blocker > Labels: correctness > > [This > line|https://github.com/apache/spark/pull/28062/files#diff-7a46f10c3cedbf013cf255564d9483cdR2458] > of [https://github.com/apache/spark/pull/28062] changed the order of > groupCols in dropDuplicates(). Thus, the executor JVM could probably crash, > throw a random exception or even return a wrong answer when using the > checkpoint written by the previous version. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
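The failure mode described in SPARK-31990 — reordering group columns while reusing an old checkpoint — can be illustrated without Spark. In this minimal Java sketch (a hypothetical fixed layout, not Spark's actual UnsafeRow format), bytes written as (int, long) and read back as (long, int) decode without any error into wrong values:

```java
import java.nio.ByteBuffer;

public class ColumnOrderCompat {
    public static void main(String[] args) {
        // "Old" writer: state row laid out as (int id, long count).
        ByteBuffer checkpoint = ByteBuffer.allocate(12);
        checkpoint.putInt(7);
        checkpoint.putLong(42L);
        checkpoint.flip();

        // "New" reader after a column reorder: expects (long count, int id).
        long count = checkpoint.getLong();
        int id = checkpoint.getInt();

        // No exception is thrown -- the same bytes simply decode to wrong values.
        System.out.println(count + " " + id); // prints "30064771072 42"
    }
}
```

No exception surfaces; the corruption is silent, which is exactly why SPARK-31894 proposes validating the state-store row format up front.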
[jira] [Updated] (SPARK-31905) Add compatibility tests for streaming aggregation state store format
[ https://issues.apache.org/jira/browse/SPARK-31905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-31905: Summary: Add compatibility tests for streaming aggregation state store format (was: Add compatibility tests for streaming aggregation) > Add compatibility tests for streaming aggregation state store format > > > Key: SPARK-31905 > URL: https://issues.apache.org/jira/browse/SPARK-31905 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > After SPARK-31894, we have a validation checking for the streaming state > store. It's better to add integrated tests in the PR builder as soon as the > breaking changes introduced. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31905) Add compatibility tests for streaming aggregation
[ https://issues.apache.org/jira/browse/SPARK-31905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-31905: Summary: Add compatibility tests for streaming aggregation (was: Add compatibility test for streaming aggregation) > Add compatibility tests for streaming aggregation > - > > Key: SPARK-31905 > URL: https://issues.apache.org/jira/browse/SPARK-31905 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > After SPARK-31894, we have a validation checking for the streaming state > store. It's better to add integrated tests in the PR builder as soon as the > breaking changes introduced. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31905) Add compatibility test for streaming aggregation
Yuanjian Li created SPARK-31905: --- Summary: Add compatibility test for streaming aggregation Key: SPARK-31905 URL: https://issues.apache.org/jira/browse/SPARK-31905 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 3.0.0 Reporter: Yuanjian Li After SPARK-31894, we have validation checking for the streaming state store. It's better to add integrated tests in the PR builder as soon as breaking changes are introduced.
[jira] [Created] (SPARK-31894) Introduce UnsafeRow format validation for streaming state store
Yuanjian Li created SPARK-31894: --- Summary: Introduce UnsafeRow format validation for streaming state store Key: SPARK-31894 URL: https://issues.apache.org/jira/browse/SPARK-31894 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 3.1.0 Reporter: Yuanjian Li Currently, Structured Streaming directly puts the UnsafeRow into StateStore without any schema validation. It's a dangerous behavior when users reuse the checkpoint file during migration. Any changes or bug fixes related to the aggregate function may cause random exceptions, or even a wrong answer, e.g. SPARK-28067. Here we introduce an UnsafeRow format validation for the state store.
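One cheap form of such validation is a length check: serialized bytes whose length cannot match the schema the current query expects should be rejected before decoding. The sketch below is a simplified illustration under assumed fixed-width fields — Spark's real UnsafeRow layout (null-tracking bitset plus 8-byte words plus a variable-length region) is more involved, and the names here are hypothetical:

```java
import java.util.List;

public class RowFormatCheck {
    // Hypothetical fixed-width sizes per field; the real UnsafeRow layout differs.
    static int expectedBytes(List<Integer> fieldSizes) {
        return fieldSizes.stream().mapToInt(Integer::intValue).sum();
    }

    // The validation idea: reject state-store bytes whose length cannot match
    // the schema the current query expects, instead of decoding garbage.
    static boolean isPlausible(byte[] row, List<Integer> fieldSizes) {
        return row.length == expectedBytes(fieldSizes);
    }

    public static void main(String[] args) {
        byte[] oldCheckpointRow = new byte[12];  // written long ago as (int, long)
        List<Integer> newSchema = List.of(8, 8); // current query expects (long, long)
        System.out.println(isPlausible(oldCheckpointRow, newSchema)); // prints "false"
    }
}
```

A check like this turns a silent wrong answer into an explicit, explainable rejection when a checkpoint is reused across incompatible versions.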
[jira] [Updated] (SPARK-31663) Grouping sets with having clause returns the wrong result
[ https://issues.apache.org/jira/browse/SPARK-31663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-31663: Description: Grouping sets with having clause returns the wrong result when the condition of having contained conflicting naming. See the below example: {code:java} select sum(a) as b FROM VALUES (1, 10), (2, 20) AS T(a, b) group by GROUPING SETS ((b), (a, b)) having b > 10{code} The `b` in `having b > 10` should be resolved as `T.b` not `sum(a)`, so the right result should be {code:java} +---+ | b| +---+ | 2| | 2| +---+{code} instead of an empty result. The root cause is similar to SPARK-31519, it's caused by we parsed HAVING as Filter(..., Agg(...)) and resolved these two parts in different rules. The CUBE and ROLLUP have the same issue. Other systems worked as expected, I checked PostgreSQL 9.6 and MS SQL Server 2017. was: Grouping sets with having clause returns the wrong result when the condition of having contained conflicting naming. See the below example: {code:java} select sum(a) as b FROM VALUES (1, 10), (2, 20) AS T(a, b) group by GROUPING SETS ((b), (a, b)) having b > 10{code} The `b` in `having b > 10` should be resolved as `T.b` not `sum(a)`, so the right result should be {code:java} +---+ | b| +---+ | 2| | 2| +---+{code} instead of an empty result. The root cause is similar to SPARK-31519, it's caused by we parsed HAVING as Filter(..., Agg(...)) and resolved these two parts in different rules. The CUBE and ROLLUP have the same issue. > Grouping sets with having clause returns the wrong result > - > > Key: SPARK-31663 > URL: https://issues.apache.org/jira/browse/SPARK-31663 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.5, 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > Grouping sets with having clause returns the wrong result when the condition > of having contained conflicting naming. 
See the below example: > {code:java} > select sum(a) as b FROM VALUES (1, 10), (2, 20) AS T(a, b) group by GROUPING > SETS ((b), (a, b)) having b > 10{code} > The `b` in `having b > 10` should be resolved as `T.b`, not `sum(a)`, so the > right result should be > {code:java} > +---+ > | b| > +---+ > | 2| > | 2| > +---+{code} > instead of an empty result. > The root cause is similar to SPARK-31519: it's caused by the fact that we parse HAVING as > Filter(..., Agg(...)) and resolve these two parts in different rules. > CUBE and ROLLUP have the same issue. > Other systems work as expected; I checked PostgreSQL 9.6 and MS SQL Server > 2017.
[jira] [Updated] (SPARK-31663) Grouping sets with having clause returns the wrong result
[ https://issues.apache.org/jira/browse/SPARK-31663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-31663: Description: Grouping sets with having clause returns the wrong result when the condition of having contained conflicting naming. See the below example: {code:java} select sum(a) as b FROM VALUES (1, 10), (2, 20) AS T(a, b) group by GROUPING SETS ((b), (a, b)) having b > 10{code} The `b` in `having b > 10` should be resolved as `T.b` not `sum(a)`, so the right result should be {code:java} +---+ | b| +---+ | 2| | 2| +---+{code} instead of an empty result. The root cause is similar to SPARK-31519, it's caused by we parsed HAVING as Filter(..., Agg(...)) and resolved these two parts in different rules. The CUBE and ROLLUP have the same issue. was: Grouping sets with having clause returns the wrong result when the condition of having contained conflicting naming. See the below example: {quote} select sum(a) as b FROM VALUES (1, 10), (2, 20) AS T(a, b) group by GROUPING SETS ((b), (a, b)) having b > 10 {quote} The `b` in `having b > 10` should be resolved as `T.b` not `sum(a)`, so the right result should be {quote} +---+ | b| +---+ | 2| | 2| +---+ {quote} instead of an empty result. The root cause is similar to SPARK-31519, it's caused by we parsed HAVING as Filter(..., Agg(...)) and resolved these two parts in different rules. The CUBE and ROLLUP have the same issue. > Grouping sets with having clause returns the wrong result > - > > Key: SPARK-31663 > URL: https://issues.apache.org/jira/browse/SPARK-31663 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.5, 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > Grouping sets with having clause returns the wrong result when the condition > of having contained conflicting naming. 
See the below example: > {code:java} > select sum(a) as b FROM VALUES (1, 10), (2, 20) AS T(a, b) group by GROUPING > SETS ((b), (a, b)) having b > 10{code} > The `b` in `having b > 10` should be resolved as `T.b` not `sum(a)`, so the > right result should be > {code:java} > +---+ > | b| > +---+ > | 2| > | 2| > +---+{code} > instead of an empty result. > The root cause is similar to SPARK-31519, it's caused by we parsed HAVING as > Filter(..., Agg(...)) and resolved these two parts in different rules. The > CUBE and ROLLUP have the same issue. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31663) Grouping sets with having clause returns the wrong result
[ https://issues.apache.org/jira/browse/SPARK-31663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-31663: Labels: (was: correct) > Grouping sets with having clause returns the wrong result > - > > Key: SPARK-31663 > URL: https://issues.apache.org/jira/browse/SPARK-31663 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.5, 3.0.0 >Reporter: Yuanjian Li >Priority: Major > > Grouping sets with having clause returns the wrong result when the condition > of having contained conflicting naming. See the below example: > {quote} > select sum(a) as b FROM VALUES (1, 10), (2, 20) AS T(a, b) group by GROUPING > SETS ((b), (a, b)) having b > 10 > {quote} > The `b` in `having b > 10` should be resolved as `T.b` not `sum(a)`, so the > right result should be > {quote} > +---+ > | b| > +---+ > | 2| > | 2| > +---+ > {quote} > instead of an empty result. > The root cause is similar to SPARK-31519, it's caused by we parsed HAVING as > Filter(..., Agg(...)) and resolved these two parts in different rules. The > CUBE and ROLLUP have the same issue. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31663) Grouping sets with having clause returns the wrong result
Yuanjian Li created SPARK-31663:
---
             Summary: Grouping sets with having clause returns the wrong result
                 Key: SPARK-31663
                 URL: https://issues.apache.org/jira/browse/SPARK-31663
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.5, 3.0.0
            Reporter: Yuanjian Li

Grouping sets with a HAVING clause return the wrong result when the HAVING condition contains conflicting naming. See the below example:
{quote}
select sum(a) as b FROM VALUES (1, 10), (2, 20) AS T(a, b) group by GROUPING SETS ((b), (a, b)) having b > 10
{quote}
The `b` in `having b > 10` should be resolved as `T.b`, not `sum(a)`, so the right result should be
{quote}
+---+
|  b|
+---+
|  2|
|  2|
+---+
{quote}
instead of an empty result.
The root cause is similar to SPARK-31519: we parse HAVING as Filter(..., Agg(...)) and resolve these two parts in different rules. CUBE and ROLLUP have the same issue.
[jira] [Updated] (SPARK-31663) Grouping sets with having clause returns the wrong result
[ https://issues.apache.org/jira/browse/SPARK-31663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuanjian Li updated SPARK-31663:
--------------------------------
    Labels: correct  (was: )

> Grouping sets with having clause returns the wrong result
> ---------------------------------------------------------
>
>                 Key: SPARK-31663
>                 URL: https://issues.apache.org/jira/browse/SPARK-31663
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.5, 3.0.0
>            Reporter: Yuanjian Li
>            Priority: Major
>              Labels: correct
>
> Grouping sets with a HAVING clause return the wrong result when the HAVING condition contains conflicting naming. See the below example:
> {quote}
> select sum(a) as b FROM VALUES (1, 10), (2, 20) AS T(a, b) group by GROUPING SETS ((b), (a, b)) having b > 10
> {quote}
> The `b` in `having b > 10` should be resolved as `T.b`, not `sum(a)`, so the right result should be
> {quote}
> +---+
> |  b|
> +---+
> |  2|
> |  2|
> +---+
> {quote}
> instead of an empty result.
> The root cause is similar to SPARK-31519: we parse HAVING as Filter(..., Agg(...)) and resolve these two parts in different rules. CUBE and ROLLUP have the same issue.
[jira] [Created] (SPARK-31519) Cast in having aggregate expressions returns the wrong result
Yuanjian Li created SPARK-31519:
---
             Summary: Cast in having aggregate expressions returns the wrong result
                 Key: SPARK-31519
                 URL: https://issues.apache.org/jira/browse/SPARK-31519
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.0.0
            Reporter: Yuanjian Li

Cast in having aggregate expressions returns the wrong result. See the below tests:
{code:java}
scala> spark.sql("create temp view t(a, b) as values (1,10), (2, 20)")
res0: org.apache.spark.sql.DataFrame = []

scala> val query = """
     | select sum(a) as b, '2020-01-01' as fake
     | from t
     | group by b
     | having b > 10;"""

scala> spark.sql(query).show()
+---+----------+
|  b|      fake|
+---+----------+
|  2|2020-01-01|
+---+----------+

scala> val query = """
     | select sum(a) as b, cast('2020-01-01' as date) as fake
     | from t
     | group by b
     | having b > 10;"""

scala> spark.sql(query).show()
+---+----+
|  b|fake|
+---+----+
+---+----+
{code}
The SQL parser in Spark creates Filter(..., Aggregate(...)) for the HAVING query, and Spark has a special analyzer rule ResolveAggregateFunctions to resolve the aggregate functions and grouping columns in the Filter operator. It works for simple cases in a very tricky way, as it relies on rule execution order:
1. Rule ResolveReferences hits the Aggregate operator and resolves attributes inside aggregate functions, but the function itself is still unresolved as it's an UnresolvedFunction. This stops resolving the Filter operator, as the child Aggregate operator is still unresolved.
2. Rule ResolveFunctions resolves UnresolvedFunction. This makes the Aggregate operator resolved.
3. Rule ResolveAggregateFunctions resolves the Filter operator if its child is a resolved Aggregate. This rule can correctly resolve the grouping columns.
In the example query, I put a CAST, which needs to be resolved by rule ResolveTimeZone, which runs after ResolveAggregateFunctions. This breaks step 3, as the Aggregate operator is unresolved at that time.
Then the analyzer starts the next round, and the Filter operator is resolved by ResolveReferences, which wrongly resolves the grouping columns.
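The rule-ordering failure described above can be sketched with a tiny fixed-point model in plain Python. This is only an illustration of the mechanism, not Spark's analyzer: the rule names mirror the real ones, but all resolution logic is reduced to boolean flags.

```python
# Toy model of a fixed-point analyzer batch. Rules run in order each round;
# the first rule able to resolve the HAVING filter wins. With a CAST in the
# Aggregate, ResolveTimeZone (running last) delays Aggregate resolution one
# round, so ResolveReferences (running first) gets to the Filter before
# ResolveAggregateFunctions does.

def analyze(has_cast):
    state = {
        "funcs_resolved": False,       # sum(...) still an UnresolvedFunction
        "cast_resolved": not has_cast, # CAST waits for ResolveTimeZone
        "filter_resolved_by": None,    # which rule resolved the HAVING filter
    }

    def agg_resolved():
        return state["funcs_resolved"] and state["cast_resolved"]

    def resolve_references(s):
        # Only touches the Filter once its child Aggregate is resolved;
        # resolves HAVING against the Aggregate's output (wrong for HAVING).
        if agg_resolved() and s["filter_resolved_by"] is None:
            s["filter_resolved_by"] = "ResolveReferences (wrong)"

    def resolve_functions(s):
        s["funcs_resolved"] = True

    def resolve_aggregate_functions(s):
        # Resolves HAVING against grouping columns (correct), but only if
        # the child Aggregate is already fully resolved.
        if agg_resolved() and s["filter_resolved_by"] is None:
            s["filter_resolved_by"] = "ResolveAggregateFunctions (correct)"

    def resolve_time_zone(s):
        s["cast_resolved"] = True

    rules = [resolve_references, resolve_functions,
             resolve_aggregate_functions, resolve_time_zone]

    while state["filter_resolved_by"] is None:  # fixed-point rounds
        for rule in rules:
            rule(state)
    return state["filter_resolved_by"]

print(analyze(has_cast=False))  # ResolveAggregateFunctions (correct)
print(analyze(has_cast=True))   # ResolveReferences (wrong)
```

Without the CAST, the Aggregate becomes resolved mid-round and ResolveAggregateFunctions claims the Filter in round one; with the CAST, the Aggregate only becomes resolved at the end of round one, so round two begins with ResolveReferences, which wrongly resolves the filter first.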
[jira] [Created] (SPARK-31515) Canonicalize Cast should consider the value of needTimeZone
Yuanjian Li created SPARK-31515:
---
             Summary: Canonicalize Cast should consider the value of needTimeZone
                 Key: SPARK-31515
                 URL: https://issues.apache.org/jira/browse/SPARK-31515
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.0.0
            Reporter: Yuanjian Li

If we don't need `timeZone` information when casting the `from` type to the `to` type, then the timeZoneId should not influence the canonicalized result.
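A minimal sketch of the intended behavior, in plain Python rather than Spark internals: the type model and the rule deciding which casts are time-zone sensitive are simplified placeholders, but the point is that canonicalization should drop timeZoneId whenever it cannot affect the cast's result.

```python
# Toy sketch: canonicalizing a cast drops timeZoneId when the (from, to)
# type pair does not need time zone information, so two otherwise-identical
# casts with different session time zones compare equal.
from dataclasses import dataclass
from typing import Optional

# Simplified stand-in for the real needsTimeZone rule.
TZ_SENSITIVE = {("string", "timestamp"), ("timestamp", "string"),
                ("date", "timestamp")}

@dataclass(frozen=True)
class Cast:
    from_type: str
    to_type: str
    time_zone_id: Optional[str] = None

    def needs_time_zone(self) -> bool:
        return (self.from_type, self.to_type) in TZ_SENSITIVE

    def canonicalized(self) -> "Cast":
        # Keep timeZoneId only when it can affect the cast's result.
        tz = self.time_zone_id if self.needs_time_zone() else None
        return Cast(self.from_type, self.to_type, tz)

c1 = Cast("int", "long", "UTC")
c2 = Cast("int", "long", "America/Los_Angeles")
print(c1.canonicalized() == c2.canonicalized())  # True: tz irrelevant here

c3 = Cast("string", "timestamp", "UTC")
c4 = Cast("string", "timestamp", "America/Los_Angeles")
print(c3.canonicalized() == c4.canonicalized())  # False: tz matters here
```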
[jira] [Created] (SPARK-31410) Raise exception instead of silent change for new DateFormatter
Yuanjian Li created SPARK-31410:
---
             Summary: Raise exception instead of silent change for new DateFormatter
                 Key: SPARK-31410
                 URL: https://issues.apache.org/jira/browse/SPARK-31410
             Project: Spark
          Issue Type: Sub-task
          Components: SQL
    Affects Versions: 3.0.0
            Reporter: Yuanjian Li

For cases where the new TimestampFormatter returns null while the legacy formatter can return a value, we need to throw an exception instead of silently changing the result. The legacy config will be referenced in the error message.
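The behavior described above can be sketched as follows in plain Python. Everything here is illustrative: `parse_new`, `parse_legacy`, and the config name are placeholders standing in for the real formatters and the actual legacy config referenced by the issue.

```python
# Toy sketch: when the new formatter fails but the legacy formatter would
# succeed, raise an error that names the legacy config instead of silently
# returning a different result (null) than older Spark versions did.
LEGACY_CONF = "spark.sql.legacy.timeParserPolicy"  # illustrative config name

def parse_date(s, parse_new, parse_legacy):
    result = parse_new(s)
    if result is not None:
        return result
    # New formatter failed; check whether behavior silently changed.
    if parse_legacy(s) is not None:
        raise ValueError(
            f"Failed to parse {s!r} with the new formatter, but the legacy "
            f"formatter can parse it. Set {LEGACY_CONF} to LEGACY to restore "
            f"the old behavior.")
    return None  # both formatters fail: genuinely unparseable

# Usage: a string only the legacy formatter accepts now raises loudly.
try:
    parse_date("1999 300", lambda s: None, lambda s: "1999-10-27")
except ValueError as e:
    print("raised:", e)
```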
[jira] [Updated] (SPARK-31176) Remove support for 'e'/'c' as datetime pattern character
[ https://issues.apache.org/jira/browse/SPARK-31176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuanjian Li updated SPARK-31176:
--------------------------------
    Parent: SPARK-31408
    Issue Type: Sub-task  (was: Bug)

> Remove support for 'e'/'c' as datetime pattern character
> ---------------------------------------------------------
>
>                 Key: SPARK-31176
>                 URL: https://issues.apache.org/jira/browse/SPARK-31176
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Kent Yao
>            Assignee: Kent Yao
>            Priority: Major
>             Fix For: 3.0.0
>
> The meaning of 'u' was day number of week in SimpleDateFormat, but it was changed to year in DateTimeFormatter. So we keep the old meaning of 'u' by substituting 'u' with 'e' internally and use DateTimeFormatter to parse the pattern string. In DateTimeFormatter, 'e' and 'c' also represent day-of-week, so we should mark them as illegal pattern characters to stay the same as before.
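The substitution described above can be sketched in plain Python. This is a simplified model of the idea only: the real pattern converter also has to handle quoted literals and other pattern letters, which are ignored here.

```python
# Toy sketch: keep the legacy SimpleDateFormat meaning of 'u' (day-of-week)
# by rewriting it to DateTimeFormatter's 'e' internally, and reject 'e'/'c'
# in user patterns since those letters are reserved for that rewrite.
def convert_pattern(pattern: str) -> str:
    for ch in pattern:
        if ch in ("e", "c"):
            raise ValueError(f"Illegal pattern character: '{ch}'")
    # 'u' keeps its old day-of-week meaning via internal substitution.
    return pattern.replace("u", "e")

print(convert_pattern("yyyy-MM-dd u"))  # -> yyyy-MM-dd e
```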
[jira] [Updated] (SPARK-31131) Remove the unnecessary config spark.sql.legacy.timeParser.enabled
[ https://issues.apache.org/jira/browse/SPARK-31131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuanjian Li updated SPARK-31131:
--------------------------------
    Parent: SPARK-31408
    Issue Type: Sub-task  (was: Bug)

> Remove the unnecessary config spark.sql.legacy.timeParser.enabled
> ------------------------------------------------------------------
>
>                 Key: SPARK-31131
>                 URL: https://issues.apache.org/jira/browse/SPARK-31131
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.0.0, 3.1.0
>            Reporter: Kent Yao
>            Assignee: Kent Yao
>            Priority: Major
>             Fix For: 3.0.0
>
> spark.sql.legacy.timeParser.enabled should be removed from SQLConf and the migration guide.
[jira] [Updated] (SPARK-31189) Fix errors and missing parts for datetime pattern document
[ https://issues.apache.org/jira/browse/SPARK-31189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuanjian Li updated SPARK-31189:
--------------------------------
    Parent: SPARK-31408
    Issue Type: Sub-task  (was: Bug)

> Fix errors and missing parts for datetime pattern document
> ----------------------------------------------------------
>
>                 Key: SPARK-31189
>                 URL: https://issues.apache.org/jira/browse/SPARK-31189
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Kent Yao
>            Assignee: Kent Yao
>            Priority: Major
>             Fix For: 3.0.0
>
> Fix errors and missing parts for datetime pattern document.