[jira] [Created] (SPARK-35800) Improving testability of GroupState in streaming flatMapGroupsWithState

2021-06-17 Thread Tathagata Das (Jira)
Tathagata Das created SPARK-35800:
-

 Summary: Improving testability of GroupState in streaming 
flatMapGroupsWithState
 Key: SPARK-35800
 URL: https://issues.apache.org/jira/browse/SPARK-35800
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 3.1.2
Reporter: Tathagata Das


GroupStateImpl is the internal implementation of the GroupState interface, which 
is not meant to be exposed. Thus, it only has a private constructor. Such access 
control benefits encapsulation; however, it makes unit testing difficult, because 
users have to rely on the engine to construct GroupState instances in order to 
test their customized state transition functions.

The solution is to introduce new interfaces that allow users to create 
instances of GroupState and also access the internal values of what they have set 
(for example, whether the state has been updated or removed). This would allow them to 
write unit tests that call the state transition function with custom GroupState 
objects and then verify whether the state was updated in the expected way. 
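
A minimal sketch of what such a test could look like, assuming a hypothetical 
test-only factory (here called {{TestGroupState.create}}) and a hypothetical 
{{isUpdated}} accessor; the eventual API may look different:

{code:scala}
import org.apache.spark.sql.streaming.GroupState

// User-defined state transition function under test: counts events per key.
def countEvents(key: String, events: Iterator[String], state: GroupState[Long]): Long = {
  val newCount = state.getOption.getOrElse(0L) + events.size
  state.update(newCount)
  newCount
}

// Hypothetical test-only factory and accessor proposed by this ticket.
val state = TestGroupState.create[Long](initialState = None, hasTimedOut = false)

val result = countEvents("key1", Iterator("a", "b", "c"), state)
assert(result == 3)
assert(state.isUpdated)  // hypothetical accessor: was update() called?
assert(state.get == 3)
{code}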




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34962) Explicit representation of star in MergeIntoTable's Update and Insert action

2021-04-05 Thread Tathagata Das (Jira)
Tathagata Das created SPARK-34962:
-

 Summary: Explicit representation of star in MergeIntoTable's 
Update and Insert action
 Key: SPARK-34962
 URL: https://issues.apache.org/jira/browse/SPARK-34962
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.1
Reporter: Tathagata Das
Assignee: Tathagata Das


Currently, UpdateAction and InsertAction in MergeIntoTable implicitly 
represent `update set *` and `insert *` with empty assignments. That means 
there is no way to differentiate between the representations of "update all 
columns" and "update no columns". For SQL MERGE queries, this inability does 
not matter because the SQL MERGE grammar that generates the MergeIntoTable plan 
does not allow "update no columns". However, other ways of generating the 
MergeIntoTable plan may not have that limitation, and may want to allow 
specifying "update no columns".  For example, in the Delta Lake project we 
provide a type-safe Scala API for Merge, where it is perfectly valid to produce 
a Merge query with an update clause but no update assignments. Currently, we 
cannot use MergeIntoTable to represent this plan, thus complicating the 
generation and resolution of merge queries from the Scala API. 

This should be fixed by having an explicit representation of * in the 
UpdateAction and InsertAction.
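
A rough sketch of the distinction this would add, using simplified stand-in 
classes (the real catalyst node names may differ):

{code:scala}
// Illustrative stand-ins for the catalyst merge action nodes (not the real classes).
case class Assignment(key: String, value: String)
case class UpdateAction(condition: Option[String], assignments: Seq[Assignment])
case class UpdateStarAction(condition: Option[String])  // explicit "UPDATE SET *" (hypothetical)
case class InsertStarAction(condition: Option[String])  // explicit "INSERT *" (hypothetical)

// Today, both intents collapse to the same shape and cannot be told apart:
val updateAllColumns = UpdateAction(None, Seq.empty)  // meant as UPDATE SET *
val updateNoColumns  = UpdateAction(None, Seq.empty)  // meant as "update nothing"

// With explicit star nodes, the two intents stay distinguishable in the plan:
val star  = UpdateStarAction(None)        // UPDATE SET *
val empty = UpdateAction(None, Seq.empty) // update no columns (e.g., from the Scala API)
{code}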



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-34720) Incorrect star expansion logic MERGE INSERT * / UPDATE *

2021-03-11 Thread Tathagata Das (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-34720:
--
Summary: Incorrect star expansion logic MERGE INSERT * / UPDATE *  (was: 
Incorrect star expansion logic MERGE INSERT / UPDATE *)

> Incorrect star expansion logic MERGE INSERT * / UPDATE *
> 
>
> Key: SPARK-34720
> URL: https://issues.apache.org/jira/browse/SPARK-34720
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.2, 3.1.1
>Reporter: Tathagata Das
>Priority: Major
>
> The natural expectation of INSERT * or UPDATE * in MERGE is to assign source 
> columns to target columns of the same name. However, the current logic here 
> generates the assignment by position. 
> https://github.com/apache/spark/commit/7cfd589868b8430bc79e28e4d547008b222781a5#diff-ed19f376a63eba52eea59ca71f3355d4495fad4fad4db9a3324aade0d4986a47R1214
> This can very easily lead to incorrect results without the user realizing 
> this odd behavior.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-34720) Incorrect star expansion logic MERGE INSERT / UPDATE *

2021-03-11 Thread Tathagata Das (Jira)
Tathagata Das created SPARK-34720:
-

 Summary: Incorrect star expansion logic MERGE INSERT / UPDATE *
 Key: SPARK-34720
 URL: https://issues.apache.org/jira/browse/SPARK-34720
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.1.1, 3.0.2
Reporter: Tathagata Das


The natural expectation of INSERT * or UPDATE * in MERGE is to assign source 
columns to target columns of the same name. However, the current logic here 
generates the assignment by position. 
https://github.com/apache/spark/commit/7cfd589868b8430bc79e28e4d547008b222781a5#diff-ed19f376a63eba52eea59ca71f3355d4495fad4fad4db9a3324aade0d4986a47R1214

This can very easily lead to incorrect results without the user realizing this 
odd behavior.
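
As an illustration, position-based expansion can silently misassign columns when 
the source declares the same columns in a different order (the table and column 
names below are made up, and a SparkSession named spark is assumed):

{code:scala}
// Hypothetical tables with the same columns declared in different orders:
//   target(id INT, a STRING, b STRING)
//   source(id INT, b STRING, a STRING)
spark.sql("""
  MERGE INTO target t
  USING source s
  ON t.id = s.id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""")
// By-name expansion (the natural expectation): t.a = s.a, t.b = s.b
// By-position expansion (the current behavior): t.a = s.b, t.b = s.a
// The values of a and b are silently swapped, with no error raised.
{code}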




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-32585) Support scala enumeration in ScalaReflection

2020-10-01 Thread Tathagata Das (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-32585.
---
Fix Version/s: 3.1.0
   Resolution: Done

> Support scala enumeration in ScalaReflection
> 
>
> Key: SPARK-32585
> URL: https://issues.apache.org/jira/browse/SPARK-32585
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: ulysses you
>Priority: Minor
> Fix For: 3.1.0
>
>
> Add code in {{ScalaReflection}} to support Scala Enumeration, mapping the 
> enumeration type to string type in Spark.
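
For context, a small sketch of what this enables, assuming Spark 3.1.0 (where 
this change landed) and a local SparkSession:

{code:scala}
import org.apache.spark.sql.SparkSession

object Color extends Enumeration {
  val Red, Blue = Value
}
case class Paint(name: String, color: Color.Value)

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// With Enumeration support in ScalaReflection, the `color` field is carried
// as a string column holding the value's name ("Red", "Blue").
val ds = Seq(Paint("wall", Color.Red), Paint("door", Color.Blue)).toDS()
ds.printSchema()  // ... color: string ...
{code}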



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-32794) Rare corner case error in micro-batch engine with some stateful queries + no-data-batches + V1 streaming sources

2020-09-11 Thread Tathagata Das (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-32794.
---
Fix Version/s: (was: 2.4.8)
   2.4.7
   Resolution: Fixed

Issue resolved by pull request 29700
[https://github.com/apache/spark/pull/29700]

> Rare corner case error in micro-batch engine with some stateful queries + 
> no-data-batches + V1 streaming sources 
> -
>
> Key: SPARK-32794
> URL: https://issues.apache.org/jira/browse/SPARK-32794
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.4, 2.4.6, 3.0.0, 3.0.1
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 2.4.7, 3.1.0, 3.0.2
>
>
> The Structured Streaming micro-batch engine has a contract with V1 data sources 
> that, after a restart, it will call `source.getBatch()` on the last batch 
> attempted before the restart. However, a very rare combination of conditions 
> violates this contract. It occurs only when 
> - The streaming query has specific types of stateful operations with 
> watermarks (e.g., aggregation in append mode, mapGroupsWithState with timeouts). 
> - These queries can execute a batch even without new data when the 
> previous batch updates the watermark and the stateful ops are such that the new 
> watermark can cause new output/cleanup. Such batches are called 
> no-data-batches.
> - The last batch before termination was an incomplete no-data-batch. Upon 
> restart, the micro-batch engine fails to call `source.getBatch` when 
> attempting to re-execute the incomplete no-data-batch.
> This occurs because no-data-batches have the same start and end offsets, and when a 
> batch is executed, if the start and end offsets are the same then calling 
> `source.getBatch` is skipped, as it is assumed the generated plan will be 
> empty. This only affects V1 data sources, which rely on this invariant to 
> initialize differently when the query is being started from scratch or 
> restarted. How a source misbehaves is very source-specific. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32794) Rare corner case error in micro-batch engine with some stateful queries + no-data-batches + V1 streaming sources

2020-09-09 Thread Tathagata Das (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-32794:
--
Fix Version/s: 3.0.2
   3.1.0
   2.4.8

> Rare corner case error in micro-batch engine with some stateful queries + 
> no-data-batches + V1 streaming sources 
> -
>
> Key: SPARK-32794
> URL: https://issues.apache.org/jira/browse/SPARK-32794
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.4, 2.4.6, 3.0.0, 3.0.1
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 2.4.8, 3.1.0, 3.0.2
>
>
> The Structured Streaming micro-batch engine has a contract with V1 data sources 
> that, after a restart, it will call `source.getBatch()` on the last batch 
> attempted before the restart. However, a very rare combination of conditions 
> violates this contract. It occurs only when 
> - The streaming query has specific types of stateful operations with 
> watermarks (e.g., aggregation in append mode, mapGroupsWithState with timeouts). 
> - These queries can execute a batch even without new data when the 
> previous batch updates the watermark and the stateful ops are such that the new 
> watermark can cause new output/cleanup. Such batches are called 
> no-data-batches.
> - The last batch before termination was an incomplete no-data-batch. Upon 
> restart, the micro-batch engine fails to call `source.getBatch` when 
> attempting to re-execute the incomplete no-data-batch.
> This occurs because no-data-batches have the same start and end offsets, and when a 
> batch is executed, if the start and end offsets are the same then calling 
> `source.getBatch` is skipped, as it is assumed the generated plan will be 
> empty. This only affects V1 data sources, which rely on this invariant to 
> initialize differently when the query is being started from scratch or 
> restarted. How a source misbehaves is very source-specific. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32794) Rare corner case error in micro-batch engine with some stateful queries + no-data-batches + V1 streaming sources

2020-09-03 Thread Tathagata Das (Jira)
Tathagata Das created SPARK-32794:
-

 Summary: Rare corner case error in micro-batch engine with some 
stateful queries + no-data-batches + V1 streaming sources 
 Key: SPARK-32794
 URL: https://issues.apache.org/jira/browse/SPARK-32794
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 3.0.0, 2.4.6, 2.3.4, 3.0.1
Reporter: Tathagata Das
Assignee: Tathagata Das


The Structured Streaming micro-batch engine has a contract with V1 data sources 
that, after a restart, it will call `source.getBatch()` on the last batch 
attempted before the restart. However, a very rare combination of conditions 
violates this contract. It occurs only when 
- The streaming query has specific types of stateful operations with watermarks 
(e.g., aggregation in append mode, mapGroupsWithState with timeouts). 
- These queries can execute a batch even without new data when the previous 
batch updates the watermark and the stateful ops are such that the new watermark 
can cause new output/cleanup. Such batches are called no-data-batches.
- The last batch before termination was an incomplete no-data-batch. Upon 
restart, the micro-batch engine fails to call `source.getBatch` when attempting 
to re-execute the incomplete no-data-batch.

This occurs because no-data-batches have the same start and end offsets, and when a 
batch is executed, if the start and end offsets are the same then calling 
`source.getBatch` is skipped, as it is assumed the generated plan will be empty. 
This only affects V1 data sources, which rely on this invariant to initialize 
differently when the query is being started from scratch or restarted. How a 
source misbehaves is very source-specific. 
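
A simplified sketch of the offset check that causes the skip (the names are 
illustrative; this is not the actual MicroBatchExecution code):

{code:scala}
// Illustrative model of the planner's decision, not the real implementation.
case class Offset(value: Long)

trait SimpleV1Source {
  // Stand-in for the V1 Source.getBatch(start, end) call.
  def getBatch(start: Option[Offset], end: Offset): Unit
}

def runBatch(source: SimpleV1Source, start: Option[Offset], end: Offset): Unit = {
  // getBatch is skipped when the batch covers no new offsets, on the assumption
  // that the generated plan would be empty. A no-data-batch has start == end, so
  // when such a batch is re-executed after a restart, getBatch is never called,
  // violating the restart contract that V1 sources rely on.
  if (!start.contains(end)) {
    source.getBatch(start, end)
  }
}
{code}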




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32017) Make Pyspark Hadoop 3.2+ Variant available in PyPI

2020-06-18 Thread Tathagata Das (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-32017:
--
Fix Version/s: (was: 3.0.1)

> Make Pyspark Hadoop 3.2+ Variant available in PyPI
> --
>
> Key: SPARK-32017
> URL: https://issues.apache.org/jira/browse/SPARK-32017
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark
>Affects Versions: 3.0.0
>Reporter: George Pongracz
>Priority: Minor
>
> The version of PySpark 3.0.0 currently available in PyPI uses Hadoop 2.7.4.
> Could a variant (or the default) have its version of Hadoop aligned to 3.2.0, 
> as per the downloadable Spark binaries?
> This would enable the PyPI version to be compatible with session token 
> authorisations and assist in accessing data residing in object stores with 
> stronger encryption methods.
> If not PyPI, then please provide it at least as a tar file in the Apache 
> download archives.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30657) Streaming limit after streaming dropDuplicates can throw error

2020-01-31 Thread Tathagata Das (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17027910#comment-17027910
 ] 

Tathagata Das commented on SPARK-30657:
---

This fix by itself (separate from the fix for SPARK-30658) may be backported. 
The solution I implemented, always injecting StreamingLocalLimitExec, is safe from 
a correctness point of view, but it is a little risky from a performance point of 
view (which I tried to minimize using the optimization). With 2.4.4+, unless 
this is a serious bug that affects many users, I don't think we should backport 
this. And I don't think limit on streaming is used extensively enough for this to 
be a big bug (it has not been reported for 1.5 years). 

What do you think, [~zsxwing]?

> Streaming limit after streaming dropDuplicates can throw error
> --
>
> Key: SPARK-30657
> URL: https://issues.apache.org/jira/browse/SPARK-30657
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.3.4, 2.4.0, 2.4.1, 2.4.2, 
> 2.4.3, 2.4.4
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Critical
> Fix For: 3.0.0
>
>
> {{LocalLimitExec}} does not consume the iterator of the child plan. So if 
> there is a limit after a stateful operator like streaming dedup in append 
> mode (e.g. {{streamingdf.dropDuplicates().limit(5)}}), the state changes of the 
> streaming dedup may not be committed (most stateful ops commit state 
> changes only after the generated iterator is fully consumed). This leads to 
> the next batch failing with {{java.lang.IllegalStateException: Error reading 
> delta file .../N.delta does not exist}} as the state store delta file was 
> never generated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30658) Limit after on streaming dataframe before streaming agg returns wrong results

2020-01-31 Thread Tathagata Das (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17027906#comment-17027906
 ] 

Tathagata Das commented on SPARK-30658:
---

I am a little afraid to backport this because it is a hacky change in the 
incremental planner, which is already quite complicated to reason about.

> Limit after on streaming dataframe before streaming agg returns wrong results
> -
>
> Key: SPARK-30658
> URL: https://issues.apache.org/jira/browse/SPARK-30658
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.3.4, 2.4.0, 2.4.1, 2.4.2, 
> 2.4.3, 2.4.4
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Critical
> Fix For: 3.0.0
>
>
> Limit before a streaming aggregate (i.e. {{df.limit(5).groupBy().count()}}) 
> in complete mode was not being planned as a streaming limit. The planner rule 
> planned a logical limit with a stateful streaming limit plan only if the 
> query is in append mode. As a result, instead of allowing max 5 rows across 
> batches, the planned streaming query was allowing 5 rows in every batch thus 
> producing incorrect results.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30658) Limit after on streaming dataframe before streaming agg returns wrong results

2020-01-31 Thread Tathagata Das (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17027907#comment-17027907
 ] 

Tathagata Das commented on SPARK-30658:
---

Fixed in this PR https://github.com/apache/spark/pull/27373

> Limit after on streaming dataframe before streaming agg returns wrong results
> -
>
> Key: SPARK-30658
> URL: https://issues.apache.org/jira/browse/SPARK-30658
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.3.4, 2.4.0, 2.4.1, 2.4.2, 
> 2.4.3, 2.4.4
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Critical
> Fix For: 3.0.0
>
>
> Limit before a streaming aggregate (i.e. {{df.limit(5).groupBy().count()}}) 
> in complete mode was not being planned as a streaming limit. The planner rule 
> planned a logical limit with a stateful streaming limit plan only if the 
> query is in append mode. As a result, instead of allowing max 5 rows across 
> batches, the planned streaming query was allowing 5 rows in every batch thus 
> producing incorrect results.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-30657) Streaming limit after streaming dropDuplicates can throw error

2020-01-31 Thread Tathagata Das (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-30657.
---
Resolution: Fixed

> Streaming limit after streaming dropDuplicates can throw error
> --
>
> Key: SPARK-30657
> URL: https://issues.apache.org/jira/browse/SPARK-30657
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.3.4, 2.4.0, 2.4.1, 2.4.2, 
> 2.4.3, 2.4.4
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Critical
> Fix For: 3.0.0
>
>
> {{LocalLimitExec}} does not consume the iterator of the child plan. So if 
> there is a limit after a stateful operator like streaming dedup in append 
> mode (e.g. {{streamingdf.dropDuplicates().limit(5)}}), the state changes of the 
> streaming dedup may not be committed (most stateful ops commit state 
> changes only after the generated iterator is fully consumed). This leads to 
> the next batch failing with {{java.lang.IllegalStateException: Error reading 
> delta file .../N.delta does not exist}} as the state store delta file was 
> never generated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-30658) Limit after on streaming dataframe before streaming agg returns wrong results

2020-01-31 Thread Tathagata Das (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-30658.
---
Resolution: Fixed

> Limit after on streaming dataframe before streaming agg returns wrong results
> -
>
> Key: SPARK-30658
> URL: https://issues.apache.org/jira/browse/SPARK-30658
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.3.4, 2.4.0, 2.4.1, 2.4.2, 
> 2.4.3, 2.4.4
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Critical
> Fix For: 3.0.0
>
>
> Limit before a streaming aggregate (i.e. {{df.limit(5).groupBy().count()}}) 
> in complete mode was not being planned as a streaming limit. The planner rule 
> planned a logical limit with a stateful streaming limit plan only if the 
> query is in append mode. As a result, instead of allowing max 5 rows across 
> batches, the planned streaming query was allowing 5 rows in every batch thus 
> producing incorrect results.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-30658) Limit after on streaming dataframe before streaming agg returns wrong results

2020-01-31 Thread Tathagata Das (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-30658:
-

Assignee: Tathagata Das

> Limit after on streaming dataframe before streaming agg returns wrong results
> -
>
> Key: SPARK-30658
> URL: https://issues.apache.org/jira/browse/SPARK-30658
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.3.4, 2.4.0, 2.4.1, 2.4.2, 
> 2.4.3, 2.4.4
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Critical
> Fix For: 3.0.0
>
>
> Limit before a streaming aggregate (i.e. {{df.limit(5).groupBy().count()}}) 
> in complete mode was not being planned as a streaming limit. The planner rule 
> planned a logical limit with a stateful streaming limit plan only if the 
> query is in append mode. As a result, instead of allowing max 5 rows across 
> batches, the planned streaming query was allowing 5 rows in every batch thus 
> producing incorrect results.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-30658) Limit after on streaming dataframe before streaming agg returns wrong results

2020-01-31 Thread Tathagata Das (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-30658:
--
Description: Limit before a streaming aggregate (i.e. 
{{df.limit(5).groupBy().count()}}) in complete mode was not being planned as a 
streaming limit. The planner rule planned a logical limit with a stateful 
streaming limit plan only if the query is in append mode. As a result, instead 
of allowing max 5 rows across batches, the planned streaming query was allowing 
5 rows in every batch thus producing incorrect results.  (was: Limit before a 
streaming aggregate (i.e. [[df.limit(5).groupBy().count()}}) in complete mode 
was not being planned as a streaming limit. The planner rule planned a logical 
limit with a stateful streaming limit plan only if the query is in append mode. 
As a result, instead of allowing max 5 rows across batches, the planned 
streaming query was allowing 5 rows in every batch thus producing incorrect 
results.)

> Limit after on streaming dataframe before streaming agg returns wrong results
> -
>
> Key: SPARK-30658
> URL: https://issues.apache.org/jira/browse/SPARK-30658
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.3.4, 2.4.0, 2.4.1, 2.4.2, 
> 2.4.3, 2.4.4
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Critical
> Fix For: 3.0.0
>
>
> Limit before a streaming aggregate (i.e. {{df.limit(5).groupBy().count()}}) 
> in complete mode was not being planned as a streaming limit. The planner rule 
> planned a logical limit with a stateful streaming limit plan only if the 
> query is in append mode. As a result, instead of allowing max 5 rows across 
> batches, the planned streaming query was allowing 5 rows in every batch thus 
> producing incorrect results.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-29438) Failed to get state store in stream-stream join

2020-01-30 Thread Tathagata Das (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-29438.
---
Fix Version/s: 3.0.0
   Resolution: Fixed

Issue resolved by pull request 26162
[https://github.com/apache/spark/pull/26162]

> Failed to get state store in stream-stream join
> ---
>
> Key: SPARK-29438
> URL: https://issues.apache.org/jira/browse/SPARK-29438
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.4
>Reporter: Genmao Yu
>Assignee: Jungtaek Lim
>Priority: Critical
> Fix For: 3.0.0
>
>
> Now, Spark uses the `TaskPartitionId` to determine the StateStore path.
> {code:java}
> TaskPartitionId   \ 
> StateStoreVersion  --> StoreProviderId -> StateStore
> StateStoreName/  
> {code}
> In Spark stages, the task partition id is determined by the number of tasks. 
> As noted above, the StateStore file path depends on the task partition id. So if 
> a stream-stream join's task partition id changes relative to the last batch, it will 
> read the wrong StateStore data or fail because the StateStore data does not exist. 
> This happens in some corner cases. The following is sample pseudocode:
> {code:java}
> val df3 = streamDf1.join(streamDf2)
> val df5 = streamDf3.join(batchDf4)
> val df = df3.union(df5)
> df.writeStream...start()
> {code}
> A simplified DAG looks like this:
> {code:java}
> DataSourceV2Scan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
>  (streamDf3)|   (streamDf1)(streamDf2)
>  |  |   | |
>   Exchange(200)  Exchange(200)   Exchange(200) Exchange(200)
>  |  |   | | 
>SortSort | |
>  \  /   \ /
>   \/ \   /
> SortMergeJoinStreamingSymmetricHashJoin
>  \ /
>\ /
>  \ /
> Union
> {code}
> Stream-Stream join task ids will range from 200 to 399 as they are in the same 
> stage as `SortMergeJoin`. But when there is no new incoming data in 
> `streamDf3` in some batch, it will generate an empty LocalRelation, and then 
> the SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, 
> Stream-Stream join task ids will start from 1 to 200. Finally, it will derive the 
> wrong StateStore path from the TaskPartitionId and fail with an error reading the 
> state store delta file.
> {code:java}
> LocalTableScan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
>  |  |   | |
> BroadcastExchange   |  Exchange(200) Exchange(200)
>  |  |   | | 
>  |  |   | |
>   \/ \   /
>\ /\ /
>   BroadcastHashJoin StreamingSymmetricHashJoin
>  \ /
>\ /
>  \ /
> Union
> {code}
> In my job, I disabled the auto broadcast join feature (set 
> spark.sql.autoBroadcastJoinThreshold=-1) to work around this bug. We should 
> make the StateStore path deterministic and not dependent on the TaskPartitionId.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-29438) Failed to get state store in stream-stream join

2020-01-30 Thread Tathagata Das (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-29438:
-

Assignee: Jungtaek Lim

> Failed to get state store in stream-stream join
> ---
>
> Key: SPARK-29438
> URL: https://issues.apache.org/jira/browse/SPARK-29438
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.4
>Reporter: Genmao Yu
>Assignee: Jungtaek Lim
>Priority: Critical
>
> Now, Spark uses the `TaskPartitionId` to determine the StateStore path.
> {code:java}
> TaskPartitionId   \ 
> StateStoreVersion  --> StoreProviderId -> StateStore
> StateStoreName/  
> {code}
> In Spark stages, the task partition id is determined by the number of tasks. 
> As noted above, the StateStore file path depends on the task partition id. So if 
> a stream-stream join's task partition id changes relative to the last batch, it will 
> read the wrong StateStore data or fail because the StateStore data does not exist. 
> This happens in some corner cases. The following is sample pseudocode:
> {code:java}
> val df3 = streamDf1.join(streamDf2)
> val df5 = streamDf3.join(batchDf4)
> val df = df3.union(df5)
> df.writeStream...start()
> {code}
> A simplified DAG looks like this:
> {code:java}
> DataSourceV2Scan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
>  (streamDf3)|   (streamDf1)(streamDf2)
>  |  |   | |
>   Exchange(200)  Exchange(200)   Exchange(200) Exchange(200)
>  |  |   | | 
>SortSort | |
>  \  /   \ /
>   \/ \   /
> SortMergeJoinStreamingSymmetricHashJoin
>  \ /
>\ /
>  \ /
> Union
> {code}
> Stream-Stream join task ids will range from 200 to 399 as they are in the same 
> stage as `SortMergeJoin`. But when there is no new incoming data in 
> `streamDf3` in some batch, it will generate an empty LocalRelation, and then 
> the SortMergeJoin will be replaced with a BroadcastHashJoin. In this case, 
> Stream-Stream join task ids will start from 1 to 200. Finally, it will derive the 
> wrong StateStore path from the TaskPartitionId and fail with an error reading the 
> state store delta file.
> {code:java}
> LocalTableScan   Scan Relation DataSourceV2Scan   DataSourceV2Scan
>  |  |   | |
> BroadcastExchange   |  Exchange(200) Exchange(200)
>  |  |   | | 
>  |  |   | |
>   \/ \   /
>\ /\ /
>   BroadcastHashJoin StreamingSymmetricHashJoin
>  \ /
>\ /
>  \ /
> Union
> {code}
> In my job, I disabled the auto broadcast join feature (set 
> spark.sql.autoBroadcastJoinThreshold=-1) to work around this bug. We should 
> make the StateStore path deterministic and not dependent on the TaskPartitionId.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30658) Limit after on streaming dataframe before streaming agg returns wrong results

2020-01-28 Thread Tathagata Das (Jira)
Tathagata Das created SPARK-30658:
-

 Summary: Limit after on streaming dataframe before streaming agg 
returns wrong results
 Key: SPARK-30658
 URL: https://issues.apache.org/jira/browse/SPARK-30658
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.4.4, 2.4.3, 2.4.2, 2.4.1, 2.4.0, 2.3.4, 2.3.3, 2.3.2, 
2.3.1, 2.3.0
Reporter: Tathagata Das


Limit before a streaming aggregate (i.e. {{df.limit(5).groupBy().count()}}) in 
complete mode was not being planned as a streaming limit. The planner rule 
planned a logical limit with a stateful streaming limit plan only if the query 
is in append mode. As a result, instead of allowing max 5 rows across batches, 
the planned streaming query was allowing 5 rows in every batch thus producing 
incorrect results.
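
A minimal query of the shape that hits this bug (the rate source and console 
sink are just placeholders):

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val input = spark.readStream.format("rate").load()

// Limit before a streaming aggregation in complete output mode. The limit should be
// a global, stateful limit of 5 rows across all batches, but it was being planned as
// a per-batch local limit, so every batch could contribute up to 5 new rows.
val counts = input.limit(5).groupBy().count()

counts.writeStream
  .format("console")
  .outputMode("complete")
  .start()
{code}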



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30657) Streaming limit after streaming dropDuplicates can throw error

2020-01-28 Thread Tathagata Das (Jira)
Tathagata Das created SPARK-30657:
-

 Summary: Streaming limit after streaming dropDuplicates can throw 
error
 Key: SPARK-30657
 URL: https://issues.apache.org/jira/browse/SPARK-30657
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.4.4, 2.4.3, 2.4.2, 2.4.1, 2.4.0, 2.3.4, 2.3.3, 2.3.2, 
2.3.1, 2.3.0
Reporter: Tathagata Das
Assignee: Tathagata Das


{{LocalLimitExec}} does not consume the iterator of the child plan. So if there 
is a limit after a stateful operator like streaming dedup in append mode (e.g. 
{{streamingdf.dropDuplicates().limit(5)}}), the state changes of the streaming 
dedup may not be committed (most stateful ops commit state changes only after 
the generated iterator is fully consumed). This leads to the next batch failing 
with {{java.lang.IllegalStateException: Error reading delta file 
.../N.delta does not exist}} as the state store delta file was never generated.
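
A minimal query of the shape described above (the rate source and console sink 
are just placeholders):

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val input = spark.readStream.format("rate").load()

// Streaming dedup (stateful) followed by a limit in append mode. Because the local
// limit stops pulling from its child after 5 rows, the dedup operator may never
// finish iterating its output and therefore never commits its state store changes,
// so the next batch fails to find the expected state store delta file.
val query = input
  .dropDuplicates("value")
  .limit(5)
  .writeStream
  .format("console")
  .outputMode("append")
  .start()
{code}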



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-30609) Allow default merge command resolution to be bypassed by DSv2 sources

2020-01-22 Thread Tathagata Das (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-30609.
---
Fix Version/s: 3.0.0
   Resolution: Fixed

Issue resolved by pull request 27326
[https://github.com/apache/spark/pull/27326]

> Allow default merge command resolution to be bypassed by DSv2 sources
> -
>
> Key: SPARK-30609
> URL: https://issues.apache.org/jira/browse/SPARK-30609
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.4
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 3.0.0
>
>
> Problem: Some DSv2 sources may want to customize the merge resolution logic. 
> For example, a table that can accept any schema 
> (TableCapability.ACCEPT_ANY_SCHEMA) may want to allow certain merge queries 
> that are blocked (that is, throw an AnalysisError) by the default resolution 
> logic. So there should be a way to completely bypass the merge resolution 
> logic in the Analyzer. 
> Potential solution: Skip resolving the merge expressions if the target is a 
> DSv2 table with  ACCEPT_ANY_SCHEMA capability.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-30609) Allow default merge command resolution to be bypassed by DSv2 sources

2020-01-22 Thread Tathagata Das (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-30609:
-

Assignee: Tathagata Das

> Allow default merge command resolution to be bypassed by DSv2 sources
> -
>
> Key: SPARK-30609
> URL: https://issues.apache.org/jira/browse/SPARK-30609
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.4
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
>
> Problem: Some DSv2 sources may want to customize the merge resolution logic. 
> For example, a table that can accept any schema 
> (TableCapability.ACCEPT_ANY_SCHEMA) may want to allow certain merge queries 
> that are blocked (that is, throw an AnalysisError) by the default resolution 
> logic. So there should be a way to completely bypass the merge resolution 
> logic in the Analyzer. 
> Potential solution: Skip resolving the merge expressions if the target is a 
> DSv2 table with  ACCEPT_ANY_SCHEMA capability.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30609) Allow default merge command resolution to be bypassed by DSv2 sources

2020-01-22 Thread Tathagata Das (Jira)
Tathagata Das created SPARK-30609:
-

 Summary: Allow default merge command resolution to be bypassed by 
DSv2 sources
 Key: SPARK-30609
 URL: https://issues.apache.org/jira/browse/SPARK-30609
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.4
Reporter: Tathagata Das


Problem: Some DSv2 sources may want to customize the merge resolution logic. 
For example, a table that can accept any schema 
(TableCapability.ACCEPT_ANY_SCHEMA) may want to allow certain merge queries 
that are blocked (that is, throw an AnalysisError) by the default resolution 
logic. So there should be a way to completely bypass the merge resolution logic 
in the Analyzer. 

Potential solution: Skip resolving the merge expressions if the target is a 
DSv2 table with  ACCEPT_ANY_SCHEMA capability.
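
A sketch of the kind of DSv2 table this targets, written against the Spark 3.0 
connector API (the class is made up, and skipping resolution based on this 
capability is the proposed behavior, not an existing guarantee):

{code:scala}
import java.util
import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
import org.apache.spark.sql.types.StructType

// A DSv2 table that accepts rows of any schema. For such a table the default merge
// resolution in the Analyzer can be overly strict, so the proposal is to skip it and
// let the source resolve/validate the merge expressions itself.
class AnySchemaTable extends Table {
  override def name(): String = "any_schema_table"
  override def schema(): StructType = new StructType()
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.ACCEPT_ANY_SCHEMA, TableCapability.BATCH_WRITE)
}
{code}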



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27453) DataFrameWriter.partitionBy is Silently Dropped by DSV1

2019-04-16 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-27453.
---
   Resolution: Fixed
Fix Version/s: 2.4.2
   3.0.0

Issue resolved by pull request 24365
[https://github.com/apache/spark/pull/24365]

> DataFrameWriter.partitionBy is Silently Dropped by DSV1
> ---
>
> Key: SPARK-27453
> URL: https://issues.apache.org/jira/browse/SPARK-27453
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.4.1, 1.5.2, 1.6.3, 2.0.2, 2.1.3, 2.2.3, 2.4.1
>Reporter: Michael Armbrust
>Assignee: Liwen Sun
>Priority: Critical
> Fix For: 3.0.0, 2.4.2
>
>
> This is a long standing quirk of the interaction between {{DataFrameWriter}} 
> and {{CreatableRelationProvider}} (and the other forms of the DSV1 API).  
> Users can specify columns in {{partitionBy}} and our internal data sources 
> will use this information.  Unfortunately, for external systems, this data is 
> silently dropped with no feedback given to the user.
> In the long run, I think that DataSourceV2 is a better answer. However, I 
> don't think we should wait for that API to stabilize before offering some 
> kind of solution to developers of external data sources. I also do not think 
> we should break binary compatibility of this API, but I do think that a small 
> surgical fix could alleviate the issue.
> I would propose that we could propagate partitioning information (when 
> present) along with the other configuration options passed to the data source 
> in the {{String, String}} map.
> I think it's very unlikely that there are both data sources that validate 
> extra options and users who are using (no-op) partitioning with them, but out 
> of an abundance of caution we should protect the behavior change behind a 
> {{legacy}} flag that can be turned off.
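
A sketch of how a DSV1 source could read such propagated partitioning 
information; the option key and its encoding below are assumptions for 
illustration, not an agreed-upon contract:

{code:scala}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}

// Sketch of a DSV1 source that looks for propagated partitioning info in the
// options map passed by DataFrameWriter.
class ExampleProvider extends CreatableRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    val partitionCols: Seq[String] =
      parameters.get("__partition_columns")      // hypothetical key
        .map(_.split(",").map(_.trim).toSeq)     // hypothetical comma-separated encoding
        .getOrElse(Seq.empty)
    // ... write `data` partitioned by `partitionCols` ...
    throw new UnsupportedOperationException("sketch only")
  }
}
{code}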



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26629) Error with multiple file stream in a query + restart on a batch that has no data for one file stream

2019-01-15 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-26629:
-

 Summary: Error with multiple file stream in a query + restart on a 
batch that has no data for one file stream
 Key: SPARK-26629
 URL: https://issues.apache.org/jira/browse/SPARK-26629
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.4.0, 2.3.2, 2.3.1, 2.3.0, 2.4.1
Reporter: Tathagata Das
Assignee: Tathagata Das


When a streaming query has multiple file streams, and there is a batch in which 
one of the file streams has no data, then if the query has to restart from that 
batch, it will throw the following error.

{code}
java.lang.IllegalStateException: batch 1 doesn't exist
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog$.verifyBatchIds(HDFSMetadataLog.scala:300)
at 
org.apache.spark.sql.execution.streaming.FileStreamSourceLog.get(FileStreamSourceLog.scala:120)
at 
org.apache.spark.sql.execution.streaming.FileStreamSource.getBatch(FileStreamSource.scala:181)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:294)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:291)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at 
org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:291)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:178)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:251)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:175)
at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:169)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:205)
{code}

**Reason**
Existing {{HDFSMetadata.verifyBatchIds}} throws an error whenever the batchIds 
list is empty. In the context of {{FileStreamSource.getBatch}} (where verify 
is called) and FileStreamSourceLog (subclass of HDFSMetadata), this is usually 
okay because, in a streaming query with one file stream, the batchIds can never 
be empty:

A batch is planned only when the FileStreamSourceLog has seen a new offset (that 
is, there are new data files).
So FileStreamSource.getBatch will be called on X to Y where Y will always be > 
X. This internally calls {{HDFSMetadata.verifyBatchIds(X+1, Y)}} with Y-X 
ids.
For example, {{FileStreamSource.getBatch(4, 5)}} will call {{verify(batchIds = 
Seq(5), start = 5, end = 5)}}. However, the invariant of Y > X does not hold when 
there are two file stream sources, as a batch may be planned even when only one 
of the file streams has data. So one of the file streams may not have data, 
which can lead to {{FileStreamSource.getBatch(X, X) -> verify(batchIds = 
Seq.empty, start = X+1, end = X) -> failure}}.

Note that FileStreamSource.getBatch(X, X) gets called only when restarting a 
query in a batch where a file source did not have data. This is because, in 
normal planning of batches, MicroBatchExecution avoids calling 
{{FileStreamSource.getBatch(X, X)}} when offset X has not changed. However, 
when restarting a stream at such a batch, 
{{MicroBatchExecution.populateStartOffsets()}} calls 
{{FileStreamSource.getBatch(X, X)}} (DataSource V1 hack to initialize the 
source with last known offsets) thus 

[jira] [Created] (SPARK-26425) Add more constraint checks in file streaming source to avoid checkpoint corruption

2018-12-21 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-26425:
-

 Summary: Add more constraint checks in file streaming source to 
avoid checkpoint corruption
 Key: SPARK-26425
 URL: https://issues.apache.org/jira/browse/SPARK-26425
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.4.0
Reporter: Tathagata Das
Assignee: Tathagata Das


Two issues observed in production. 
- HDFSMetadataLog.getLatest() tries to read older versions when it is not able 
to read the latest listed version file. Not sure why this was done but this 
should not be done. If the latest listed file is not readable, then something 
is horribly wrong and we should fail rather than report an older version as 
that can completely corrupt the checkpoint directory. 
- FileStreamSource should check whether adding a new batch to the 
FileStreamSourceLog succeeded or not (similar to how StreamExecution checks for 
the OffsetSeqLog)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-25752) Add trait to easily whitelist logical operators that produce named output from CleanupAliases

2018-10-16 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-25752:
-

 Summary: Add trait to easily whitelist logical operators that 
produce named output from CleanupAliases
 Key: SPARK-25752
 URL: https://issues.apache.org/jira/browse/SPARK-25752
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Tathagata Das
Assignee: Tathagata Das


The rule `CleanupAliases` cleans up aliases from logical operators that do not 
match a whitelist. This whitelist is hardcoded inside the rule, which is 
cumbersome. The fix is to clean that up by introducing a trait `HasNamedOutput` 
that will be ignored by `CleanupAliases`; operators that require aliases to be 
preserved should extend it.
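
A rough sketch of the intended shape, using simplified stand-ins rather than the 
actual catalyst types:

{code:scala}
// Simplified stand-ins for the catalyst types, to illustrate the intended shape.
trait LogicalPlan
trait HasNamedOutput extends LogicalPlan  // marker trait: CleanupAliases leaves these alone

// An operator whose output names must survive analysis opts in by extending the trait.
case class DeduplicateByName(keys: Seq[String], child: LogicalPlan)
  extends LogicalPlan with HasNamedOutput

// Inside CleanupAliases, the hardcoded whitelist of operator classes is replaced by:
def shouldKeepAliases(plan: LogicalPlan): Boolean = plan.isInstanceOf[HasNamedOutput]
{code}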



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-25639) Add documentation on foreachBatch, and multiple watermark policy

2018-10-08 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-25639:
-

Assignee: Tathagata Das

> Add documentation on foreachBatch, and multiple watermark policy
> 
>
> Key: SPARK-25639
> URL: https://issues.apache.org/jira/browse/SPARK-25639
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 2.4.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Blocker
> Fix For: 2.4.1
>
>
> Things to add
> - Python foreach
> - Scala, Java and Python foreachBatch
> - Multiple watermark policy
> - The semantics of what changes are allowed to the streaming query between restarts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-25639) Add documentation on foreachBatch, and multiple watermark policy

2018-10-08 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-25639.
---
   Resolution: Fixed
Fix Version/s: 2.4.1

Issue resolved by pull request 22627
[https://github.com/apache/spark/pull/22627]

> Add documentation on foreachBatch, and multiple watermark policy
> 
>
> Key: SPARK-25639
> URL: https://issues.apache.org/jira/browse/SPARK-25639
> Project: Spark
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 2.4.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Blocker
> Fix For: 2.4.1
>
>
> Things to add
> - Python foreach
> - Scala, Java and Python foreachBatch
> - Multiple watermark policy
> - The semantics of what changes are allowed to the streaming query between restarts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-25639) Add documentation on foreachBatch, and multiple watermark policy

2018-10-04 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-25639:
-

 Summary: Add documentation on foreachBatch, and multiple watermark 
policy
 Key: SPARK-25639
 URL: https://issues.apache.org/jira/browse/SPARK-25639
 Project: Spark
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 2.4.0
Reporter: Tathagata Das


Things to add
- Python foreach
- Scala, Java and Python foreachBatch
- Multiple watermark policy
- The semantics of what changes are allowed to the streaming query between restarts.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-25399) Reusing execution threads from continuous processing for microbatch streaming can result in correctness issues

2018-09-11 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-25399.
---
   Resolution: Fixed
Fix Version/s: 2.4.0
   3.0.0

Issue resolved by pull request 22386
[https://github.com/apache/spark/pull/22386]

> Reusing execution threads from continuous processing for microbatch streaming 
> can result in correctness issues
> --
>
> Key: SPARK-25399
> URL: https://issues.apache.org/jira/browse/SPARK-25399
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Mukul Murthy
>Assignee: Mukul Murthy
>Priority: Critical
>  Labels: correctness
> Fix For: 3.0.0, 2.4.0
>
>
> Continuous processing sets some thread local variables that, when read by a 
> thread running a microbatch stream, may result in incorrect or no previous 
> state being read, resulting in wrong answers. This was caught by a job 
> running the StreamSuite tests, and only repros occasionally when the same 
> threads are used.
> The issue is in StateStoreRDD.compute - when we compute currentVersion, we 
> read from a thread local variable which is set by continuous processing 
> threads. If this value is set, we then think we're on the wrong state version.
> I imagine very few people, if any, would run into this bug, because you'd 
> have to use continuous processing and then microbatch processing in the same 
> cluster. However, it can result in silent correctness issues, and it would be 
> very difficult for someone to tell if they were impacted by this or not.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-25399) Reusing execution threads from continuous processing for microbatch streaming can result in correctness issues

2018-09-11 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-25399:
-

Assignee: Mukul Murthy

> Reusing execution threads from continuous processing for microbatch streaming 
> can result in correctness issues
> --
>
> Key: SPARK-25399
> URL: https://issues.apache.org/jira/browse/SPARK-25399
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Mukul Murthy
>Assignee: Mukul Murthy
>Priority: Critical
>  Labels: correctness
> Fix For: 2.4.0, 3.0.0
>
>
> Continuous processing sets some thread local variables that, when read by a 
> thread running a microbatch stream, may result in incorrect or no previous 
> state being read, resulting in wrong answers. This was caught by a job 
> running the StreamSuite tests, and only repros occasionally when the same 
> threads are used.
> The issue is in StateStoreRDD.compute - when we compute currentVersion, we 
> read from a thread local variable which is set by continuous processing 
> threads. If this value is set, we then think we're on the wrong state version.
> I imagine very few people, if any, would run into this bug, because you'd 
> have to use continuous processing and then microbatch processing in the same 
> cluster. However, it can result in silent correctness issues, and it would be 
> very difficult for someone to tell if they were impacted by this or not.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-25106) A new Kafka consumer gets created for every batch

2018-08-23 Thread Tathagata Das (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16591170#comment-16591170
 ] 

Tathagata Das commented on SPARK-25106:
---

This is interesting! I don't know how this could be happening. This needs 
investigation.
cc [~zsxwing] since he is working on Kafka 2.0 transaction support.

> A new Kafka consumer gets created for every batch
> -
>
> Key: SPARK-25106
> URL: https://issues.apache.org/jira/browse/SPARK-25106
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.1
>Reporter: Alexis Seigneurin
>Priority: Major
> Attachments: console.txt
>
>
> I have a fairly simple piece of code that reads from Kafka, applies some 
> transformations - including applying a UDF - and writes the result to the 
> console. Every time a batch is created, a new consumer is created (and not 
> closed), eventually leading to a "too many open files" error.
> I created a test case, with the code available here: 
> [https://github.com/aseigneurin/spark-kafka-issue]
> To reproduce:
>  # Start Kafka and create a topic called "persons"
>  # Run "Producer" to generate data
>  # Run "Consumer"
> I am attaching the log where you can see a new consumer being initialized 
> between every batch.
> Please note this issue does *not* appear with Spark 2.2.2, and it does not 
> appear either when I don't apply the UDF.
> I am suspecting - although I did not go far enough to confirm - that this issue 
> is related to the improvement made in SPARK-23623.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-25204) rate source test is flaky

2018-08-23 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-25204.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 22191
[https://github.com/apache/spark/pull/22191]

> rate source test is flaky
> -
>
> Key: SPARK-25204
> URL: https://issues.apache.org/jira/browse/SPARK-25204
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.1
>Reporter: Jose Torres
>Assignee: Jose Torres
>Priority: Minor
> Fix For: 3.0.0
>
>
> We try to restart a manually clocked rate stream in a test. This is 
> inherently race-prone, because the stream will go backwards in time (and 
> throw an assertion failure) if the clock can't be incremented before it tries 
> to schedule the first batch.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-25204) rate source test is flaky

2018-08-23 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-25204:
-

Assignee: Jose Torres

> rate source test is flaky
> -
>
> Key: SPARK-25204
> URL: https://issues.apache.org/jira/browse/SPARK-25204
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.1
>Reporter: Jose Torres
>Assignee: Jose Torres
>Priority: Minor
> Fix For: 3.0.0
>
>
> We try to restart a manually clocked rate stream in a test. This is 
> inherently race-prone, because the stream will go backwards in time (and 
> throw an assertion failure) if the clock can't be incremented before it tries 
> to schedule the first batch.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-25184) Flaky test: FlatMapGroupsWithState "streaming with processing time timeout"

2018-08-22 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-25184.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 22182
[https://github.com/apache/spark/pull/22182]

> Flaky test: FlatMapGroupsWithState "streaming with processing time timeout"
> ---
>
> Key: SPARK-25184
> URL: https://issues.apache.org/jira/browse/SPARK-25184
> Project: Spark
>  Issue Type: Test
>  Components: Structured Streaming
>Affects Versions: 2.3.2
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Minor
> Fix For: 3.0.0
>
>
> {code}
> Assert on query failed: Check total state rows = List(1), updated state rows 
> = List(2): Array() did not equal List(1) incorrect total rows, recent 
> progresses:
> {
>   "id" : "3598002e-0120-4937-8a36-226e0af992b6",
>   "runId" : "e7efe911-72fb-48aa-ba35-775057eabe55",
>   "name" : null,
>   "timestamp" : "1970-01-01T00:00:12.000Z",
>   "batchId" : 3,
>   "numInputRows" : 0,
>   "durationMs" : {
> "getEndOffset" : 0,
> "setOffsetRange" : 0,
> "triggerExecution" : 0
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
> "description" : "MemoryStream[value#474622]",
> "startOffset" : 2,
> "endOffset" : 2,
> "numInputRows" : 0
>   } ],
>   "sink" : {
> "description" : "MemorySink"
>   }
> }
> org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
>   org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
>   
> org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)
>   
> org.apache.spark.sql.streaming.StateStoreMetricsTest$$anonfun$assertNumStateRows$1.apply(StateStoreMetricsTest.scala:55)
>   
> org.apache.spark.sql.streaming.StateStoreMetricsTest$$anonfun$assertNumStateRows$1.apply(StateStoreMetricsTest.scala:33)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$executeAction$1$11.apply$mcZ$sp(StreamTest.scala:657)
>   
> org.apache.spark.sql.streaming.StreamTest$class.verify$1(StreamTest.scala:428)
>   
> org.apache.spark.sql.streaming.StreamTest$class.executeAction$1(StreamTest.scala:657)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:775)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:762)
> == Progress ==
>
> StartStream(ProcessingTime(1000),org.apache.spark.sql.streaming.util.StreamManualClock@5a12704,Map(),null)
>AddData to MemoryStream[value#474622]: a
>AdvanceManualClock(1000)
>CheckNewAnswer: [a,1]
>AssertOnQuery(, Check total state rows = List(1), updated state 
> rows = List(1))
>AddData to MemoryStream[value#474622]: b
>AdvanceManualClock(1000)
>CheckNewAnswer: [b,1]
>AssertOnQuery(, Check total state rows = List(2), updated state 
> rows = List(1))
>AddData to MemoryStream[value#474622]: b
>AdvanceManualClock(1)
>CheckNewAnswer: [a,-1],[b,2]
>AssertOnQuery(, Check total state rows = List(1), updated state 
> rows = List(2))
>StopStream
>
> StartStream(ProcessingTime(1000),org.apache.spark.sql.streaming.util.StreamManualClock@5a12704,Map(),null)
>AddData to MemoryStream[value#474622]: c
>AdvanceManualClock(11000)
>CheckNewAnswer: [b,-1],[c,1]
> => AssertOnQuery(, Check total state rows = List(1), updated state 
> rows = List(2))
>AdvanceManualClock(12000)
>AssertOnQuery(, )
>AssertOnQuery(, name)
>CheckNewAnswer: [c,-1]
>AssertOnQuery(, Check total state rows = List(0), updated state 
> rows = List(0))
> == Stream ==
> Output Mode: Update
> Stream state: {MemoryStream[value#474622]: 3}
> Thread state: alive
> Thread stack trace: java.lang.Object.wait(Native Method)
> org.apache.spark.util.ManualClock.waitTillTime(ManualClock.scala:61)
> org.apache.spark.sql.streaming.util.StreamManualClock.waitTillTime(StreamManualClock.scala:34)
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:65)
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:166)
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:293)
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:203)
> == Sink ==
> 0: [a,1]
> 1: [b,1]
> 2: [a,-1] [b,2]
> 3: [b,-1] [c,1]
> == Plan ==
> == Parsed Logical Plan ==
> SerializeFromObject [staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) 
> AS 

[jira] [Assigned] (SPARK-25184) Flaky test: FlatMapGroupsWithState "streaming with processing time timeout"

2018-08-22 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-25184:
-

Assignee: Tathagata Das

> Flaky test: FlatMapGroupsWithState "streaming with processing time timeout"
> ---
>
> Key: SPARK-25184
> URL: https://issues.apache.org/jira/browse/SPARK-25184
> Project: Spark
>  Issue Type: Test
>  Components: Structured Streaming
>Affects Versions: 2.3.2
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Minor
> Fix For: 3.0.0
>
>
> {code}
> Assert on query failed: Check total state rows = List(1), updated state rows 
> = List(2): Array() did not equal List(1) incorrect total rows, recent 
> progresses:
> {
>   "id" : "3598002e-0120-4937-8a36-226e0af992b6",
>   "runId" : "e7efe911-72fb-48aa-ba35-775057eabe55",
>   "name" : null,
>   "timestamp" : "1970-01-01T00:00:12.000Z",
>   "batchId" : 3,
>   "numInputRows" : 0,
>   "durationMs" : {
> "getEndOffset" : 0,
> "setOffsetRange" : 0,
> "triggerExecution" : 0
>   },
>   "stateOperators" : [ ],
>   "sources" : [ {
> "description" : "MemoryStream[value#474622]",
> "startOffset" : 2,
> "endOffset" : 2,
> "numInputRows" : 0
>   } ],
>   "sink" : {
> "description" : "MemorySink"
>   }
> }
> org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
>   org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
>   
> org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)
>   
> org.apache.spark.sql.streaming.StateStoreMetricsTest$$anonfun$assertNumStateRows$1.apply(StateStoreMetricsTest.scala:55)
>   
> org.apache.spark.sql.streaming.StateStoreMetricsTest$$anonfun$assertNumStateRows$1.apply(StateStoreMetricsTest.scala:33)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$executeAction$1$11.apply$mcZ$sp(StreamTest.scala:657)
>   
> org.apache.spark.sql.streaming.StreamTest$class.verify$1(StreamTest.scala:428)
>   
> org.apache.spark.sql.streaming.StreamTest$class.executeAction$1(StreamTest.scala:657)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:775)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:762)
> == Progress ==
>
> StartStream(ProcessingTime(1000),org.apache.spark.sql.streaming.util.StreamManualClock@5a12704,Map(),null)
>AddData to MemoryStream[value#474622]: a
>AdvanceManualClock(1000)
>CheckNewAnswer: [a,1]
>AssertOnQuery(, Check total state rows = List(1), updated state 
> rows = List(1))
>AddData to MemoryStream[value#474622]: b
>AdvanceManualClock(1000)
>CheckNewAnswer: [b,1]
>AssertOnQuery(, Check total state rows = List(2), updated state 
> rows = List(1))
>AddData to MemoryStream[value#474622]: b
>AdvanceManualClock(1)
>CheckNewAnswer: [a,-1],[b,2]
>AssertOnQuery(, Check total state rows = List(1), updated state 
> rows = List(2))
>StopStream
>
> StartStream(ProcessingTime(1000),org.apache.spark.sql.streaming.util.StreamManualClock@5a12704,Map(),null)
>AddData to MemoryStream[value#474622]: c
>AdvanceManualClock(11000)
>CheckNewAnswer: [b,-1],[c,1]
> => AssertOnQuery(, Check total state rows = List(1), updated state 
> rows = List(2))
>AdvanceManualClock(12000)
>AssertOnQuery(, )
>AssertOnQuery(, name)
>CheckNewAnswer: [c,-1]
>AssertOnQuery(, Check total state rows = List(0), updated state 
> rows = List(0))
> == Stream ==
> Output Mode: Update
> Stream state: {MemoryStream[value#474622]: 3}
> Thread state: alive
> Thread stack trace: java.lang.Object.wait(Native Method)
> org.apache.spark.util.ManualClock.waitTillTime(ManualClock.scala:61)
> org.apache.spark.sql.streaming.util.StreamManualClock.waitTillTime(StreamManualClock.scala:34)
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:65)
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:166)
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:293)
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:203)
> == Sink ==
> 0: [a,1]
> 1: [b,1]
> 2: [a,-1] [b,2]
> 3: [b,-1] [c,1]
> == Plan ==
> == Parsed Logical Plan ==
> SerializeFromObject [staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
> assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) 
> AS _1#474630, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
> StringType, fromString, 

[jira] [Created] (SPARK-25184) Flaky test: FlatMapGroupsWithState "streaming with processing time timeout"

2018-08-21 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-25184:
-

 Summary: Flaky test: FlatMapGroupsWithState "streaming with 
processing time timeout"
 Key: SPARK-25184
 URL: https://issues.apache.org/jira/browse/SPARK-25184
 Project: Spark
  Issue Type: Test
  Components: Structured Streaming
Affects Versions: 2.3.2
Reporter: Tathagata Das


{code}
Assert on query failed: Check total state rows = List(1), updated state rows = 
List(2): Array() did not equal List(1) incorrect total rows, recent progresses:
{
  "id" : "3598002e-0120-4937-8a36-226e0af992b6",
  "runId" : "e7efe911-72fb-48aa-ba35-775057eabe55",
  "name" : null,
  "timestamp" : "1970-01-01T00:00:12.000Z",
  "batchId" : 3,
  "numInputRows" : 0,
  "durationMs" : {
"getEndOffset" : 0,
"setOffsetRange" : 0,
"triggerExecution" : 0
  },
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "MemoryStream[value#474622]",
"startOffset" : 2,
"endOffset" : 2,
"numInputRows" : 0
  } ],
  "sink" : {
"description" : "MemorySink"
  }
}
org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)

org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)

org.apache.spark.sql.streaming.StateStoreMetricsTest$$anonfun$assertNumStateRows$1.apply(StateStoreMetricsTest.scala:55)

org.apache.spark.sql.streaming.StateStoreMetricsTest$$anonfun$assertNumStateRows$1.apply(StateStoreMetricsTest.scala:33)

org.apache.spark.sql.streaming.StreamTest$$anonfun$executeAction$1$11.apply$mcZ$sp(StreamTest.scala:657)

org.apache.spark.sql.streaming.StreamTest$class.verify$1(StreamTest.scala:428)

org.apache.spark.sql.streaming.StreamTest$class.executeAction$1(StreamTest.scala:657)

org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:775)

org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:762)


== Progress ==
   
StartStream(ProcessingTime(1000),org.apache.spark.sql.streaming.util.StreamManualClock@5a12704,Map(),null)
   AddData to MemoryStream[value#474622]: a
   AdvanceManualClock(1000)
   CheckNewAnswer: [a,1]
   AssertOnQuery(, Check total state rows = List(1), updated state 
rows = List(1))
   AddData to MemoryStream[value#474622]: b
   AdvanceManualClock(1000)
   CheckNewAnswer: [b,1]
   AssertOnQuery(, Check total state rows = List(2), updated state 
rows = List(1))
   AddData to MemoryStream[value#474622]: b
   AdvanceManualClock(1)
   CheckNewAnswer: [a,-1],[b,2]
   AssertOnQuery(, Check total state rows = List(1), updated state 
rows = List(2))
   StopStream
   
StartStream(ProcessingTime(1000),org.apache.spark.sql.streaming.util.StreamManualClock@5a12704,Map(),null)
   AddData to MemoryStream[value#474622]: c
   AdvanceManualClock(11000)
   CheckNewAnswer: [b,-1],[c,1]
=> AssertOnQuery(, Check total state rows = List(1), updated state 
rows = List(2))
   AdvanceManualClock(12000)
   AssertOnQuery(, )
   AssertOnQuery(, name)
   CheckNewAnswer: [c,-1]
   AssertOnQuery(, Check total state rows = List(0), updated state 
rows = List(0))

== Stream ==
Output Mode: Update
Stream state: {MemoryStream[value#474622]: 3}
Thread state: alive
Thread stack trace: java.lang.Object.wait(Native Method)
org.apache.spark.util.ManualClock.waitTillTime(ManualClock.scala:61)
org.apache.spark.sql.streaming.util.StreamManualClock.waitTillTime(StreamManualClock.scala:34)
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:65)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:166)
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:293)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:203)


== Sink ==
0: [a,1]
1: [b,1]
2: [a,-1] [b,2]
3: [b,-1] [c,1]


== Plan ==
== Parsed Logical Plan ==
SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS 
_1#474630, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, 
true]))._2, true, false) AS _2#474631]
+- FlatMapGroupsWithState , cast(value#474625 as string).toString, 
cast(value#474622 as string).toString, [value#474625], [value#474622], 
obj#474629: scala.Tuple2, class[count[0]: bigint], Update, false, 
ProcessingTimeTimeout
   +- AppendColumns , class java.lang.String, 
[StructField(value,StringType,true)], cast(value#474622 as string).toString, 
[staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 

[jira] [Commented] (SPARK-24763) Remove redundant key data from value in streaming aggregation

2018-08-21 Thread Tathagata Das (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588234#comment-16588234
 ] 

Tathagata Das commented on SPARK-24763:
---

They will be. The merge script always puts the major version (i.e. 3.0.0)
there. Those will be redirected to 2.4.0 as well when we make the 2.4.0
release.

On Tue, Aug 21, 2018 at 3:39 PM, Jungtaek Lim (JIRA) 



> Remove redundant key data from value in streaming aggregation
> -
>
> Key: SPARK-24763
> URL: https://issues.apache.org/jira/browse/SPARK-24763
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
> Fix For: 2.4.0, 3.0.0
>
>
> Key/Value of state in streaming aggregation is formatted as below:
>  * key: UnsafeRow containing group-by fields
>  * value: UnsafeRow containing the key fields plus additional fields for the 
> aggregation results
> so the data for the key is stored in both the key and the value.
> This is done to avoid projecting the row down to a value while storing, and 
> joining the key and value back together to restore the original row, in order 
> to boost performance. But in a simple benchmark test, I found it not much more 
> helpful than "project and join" (test results will be pasted in a comment).
> So I would propose a new option: remove the redundant key data in stateful 
> aggregation. I'm avoiding modifying the default behavior of stateful 
> aggregation, because the state value will not be compatible between the 
> current behavior and the option being enabled.
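Conceptually, the proposed "project and join" option looks like the following sketch, written here with plain Scala sequences as a simplified illustration rather than Spark's UnsafeRow code.

{code:scala}
// Conceptual sketch: store only the non-key fields, rebuild the row on read.
case class StateValue(fields: Seq[Any])

// Write path: keep only the aggregation fields in the stored value.
def stripKey(fullRow: Seq[Any], numKeyFields: Int): StateValue =
  StateValue(fullRow.drop(numKeyFields))

// Read path: restore the original row by prepending the key fields again.
def restoreRow(key: Seq[Any], value: StateValue): Seq[Any] =
  key ++ value.fields

// Example: with the option on, (user, count) is no longer stored redundantly;
// only (count) is kept in the value and the full row is rebuilt on read.
val key = Seq("alice")
val stored = stripKey(Seq("alice", 42L), numKeyFields = 1)
val restored = restoreRow(key, stored) // Seq("alice", 42L)
{code}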



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24441) Expose total estimated size of states in HDFSBackedStateStoreProvider

2018-08-21 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-24441:
-

Assignee: Jungtaek Lim

> Expose total estimated size of states in HDFSBackedStateStoreProvider
> -
>
> Key: SPARK-24441
> URL: https://issues.apache.org/jira/browse/SPARK-24441
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
> Fix For: 3.0.0
>
>
> While Spark exposes state metrics for a single state, Spark still doesn't 
> expose the overall memory usage of state (loadedMaps) in 
> HDFSBackedStateStoreProvider. 
> The rationale of the patch is that state backed by 
> HDFSBackedStateStoreProvider will consume more memory than the number we 
> can get from query status, due to caching multiple versions of state. The 
> memory footprint can be much larger than what query status reports in 
> situations where the state store is getting a lot of updates: shallow-copying 
> the map incurs only a small additional memory cost for the map entries and 
> references, because row objects are still shared across versions. But if 
> there are lots of updates between batches, fewer row objects will be shared 
> and more row objects will exist in memory, consuming much more memory than we 
> expect.
> It would be better to expose this as well so that end users can determine 
> the actual memory usage of state.
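For reference, the per-operator state metrics that are already exposed can be read from the query progress as in the sketch below. `query` is an assumed, already-started StreamingQuery; whether the cached-map overhead shows up in memoryUsedBytes or in separate custom metrics depends on the Spark version.

{code:scala}
import org.apache.spark.sql.streaming.StreamingQuery

// Print the state metrics currently surfaced through query progress.
def reportStateMemory(query: StreamingQuery): Unit = {
  val progress = query.lastProgress   // may be null before the first progress
  if (progress != null) {
    progress.stateOperators.foreach { op =>
      println(s"state rows=${op.numRowsTotal}, estimated memory=${op.memoryUsedBytes} bytes")
    }
  }
}
{code}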



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24441) Expose total estimated size of states in HDFSBackedStateStoreProvider

2018-08-21 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24441.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21469
[https://github.com/apache/spark/pull/21469]

> Expose total estimated size of states in HDFSBackedStateStoreProvider
> -
>
> Key: SPARK-24441
> URL: https://issues.apache.org/jira/browse/SPARK-24441
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
> Fix For: 3.0.0
>
>
> While Spark exposes state metrics for a single state, Spark still doesn't 
> expose the overall memory usage of state (loadedMaps) in 
> HDFSBackedStateStoreProvider. 
> The rationale of the patch is that state backed by 
> HDFSBackedStateStoreProvider will consume more memory than the number we 
> can get from query status, due to caching multiple versions of state. The 
> memory footprint can be much larger than what query status reports in 
> situations where the state store is getting a lot of updates: shallow-copying 
> the map incurs only a small additional memory cost for the map entries and 
> references, because row objects are still shared across versions. But if 
> there are lots of updates between batches, fewer row objects will be shared 
> and more row objects will exist in memory, consuming much more memory than we 
> expect.
> It would be better to expose this as well so that end users can determine 
> the actual memory usage of state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24763) Remove redundant key data from value in streaming aggregation

2018-08-21 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-24763:
-

Assignee: Jungtaek Lim  (was: Tathagata Das)

> Remove redundant key data from value in streaming aggregation
> -
>
> Key: SPARK-24763
> URL: https://issues.apache.org/jira/browse/SPARK-24763
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
> Fix For: 2.4.0, 3.0.0
>
>
> Key/Value of state in streaming aggregation is formatted as below:
>  * key: UnsafeRow containing group-by fields
>  * value: UnsafeRow containing the key fields plus additional fields for the 
> aggregation results
> so the data for the key is stored in both the key and the value.
> This is done to avoid projecting the row down to a value while storing, and 
> joining the key and value back together to restore the original row, in order 
> to boost performance. But in a simple benchmark test, I found it not much more 
> helpful than "project and join" (test results will be pasted in a comment).
> So I would propose a new option: remove the redundant key data in stateful 
> aggregation. I'm avoiding modifying the default behavior of stateful 
> aggregation, because the state value will not be compatible between the 
> current behavior and the option being enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24763) Remove redundant key data from value in streaming aggregation

2018-08-21 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-24763:
-

Assignee: Tathagata Das

> Remove redundant key data from value in streaming aggregation
> -
>
> Key: SPARK-24763
> URL: https://issues.apache.org/jira/browse/SPARK-24763
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jungtaek Lim
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 2.4.0, 3.0.0
>
>
> Key/Value of state in streaming aggregation is formatted as below:
>  * key: UnsafeRow containing group-by fields
>  * value: UnsafeRow containing the key fields plus additional fields for the 
> aggregation results
> so the data for the key is stored in both the key and the value.
> This is done to avoid projecting the row down to a value while storing, and 
> joining the key and value back together to restore the original row, in order 
> to boost performance. But in a simple benchmark test, I found it not much more 
> helpful than "project and join" (test results will be pasted in a comment).
> So I would propose a new option: remove the redundant key data in stateful 
> aggregation. I'm avoiding modifying the default behavior of stateful 
> aggregation, because the state value will not be compatible between the 
> current behavior and the option being enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24763) Remove redundant key data from value in streaming aggregation

2018-08-21 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24763.
---
   Resolution: Done
Fix Version/s: 3.0.0
   2.4.0

> Remove redundant key data from value in streaming aggregation
> -
>
> Key: SPARK-24763
> URL: https://issues.apache.org/jira/browse/SPARK-24763
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
> Fix For: 2.4.0, 3.0.0
>
>
> Key/Value of state in streaming aggregation is formatted as below:
>  * key: UnsafeRow containing group-by fields
>  * value: UnsafeRow containing the key fields plus additional fields for the 
> aggregation results
> so the data for the key is stored in both the key and the value.
> This is done to avoid projecting the row down to a value while storing, and 
> joining the key and value back together to restore the original row, in order 
> to boost performance. But in a simple benchmark test, I found it not much more 
> helpful than "project and join" (test results will be pasted in a comment).
> So I would propose a new option: remove the redundant key data in stateful 
> aggregation. I'm avoiding modifying the default behavior of stateful 
> aggregation, because the state value will not be compatible between the 
> current behavior and the option being enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24699) Watermark / Append mode should work with Trigger.Once

2018-07-23 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24699.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21746
[https://github.com/apache/spark/pull/21746]

> Watermark / Append mode should work with Trigger.Once
> -
>
> Key: SPARK-24699
> URL: https://issues.apache.org/jira/browse/SPARK-24699
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.1
>Reporter: Chris Horn
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 3.0.0
>
> Attachments: watermark-once.scala, watermark-stream.scala
>
>
> I have a use case where I would like to trigger a structured streaming job 
> from an external scheduler (once every 15 minutes or so) and have it write 
> window aggregates to Kafka.
> I am able to get my code to work when running with `Trigger.ProcessingTime` 
> but when I switch to `Trigger.Once` the watermarking feature of structured 
> streams does not persist to (or is not recollected from) the checkpoint state.
> This causes the stream to never generate output because the watermark is 
> perpetually stuck at `1970-01-01T00:00:00Z`.
> I have created a failing test case in the `EventTimeWatermarkSuite`, I will 
> create a [WIP] pull request on github and link it here.
>  
> It seems that even if it generated the watermark, and given the current 
> streaming behavior, I would have to trigger the job twice to generate any 
> output.
>  
> The microbatcher only calculates the watermark off of the previous batch's 
> input and emits new aggs based off of that timestamp.
> This state is not available to a newly started `MicroBatchExecution` stream.
> Would it be an appropriate strategy to create a new checkpoint file with the 
> most up-to-date watermark, or the watermark and query stats?
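The job shape described above can be sketched as follows. This is not the attached reproduction; the topic names, schema, and checkpoint path are assumptions for illustration.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("watermark-once").getOrCreate()

// Event-time windowed aggregate with a watermark, run once per invocation
// from an external scheduler via Trigger.Once.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) AS body", "timestamp")

val counts = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window(col("timestamp"), "15 minutes"))
  .count()

val query = counts
  .selectExpr("CAST(window.start AS STRING) AS key", "CAST(count AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "event-counts")
  .option("checkpointLocation", "/tmp/checkpoints/watermark-once")
  .outputMode("append")
  .trigger(Trigger.Once())
  .start()
{code}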



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24699) Watermark / Append mode should work with Trigger.Once

2018-07-23 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-24699:
-

Assignee: Tathagata Das

> Watermark / Append mode should work with Trigger.Once
> -
>
> Key: SPARK-24699
> URL: https://issues.apache.org/jira/browse/SPARK-24699
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.1
>Reporter: Chris Horn
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 3.0.0
>
> Attachments: watermark-once.scala, watermark-stream.scala
>
>
> I have a use case where I would like to trigger a structured streaming job 
> from an external scheduler (once every 15 minutes or so) and have it write 
> window aggregates to Kafka.
> I am able to get my code to work when running with `Trigger.ProcessingTime` 
> but when I switch to `Trigger.Once` the watermarking feature of structured 
> streams does not persist to (or is not recollected from) the checkpoint state.
> This causes the stream to never generate output because the watermark is 
> perpetually stuck at `1970-01-01T00:00:00Z`.
> I have created a failing test case in the `EventTimeWatermarkSuite`, I will 
> create a [WIP] pull request on github and link it here.
>  
> It seems that even if it generated the watermark, and given the current 
> streaming behavior, I would have to trigger the job twice to generate any 
> output.
>  
> The microbatcher only calculates the watermark off of the previous batch's 
> input and emits new aggs based off of that timestamp.
> This state is not available to a newly started `MicroBatchExecution` stream.
> Would it be an appropriate strategy to create a new checkpoint file with the 
> most up-to-date watermark, or the watermark and query stats?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-22187) Update unsaferow format for saved state such that we can set timeouts when state is null

2018-07-19 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-22187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-22187.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21739
[https://github.com/apache/spark/pull/21739]

> Update unsaferow format for saved state such that we can set timeouts when 
> state is null
> 
>
> Key: SPARK-22187
> URL: https://issues.apache.org/jira/browse/SPARK-22187
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
>  Labels: release-notes, releasenotes
> Fix For: 3.0.0
>
>
> Currently the group state of a user-defined type is encoded as top-level 
> columns in the UnsafeRows stored in the state store. The timeout timestamp is 
> also saved (when needed) as the last top-level column. Since the groupState 
> is serialized to top-level columns, you cannot save "null" as a value of the 
> state (setting null in all the top-level columns is not equivalent). 
> So we don't let the user set the timeout without initializing the state for 
> a key. Based on user experience, this leads to confusion. 
> This JIRA is to change the row format such that the state is saved as nested 
> columns. This would allow the state to be set to null, and avoid these 
> confusing corner cases.
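The user-facing pattern this change is meant to support looks roughly like the sketch below, used with mapGroupsWithState and GroupStateTimeout.ProcessingTimeTimeout. The types and logic are illustrative assumptions, not code from the Spark test suite.

{code:scala}
import org.apache.spark.sql.streaming.GroupState

case class Visit(user: String, page: String)

def trackVisits(user: String, visits: Iterator[Visit],
                state: GroupState[Long]): (String, Long) = {
  if (state.hasTimedOut) {
    val count = state.getOption.getOrElse(0L)
    state.remove()
    (user, count)
  } else {
    // With the nested-column row format, a timeout can be registered even when
    // no state value has been set yet for this key.
    state.setTimeoutDuration("30 minutes")
    val newCount = state.getOption.getOrElse(0L) + visits.size
    if (newCount > 0) state.update(newCount)
    (user, newCount)
  }
}
{code}

In an actual query this function would be plugged in via something like ds.groupByKey(_.user).mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(trackVisits).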



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24717) Split out min retain version of state for memory in HDFSBackedStateStoreProvider

2018-07-19 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24717.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21700
[https://github.com/apache/spark/pull/21700]

> Split out min retain version of state for memory in 
> HDFSBackedStateStoreProvider
> 
>
> Key: SPARK-24717
> URL: https://issues.apache.org/jira/browse/SPARK-24717
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
> Fix For: 3.0.0
>
>
> HDFSBackedStateStoreProvider has only one configuration for the minimum 
> number of state versions to retain, and it applies to both the memory cache 
> and the files. The default of "spark.sql.streaming.minBatchesToRetain" is 
> set high (100); this doesn't require strictly 100x of memory, but I'm seeing 
> 10x ~ 80x memory consumption for various workloads. In addition, in some 
> cases even requiring 2x of memory is unacceptable, so we should split out a 
> separate configuration for memory and let users adjust the trade-off between 
> memory usage and cache misses.
> In the normal case, a default value of '2' would cover both success and 
> recovery from failure with less than or around 2x of memory usage, and '1' 
> would only cover the success case but no longer require more than 1x of 
> memory. In the extreme case, a user can set the value to '0' to completely 
> disable the map cache and maximize executor memory.
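A sketch of how the split configuration would be used, assuming the new in-memory knob ends up named spark.sql.streaming.maxBatchesToRetainInMemory (the existing spark.sql.streaming.minBatchesToRetain keeps governing how many delta files are kept):

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("state-memory-tuning")
  // Keep 100 batches of state files around for recovery ...
  .config("spark.sql.streaming.minBatchesToRetain", "100")
  // ... but cache only the 2 most recent state versions in executor memory.
  .config("spark.sql.streaming.maxBatchesToRetainInMemory", "2")
  .getOrCreate()
{code}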



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24717) Split out min retain version of state for memory in HDFSBackedStateStoreProvider

2018-07-19 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-24717:
-

Assignee: Jungtaek Lim

> Split out min retain version of state for memory in 
> HDFSBackedStateStoreProvider
> 
>
> Key: SPARK-24717
> URL: https://issues.apache.org/jira/browse/SPARK-24717
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
>
> HDFSBackedStateStoreProvider has only one configuration for the minimum 
> number of state versions to retain, and it applies to both the memory cache 
> and the files. The default of "spark.sql.streaming.minBatchesToRetain" is 
> set high (100); this doesn't require strictly 100x of memory, but I'm seeing 
> 10x ~ 80x memory consumption for various workloads. In addition, in some 
> cases even requiring 2x of memory is unacceptable, so we should split out a 
> separate configuration for memory and let users adjust the trade-off between 
> memory usage and cache misses.
> In the normal case, a default value of '2' would cover both success and 
> recovery from failure with less than or around 2x of memory usage, and '1' 
> would only cover the success case but no longer require more than 1x of 
> memory. In the extreme case, a user can set the value to '0' to completely 
> disable the map cache and maximize executor memory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24697) Fix the reported start offsets in streaming query progress

2018-07-11 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24697.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21744
[https://github.com/apache/spark/pull/21744]

> Fix the reported start offsets in streaming query progress
> --
>
> Key: SPARK-24697
> URL: https://issues.apache.org/jira/browse/SPARK-24697
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.3.1
>Reporter: Arun Mahadevan
>Priority: Major
> Fix For: 3.0.0
>
>
> A streaming query reports progress during each trigger (e.g. after runBatch in 
> MicroBatchExecution). However, the reported progress has wrong offsets, since 
> the offsets are committed and committedOffsets is updated to the 
> availableOffsets before the progress is reported.
> This leads to odd progress output where startOffset and endOffset are always 
> the same.
> Sample output for a Kafka source is below. Here 11 rows are processed in the 
> microbatch, yet the start and end offsets are the same.
>  
> {code:java}
> {
>  "id" : "76bf5515-55be-46af-bc79-9fc92cc6d856",
>  "runId" : "b526f0f4-24bf-4ddc-b6e8-7b0cc83bdbe8",
> ...
> "sources" : [ {
>  "description" : "KafkaV2[Subscribe[topic2]]",
>  "startOffset" : {
>  "topic2" : {
>  "0" : 44
>  }
>  },
>  "endOffset" : {
>  "topic2" : {
>  "0" : 44
>  }
>  },
>  "numInputRows" : 11,
>  "inputRowsPerSecond" : 1.099670098970309,
>  "processedRowsPerSecond" : 1.8829168093118795
>  } ],
> ...
> }
> {code}
>  
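The same fields can be inspected programmatically from the driver, as in the small sketch below; with the bug described above, startOffset and endOffset print identically even when numInputRows is non-zero. `query` is an assumed running StreamingQuery.

{code:scala}
import org.apache.spark.sql.streaming.StreamingQuery

def printSourceOffsets(query: StreamingQuery): Unit = {
  val progress = query.lastProgress   // may be null before the first progress
  if (progress != null) {
    progress.sources.foreach { s =>
      println(s"${s.description}: start=${s.startOffset} end=${s.endOffset} rows=${s.numInputRows}")
    }
  }
}
{code}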



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24697) Fix the reported start offsets in streaming query progress

2018-07-11 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-24697:
-

Assignee: Tathagata Das

> Fix the reported start offsets in streaming query progress
> --
>
> Key: SPARK-24697
> URL: https://issues.apache.org/jira/browse/SPARK-24697
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.3.1
>Reporter: Arun Mahadevan
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 3.0.0
>
>
> A streaming query reports progress during each trigger (e.g. after runBatch in 
> MicroBatchExecution). However, the reported progress has wrong offsets, since 
> the offsets are committed and committedOffsets is updated to the 
> availableOffsets before the progress is reported.
> This leads to odd progress output where startOffset and endOffset are always 
> the same.
> Sample output for a Kafka source is below. Here 11 rows are processed in the 
> microbatch, yet the start and end offsets are the same.
>  
> {code:java}
> {
>  "id" : "76bf5515-55be-46af-bc79-9fc92cc6d856",
>  "runId" : "b526f0f4-24bf-4ddc-b6e8-7b0cc83bdbe8",
> ...
> "sources" : [ {
>  "description" : "KafkaV2[Subscribe[topic2]]",
>  "startOffset" : {
>  "topic2" : {
>  "0" : 44
>  }
>  },
>  "endOffset" : {
>  "topic2" : {
>  "0" : 44
>  }
>  },
>  "numInputRows" : 11,
>  "inputRowsPerSecond" : 1.099670098970309,
>  "processedRowsPerSecond" : 1.8829168093118795
>  } ],
> ...
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24730) Add policy to choose max as global watermark when streaming query has multiple watermarks

2018-07-10 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24730.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21701
[https://github.com/apache/spark/pull/21701]

> Add policy to choose max as global watermark when streaming query has 
> multiple watermarks
> -
>
> Key: SPARK-24730
> URL: https://issues.apache.org/jira/browse/SPARK-24730
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.3.1
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 3.0.0
>
>
> Currently, when a streaming query has multiple watermarks, the policy is to 
> choose the min of them as the global watermark. This is safe to do as the 
> global watermark moves with the slowest stream, and therefore does not 
> unexpectedly drop some data as late, etc. While this is indeed the safe thing 
> to do, in some cases you may want the watermark to advance with the fastest 
> stream, that is, take the max of the multiple watermarks. This JIRA is to add 
> that configuration.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24662) Structured Streaming should support LIMIT

2018-07-10 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24662.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21662
[https://github.com/apache/spark/pull/21662]

> Structured Streaming should support LIMIT
> -
>
> Key: SPARK-24662
> URL: https://issues.apache.org/jira/browse/SPARK-24662
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.3.1
>Reporter: Mukul Murthy
>Assignee: Mukul Murthy
>Priority: Major
> Fix For: 3.0.0
>
>
> Make structured streams support the LIMIT operator. 
> This will undo SPARK-24525 as the limit operator would be a superior solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24662) Structured Streaming should support LIMIT

2018-07-10 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-24662:
-

Assignee: Mukul Murthy

> Structured Streaming should support LIMIT
> -
>
> Key: SPARK-24662
> URL: https://issues.apache.org/jira/browse/SPARK-24662
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.3.1
>Reporter: Mukul Murthy
>Assignee: Mukul Murthy
>Priority: Major
> Fix For: 3.0.0
>
>
> Make structured streams support the LIMIT operator. 
> This will undo SPARK-24525 as the limit operator would be a superior solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24730) Add policy to choose max as global watermark when streaming query has multiple watermarks

2018-07-02 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-24730:
-

 Summary: Add policy to choose max as global watermark when 
streaming query has multiple watermarks
 Key: SPARK-24730
 URL: https://issues.apache.org/jira/browse/SPARK-24730
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 2.3.1
Reporter: Tathagata Das
Assignee: Tathagata Das


Currently, when a streaming query has multiple watermarks, the policy is to 
choose the min of them as the global watermark. This is safe to do as the 
global watermark moves with the slowest stream, and therefore does not 
unexpectedly drop some data as late, etc. While this is indeed the safe thing 
to do, in some cases you may want the watermark to advance with the fastest 
stream, that is, take the max of the multiple watermarks. This JIRA is to add 
that configuration.
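The scenario looks roughly like the sketch below: two inputs carry their own watermarks and the session opts into the "max" policy. The configuration name (spark.sql.streaming.multipleWatermarkPolicy) is assumed to be the one this change introduces.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

val spark = SparkSession.builder()
  .appName("multi-watermark")
  .config("spark.sql.streaming.multipleWatermarkPolicy", "max")
  .getOrCreate()

// A fast and a slow input, each with its own watermark.
val fast = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
  .withWatermark("timestamp", "1 minute")

val slow = spark.readStream.format("rate").option("rowsPerSecond", "1").load()
  .withWatermark("timestamp", "10 minutes")

// Under the "max" policy, the global watermark follows the fastest stream.
val combined = fast.union(slow)
  .groupBy(window(col("timestamp"), "5 minutes"))
  .count()
{code}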



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24386) implement continuous processing coalesce(1)

2018-06-28 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24386.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21560
[https://github.com/apache/spark/pull/21560]

> implement continuous processing coalesce(1)
> ---
>
> Key: SPARK-24386
> URL: https://issues.apache.org/jira/browse/SPARK-24386
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Assignee: Jose Torres
>Priority: Major
> Fix For: 3.0.0
>
>
> [~marmbrus] suggested this as a good implementation checkpoint. If we do the 
> shuffle reader and writer correctly, it should be easy to make a custom 
> coalesce(1) execution for continuous processing using them, without having to 
> implement the logic for shuffle writers finding out where shuffle readers are 
> located. (The coalesce(1) can just get the RpcEndpointRef directly from the 
> reader and pass it to the writers.)
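The user-visible shape this enables is sketched below: a continuous query whose results are funneled through a single partition. The trigger interval and the sink are illustrative assumptions.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("continuous-coalesce").getOrCreate()

// Continuous-processing query with a single output partition.
val query = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .coalesce(1)
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
{code}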



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24386) implement continuous processing coalesce(1)

2018-06-28 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-24386:
-

Assignee: Jose Torres

> implement continuous processing coalesce(1)
> ---
>
> Key: SPARK-24386
> URL: https://issues.apache.org/jira/browse/SPARK-24386
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Assignee: Jose Torres
>Priority: Major
>
> [~marmbrus] suggested this as a good implementation checkpoint. If we do the 
> shuffle reader and writer correctly, it should be easy to make a custom 
> coalesce(1) execution for continuous processing using them, without having to 
> implement the logic for shuffle writers finding out where shuffle readers are 
> located. (The coalesce(1) can just get the RpcEndpointRef directly from the 
> reader and pass it to the writers.)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24396) Add Structured Streaming ForeachWriter for python

2018-06-15 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24396.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21477
[https://github.com/apache/spark/pull/21477]

> Add Structured Streaming ForeachWriter for python
> -
>
> Key: SPARK-24396
> URL: https://issues.apache.org/jira/browse/SPARK-24396
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 3.0.0
>
>
> Users should be able to write ForeachWriter code in Python, that is, they 
> should be able to use the partition id and the version/batchId/epochId to 
> conditionally process rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24565) Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame

2018-06-14 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-24565:
--
Description: 
Currently, the micro-batches in the MicroBatchExecution are not exposed to the 
user through any public API. This was because we did not want to expose the 
micro-batches, so that all the APIs we expose can eventually be supported in 
the Continuous engine. But now that we have a better sense of building a 
ContinuousExecution, I am considering adding APIs which will run only in the 
MicroBatchExecution. I have quite a few use cases where exposing the 
micro-batch output as a dataframe is useful. 
- Pass the output rows of each batch to a library that is designed only for 
batch jobs (for example, many ML libraries need to collect() while learning).
- Reuse batch data sources for output whose streaming version does not exist 
(e.g. the redshift data source).
- Write the output rows to multiple places by writing twice for each batch. 
This is not the most elegant thing to do for multiple-output streaming queries 
but is likely to be better than running two streaming queries processing the 
same data twice.

The proposal is to add a method {{foreachBatch(f: Dataset[T] => Unit)}} to 
Scala/Java/Python {{DataStreamWriter}}.


  was:
Currently, the micro-batches in the MicroBatchExecution are not exposed to the 
user through any public API. This was because we did not want to expose the 
micro-batches, so that all the APIs we expose can eventually be supported in 
the Continuous engine. But now that we have a better sense of building a 
ContinuousExecution, I am considering adding APIs which will run only in the 
MicroBatchExecution. I have quite a few use cases where exposing the 
micro-batch output as a dataframe is useful. 
- Pass the output rows of each batch to a library that is designed only for 
batch jobs (for example, many ML libraries need to collect() while learning).
- Reuse batch data sources for output whose streaming version does not exist 
(e.g. the redshift data source).
- Write the output rows to multiple places by writing twice for each batch. 
This is not the most elegant thing to do for multiple-output streaming queries 
but is likely to be better than running two streaming queries processing the 
same data twice.

The proposal is to add a method {{foreachBatch(f: Dataset[T] => Unit)}} to 
Scala/Java/Python `DataStreamWriter`.



> Add API for in Structured Streaming for exposing output rows of each 
> microbatch as a DataFrame
> --
>
> Key: SPARK-24565
> URL: https://issues.apache.org/jira/browse/SPARK-24565
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
>
> Currently, the micro-batches in the MicroBatchExecution are not exposed to the 
> user through any public API. This was because we did not want to expose the 
> micro-batches, so that all the APIs we expose can eventually be supported in 
> the Continuous engine. But now that we have a better sense of building a 
> ContinuousExecution, I am considering adding APIs which will run only in the 
> MicroBatchExecution. I have quite a few use cases where exposing the 
> micro-batch output as a dataframe is useful. 
> - Pass the output rows of each batch to a library that is designed only for 
> batch jobs (for example, many ML libraries need to collect() while learning).
> - Reuse batch data sources for output whose streaming version does not exist 
> (e.g. the redshift data source).
> - Write the output rows to multiple places by writing twice for each batch. 
> This is not the most elegant thing to do for multiple-output streaming 
> queries but is likely to be better than running two streaming queries 
> processing the same data twice.
> The proposal is to add a method {{foreachBatch(f: Dataset[T] => Unit)}} to 
> Scala/Java/Python {{DataStreamWriter}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24565) Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame

2018-06-14 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-24565:
-

 Summary: Add API for in Structured Streaming for exposing output 
rows of each microbatch as a DataFrame
 Key: SPARK-24565
 URL: https://issues.apache.org/jira/browse/SPARK-24565
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.3.0
Reporter: Tathagata Das
Assignee: Tathagata Das


Currently, the micro-batches in the MicroBatchExecution are not exposed to the 
user through any public API. This was because we did not want to expose the 
micro-batches, so that all the APIs we expose can eventually be supported in 
the Continuous engine. But now that we have a better sense of building a 
ContinuousExecution, I am considering adding APIs which will run only in the 
MicroBatchExecution. I have quite a few use cases where exposing the 
micro-batch output as a dataframe is useful. 
- Pass the output rows of each batch to a library that is designed only for 
batch jobs (for example, many ML libraries need to collect() while learning).
- Reuse batch data sources for output whose streaming version does not exist 
(e.g. the redshift data source).
- Write the output rows to multiple places by writing twice for each batch. 
This is not the most elegant thing to do for multiple-output streaming queries 
but is likely to be better than running two streaming queries processing the 
same data twice.

The proposal is to add a method {{foreachBatch(f: Dataset[T] => Unit)}} to 
Scala/Java/Python `DataStreamWriter`.
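From the user's side, the proposed hook could be used to reuse ordinary batch writers and send each micro-batch to two places, as in the sketch below. The exact signature is what this ticket proposes; the sketch assumes a variant that also passes a batch id, and the paths and options are illustrative assumptions.

{code:scala}
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("foreach-batch-demo").getOrCreate()

val stream = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

// Reuse batch writers inside the streaming query, writing each batch twice.
val writeTwoPlaces: (DataFrame, Long) => Unit = { (batchDF, batchId) =>
  batchDF.persist()
  batchDF.write.mode("append").parquet(s"/tmp/output/parquet/batch=$batchId")
  batchDF.write.mode("append").json(s"/tmp/output/json/batch=$batchId")
  batchDF.unpersist()
}

val query = stream.writeStream
  .foreachBatch(writeTwoPlaces)
  .option("checkpointLocation", "/tmp/checkpoints/foreach-batch-demo")
  .start()
{code}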




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24453) Fix error recovering from the failure in a no-data batch

2018-06-05 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24453.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21491
[https://github.com/apache/spark/pull/21491]

> Fix error recovering from the failure in a no-data batch
> 
>
> Key: SPARK-24453
> URL: https://issues.apache.org/jira/browse/SPARK-24453
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 3.0.0
>
>
> ```
> java.lang.AssertionError: assertion failed: Concurrent update to the log. 
> Multiple streaming jobs detected for 159897
> ```
> The error occurs when we are recovering from a failure in a no-data batch 
> (say X) that has been planned (i.e. written to offset log) but not executed 
> (i.e. not written to commit log). Upon recovery, the following sequence of 
> events happens.
> - `MicroBatchExecution.populateStartOffsets` sets `currentBatchId` to X. 
> Since there was no data in the batch, the `availableOffsets` is same as 
> `committedOffsets`, so `isNewDataAvailable` is false.
> - When MicroBatchExecution.constructNextBatch is called, ideally it should 
> immediately return true because the next batch has already been constructed. 
> However, the check of whether the batch has been constructed was `if 
> (isNewDataAvailable) return true`. Since the planned batch is a no-data 
> batch, it escaped this check and proceeded to plan the same batch X once 
> again. And if there is new data since the failure, it does plan a new batch, 
> tries to write new offsets to the `offsetLog` as batchId X, and fails with 
> the above error.
> The correct solution is to check the offset log whether the currentBatchId is 
> the latest or not.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24453) Fix error recovering from the failure in a no-data batch

2018-06-01 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-24453:
-

 Summary: Fix error recovering from the failure in a no-data batch
 Key: SPARK-24453
 URL: https://issues.apache.org/jira/browse/SPARK-24453
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.4.0
Reporter: Tathagata Das
Assignee: Tathagata Das


```
java.lang.AssertionError: assertion failed: Concurrent update to the log. 
Multiple streaming jobs detected for 159897
```

The error occurs when we are recovering from a failure in a no-data batch (say 
X) that has been planned (i.e. written to the offset log) but not executed 
(i.e. not written to the commit log). Upon recovery, the following sequence of 
events happens.

- `MicroBatchExecution.populateStartOffsets` sets `currentBatchId` to X. Since 
there was no data in the batch, the `availableOffsets` is the same as 
`committedOffsets`, so `isNewDataAvailable` is false.
- When MicroBatchExecution.constructNextBatch is called, ideally it should 
immediately return true because the next batch has already been constructed. 
However, the check of whether the batch has been constructed was `if 
(isNewDataAvailable) return true`. Since the planned batch is a no-data batch, 
it escapes this check and proceeds to plan the same batch X once again. And if 
there is new data since the failure, it plans a new batch, tries to write new 
offsets to the `offsetLog` as batchId X, and fails with the above error.

The correct solution is to check the offset log to see whether the 
currentBatchId is the latest or not.
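
A minimal sketch of the intended check (not the actual MicroBatchExecution 
code; the parameters stand in for `offsetLog.getLatest()` and the batch being 
planned):

{code}
// Sketch only: decide whether the batch being recovered was already planned by
// consulting the offset log instead of relying on isNewDataAvailable.
def batchAlreadyConstructed(
    latestPlannedBatchId: Option[Long],   // e.g. offsetLog.getLatest().map(_._1)
    currentBatchId: Long): Boolean =
  latestPlannedBatchId.exists(_ >= currentBatchId)

// If this returns true, reuse the offsets already written for currentBatchId;
// otherwise plan a new batch and write its offsets to the offset log.
{code}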



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24397) Add TaskContext.getLocalProperties in Python

2018-05-31 Thread Tathagata Das (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24397.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21437
[https://github.com/apache/spark/pull/21437]

> Add TaskContext.getLocalProperties in Python
> 
>
> Key: SPARK-24397
> URL: https://issues.apache.org/jira/browse/SPARK-24397
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24396) Add Structured Streaming ForeachWriter for python

2018-05-25 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491415#comment-16491415
 ] 

Tathagata Das commented on SPARK-24396:
---

TaskContext.getLocalProperty in Python is needed for getting the 
batchId/epochId that is passed on by StreamExecution as a Spark job local 
property.

> Add Structured Streaming ForeachWriter for python
> -
>
> Key: SPARK-24396
> URL: https://issues.apache.org/jira/browse/SPARK-24396
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
>
> Users should be able to write ForeachWriter code in Python, that is, they 
> should be able to use the partition id and the version/batchId/epochId to 
> conditionally process rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24396) Add Structured Streaming ForeachWriter for python

2018-05-25 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491415#comment-16491415
 ] 

Tathagata Das edited comment on SPARK-24396 at 5/26/18 12:07 AM:
-

TaskContext.getLocalProperty (SPARK-24397) in Python is needed for getting the 
batchId/epochId that is passed on by StreamExecution as a Spark job local 
property.


was (Author: tdas):
TaskContext.getLocalProperty in Python is needed for getting the 
batchId/epochId that is passed on by StreamExecution as a Spark job local 
property.

> Add Structured Streaming ForeachWriter for python
> -
>
> Key: SPARK-24396
> URL: https://issues.apache.org/jira/browse/SPARK-24396
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
>
> Users should be able to write ForeachWriter code in Python, that is, they 
> should be able to use the partition id and the version/batchId/epochId to 
> conditionally process rows.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24397) Add TaskContext.getLocalProperties in Python

2018-05-25 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-24397:
--
Issue Type: New Feature  (was: Sub-task)
Parent: (was: SPARK-24396)

> Add TaskContext.getLocalProperties in Python
> 
>
> Key: SPARK-24397
> URL: https://issues.apache.org/jira/browse/SPARK-24397
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24397) Add TaskContext.getLocalProperties in Python

2018-05-25 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-24397:
-

 Summary: Add TaskContext.getLocalProperties in Python
 Key: SPARK-24397
 URL: https://issues.apache.org/jira/browse/SPARK-24397
 Project: Spark
  Issue Type: Sub-task
  Components: PySpark
Affects Versions: 2.3.0
Reporter: Tathagata Das
Assignee: Tathagata Das






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24396) Add Structured Streaming ForeachWriter for python

2018-05-25 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-24396:
-

 Summary: Add Structured Streaming ForeachWriter for python
 Key: SPARK-24396
 URL: https://issues.apache.org/jira/browse/SPARK-24396
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 2.3.0
Reporter: Tathagata Das
Assignee: Tathagata Das


Users should be able to write ForeachWriter code in Python, that is, they 
should be able to use the partition id and the version/batchId/epochId to 
conditionally process rows.
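
For reference, a minimal sketch of the existing Scala ForeachWriter contract 
that the Python API would mirror; the println sink is purely illustrative.

{code}
import org.apache.spark.sql.{ForeachWriter, Row}

// Illustrative only: a trivial sink that receives the partition id and version/epoch id
// in open(), which is what the Python counterpart should also expose.
class PrintlnWriter extends ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = {
    // Return false to skip processing this (partitionId, version), e.g. on a retry.
    true
  }
  override def process(value: Row): Unit = println(value)
  override def close(errorOrNull: Throwable): Unit = {
    // Release any per-partition resources here.
  }
}
{code}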



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23416) Flaky test: KafkaSourceStressForDontFailOnDataLossSuite.stress test for failOnDataLoss=false

2018-05-23 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-23416.
---
   Resolution: Fixed
Fix Version/s: (was: 2.3.0)
   3.0.0

Issue resolved by pull request 21384
[https://github.com/apache/spark/pull/21384]

> Flaky test: KafkaSourceStressForDontFailOnDataLossSuite.stress test for 
> failOnDataLoss=false
> 
>
> Key: SPARK-23416
> URL: https://issues.apache.org/jira/browse/SPARK-23416
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Assignee: Jose Torres
>Priority: Minor
> Fix For: 3.0.0
>
>
> I suspect this is a race condition latent in the DataSourceV2 write path, or 
> at least the interaction of that write path with StreamTest.
> [https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87241/testReport/org.apache.spark.sql.kafka010/KafkaSourceStressForDontFailOnDataLossSuite/stress_test_for_failOnDataLoss_false/]
> h3. Error Message
> org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 
> 16b2a2b1-acdd-44ec-902f-531169193169, runId = 
> 9567facb-e305-4554-8622-830519002edb] terminated with exception: Writing job 
> aborted.
> h3. Stacktrace
> sbt.ForkMain$ForkError: 
> org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 
> 16b2a2b1-acdd-44ec-902f-531169193169, runId = 
> 9567facb-e305-4554-8622-830519002edb] terminated with exception: Writing job 
> aborted. at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
>  Caused by: sbt.ForkMain$ForkError: org.apache.spark.SparkException: Writing 
> job aborted. at 
> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:108)
>  at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>  at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>  at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>  at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at 
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at 
> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) 
> at 
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294) 
> at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
>  at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) 
> at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) 
> at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) at 
> org.apache.spark.sql.Dataset.collect(Dataset.scala:2722) at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$15.apply(MicroBatchExecution.scala:488)
>  at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:483)
>  at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:482)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>  at 
> 

[jira] [Assigned] (SPARK-23416) Flaky test: KafkaSourceStressForDontFailOnDataLossSuite.stress test for failOnDataLoss=false

2018-05-23 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-23416:
-

Assignee: Jose Torres

> Flaky test: KafkaSourceStressForDontFailOnDataLossSuite.stress test for 
> failOnDataLoss=false
> 
>
> Key: SPARK-23416
> URL: https://issues.apache.org/jira/browse/SPARK-23416
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Assignee: Jose Torres
>Priority: Minor
> Fix For: 2.3.0
>
>
> I suspect this is a race condition latent in the DataSourceV2 write path, or 
> at least the interaction of that write path with StreamTest.
> [https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87241/testReport/org.apache.spark.sql.kafka010/KafkaSourceStressForDontFailOnDataLossSuite/stress_test_for_failOnDataLoss_false/]
> h3. Error Message
> org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 
> 16b2a2b1-acdd-44ec-902f-531169193169, runId = 
> 9567facb-e305-4554-8622-830519002edb] terminated with exception: Writing job 
> aborted.
> h3. Stacktrace
> sbt.ForkMain$ForkError: 
> org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 
> 16b2a2b1-acdd-44ec-902f-531169193169, runId = 
> 9567facb-e305-4554-8622-830519002edb] terminated with exception: Writing job 
> aborted. at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
>  Caused by: sbt.ForkMain$ForkError: org.apache.spark.SparkException: Writing 
> job aborted. at 
> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:108)
>  at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>  at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>  at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>  at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at 
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at 
> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) 
> at 
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294) 
> at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
>  at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) 
> at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) 
> at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) at 
> org.apache.spark.sql.Dataset.collect(Dataset.scala:2722) at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$15.apply(MicroBatchExecution.scala:488)
>  at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:483)
>  at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:482)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>  at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>  at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>  at 
> 

[jira] [Assigned] (SPARK-24234) create the bottom-of-task RDD with row buffer

2018-05-21 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-24234:
-

Assignee: Jose Torres

> create the bottom-of-task RDD with row buffer
> -
>
> Key: SPARK-24234
> URL: https://issues.apache.org/jira/browse/SPARK-24234
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Assignee: Jose Torres
>Priority: Major
> Fix For: 3.0.0
>
>
> [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii]
>  
> This probably ought to be an abstraction of ContinuousDataSourceRDD and 
> ContinuousQueuedDataReader. These classes do pretty much exactly what's 
> needed, except the buffer is filled from the remote data source instead of a 
> remote Spark task.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24234) create the bottom-of-task RDD with row buffer

2018-05-21 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24234.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21337
[https://github.com/apache/spark/pull/21337]

> create the bottom-of-task RDD with row buffer
> -
>
> Key: SPARK-24234
> URL: https://issues.apache.org/jira/browse/SPARK-24234
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Assignee: Jose Torres
>Priority: Major
> Fix For: 3.0.0
>
>
> [https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit#heading=h.8t3ci57f7uii]
>  
> This probably ought to be an abstraction of ContinuousDataSourceRDD and 
> ContinuousQueuedDataReader. These classes do pretty much exactly what's 
> needed, except the buffer is filled from the remote data source instead of a 
> remote Spark task.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23503) continuous execution should sequence committed epochs

2018-05-18 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-23503.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 20936
[https://github.com/apache/spark/pull/20936]

> continuous execution should sequence committed epochs
> -
>
> Key: SPARK-23503
> URL: https://issues.apache.org/jira/browse/SPARK-23503
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Priority: Major
> Fix For: 3.0.0
>
>
> Currently, the EpochCoordinator doesn't enforce a commit order. If a message 
> for epoch n gets lost in the ether, and epoch n + 1 happens to be ready for 
> commit earlier, epoch n + 1 will be committed.
>  
> This is either incorrect or needlessly confusing, because it's not safe to 
> start from the end offset of epoch n + 1 until epoch n is committed. 
> EpochCoordinator should enforce this sequencing.
>  
> Note that this is not actually a problem right now, because the commit 
> messages go through the same RPC channel from the same place. But we 
> shouldn't implicitly bake this assumption in.
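
A simplified sketch of the kind of sequencing EpochCoordinator could enforce 
(not the actual implementation): buffer epochs that become ready out of order, 
and commit strictly in order.

{code}
import scala.collection.mutable

// Sketch only: epochs may become "ready" out of order, but are committed strictly in order.
class EpochSequencer(startEpoch: Long) {
  private var nextToCommit = startEpoch
  private val ready = mutable.Set[Long]()

  /** Record that `epoch` is ready; return the epochs that are now safe to commit, in order. */
  def onEpochReady(epoch: Long): Seq[Long] = {
    ready += epoch
    val committable = mutable.Buffer[Long]()
    while (ready.remove(nextToCommit)) {
      committable += nextToCommit
      nextToCommit += 1
    }
    committable.toSeq
  }
}
{code}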



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24158) Enable no-data micro batches for streaming joins

2018-05-16 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24158.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21253
[https://github.com/apache/spark/pull/21253]

> Enable no-data micro batches for streaming joins
> 
>
> Key: SPARK-24158
> URL: https://issues.apache.org/jira/browse/SPARK-24158
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24157) Enable no-data micro batches for streaming aggregation and deduplication

2018-05-04 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24157.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21220
[https://github.com/apache/spark/pull/21220]

> Enable no-data micro batches for streaming aggregation and deduplication
> 
>
> Key: SPARK-24157
> URL: https://issues.apache.org/jira/browse/SPARK-24157
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24039) remove restarting iterators hack

2018-05-04 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-24039:
-

Assignee: Jose Torres

> remove restarting iterators hack
> 
>
> Key: SPARK-24039
> URL: https://issues.apache.org/jira/browse/SPARK-24039
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Assignee: Jose Torres
>Priority: Major
> Fix For: 3.0.0
>
>
> Currently, continuous processing execution calls next() to restart the query 
> iterator after it returns false. This doesn't work for complex RDDs - we need 
> to call compute() instead.
> This isn't refactoring-only; changes will be required to keep the reader from 
> starting over in each compute() call.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24039) remove restarting iterators hack

2018-05-04 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24039.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21200
[https://github.com/apache/spark/pull/21200]

> remove restarting iterators hack
> 
>
> Key: SPARK-24039
> URL: https://issues.apache.org/jira/browse/SPARK-24039
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Assignee: Jose Torres
>Priority: Major
> Fix For: 3.0.0
>
>
> Currently, continuous processing execution calls next() to restart the query 
> iterator after it returns false. This doesn't work for complex RDDs - we need 
> to call compute() instead.
> This isn't refactoring-only; changes will be required to keep the reader from 
> starting over in each compute() call.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24159) Enable no-data micro batches for streaming mapGroupswithState

2018-05-02 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-24159:
-

 Summary: Enable no-data micro batches for streaming 
mapGroupswithState
 Key: SPARK-24159
 URL: https://issues.apache.org/jira/browse/SPARK-24159
 Project: Spark
  Issue Type: Sub-task
  Components: Structured Streaming
Affects Versions: 2.3.0
Reporter: Tathagata Das


When event-time timeout is enabled, use watermark updates to decide whether to 
run another batch.

When processing-time timeout is enabled, use the processing time to decide when 
to run more batches.
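
For context, a minimal sketch of a mapGroupsWithState query with event-time 
timeout, the kind of query whose timeouts would fire promptly once no-data 
batches are enabled. It assumes spark.implicits._ is in scope and that `events` 
is an existing streaming Dataset[Event].

{code}
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Event(user: String, eventTime: Timestamp)
case class UserCount(user: String, count: Long)

// Sketch only: `events` stands for an existing streaming Dataset[Event].
def updateCount(user: String, newEvents: Iterator[Event], state: GroupState[Long]): UserCount = {
  if (state.hasTimedOut) {
    val finalCount = state.getOption.getOrElse(0L)
    state.remove()                                          // watermark passed: clean up the state
    UserCount(user, finalCount)
  } else {
    val count = state.getOption.getOrElse(0L) + newEvents.size
    state.update(count)
    // Time this key out 10 minutes past the current watermark.
    state.setTimeoutTimestamp(state.getCurrentWatermarkMs() + 10 * 60 * 1000)
    UserCount(user, count)
  }
}

val counts = events
  .withWatermark("eventTime", "10 minutes")
  .groupByKey(_.user)
  .mapGroupsWithState[Long, UserCount](GroupStateTimeout.EventTimeTimeout)(updateCount)
{code}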



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24158) Enable no-data micro batches for streaming joins

2018-05-02 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-24158:
-

 Summary: Enable no-data micro batches for streaming joins
 Key: SPARK-24158
 URL: https://issues.apache.org/jira/browse/SPARK-24158
 Project: Spark
  Issue Type: Sub-task
  Components: Structured Streaming
Affects Versions: 2.3.0
Reporter: Tathagata Das






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24158) Enable no-data micro batches for streaming joins

2018-05-02 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-24158:
-

Assignee: Tathagata Das

> Enable no-data micro batches for streaming joins
> 
>
> Key: SPARK-24158
> URL: https://issues.apache.org/jira/browse/SPARK-24158
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24156) Enable no-data micro batches for more eager streaming state clean up

2018-05-02 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-24156:
-

 Summary: Enable no-data micro batches for more eager streaming 
state clean up 
 Key: SPARK-24156
 URL: https://issues.apache.org/jira/browse/SPARK-24156
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.3.0
Reporter: Tathagata Das
Assignee: Tathagata Das


Currently, MicroBatchExecution in Structured Streaming runs batches only when 
there is new data to process. This is sensible in most cases, as we don't want 
to unnecessarily use resources when there is nothing new to process. However, 
for some stateful streaming queries, this delays state cleanup as well as the 
output that depends on that cleanup. For example, consider a streaming 
aggregation query with watermark-based state cleanup. The watermark is updated 
after a batch with new data completes. The updated value is used in the next 
batch to clean up state and to output finalized aggregates in append mode. 
However, if there is no data, then the next batch does not occur, and 
cleanup/output gets delayed unnecessarily. This is true for all stateful 
streaming operators - aggregation, deduplication, joins, and mapGroupsWithState.

This issue tracks the work to enable no-data batches in MicroBatchExecution. 
The major challenge is that all the tests of the relevant stateful operations 
add dummy data to force another batch in order to test state cleanup, so a lot 
of tests are going to change. My plan is therefore to enable no-data batches 
for different stateful operators one at a time.
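
For illustration, a minimal sketch of such a query (the rate source and the 
durations are placeholders, and `spark` is an existing SparkSession). In append 
mode, the finalized windows are emitted and their state dropped only when a 
later batch advances the watermark - exactly the step that no-data batches 
would make timely.

{code}
import org.apache.spark.sql.functions.{col, window}

// Sketch only: the rate source and the time intervals are placeholders.
val events = spark.readStream.format("rate").load()
  .withColumnRenamed("timestamp", "eventTime")

val counts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"))
  .count()

val query = counts.writeStream
  .outputMode("append")   // windows are finalized, and their state cleaned, as the watermark advances
  .format("console")
  .start()
{code}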



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24157) Enable no-data micro batches for streaming aggregation and deduplication

2018-05-02 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-24157:
-

 Summary: Enable no-data micro batches for streaming aggregation 
and deduplication
 Key: SPARK-24157
 URL: https://issues.apache.org/jira/browse/SPARK-24157
 Project: Spark
  Issue Type: Sub-task
  Components: Structured Streaming
Affects Versions: 2.3.0
Reporter: Tathagata Das
Assignee: Tathagata Das






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-18791) Stream-Stream Joins

2018-05-02 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-18791.
---
   Resolution: Done
Fix Version/s: 2.3.0

> Stream-Stream Joins
> ---
>
> Key: SPARK-18791
> URL: https://issues.apache.org/jira/browse/SPARK-18791
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Michael Armbrust
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 2.3.0
>
>
> Stream-stream join is a much-requested but missing feature in Structured 
> Streaming. While the join API exists in Datasets and DataFrames, it throws 
> UnsupportedOperationException when applied between two streaming 
> Datasets/DataFrames. To support this, we have to maintain the same semantics 
> as other Structured Streaming operations - the result of the operation after 
> consuming the two data streams up to positions/offsets X and Y, respectively, 
> must be the same as a single batch join operation on all the data up to 
> positions X and Y, respectively. To achieve this, the execution has to buffer 
> past data (i.e. streaming state) from each stream, so that future data can be 
> matched against past data. Here is a set of high-level requirements. 
> - Buffer past rows as streaming state (using StateStore), and join new rows 
> with the buffered past rows.
> - Support state cleanup using the event-time watermark when possible.
> - Support different types of joins (inner, left outer, right outer; the outer 
> joins are in highest demand for ETL/enrichment-type use cases [kafka -> 
> best-effort enrich -> write to S3]).
> - Support cascading join operations (i.e. joining more than 2 streams).
> - Support multiple output modes (Append mode is in highest demand for 
> enabling ETL/enrichment-type use cases).
> All the work to incrementally build this is going to be represented by this 
> JIRA, with specific subtasks for each step. At this point, the rough 
> direction is as follows:
> - Implement stream-stream inner join in Append Mode, supporting multiple 
> cascaded joins.
> - Extend it to stream-stream left/right outer join in Append Mode.
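
For illustration, a minimal sketch of the target usage, following the 
requirements above (the rate sources, column names, watermark delays, and time 
bounds are placeholders):

{code}
import org.apache.spark.sql.functions.expr

// Sketch only: sources, column names, and time bounds are placeholders.
val impressions = spark.readStream.format("rate").load()
  .selectExpr("value AS impressionAdId", "timestamp AS impressionTime")
  .withWatermark("impressionTime", "2 hours")

val clicks = spark.readStream.format("rate").load()
  .selectExpr("value AS clickAdId", "timestamp AS clickTime")
  .withWatermark("clickTime", "3 hours")

// Inner join with an event-time constraint so buffered state can be cleaned up.
val joined = impressions.join(
  clicks,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """))
{code}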



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24094) Change description strings of v2 streaming sources to reflect the change

2018-04-26 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24094.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21160
[https://github.com/apache/spark/pull/21160]

> Change description strings of v2 streaming sources to reflect the change
> 
>
> Key: SPARK-24094
> URL: https://issues.apache.org/jira/browse/SPARK-24094
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Trivial
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24094) Change description strings of v2 streaming sources to reflect the change

2018-04-25 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-24094:
-

 Summary: Change description strings of v2 streaming sources to 
reflect the change
 Key: SPARK-24094
 URL: https://issues.apache.org/jira/browse/SPARK-24094
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.3.0
Reporter: Tathagata Das
Assignee: Tathagata Das






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24050) StreamingQuery does not calculate input / processing rates in some cases

2018-04-25 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24050.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21126
[https://github.com/apache/spark/pull/21126]

> StreamingQuery does not calculate input / processing rates in some cases
> 
>
> Key: SPARK-24050
> URL: https://issues.apache.org/jira/browse/SPARK-24050
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1, 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 3.0.0
>
>
> In some streaming queries, the input and processing rates are not calculated 
> at all (they show up as zero) because MicroBatchExecution fails to associate 
> metrics from the executed plan of a trigger with the sources in the logical 
> plan of the trigger. The way this executed-plan-leaf-to-logical-source 
> attribution works is as follows. With V1 sources, there was no way to 
> identify which execution plan leaves were generated by a streaming source, so 
> we made a best-effort attempt to match logical and execution plan leaves when 
> the number of leaves was the same. In cases where the number of leaves is 
> different, we just give up and report zero rates. An example where this may 
> happen is as follows.
> {code}
> val cachedStaticDF = someStaticDF.union(anotherStaticDF).cache()
> val streamingInputDF = ...
> val query = streamingInputDF.join(cachedStaticDF).writeStream
> {code}
> In this case, the {{cachedStaticDF}} has multiple logical leaves, but in the 
> trigger's execution plan it only has one leaf because a cached subplan is 
> represented as a single InMemoryTableScanExec leaf. This leads to a mismatch 
> in the number of leaves, causing the input rates to be computed as zero. 
> With DataSourceV2, all inputs are represented in the executed plan using 
> {{DataSourceV2ScanExec}}, each of which has a reference to the associated 
> logical {{DataSource}} and {{DataSourceReader}}. So it's easy to associate 
> the metrics with the original streaming sources. The solution is therefore to 
> take advantage of DataSourceV2 whenever possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24038) refactor continuous write exec to its own class

2018-04-24 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24038.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21116
[https://github.com/apache/spark/pull/21116]

> refactor continuous write exec to its own class
> ---
>
> Key: SPARK-24038
> URL: https://issues.apache.org/jira/browse/SPARK-24038
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Assignee: Jose Torres
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24038) refactor continuous write exec to its own class

2018-04-24 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-24038:
-

Assignee: Jose Torres

> refactor continuous write exec to its own class
> ---
>
> Key: SPARK-24038
> URL: https://issues.apache.org/jira/browse/SPARK-24038
> Project: Spark
>  Issue Type: Sub-task
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Jose Torres
>Assignee: Jose Torres
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-24056) Make consumer creation lazy in Kafka source for Structured streaming

2018-04-24 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-24056.
---
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 21134
[https://github.com/apache/spark/pull/21134]

> Make consumer creation lazy in Kafka source for Structured streaming
> 
>
> Key: SPARK-24056
> URL: https://issues.apache.org/jira/browse/SPARK-24056
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Minor
> Fix For: 3.0.0
>
>
> Currently, the driver side of the Kafka source (i.e. KafkaMicroBatchReader) 
> eagerly creates a consumer as soon as the KafkaMicroBatchReader is created. 
> However, we create a dummy KafkaMicroBatchReader just to get the schema and 
> immediately stop it. It's better to make the consumer creation lazy; it will 
> then be created on the first attempt to fetch offsets using the KafkaOffsetReader.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24056) Make consumer creation lazy in Kafka source for Structured streaming

2018-04-23 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-24056:
-

 Summary: Make consumer creation lazy in Kafka source for 
Structured streaming
 Key: SPARK-24056
 URL: https://issues.apache.org/jira/browse/SPARK-24056
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.4.0
Reporter: Tathagata Das
Assignee: Tathagata Das


Currently, the driver side of the Kafka source (i.e. KafkaMicroBatchReader) 
eagerly creates a consumer as soon as the KafkaMicroBatchReader is created. 
However, we create a dummy KafkaMicroBatchReader just to get the schema and 
immediately stop it. It's better to make the consumer creation lazy; it will 
then be created on the first attempt to fetch offsets using the KafkaOffsetReader.
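
A minimal sketch of the idea (not the actual KafkaOffsetReader code): hold the 
consumer behind a lazily initialized field so it is created only on the first 
offset fetch, and never created for a reader that is stopped without fetching.

{code}
// Sketch only: `newConsumer` stands for whatever actually builds the Kafka consumer.
class LazyOffsetReader[C <: AutoCloseable](newConsumer: () => C) {
  private var consumerOpt: Option[C] = None       // nothing is created at construction time

  private def consumer: C = consumerOpt.getOrElse {
    val c = newConsumer()                         // created here, on the first offset fetch
    consumerOpt = Some(c)
    c
  }

  def withConsumer[T](body: C => T): T = body(consumer)

  // Stopping a reader that never fetched offsets never creates a consumer.
  def close(): Unit = { consumerOpt.foreach(_.close()); consumerOpt = None }
}
{code}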



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-04-23 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das resolved SPARK-23004.
---
   Resolution: Fixed
Fix Version/s: 2.3.1
   3.0.0

Issue resolved by pull request 21124
[https://github.com/apache/spark/pull/21124]

> Structured Streaming raises "IllegalStateException: Cannot remove after 
> already committed or aborted"
> ---
>
> Key: SPARK-23004
> URL: https://issues.apache.org/jira/browse/SPARK-23004
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1, 2.3.0
> Environment: Run on yarn or local both raise the exception.
>Reporter: secfree
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 3.0.0, 2.3.1
>
>
> A structured streaming query with a streaming aggregation can throw the 
> following error in rare cases. 
> {code:java}
> java.lang.IllegalStateException: Cannot remove after already committed or 
> aborted at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659
>  ) at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.remove(HDFSBackedStateStoreProvider.scala:143)
>  at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:233)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.(ObjectAggregationIterator.scala:80)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at 
> org.apache.spark.scheduler.Task.run(Task.scala:108) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745) 18-01-05 13:29:57 WARN 
> TaskSetManager:66 Lost task 68.0 in stage 1933.0 (TID 196171, localhost, 
> executor driver): java.lang.IllegalStateException: Cannot remove after 
> already committed or aborted at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659){code}
>  
> This can happen when the following conditions are accidentally hit. 
>  # Streaming aggregation with aggregation function that is a subset of 
> {{[TypedImperativeAggregation|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L473]}}
>  (for example, {{collect_set}}, {{collect_list}}, {{percentile}}, etc.). 
>  # Query running in {{update}} mode
>  # After the shuffle, a partition has exactly 128 records. 
> This happens because of the following. 
>  # The {{StateStoreSaveExec}} used in streaming aggregations has the 
> [following 
> logic|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L359]
>  when used in {{update}} mode.
>  ## There is an iterator that reads data from its parent iterator and updates 
> the StateStore.
>  ## When the parent iterator is fully consumed (i.e. {{baseIterator.hasNext}} 
> returns false) then all state changes are committed by calling 
> {{StateStore.commit}}. 
>  ## The implementation of {{StateStore.commit()}} in {{HDFSBackedStateStore}} 
> does not 

[jira] [Updated] (SPARK-24050) StreamingQuery does not calculate input / processing rates in some cases

2018-04-23 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-24050:
--
Description: 
In some streaming queries, the input and processing rates are not calculated at 
all (they show up as zero) because MicroBatchExecution fails to associate 
metrics from the executed plan of a trigger with the sources in the logical 
plan of the trigger. The way this executed-plan-leaf-to-logical-source 
attribution works is as follows. With V1 sources, there was no way to identify 
which execution plan leaves were generated by a streaming source, so we made a 
best-effort attempt to match logical and execution plan leaves when the number 
of leaves was the same. In cases where the number of leaves is different, we 
just give up and report zero rates. An example where this may happen is as 
follows.
{code}
val cachedStaticDF = someStaticDF.union(anotherStaticDF).cache()
val streamingInputDF = ...

val query = streamingInputDF.join(cachedStaticDF).writeStream
{code}

In this case, the {{cachedStaticDF}} has multiple logical leaves, but in the 
trigger's execution plan it only has one leaf because a cached subplan is 
represented as a single InMemoryTableScanExec leaf. This leads to a mismatch in 
the number of leaves, causing the input rates to be computed as zero. 

With DataSourceV2, all inputs are represented in the executed plan using 
{{DataSourceV2ScanExec}}, each of which has a reference to the associated 
logical {{DataSource}} and {{DataSourceReader}}. So it's easy to associate the 
metrics with the original streaming sources. The solution is therefore to take 
advantage of DataSourceV2 whenever possible.



  was:
In some streaming queries, the input and processing rates are not calculated at 
all (they show up as zero) because MicroBatchExecution fails to associate 
metrics from the executed plan of a trigger with the sources in the logical 
plan of the trigger. The way this executed-plan-leaf-to-logical-source 
attribution works is as follows. With V1 sources, there was no way to identify 
which execution plan leaves were generated by a streaming source, so we made a 
best-effort attempt to match logical and execution plan leaves when the number 
of leaves was the same. In cases where the number of leaves is different, we 
just give up and report zero rates. An example where this may happen is as 
follows.
{code}
val cachedStaticDF = someStaticDF.union(anotherStaticDF).cache()
val streamingInputDF = ...

val query = streamingInputDF.join(cachedStaticDF).writeStream
{code}

In this case, the {{cachedStaticDF}} has multiple logical leaves, but in the 
trigger's execution plan it only has one leaf because a cached subplan is 
represented as a single InMemoryTableScanExec leaf. This leads to a mismatch in 
the number of leaves, causing the input rates to be computed as zero. 

With DataSourceV2, all inputs are represented in the executed plan using 
{{DataSourceV2ScanExec}}s, each of which has a reference to the associated 
logical {{DataSource}} and {{DataSourceReader}}. So it's easy to associate the 
metrics with the original streaming sources. The solution is therefore to take 
advantage of DataSourceV2 whenever possible.




> StreamingQuery does not calculate input / processing rates in some cases
> 
>
> Key: SPARK-24050
> URL: https://issues.apache.org/jira/browse/SPARK-24050
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1, 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
>
> In some streaming queries, the input and processing rates are not calculated 
> at all (they show up as zero) because MicroBatchExecution fails to associate 
> metrics from the executed plan of a trigger with the sources in the logical 
> plan of the trigger. The way this executed-plan-leaf-to-logical-source 
> attribution works is as follows. With V1 sources, there was no way to 
> identify which execution plan leaves were generated by a streaming source, so 
> we made a best-effort attempt to match logical and execution plan leaves when 
> the number of leaves was the same. In cases where the number of leaves is 
> different, we just give up and report zero rates. An example where this may 
> happen is as follows.
> {code}
> val cachedStaticDF = someStaticDF.union(anotherStaticDF).cache()
> val streamingInputDF = ...
> val query = streamingInputDF.join(cachedStaticDF).writeStream
> {code}
> In this case, the {{cachedStaticDF}} has multiple logical leaves, but in the 
> trigger's execution plan it only has one leaf because a cached subplan is 
> represented as a single InMemoryTableScanExec leaf. This leads to a mismatch 
> in the number of leaves causing the 

[jira] [Created] (SPARK-24050) StreamingQuery does not calculate input / processing rates in some cases

2018-04-22 Thread Tathagata Das (JIRA)
Tathagata Das created SPARK-24050:
-

 Summary: StreamingQuery does not calculate input / processing 
rates in some cases
 Key: SPARK-24050
 URL: https://issues.apache.org/jira/browse/SPARK-24050
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.3.0, 2.2.1, 2.2.0, 2.1.2, 2.1.1, 2.1.0
Reporter: Tathagata Das
Assignee: Tathagata Das


In some streaming queries, the input and processing rates are not calculated at 
all (they show up as zero) because MicroBatchExecution fails to associate 
metrics from the executed plan of a trigger with the sources in the logical 
plan of the trigger. The way this executed-plan-leaf-to-logical-source 
attribution works is as follows. With V1 sources, there was no way to identify 
which execution plan leaves were generated by a streaming source, so we made a 
best-effort attempt to match logical and execution plan leaves when the number 
of leaves was the same. In cases where the number of leaves is different, we 
just give up and report zero rates. An example where this may happen is as 
follows.
{code}
val cachedStaticDF = someStaticDF.union(anotherStaticDF).cache()
val streamingInputDF = ...

val query = streamingInputDF.join(cachedStaticDF).writeStream
{code}

In this case, the {{cachedStaticDF}} has multiple logical leaves, but in the 
trigger's execution plan it only has one leaf because a cached subplan is 
represented as a single InMemoryTableScanExec leaf. This leads to a mismatch in 
the number of leaves, causing the input rates to be computed as zero. 

With DataSourceV2, all inputs are represented in the executed plan using 
{{DataSourceV2ScanExec}}s, each of which has a reference to the associated 
logical {{DataSource}} and {{DataSourceReader}}. So it's easy to associate the 
metrics with the original streaming sources. The solution is therefore to take 
advantage of DataSourceV2 whenever possible.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-04-22 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-23004:
--
Target Version/s: 2.3.1, 2.4.0  (was: 2.3.1)

> Structured Streaming raises "IllegalStateException: Cannot remove after 
> already committed or aborted"
> ---
>
> Key: SPARK-23004
> URL: https://issues.apache.org/jira/browse/SPARK-23004
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1, 2.3.0
> Environment: Run on yarn or local both raise the exception.
>Reporter: secfree
>Assignee: Tathagata Das
>Priority: Major
>
> A structured streaming query with a streaming aggregation can throw the 
> following error in rare cases. 
> {code:java}
> java.lang.IllegalStateException: Cannot remove after already committed or 
> aborted at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659
>  ) at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.remove(HDFSBackedStateStoreProvider.scala:143)
>  at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:233)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.(ObjectAggregationIterator.scala:80)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at 
> org.apache.spark.scheduler.Task.run(Task.scala:108) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745) 18-01-05 13:29:57 WARN 
> TaskSetManager:66 Lost task 68.0 in stage 1933.0 (TID 196171, localhost, 
> executor driver): java.lang.IllegalStateException: Cannot remove after 
> already committed or aborted at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659){code}
>  
> This can happen when the following conditions are accidentally hit. 
>  # Streaming aggregation with aggregation function that is a subset of 
> {{[TypedImperativeAggregation|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L473]}}
>  (for example, {{collect_set}}, {{collect_list}}, {{percentile}}, etc.). 
>  # Query running in {{update}} mode
>  # After the shuffle, a partition has exactly 128 records. 
> This happens because of the following. 
>  # The {{StateStoreSaveExec}} used in streaming aggregations has the 
> [following 
> logic|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L359]
>  when used in {{update}} mode.
>  ## There is an iterator that reads data from its parent iterator and updates 
> the StateStore.
>  ## When the parent iterator is fully consumed (i.e. {{baseIterator.hasNext}} 
> returns false) then all state changes are committed by calling 
> {{StateStore.commit}}. 
>  ## The implementation of {{StateStore.commit()}} in {{HDFSBackedStateStore}} 
> does not allow itself to be called twice. However, the logic is such that, if 
> {{hasNext}} is called multiple times after {{baseIterator.hasNext}} has 
> 

[jira] [Comment Edited] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-04-22 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447440#comment-16447440
 ] 

Tathagata Das edited comment on SPARK-23004 at 4/23/18 1:16 AM:


[~joshrosen] hit the issue as well, and thanks to him I could reproduce it. I 
am updating the description of the Jira to include more details. This is a 
long-standing crazy issue!


was (Author: tdas):
[~joshrosen] hit the issue as well, and thanks to him I could reproduce it. I 
am updating the description of the Jira to include more details. This is a 
long-standing crazy issue!

 

 

> Structured Streaming raises "IllegalStateException: Cannot remove after already 
> committed or aborted"
> ---
>
> Key: SPARK-23004
> URL: https://issues.apache.org/jira/browse/SPARK-23004
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1, 2.3.0
> Environment: Run on yarn or local both raise the exception.
>Reporter: secfree
>Assignee: Tathagata Das
>Priority: Major
>
> A structured streaming query with a streaming aggregation can throw the 
> following error in rare cases. 
> {code:java}
> java.lang.IllegalStateException: Cannot remove after already committed or 
> aborted at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659
>  ) at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.remove(HDFSBackedStateStoreProvider.scala:143)
>  at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:233)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at 
> org.apache.spark.scheduler.Task.run(Task.scala:108) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745) 18-01-05 13:29:57 WARN 
> TaskSetManager:66 Lost task 68.0 in stage 1933.0 (TID 196171, localhost, 
> executor driver): java.lang.IllegalStateException: Cannot remove after 
> already committed or aborted at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659){code}
>  
> This can happen when the following conditions are accidentally hit. 
>  # Streaming aggregation with aggregation function that is a subset of 
> {{[TypedImperativeAggregation|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L473]}}
>  (for example, {{collect_set}}, {{collect_list}}, {{percentile}}, etc.). 
>  # Query running in {{update}} mode
>  # After the shuffle, a partition has exactly 128 records. 
> This happens because of the following. 
>  # The {{StateStoreSaveExec}} used in streaming aggregations has the 
> [following 
> logic|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L359]
>  when used in {{update}} mode.
>  ## There is an iterator that reads data from its parent iterator and updates 
> 

[jira] [Updated] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-04-22 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-23004:
--
Component/s: (was: Input/Output)
 Structured Streaming

> Structured Streaming raises "IllegalStateException: Cannot remove after already 
> committed or aborted"
> ---
>
> Key: SPARK-23004
> URL: https://issues.apache.org/jira/browse/SPARK-23004
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1, 2.3.0
> Environment: Run on yarn or local both raise the exception.
>Reporter: secfree
>Assignee: Tathagata Das
>Priority: Major
>
> A structured streaming query with a streaming aggregation can throw the 
> following error in rare cases. 
> {code:java}
> java.lang.IllegalStateException: Cannot remove after already committed or 
> aborted at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659
>  ) at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.remove(HDFSBackedStateStoreProvider.scala:143)
>  at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:233)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at 
> org.apache.spark.scheduler.Task.run(Task.scala:108) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745) 18-01-05 13:29:57 WARN 
> TaskSetManager:66 Lost task 68.0 in stage 1933.0 (TID 196171, localhost, 
> executor driver): java.lang.IllegalStateException: Cannot remove after 
> already committed or aborted at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659){code}
>  
> This can happen when the following conditions are accidentally hit. 
>  # Streaming aggregation with aggregation function that is a subset of 
> {{[TypedImperativeAggregation|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L473]}}
>  (for example, {{collect_set}}, {{collect_list}}, {{percentile}}, etc.). 
>  # Query running in {{update}} mode
>  # After the shuffle, a partition has exactly 128 records. 
> This happens because of the following. 
>  # The {{StateStoreSaveExec}} used in streaming aggregations has the 
> [following 
> logic|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L359]
>  when used in {{update}} mode.
>  ## There is an iterator that reads data from its parent iterator and updates 
> the StateStore.
>  ## When the parent iterator is fully consumed (i.e. {{baseIterator.hasNext}} 
> returns false) then all state changes are committed by calling 
> {{StateStore.commit}}. 
>  ## The implementation of {{StateStore.commit()}} in {{HDFSBackedStateStore}} 
> does not allow itself to be called twice. However, the logic is such that, if 
> {{hasNext}} is called multiple times after 

[jira] [Assigned] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-04-22 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das reassigned SPARK-23004:
-

Assignee: Tathagata Das

> Structured Streaming raises "IllegalStateException: Cannot remove after already 
> committed or aborted"
> ---
>
> Key: SPARK-23004
> URL: https://issues.apache.org/jira/browse/SPARK-23004
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1, 2.3.0
> Environment: Run on yarn or local both raise the exception.
>Reporter: secfree
>Assignee: Tathagata Das
>Priority: Major
>
> A structured streaming query with a streaming aggregation can throw the 
> following error in rare cases. 
> {code:java}
> java.lang.IllegalStateException: Cannot remove after already committed or 
> aborted at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659
>  ) at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.remove(HDFSBackedStateStoreProvider.scala:143)
>  at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:233)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at 
> org.apache.spark.scheduler.Task.run(Task.scala:108) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745) 18-01-05 13:29:57 WARN 
> TaskSetManager:66 Lost task 68.0 in stage 1933.0 (TID 196171, localhost, 
> executor driver): java.lang.IllegalStateException: Cannot remove after 
> already committed or aborted at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659){code}
>  
> This can happen when the following conditions are accidentally hit. 
>  # Streaming aggregation with aggregation function that is a subset of 
> {{[TypedImperativeAggregation|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L473]}}
>  (for example, {{collect_set}}, {{collect_list}}, {{percentile}}, etc.). 
>  # Query running in {{update}} mode
>  # After the shuffle, a partition has exactly 128 records. 
> This happens because of the following. 
>  # The {{StateStoreSaveExec}} used in streaming aggregations has the 
> [following 
> logic|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L359]
>  when used in {{update}} mode.
>  ## There is an iterator that reads data from its parent iterator and updates 
> the StateStore.
>  ## When the parent iterator is fully consumed (i.e. {{baseIterator.hasNext}} 
> returns false) then all state changes are committed by calling 
> {{StateStore.commit}}. 
>  ## The implementation of {{StateStore.commit()}} in {{HDFSBackedStateStore}} 
> does not allow itself to be called twice. However, the logic is such that, if 
> {{hasNext}} is called multiple times after {{baseIterator.hasNext}} has 
> returned false then each 

[jira] [Updated] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-04-22 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-23004:
--
Target Version/s: 2.3.1

> Structured Streaming raises "IllegalStateException: Cannot remove after already 
> committed or aborted"
> ---
>
> Key: SPARK-23004
> URL: https://issues.apache.org/jira/browse/SPARK-23004
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1, 2.3.0
> Environment: Run on yarn or local both raise the exception.
>Reporter: secfree
>Priority: Major
>
> A structured streaming query with a streaming aggregation can throw the 
> following error in rare cases. 
> {code:java}
> java.lang.IllegalStateException: Cannot remove after already committed or 
> aborted at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659
>  ) at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.remove(HDFSBackedStateStoreProvider.scala:143)
>  at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:233)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at 
> org.apache.spark.scheduler.Task.run(Task.scala:108) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745) 18-01-05 13:29:57 WARN 
> TaskSetManager:66 Lost task 68.0 in stage 1933.0 (TID 196171, localhost, 
> executor driver): java.lang.IllegalStateException: Cannot remove after 
> already committed or aborted at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659){code}
>  
> This can happen when the following conditions are accidentally hit. 
>  # Streaming aggregation with aggregation function that is a subset of 
> {{[TypedImperativeAggregation|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L473]}}
>  (for example, {{collect_set}}, {{collect_list}}, {{percentile}}, etc.). 
>  # Query running in {{update}} mode
>  # After the shuffle, a partition has exactly 128 records. 
> This happens because of the following. 
>  # The {{StateStoreSaveExec}} used in streaming aggregations has the 
> [following 
> logic|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L359]
>  when used in {{update}} mode.
>  ## There is an iterator that reads data from its parent iterator and updates 
> the StateStore.
>  ## When the parent iterator is fully consumed (i.e. {{baseIterator.hasNext}} 
> returns false) then all state changes are committed by calling 
> {{StateStore.commit}}. 
>  ## The implementation of {{StateStore.commit()}} in {{HDFSBackedStateStore}} 
> does not allow itself to be called twice. However, the logic is such that, if 
> {{hasNext}} is called multiple times after {{baseIterator.hasNext}} has 
> returned false then each time it will call {{StateStore.commit}}.
>  
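
To make the failure sequence quoted above concrete, here is a minimal, self-contained Scala sketch. The names (ToyStateStore, commitOnExhaust, DoubleCommitSketch) are invented for illustration and are not the real Spark classes; the sketch only mimics the pattern the description identifies: a store that refuses a second commit, wrapped by an iterator that commits it whenever a hasNext() probe finds the parent exhausted.
{code:scala}
// Toy sketch only: ToyStateStore, commitOnExhaust and DoubleCommitSketch are
// invented names, not Spark APIs. They mimic the commit-once store plus the
// commit-on-exhaustion iterator logic described in the issue.
object DoubleCommitSketch {

  // Stand-in for a state store that allows commit() to be called at most once.
  class ToyStateStore {
    private var committed = false
    def commit(): Unit = {
      if (committed) {
        throw new IllegalStateException("Cannot commit after already committed or aborted")
      }
      committed = true
    }
  }

  // Wraps a parent iterator and commits the store when the parent is exhausted.
  // The commit is not guarded, so every hasNext() after exhaustion commits again.
  def commitOnExhaust[A](parent: Iterator[A], store: ToyStateStore): Iterator[A] =
    new Iterator[A] {
      override def hasNext: Boolean = {
        val more = parent.hasNext
        if (!more) store.commit() // fires on *every* probe once the parent is done
        more
      }
      override def next(): A = parent.next()
    }

  def main(args: Array[String]): Unit = {
    val store = new ToyStateStore
    val iter  = commitOnExhaust(Iterator(1, 2, 3), store)

    while (iter.hasNext) iter.next() // the probe that returns false also commits

    iter.hasNext // one extra probe after exhaustion, as the aggregation iterator does
                 // -> java.lang.IllegalStateException: Cannot commit after already committed or aborted
  }
}
{code}
In this toy version, guarding the commit with a once-only flag (or committing from a single completion callback instead of from hasNext()) makes the extra probe harmless; in the real query, the extra probe comes from the object-hash aggregation fallback described above, which is triggered once a partition reaches the 128-record threshold.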

[jira] [Updated] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-04-22 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-23004:
--
Affects Version/s: 2.1.0
   2.1.1
   2.1.2
   2.2.0
   2.3.0

> Structured Streaming raises "IllegalStateException: Cannot remove after already 
> committed or aborted"
> ---
>
> Key: SPARK-23004
> URL: https://issues.apache.org/jira/browse/SPARK-23004
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1, 2.3.0
> Environment: Run on yarn or local both raise the exception.
>Reporter: secfree
>Priority: Major
>
> A structured streaming query with a streaming aggregation can throw the 
> following error in rare cases. 
> {code:java}
> java.lang.IllegalStateException: Cannot remove after already committed or 
> aborted at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659
>  ) at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.remove(HDFSBackedStateStoreProvider.scala:143)
>  at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:233)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
>  at 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>  at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at 
> org.apache.spark.scheduler.Task.run(Task.scala:108) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745) 18-01-05 13:29:57 WARN 
> TaskSetManager:66 Lost task 68.0 in stage 1933.0 (TID 196171, localhost, 
> executor driver): java.lang.IllegalStateException: Cannot remove after 
> already committed or aborted at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659){code}
>  
> This can happen when the following conditions are accidentally hit. 
>  # Streaming aggregation with aggregation function that is a subset of 
> {{[TypedImperativeAggregation|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L473]}}
>  (for example, {{collect_set}}, {{collect_list}}, {{percentile}}, etc.). 
>  # Query running in {{update}} mode
>  # After the shuffle, a partition has exactly 128 records. 
> This happens because of the following. 
>  # The {{StateStoreSaveExec}} used in streaming aggregations has the 
> [following 
> logic|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L359]
>  when used in {{update}} mode.
>  ## There is an iterator that reads data from its parent iterator and updates 
> the StateStore.
>  ## When the parent iterator is fully consumed (i.e. {{baseIterator.hasNext}} 
> returns false) then all state changes are committed by calling 
> {{StateStore.commit}}. 
>  ## The implementation of {{StateStore.commit()}} in {{HDFSBackedStateStore}} 
> does not allow itself to be called twice. However, the logic is such that, if 
> {{hasNext}} is called 

[jira] [Updated] (SPARK-23004) Structured Streaming raises "IllegalStateException: Cannot remove after already committed or aborted"

2018-04-22 Thread Tathagata Das (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-23004:
--
Description: 
A structured streaming query with streaming aggregations can throw the 
following error in rare cases. 
{code:java}
java.lang.IllegalStateException: Cannot remove after already committed or 
aborted at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659
 ) at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.remove(HDFSBackedStateStoreProvider.scala:143)
 at 
org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$1.hasNext(statefulOperators.scala:233)
 at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:191)
 at 
org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:80)
 at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:111)
 at 
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:103)
 at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
 at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at 
org.apache.spark.scheduler.Task.run(Task.scala:108) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 18-01-05 13:29:57 WARN 
TaskSetManager:66 Lost task 68.0 in stage 1933.0 (TID 196171, localhost, 
executor driver): java.lang.IllegalStateException: Cannot remove after already 
committed or aborted at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:659){code}
 

This can happen when the following conditions are accidentally hit. 
 # Streaming aggregation with aggregation function that is a subset of 
{{[TypedImperativeAggregation|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L473]}}
 (for example, {{collect_set}}, {{collect_list}}, {{percentile}}, etc.). 
 # Query running in {{update}} mode
 # After the shuffle, a partition has exactly 128 records. 

This happens because of the following. 
 # The {{StateStoreSaveExec}} used in streaming aggregations has the [following 
logic|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L359]
 when used in {{update}} mode.
 ## There is an iterator that reads data from its parent iterator and updates 
the StateStore.
 ## When the parent iterator is fully consumed (i.e. {{baseIterator.hasNext}} 
returns false) then all state changes are committed by calling 
{{StateStore.commit}}. 
 ## The implementation of {{StateStore.commit()}} in {{HDFSBackedStateStore}} 
does not allow itself to be called twice. However, the logic is such that, if 
{{hasNext}} is called multiple times after {{baseIterator.hasNext}} has 
returned false then each time it will call {{StateStore.commit}}.
 ## For most aggregation functions, this is okay because {{hasNext}} is only 
called once. But that's not the case with {{TypedImperativeAggregates}}.
 # {{TypedImperativeAggregates}} are executed using 
{{ObjectHashAggregateExec}} which will try to use two kinds of hashmaps for 
aggregations. 
 ## It will first try to use an unsorted hashmap. If the size of the hashmap 
increases beyond a certain threshold (default 128), then it will switch to 
using a sorted hashmap. 
 ## The [switching 
logic|https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala]
 in {{ObjectAggregationIterator}} (used by 
