[jira] [Resolved] (SPARK-47920) Add documentation for python streaming data source

2024-05-22 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47920.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 46139
[https://github.com/apache/spark/pull/46139]

> Add documentation for python streaming data source
> --
>
> Key: SPARK-47920
> URL: https://issues.apache.org/jira/browse/SPARK-47920
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark, SS
>Affects Versions: 4.0.0
>Reporter: Chaoqin Li
>Assignee: Chaoqin Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Add documentation (user guide) for the Python data source API.
> The doc should explain how to develop and use DataSourceStreamReader and
> DataSourceStreamWriter.
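
For orientation, here is a minimal sketch of such a reader, assuming the
pyspark.sql.datasource API shape that ships with this feature (DataSource,
DataSourceStreamReader, InputPartition); the counter source itself is a
hypothetical example, not Spark code:

    from pyspark.sql.datasource import (
        DataSource,
        DataSourceStreamReader,
        InputPartition,
    )

    class RangePartition(InputPartition):
        # One partition covering [start, end) for a single microbatch.
        def __init__(self, start, end):
            self.start = start
            self.end = end

    class CounterStreamReader(DataSourceStreamReader):
        # Offsets are plain dicts; Spark checkpoints them as JSON.
        def __init__(self):
            self.current = 0

        def initialOffset(self):
            return {"offset": 0}

        def latestOffset(self):
            # Pretend ten new rows become available per microbatch.
            self.current += 10
            return {"offset": self.current}

        def partitions(self, start, end):
            return [RangePartition(start["offset"], end["offset"])]

        def read(self, partition):
            for value in range(partition.start, partition.end):
                yield (value,)

    class CounterDataSource(DataSource):
        @classmethod
        def name(cls):
            return "counter"

        def schema(self):
            return "value INT"

        def streamReader(self, schema):
            return CounterStreamReader()

After spark.dataSource.register(CounterDataSource), the source can be consumed
with spark.readStream.format("counter").load().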






[jira] [Resolved] (SPARK-48314) FileStreamSource shouldn't double cache files for availableNow

2024-05-21 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-48314.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 46627
[https://github.com/apache/spark/pull/46627]

> FileStreamSource shouldn't double cache files for availableNow
> --
>
> Key: SPARK-48314
> URL: https://issues.apache.org/jira/browse/SPARK-48314
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.5.1
>Reporter: Adam Binford
>Assignee: Adam Binford
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> FileStreamSource loads and saves all files at initialization for
> Trigger.AvailableNow. However, files will also be cached in unreadFiles,
> which is wasteful and causes the issues identified in
> https://issues.apache.org/jira/browse/SPARK-44924 for streams that read more
> than 10k files per batch. We should always skip the unreadFiles cache when
> the available-now trigger is used, as there is no need for it.
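
For context, this is the kind of query affected, as a minimal PySpark sketch
(paths are hypothetical; spark is an existing SparkSession). Trigger.AvailableNow
corresponds to trigger(availableNow=True) in the Python API:

    # Process everything currently in the directory across several bounded
    # microbatches, then stop; this is the trigger whose unreadFiles cache
    # is redundant.
    query = (
        spark.readStream.format("text")
        .option("maxFilesPerTrigger", 10000)          # cap files per microbatch
        .load("/data/incoming")                       # hypothetical input path
        .writeStream.format("parquet")
        .option("checkpointLocation", "/chk/ingest")  # hypothetical path
        .trigger(availableNow=True)
        .start("/data/out")                           # hypothetical output path
    )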






[jira] [Assigned] (SPARK-48314) FileStreamSource shouldn't double cache files for availableNow

2024-05-21 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-48314:


Assignee: Adam Binford

> FileStreamSource shouldn't double cache files for availableNow
> --
>
> Key: SPARK-48314
> URL: https://issues.apache.org/jira/browse/SPARK-48314
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.5.1
>Reporter: Adam Binford
>Assignee: Adam Binford
>Priority: Major
>  Labels: pull-request-available
>
> FileStreamSource loads and saves all files at initialization for
> Trigger.AvailableNow. However, files will also be cached in unreadFiles,
> which is wasteful and causes the issues identified in
> https://issues.apache.org/jira/browse/SPARK-44924 for streams that read more
> than 10k files per batch. We should always skip the unreadFiles cache when
> the available-now trigger is used, as there is no need for it.






[jira] [Assigned] (SPARK-48330) Fix the python streaming data source timeout issue for large trigger interval

2024-05-20 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-48330:


Assignee: Chaoqin Li

> Fix the python streaming data source timeout issue for large trigger interval
> -
>
> Key: SPARK-48330
> URL: https://issues.apache.org/jira/browse/SPARK-48330
> Project: Spark
>  Issue Type: Task
>  Components: PySpark, SS
>Affects Versions: 4.0.0
>Reporter: Chaoqin Li
>Assignee: Chaoqin Li
>Priority: Major
>  Labels: pull-request-available
>
> Currently we run a long-running Python worker process for the Python
> streaming source and sink to perform planning, commit, and abort on the
> driver side. Testing indicates that the current implementation causes
> connection timeout errors when the streaming query has a large trigger
> interval.
> For the Python streaming source, keep the long-running worker architecture
> but set the socket timeout to infinity to avoid the timeout error.
> For the Python streaming sink, since a StreamingWrite is also created per
> microbatch on the Scala side, a long-running worker cannot be attached to a
> StreamingWrite instance. Therefore we abandon the long-running worker
> architecture, simply call commit() or abort() and exit the worker, and let
> Spark reuse workers for us.
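
As an illustration, the timeout showed up with queries shaped like the
following sketch (the source name is hypothetical; spark is an existing
SparkSession), where the driver-side Python worker sits idle between triggers:

    query = (
        spark.readStream.format("my_python_source")  # hypothetical source name
        .load()
        .writeStream.format("console")
        .trigger(processingTime="2 hours")  # long idle gap between microbatches
        .start()
    )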






[jira] [Resolved] (SPARK-48330) Fix the python streaming data source timeout issue for large trigger interval

2024-05-20 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-48330.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 46651
[https://github.com/apache/spark/pull/46651]

> Fix the python streaming data source timeout issue for large trigger interval
> -
>
> Key: SPARK-48330
> URL: https://issues.apache.org/jira/browse/SPARK-48330
> Project: Spark
>  Issue Type: Task
>  Components: PySpark, SS
>Affects Versions: 4.0.0
>Reporter: Chaoqin Li
>Assignee: Chaoqin Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Currently we run a long-running Python worker process for the Python
> streaming source and sink to perform planning, commit, and abort on the
> driver side. Testing indicates that the current implementation causes
> connection timeout errors when the streaming query has a large trigger
> interval.
> For the Python streaming source, keep the long-running worker architecture
> but set the socket timeout to infinity to avoid the timeout error.
> For the Python streaming sink, since a StreamingWrite is also created per
> microbatch on the Scala side, a long-running worker cannot be attached to a
> StreamingWrite instance. Therefore we abandon the long-running worker
> architecture, simply call commit() or abort() and exit the worker, and let
> Spark reuse workers for us.






[jira] [Assigned] (SPARK-44924) Add configurations for FileStreamSource cached files

2024-05-19 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-44924:


Assignee: kevin nacios

> Add configurations for FileStreamSource cached files
> 
>
> Key: SPARK-44924
> URL: https://issues.apache.org/jira/browse/SPARK-44924
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.1.0
>Reporter: kevin nacios
>Assignee: kevin nacios
>Priority: Minor
>  Labels: pull-request-available
>
> With https://issues.apache.org/jira/browse/SPARK-30866, caching of listed
> files was added for Structured Streaming to reduce the cost of relisting
> files from the filesystem each batch. The settings that drive this are
> currently hardcoded and there is no way to change them.
>  
> This impacts some of our workloads where we process large datasets and it's
> unknown how "heavy" some files are, so a single batch can take a long time.
> When we set maxFilesPerTrigger to 100k files, a subsequent batch limited to
> the cached maximum of 10k files causes the job to take longer, since the
> cluster is capable of handling 100k files but is stuck doing 10% of the
> workload. The benefit of the caching doesn't outweigh the performance cost
> to the rest of the job.
>  
> With config settings available for this, we could either absorb some
> increased driver memory usage to cache the next 100k files, or opt to
> disable caching entirely and just relist files each batch by setting the
> cache size to 0.
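
A sketch of how such a knob might be used (the maxFilesPerTrigger option is
real; the cache-size config name below is purely illustrative, since the
actual name is whatever the eventual pull request introduces):

    # Illustrative config name only -- see the linked PR for the real one.
    CACHE_CONF = "spark.sql.streaming.fileStreamSource.maxCachedFiles"

    # Either let the cache keep pace with a 100k-files-per-trigger workload...
    spark.conf.set(CACHE_CONF, "100000")
    # ...or set it to 0 to disable caching and relist the filesystem each batch.
    # spark.conf.set(CACHE_CONF, "0")

    stream = (
        spark.readStream.format("text")
        .option("maxFilesPerTrigger", 100000)
        .load("/data/landing")  # hypothetical input path
    )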






[jira] [Resolved] (SPARK-44924) Add configurations for FileStreamSource cached files

2024-05-19 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-44924.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45362
[https://github.com/apache/spark/pull/45362]

> Add configurations for FileStreamSource cached files
> 
>
> Key: SPARK-44924
> URL: https://issues.apache.org/jira/browse/SPARK-44924
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.1.0
>Reporter: kevin nacios
>Assignee: kevin nacios
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> With https://issues.apache.org/jira/browse/SPARK-30866, caching of listed
> files was added for Structured Streaming to reduce the cost of relisting
> files from the filesystem each batch. The settings that drive this are
> currently hardcoded and there is no way to change them.
>  
> This impacts some of our workloads where we process large datasets and it's
> unknown how "heavy" some files are, so a single batch can take a long time.
> When we set maxFilesPerTrigger to 100k files, a subsequent batch limited to
> the cached maximum of 10k files causes the job to take longer, since the
> cluster is capable of handling 100k files but is stuck doing 10% of the
> workload. The benefit of the caching doesn't outweigh the performance cost
> to the rest of the job.
>  
> With config settings available for this, we could either absorb some
> increased driver memory usage to cache the next 100k files, or opt to
> disable caching entirely and just relist files each batch by setting the
> cache size to 0.






[jira] [Updated] (SPARK-48105) Fix the data corruption issue when state store unload and snapshotting happens concurrently for HDFS state store

2024-05-17 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-48105:
-
Labels: correctness pull-request-available  (was: pull-request-available)

>  Fix the data corruption issue when state store unload and snapshotting 
> happens concurrently for HDFS state store
> -
>
> Key: SPARK-48105
> URL: https://issues.apache.org/jira/browse/SPARK-48105
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 4.0.0, 3.5.2, 3.4.4
>Reporter: Huanli Wang
>Assignee: Huanli Wang
>Priority: Blocker
>  Labels: correctness, pull-request-available
> Fix For: 4.0.0, 3.5.2, 3.4.4
>
>
> There are two race conditions between state store snapshotting and state 
> store unloading which can result in query failure and potential data 
> corruption.
>  
> Case 1:
>  # The maintenance thread pool encounters some issue and calls 
> [stopMaintenanceTask|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774],
>  which in turn calls 
> [threadPool.stop|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587].
>  However, this function doesn't wait for the stop operation to complete and 
> moves on to the state store [unload and 
> clear|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L775-L778].
>  # The provider unload will [close the state 
> store|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721],
>  which [clears the values of 
> loadedMaps|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355]
>  for the HDFS-backed state store.
>  # The not-yet-stopped maintenance thread may still be running and trying to 
> take the snapshot, but the data in the underlying `HDFSBackedStateStoreMap` 
> has been removed. If this snapshot process completes successfully, we will 
> write corrupted data and the following batches will consume this corrupted 
> data.
> Case 2:
>  # In executor_1, the maintenance thread is about to take the snapshot for 
> state_store_1. It retrieves the `HDFSBackedStateStoreMap` object from 
> loadedMaps; after this, the maintenance thread [releases the lock on 
> loadedMaps|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751].
>  # state_store_1 is loaded in another executor, e.g. executor_2.
>  # Another state store, state_store_2, is loaded on executor_1 and 
> [reportActiveStoreInstance|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871]
>  is sent to the driver.
>  # executor_1 does the 
> [unload|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713]
>  for the no-longer-active state stores, which clears the data entries in the 
> `HDFSBackedStateStoreMap`.
>  # The snapshotting thread is terminated and uploads an incomplete snapshot 
> to cloud storage because the [iterator doesn't have a next 
> element|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634]
>  after the clear.
>  # Future batches consume the corrupted data.
>  
> Proposed fix:
>  * When we close the HDFS state store, we should only remove the entry from 
> `loadedMaps` rather than actively cleaning up the data; JVM GC will collect 
> those objects.
>  * We should wait for the maintenance thread to stop before unloading the 
> providers.
>  
> Thanks [~anishshri-db] for helping debug this issue!
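
The ordering the proposed fix enforces can be shown with a small Python
threading sketch; this is illustrative only, since the real code is Scala in
StateStore and HDFSBackedStateStoreProvider:

    import threading
    from concurrent.futures import ThreadPoolExecutor

    maintenance_pool = ThreadPoolExecutor(max_workers=4)  # snapshot tasks
    loaded_maps = {}                  # provider id -> in-memory state map
    loaded_maps_lock = threading.Lock()

    def stop_maintenance_then_unload():
        # Second bullet of the fix: wait for in-flight snapshot tasks to
        # finish BEFORE unloading; the bug was shutting down without waiting.
        maintenance_pool.shutdown(wait=True)

        # First bullet of the fix: drop only the map *entries*; never mutate
        # the map objects themselves. A snapshot thread that already holds a
        # reference still sees intact data, and GC reclaims the rest.
        with loaded_maps_lock:
            loaded_maps.clear()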






[jira] [Updated] (SPARK-48105) Fix the data corruption issue when state store unload and snapshotting happens concurrently for HDFS state store

2024-05-17 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-48105:
-
Issue Type: Bug  (was: Improvement)

>  Fix the data corruption issue when state store unload and snapshotting 
> happens concurrently for HDFS state store
> -
>
> Key: SPARK-48105
> URL: https://issues.apache.org/jira/browse/SPARK-48105
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 4.0.0, 3.5.2, 3.4.4
>Reporter: Huanli Wang
>Assignee: Huanli Wang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 4.0.0, 3.5.2, 3.4.4
>
>
> There are two race conditions between state store snapshotting and state 
> store unloading which can result in query failure and potential data 
> corruption.
>  
> Case 1:
>  # The maintenance thread pool encounters some issue and calls 
> [stopMaintenanceTask|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774],
>  which in turn calls 
> [threadPool.stop|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587].
>  However, this function doesn't wait for the stop operation to complete and 
> moves on to the state store [unload and 
> clear|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L775-L778].
>  # The provider unload will [close the state 
> store|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721],
>  which [clears the values of 
> loadedMaps|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355]
>  for the HDFS-backed state store.
>  # The not-yet-stopped maintenance thread may still be running and trying to 
> take the snapshot, but the data in the underlying `HDFSBackedStateStoreMap` 
> has been removed. If this snapshot process completes successfully, we will 
> write corrupted data and the following batches will consume this corrupted 
> data.
> Case 2:
>  # In executor_1, the maintenance thread is about to take the snapshot for 
> state_store_1. It retrieves the `HDFSBackedStateStoreMap` object from 
> loadedMaps; after this, the maintenance thread [releases the lock on 
> loadedMaps|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751].
>  # state_store_1 is loaded in another executor, e.g. executor_2.
>  # Another state store, state_store_2, is loaded on executor_1 and 
> [reportActiveStoreInstance|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871]
>  is sent to the driver.
>  # executor_1 does the 
> [unload|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713]
>  for the no-longer-active state stores, which clears the data entries in the 
> `HDFSBackedStateStoreMap`.
>  # The snapshotting thread is terminated and uploads an incomplete snapshot 
> to cloud storage because the [iterator doesn't have a next 
> element|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634]
>  after the clear.
>  # Future batches consume the corrupted data.
>  
> Proposed fix:
>  * When we close the HDFS state store, we should only remove the entry from 
> `loadedMaps` rather than actively cleaning up the data; JVM GC will collect 
> those objects.
>  * We should wait for the maintenance thread to stop before unloading the 
> providers.
>  
> Thanks [~anishshri-db] for helping debug this issue!






[jira] [Updated] (SPARK-48105) Fix the data corruption issue when state store unload and snapshotting happens concurrently for HDFS state store

2024-05-17 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-48105:
-
Priority: Blocker  (was: Major)

>  Fix the data corruption issue when state store unload and snapshotting 
> happens concurrently for HDFS state store
> -
>
> Key: SPARK-48105
> URL: https://issues.apache.org/jira/browse/SPARK-48105
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 4.0.0, 3.5.2, 3.4.4
>Reporter: Huanli Wang
>Assignee: Huanli Wang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 4.0.0, 3.5.2, 3.4.4
>
>
> There are two race conditions between state store snapshotting and state 
> store unloading which can result in query failure and potential data 
> corruption.
>  
> Case 1:
>  # The maintenance thread pool encounters some issue and calls 
> [stopMaintenanceTask|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774],
>  which in turn calls 
> [threadPool.stop|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587].
>  However, this function doesn't wait for the stop operation to complete and 
> moves on to the state store [unload and 
> clear|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L775-L778].
>  # The provider unload will [close the state 
> store|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721],
>  which [clears the values of 
> loadedMaps|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355]
>  for the HDFS-backed state store.
>  # The not-yet-stopped maintenance thread may still be running and trying to 
> take the snapshot, but the data in the underlying `HDFSBackedStateStoreMap` 
> has been removed. If this snapshot process completes successfully, we will 
> write corrupted data and the following batches will consume this corrupted 
> data.
> Case 2:
>  # In executor_1, the maintenance thread is about to take the snapshot for 
> state_store_1. It retrieves the `HDFSBackedStateStoreMap` object from 
> loadedMaps; after this, the maintenance thread [releases the lock on 
> loadedMaps|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751].
>  # state_store_1 is loaded in another executor, e.g. executor_2.
>  # Another state store, state_store_2, is loaded on executor_1 and 
> [reportActiveStoreInstance|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871]
>  is sent to the driver.
>  # executor_1 does the 
> [unload|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713]
>  for the no-longer-active state stores, which clears the data entries in the 
> `HDFSBackedStateStoreMap`.
>  # The snapshotting thread is terminated and uploads an incomplete snapshot 
> to cloud storage because the [iterator doesn't have a next 
> element|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634]
>  after the clear.
>  # Future batches consume the corrupted data.
>  
> Proposed fix:
>  * When we close the HDFS state store, we should only remove the entry from 
> `loadedMaps` rather than actively cleaning up the data; JVM GC will collect 
> those objects.
>  * We should wait for the maintenance thread to stop before unloading the 
> providers.
>  
> Thanks [~anishshri-db] for helping debug this issue!






[jira] [Updated] (SPARK-48105) Fix the data corruption issue when state store unload and snapshotting happens concurrently for HDFS state store

2024-05-17 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-48105:
-
Fix Version/s: 3.4.4

>  Fix the data corruption issue when state store unload and snapshotting 
> happens concurrently for HDFS state store
> -
>
> Key: SPARK-48105
> URL: https://issues.apache.org/jira/browse/SPARK-48105
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 4.0.0, 3.5.2, 3.4.4
>Reporter: Huanli Wang
>Assignee: Huanli Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0, 3.5.2, 3.4.4
>
>
> There are two race conditions between state store snapshotting and state 
> store unloading which can result in query failure and potential data 
> corruption.
>  
> Case 1:
>  # The maintenance thread pool encounters some issue and calls 
> [stopMaintenanceTask|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774],
>  which in turn calls 
> [threadPool.stop|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587].
>  However, this function doesn't wait for the stop operation to complete and 
> moves on to the state store [unload and 
> clear|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L775-L778].
>  # The provider unload will [close the state 
> store|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721],
>  which [clears the values of 
> loadedMaps|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355]
>  for the HDFS-backed state store.
>  # The not-yet-stopped maintenance thread may still be running and trying to 
> take the snapshot, but the data in the underlying `HDFSBackedStateStoreMap` 
> has been removed. If this snapshot process completes successfully, we will 
> write corrupted data and the following batches will consume this corrupted 
> data.
> Case 2:
>  # In executor_1, the maintenance thread is about to take the snapshot for 
> state_store_1. It retrieves the `HDFSBackedStateStoreMap` object from 
> loadedMaps; after this, the maintenance thread [releases the lock on 
> loadedMaps|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751].
>  # state_store_1 is loaded in another executor, e.g. executor_2.
>  # Another state store, state_store_2, is loaded on executor_1 and 
> [reportActiveStoreInstance|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871]
>  is sent to the driver.
>  # executor_1 does the 
> [unload|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713]
>  for the no-longer-active state stores, which clears the data entries in the 
> `HDFSBackedStateStoreMap`.
>  # The snapshotting thread is terminated and uploads an incomplete snapshot 
> to cloud storage because the [iterator doesn't have a next 
> element|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634]
>  after the clear.
>  # Future batches consume the corrupted data.
>  
> Proposed fix:
>  * When we close the HDFS state store, we should only remove the entry from 
> `loadedMaps` rather than actively cleaning up the data; JVM GC will collect 
> those objects.
>  * We should wait for the maintenance thread to stop before unloading the 
> providers.
>  
> Thanks [~anishshri-db] for helping debug this issue!






[jira] [Resolved] (SPARK-48293) Add test for when ForeachBatchUserFuncException wraps interrupted exception due to query stop

2024-05-16 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-48293.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 46601
[https://github.com/apache/spark/pull/46601]

> Add test for when ForeachBatchUserFuncException wraps interrupted exception 
> due to query stop
> -
>
> Key: SPARK-48293
> URL: https://issues.apache.org/jira/browse/SPARK-48293
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: B. Micheal Okutubo
>Assignee: B. Micheal Okutubo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Add a test for the case where ForeachBatchUserFuncException wraps an
> interrupted exception caused by a query stop.
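
For readers unfamiliar with the scenario, here is a hedged PySpark sketch of
the query shape under test (spark is an existing SparkSession; the rate source
and noop sink are stand-ins):

    df = spark.readStream.format("rate").load()  # tiny streaming DataFrame

    def process_batch(batch_df, batch_id):
        # User code; if the query is stopped while this runs, the resulting
        # interrupt is wrapped in ForeachBatchUserFuncException internally.
        batch_df.write.format("noop").mode("append").save()

    query = df.writeStream.foreachBatch(process_batch).start()
    query.stop()  # stopping mid-batch can interrupt process_batch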






[jira] [Updated] (SPARK-48105) Fix the data corruption issue when state store unload and snapshotting happens concurrently for HDFS state store

2024-05-14 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-48105:
-
Fix Version/s: 3.5.2

>  Fix the data corruption issue when state store unload and snapshotting 
> happens concurrently for HDFS state store
> -
>
> Key: SPARK-48105
> URL: https://issues.apache.org/jira/browse/SPARK-48105
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 4.0.0, 3.5.2, 3.4.4
>Reporter: Huanli Wang
>Assignee: Huanli Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0, 3.5.2
>
>
> There are two race conditions between state store snapshotting and state 
> store unloading which can result in query failure and potential data 
> corruption.
>  
> Case 1:
>  # The maintenance thread pool encounters some issue and calls 
> [stopMaintenanceTask|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774],
>  which in turn calls 
> [threadPool.stop|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587].
>  However, this function doesn't wait for the stop operation to complete and 
> moves on to the state store [unload and 
> clear|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L775-L778].
>  # The provider unload will [close the state 
> store|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721],
>  which [clears the values of 
> loadedMaps|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355]
>  for the HDFS-backed state store.
>  # The not-yet-stopped maintenance thread may still be running and trying to 
> take the snapshot, but the data in the underlying `HDFSBackedStateStoreMap` 
> has been removed. If this snapshot process completes successfully, we will 
> write corrupted data and the following batches will consume this corrupted 
> data.
> Case 2:
>  # In executor_1, the maintenance thread is about to take the snapshot for 
> state_store_1. It retrieves the `HDFSBackedStateStoreMap` object from 
> loadedMaps; after this, the maintenance thread [releases the lock on 
> loadedMaps|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751].
>  # state_store_1 is loaded in another executor, e.g. executor_2.
>  # Another state store, state_store_2, is loaded on executor_1 and 
> [reportActiveStoreInstance|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871]
>  is sent to the driver.
>  # executor_1 does the 
> [unload|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713]
>  for the no-longer-active state stores, which clears the data entries in the 
> `HDFSBackedStateStoreMap`.
>  # The snapshotting thread is terminated and uploads an incomplete snapshot 
> to cloud storage because the [iterator doesn't have a next 
> element|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634]
>  after the clear.
>  # Future batches consume the corrupted data.
>  
> Proposed fix:
>  * When we close the HDFS state store, we should only remove the entry from 
> `loadedMaps` rather than actively cleaning up the data; JVM GC will collect 
> those objects.
>  * We should wait for the maintenance thread to stop before unloading the 
> providers.
>  
> Thanks [~anishshri-db] for helping debug this issue!






[jira] [Resolved] (SPARK-48233) Tests for non-stateful streaming with collations

2024-05-14 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-48233.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 46247
[https://github.com/apache/spark/pull/46247]

> Tests for non-stateful streaming with collations
> 
>
> Key: SPARK-48233
> URL: https://issues.apache.org/jira/browse/SPARK-48233
> Project: Spark
>  Issue Type: Task
>  Components: Spark Core
>Affects Versions: 4.0.0
>Reporter: Aleksandar Tomic
>Assignee: Aleksandar Tomic
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>







[jira] [Updated] (SPARK-48267) Regression e2e test with SPARK-47305

2024-05-14 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-48267:
-
Fix Version/s: 3.5.2

> Regression e2e test with SPARK-47305
> 
>
> Key: SPARK-48267
> URL: https://issues.apache.org/jira/browse/SPARK-48267
> Project: Spark
>  Issue Type: Test
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0, 3.5.2
>
>
> In SPARK-47305, we fixed a bug in a query optimizer (QO) rule and added a 
> test to demonstrate the issue and verify that the fix works, but the scope 
> of the test was limited to the bugfix itself because we had no clear idea of 
> a (simpler) end-to-end reproducer.
> We finally came up with a simple reproducer, which is an e2e streaming 
> query. We'd like to turn this reproducer into a regression test.






[jira] [Resolved] (SPARK-48267) Regression e2e test with SPARK-47305

2024-05-14 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-48267.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 46569
[https://github.com/apache/spark/pull/46569]

> Regression e2e test with SPARK-47305
> 
>
> Key: SPARK-48267
> URL: https://issues.apache.org/jira/browse/SPARK-48267
> Project: Spark
>  Issue Type: Test
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> In SPARK-47305, we fixed a bug in a query optimizer (QO) rule and added a 
> test to demonstrate the issue and verify that the fix works, but the scope 
> of the test was limited to the bugfix itself because we had no clear idea of 
> a (simpler) end-to-end reproducer.
> We finally came up with a simple reproducer, which is an e2e streaming 
> query. We'd like to turn this reproducer into a regression test.






[jira] [Assigned] (SPARK-48267) Regression e2e test with SPARK-47305

2024-05-14 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-48267:


Assignee: Jungtaek Lim

> Regression e2e test with SPARK-47305
> 
>
> Key: SPARK-48267
> URL: https://issues.apache.org/jira/browse/SPARK-48267
> Project: Spark
>  Issue Type: Test
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
>  Labels: pull-request-available
>
> In SPARK-47305, we fixed a bug in a query optimizer (QO) rule and added a 
> test to demonstrate the issue and verify that the fix works, but the scope 
> of the test was limited to the bugfix itself because we had no clear idea of 
> a (simpler) end-to-end reproducer.
> We finally came up with a simple reproducer, which is an e2e streaming 
> query. We'd like to turn this reproducer into a regression test.






[jira] [Created] (SPARK-48267) Regression e2e test with SPARK-47305

2024-05-13 Thread Jungtaek Lim (Jira)
Jungtaek Lim created SPARK-48267:


 Summary: Regression e2e test with SPARK-47305
 Key: SPARK-48267
 URL: https://issues.apache.org/jira/browse/SPARK-48267
 Project: Spark
  Issue Type: Test
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Jungtaek Lim


In SPARK-47305, we fixed a bug in a query optimizer (QO) rule and added a test 
to demonstrate the issue and verify that the fix works, but the scope of the 
test was limited to the bugfix itself because we had no clear idea of a 
(simpler) end-to-end reproducer.

We finally came up with a simple reproducer, which is an e2e streaming query. 
We'd like to turn this reproducer into a regression test.






[jira] [Assigned] (SPARK-48208) Skip reporting memory usage metrics if bounded memory usage is enabled

2024-05-09 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-48208:


Assignee: Anish Shrigondekar

> Skip reporting memory usage metrics if bounded memory usage is enabled
> --
>
> Key: SPARK-48208
> URL: https://issues.apache.org/jira/browse/SPARK-48208
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Anish Shrigondekar
>Assignee: Anish Shrigondekar
>Priority: Major
>  Labels: pull-request-available
>
> Skip reporting memory usage metrics if bounded memory usage is enabled






[jira] [Resolved] (SPARK-48208) Skip reporting memory usage metrics if bounded memory usage is enabled

2024-05-09 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-48208.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 46491
[https://github.com/apache/spark/pull/46491]

> Skip reporting memory usage metrics if bounded memory usage is enabled
> --
>
> Key: SPARK-48208
> URL: https://issues.apache.org/jira/browse/SPARK-48208
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Anish Shrigondekar
>Assignee: Anish Shrigondekar
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Skip reporting memory usage metrics if bounded memory usage is enabled






[jira] [Assigned] (SPARK-47960) Support Chaining Stateful Operators in TransformWithState

2024-05-07 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47960:


Assignee: Bhuwan Sahni

> Support Chaining Stateful Operators in TransformWithState
> -
>
> Key: SPARK-47960
> URL: https://issues.apache.org/jira/browse/SPARK-47960
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Bhuwan Sahni
>Assignee: Bhuwan Sahni
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> This issue tracks adding support for chaining stateful operators after the 
> arbitrary state API, transformWithState. In order to support chaining, we 
> need to allow the user to specify a new eventTimeColumn in the output from 
> the StatefulProcessor. Any watermark evaluation expressions downstream of 
> transformWithState would use the user-specified eventTimeColumn.






[jira] [Resolved] (SPARK-47960) Support Chaining Stateful Operators in TransformWithState

2024-05-07 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47960.
--
Resolution: Fixed

Issue resolved by pull request 45376
[https://github.com/apache/spark/pull/45376]

> Support Chaining Stateful Operators in TransformWithState
> -
>
> Key: SPARK-47960
> URL: https://issues.apache.org/jira/browse/SPARK-47960
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Bhuwan Sahni
>Assignee: Bhuwan Sahni
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> This issue tracks adding support for chaining stateful operators after the 
> arbitrary state API, transformWithState. In order to support chaining, we 
> need to allow the user to specify a new eventTimeColumn in the output from 
> the StatefulProcessor. Any watermark evaluation expressions downstream of 
> transformWithState would use the user-specified eventTimeColumn.






[jira] [Updated] (SPARK-48105) Fix the data corruption issue when state store unload and snapshotting happens concurrently for HDFS state store

2024-05-06 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-48105:
-
Affects Version/s: 3.5.2
   3.4.4

>  Fix the data corruption issue when state store unload and snapshotting 
> happens concurrently for HDFS state store
> -
>
> Key: SPARK-48105
> URL: https://issues.apache.org/jira/browse/SPARK-48105
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 4.0.0, 3.5.2, 3.4.4
>Reporter: Huanli Wang
>Priority: Major
>  Labels: pull-request-available
>
> There are two race conditions between state store snapshotting and state 
> store unloading which can result in query failure and potential data 
> corruption.
>  
> Case 1:
>  # The maintenance thread pool encounters some issue and calls 
> [stopMaintenanceTask|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774],
>  which in turn calls 
> [threadPool.stop|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587].
>  However, this function doesn't wait for the stop operation to complete and 
> moves on to the state store [unload and 
> clear|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L775-L778].
>  # The provider unload will [close the state 
> store|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721],
>  which [clears the values of 
> loadedMaps|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355]
>  for the HDFS-backed state store.
>  # The not-yet-stopped maintenance thread may still be running and trying to 
> take the snapshot, but the data in the underlying `HDFSBackedStateStoreMap` 
> has been removed. If this snapshot process completes successfully, we will 
> write corrupted data and the following batches will consume this corrupted 
> data.
> Case 2:
>  # In executor_1, the maintenance thread is about to take the snapshot for 
> state_store_1. It retrieves the `HDFSBackedStateStoreMap` object from 
> loadedMaps; after this, the maintenance thread [releases the lock on 
> loadedMaps|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751].
>  # state_store_1 is loaded in another executor, e.g. executor_2.
>  # Another state store, state_store_2, is loaded on executor_1 and 
> [reportActiveStoreInstance|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871]
>  is sent to the driver.
>  # executor_1 does the 
> [unload|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713]
>  for the no-longer-active state stores, which clears the data entries in the 
> `HDFSBackedStateStoreMap`.
>  # The snapshotting thread is terminated and uploads an incomplete snapshot 
> to cloud storage because the [iterator doesn't have a next 
> element|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634]
>  after the clear.
>  # Future batches consume the corrupted data.
>  
> Proposed fix:
>  * When we close the HDFS state store, we should only remove the entry from 
> `loadedMaps` rather than actively cleaning up the data; JVM GC will collect 
> those objects.
>  * We should wait for the maintenance thread to stop before unloading the 
> providers.
>  
> Thanks [~anishshri-db] for helping debug this issue!






[jira] [Resolved] (SPARK-48105) Fix the data corruption issue when state store unload and snapshotting happens concurrently for HDFS state store

2024-05-06 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-48105.
--
Fix Version/s: 4.0.0
 Assignee: Huanli Wang
   Resolution: Fixed

Issue resolved via https://github.com/apache/spark/pull/46351

>  Fix the data corruption issue when state store unload and snapshotting 
> happens concurrently for HDFS state store
> -
>
> Key: SPARK-48105
> URL: https://issues.apache.org/jira/browse/SPARK-48105
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 4.0.0, 3.5.2, 3.4.4
>Reporter: Huanli Wang
>Assignee: Huanli Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> There are two race conditions between state store snapshotting and state 
> store unloading which can result in query failure and potential data 
> corruption.
>  
> Case 1:
>  # The maintenance thread pool encounters some issue and calls 
> [stopMaintenanceTask|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774],
>  which in turn calls 
> [threadPool.stop|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587].
>  However, this function doesn't wait for the stop operation to complete and 
> moves on to the state store [unload and 
> clear|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L775-L778].
>  # The provider unload will [close the state 
> store|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721],
>  which [clears the values of 
> loadedMaps|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355]
>  for the HDFS-backed state store.
>  # The not-yet-stopped maintenance thread may still be running and trying to 
> take the snapshot, but the data in the underlying `HDFSBackedStateStoreMap` 
> has been removed. If this snapshot process completes successfully, we will 
> write corrupted data and the following batches will consume this corrupted 
> data.
> Case 2:
>  # In executor_1, the maintenance thread is about to take the snapshot for 
> state_store_1. It retrieves the `HDFSBackedStateStoreMap` object from 
> loadedMaps; after this, the maintenance thread [releases the lock on 
> loadedMaps|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751].
>  # state_store_1 is loaded in another executor, e.g. executor_2.
>  # Another state store, state_store_2, is loaded on executor_1 and 
> [reportActiveStoreInstance|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871]
>  is sent to the driver.
>  # executor_1 does the 
> [unload|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713]
>  for the no-longer-active state stores, which clears the data entries in the 
> `HDFSBackedStateStoreMap`.
>  # The snapshotting thread is terminated and uploads an incomplete snapshot 
> to cloud storage because the [iterator doesn't have a next 
> element|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634]
>  after the clear.
>  # Future batches consume the corrupted data.
>  
> Proposed fix:
>  * When we close the HDFS state store, we should only remove the entry from 
> `loadedMaps` rather than actively cleaning up the data; JVM GC will collect 
> those objects.
>  * We should wait for the maintenance thread to stop before unloading the 
> providers.
>  
> Thanks [~anishshri-db] for helping debug this issue!






[jira] [Resolved] (SPARK-48102) Track time to acquire source progress metrics for streaming triggers

2024-05-02 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-48102.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 46350
[https://github.com/apache/spark/pull/46350]

> Track time to acquire source progress metrics for streaming triggers
> 
>
> Key: SPARK-48102
> URL: https://issues.apache.org/jira/browse/SPARK-48102
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Anish Shrigondekar
>Assignee: Anish Shrigondekar
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Track time to acquire source progress metrics for streaming triggers






[jira] [Assigned] (SPARK-48102) Track time to acquire source progress metrics for streaming triggers

2024-05-02 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-48102:


Assignee: Anish Shrigondekar

> Track time to acquire source progress metrics for streaming triggers
> 
>
> Key: SPARK-48102
> URL: https://issues.apache.org/jira/browse/SPARK-48102
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Anish Shrigondekar
>Assignee: Anish Shrigondekar
>Priority: Major
>  Labels: pull-request-available
>
> Track time to acquire source progress metrics for streaming triggers






[jira] [Updated] (SPARK-47920) Add documentation for python streaming data source

2024-05-02 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-47920:
-
Affects Version/s: 4.0.0
   (was: 3.5.1)

> Add documentation for python streaming data source
> --
>
> Key: SPARK-47920
> URL: https://issues.apache.org/jira/browse/SPARK-47920
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark, SS
>Affects Versions: 4.0.0
>Reporter: Chaoqin Li
>Priority: Major
>  Labels: pull-request-available
>
> Add documentation (user guide) for the Python data source API.
> The doc should explain how to develop and use DataSourceStreamReader and 
> DataSourceStreamWriter.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-48073) StateStore schema incompatibility between 3.2 and 3.4

2024-05-01 Thread Jungtaek Lim (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842785#comment-17842785
 ] 

Jungtaek Lim commented on SPARK-48073:
--

I roughly remember that Encoders.bean() was changed to address some issue, 
probably around generated model classes for Avro or Protobuf. It looks like we 
are affected by a side effect of that change.

While I agree this is a behavioral change (and maybe a breaking change), I'm 
less sure what we could do. It's not something the stateful operator can 
handle - it's just a change in its input. Do you want to propose a new SQL 
conf to fall back to the 3.2 behavior of Encoders.bean()?

> StateStore schema incompatibility between 3.2 and 3.4
> -
>
> Key: SPARK-48073
> URL: https://issues.apache.org/jira/browse/SPARK-48073
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.2.4
>Reporter: L. C. Hsieh
>Priority: Major
>
> One of our customers encountered schema incompatibility problems when 
> upgrading a structured streaming application from Spark 3.2 to 3.4.
> It seems that in 3.4 `Encoders.bean()` includes properties that have a getter 
> (with or without a setter), whereas in 3.2 only properties with both a getter 
> and a setter are included.
> For example, here are the schemas for an AtomicLong property/field generated 
> by each version:
> 3.2: 
> StructType(StructField(opaque,LongType,true),StructField(plain,LongType,true))
> 3.4: 
> StructType(StructField(acquire,LongType,false),StructField(andDecrement,LongType,false),StructField(andIncrement,LongType,false),StructField(opaque,LongType,false),StructField(plain,LongType,false))
> Note that the nullability flag also changes:
> a primitive long schema has nullable=true in 3.2, but false in 3.4.
> I am not sure whether the community was aware of this issue before, and 
> whether there is a workaround for it?
> Thanks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47793) Implement SimpleDataSourceStreamReader for python streaming data source

2024-04-30 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47793.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45977
[https://github.com/apache/spark/pull/45977]

> Implement SimpleDataSourceStreamReader for python streaming data source
> ---
>
> Key: SPARK-47793
> URL: https://issues.apache.org/jira/browse/SPARK-47793
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark, SS
>Affects Versions: 3.5.1
>Reporter: Chaoqin Li
>Assignee: Chaoqin Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
>  SimpleDataSourceStreamReader is a simplified version of the DataStreamReader 
> interface.
>  # It doesn’t require developers to reason about data partitioning.
>  # It doesn’t require getting the latest offset before reading data.
> There are 3 functions that need to be defined:
> 1. Read data and return the end offset.
> _def read(self, start: Offset) -> (Iterator[Tuple], Offset)_
> 2. Read data between the start and end offsets; this is required for 
> exactly-once reads.
> _def read2(self, start: Offset, end: Offset) -> Iterator[Tuple]_
> 3. Report the initial start offset of the streaming query.
> _def initialOffset() -> dict_
> Implementation: wrap the SimpleDataSourceStreamReader instance in a 
> DataSourceStreamReader internally and make the prefetching and caching 
> transparent to the data source developer. The records prefetched in the 
> Python process will be sent to the JVM as Arrow record batches.
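
For concreteness, here is a minimal sketch against the three methods described 
above (pure Python; method names are taken from this description, and the API 
that eventually ships may differ):

```
class CounterSimpleStreamReader:
    """Toy counter source sketched against the interface described above."""

    def initialOffset(self) -> dict:
        # Offsets are plain dicts so they can be JSON-serialized in the offset log.
        return {"value": 0}

    def read(self, start: dict):
        # Read a fixed-size chunk and return (rows, end offset).
        end = {"value": start["value"] + 10}
        rows = ((v,) for v in range(start["value"], end["value"]))
        return rows, end

    def read2(self, start: dict, end: dict):
        # Deterministically replay rows between two offsets (exactly-once replay).
        return ((v,) for v in range(start["value"], end["value"]))
```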



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47793) Implement SimpleDataSourceStreamReader for python streaming data source

2024-04-30 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47793:


Assignee: Chaoqin Li

> Implement SimpleDataSourceStreamReader for python streaming data source
> ---
>
> Key: SPARK-47793
> URL: https://issues.apache.org/jira/browse/SPARK-47793
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark, SS
>Affects Versions: 3.5.1
>Reporter: Chaoqin Li
>Assignee: Chaoqin Li
>Priority: Major
>  Labels: pull-request-available
>
>  SimpleDataSourceStreamReader is a simplified version of the DataStreamReader 
> interface.
>  # It doesn’t require developers to reason about data partitioning.
>  # It doesn’t require getting the latest offset before reading data.
> There are 3 functions that need to be defined:
> 1. Read data and return the end offset.
> _def read(self, start: Offset) -> (Iterator[Tuple], Offset)_
> 2. Read data between the start and end offsets; this is required for 
> exactly-once reads.
> _def read2(self, start: Offset, end: Offset) -> Iterator[Tuple]_
> 3. Report the initial start offset of the streaming query.
> _def initialOffset() -> dict_
> Implementation: wrap the SimpleDataSourceStreamReader instance in a 
> DataSourceStreamReader internally and make the prefetching and caching 
> transparent to the data source developer. The records prefetched in the 
> Python process will be sent to the JVM as Arrow record batches.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-48050) Log logical plan at query start

2024-04-30 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-48050:


Assignee: Fanyue Xia

> Log logical plan at query start
> ---
>
> Key: SPARK-48050
> URL: https://issues.apache.org/jira/browse/SPARK-48050
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.5.0, 3.5.1
>Reporter: Fanyue Xia
>Assignee: Fanyue Xia
>Priority: Major
>  Labels: pull-request-available
>
> We should log the logical plan of queries at query start. Having the logical 
> plan in the logs will help us determine whether logical plans have changed 
> between query runs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-48050) Log logical plan at query start

2024-04-30 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-48050.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 46292
[https://github.com/apache/spark/pull/46292]

> Log logical plan at query start
> ---
>
> Key: SPARK-48050
> URL: https://issues.apache.org/jira/browse/SPARK-48050
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.5.0, 3.5.1
>Reporter: Fanyue Xia
>Assignee: Fanyue Xia
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> We should log the logical plan of queries at query start. Having the logical 
> plan in the logs will help us determine whether logical plans have changed 
> between query runs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-48018) Null groupId causing missing param error when throwing KafkaException.couldNotReadOffsetRange

2024-04-26 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-48018:


Assignee: B. Micheal Okutubo

> Null groupId causing missing param error when throwing 
> KafkaException.couldNotReadOffsetRange
> -
>
> Key: SPARK-48018
> URL: https://issues.apache.org/jira/browse/SPARK-48018
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: B. Micheal Okutubo
>Assignee: B. Micheal Okutubo
>Priority: Major
>  Labels: pull-request-available
>
> [INTERNAL_ERROR] Undefined error message parameter for error class: 
> 'KAFKA_DATA_LOSS.COULD_NOT_READ_OFFSET_RANGE'
> when groupId is null when we are about to throw 
> KafkaException.couldNotReadOffsetRange error.
> The error framework requires all params to be non-null.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-48018) Null groupId causing missing param error when throwing KafkaException.couldNotReadOffsetRange

2024-04-26 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-48018.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 46253
[https://github.com/apache/spark/pull/46253]

> Null groupId causing missing param error when throwing 
> KafkaException.couldNotReadOffsetRange
> -
>
> Key: SPARK-48018
> URL: https://issues.apache.org/jira/browse/SPARK-48018
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: B. Micheal Okutubo
>Assignee: B. Micheal Okutubo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> [INTERNAL_ERROR] Undefined error message parameter for error class: 
> 'KAFKA_DATA_LOSS.COULD_NOT_READ_OFFSET_RANGE'
> when groupId is null when we are about to throw 
> KafkaException.couldNotReadOffsetRange error.
> The error framework requires all params to be non-null.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47805) [Arbitrary State Support] State TTL support - MapState

2024-04-23 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47805.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45991
[https://github.com/apache/spark/pull/45991]

> [Arbitrary State Support] State TTL support - MapState
> --
>
> Key: SPARK-47805
> URL: https://issues.apache.org/jira/browse/SPARK-47805
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Eric Marnadi
>Assignee: Eric Marnadi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Add support for expiring state values based on TTL for Map State in the 
> transformWithState operator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47840) Remove foldable propagation across Streaming Aggregate/Join nodes

2024-04-15 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47840:


Assignee: Bhuwan Sahni

> Remove foldable propagation across Streaming Aggregate/Join nodes
> -
>
> Key: SPARK-47840
> URL: https://issues.apache.org/jira/browse/SPARK-47840
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 4.0.0, 3.5.1
>Reporter: Bhuwan Sahni
>Assignee: Bhuwan Sahni
>Priority: Major
>  Labels: pull-request-available
>
> Streaming queries with a Union of 2 data streams followed by an Aggregate 
> (groupBy) can produce incorrect results if the grouping key is a constant 
> literal for the duration of a micro-batch.
> The query produces incorrect results because the query optimizer recognizes 
> the literal value in the grouping key as foldable and replaces the grouping 
> key expression with the actual literal value. This optimization is correct 
> for batch queries. However, streaming queries also read information from the 
> StateStore, and the output contains both the results from the StateStore 
> (computed in previous micro-batches) and data from input sources (computed in 
> this micro-batch). The HashAggregate node after the StateStore always reads 
> the grouping key value as the optimized literal (as the grouping key 
> expression is optimized into a literal by the optimizer). This ends up 
> replacing keys in the StateStore with the literal value, resulting in 
> incorrect output. 
> See an example logical and physical plan below for a query performing a union 
> on 2 data streams, followed by a groupBy. Note that the name#4 expression has 
> been optimized to ds1. The streaming query's Aggregate adds a StateStoreSave 
> node as a child of HashAggregate; however, any grouping key read from the 
> StateStore will still be read as ds1 due to the optimization. 
>  
> *Optimized Logical Plan*
> {quote}=== Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.FoldablePropagation ===
> === Old Plan ===
> WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, 
> Complete, 0
> +- Aggregate [name#4], [name#4, count(1) AS count#31L]
>    +- Project [ds1 AS name#4]
>       +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
> === New Plan ===
> WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, 
> Complete, 0
> +- Aggregate [ds1], [ds1 AS name#4, count(1) AS count#31L]
>    +- Project [ds1 AS name#4]
>       +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
> {quote}
> *Corresponding Physical Plan*
> {quote}WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: 
> org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@2b4c6242], 
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3143/1859075634@35709d26
> +- HashAggregate(keys=[ds1#39], functions=[finalmerge_count(merge count#38L) 
> AS count(1)#30L], output=[name#4, count#31L])
>    +- StateStoreSave [ds1#39], state info [ checkpoint = 
> file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, 
> runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, 
> numPartitions = 5], Complete, 0, 0, 2
>       +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) 
> AS count#38L], output=[ds1#39, count#38L])
>          +- StateStoreRestore [ds1#39], state info [ checkpoint = 
> file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, 
> runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, 
> numPartitions = 5], 2
>             +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge 
> count#38L) AS count#38L], output=[ds1#39, count#38L])
>                +- HashAggregate(keys=[ds1 AS ds1#39], 
> functions=[partial_count(1) AS count#38L], output=[ds1#39, count#38L])
>                   +- Project
>                      +- MicroBatchScan[value#1] MemoryStreamDataSource
> {quote}
>  
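
For illustration, a repro sketch consistent with the described query shape 
(sources, sink, and checkpoint path are illustrative stand-ins, not taken from 
the report):

```
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Two streams, each projecting a constant literal as its grouping key, unioned
# and aggregated. FoldablePropagation used to fold the key attribute into the
# literal, so keys restored from the state store were rewritten to the literal.
ds1 = spark.readStream.format("rate").load().select(F.lit("ds1").alias("name"))
ds2 = spark.readStream.format("rate").load().select(F.lit("ds2").alias("name"))

counts = ds1.union(ds2).groupBy("name").count()

query = (counts.writeStream
         .outputMode("complete")
         .format("console")
         .option("checkpointLocation", "/tmp/foldable-repro")
         .start())
```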



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47840) Remove foldable propagation across Streaming Aggregate/Join nodes

2024-04-15 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47840.
--
Fix Version/s: 3.5.2
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 46035
[https://github.com/apache/spark/pull/46035]

> Remove foldable propagation across Streaming Aggregate/Join nodes
> -
>
> Key: SPARK-47840
> URL: https://issues.apache.org/jira/browse/SPARK-47840
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 4.0.0, 3.5.1
>Reporter: Bhuwan Sahni
>Assignee: Bhuwan Sahni
>Priority: Major
>  Labels: pull-request-available
> Fix For: 3.5.2, 4.0.0
>
>
> Streaming queries with a Union of 2 data streams followed by an Aggregate 
> (groupBy) can produce incorrect results if the grouping key is a constant 
> literal for the duration of a micro-batch.
> The query produces incorrect results because the query optimizer recognizes 
> the literal value in the grouping key as foldable and replaces the grouping 
> key expression with the actual literal value. This optimization is correct 
> for batch queries. However, streaming queries also read information from the 
> StateStore, and the output contains both the results from the StateStore 
> (computed in previous micro-batches) and data from input sources (computed in 
> this micro-batch). The HashAggregate node after the StateStore always reads 
> the grouping key value as the optimized literal (as the grouping key 
> expression is optimized into a literal by the optimizer). This ends up 
> replacing keys in the StateStore with the literal value, resulting in 
> incorrect output. 
> See an example logical and physical plan below for a query performing a union 
> on 2 data streams, followed by a groupBy. Note that the name#4 expression has 
> been optimized to ds1. The streaming query's Aggregate adds a StateStoreSave 
> node as a child of HashAggregate; however, any grouping key read from the 
> StateStore will still be read as ds1 due to the optimization. 
>  
> *Optimized Logical Plan*
> {quote}=== Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.FoldablePropagation ===
> === Old Plan ===
> WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, 
> Complete, 0
> +- Aggregate [name#4], [name#4, count(1) AS count#31L]
>    +- Project [ds1 AS name#4]
>       +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
> === New Plan ===
> WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, 
> Complete, 0
> +- Aggregate [ds1], [ds1 AS name#4, count(1) AS count#31L]
>    +- Project [ds1 AS name#4]
>       +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
> {quote}
> *Corresponding Physical Plan*
> {quote}WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: 
> org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@2b4c6242], 
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3143/1859075634@35709d26
> +- HashAggregate(keys=[ds1#39], functions=[finalmerge_count(merge count#38L) 
> AS count(1)#30L], output=[name#4, count#31L])
>    +- StateStoreSave [ds1#39], state info [ checkpoint = 
> file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, 
> runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, 
> numPartitions = 5], Complete, 0, 0, 2
>       +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) 
> AS count#38L], output=[ds1#39, count#38L])
>          +- StateStoreRestore [ds1#39], state info [ checkpoint = 
> file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, 
> runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, 
> numPartitions = 5], 2
>             +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge 
> count#38L) AS count#38L], output=[ds1#39, count#38L])
>                +- HashAggregate(keys=[ds1 AS ds1#39], 
> functions=[partial_count(1) AS count#38L], output=[ds1#39, count#38L])
>                   +- Project
>                      +- MicroBatchScan[value#1] MemoryStreamDataSource
> {quote}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Resolved] (SPARK-47673) [Arbitrary State Support] State TTL support - ListState

2024-04-15 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47673.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45932
[https://github.com/apache/spark/pull/45932]

> [Arbitrary State Support] State TTL support - ListState
> ---
>
> Key: SPARK-47673
> URL: https://issues.apache.org/jira/browse/SPARK-47673
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Eric Marnadi
>Assignee: Eric Marnadi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Add support for expiring state values based on TTL for List State in the 
> transformWithState operator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47788) Ensure the same hash partitioning scheme/hash function is used across batches

2024-04-15 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47788.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45971
[https://github.com/apache/spark/pull/45971]

> Ensure the same hash partitioning scheme/hash function is used across batches
> -
>
> Key: SPARK-47788
> URL: https://issues.apache.org/jira/browse/SPARK-47788
> Project: Spark
>  Issue Type: Test
>  Components: Structured Streaming
>Affects Versions: 3.5.1
>Reporter: Fanyue Xia
>Assignee: Fanyue Xia
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> To really make sure that changes to the hash function / partitioner in Spark 
> don't cause logical correctness issues in existing running streaming 
> queries, we should add a new unit test that ensures hash function stability 
> is maintained.
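
For illustration, a sketch of the kind of golden-value test described 
(zlib.crc32 stands in for Spark's actual hash function, and the file name is 
hypothetical):

```
import json
import os
import zlib

GOLDEN_KEYS = ["user-1", "user-2", "user-3"]
GOLDEN_FILE = "hash_golden.json"
NUM_PARTITIONS = 200

def partition_assignments() -> dict:
    # key -> partition id under the current hash function
    return {k: zlib.crc32(k.encode()) % NUM_PARTITIONS for k in GOLDEN_KEYS}

if not os.path.exists(GOLDEN_FILE):
    # First run: record the assignments and check the file into the repo.
    with open(GOLDEN_FILE, "w") as f:
        json.dump(partition_assignments(), f)
else:
    # Every later run: any drift means existing state would land in the
    # wrong partition, i.e. a correctness bug for running streaming queries.
    with open(GOLDEN_FILE) as f:
        assert partition_assignments() == json.load(f)
```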



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47788) Ensure the same hash partitioning scheme/hash function is used across batches

2024-04-15 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47788:


Assignee: Fanyue Xia

> Ensure the same hash partitioning scheme/hash function is used across batches
> -
>
> Key: SPARK-47788
> URL: https://issues.apache.org/jira/browse/SPARK-47788
> Project: Spark
>  Issue Type: Test
>  Components: Structured Streaming
>Affects Versions: 3.5.1
>Reporter: Fanyue Xia
>Assignee: Fanyue Xia
>Priority: Major
>  Labels: pull-request-available
>
> To really make sure that changes to the hash function / partitioner in Spark 
> don't cause logical correctness issues in existing running streaming 
> queries, we should add a new unit test that ensures hash function stability 
> is maintained.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47848) Fix thread safe access for loadedMaps in close

2024-04-15 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47848.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 46048
[https://github.com/apache/spark/pull/46048]

> Fix thread safe access for loadedMaps in close
> --
>
> Key: SPARK-47848
> URL: https://issues.apache.org/jira/browse/SPARK-47848
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Anish Shrigondekar
>Assignee: Anish Shrigondekar
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Fix thread safe access for loadedMaps in close



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47733) Add operational metrics for TWS operators

2024-04-12 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47733:


Assignee: Anish Shrigondekar  (was: Jing Zhan)

> Add operational metrics for TWS operators
> -
>
> Key: SPARK-47733
> URL: https://issues.apache.org/jira/browse/SPARK-47733
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Jing Zhan
>Assignee: Anish Shrigondekar
>Priority: Major
>  Labels: pull-request-available
>
> Add metrics to improve observability for the newly added operator 
> TransformWithState and some changes we've made to RocksDB.
> Proposed metrics to add:
>  * on the RocksDB StateStore metrics side, we will add the following:
>  ** num external col families
>  ** num internal col families
>  * on the operator side, we will add the following:
>  ** number of state vars
>  ** count of state vars by type
>  ** output mode
>  ** timeout mode
>  ** registered timers in batch
>  ** expired timers in batch
>  ** initial state enabled or not
>  ** number of state vars removed in batch



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47733) Add operational metrics for TWS operators

2024-04-12 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47733:


Assignee: Jing Zhan

> Add operational metrics for TWS operators
> -
>
> Key: SPARK-47733
> URL: https://issues.apache.org/jira/browse/SPARK-47733
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Jing Zhan
>Assignee: Jing Zhan
>Priority: Major
>  Labels: pull-request-available
>
> Add metrics to improve observability for the newly added operator 
> TransformWithState and some changes we've made to RocksDB.
> Proposed metrics to add:
>  * on the RocksDB StateStore metrics side, we will add the following:
>  ** num external col families
>  ** num internal col families
>  * on the operator side, we will add the following:
>  ** number of state vars
>  ** count of state vars by type
>  ** output mode
>  ** timeout mode
>  ** registered timers in batch
>  ** expired timers in batch
>  ** initial state enabled or not
>  ** number of state vars removed in batch



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47784) [State API v2] Merge TimeoutMode and TTLMode into TimeMode

2024-04-11 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47784.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45960
[https://github.com/apache/spark/pull/45960]

> [State API v2] Merge TimeoutMode and TTLMode into TimeMode
> --
>
> Key: SPARK-47784
> URL: https://issues.apache.org/jira/browse/SPARK-47784
> Project: Spark
>  Issue Type: Story
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Bhuwan Sahni
>Assignee: Bhuwan Sahni
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Currently, users need to specify the notion of time 
> (ProcessingTime/EventTime) for timers and TTL separately. This change allows 
> users to use a single parameter.
> We do not expect users to mix and match EventTime/ProcessingTime for timers 
> and TTL in a single query because it makes it hard to reason about the time 
> semantics (when will the timer fire? when will the state be evicted? etc.). 
> It's simpler to stick to one notion of time throughout timers and TTL.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47776) State store operation cannot work properly with binary inequality collation

2024-04-09 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47776:


Assignee: Jungtaek Lim

> State store operation cannot work properly with binary inequality collation
> ---
>
> Key: SPARK-47776
> URL: https://issues.apache.org/jira/browse/SPARK-47776
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Blocker
>  Labels: pull-request-available
>
> Arguably this is a correctness issue, though we haven't released the 
> collation feature yet.
> Collation introduces the concept of binary (in)equality, which means that 
> under some collations we are no longer able to just compare the binary 
> format of two UnsafeRows to determine equality.
> For example, 'aaa' and 'AAA' can be "semantically" the same under a 
> case-insensitive collation.
> The state store is basically key-value storage, and most provider 
> implementations rely on the fact that all the columns in the key schema 
> support binary equality. We need to disallow binary-inequality columns 
> in the key schema before we can support this in the majority of state store 
> providers (or at a higher level of the state store).
> Why is this a correctness issue? For example, streaming aggregation will 
> produce aggregation output which does not respect semantic 
> equality.
> e.g. df.groupBy(strCol).count()
> Although strCol is case insensitive, 'a' and 'A' won't be counted together in 
> streaming aggregation, while they should be.
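
To make the concern concrete, a hedged sketch (the collate function and the 
collation name reflect the in-progress collation work and may differ in the 
final API):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Semantically equal under a case-insensitive collation...
spark.sql("SELECT 'aaa' = collate('AAA', 'UTF8_LCASE') AS eq").show()  # eq = true

# ...but the two strings have different UnsafeRow binary encodings, which is
# exactly what most state store providers compare. A streaming
# df.groupBy(strCol).count() would therefore keep 'aaa' and 'AAA' as two
# separate state store keys instead of merging them.
```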



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47776) State store operation cannot work properly with binary inequality collation

2024-04-09 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47776.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45951
[https://github.com/apache/spark/pull/45951]

> State store operation cannot work properly with binary inequality collation
> ---
>
> Key: SPARK-47776
> URL: https://issues.apache.org/jira/browse/SPARK-47776
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Arguably this is a correctness issue, though we haven't released the 
> collation feature yet.
> Collation introduces the concept of binary (in)equality, which means that 
> under some collations we are no longer able to just compare the binary 
> format of two UnsafeRows to determine equality.
> For example, 'aaa' and 'AAA' can be "semantically" the same under a 
> case-insensitive collation.
> The state store is basically key-value storage, and most provider 
> implementations rely on the fact that all the columns in the key schema 
> support binary equality. We need to disallow binary-inequality columns 
> in the key schema before we can support this in the majority of state store 
> providers (or at a higher level of the state store).
> Why is this a correctness issue? For example, streaming aggregation will 
> produce aggregation output which does not respect semantic 
> equality.
> e.g. df.groupBy(strCol).count()
> Although strCol is case insensitive, 'a' and 'A' won't be counted together in 
> streaming aggregation, while they should be.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-47718) .sql() does not recognize watermark defined upstream

2024-04-09 Thread Jungtaek Lim (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-47718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835303#comment-17835303
 ] 

Jungtaek Lim commented on SPARK-47718:
--

window('createTime', '1 hour', '30 minutes')

Here 'createTime' is a string literal, not a column reference. Please try again 
with window(createTime, '1 hour', '30 minutes') and reopen if the issue persists.
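
That is, the GROUP BY in the reproduction below should reference the column 
itself:

```
session.sql("""
SELECT
    window.start, window.end, provinceId, sum(payAmount) AS totalPayAmount
    FROM streaming_df
    GROUP BY provinceId, window(createTime, '1 hour', '30 minutes')
    ORDER BY window.start
""")
```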

> .sql() does not recognize watermark defined upstream
> 
>
> Key: SPARK-47718
> URL: https://issues.apache.org/jira/browse/SPARK-47718
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.5.1
>Reporter: Chloe He
>Priority: Major
>  Labels: pull-request-available
>
> I have a data pipeline set up in such a way that it reads data from a Kafka 
> source, does some transformation on the data using pyspark, then writes the 
> output into a sink (Kafka, Redis, etc).
>  
> My entire pipeline is written in SQL, so I wish to use the .sql() method to 
> execute SQL on my streaming source directly.
>  
> However, I'm running into the issue where my watermark is not being 
> recognized by the downstream query via the .sql() method.
>  
> ```
> Python 3.11.8 | packaged by conda-forge | (main, Feb 16 2024, 20:49:36) 
> [Clang 16.0.6 ] on darwin
> Type "help", "copyright", "credits" or "license" for more information.
> >>> import pyspark
> >>> print(pyspark.__version__)
> 3.5.1
> >>> from pyspark.sql import SparkSession
> >>>
> >>> session = SparkSession.builder \
> ...     .config("spark.jars.packages", 
> "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")\
> ...     .getOrCreate()
> >>> from pyspark.sql.functions import col, from_json
> >>> from pyspark.sql.types import StructField, StructType, TimestampType, 
> >>> LongType, DoubleType, IntegerType
> >>> schema = StructType(
> ...     [
> ...         StructField('createTime', TimestampType(), True),
> ...         StructField('orderId', LongType(), True),
> ...         StructField('payAmount', DoubleType(), True),
> ...         StructField('payPlatform', IntegerType(), True),
> ...         StructField('provinceId', IntegerType(), True),
> ...     ])
> >>>
> >>> streaming_df = session.readStream\
> ...     .format("kafka")\
> ...     .option("kafka.bootstrap.servers", "localhost:9092")\
> ...     .option("subscribe", "payment_msg")\
> ...     .option("startingOffsets","earliest")\
> ...     .load()\
> ...     .select(from_json(col("value").cast("string"), 
> schema).alias("parsed_value"))\
> ...     .select("parsed_value.*")\
> ...     .withWatermark("createTime", "10 seconds")
> >>>
> >>> streaming_df.createOrReplaceTempView("streaming_df")
> >>> session.sql("""
> ... SELECT
> ...     window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
> ...     FROM streaming_df
> ...     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
> ...     ORDER BY window.start
> ... """)\
> ...   .writeStream\
> ...   .format("kafka") \
> ...   .option("checkpointLocation", "checkpoint") \
> ...   .option("kafka.bootstrap.servers", "localhost:9092") \
> ...   .option("topic", "sink") \
> ...   .start()
> ```
>  
> This throws exception
> ```
> pyspark.errors.exceptions.captured.AnalysisException: Append output mode not 
> supported when there are streaming aggregations on streaming 
> DataFrames/DataSets without watermark; line 6 pos 4;
> ```
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47718) .sql() does not recognize watermark defined upstream

2024-04-09 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47718.
--
Resolution: Not A Bug

> .sql() does not recognize watermark defined upstream
> 
>
> Key: SPARK-47718
> URL: https://issues.apache.org/jira/browse/SPARK-47718
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.5.1
>Reporter: Chloe He
>Priority: Major
>  Labels: pull-request-available
>
> I have a data pipeline set up in such a way that it reads data from a Kafka 
> source, does some transformation on the data using pyspark, then writes the 
> output into a sink (Kafka, Redis, etc).
>  
> My entire pipeline is written in SQL, so I wish to use the .sql() method to 
> execute SQL on my streaming source directly.
>  
> However, I'm running into the issue where my watermark is not being 
> recognized by the downstream query via the .sql() method.
>  
> ```
> Python 3.11.8 | packaged by conda-forge | (main, Feb 16 2024, 20:49:36) 
> [Clang 16.0.6 ] on darwin
> Type "help", "copyright", "credits" or "license" for more information.
> >>> import pyspark
> >>> print(pyspark.__version__)
> 3.5.1
> >>> from pyspark.sql import SparkSession
> >>>
> >>> session = SparkSession.builder \
> ...     .config("spark.jars.packages", 
> "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")\
> ...     .getOrCreate()
> >>> from pyspark.sql.functions import col, from_json
> >>> from pyspark.sql.types import StructField, StructType, TimestampType, 
> >>> LongType, DoubleType, IntegerType
> >>> schema = StructType(
> ...     [
> ...         StructField('createTime', TimestampType(), True),
> ...         StructField('orderId', LongType(), True),
> ...         StructField('payAmount', DoubleType(), True),
> ...         StructField('payPlatform', IntegerType(), True),
> ...         StructField('provinceId', IntegerType(), True),
> ...     ])
> >>>
> >>> streaming_df = session.readStream\
> ...     .format("kafka")\
> ...     .option("kafka.bootstrap.servers", "localhost:9092")\
> ...     .option("subscribe", "payment_msg")\
> ...     .option("startingOffsets","earliest")\
> ...     .load()\
> ...     .select(from_json(col("value").cast("string"), 
> schema).alias("parsed_value"))\
> ...     .select("parsed_value.*")\
> ...     .withWatermark("createTime", "10 seconds")
> >>>
> >>> streaming_df.createOrReplaceTempView("streaming_df")
> >>> session.sql("""
> ... SELECT
> ...     window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
> ...     FROM streaming_df
> ...     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
> ...     ORDER BY window.start
> ... """)\
> ...   .writeStream\
> ...   .format("kafka") \
> ...   .option("checkpointLocation", "checkpoint") \
> ...   .option("kafka.bootstrap.servers", "localhost:9092") \
> ...   .option("topic", "sink") \
> ...   .start()
> ```
>  
> This throws exception
> ```
> pyspark.errors.exceptions.captured.AnalysisException: Append output mode not 
> supported when there are streaming aggregations on streaming 
> DataFrames/DataSets without watermark; line 6 pos 4;
> ```
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-47776) State store operation cannot work properly with binary inequality collation

2024-04-09 Thread Jungtaek Lim (Jira)
Jungtaek Lim created SPARK-47776:


 Summary: State store operation cannot work properly with binary 
inequality collation
 Key: SPARK-47776
 URL: https://issues.apache.org/jira/browse/SPARK-47776
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Jungtaek Lim


Arguably this is a correctness issue, though we haven't released the collation 
feature yet.

Collation introduces the concept of binary (in)equality, which means that under 
some collations we are no longer able to just compare the binary format of two 
UnsafeRows to determine equality.

For example, 'aaa' and 'AAA' can be "semantically" the same under a 
case-insensitive collation.

The state store is basically key-value storage, and most provider 
implementations rely on the fact that all the columns in the key schema support 
binary equality. We need to disallow binary-inequality columns in the key 
schema before we can support this in the majority of state store providers (or 
at a higher level of the state store).

Why is this a correctness issue? For example, streaming aggregation will 
produce aggregation output which does not respect semantic equality.

e.g. df.groupBy(strCol).count()

Although strCol is case insensitive, 'a' and 'A' won't be counted together in 
streaming aggregation, while they should be.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-47718) .sql() does not recognize watermark defined upstream

2024-04-09 Thread Jungtaek Lim (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-47718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835175#comment-17835175
 ] 

Jungtaek Lim commented on SPARK-47718:
--

I've lowered this down to Major - this is neither a regression nor a correctness issue.

> .sql() does not recognize watermark defined upstream
> 
>
> Key: SPARK-47718
> URL: https://issues.apache.org/jira/browse/SPARK-47718
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.5.1
>Reporter: Chloe He
>Priority: Major
>  Labels: pull-request-available
>
> I have a data pipeline set up in such a way that it reads data from a Kafka 
> source, does some transformation on the data using pyspark, then writes the 
> output into a sink (Kafka, Redis, etc).
>  
> My entire pipeline is written in SQL, so I wish to use the .sql() method to 
> execute SQL on my streaming source directly.
>  
> However, I'm running into the issue where my watermark is not being 
> recognized by the downstream query via the .sql() method.
>  
> ```
> Python 3.11.8 | packaged by conda-forge | (main, Feb 16 2024, 20:49:36) 
> [Clang 16.0.6 ] on darwin
> Type "help", "copyright", "credits" or "license" for more information.
> >>> import pyspark
> >>> print(pyspark.__version__)
> 3.5.1
> >>> from pyspark.sql import SparkSession
> >>>
> >>> session = SparkSession.builder \
> ...     .config("spark.jars.packages", 
> "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")\
> ...     .getOrCreate()
> >>> from pyspark.sql.functions import col, from_json
> >>> from pyspark.sql.types import StructField, StructType, TimestampType, 
> >>> LongType, DoubleType, IntegerType
> >>> schema = StructType(
> ...     [
> ...         StructField('createTime', TimestampType(), True),
> ...         StructField('orderId', LongType(), True),
> ...         StructField('payAmount', DoubleType(), True),
> ...         StructField('payPlatform', IntegerType(), True),
> ...         StructField('provinceId', IntegerType(), True),
> ...     ])
> >>>
> >>> streaming_df = session.readStream\
> ...     .format("kafka")\
> ...     .option("kafka.bootstrap.servers", "localhost:9092")\
> ...     .option("subscribe", "payment_msg")\
> ...     .option("startingOffsets","earliest")\
> ...     .load()\
> ...     .select(from_json(col("value").cast("string"), 
> schema).alias("parsed_value"))\
> ...     .select("parsed_value.*")\
> ...     .withWatermark("createTime", "10 seconds")
> >>>
> >>> streaming_df.createOrReplaceTempView("streaming_df")
> >>> session.sql("""
> ... SELECT
> ...     window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
> ...     FROM streaming_df
> ...     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
> ...     ORDER BY window.start
> ... """)\
> ...   .writeStream\
> ...   .format("kafka") \
> ...   .option("checkpointLocation", "checkpoint") \
> ...   .option("kafka.bootstrap.servers", "localhost:9092") \
> ...   .option("topic", "sink") \
> ...   .start()
> ```
>  
> This throws exception
> ```
> pyspark.errors.exceptions.captured.AnalysisException: Append output mode not 
> supported when there are streaming aggregations on streaming 
> DataFrames/DataSets without watermark; line 6 pos 4;
> ```
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-47718) .sql() does not recognize watermark defined upstream

2024-04-09 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-47718:
-
Priority: Major  (was: Blocker)

> .sql() does not recognize watermark defined upstream
> 
>
> Key: SPARK-47718
> URL: https://issues.apache.org/jira/browse/SPARK-47718
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.5.1
>Reporter: Chloe He
>Priority: Major
>  Labels: pull-request-available
>
> I have a data pipeline set up in such a way that it reads data from a Kafka 
> source, does some transformation on the data using pyspark, then writes the 
> output into a sink (Kafka, Redis, etc).
>  
> My entire pipeline is written in SQL, so I wish to use the .sql() method to 
> execute SQL on my streaming source directly.
>  
> However, I'm running into the issue where my watermark is not being 
> recognized by the downstream query via the .sql() method.
>  
> ```
> Python 3.11.8 | packaged by conda-forge | (main, Feb 16 2024, 20:49:36) 
> [Clang 16.0.6 ] on darwin
> Type "help", "copyright", "credits" or "license" for more information.
> >>> import pyspark
> >>> print(pyspark.__version__)
> 3.5.1
> >>> from pyspark.sql import SparkSession
> >>>
> >>> session = SparkSession.builder \
> ...     .config("spark.jars.packages", 
> "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")\
> ...     .getOrCreate()
> >>> from pyspark.sql.functions import col, from_json
> >>> from pyspark.sql.types import StructField, StructType, TimestampType, 
> >>> LongType, DoubleType, IntegerType
> >>> schema = StructType(
> ...     [
> ...         StructField('createTime', TimestampType(), True),
> ...         StructField('orderId', LongType(), True),
> ...         StructField('payAmount', DoubleType(), True),
> ...         StructField('payPlatform', IntegerType(), True),
> ...         StructField('provinceId', IntegerType(), True),
> ...     ])
> >>>
> >>> streaming_df = session.readStream\
> ...     .format("kafka")\
> ...     .option("kafka.bootstrap.servers", "localhost:9092")\
> ...     .option("subscribe", "payment_msg")\
> ...     .option("startingOffsets","earliest")\
> ...     .load()\
> ...     .select(from_json(col("value").cast("string"), 
> schema).alias("parsed_value"))\
> ...     .select("parsed_value.*")\
> ...     .withWatermark("createTime", "10 seconds")
> >>>
> >>> streaming_df.createOrReplaceTempView("streaming_df")
> >>> session.sql("""
> ... SELECT
> ...     window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
> ...     FROM streaming_df
> ...     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
> ...     ORDER BY window.start
> ... """)\
> ...   .writeStream\
> ...   .format("kafka") \
> ...   .option("checkpointLocation", "checkpoint") \
> ...   .option("kafka.bootstrap.servers", "localhost:9092") \
> ...   .option("topic", "sink") \
> ...   .start()
> ```
>  
> This throws exception
> ```
> pyspark.errors.exceptions.captured.AnalysisException: Append output mode not 
> supported when there are streaming aggregations on streaming 
> DataFrames/DataSets without watermark; line 6 pos 4;
> ```
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47746) Use column ordinals instead of prefix ordering columns in the range scan encoder

2024-04-08 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47746.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45905
[https://github.com/apache/spark/pull/45905]

> Use column ordinals instead of prefix ordering columns in the range scan 
> encoder
> 
>
> Key: SPARK-47746
> URL: https://issues.apache.org/jira/browse/SPARK-47746
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Neil Ramaswamy
>Assignee: Neil Ramaswamy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Currently, the State V2 implementations do projections in their state 
> managers, and then provide some prefix (ordering) columns to the 
> RocksDBStateEncoder. However, we can avoid the extra projection by reading 
> just the ordinals we need, in the order we need, in the state encoder.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47558) [Arbitrary State Support] State TTL support - ValueState

2024-04-07 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47558.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45674
[https://github.com/apache/spark/pull/45674]

> [Arbitrary State Support] State TTL support - ValueState
> 
>
> Key: SPARK-47558
> URL: https://issues.apache.org/jira/browse/SPARK-47558
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Bhuwan Sahni
>Assignee: Bhuwan Sahni
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Add support for expiring state values based on TTL for Value State in the 
> transformWithState operator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47299) Use the same `versions.json` in the dropdown of different versions of PySpark documents

2024-04-07 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47299:


Assignee: BingKun Pan

> Use the same `versions.json` in the dropdown of different versions of 
> PySpark documents
> 
>
> Key: SPARK-47299
> URL: https://issues.apache.org/jira/browse/SPARK-47299
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, PySpark
>Affects Versions: 4.0.0, 3.5.1
>Reporter: BingKun Pan
>Assignee: BingKun Pan
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47299) Use the same `versions.json` in the dropdown of different versions of PySpark documents

2024-04-07 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47299.
--
Fix Version/s: 3.5.2
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 45400
[https://github.com/apache/spark/pull/45400]

> Use the same `versions.json` in the dropdown of different versions of 
> PySpark documents
> 
>
> Key: SPARK-47299
> URL: https://issues.apache.org/jira/browse/SPARK-47299
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation, PySpark
>Affects Versions: 4.0.0, 3.5.1
>Reporter: BingKun Pan
>Assignee: BingKun Pan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 3.5.2, 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-47734) Fix flaky pyspark.sql.dataframe.DataFrame.writeStream doctest by stopping streaming query

2024-04-07 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-47734:
-
Fix Version/s: 3.4.3

> Fix flaky pyspark.sql.dataframe.DataFrame.writeStream doctest by stopping 
> streaming query
> -
>
> Key: SPARK-47734
> URL: https://issues.apache.org/jira/browse/SPARK-47734
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, Tests
>Affects Versions: 4.0.0
>Reporter: Josh Rosen
>Assignee: Josh Rosen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0, 3.5.2, 3.4.3
>
>
> https://issues.apache.org/jira/browse/SPARK-47199 didn't fix the flakiness in 
> the pyspark.sql.dataframe.DataFrame.writeStream doctest: the problem is not 
> that we are colliding on the test but, rather, that the test starts a 
> background thread to write to a directory and then deletes that directory 
> from the main test thread, something which is inherently race-prone.
> The fix is simple: stop the streaming query in the doctest itself, similar to 
> other streaming doctest examples.
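
A minimal sketch of that fix (assuming `df` is a streaming DataFrame; the queue 
name is illustrative):

```
query = df.writeStream.format("memory").queryName("tmp_view").start()
try:
    query.processAllAvailable()  # let the example produce some output
finally:
    query.stop()  # stop the query before the test directory is torn down
```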



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47744) Add support for negative byte types in RocksDB range scan encoder

2024-04-07 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47744.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45906
[https://github.com/apache/spark/pull/45906]

> Add support for negative byte types in RocksDB range scan encoder
> -
>
> Key: SPARK-47744
> URL: https://issues.apache.org/jira/browse/SPARK-47744
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Neil Ramaswamy
>Assignee: Neil Ramaswamy
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> SPARK-47372 introduced the ability for the RocksDB state provider to 
> big-endian encode values so that they could be range scanned. However, signed 
> support for Byte types [was 
> missed|https://github.com/apache/spark/blob/1efbf43160aa4e36710a4668f05fe61534f49648/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala#L326],
>  even though Scala Bytes are 
> [signed|https://www.scala-lang.org/api/2.13.5/scala/Byte.html].
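
As an illustration of the underlying trick (a hypothetical sketch, not
Spark's RocksDBStateEncoder): flipping the sign bit of a two's-complement
value makes its unsigned byte-wise ordering agree with the signed numeric
ordering, which is exactly what a range scan needs.

{code:python}
# Hypothetical illustration, not Spark's actual encoder: encode a signed
# byte so that unsigned byte-wise comparison matches signed numeric order.
def encode_signed_byte(b: int) -> int:
    assert -128 <= b <= 127
    return ((b + 256) % 256) ^ 0x80  # two's-complement byte, sign bit flipped

# -128 -> 0x00, -1 -> 0x7F, 0 -> 0x80, 127 -> 0xFF: ordering is preserved.
assert sorted(range(-128, 128), key=encode_signed_byte) == list(range(-128, 128))
{code}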



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47744) Add support for negative byte types in RocksDB range scan encoder

2024-04-07 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47744:


Assignee: Neil Ramaswamy

> Add support for negative byte types in RocksDB range scan encoder
> -
>
> Key: SPARK-47744
> URL: https://issues.apache.org/jira/browse/SPARK-47744
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Neil Ramaswamy
>Assignee: Neil Ramaswamy
>Priority: Major
>  Labels: pull-request-available
>
> SPARK-47372 introduced the ability for the RocksDB state provider to 
> big-endian encode values so that they could be range scanned. However, signed 
> support for Byte types [was 
> missed|https://github.com/apache/spark/blob/1efbf43160aa4e36710a4668f05fe61534f49648/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala#L326],
>  even though Scala Bytes are 
> [signed|https://www.scala-lang.org/api/2.13.5/scala/Byte.html].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47653) Add support for negative numbers with range scan encoder

2024-04-03 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47653.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45778
[https://github.com/apache/spark/pull/45778]

> Add support for negative numbers with range scan encoder
> 
>
> Key: SPARK-47653
> URL: https://issues.apache.org/jira/browse/SPARK-47653
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Anish Shrigondekar
>Assignee: Anish Shrigondekar
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Add support for negative numbers with range scan encoder



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47655) Integrate timer with Initial State handling for state-v2

2024-04-02 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47655.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45780
[https://github.com/apache/spark/pull/45780]

> Integrate timer with Initial State handling for state-v2
> 
>
> Key: SPARK-47655
> URL: https://issues.apache.org/jira/browse/SPARK-47655
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Jing Zhan
>Assignee: Jing Zhan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47568) Fix race condition between maintenance thread and task thread for RocksDB snapshot

2024-03-28 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47568.
--
Fix Version/s: 4.0.0
 Assignee: Bhuwan Sahni
   Resolution: Fixed

Issue resolved via https://github.com/apache/spark/pull/45724

> Fix race condition between maintenance thread and task thread for RocksDB 
> snapshot
> -
>
> Key: SPARK-47568
> URL: https://issues.apache.org/jira/browse/SPARK-47568
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.5.0, 4.0.0, 3.5.1, 3.5.2
>Reporter: Bhuwan Sahni
>Assignee: Bhuwan Sahni
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> There are currently some race conditions between maintenance thread and task 
> thread which can result in corrupted checkpoint state.
>  # The maintenance thread currently relies on class variable {{lastSnapshot}} 
> to find the latest checkpoint and uploads it to DFS. This checkpoint can be 
> modified at commit time by Task thread if a new snapshot is created.
>  # The task thread does not reset lastSnapshot at load time, which can result 
> in newer snapshots (if an old version is loaded) being considered valid and 
> uploaded to DFS. This results in VersionIdMismatch errors.
> This ticket proposes to fix these issues by guarding modification of the 
> latestSnapshot variable, and setting latestSnapshot properly at load time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47363) Initial State without state reader implementation for State API v2.

2024-03-27 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47363.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45467
[https://github.com/apache/spark/pull/45467]

> Initial State without state reader implementation for State API v2.
> ---
>
> Key: SPARK-47363
> URL: https://issues.apache.org/jira/browse/SPARK-47363
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Jing Zhan
>Assignee: Jing Zhan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> This PR adds support for users to provide a DataFrame that can be used to 
> instantiate state for the query in the first batch for arbitrary state API v2.
> Note that populating the initial state will only happen for the first batch 
> of the new streaming query. Trying to re-initialize state for the same 
> grouping key will result in an error.
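
A toy model of the described semantics (plain Python, not the Spark API):

{code:python}
# Toy model of the described semantics, not the Spark API: initial state is
# applied only in the first batch of the new query, and re-initializing the
# same grouping key is an error.
def apply_initial_state(store: dict, initial_rows, batch_id: int) -> None:
    if batch_id != 0:
        return  # later batches never re-apply the initial state
    for key, value in initial_rows:
        if key in store:
            raise ValueError(f"state for grouping key {key!r} already initialized")
        store[key] = value
{code}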



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47107) Implement partition reader for python streaming data source

2024-03-27 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47107.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45485
[https://github.com/apache/spark/pull/45485]

> Implement partition reader for python streaming data source
> ---
>
> Key: SPARK-47107
> URL: https://issues.apache.org/jira/browse/SPARK-47107
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SS
>Affects Versions: 4.0.0
>Reporter: Chaoqin Li
>Assignee: Chaoqin Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Piggyback on the PythonPartitionReaderFactory to implement reading a data 
> partition for python streaming data source. Add test case to verify that 
> python streaming data source can read and process data end to end.
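
A hedged sketch of the reader surface involved, modeled on the Spark 4.0
pyspark.sql.datasource API (class and method names follow the documented
interface and may differ in detail; the change in this ticket is in the
internals that execute read() on executors):

{code:python}
# Hedged sketch modeled on the pyspark.sql.datasource streaming reader API;
# the real ticket wires up the JVM-side partition reader factory.
from pyspark.sql.datasource import DataSourceStreamReader, InputPartition

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start, self.end = start, end

class CounterStreamReader(DataSourceStreamReader):
    def __init__(self):
        self.current = 0

    def initialOffset(self) -> dict:
        return {"offset": 0}

    def latestOffset(self) -> dict:
        self.current += 10
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        # planned on the driver: one partition per microbatch offset range
        return [RangePartition(start["offset"], end["offset"])]

    def read(self, partition):
        # executed on executors via the partition reader described above
        for i in range(partition.start, partition.end):
            yield (i,)
{code}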



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47107) Implement partition reader for python streaming data source

2024-03-27 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47107:


Assignee: Chaoqin Li

> Implement partition reader for python streaming data source
> ---
>
> Key: SPARK-47107
> URL: https://issues.apache.org/jira/browse/SPARK-47107
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SS
>Affects Versions: 4.0.0
>Reporter: Chaoqin Li
>Assignee: Chaoqin Li
>Priority: Major
>  Labels: pull-request-available
>
> Piggyback on the PythonPartitionReaderFactory to implement reading a data 
> partition for python streaming data source. Add test case to verify that 
> python streaming data source can read and process data end to end.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47570) Integrate range scan encoder changes with timer implementation

2024-03-26 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47570:


Assignee: Jing Zhan

> Integrate range scan encoder changes with timer implementation
> --
>
> Key: SPARK-47570
> URL: https://issues.apache.org/jira/browse/SPARK-47570
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Jing Zhan
>Assignee: Jing Zhan
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47570) Integrate range scan encoder changes with timer implementation

2024-03-26 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47570.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45709
[https://github.com/apache/spark/pull/45709]

> Integrate range scan encoder changes with timer implementation
> --
>
> Key: SPARK-47570
> URL: https://issues.apache.org/jira/browse/SPARK-47570
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Jing Zhan
>Assignee: Jing Zhan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47273) Implement python stream writer interface

2024-03-26 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47273.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45305
[https://github.com/apache/spark/pull/45305]

> Implement python stream writer interface
> 
>
> Key: SPARK-47273
> URL: https://issues.apache.org/jira/browse/SPARK-47273
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SS
>Affects Versions: 4.0.0
>Reporter: Chaoqin Li
>Assignee: Chaoqin Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> In order to support developing spark streaming sink in python, we need to 
> implement python stream writer interface.
> Reuse PythonPartitionWriter to implement the serialization and execution of 
> write callback in executor.
> Implement python worker process to run python streaming data sink committer 
> and communicate with the JVM through a socket in the spark driver. For each 
> python streaming data sink instance there will be a long-lived python worker 
> process created. Inside the python process, the python write committer will 
> receive abort or commit function calls and send back results through the 
> socket.
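
A hedged sketch of the writer surface, modeled on the Spark 4.0
pyspark.sql.datasource API (names follow the documented interface and may
differ in detail):

{code:python}
# Hedged sketch modeled on the pyspark.sql.datasource streaming writer API;
# commit/abort below are the calls routed through the long-lived python
# worker process described above.
from dataclasses import dataclass
from pyspark.sql.datasource import DataSourceStreamWriter, WriterCommitMessage

@dataclass
class CountCommitMessage(WriterCommitMessage):
    count: int

class CountingStreamWriter(DataSourceStreamWriter):
    def write(self, iterator):
        # runs on executors: consume one partition, report a commit message
        n = sum(1 for _ in iterator)
        return CountCommitMessage(count=n)

    def commit(self, messages, batchId):
        # runs in the python worker on the driver side
        print(f"batch {batchId}: {sum(m.count for m in messages)} rows committed")

    def abort(self, messages, batchId):
        print(f"batch {batchId} aborted")
{code}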



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47273) Implement python stream writer interface

2024-03-26 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47273:


Assignee: Chaoqin Li

> Implement python stream writer interface
> 
>
> Key: SPARK-47273
> URL: https://issues.apache.org/jira/browse/SPARK-47273
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SS
>Affects Versions: 4.0.0
>Reporter: Chaoqin Li
>Assignee: Chaoqin Li
>Priority: Major
>  Labels: pull-request-available
>
> In order to support developing spark streaming sink in python, we need to 
> implement python stream writer interface.
> Reuse PythonPartitionWriter to implement the serialization and execution of 
> write callback in executor.
> Implement python worker process to run python streaming data sink committer 
> and communicate with the JVM through a socket in the spark driver. For each 
> python streaming data sink instance there will be a long-lived python worker 
> process created. Inside the python process, the python write committer will 
> receive abort or commit function calls and send back results through the 
> socket.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47512) Tag operation type for RocksDB instance lock acquisition

2024-03-21 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47512.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45651
[https://github.com/apache/spark/pull/45651]

> Tag operation type for RocksDB instance lock acquisition
> 
>
> Key: SPARK-47512
> URL: https://issues.apache.org/jira/browse/SPARK-47512
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Anish Shrigondekar
>Assignee: Anish Shrigondekar
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Tag operation type for RocksDB instance lock acquisition



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47449) Refactor and split list/timer unit tests

2024-03-19 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47449.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45573
[https://github.com/apache/spark/pull/45573]

> Refactor and split list/timer unit tests
> 
>
> Key: SPARK-47449
> URL: https://issues.apache.org/jira/browse/SPARK-47449
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Jing Zhan
>Assignee: Jing Zhan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Refactor ListState and timer related unit tests.
> As planned in test plan for state-v2, list/timer should be tested in both 
> integration and unit tests. Currently timer related tests could be refactored 
> to use base suite class in {{ValueStateSuite}}, and list state unit tests 
> are needed in addition to {{TransformWithListStateSuite}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47329) Persist df while using foreachbatch and stateful streaming query to prevent state from being re-loaded in each batch

2024-03-18 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47329.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45432
[https://github.com/apache/spark/pull/45432]

> Persist df while using foreachbatch and stateful streaming query to prevent 
> state from being re-loaded in each batch
> 
>
> Key: SPARK-47329
> URL: https://issues.apache.org/jira/browse/SPARK-47329
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Anish Shrigondekar
>Assignee: Anish Shrigondekar
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Persist df while using foreachbatch and stateful streaming query to prevent 
> state from being re-loaded in each batch
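
A minimal sketch of the recommended pattern (assuming stateful_df is a
stateful streaming DataFrame; the sink paths are placeholders):

{code:python}
# Minimal sketch of the pattern, assuming `stateful_df` is a stateful
# streaming DataFrame; sink paths are placeholders.
def process_batch(df, batch_id):
    df.persist()  # materialize once so state is not re-loaded per action
    try:
        df.write.mode("append").parquet("/tmp/sink-a")
        df.write.mode("append").parquet("/tmp/sink-b")
    finally:
        df.unpersist()

query = (
    stateful_df.writeStream.foreachBatch(process_batch)
    .option("checkpointLocation", "/tmp/checkpoint")
    .start()
)
{code}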



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-46913) Implement timer functionality for transformWithState operator

2024-03-13 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-46913.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45051
[https://github.com/apache/spark/pull/45051]

> Implement timer functionality for transformWithState operator
> -
>
> Key: SPARK-46913
> URL: https://issues.apache.org/jira/browse/SPARK-46913
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Anish Shrigondekar
>Assignee: Anish Shrigondekar
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Implement timer functionality for transformWithState operator



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47272) MapState Implementation for State V2

2024-03-12 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47272.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45341
[https://github.com/apache/spark/pull/45341]

> MapState Implementation for State V2
> 
>
> Key: SPARK-47272
> URL: https://issues.apache.org/jira/browse/SPARK-47272
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Jing Zhan
>Assignee: Jing Zhan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> This task adds changes for the MapState implementation in State API v2. This 
> implementation adds a new encoder/decoder to encode the grouping key and user 
> key into a composite key to be put into RocksDB, so that we can retrieve the 
> key-value pair for a user-specified user key with one RocksDB get.
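
A hypothetical illustration of the composite-key idea (not the actual
encoder): length-prefixing the grouping key makes the concatenation
unambiguous, so one point read addresses one (grouping key, user key) pair.

{code:python}
# Hypothetical illustration of the composite-key encoding, not Spark's
# actual encoder: length-prefix the grouping key, then append the user key.
import struct

def composite_key(grouping_key: bytes, user_key: bytes) -> bytes:
    return struct.pack(">I", len(grouping_key)) + grouping_key + user_key

# One map entry then costs a single point lookup, conceptually:
#   value = rocksdb.get(composite_key(grouping_key, user_key))
assert composite_key(b"g1", b"k1") != composite_key(b"g", b"1k1")
{code}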



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-46962) Implement python worker to run python streaming data source

2024-03-11 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-46962:


Assignee: Chaoqin Li

> Implement python worker to run python streaming data source
> ---
>
> Key: SPARK-46962
> URL: https://issues.apache.org/jira/browse/SPARK-46962
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Chaoqin Li
>Assignee: Chaoqin Li
>Priority: Major
>  Labels: pull-request-available
>
> Implement a python worker to run the python streaming data source and 
> communicate with the JVM through a socket. Create a PythonMicrobatchStream to 
> invoke RPC function calls.
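
A schematic sketch of such a long-lived worker loop (plain sockets and JSON
for illustration only; the real protocol is Spark-internal and differs in
detail):

{code:python}
# Schematic sketch only; Spark's actual worker protocol is internal and
# binary. The loop reads a method name from the JVM, dispatches it to the
# data source instance, and writes the JSON-encoded result back.
import json
import socket

def serve(reader, port: int) -> None:
    with socket.create_connection(("127.0.0.1", port)) as sock:
        f = sock.makefile("rwb")
        while True:
            line = f.readline()
            if not line:
                break  # JVM closed the connection; worker exits
            request = json.loads(line)
            result = getattr(reader, request["method"])()  # e.g. "latestOffset"
            f.write((json.dumps(result) + "\n").encode())
            f.flush()
{code}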



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-46962) Implement python worker to run python streaming data source

2024-03-11 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-46962.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45023
[https://github.com/apache/spark/pull/45023]

> Implement python worker to run python streaming data source
> ---
>
> Key: SPARK-46962
> URL: https://issues.apache.org/jira/browse/SPARK-46962
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Chaoqin Li
>Assignee: Chaoqin Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Implement a python worker to run the python streaming data source and 
> communicate with the JVM through a socket. Create a PythonMicrobatchStream to 
> invoke RPC function calls.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47331) Serialization using case classes/primitives/POJO based on SQL encoder for Arbitrary State API v2.

2024-03-10 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47331.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45447
[https://github.com/apache/spark/pull/45447]

> Serialization using case classes/primitives/POJO based on SQL encoder for 
> Arbitrary State API v2. 
> --
>
> Key: SPARK-47331
> URL: https://issues.apache.org/jira/browse/SPARK-47331
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Jing Zhan
>Assignee: Jing Zhan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> In the new operator for arbitrary state-v2, we cannot rely on the 
> session/encoder being available since the initialization for the various 
> state instances happens on the executors. Also, we can only support limited 
> state types with the available encoders. Hence, for the state serialization, 
> we propose to serialize primitives/case classes/POJO with SQL encoder. 
> Leveraging SQL encoder can speed up the serialization.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47331) Serialization using case classes/primitives/POJO based on SQL encoder for Arbitrary State API v2.

2024-03-10 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47331:


Assignee: Jing Zhan

> Serialization using case classes/primitives/POJO based on SQL encoder for 
> Arbitrary State API v2. 
> --
>
> Key: SPARK-47331
> URL: https://issues.apache.org/jira/browse/SPARK-47331
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Jing Zhan
>Assignee: Jing Zhan
>Priority: Major
>  Labels: pull-request-available
>
> In the new operator for arbitrary state-v2, we cannot rely on the 
> session/encoder being available since the initialization for the various 
> state instances happens on the executors. Also, we can only support limited 
> state types with the available encoders. Hence, for the state serialization, 
> we propose to serialize primitives/case classes/POJO with SQL encoder. 
> Leveraging SQL encoder can speed up the serialization.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47305) PruneFilters incorrectly tags isStreaming flag when replacing child of Filter with LocalRelation

2024-03-06 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47305.
--
Fix Version/s: 3.4.3
   3.5.2
   4.0.0
   Resolution: Fixed

Issue resolved by pull request 45406
[https://github.com/apache/spark/pull/45406]

> PruneFilters incorrectly tags isStreaming flag when replacing child of Filter 
> with LocalRelation
> 
>
> Key: SPARK-47305
> URL: https://issues.apache.org/jira/browse/SPARK-47305
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.4.2, 3.4.0, 3.4.1, 3.5.0, 4.0.0, 3.5.1
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 3.4.3, 3.5.2, 4.0.0
>
>
> This seems to be a very old bug in the optimizer. Related ticket: 
> https://issues.apache.org/jira/browse/SPARK-21765
> When a filter is evaluated to be always false, PruneFilters replaces the filter 
> with an empty LocalRelation, which effectively prunes the filter. The logic 
> tries to migrate the isStreaming flag, but migrates it incorrectly in some 
> cases, picking up the value of the isStreaming flag from the root node rather 
> than from the filter (or its child).
> The isStreaming flag of a node is true if the isStreaming flag of any of its 
> children is true. On the flip side, some children might have the isStreaming 
> flag as "false". If the filter being pruned is a descendant of such children 
> (in other words, an ancestor of a streaming node), the LocalRelation is 
> incorrectly tagged as streaming where it should be batch.
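
A toy model of the fix (plain Python, not Catalyst code): the empty
replacement must inherit isStreaming from the pruned filter's own child,
never from the root of the plan.

{code:python}
# Toy model of the fix, not Catalyst code: the empty LocalRelation standing
# in for an always-false filter must mirror the streaming-ness of the
# filter's child, not of the plan root.
class Plan:
    def __init__(self, is_streaming: bool, children=()):
        self.is_streaming = is_streaming
        self.children = list(children)

def prune_always_false_filter(filter_node: Plan) -> Plan:
    child = filter_node.children[0]
    return Plan(is_streaming=child.is_streaming)  # correct flag propagation

batch_child = Plan(is_streaming=False)
pruned = prune_always_false_filter(Plan(is_streaming=False, children=[batch_child]))
assert pruned.is_streaming is False
{code}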



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47305) PruneFilters incorrectly tags isStreaming flag when replacing child of Filter with LocalRelation

2024-03-06 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47305:


Assignee: Jungtaek Lim

> PruneFilters incorrectly tags isStreaming flag when replacing child of Filter 
> with LocalRelation
> 
>
> Key: SPARK-47305
> URL: https://issues.apache.org/jira/browse/SPARK-47305
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.4.2, 3.4.0, 3.4.1, 3.5.0, 4.0.0, 3.5.1
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Critical
>  Labels: pull-request-available
>
> This seems to be a very old bug in the optimizer. Related ticket: 
> https://issues.apache.org/jira/browse/SPARK-21765
> When a filter is evaluated to be always false, PruneFilters replaces the filter 
> with an empty LocalRelation, which effectively prunes the filter. The logic 
> tries to migrate the isStreaming flag, but migrates it incorrectly in some 
> cases, picking up the value of the isStreaming flag from the root node rather 
> than from the filter (or its child).
> The isStreaming flag of a node is true if the isStreaming flag of any of its 
> children is true. On the flip side, some children might have the isStreaming 
> flag as "false". If the filter being pruned is a descendant of such children 
> (in other words, an ancestor of a streaming node), the LocalRelation is 
> incorrectly tagged as streaming where it should be batch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-47305) PruneFilters incorrectly tags isStreaming flag when replacing child of Filter with LocalRelation

2024-03-06 Thread Jungtaek Lim (Jira)
Jungtaek Lim created SPARK-47305:


 Summary: PruneFilters incorrectly tags isStreaming flag when 
replacing child of Filter with LocalRelation
 Key: SPARK-47305
 URL: https://issues.apache.org/jira/browse/SPARK-47305
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 3.5.1, 3.5.0, 3.4.1, 3.4.0, 3.4.2, 4.0.0
Reporter: Jungtaek Lim


This seems to be a very old bug in the optimizer. Related ticket: 
https://issues.apache.org/jira/browse/SPARK-21765

When a filter is evaluated to be always false, PruneFilters replaces the filter 
with an empty LocalRelation, which effectively prunes the filter. The logic 
tries to migrate the isStreaming flag, but migrates it incorrectly in some 
cases, picking up the value of the isStreaming flag from the root node rather 
than from the filter (or its child).

The isStreaming flag of a node is true if the isStreaming flag of any of its 
children is true. On the flip side, some children might have the isStreaming 
flag as "false". If the filter being pruned is a descendant of such children 
(in other words, an ancestor of a streaming node), the LocalRelation is 
incorrectly tagged as streaming where it should be batch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47200) Handle and classify errors from ForEachBatchSink user function

2024-02-29 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47200.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45299
[https://github.com/apache/spark/pull/45299]

> Handle and classify errors from ForEachBatchSink user function
> --
>
> Key: SPARK-47200
> URL: https://issues.apache.org/jira/browse/SPARK-47200
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: B. Micheal Okutubo
>Assignee: B. Micheal Okutubo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Any exception can be thrown from the user-provided function for 
> ForEachBatchSink. We want to classify this class of errors, including errors 
> from Python and Scala functions.
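
A schematic sketch of the classification idea (not Spark's actual error
framework):

{code:python}
# Schematic sketch, not Spark's error framework: wrap any exception escaping
# the user-provided function so callers can distinguish user-code failures
# from engine failures.
class ForeachBatchUserFuncError(Exception):
    """Error raised when the user-provided foreachBatch function fails."""

def run_user_function(func, df, batch_id):
    try:
        func(df, batch_id)
    except Exception as e:
        raise ForeachBatchUserFuncError(
            f"user function failed for batch {batch_id}") from e
{code}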



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47135) Implement error classes for Kafka data loss exceptions

2024-02-28 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47135.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45221
[https://github.com/apache/spark/pull/45221]

> Implement error classes for Kafka data loss exceptions 
> ---
>
> Key: SPARK-47135
> URL: https://issues.apache.org/jira/browse/SPARK-47135
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: B. Micheal Okutubo
>Assignee: B. Micheal Okutubo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> In the kafka connector code, we have several code paths that throw the java 
> *IllegalStateException* to report data loss while reading from Kafka. We 
> want to properly classify those exceptions using the new error framework. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47135) Implement error classes for Kafka data loss exceptions

2024-02-28 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47135:


Assignee: B. Micheal Okutubo

> Implement error classes for Kafka data loss exceptions 
> ---
>
> Key: SPARK-47135
> URL: https://issues.apache.org/jira/browse/SPARK-47135
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: B. Micheal Okutubo
>Assignee: B. Micheal Okutubo
>Priority: Major
>  Labels: pull-request-available
>
> In the kafka connector code, we have several code paths that throw the java 
> *IllegalStateException* to report data loss while reading from Kafka. We 
> want to properly classify those exceptions using the new error framework. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45599) Percentile can produce a wrong answer if -0.0 and 0.0 are mixed in the dataset

2024-02-26 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-45599:
-
Fix Version/s: 3.5.2
   (was: 3.5.1)

> Percentile can produce a wrong answer if -0.0 and 0.0 are mixed in the dataset
> --
>
> Key: SPARK-45599
> URL: https://issues.apache.org/jira/browse/SPARK-45599
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.4.1, 1.6.3, 3.3.0, 3.2.3, 3.5.0
>Reporter: Robert Joseph Evans
>Assignee: Nicholas Chammas
>Priority: Critical
>  Labels: correctness, pull-request-available
> Fix For: 4.0.0, 3.5.2
>
>
> I think this actually impacts all versions that have ever supported 
> percentile and it may impact other things because the bug is in OpenHashMap.
>  
> I am really surprised that we caught this bug because everything has to hit 
> just wrong to make it happen. In python/pyspark, if you run
>  
> {code:python}
> from math import *
> from pyspark.sql.types import *
> data = [(1.779652973678931e+173,), (9.247723870123388e-295,), 
> (5.891823952773268e+98,), (inf,), (1.9042708096454302e+195,), 
> (-3.085825028509117e+74,), (-1.9569489404314425e+128,), 
> (2.0738138203216883e+201,), (inf,), (2.5212410617263588e-282,), 
> (-2.646144697462316e-35,), (-3.468683249247593e-196,), (nan,), (None,), 
> (nan,), (1.822129180806602e-245,), (5.211702553315461e-259,), (-1.0,), 
> (-5.682293414619055e+46,), (-4.585039307326895e+166,), 
> (-5.936844510098297e-82,), (-5234708055733.116,), (4920675036.053339,), 
> (None,), (4.4501477170144023e-308,), (2.176024662699802e-210,), 
> (-5.046677974902737e+132,), (-5.490780063080251e-09,), 
> (1.703824427218836e-55,), (-1.1961155424160076e+102,), 
> (1.4403274475565667e+41,), (None,), (5.4470705929955455e-86,), 
> (5.120795466142678e-215,), (-9.01991342808203e+282,), 
> (4.051866849943636e-254,), (-3588518231990.927,), (-1.8891559842111865e+63,), 
> (3.4543959813437507e-304,), (-7.590734560275502e-63,), 
> (9.376528689861087e+117,), (-2.1696969883753554e-292,), 
> (7.227411393136537e+206,), (-2.428999624265911e-293,), 
> (5.741383583382542e-14,), (-1.4882040107841963e+286,), 
> (2.1973064836362255e-159,), (0.028096279323357867,), 
> (8.475809563703283e-64,), (3.002803065141241e-139,), 
> (-1.1041009815645263e+203,), (1.8461539468514548e-225,), 
> (-5.620339412794757e-251,), (3.5103766991437114e-60,), 
> (2.4925669515657655e+165,), (3.217759099462207e+108,), 
> (-8.796717685143486e+203,), (2.037360925124577e+292,), 
> (-6.542279108216022e+206,), (-7.951172614280046e-74,), 
> (6.226527569272003e+152,), (-5.673977270111637e-84,), 
> (-1.0186016078084965e-281,), (1.7976931348623157e+308,), 
> (4.205809391029644e+137,), (-9.871721037428167e+119,), (None,), 
> (-1.6663254121185628e-256,), (1.0075153091760986e-236,), (-0.0,), (0.0,), 
> (1.7976931348623157e+308,), (4.3214483342777574e-117,), 
> (-7.973642629411105e-89,), (-1.1028137694801181e-297,), 
> (2.9000325280299273e-39,), (-1.077534929323113e-264,), 
> (-1.1847952892216515e+137,), (nan,), (7.849390806334983e+226,), 
> (-1.831402251805194e+65,), (-2.664533698035492e+203,), 
> (-2.2385155698231885e+285,), (-2.3016388448634844e-155,), 
> (-9.607772864590422e+217,), (3.437191836077251e+209,), 
> (1.9846569552093057e-137,), (-3.010452936419635e-233,), 
> (1.4309793775440402e-87,), (-2.9383643865423363e-103,), 
> (-4.696878567317712e-162,), (8.391630779050713e-135,), (nan,), 
> (-3.3885098786542755e-128,), (-4.5154178008513483e-122,), (nan,), (nan,), 
> (2.187766760184779e+306,), (7.679268835670585e+223,), 
> (6.3131466321042515e+153,), (1.779652973678931e+173,), 
> (9.247723870123388e-295,), (5.891823952773268e+98,), (inf,), 
> (1.9042708096454302e+195,), (-3.085825028509117e+74,), 
> (-1.9569489404314425e+128,), (2.0738138203216883e+201,), (inf,), 
> (2.5212410617263588e-282,), (-2.646144697462316e-35,), 
> (-3.468683249247593e-196,), (nan,), (None,), (nan,), 
> (1.822129180806602e-245,), (5.211702553315461e-259,), (-1.0,), 
> (-5.682293414619055e+46,), (-4.585039307326895e+166,), 
> (-5.936844510098297e-82,), (-5234708055733.116,), (4920675036.053339,), 
> (None,), (4.4501477170144023e-308,), (2.176024662699802e-210,), 
> (-5.046677974902737e+132,), (-5.490780063080251e-09,), 
> (1.703824427218836e-55,), (-1.1961155424160076e+102,), 
> (1.4403274475565667e+41,), (None,), (5.4470705929955455e-86,), 
> (5.120795466142678e-215,), (-9.01991342808203e+282,), 
> (4.051866849943636e-254,), (-3588518231990.927,), (-1.8891559842111865e+63,), 
> (3.4543959813437507e-304,), (-7.590734560275502e-63,), 
> (9.376528689861087e+117,), (-2.1696969883753554e-292,), 
> (7.227411393136537e+206,), (-2.428999624265911e-293,), 
> (5.741383583382542e-14,), (-1.4882040107841963e+286,), 
> 
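
The quoted reproduction is truncated in the archive; the heart of the issue,
as described for OpenHashMap, is that -0.0 and 0.0 compare equal yet have
different bit patterns, so a map that hashes raw bits while comparing with ==
can split one logical key across two slots. A minimal illustration:

{code:python}
# Minimal illustration of the -0.0 vs 0.0 hazard described above; this is
# not the OpenHashMap code itself.
import struct

def double_bits(d: float) -> int:
    return struct.unpack(">q", struct.pack(">d", d))[0]

assert -0.0 == 0.0                            # the values compare equal...
assert double_bits(-0.0) != double_bits(0.0)  # ...but their raw bits differ
{code}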

[jira] [Updated] (SPARK-47023) Upgrade `aircompressor` to 0.26

2024-02-23 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-47023:
-
Description: 
`aircompressor` is a transitive dependency from Apache ORC and Parquet.

`aircompressor` v0.26 reported the following bug fixes recently.
 - [Fix out of bounds read/write in Snappy 
decompressor]([https://github.com/airlift/aircompressor/commit/b89db180bb97debe025b640dc40ed43816e8c7d2])
 - [Fix ZstdOutputStream corruption on double 
close]([https://github.com/airlift/aircompressor/commit/b89db180bb97debe025b640dc40ed43816e8c7d2])

  was:
`aircompressor` is a transitive dependency from Apache ORC and Parquet.

`aircompressor` v1.26 reported the following bug fixes recently.
 
- [Fix out of bounds read/write in Snappy 
decompressor](https://github.com/airlift/aircompressor/commit/b89db180bb97debe025b640dc40ed43816e8c7d2)
- [Fix ZstdOutputStream corruption on double 
close](https://github.com/airlift/aircompressor/commit/b89db180bb97debe025b640dc40ed43816e8c7d2)


> Upgrade `aircompressor` to 0.26
> ---
>
> Key: SPARK-47023
> URL: https://issues.apache.org/jira/browse/SPARK-47023
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build
>Affects Versions: 3.5.0, 4.0.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0, 3.5.1
>
>
> `aircompressor` is a transitive dependency from Apache ORC and Parquet.
> `aircompressor` v0.26 reported the following bug fixes recently.
>  - [Fix out of bounds read/write in Snappy 
> decompressor]([https://github.com/airlift/aircompressor/commit/b89db180bb97debe025b640dc40ed43816e8c7d2])
>  - [Fix ZstdOutputStream corruption on double 
> close]([https://github.com/airlift/aircompressor/commit/b89db180bb97debe025b640dc40ed43816e8c7d2])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-47023) Upgrade `aircompressor` to 0.26

2024-02-23 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-47023:
-
Summary: Upgrade `aircompressor` to 0.26  (was: Upgrade `aircompressor` to 
1.26)

> Upgrade `aircompressor` to 0.26
> ---
>
> Key: SPARK-47023
> URL: https://issues.apache.org/jira/browse/SPARK-47023
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build
>Affects Versions: 3.5.0, 4.0.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0, 3.5.1
>
>
> `aircompressor` is a transitive dependency from Apache ORC and Parquet.
> `aircompressor` v1.26 reported the following bug fixes recently.
>  
> - [Fix out of bounds read/write in Snappy 
> decompressor](https://github.com/airlift/aircompressor/commit/b89db180bb97debe025b640dc40ed43816e8c7d2)
> - [Fix ZstdOutputStream corruption on double 
> close](https://github.com/airlift/aircompressor/commit/b89db180bb97debe025b640dc40ed43816e8c7d2)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-47036) RocksDB versionID Mismatch in SST files with Compaction

2024-02-21 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-47036:
-
Fix Version/s: 3.5.2

> RocksDB versionID Mismatch in SST files with Compaction
> ---
>
> Key: SPARK-47036
> URL: https://issues.apache.org/jira/browse/SPARK-47036
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.5.0, 4.0.0, 3.5.1, 3.5.2
>Reporter: Bhuwan Sahni
>Assignee: Bhuwan Sahni
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0, 3.5.2
>
>
> RocksDB compaction can result in version Id mismatch errors if the same 
> version is committed twice from the same executor. (Multiple commits can 
> happen due to Spark Stage/task retry).
> A particular scenario where this can happen is provided below: 
>  1. Version V1 is loaded on executor A, RocksDB State Store has 195.sst, 
> 196.sst, 197.sst and 198.sst files. 
> 2. State changes are made, which result in creation of a new table file 
> 200.sst. 
> 3. State store is committed as version V2. The SST file 200.sst (as 
> 000200-8c80161a-bc23-4e3b-b175-cffe38e427c7.sst) is uploaded to DFS, and 
> previous 4 files are reused. A new metadata file is created to track the 
> exact SST files with unique IDs, and uploaded with RocksDB Manifest as part 
> of V1.zip.
> 4. RocksDB compaction is triggered at the same time. The compaction creates 
> a new L1 file (201.sst), and deletes existing 5 SST files.
> 5. Spark Stage is retried. 
> 6. Version V1 is reloaded on the same executor. The local files are 
> inspected, and 201.sst is deleted. The 4 SST files in version V1 are 
> downloaded again to local file system. 
> 7. Any local files which are deleted (as part of version load) are also 
> removed from local → DFS file upload tracking. **However, the files already 
> deleted as a result of compaction are not removed from tracking. This is the 
> bug which resulted in the failure.**
> 8. State store is committed again as version V2. However, the local mapping of SST 
> files to DFS file path still has 200.sst in its tracking, hence the SST file 
> is not re-uploaded. A new metadata file is created to track the exact SST 
> files with unique IDs, and uploaded with the new RocksDB Manifest as part of 
> V2.zip. (The V2.zip file is overwritten here atomically)
> 9. A new executor tried to load version V2. However, the SST files in (1) are 
> now incompatible with Manifest file in (6) resulting in the version Id 
> mismatch failure.
>  
> We need to ensure that any files deleted from local filesystem post 
> compaction are not tracked in uploadedDFSFiles mapping if the same version is 
> loaded again.
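
A toy sketch of the fix idea (plain Python, not the RocksDB file manager):
on reload, drop every locally-missing file from the local-to-DFS tracking
map, including files deleted by background compaction, so they are
re-uploaded on the next commit.

{code:python}
# Toy sketch of the fix idea, not Spark's RocksDB file manager: rebuild the
# local -> DFS upload-tracking map from the files actually present locally,
# so files removed by compaction are no longer considered uploaded.
def reload_tracking(tracked: dict, local_files: set) -> dict:
    return {name: dfs_path for name, dfs_path in tracked.items()
            if name in local_files}

tracked = {"200.sst": "dfs://checkpoint/000200-8c80161a.sst"}  # placeholder path
assert reload_tracking(tracked, local_files=set()) == {}  # forces re-upload
{code}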



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47036) RocksDB versionID Mismatch in SST files with Compaction

2024-02-21 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47036:


Assignee: Bhuwan Sahni

> RocksDB versionID Mismatch in SST files with Compaction
> ---
>
> Key: SPARK-47036
> URL: https://issues.apache.org/jira/browse/SPARK-47036
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.5.0, 4.0.0, 3.5.1, 3.5.2
>Reporter: Bhuwan Sahni
>Assignee: Bhuwan Sahni
>Priority: Major
>  Labels: pull-request-available
>
> RocksDB compaction can result in version Id mismatch errors if the same 
> version is committed twice from the same executor. (Multiple commits can 
> happen due to Spark Stage/task retry).
> A particular scenario where this can happen is provided below: 
>  1. Version V1 is loaded on executor A, RocksDB State Store has 195.sst, 
> 196.sst, 197.sst and 198.sst files. 
> 2. State changes are made, which result in creation of a new table file 
> 200.sst. 
> 3. State store is committed as version V2. The SST file 200.sst (as 
> 000200-8c80161a-bc23-4e3b-b175-cffe38e427c7.sst) is uploaded to DFS, and 
> previous 4 files are reused. A new metadata file is created to track the 
> exact SST files with unique IDs, and uploaded with RocksDB Manifest as part 
> of V1.zip.
> 4. RocksDB compaction is triggered at the same time. The compaction creates 
> a new L1 file (201.sst), and deletes existing 5 SST files.
> 5. Spark Stage is retried. 
> 6. Version V1 is reloaded on the same executor. The local files are 
> inspected, and 201.sst is deleted. The 4 SST files in version V1 are 
> downloaded again to local file system. 
> 7. Any local files which are deleted (as part of version load) are also 
> removed from local → DFS file upload tracking. **However, the files already 
> deleted as a result of compaction are not removed from tracking. This is the 
> bug which resulted in the failure.**
> 8. State store is committed again as version V2. However, the local mapping of SST 
> files to DFS file path still has 200.sst in its tracking, hence the SST file 
> is not re-uploaded. A new metadata file is created to track the exact SST 
> files with unique IDs, and uploaded with the new RocksDB Manifest as part of 
> V2.zip. (The V2.zip file is overwritten here atomically)
> 9. A new executor tried to load version V2. However, the SST files in (1) are 
> now incompatible with Manifest file in (6) resulting in the version Id 
> mismatch failure.
>  
> We need to ensure that any files deleted from local filesystem post 
> compaction are not tracked in uploadedDFSFiles mapping if the same version is 
> loaded again.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47036) RocksDB versionID Mismatch in SST files with Compaction

2024-02-21 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47036.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45092
[https://github.com/apache/spark/pull/45092]

> RocksDB versionID Mismatch in SST files with Compaction
> ---
>
> Key: SPARK-47036
> URL: https://issues.apache.org/jira/browse/SPARK-47036
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.5.0, 4.0.0, 3.5.1, 3.5.2
>Reporter: Bhuwan Sahni
>Assignee: Bhuwan Sahni
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> RocksDB compaction can result in version Id mismatch errors if the same 
> version is committed twice from the same executor. (Multiple commits can 
> happen due to Spark Stage/task retry).
> A particular scenario where this can happen is provided below: 
>  1. Version V1 is loaded on executor A, RocksDB State Store has 195.sst, 
> 196.sst, 197.sst and 198.sst files. 
> 2. State changes are made, which result in creation of a new table file 
> 200.sst. 
> 3. State store is committed as version V2. The SST file 200.sst (as 
> 000200-8c80161a-bc23-4e3b-b175-cffe38e427c7.sst) is uploaded to DFS, and 
> previous 4 files are reused. A new metadata file is created to track the 
> exact SST files with unique IDs, and uploaded with RocksDB Manifest as part 
> of V1.zip.
> 4. RocksDB compaction is triggered at the same time. The compaction creates 
> a new L1 file (201.sst), and deletes existing 5 SST files.
> 5. Spark Stage is retried. 
> 6. Version V1 is reloaded on the same executor. The local files are 
> inspected, and 201.sst is deleted. The 4 SST files in version V1 are 
> downloaded again to local file system. 
> 7. Any local files which are deleted (as part of version load) are also 
> removed from local → DFS file upload tracking. **However, the files already 
> deleted as a result of compaction are not removed from tracking. This is the 
> bug which resulted in the failure.**
> 8. State store is committed again as version V2. However, the local mapping of SST 
> files to DFS file path still has 200.sst in its tracking, hence the SST file 
> is not re-uploaded. A new metadata file is created to track the exact SST 
> files with unique IDs, and uploaded with the new RocksDB Manifest as part of 
> V2.zip. (The V2.zip file is overwritten here atomically)
> 9. A new executor tried to load version V2. However, the SST files in (1) are 
> now incompatible with Manifest file in (6) resulting in the version Id 
> mismatch failure.
>  
> We need to ensure that any files deleted from local filesystem post 
> compaction are not tracked in uploadedDFSFiles mapping if the same version is 
> loaded again.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-46928) Support ListState in Arbitrary State API v2

2024-02-20 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-46928.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 44961
[https://github.com/apache/spark/pull/44961]

> Support ListState in Arbitrary State API v2
> ---
>
> Key: SPARK-46928
> URL: https://issues.apache.org/jira/browse/SPARK-46928
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Bhuwan Sahni
>Assignee: Bhuwan Sahni
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> As part of Arbitrary State API v2 
> ([https://docs.google.com/document/d/1QtC5qd4WQEia9kl1Qv74WE0TiXYy3x6zeTykygwPWig]),
>  we need to support ListState. This task covers adding support for 
> ListState in Scala. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-46928) Support ListState in Arbitrary State API v2

2024-02-20 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-46928:


Assignee: Bhuwan Sahni

> Support ListState in Arbitrary State API v2
> ---
>
> Key: SPARK-46928
> URL: https://issues.apache.org/jira/browse/SPARK-46928
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Bhuwan Sahni
>Assignee: Bhuwan Sahni
>Priority: Major
>  Labels: pull-request-available
>
> As part of Arbitrary State API v2 
> ([https://docs.google.com/document/d/1QtC5qd4WQEia9kl1Qv74WE0TiXYy3x6zeTykygwPWig]),
>  we need to support ListState. This task covers adding support for 
> ListState in Scala. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-47052) Separate state tracking variables from MicroBatchExecution/StreamExecution

2024-02-20 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47052:


Assignee: Boyang Jerry Peng

> Separate state tracking variables from MicroBatchExecution/StreamExecution
> --
>
> Key: SPARK-47052
> URL: https://issues.apache.org/jira/browse/SPARK-47052
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Boyang Jerry Peng
>Assignee: Boyang Jerry Peng
>Priority: Major
>  Labels: pull-request-available
>
> To improve code clarity and maintainability, I propose that we move all the 
> variables that track mutable state and metrics for streaming query into a 
> separate class.  With this refactor, it would be easy to track and find all 
> the mutable state a microbatch can have.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47052) Separate state tracking variables from MicroBatchExecution/StreamExecution

2024-02-20 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47052.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45109
[https://github.com/apache/spark/pull/45109]

> Separate state tracking variables from MicroBatchExecution/StreamExecution
> --
>
> Key: SPARK-47052
> URL: https://issues.apache.org/jira/browse/SPARK-47052
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Boyang Jerry Peng
>Assignee: Boyang Jerry Peng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> To improve code clarity and maintainability, I propose that we move all the 
> variables that track mutable state and metrics for streaming query into a 
> separate class.  With this refactor, it would be easy to track and find all 
> the mutable state a microbatch can have.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-46906) Add a check for stateful operator change for streaming

2024-02-20 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-46906.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 44927
[https://github.com/apache/spark/pull/44927]

> Add a check for stateful operator change for streaming
> --
>
> Key: SPARK-46906
> URL: https://issues.apache.org/jira/browse/SPARK-46906
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Jing Zhan
>Assignee: Jing Zhan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Currently, users get a misleading
> org.apache.spark.sql.execution.streaming.state.StateSchemaNotCompatible error
> when restarting a query in the same checkpoint location after changing its
> stateful operator. We need to catch such cases and throw a new error with an
> informative message.
> After physical planning and before the execution phase, we will read the
> state metadata with the current operator id to fetch the operator name of the
> committed batch with the same operator id. If the operator names do not
> match, we throw the new error.
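
A minimal sketch of the described check, assuming the operator names for the committed batch have already been read from the state metadata; the method name and error message below are illustrative:

{code:scala}
// Hypothetical sketch of the validation described above: compare each
// stateful operator in the new physical plan against the operator name
// recorded in the checkpoint's state metadata for the same operator id.
def validateStatefulOperators(
    committed: Map[Long, String],  // operatorId -> name from state metadata
    planned: Map[Long, String]     // operatorId -> name in the new plan
): Unit = {
  planned.foreach { case (opId, newName) =>
    committed.get(opId).foreach { oldName =>
      if (oldName != newName) {
        throw new IllegalStateException(
          s"Stateful operator change detected for operator id=$opId: the " +
          s"checkpoint recorded '$oldName' but the restarted query plans " +
          s"'$newName'. Changing stateful operators while reusing the same " +
          "checkpoint location is not supported.")
      }
    }
  }
}
{code}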



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-46934) Unable to create Hive View from certain Spark Dataframe StructType

2024-02-19 Thread Jungtaek Lim (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-46934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818634#comment-17818634
 ] 

Jungtaek Lim commented on SPARK-46934:
--

Maybe the priority has to be updated as well - we could mark it as a release
blocker if we had consensus that it is one. It looks like we don't.

> Unable to create Hive View from certain Spark Dataframe StructType
> --
>
> Key: SPARK-46934
> URL: https://issues.apache.org/jira/browse/SPARK-46934
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0, 3.3.2, 3.3.4
> Environment: Tested in Spark 3.3.0, 3.3.2.
>Reporter: Yu-Ting LIN
>Assignee: Kent Yao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> We are trying to create a Hive view using the following SQL command: "CREATE OR
> REPLACE VIEW yuting AS SELECT INFO_ANN FROM table_2611810".
> Our table_2611810 has certain columns whose names contain special characters
> such as "/". Here is the schema of this table.
> {code:java}
> contigName              string
> start                   bigint
> end                     bigint
> names                   array
> referenceAllele         string
> alternateAlleles        array
> qual                    double
> filters                 array
> splitFromMultiAllelic    boolean
> INFO_NCAMP              int
> INFO_ODDRATIO           double
> INFO_NM                 double
> INFO_DBSNP_CAF          array
> INFO_SPANPAIR           int
> INFO_TLAMP              int
> INFO_PSTD               double
> INFO_QSTD               double
> INFO_SBF                double
> INFO_AF                 array
> INFO_QUAL               double
> INFO_SHIFT3             int
> INFO_VARBIAS            string
> INFO_HICOV              int
> INFO_PMEAN              double
> INFO_MSI                double
> INFO_VD                 int
> INFO_DP                 int
> INFO_HICNT              int
> INFO_ADJAF              double
> INFO_SVLEN              int
> INFO_RSEQ               string
> INFO_MSigDb             array
> INFO_NMD                array
> INFO_ANN                
> array,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct,CDS_pos/CDS_length:struct,AA_pos/AA_length:struct,Distance:int,ERRORS/WARNINGS/INFO:string>>
> INFO_BIAS               string
> INFO_MQ                 double
> INFO_HIAF               double
> INFO_END                int
> INFO_SPLITREAD          int
> INFO_GDAMP              int
> INFO_LSEQ               string
> INFO_LOF                array
> INFO_SAMPLE             string
> INFO_AMPFLAG            int
> INFO_SN                 double
> INFO_SVTYPE             string
> INFO_TYPE               string
> INFO_MSILEN             double
> INFO_DUPRATE            double
> INFO_DBSNP_COMMON       int
> INFO_REFBIAS            string
> genotypes               
> array,ALD:array,AF:array,phased:boolean,calls:array,VD:int,depth:int,RD:array>>
>  {code}
> You can see that column INFO_ANN is an array of struct and it contains columns
> whose names have "/" inside, such as "cDNA_pos/cDNA_length", etc.
> We believe this is the root cause of the following SparkException:
> {code:java}
> scala> val schema = spark.sql("CREATE OR REPLACE VIEW yuting AS SELECT 
> INFO_ANN FROM table_2611810")
> 24/01/31 07:50:02.658 [main] WARN  o.a.spark.sql.catalyst.util.package - 
> Truncated the string representation of a plan since it was too large. This 
> behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
> org.apache.spark.SparkException: Cannot recognize hive type string: 
> array,Annotation_Impact:string,Gene_Name:string,Gene_ID:string,Feature_Type:string,Feature_ID:string,Transcript_BioType:string,Rank:struct,HGVS_c:string,HGVS_p:string,cDNA_pos/cDNA_length:struct,CDS_pos/CDS_length:struct,AA_pos/AA_length:struct,Distance:int,ERRORS/WARNINGS/INFO:string>>,
>  column: INFO_ANN
>   at 
> org.apache.spark.sql.errors.QueryExecutionErrors$.cannotRecognizeHiveTypeError(QueryExecutionErrors.scala:1455)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl$.getSparkSQLDataType(HiveClientImpl.scala:1022)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl$.$anonfun$verifyColumnDataType$1(HiveClientImpl.scala:1037)
>   at scala.collection.Iterator.foreach(Iterator.scala:943)
>   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>   at 
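
A minimal sketch reproducing the shape of the problem, assuming only that a nested field name contains "/" as in the reported schema; the field names below are illustrative:

{code:scala}
import org.apache.spark.sql.types._

// Hypothetical repro sketch: a struct field whose name contains '/',
// as in the reported INFO_ANN column. Spark accepts such names, but
// Hive's type-string parser cannot round-trip them, which surfaces as
// "Cannot recognize hive type string" when the view is created.
val problematic = StructType(Seq(
  StructField("INFO_ANN", ArrayType(StructType(Seq(
    StructField("Allele", StringType),
    StructField("cDNA_pos/cDNA_length", StructType(Seq(
      StructField("pos", IntegerType),
      StructField("length", IntegerType))))))))))
{code}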

[jira] [Assigned] (SPARK-47053) Docker image for release has to bump versions of some python libraries for 3.5.1

2024-02-14 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-47053:


Assignee: Jungtaek Lim

> Docker image for release has to bump versions of some python libraries for 
> 3.5.1
> 
>
> Key: SPARK-47053
> URL: https://issues.apache.org/jira/browse/SPARK-47053
> Project: Spark
>  Issue Type: Bug
>  Components: Project Infra
>Affects Versions: 4.0.0, 3.5.1
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
>
> As the title says: the doc generation phase fails when I try to run the
> release script against branch-3.5 for the 3.5.1 RC.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-47053) Docker image for release has to bump versions of some python libraries for 3.5.1

2024-02-14 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-47053.
--
Fix Version/s: 3.5.1
   Resolution: Fixed

Issue resolved by pull request 45111
[https://github.com/apache/spark/pull/45111]

> Docker image for release has to bump versions of some python libraries for 
> 3.5.1
> 
>
> Key: SPARK-47053
> URL: https://issues.apache.org/jira/browse/SPARK-47053
> Project: Spark
>  Issue Type: Bug
>  Components: Project Infra
>Affects Versions: 4.0.0, 3.5.1
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
> Fix For: 3.5.1
>
>
> As the title says: the doc generation phase fails when I try to run the
> release script against branch-3.5 for the 3.5.1 RC.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-47053) Docker image for release has to bump versions of some python libraries for 3.5.1

2024-02-14 Thread Jungtaek Lim (Jira)
Jungtaek Lim created SPARK-47053:


 Summary: Docker image for release has to bump versions of some 
python libraries for 3.5.1
 Key: SPARK-47053
 URL: https://issues.apache.org/jira/browse/SPARK-47053
 Project: Spark
  Issue Type: Bug
  Components: Project Infra
Affects Versions: 4.0.0, 3.5.1
Reporter: Jungtaek Lim


As the title says: the doc generation phase fails when I try to run the
release script against branch-3.5 for the 3.5.1 RC.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-46979) Add support for defining state encoder for key/value and col family independently

2024-02-13 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reassigned SPARK-46979:


Assignee: Anish Shrigondekar

> Add support for defining state encoder for key/value and col family 
> independently
> -
>
> Key: SPARK-46979
> URL: https://issues.apache.org/jira/browse/SPARK-46979
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Anish Shrigondekar
>Assignee: Anish Shrigondekar
>Priority: Major
>  Labels: pull-request-available
>
> Add support for defining state encoder for key/value and col family 
> independently
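
A minimal sketch of what the resulting state store API could look like with per-column-family encoders; the trait and method names below are assumptions, not the actual interface:

{code:scala}
import org.apache.spark.sql.types.StructType

// Hypothetical sketch: each column family registers its own key and
// value schemas, so the encoders used for keys and values can differ
// per family instead of being fixed store-wide. Names are illustrative.
trait ColumnFamilyAwareStateStore {
  // register a column family together with the schemas used to encode
  // its keys and values
  def createColFamilyIfAbsent(
      colFamilyName: String,
      keySchema: StructType,
      valueSchema: StructType): Unit
}
{code}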



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-46979) Add support for defining state encoder for key/value and col family independently

2024-02-13 Thread Jungtaek Lim (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-46979.
--
Fix Version/s: 4.0.0
   Resolution: Fixed

Issue resolved by pull request 45038
[https://github.com/apache/spark/pull/45038]

> Add support for defining state encoder for key/value and col family 
> independently
> -
>
> Key: SPARK-46979
> URL: https://issues.apache.org/jira/browse/SPARK-46979
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Anish Shrigondekar
>Assignee: Anish Shrigondekar
>Priority: Major
>  Labels: pull-request-available
> Fix For: 4.0.0
>
>
> Add support for defining state encoder for key/value and col family 
> independently



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


