[jira] [Updated] (SPARK-19874) Hide API docs for "org.apache.spark.sql.internal"
[ https://issues.apache.org/jira/browse/SPARK-19874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-19874:
    Priority: Minor  (was: Major)

> Hide API docs for "org.apache.spark.sql.internal"
>
> Key: SPARK-19874
> URL: https://issues.apache.org/jira/browse/SPARK-19874
> Project: Spark
> Issue Type: Bug
> Components: Build
> Affects Versions: 2.1.0
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Priority: Minor
> Fix For: 2.1.1, 2.2.0
>
> The API docs should not include the "org.apache.spark.sql.internal" package,
> because it contains internal, private APIs.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-19540) Add ability to clone SparkSession with an identical copy of the SessionState
[ https://issues.apache.org/jira/browse/SPARK-19540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu reassigned SPARK-19540:
    Assignee: Kunal Khamar

> Add ability to clone SparkSession with an identical copy of the SessionState
>
> Key: SPARK-19540
> URL: https://issues.apache.org/jira/browse/SPARK-19540
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Kunal Khamar
> Assignee: Kunal Khamar
> Fix For: 2.2.0
>
> Forking a newSession() from a SparkSession currently makes a new SparkSession
> that does not retain SessionState (i.e. temporary tables, SQL config,
> registered functions, etc.). This change adds a method cloneSession() which
> creates a new SparkSession with a copy of the parent's SessionState.
> Subsequent changes to the base session are not propagated to the cloned
> session; the clone is independent after creation. If the base is changed
> after the clone has been created (say the user registers a new UDF), the new
> UDF will not be available inside the clone. The same goes for configs and
> temp tables.
[jira] [Resolved] (SPARK-19540) Add ability to clone SparkSession with an identical copy of the SessionState
[ https://issues.apache.org/jira/browse/SPARK-19540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu resolved SPARK-19540.
    Resolution: Fixed
    Fix Version/s: 2.2.0

> Add ability to clone SparkSession with an identical copy of the SessionState
>
> Key: SPARK-19540
> URL: https://issues.apache.org/jira/browse/SPARK-19540
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Kunal Khamar
> Fix For: 2.2.0
>
> Forking a newSession() from a SparkSession currently makes a new SparkSession
> that does not retain SessionState (i.e. temporary tables, SQL config,
> registered functions, etc.). This change adds a method cloneSession() which
> creates a new SparkSession with a copy of the parent's SessionState.
> Subsequent changes to the base session are not propagated to the cloned
> session; the clone is independent after creation. If the base is changed
> after the clone has been created (say the user registers a new UDF), the new
> UDF will not be available inside the clone. The same goes for configs and
> temp tables.
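[Editor's sketch] The clone-versus-newSession semantics described above can be modeled with a small, self-contained Python sketch. This is a conceptual toy, not Spark's SessionState implementation; the `Session` class and its attribute names are hypothetical:

```python
import copy

class Session:
    """Toy stand-in for SparkSession + SessionState (hypothetical names)."""
    def __init__(self, state=None):
        # state holds temp tables, SQL configs, registered functions, etc.
        self.state = state if state is not None else {"udfs": {}, "conf": {}}

    def new_session(self):
        # Like SparkSession.newSession(): fresh session, empty state.
        return Session()

    def clone_session(self):
        # Like the proposed cloneSession(): deep-copy the parent's state,
        # after which the two sessions evolve independently.
        return Session(copy.deepcopy(self.state))

base = Session()
base.state["udfs"]["f"] = lambda x: x + 1

clone = base.clone_session()
# The clone sees state registered before the clone was taken...
assert "f" in clone.state["udfs"]

# ...but changes to the base after cloning do not propagate.
base.state["udfs"]["g"] = lambda x: x * 2
assert "g" not in clone.state["udfs"]

# newSession() retains nothing from the parent.
assert "f" not in base.new_session().state["udfs"]
```

The deep copy is what makes the clone a snapshot: later mutation of either session's dictionaries cannot leak into the other.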
[jira] [Created] (SPARK-19874) Hide API docs for "org.apache.spark.sql.internal"
Shixiong Zhu created SPARK-19874:

    Summary: Hide API docs for "org.apache.spark.sql.internal"
    Key: SPARK-19874
    URL: https://issues.apache.org/jira/browse/SPARK-19874
    Project: Spark
    Issue Type: Bug
    Components: Build
    Affects Versions: 2.1.0
    Reporter: Shixiong Zhu
    Assignee: Shixiong Zhu

The API docs should not include the "org.apache.spark.sql.internal" package,
because it contains internal, private APIs.
[jira] [Updated] (SPARK-19858) Add output mode to flatMapGroupsWithState and disallow invalid cases
[ https://issues.apache.org/jira/browse/SPARK-19858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-19858:
    Affects Version/s: (was: 2.1.1)

> Add output mode to flatMapGroupsWithState and disallow invalid cases
>
> Key: SPARK-19858
> URL: https://issues.apache.org/jira/browse/SPARK-19858
> Project: Spark
> Issue Type: Sub-task
> Components: Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Fix For: 2.2.0
[jira] [Resolved] (SPARK-19858) Add output mode to flatMapGroupsWithState and disallow invalid cases
[ https://issues.apache.org/jira/browse/SPARK-19858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu resolved SPARK-19858.
    Resolution: Fixed
    Fix Version/s: 2.2.0

> Add output mode to flatMapGroupsWithState and disallow invalid cases
>
> Key: SPARK-19858
> URL: https://issues.apache.org/jira/browse/SPARK-19858
> Project: Spark
> Issue Type: Sub-task
> Components: Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Fix For: 2.2.0
[jira] [Updated] (SPARK-19413) Basic mapGroupsWithState API
[ https://issues.apache.org/jira/browse/SPARK-19413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-19413:
    Fix Version/s: (was: 2.1.1)

> Basic mapGroupsWithState API
>
> Key: SPARK-19413
> URL: https://issues.apache.org/jira/browse/SPARK-19413
> Project: Spark
> Issue Type: Sub-task
> Components: Structured Streaming
> Reporter: Tathagata Das
> Assignee: Tathagata Das
> Fix For: 2.2.0
>
> Basic API (without timeouts) as described in the parent JIRA SPARK-19067
[jira] [Commented] (SPARK-19413) Basic mapGroupsWithState API
[ https://issues.apache.org/jira/browse/SPARK-19413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15902105#comment-15902105 ]

Shixiong Zhu commented on SPARK-19413:

Reverted the patch from branch 2.1. This feature will not go into 2.1.1.

> Basic mapGroupsWithState API
>
> Key: SPARK-19413
> URL: https://issues.apache.org/jira/browse/SPARK-19413
> Project: Spark
> Issue Type: Sub-task
> Components: Structured Streaming
> Reporter: Tathagata Das
> Assignee: Tathagata Das
> Fix For: 2.2.0
>
> Basic API (without timeouts) as described in the parent JIRA SPARK-19067
[jira] [Updated] (SPARK-19413) Basic mapGroupsWithState API
[ https://issues.apache.org/jira/browse/SPARK-19413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-19413:
    Target Version/s: 2.2.0  (was: 2.1.1, 2.2.0)

> Basic mapGroupsWithState API
>
> Key: SPARK-19413
> URL: https://issues.apache.org/jira/browse/SPARK-19413
> Project: Spark
> Issue Type: Sub-task
> Components: Structured Streaming
> Reporter: Tathagata Das
> Assignee: Tathagata Das
> Fix For: 2.2.0
>
> Basic API (without timeouts) as described in the parent JIRA SPARK-19067
[jira] [Issue Comment Deleted] (SPARK-19413) Basic mapGroupsWithState API
[ https://issues.apache.org/jira/browse/SPARK-19413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-19413:
    Comment: was deleted
    (was: User 'tdas' has created a pull request for this issue:
    https://github.com/apache/spark/pull/16849)

> Basic mapGroupsWithState API
>
> Key: SPARK-19413
> URL: https://issues.apache.org/jira/browse/SPARK-19413
> Project: Spark
> Issue Type: Sub-task
> Components: Structured Streaming
> Reporter: Tathagata Das
> Assignee: Tathagata Das
> Fix For: 2.1.1, 2.2.0
>
> Basic API (without timeouts) as described in the parent JIRA SPARK-19067
[jira] [Updated] (SPARK-19858) Add output mode to flatMapGroupsWithState and disallow invalid cases
[ https://issues.apache.org/jira/browse/SPARK-19858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-19858:
    Issue Type: Sub-task  (was: Improvement)
    Parent: SPARK-19067

> Add output mode to flatMapGroupsWithState and disallow invalid cases
>
> Key: SPARK-19858
> URL: https://issues.apache.org/jira/browse/SPARK-19858
> Project: Spark
> Issue Type: Sub-task
> Components: Structured Streaming
> Affects Versions: 2.1.1, 2.2.0
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
[jira] [Updated] (SPARK-19858) Add output mode to flatMapGroupsWithState and disallow invalid cases
[ https://issues.apache.org/jira/browse/SPARK-19858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-19858:
    Affects Version/s: 2.1.1

> Add output mode to flatMapGroupsWithState and disallow invalid cases
>
> Key: SPARK-19858
> URL: https://issues.apache.org/jira/browse/SPARK-19858
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.1.1, 2.2.0
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
[jira] [Updated] (SPARK-19481) Fix flaky test: o.a.s.repl.ReplSuite should clone and clean line object in ClosureCleaner
[ https://issues.apache.org/jira/browse/SPARK-19481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-19481:
    Fix Version/s: 2.0.3

> Fix flaky test: o.a.s.repl.ReplSuite should clone and clean line object in
> ClosureCleaner
>
> Key: SPARK-19481
> URL: https://issues.apache.org/jira/browse/SPARK-19481
> Project: Spark
> Issue Type: Test
> Components: Spark Shell
> Affects Versions: 2.1.0
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Fix For: 2.0.3, 2.1.1, 2.2.0
>
> org.apache.spark.repl.cancelOnInterrupt leaks a SparkContext and makes the
> tests unstable. See:
> http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.repl.ReplSuite&test_name=should+clone+and+clean+line+object+in+ClosureCleaner
[jira] [Resolved] (SPARK-19859) The new watermark should override the old one
[ https://issues.apache.org/jira/browse/SPARK-19859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu resolved SPARK-19859.
    Resolution: Fixed
    Fix Version/s: 2.2.0
                   2.1.1

> The new watermark should override the old one
>
> Key: SPARK-19859
> URL: https://issues.apache.org/jira/browse/SPARK-19859
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.1.0
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Fix For: 2.1.1, 2.2.0
>
> The new watermark should override the old one. Otherwise we just pick up the
> first column that has a watermark, which may be unexpected.
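[Editor's sketch] The behavior change above can be reduced to a one-line policy choice: when several watermark definitions exist, which one is in effect? A minimal Python model (function names are illustrative; Spark's real logic lives in its logical plan, not in helpers like these):

```python
# Each spec is (event_time_column, delay_threshold), in definition order.

def effective_watermark_first_wins(specs):
    # Old behavior: the first column that has a watermark is picked,
    # even if the user later defined a different one.
    return specs[0] if specs else None

def effective_watermark_latest_wins(specs):
    # Fixed behavior (SPARK-19859): a newly set watermark overrides the
    # old one, so the most recent definition wins.
    return specs[-1] if specs else None

specs = [("eventTimeA", "10 minutes"), ("eventTimeB", "5 minutes")]
assert effective_watermark_first_wins(specs) == ("eventTimeA", "10 minutes")
assert effective_watermark_latest_wins(specs) == ("eventTimeB", "5 minutes")
```

"Latest wins" matches how users expect repeated configuration calls to behave: the most recent setting is the one in force.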
[jira] [Resolved] (SPARK-19841) StreamingDeduplicateExec.watermarkPredicate should filter rows based on keys
[ https://issues.apache.org/jira/browse/SPARK-19841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu resolved SPARK-19841.
    Resolution: Fixed
    Fix Version/s: 2.2.0

> StreamingDeduplicateExec.watermarkPredicate should filter rows based on keys
>
> Key: SPARK-19841
> URL: https://issues.apache.org/jira/browse/SPARK-19841
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Fix For: 2.2.0
>
> Right now it just uses the rows to filter, but a column's position in
> keyExpressions may differ from its position in the row.
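[Editor's sketch] The position mismatch described above is easy to model outside Spark. Rows are tuples, key expressions are column indices, and the watermark predicate must be evaluated against the *key projection*, not the raw row. All names below are illustrative, not Spark internals:

```python
def make_key_projection(key_positions):
    """Return a function mapping a full row to its key tuple.
    key_positions: indices of the key columns within the full row."""
    return lambda row: tuple(row[i] for i in key_positions)

def is_late(key, event_time_pos_in_key, watermark):
    # The predicate is written against positions *within the key*.
    return key[event_time_pos_in_key] < watermark

row = ("user-1", 42, 99)                   # (id, value, event_time)
project_key = make_key_projection([2, 0])  # key = (event_time, id)

key = project_key(row)
assert key == (99, "user-1")

# Correct: evaluate on the key; event time 99 >= watermark 50, keep it.
assert not is_late(key, 0, watermark=50)
# The original bug: applying the same key-relative position (0) to the
# raw row would read row[0] ("user-1") instead of the event time.
```

Projecting the key first makes the predicate's column positions consistent with `keyExpressions`, which is the fix's core idea.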
[jira] [Created] (SPARK-19859) The new watermark should override the old one
Shixiong Zhu created SPARK-19859:

    Summary: The new watermark should override the old one
    Key: SPARK-19859
    URL: https://issues.apache.org/jira/browse/SPARK-19859
    Project: Spark
    Issue Type: Bug
    Components: Structured Streaming
    Affects Versions: 2.1.0
    Reporter: Shixiong Zhu
    Assignee: Shixiong Zhu

The new watermark should override the old one. Otherwise we just pick up the
first column that has a watermark, which may be unexpected.
[jira] [Created] (SPARK-19858) Add output mode to flatMapGroupsWithState and disallow invalid cases
Shixiong Zhu created SPARK-19858:

    Summary: Add output mode to flatMapGroupsWithState and disallow invalid cases
    Key: SPARK-19858
    URL: https://issues.apache.org/jira/browse/SPARK-19858
    Project: Spark
    Issue Type: Improvement
    Components: Structured Streaming
    Affects Versions: 2.2.0
    Reporter: Shixiong Zhu
    Assignee: Shixiong Zhu
[jira] [Updated] (SPARK-19853) Uppercase Kafka topics fail when startingOffsets are SpecificOffsets
[ https://issues.apache.org/jira/browse/SPARK-19853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-19853:
    Target Version/s: 2.2.0

> Uppercase Kafka topics fail when startingOffsets are SpecificOffsets
>
> Key: SPARK-19853
> URL: https://issues.apache.org/jira/browse/SPARK-19853
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.1.0
> Reporter: Chris Bowden
> Priority: Trivial
>
> When using the KafkaSource with Structured Streaming, consumer assignments
> are not what the user expects if startingOffsets is set to an explicit set of
> topics/partitions in JSON where the topic(s) happen to contain uppercase
> characters. When StartingOffsets is constructed, the original string value
> from the options is transformed with toLowerCase to make matching on
> "earliest" and "latest" case insensitive. However, the lowercased JSON is
> then passed to SpecificOffsets in the terminal condition, so topic names may
> no longer be what the user intended by the time assignments are made with the
> underlying KafkaConsumer.
>
> From KafkaSourceProvider:
> {code}
> val startingOffsets =
>   caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
>     case Some("latest") => LatestOffsets
>     case Some("earliest") => EarliestOffsets
>     case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
>     case None => LatestOffsets
>   }
> {code}
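[Editor's sketch] The bug and one plausible fix can be reproduced in plain Python: lowercase only for the keyword comparison, and hand the original, case-preserving string to the JSON parser. The function names below are illustrative and only imitate KafkaSourceProvider's option handling:

```python
import json

def parse_starting_offsets_buggy(value):
    # Mirrors the Scala excerpt: the whole option is lowercased up front,
    # so topic names inside the JSON lose their case.
    v = value.strip().lower()
    if v in ("latest", "earliest"):
        return v
    return json.loads(v)          # lowercased JSON -> wrong topic names

def parse_starting_offsets_fixed(value):
    # Keywords stay case-insensitive, but the original string (with its
    # casing intact) is what reaches the JSON parser.
    v = value.strip()
    if v.lower() in ("latest", "earliest"):
        return v.lower()
    return json.loads(v)

opts = '{"MyTopic": {"0": 23}}'
assert "mytopic" in parse_starting_offsets_buggy(opts)   # the bug
assert "MyTopic" in parse_starting_offsets_fixed(opts)   # case preserved
assert parse_starting_offsets_fixed(" Latest ") == "latest"
```

Kafka topic names are case sensitive, so a consumer asked to seek offsets for "mytopic" will never match the real "MyTopic" partitions, which is exactly the misassignment the report describes.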
[jira] [Commented] (SPARK-19764) Executors hang with supposedly running tasks that are really finished.
[ https://issues.apache.org/jira/browse/SPARK-19764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15900127#comment-15900127 ]

Shixiong Zhu commented on SPARK-19764:

So you don't set an UncaughtExceptionHandler, and this OOM happened on the driver side? If so, then it's not a Spark bug.

> Executors hang with supposedly running tasks that are really finished.
>
> Key: SPARK-19764
> URL: https://issues.apache.org/jira/browse/SPARK-19764
> Project: Spark
> Issue Type: Bug
> Components: PySpark, Spark Core
> Affects Versions: 2.0.2
> Environment: Ubuntu 16.04 LTS
>   OpenJDK Runtime Environment (build 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13)
>   Spark 2.0.2 - Spark Cluster Manager
> Reporter: Ari Gesher
> Attachments: driver-log-stderr.log, executor-2.log, netty-6153.jpg, SPARK-19764.tgz
>
> We've come across a job that won't finish. Running on a six-node cluster,
> each of the executors ends up with 5-7 tasks that are never marked as
> completed. Here's an excerpt from the web UI:
>
> ||Index||ID||Attempt||Status||Locality Level||Executor ID / Host||Launch Time||Duration||Scheduler Delay||Task Deserialization Time||GC Time||Result Serialization Time||Getting Result Time||Peak Execution Memory||Shuffle Read Size / Records||Errors||
> |105|1131|0|SUCCESS|PROCESS_LOCAL|4 / 172.31.24.171|2017/02/27 22:51:36|1.9 min|9 ms|4 ms|0.7 s|2 ms|6 ms|384.1 MB|90.3 MB / 572| |
> |106|1168|0|RUNNING|ANY|2 / 172.31.16.112|2017/02/27 22:53:25|6.5 h|0 ms|0 ms|1 s|0 ms|0 ms|384.1 MB|98.7 MB / 624| |
>
> However, the Executor reports the task as finished:
> {noformat}
> 17/02/27 22:53:25 INFO Executor: Running task 106.0 in stage 5.0 (TID 1168)
> 17/02/27 22:55:29 INFO Executor: Finished task 106.0 in stage 5.0 (TID 1168). 2633558 bytes result sent via BlockManager)
> {noformat}
> As does the driver log:
> {noformat}
> 17/02/27 22:53:25 INFO Executor: Running task 106.0 in stage 5.0 (TID 1168)
> 17/02/27 22:55:29 INFO Executor: Finished task 106.0 in stage 5.0 (TID 1168). 2633558 bytes result sent via BlockManager)
> {noformat}
> Full log from this executor and the {{stderr}} from
> {{app-20170227223614-0001/2/stderr}} attached.
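[Editor's sketch] The comment above concerns the JVM driver, where an uncaught exception handler would be installed via Java's `Thread.setDefaultUncaughtExceptionHandler`. The same idea can be shown in Python with `threading.excepthook` (Python 3.8+): without such a hook, an error escaping a thread is merely printed, not acted upon. This is an illustration of the general mechanism, not Spark driver code:

```python
import threading

caught = []

def record_uncaught(args):
    # args carries exc_type, exc_value, traceback, and the failing thread.
    caught.append((args.thread.name, args.exc_type.__name__))

# Install a process-wide hook for exceptions that escape threads.
threading.excepthook = record_uncaught

def boom():
    raise MemoryError("simulated OOM")

t = threading.Thread(target=boom, name="worker")
t.start()
t.join()

assert caught == [("worker", "MemoryError")]
```

A handler like this is the place to log the failure and, if the process can no longer make progress (as with a real OOM), to shut down deliberately rather than hang with work silently lost.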
[jira] [Updated] (SPARK-19851) Add support for EVERY and ANY (SOME) aggregates
[ https://issues.apache.org/jira/browse/SPARK-19851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-19851:
    Component/s: (was: Spark Core)

> Add support for EVERY and ANY (SOME) aggregates
>
> Key: SPARK-19851
> URL: https://issues.apache.org/jira/browse/SPARK-19851
> Project: Spark
> Issue Type: Improvement
> Components: PySpark, SQL
> Affects Versions: 2.1.0
> Reporter: Michael Styles
>
> Add support for EVERY and ANY (SOME) aggregates.
> - EVERY returns true if all input values are true.
> - ANY returns true if at least one input value is true.
> - SOME is equivalent to ANY.
> Both aggregates are part of the SQL standard.
[jira] [Commented] (SPARK-19764) Executors hang with supposedly running tasks that are really finished.
[ https://issues.apache.org/jira/browse/SPARK-19764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15899980#comment-15899980 ]

Shixiong Zhu commented on SPARK-19764:

[~agesher] Do you have the OOM stack trace, so that we can fix it?

> Executors hang with supposedly running tasks that are really finished.
>
> Key: SPARK-19764
> URL: https://issues.apache.org/jira/browse/SPARK-19764
> Project: Spark
> Issue Type: Bug
> Components: PySpark, Spark Core
> Affects Versions: 2.0.2
> Environment: Ubuntu 16.04 LTS
>   OpenJDK Runtime Environment (build 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13)
>   Spark 2.0.2 - Spark Cluster Manager
> Reporter: Ari Gesher
> Attachments: driver-log-stderr.log, executor-2.log, netty-6153.jpg, SPARK-19764.tgz
>
> We've come across a job that won't finish. Running on a six-node cluster,
> each of the executors ends up with 5-7 tasks that are never marked as
> completed. Here's an excerpt from the web UI:
>
> ||Index||ID||Attempt||Status||Locality Level||Executor ID / Host||Launch Time||Duration||Scheduler Delay||Task Deserialization Time||GC Time||Result Serialization Time||Getting Result Time||Peak Execution Memory||Shuffle Read Size / Records||Errors||
> |105|1131|0|SUCCESS|PROCESS_LOCAL|4 / 172.31.24.171|2017/02/27 22:51:36|1.9 min|9 ms|4 ms|0.7 s|2 ms|6 ms|384.1 MB|90.3 MB / 572| |
> |106|1168|0|RUNNING|ANY|2 / 172.31.16.112|2017/02/27 22:53:25|6.5 h|0 ms|0 ms|1 s|0 ms|0 ms|384.1 MB|98.7 MB / 624| |
>
> However, the Executor reports the task as finished:
> {noformat}
> 17/02/27 22:53:25 INFO Executor: Running task 106.0 in stage 5.0 (TID 1168)
> 17/02/27 22:55:29 INFO Executor: Finished task 106.0 in stage 5.0 (TID 1168). 2633558 bytes result sent via BlockManager)
> {noformat}
> As does the driver log:
> {noformat}
> 17/02/27 22:53:25 INFO Executor: Running task 106.0 in stage 5.0 (TID 1168)
> 17/02/27 22:55:29 INFO Executor: Finished task 106.0 in stage 5.0 (TID 1168). 2633558 bytes result sent via BlockManager)
> {noformat}
> Full log from this executor and the {{stderr}} from
> {{app-20170227223614-0001/2/stderr}} attached.
[jira] [Created] (SPARK-19841) StreamingDeduplicateExec.watermarkPredicate should filter rows based on keys
Shixiong Zhu created SPARK-19841:

    Summary: StreamingDeduplicateExec.watermarkPredicate should filter rows based on keys
    Key: SPARK-19841
    URL: https://issues.apache.org/jira/browse/SPARK-19841
    Project: Spark
    Issue Type: Bug
    Components: Structured Streaming
    Affects Versions: 2.2.0
    Reporter: Shixiong Zhu
    Assignee: Shixiong Zhu

Right now it just uses the rows to filter, but a column's position in
keyExpressions may differ from its position in the row.
[jira] [Commented] (SPARK-19831) Sending the heartbeat master from worker maybe blocked by other rpc messages
[ https://issues.apache.org/jira/browse/SPARK-19831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897789#comment-15897789 ]

Shixiong Zhu commented on SPARK-19831:

Code running in the receive method should be quick. If that's not true, such code should be run in a separate thread. Which part of the code in Worker did you find to be very slow?

> Sending the heartbeat to the master from the worker may be blocked by other RPC messages
>
> Key: SPARK-19831
> URL: https://issues.apache.org/jira/browse/SPARK-19831
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.2.0
> Reporter: hustfxj
> Priority: Minor
>
> Cleaning up an application may take a long time on the worker, and it can
> block the worker from sending heartbeats to the master, because Worker
> extends *ThreadSafeRpcEndpoint*. If the heartbeat from a worker is blocked
> behind the *ApplicationFinished* message, the master will think the worker is
> dead; if the worker hosts a driver, the driver will be scheduled by the
> master again. So I think this is a bug in Spark. It could be solved in one of
> the following ways:
> 1. Run application cleanup in a separate asynchronous thread, like
> 'cleanupThreadExecutor'. Then it won't block other RPC messages such as
> *SendHeartbeat*.
> 2. Don't send the heartbeat to the master over the RPC channel at all,
> because any other RPC message may block that channel. Instead, send the
> heartbeat from an asynchronous timer thread.
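[Editor's sketch] The first suggestion above, keeping a single-threaded message loop responsive by offloading slow work to a background executor, can be sketched in a few lines of Python. The message names mirror the ones in the report, but the loop itself is purely illustrative, not the actual Worker/RpcEndpoint code:

```python
from concurrent.futures import ThreadPoolExecutor
import queue
import threading

inbox = queue.Queue()
cleanup_done = threading.Event()
handled = []
background = ThreadPoolExecutor(max_workers=1)

def slow_cleanup():
    # Stands in for slow per-application directory cleanup.
    cleanup_done.wait(timeout=5)

def message_loop():
    # One thread processes messages in order, like a ThreadSafeRpcEndpoint.
    while True:
        msg = inbox.get()
        if msg == "stop":
            return
        if msg == "ApplicationFinished":
            background.submit(slow_cleanup)  # offload; don't block the loop
        handled.append(msg)

t = threading.Thread(target=message_loop)
t.start()
inbox.put("ApplicationFinished")
inbox.put("SendHeartbeat")
inbox.put("stop")
t.join(timeout=5)

# The heartbeat was handled even though the cleanup had not finished.
assert handled == ["ApplicationFinished", "SendHeartbeat"]

cleanup_done.set()               # let the background cleanup finish
background.shutdown(wait=True)
```

Had `slow_cleanup()` been called inline inside the loop, "SendHeartbeat" would have sat in the queue until the cleanup completed, which is exactly the blocked-heartbeat scenario the issue describes.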
[jira] [Resolved] (SPARK-19822) CheckpointSuite.testCheckpointedOperation: should not check checkpointFilesOfLatestTime by the PATH string.
[ https://issues.apache.org/jira/browse/SPARK-19822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu resolved SPARK-19822.
    Resolution: Fixed
    Assignee: Genmao Yu
    Fix Version/s: 2.2.0
                   2.1.1
                   2.0.3

> CheckpointSuite.testCheckpointedOperation: should not check
> checkpointFilesOfLatestTime by the PATH string.
>
> Key: SPARK-19822
> URL: https://issues.apache.org/jira/browse/SPARK-19822
> Project: Spark
> Issue Type: Bug
> Components: Tests
> Affects Versions: 2.0.2, 2.1.0
> Reporter: Genmao Yu
> Assignee: Genmao Yu
> Priority: Minor
> Fix For: 2.0.3, 2.1.1, 2.2.0
[jira] [Updated] (SPARK-19798) Query returns stale results when tables are modified on other sessions
[ https://issues.apache.org/jira/browse/SPARK-19798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-19798:
    Component/s: (was: Spark Core)
                 SQL

> Query returns stale results when tables are modified on other sessions
>
> Key: SPARK-19798
> URL: https://issues.apache.org/jira/browse/SPARK-19798
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Giambattista
>
> I observed the problem on the master branch with the thrift server in
> multisession mode (the default), but I was able to replicate it with
> spark-shell as well (see below for the sequence to replicate).
> I observed cases where changes made in a session (table inserts, table
> renaming) are not visible to other derived sessions (created with
> session.newSession).
> The problem seems due to the fact that each session has its own
> tableRelationCache and it does not get refreshed.
> IMO tableRelationCache should be shared in sharedState, maybe in the
> cacheManager, so that refreshing caches for data that is not
> session-specific (such as temporary tables) gets centralized.
>
> --- Spark shell script
> val spark2 = spark.newSession
> spark.sql("CREATE TABLE test (a int) using parquet")
> spark2.sql("select * from test").show // OK returns empty
> spark.sql("select * from test").show // OK returns empty
> spark.sql("insert into TABLE test values 1,2,3")
> spark2.sql("select * from test").show // ERROR returns empty
> spark.sql("select * from test").show // OK returns 3,2,1
> spark.sql("create table test2 (a int) using parquet")
> spark.sql("insert into TABLE test2 values 4,5,6")
> spark2.sql("select * from test2").show // OK returns 6,4,5
> spark.sql("select * from test2").show // OK returns 6,4,5
> spark.sql("alter table test rename to test3")
> spark.sql("alter table test2 rename to test")
> spark.sql("alter table test3 rename to test2")
> spark2.sql("select * from test").show // ERROR returns empty
> spark.sql("select * from test").show // OK returns 6,4,5
> spark2.sql("select * from test2").show // ERROR throws java.io.FileNotFoundException
> spark.sql("select * from test2").show // OK returns 3,1,2
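[Editor's sketch] The staleness mechanism in the report, a per-session relation cache that is never revalidated against the shared catalog, can be modeled compactly in Python. One common remedy is version-stamped entries that are checked against the source of truth on every lookup. The classes and method names are illustrative, not Spark's CacheManager:

```python
class Catalog:
    """Shared source of truth for table contents (illustrative)."""
    def __init__(self):
        self.tables = {}    # name -> list of rows
        self.version = {}   # name -> counter bumped on every modification

    def insert(self, name, rows):
        self.tables.setdefault(name, []).extend(rows)
        self.version[name] = self.version.get(name, 0) + 1

class Session:
    """Each session keeps a private cache, validated by version stamp."""
    def __init__(self, catalog):
        self.catalog = catalog
        self._cache = {}    # name -> (version, rows)

    def select(self, name):
        v = self.catalog.version.get(name, 0)
        cached = self._cache.get(name)
        if cached is None or cached[0] != v:
            # Stale or missing: reload from the shared catalog.
            cached = (v, list(self.catalog.tables.get(name, [])))
            self._cache[name] = cached
        return cached[1]

cat = Catalog()
s1, s2 = Session(cat), Session(cat)
cat.insert("test", [1, 2, 3])
assert s2.select("test") == [1, 2, 3]

# A write through the catalog bumps the version, so the other session's
# cache entry is revalidated instead of serving stale rows.
cat.insert("test", [4])
assert s2.select("test") == [1, 2, 3, 4]
```

Without the version check (i.e. always returning `cached[1]` once populated), `s2` would keep answering `[1, 2, 3]` after the second insert, which is the stale-result behavior the report demonstrates with spark-shell.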
[jira] [Updated] (SPARK-19821) Throw out the Read-only disk information when create file for Shuffle
[ https://issues.apache.org/jira/browse/SPARK-19821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-19821:
    Priority: Minor  (was: Major)

> Throw out the Read-only disk information when create file for Shuffle
>
> Key: SPARK-19821
> URL: https://issues.apache.org/jira/browse/SPARK-19821
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle, Spark Core
> Affects Versions: 2.0.2
> Reporter: DjvuLee
> Priority: Minor
>
> java.io.FileNotFoundException:
> /data01/yarn/nmdata/usercache/tiger/appcache/application_1486364177723_1047735/blockmgr-23098754-a97a-4673-ba73-3de5e167da87/2c/shuffle_55_47_0.index.0347f74b-a9c1-473e-b81f-40be394cc00f
> (Input/output error)
>   at java.io.FileOutputStream.open0(Native Method)
>   at java.io.FileOutputStream.open(FileOutputStream.java:270)
>   at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
>   at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
>   at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:143)
>   at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:219)
>   at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:314)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
[jira] [Commented] (SPARK-19821) Throw out the Read-only disk information when create file for Shuffle
[ https://issues.apache.org/jira/browse/SPARK-19821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896079#comment-15896079 ] Shixiong Zhu commented on SPARK-19821: -- This is more like a Java issue. > Throw out the Read-only disk information when create file for Shuffle > - > > Key: SPARK-19821 > URL: https://issues.apache.org/jira/browse/SPARK-19821 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core >Affects Versions: 2.0.2 >Reporter: DjvuLee >Priority: Minor > > java.io.FileNotFoundException: > /data01/yarn/nmdata/usercache/tiger/appcache/application_1486364177723_1047735/blockmgr-23098754-a97a-4673-ba73-3de5e167da87/2c/shuffle_55_47_0.index.0347f74b-a9c1-473e-b81f-40be394cc00f > (Input/output error) > at java.io.FileOutputStream.open0(Native Method) > at java.io.FileOutputStream.open(FileOutputStream.java:270) > at java.io.FileOutputStream.(FileOutputStream.java:213) > at java.io.FileOutputStream.(FileOutputStream.java:162) > at > org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:143) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:219) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:314) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: 
issues-h...@spark.apache.org
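The stack trace above bottoms out in a bare `FileNotFoundException` whose "(Input/output error)" message hides the actual cause, a disk remounted read-only. One way to surface a clearer message is to probe the target directory's writability before rethrowing. The sketch below is plain Java with illustrative names, not Spark's actual shuffle code:

```java
import java.io.File;
import java.io.IOException;

public class WritableCheck {

    /**
     * Illustrative helper (not Spark's code): when file creation fails, check
     * whether the parent directory is writable and, if not, rethrow with a
     * hint that the disk may be mounted read-only. Otherwise return the
     * original exception unchanged.
     */
    public static IOException enrich(File target, IOException cause) {
        File dir = target.getParentFile();
        if (dir != null && dir.exists() && !dir.canWrite()) {
            return new IOException("Cannot create " + target + ": directory "
                + dir + " is not writable (disk may be mounted read-only)", cause);
        }
        return cause;
    }

    public static void main(String[] args) {
        // Hypothetical failure while creating a shuffle index file:
        IOException raw = new IOException("Input/output error");
        File target = new File("/some/readonly/mount/shuffle_55_47_0.index");
        System.out.println(enrich(target, raw).getMessage());
    }
}
```

A real implementation would have to be careful: `canWrite()` itself touches the file system, so it should only run on the already-failed error path.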
[jira] [Updated] (SPARK-19816) DataFrameCallbackSuite doesn't recover the log level
[ https://issues.apache.org/jira/browse/SPARK-19816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19816: - Affects Version/s: (was: 2.2.0) > DataFrameCallbackSuite doesn't recover the log level > > > Key: SPARK-19816 > URL: https://issues.apache.org/jira/browse/SPARK-19816 > Project: Spark > Issue Type: Test > Components: SQL, Tests >Affects Versions: 2.1.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu > Fix For: 2.1.1, 2.2.0 > > > "DataFrameCallbackSuite.execute callback functions when a DataFrame action > failed" sets the log level to "fatal" but doesn't recover it. Hence, tests > running after it won't output any logs except fatal logs. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19816) DataFrameCallbackSuite doesn't recover the log level
[ https://issues.apache.org/jira/browse/SPARK-19816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19816: - Affects Version/s: 2.1.0 > DataFrameCallbackSuite doesn't recover the log level > > > Key: SPARK-19816 > URL: https://issues.apache.org/jira/browse/SPARK-19816 > Project: Spark > Issue Type: Test > Components: SQL, Tests >Affects Versions: 2.1.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu > Fix For: 2.1.1, 2.2.0 > > > "DataFrameCallbackSuite.execute callback functions when a DataFrame action > failed" sets the log level to "fatal" but doesn't recover it. Hence, tests > running after it won't output any logs except fatal logs. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
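The test hygiene pattern behind this fix is generic: capture the log level before the test mutates it and restore it in a finally block. A minimal sketch using java.util.logging (Spark itself uses log4j; the pattern is identical and the names here are illustrative):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class LogLevelGuard {
    /**
     * Run an action at a temporary log level, restoring the previous level
     * afterwards even if the action throws. The missing finally block is
     * exactly what this JIRA fixes in DataFrameCallbackSuite.
     */
    public static void withLogLevel(Logger logger, Level temporary, Runnable action) {
        Level saved = logger.getLevel(); // may be null (inherit from parent)
        logger.setLevel(temporary);
        try {
            action.run();
        } finally {
            logger.setLevel(saved); // restore even when the action fails
        }
    }

    /** Returns true if the original level survives the temporary override. */
    public static boolean demoRestores() {
        Logger log = Logger.getLogger("LogLevelGuard.demo");
        log.setLevel(Level.INFO);
        withLogLevel(log, Level.SEVERE, () -> { /* noisy test body */ });
        return Level.INFO.equals(log.getLevel());
    }

    public static void main(String[] args) {
        System.out.println(demoRestores()); // true
    }
}
```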
[jira] [Resolved] (SPARK-19718) Fix flaky test: org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite: stress test for failOnDataLoss=false
[ https://issues.apache.org/jira/browse/SPARK-19718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-19718. -- Resolution: Fixed Assignee: Shixiong Zhu Fix Version/s: 2.2.0 > Fix flaky test: > org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite: > stress test for failOnDataLoss=false > --- > > Key: SPARK-19718 > URL: https://issues.apache.org/jira/browse/SPARK-19718 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu > Fix For: 2.2.0 > > > SPARK-19617 changed HDFSMetadataLog to enable interrupts when using the local > file system. However, now we hit HADOOP-12074: `Shell.runCommand` converts > `InterruptedException` to `new IOException(ie.toString())` before Hadoop 2.8. > Test failure: > https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.6/2504/consoleFull > {code} > [info] - stress test for failOnDataLoss=false *** FAILED *** (1 minute, 1 > second) > [info] org.apache.spark.sql.streaming.StreamingQueryException: Query [id = > 27d45f4f-14dc-4c74-8b52-4bbd4f2b9bec, runId = > 23b8c1ea-4da9-4096-967a-692933e4b319] terminated with exception: > java.lang.InterruptedException > [info] at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:304) > [info] at > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:190) > [info] Cause: java.io.IOException: java.lang.InterruptedException > [info] at org.apache.hadoop.util.Shell.runCommand(Shell.java:578) > [info] at org.apache.hadoop.util.Shell.run(Shell.java:478) > [info] at > org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:766) > [info] at org.apache.hadoop.util.Shell.execCommand(Shell.java:859) > [info] at org.apache.hadoop.util.Shell.execCommand(Shell.java:842) > [info] at > 
org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:661) > [info] at > org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:300) > [info] at > org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1014) > [info] at > org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:85) > [info] at > org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.(ChecksumFs.java:354) > [info] at > org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:394) > [info] at > org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:577) > [info] at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:680) > [info] at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:676) > [info] at > org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) > [info] at org.apache.hadoop.fs.FileContext.create(FileContext.java:676) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
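Because pre-2.8 Hadoop's `Shell.runCommand` converts an `InterruptedException` into `new IOException(ie.toString())` (HADOOP-12074), a caller that wants to treat interrupts specially has to recognize the wrapped form. A hedged sketch of such a check, illustrative rather than the actual Spark patch:

```java
import java.io.IOException;

public class InterruptDetector {
    /**
     * Heuristically decide whether an IOException is really a wrapped
     * InterruptedException, as produced by Hadoop's Shell.runCommand before
     * Hadoop 2.8, where the message is the interrupt's toString(), e.g.
     * "java.lang.InterruptedException: sleep interrupted".
     */
    public static boolean isWrappedInterrupt(IOException e) {
        if (e.getCause() instanceof InterruptedException) {
            return true;
        }
        String msg = e.getMessage();
        return msg != null && msg.startsWith("java.lang.InterruptedException");
    }

    public static void main(String[] args) {
        // The legacy wrapping: new IOException(ie.toString())
        IOException legacy =
            new IOException(new InterruptedException("sleep interrupted").toString());
        System.out.println(isWrappedInterrupt(legacy));                       // true
        System.out.println(isWrappedInterrupt(new IOException("disk full"))); // false
    }
}
```

Message-sniffing like this is fragile, which is why the issue was ultimately about avoiding the code path on Hadoop versions that exhibit it.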
[jira] [Created] (SPARK-19816) DataFrameCallbackSuite doesn't recover the log level
Shixiong Zhu created SPARK-19816: Summary: DataFrameCallbackSuite doesn't recover the log level Key: SPARK-19816 URL: https://issues.apache.org/jira/browse/SPARK-19816 Project: Spark Issue Type: Test Components: SQL, Tests Affects Versions: 2.2.0 Reporter: Shixiong Zhu Assignee: Shixiong Zhu "DataFrameCallbackSuite.execute callback functions when a DataFrame action failed" sets the log level to "fatal" but doesn't recover it. Hence, tests running after it won't output any logs except fatal logs. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-19774) StreamExecution should call stop() on sources when a stream fails
[ https://issues.apache.org/jira/browse/SPARK-19774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-19774. -- Resolution: Fixed Fix Version/s: 2.2.0 2.1.1 > StreamExecution should call stop() on sources when a stream fails > - > > Key: SPARK-19774 > URL: https://issues.apache.org/jira/browse/SPARK-19774 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.2, 2.1.0 >Reporter: Burak Yavuz >Assignee: Burak Yavuz > Fix For: 2.1.1, 2.2.0 > > > We call stop() on a Structured Streaming Source only when the stream is > shutdown when a user calls streamingQuery.stop(). We should actually stop all > sources when the stream fails as well, otherwise we may leak resources, e.g. > connections to Kafka. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
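The shape of the fix is the classic one: put resource shutdown in a finally block so it runs on both the user-initiated stop() path and the failure path. A toy sketch of that intent; the `Source` interface here is a stand-in, not Spark's:

```java
import java.util.ArrayList;
import java.util.List;

public class StreamRunner {
    /** Toy stand-in for a streaming source that holds external connections. */
    interface Source {
        void stop();
    }

    /**
     * Run a (fake) stream body over the given sources. Whether the body ends
     * normally or throws, every source is stopped, so connections (e.g. to
     * Kafka) are not leaked. Mirrors the intent of the fix, not Spark's code.
     */
    public static void runAndAlwaysStop(List<Source> sources, Runnable body) {
        try {
            body.run();
        } finally {
            for (Source s : sources) {
                try {
                    s.stop();
                } catch (RuntimeException e) {
                    // best effort: one failing stop() must not skip the rest
                }
            }
        }
    }

    /** The body fails, yet both sources get stopped and the error propagates. */
    public static boolean demoStopsOnFailure() {
        List<String> stopped = new ArrayList<>();
        boolean propagated = false;
        try {
            runAndAlwaysStop(
                List.of(() -> stopped.add("kafka"), () -> stopped.add("file")),
                () -> { throw new IllegalStateException("stream failed"); });
        } catch (IllegalStateException expected) {
            propagated = true;
        }
        return propagated && stopped.equals(List.of("kafka", "file"));
    }

    public static void main(String[] args) {
        System.out.println(demoStopsOnFailure()); // true
    }
}
```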
[jira] [Assigned] (SPARK-19779) Structured Streaming leaves a needless tmp file
[ https://issues.apache.org/jira/browse/SPARK-19779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu reassigned SPARK-19779: Assignee: Feng Gui > structured streaming exist needless tmp file > - > > Key: SPARK-19779 > URL: https://issues.apache.org/jira/browse/SPARK-19779 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.3, 2.1.1, 2.2.0 >Reporter: Feng Gui >Assignee: Feng Gui >Priority: Minor > Fix For: 2.0.3, 2.1.1, 2.2.0 > > > The PR (https://github.com/apache/spark/pull/17012) can to fix restart a > Structured Streaming application using hdfs as fileSystem, but also exist a > problem that a tmp file of delta file is still reserved in hdfs. And > Structured Streaming don't delete the tmp file generated when restart > streaming job in future, so we need to delete the tmp file after restart > streaming job. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-19779) Structured Streaming leaves a needless tmp file
[ https://issues.apache.org/jira/browse/SPARK-19779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-19779. -- Resolution: Fixed Fix Version/s: 2.2.0 2.1.1 2.0.3 > structured streaming exist needless tmp file > - > > Key: SPARK-19779 > URL: https://issues.apache.org/jira/browse/SPARK-19779 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.3, 2.1.1, 2.2.0 >Reporter: Feng Gui >Priority: Minor > Fix For: 2.0.3, 2.1.1, 2.2.0 > > > The PR (https://github.com/apache/spark/pull/17012) can to fix restart a > Structured Streaming application using hdfs as fileSystem, but also exist a > problem that a tmp file of delta file is still reserved in hdfs. And > Structured Streaming don't delete the tmp file generated when restart > streaming job in future, so we need to delete the tmp file after restart > streaming job. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19779) Structured Streaming leaves a needless tmp file
[ https://issues.apache.org/jira/browse/SPARK-19779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19779: - Affects Version/s: 2.1.1 2.0.3 > structured streaming exist needless tmp file > - > > Key: SPARK-19779 > URL: https://issues.apache.org/jira/browse/SPARK-19779 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.3, 2.1.1, 2.2.0 >Reporter: Feng Gui >Priority: Minor > > The PR (https://github.com/apache/spark/pull/17012) can to fix restart a > Structured Streaming application using hdfs as fileSystem, but also exist a > problem that a tmp file of delta file is still reserved in hdfs. And > Structured Streaming don't delete the tmp file generated when restart > streaming job in future, so we need to delete the tmp file after restart > streaming job. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19779) Structured Streaming leaves a needless tmp file
[ https://issues.apache.org/jira/browse/SPARK-19779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19779: - Affects Version/s: (was: 2.1.0) 2.2.0 > structured streaming exist needless tmp file > - > > Key: SPARK-19779 > URL: https://issues.apache.org/jira/browse/SPARK-19779 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Feng Gui >Priority: Minor > > The PR (https://github.com/apache/spark/pull/17012) can to fix restart a > Structured Streaming application using hdfs as fileSystem, but also exist a > problem that a tmp file of delta file is still reserved in hdfs. And > Structured Streaming don't delete the tmp file generated when restart > streaming job in future, so we need to delete the tmp file after restart > streaming job. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
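The leak described in SPARK-19779 is an orphaned temporary delta file left in the state-store directory after a restart. That kind of cleanup can be sketched as a sweep for temp files on startup; note that the real fix targets HDFS paths, while this illustration uses java.nio, and the file names are hypothetical:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class TmpSweeper {
    /**
     * Delete files matching the glob (e.g. "*.tmp") in dir and return the
     * names removed. Illustrative cleanup for orphaned temp delta files on
     * restart; the real fix operates on HDFS paths, not java.nio.
     */
    public static List<String> sweep(Path dir, String glob) throws IOException {
        List<String> removed = new ArrayList<>();
        try (DirectoryStream<Path> matches = Files.newDirectoryStream(dir, glob)) {
            for (Path p : matches) {
                Files.delete(p);
                removed.add(p.getFileName().toString());
            }
        }
        return removed;
    }

    /** A committed delta file survives the sweep; the orphaned .tmp does not. */
    public static boolean demoSweepsOnlyTmp() {
        try {
            Path dir = Files.createTempDirectory("state-store");
            Files.createFile(dir.resolve("1.delta.tmp")); // orphan from a restart
            Files.createFile(dir.resolve("1.delta"));     // committed file
            List<String> removed = sweep(dir, "*.tmp");
            return removed.equals(List.of("1.delta.tmp"))
                && Files.exists(dir.resolve("1.delta"));
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demoSweepsOnlyTmp()); // true
    }
}
```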
[jira] [Comment Edited] (SPARK-19788) DataStreamReader/DataStreamWriter.option shall accept user-defined type
[ https://issues.apache.org/jira/browse/SPARK-19788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891722#comment-15891722 ] Shixiong Zhu edited comment on SPARK-19788 at 3/2/17 7:04 AM: -- I remember that it's because we want to support both Scala and Python (maybe also R). If it accepts user-defined types, we don't know how to convert Python options to Scala options. was (Author: zsxwing): I remember that we want to support both Scala and Python. If it accepts user-defined types, we don't know how to convert Python options to Scala options. > DataStreamReader/DataStreamWriter.option shall accept user-defined type > --- > > Key: SPARK-19788 > URL: https://issues.apache.org/jira/browse/SPARK-19788 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Nan Zhu > > There are many other data sources/sinks which has very different > configuration ways than Kafka, FileSystem, etc. > The expected type of the configuration entry passed to them might be a nested > collection type, e.g. Map[String, Map[String, String]], or even a > user-defined type(for example, the one I am working on) > Right now, option can only accept String -> String/Boolean/Long/Double OR a > complete Map[String, String]...my suggestion is that we can accept > Map[String, Any], and the type of 'parameters' in SourceProvider.createSource > can also be Map[String, Any], this will create much more flexibility to the > user > The drawback is that, it is a breaking change ( we can mitigate this by > deprecating the current one, and progressively evolve to the new one if the > proposal is accepted) > [~zsxwing] what do you think? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19788) DataStreamReader/DataStreamWriter.option shall accept user-defined type
[ https://issues.apache.org/jira/browse/SPARK-19788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891722#comment-15891722 ] Shixiong Zhu commented on SPARK-19788: -- I remember that we want to support both Scala and Python. If it accepts user-defined types, we don't know how to convert Python options to Scala options. > DataStreamReader/DataStreamWriter.option shall accept user-defined type > --- > > Key: SPARK-19788 > URL: https://issues.apache.org/jira/browse/SPARK-19788 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Nan Zhu > > There are many other data sources/sinks which has very different > configuration ways than Kafka, FileSystem, etc. > The expected type of the configuration entry passed to them might be a nested > collection type, e.g. Map[String, Map[String, String]], or even a > user-defined type(for example, the one I am working on) > Right now, option can only accept String -> String/Boolean/Long/Double OR a > complete Map[String, String]...my suggestion is that we can accept > Map[String, Any], and the type of 'parameters' in SourceProvider.createSource > can also be Map[String, Any], this will create much more flexibility to the > user > The drawback is that, it is a breaking change ( we can mitigate this by > deprecating the current one, and progressively evolve to the new one if the > proposal is accepted) > [~zsxwing] what do you think? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
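The comment above explains why `option` stays string-typed: every value must survive a round trip through the Python (and R) bindings. The usual workaround is to encode typed or nested values as strings (e.g. JSON text) and let the source parse them back. A minimal sketch of that design, with illustrative class and method names:

```java
import java.util.HashMap;
import java.util.Map;

public class StringOptions {
    private final Map<String, String> options = new HashMap<>();

    /** Every value is stored as a String so any language binding can supply it. */
    public StringOptions option(String key, String value) {
        options.put(key, value);
        return this;
    }

    /** Typed overloads just encode to String, mirroring DataStreamReader.option. */
    public StringOptions option(String key, long value) {
        return option(key, Long.toString(value));
    }

    public StringOptions option(String key, boolean value) {
        return option(key, Boolean.toString(value));
    }

    /** The source parses typed values back out; nested shapes travel as JSON text. */
    public long getLong(String key, long dflt) {
        String v = options.get(key);
        return v == null ? dflt : Long.parseLong(v);
    }

    public String get(String key) {
        return options.get(key);
    }

    public static void main(String[] args) {
        StringOptions opts = new StringOptions()
            .option("maxOffsetsPerTrigger", 1000L)
            .option("nested", "{\"a\":{\"b\":\"c\"}}"); // a Map-of-maps as JSON text
        System.out.println(opts.getLong("maxOffsetsPerTrigger", -1)); // 1000
    }
}
```

This keeps the wire format language-neutral at the cost of pushing parsing (and validation) into each source, which is the trade-off the discussion above is about.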
[jira] [Commented] (SPARK-19768) Error for both aggregate and non-aggregate queries in Structured Streaming - "This query does not support recovering from checkpoint location"
[ https://issues.apache.org/jira/browse/SPARK-19768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15890546#comment-15890546 ] Shixiong Zhu commented on SPARK-19768: -- Yeah, just recalled that I fixed the error message in https://github.com/apache/spark/pull/16970/files#diff-cb5dbc84ef906b6de2b6e36da45a86c3L78 There is a page about the supported operators on this page http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes > Error for both aggregate and non-aggregate queries in Structured Streaming > - "This query does not support recovering from checkpoint location" > - > > Key: SPARK-19768 > URL: https://issues.apache.org/jira/browse/SPARK-19768 > Project: Spark > Issue Type: Question > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Amit Baghel > > I am running JavaStructuredKafkaWordCount.java example with > checkpointLocation. Output mode is "complete". Below is relevant code. > {code} > // Generate running word count > Dataset wordCounts = lines.flatMap(new FlatMapFunction String>() { > @Override > public Iterator call(String x) { > return Arrays.asList(x.split(" ")).iterator(); > } > }, Encoders.STRING()).groupBy("value").count(); > // Start running the query that prints the running counts to the console > StreamingQuery query = wordCounts.writeStream() > .outputMode("complete") > .format("console") > .option("checkpointLocation", "/tmp/checkpoint-data") > .start(); > {code} > This example runs successfully and writes data in checkpoint directory. When > I re-run the program it throws below exception > {code} > Exception in thread "main" org.apache.spark.sql.AnalysisException: This query > does not support recovering from checkpoint location. 
Delete > /tmp/checkpoint-data/offsets to start over.; > at > org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219) > at > org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) > at > com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85) > {code} > Then I modified JavaStructuredKafkaWordCount.java to have non aggregate query > with output mode as "append". Please see the code below. > {code} > // no aggregations > Dataset wordCounts = lines.flatMap(new FlatMapFunction String>() { > @Override > public Iterator call(String x) { > return Arrays.asList(x.split(" ")).iterator(); > } > }, Encoders.STRING()).select("value"); > // append mode with console > StreamingQuery query = wordCounts.writeStream() > .outputMode("append") > .format("console") > .option("checkpointLocation", "/tmp/checkpoint-data") > .start(); > {code} > This modified code runs successfully and writes data in checkpoint directory. > When I re-run the program it throws same exception > {code} > Exception in thread "main" org.apache.spark.sql.AnalysisException: This query > does not support recovering from checkpoint location. Delete > /tmp/checkpoint-data/offsets to start over.; > at > org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219) > at > org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) > at > com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
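The error message itself names the workaround: delete the checkpoint's `offsets` directory to start over. For sinks that cannot recover (such as the console sink in Spark 2.1), that step can be automated, at the cost of discarding all progress tracking. An illustrative sketch using java.nio, not a Spark API:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CheckpointReset {
    /**
     * Recursively delete <checkpoint>/offsets so a sink that cannot recover
     * (e.g. the console sink in Spark 2.1) can start fresh. This discards
     * all progress tracking; only do it when reprocessing is acceptable.
     * Returns whether anything was deleted.
     */
    public static boolean resetOffsets(Path checkpointDir) throws IOException {
        Path offsets = checkpointDir.resolve("offsets");
        if (!Files.exists(offsets)) {
            return false;
        }
        try (Stream<Path> walk = Files.walk(offsets)) {
            // Children sort after parents, so delete in reverse order.
            List<Path> all = walk.sorted(Comparator.reverseOrder())
                                 .collect(Collectors.toList());
            for (Path p : all) {
                Files.delete(p);
            }
        }
        return true;
    }

    /** Create a fake checkpoint, reset it, and confirm offsets are gone. */
    public static boolean demo() {
        try {
            Path cp = Files.createTempDirectory("checkpoint");
            Files.createDirectory(cp.resolve("offsets"));
            Files.createFile(cp.resolve("offsets").resolve("0"));
            return resetOffsets(cp)
                && !Files.exists(cp.resolve("offsets"))
                && !resetOffsets(cp); // second reset is a no-op
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true
    }
}
```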
[jira] [Resolved] (SPARK-19633) FileSource read from FileSink
[ https://issues.apache.org/jira/browse/SPARK-19633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-19633. -- Resolution: Fixed Assignee: Liwei Lin Fix Version/s: 2.2.0 > FileSource read from FileSink > - > > Key: SPARK-19633 > URL: https://issues.apache.org/jira/browse/SPARK-19633 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Michael Armbrust >Assignee: Liwei Lin >Priority: Critical > Fix For: 2.2.0 > > > Right now, you can't start a streaming query from a location that is being > written to by the file sink. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19768) Error for both aggregate and non-aggregate queries in Structured Streaming - "This query does not support recovering from checkpoint location"
[ https://issues.apache.org/jira/browse/SPARK-19768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15889639#comment-15889639 ] Shixiong Zhu commented on SPARK-19768: -- It should work for both aggregate and non-aggregate queries, but it only supports "append" mode. > Error for both aggregate and non-aggregate queries in Structured Streaming > - "This query does not support recovering from checkpoint location" > - > > Key: SPARK-19768 > URL: https://issues.apache.org/jira/browse/SPARK-19768 > Project: Spark > Issue Type: Question > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Amit Baghel > > I am running JavaStructuredKafkaWordCount.java example with > checkpointLocation. Output mode is "complete". Below is relevant code. > {code} > // Generate running word count > Dataset wordCounts = lines.flatMap(new FlatMapFunction String>() { > @Override > public Iterator call(String x) { > return Arrays.asList(x.split(" ")).iterator(); > } > }, Encoders.STRING()).groupBy("value").count(); > // Start running the query that prints the running counts to the console > StreamingQuery query = wordCounts.writeStream() > .outputMode("complete") > .format("console") > .option("checkpointLocation", "/tmp/checkpoint-data") > .start(); > {code} > This example runs successfully and writes data in checkpoint directory. When > I re-run the program it throws below exception > {code} > Exception in thread "main" org.apache.spark.sql.AnalysisException: This query > does not support recovering from checkpoint location. 
Delete > /tmp/checkpoint-data/offsets to start over.; > at > org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219) > at > org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) > at > com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85) > {code} > Then I modified JavaStructuredKafkaWordCount.java to have non aggregate query > with output mode as "append". Please see the code below. > {code} > // no aggregations > Dataset wordCounts = lines.flatMap(new FlatMapFunction String>() { > @Override > public Iterator call(String x) { > return Arrays.asList(x.split(" ")).iterator(); > } > }, Encoders.STRING()).select("value"); > // append mode with console > StreamingQuery query = wordCounts.writeStream() > .outputMode("append") > .format("console") > .option("checkpointLocation", "/tmp/checkpoint-data") > .start(); > {code} > This modified code runs successfully and writes data in checkpoint directory. > When I re-run the program it throws same exception > {code} > Exception in thread "main" org.apache.spark.sql.AnalysisException: This query > does not support recovering from checkpoint location. Delete > /tmp/checkpoint-data/offsets to start over.; > at > org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219) > at > org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) > at > com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19764) Executors hang with supposedly running tasks that are really finished.
[ https://issues.apache.org/jira/browse/SPARK-19764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15889611#comment-15889611 ] Shixiong Zhu commented on SPARK-19764: -- These are master and workers. From the master log, you are using pyspark with the client mode. The driver logs should just output to the console. Could you paste the output of pyspark shell? > Executors hang with supposedly running task that are really finished. > - > > Key: SPARK-19764 > URL: https://issues.apache.org/jira/browse/SPARK-19764 > Project: Spark > Issue Type: Bug > Components: PySpark, Spark Core >Affects Versions: 2.0.2 > Environment: Ubuntu 16.04 LTS > OpenJDK Runtime Environment (build 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13) > Spark 2.0.2 - Spark Cluster Manager >Reporter: Ari Gesher > Attachments: driver-log-stderr.log, executor-2.log, netty-6153.jpg, > SPARK-19764.tgz > > > We've come across a job that won't finish. Running on a six-node cluster, > each of the executors end up with 5-7 tasks that are never marked as > completed. > Here's an excerpt from the web UI: > ||Index ▴||ID||Attempt||Status||Locality Level||Executor ID / Host||Launch > Time||Duration||Scheduler Delay||Task Deserialization Time||GC Time||Result > Serialization Time||Getting Result Time||Peak Execution Memory||Shuffle Read > Size / Records||Errors|| > |105 | 1131 | 0 | SUCCESS |PROCESS_LOCAL |4 / 172.31.24.171 | > 2017/02/27 22:51:36 | 1.9 min | 9 ms | 4 ms | 0.7 s | 2 ms| 6 ms| > 384.1 MB| 90.3 MB / 572 | | > |106| 1168| 0| RUNNING |ANY| 2 / 172.31.16.112| 2017/02/27 > 22:53:25|6.5 h |0 ms| 0 ms| 1 s |0 ms| 0 ms| |384.1 MB > |98.7 MB / 624 | | > However, the Executor reports the task as finished: > {noformat} > 17/02/27 22:53:25 INFO Executor: Running task 106.0 in stage 5.0 (TID 1168) > 17/02/27 22:55:29 INFO Executor: Finished task 106.0 in stage 5.0 (TID 1168). 
> 2633558 bytes result sent via BlockManager) > {noformat} > As does the driver log: > {noformat} > 17/02/27 22:53:25 INFO Executor: Running task 106.0 in stage 5.0 (TID 1168) > 17/02/27 22:55:29 INFO Executor: Finished task 106.0 in stage 5.0 (TID 1168). > 2633558 bytes result sent via BlockManager) > {noformat} > Full log from this executor and the {{stderr}} from > {{app-20170227223614-0001/2/stderr}} attached. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19764) Executors hang with supposedly running tasks that are really finished.
[ https://issues.apache.org/jira/browse/SPARK-19764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15889026#comment-15889026 ] Shixiong Zhu commented on SPARK-19764: -- [~agesher] driver-log-stderr.log is actually the executor log. Did you upload a wrong file? > Executors hang with supposedly running task that are really finished. > - > > Key: SPARK-19764 > URL: https://issues.apache.org/jira/browse/SPARK-19764 > Project: Spark > Issue Type: Bug > Components: PySpark, Spark Core >Affects Versions: 2.0.2 > Environment: Ubuntu 16.04 LTS > OpenJDK Runtime Environment (build 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13) > Spark 2.0.2 - Spark Cluster Manager >Reporter: Ari Gesher > Attachments: driver-log-stderr.log, executor-2.log > > > We've come across a job that won't finish. Running on a six-node cluster, > each of the executors end up with 5-7 tasks that are never marked as > completed. > Here's an excerpt from the web UI: > ||Index ▴||ID||Attempt||Status||Locality Level||Executor ID / Host||Launch > Time||Duration||Scheduler Delay||Task Deserialization Time||GC Time||Result > Serialization Time||Getting Result Time||Peak Execution Memory||Shuffle Read > Size / Records||Errors|| > |105 | 1131 | 0 | SUCCESS |PROCESS_LOCAL |4 / 172.31.24.171 | > 2017/02/27 22:51:36 | 1.9 min | 9 ms | 4 ms | 0.7 s | 2 ms| 6 ms| > 384.1 MB| 90.3 MB / 572 | | > |106| 1168| 0| RUNNING |ANY| 2 / 172.31.16.112| 2017/02/27 > 22:53:25|6.5 h |0 ms| 0 ms| 1 s |0 ms| 0 ms| |384.1 MB > |98.7 MB / 624 | | > However, the Executor reports the task as finished: > {noformat} > 17/02/27 22:53:25 INFO Executor: Running task 106.0 in stage 5.0 (TID 1168) > 17/02/27 22:55:29 INFO Executor: Finished task 106.0 in stage 5.0 (TID 1168). 
> 2633558 bytes result sent via BlockManager) > {noformat} > As does the driver log: > {noformat} > 17/02/27 22:53:25 INFO Executor: Running task 106.0 in stage 5.0 (TID 1168) > 17/02/27 22:55:29 INFO Executor: Finished task 106.0 in stage 5.0 (TID 1168). > 2633558 bytes result sent via BlockManager) > {noformat} > Full log from this executor and the {{stderr}} from > {{app-20170227223614-0001/2/stderr}} attached. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19772) Flaky test: pyspark.streaming.tests.WindowFunctionTests
[ https://issues.apache.org/jira/browse/SPARK-19772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19772: - Component/s: (was: Structured Streaming) DStreams > Flaky test: pyspark.streaming.tests.WindowFunctionTests > --- > > Key: SPARK-19772 > URL: https://issues.apache.org/jira/browse/SPARK-19772 > Project: Spark > Issue Type: Bug > Components: DStreams, PySpark, Tests >Affects Versions: 2.2.0 >Reporter: Kay Ousterhout > Labels: flaky-test > > Here's the link to the failed build: > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73598 > FAIL [16.440s]: test_count_by_value_and_window > (pyspark.streaming.tests.WindowFunctionTests) > -- > Traceback (most recent call last): > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/streaming/tests.py", > line 668, in test_count_by_value_and_window > self._test_func(input, func, expected) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/streaming/tests.py", > line 162, in _test_func > self.assertEqual(expected, result) > AssertionError: Lists differ: [[(0,[312 chars] 2), (5, 1)], [(0, 1), (1, 1), > (2, 1), (3, 1), (4, 1), (5, 1)]] != [[(0,[312 chars] 2), (5, 1)]] > First list contains 1 additional elements. > First extra element 9: > [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1)] > [[(0, 1)], >[(0, 2), (1, 1)], >[(0, 3), (1, 2), (2, 1)], >[(0, 4), (1, 3), (2, 2), (3, 1)], >[(0, 5), (1, 4), (2, 3), (3, 2), (4, 1)], >[(0, 5), (1, 5), (2, 4), (3, 3), (4, 2), (5, 1)], >[(0, 4), (1, 4), (2, 4), (3, 3), (4, 2), (5, 1)], >[(0, 3), (1, 3), (2, 3), (3, 3), (4, 2), (5, 1)], > - [(0, 2), (1, 2), (2, 2), (3, 2), (4, 2), (5, 1)], > ? ^ > + [(0, 2), (1, 2), (2, 2), (3, 2), (4, 2), (5, 1)]] > ? 
^ > - [(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1)]] > Stdout: > ('timeout after', 15) -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
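The trailing `('timeout after', 15)` points at the usual cause of this kind of flakiness: the test compares against results collected within a fixed 15-second window, and a slow executor misses a batch. A common hardening is to poll a completion condition up to a deadline instead of waiting a fixed time. A generic sketch of that pattern:

```java
import java.util.function.BooleanSupplier;

public class Await {
    /**
     * Poll a condition every 10 ms until it holds or the deadline passes.
     * Returns whether the condition ever held. Tests that compare collected
     * batches against a fixed timeout get flaky on slow machines; polling a
     * completion condition is more robust.
     */
    public static boolean until(BooleanSupplier condition, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (System.nanoTime() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(10);
        }
        return condition.getAsBoolean(); // one final check at the deadline
    }

    /** The condition becomes true on the third poll; until() waits for it. */
    public static boolean demoSucceeds() {
        try {
            int[] polls = {0};
            return until(() -> ++polls[0] >= 3, 2000) && polls[0] >= 3;
        } catch (InterruptedException e) {
            return false;
        }
    }

    /** A condition that never holds makes until() return false at the deadline. */
    public static boolean demoTimesOut() {
        try {
            return !until(() -> false, 50);
        } catch (InterruptedException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demoSucceeds() && demoTimesOut()); // true
    }
}
```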
[jira] [Commented] (SPARK-19767) API Doc pages for Streaming with Kafka 0.10 not current
[ https://issues.apache.org/jira/browse/SPARK-19767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888939#comment-15888939 ] Shixiong Zhu commented on SPARK-19767: -- Make sure you installed all required libraries in https://github.com/apache/spark/blob/master/docs/README.md > API Doc pages for Streaming with Kafka 0.10 not current > --- > > Key: SPARK-19767 > URL: https://issues.apache.org/jira/browse/SPARK-19767 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Nick >Priority: Minor > > The API docs linked from the Spark Kafka 0.10 Integration page are not > current. For instance, on the page >https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html > the code examples show the new API (i.e. class ConsumerStrategies). However, > following the links > API Docs --> (Scala | Java) > lead to API pages that do not have class ConsumerStrategies) . The API doc > package names also have {code}streaming.kafka{code} as opposed to > {code}streaming.kafka10{code} > as in the code examples on streaming-kafka-0-10-integration.html. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19767) API Doc pages for Streaming with Kafka 0.10 not current
[ https://issues.apache.org/jira/browse/SPARK-19767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888670#comment-15888670 ] Shixiong Zhu commented on SPARK-19767: -- You can use {{SKIP_API=1 jekyll build}} to build the docs. I guess the api doc is broken now. > API Doc pages for Streaming with Kafka 0.10 not current > --- > > Key: SPARK-19767 > URL: https://issues.apache.org/jira/browse/SPARK-19767 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Nick >Priority: Minor > > The API docs linked from the Spark Kafka 0.10 Integration page are not > current. For instance, on the page >https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html > the code examples show the new API (i.e. class ConsumerStrategies). However, > following the links > API Docs --> (Scala | Java) > lead to API pages that do not have class ConsumerStrategies) . The API doc > package names also have {code}streaming.kafka{code} as opposed to > {code}streaming.kafka10{code} > as in the code examples on streaming-kafka-0-10-integration.html. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Issue Comment Deleted] (SPARK-19617) Fix the race condition when starting and stopping a query quickly
[ https://issues.apache.org/jira/browse/SPARK-19617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19617: - Comment: was deleted (was: User 'gf53520' has created a pull request for this issue: https://github.com/apache/spark/pull/16980) > Fix the race condition when starting and stopping a query quickly > - > > Key: SPARK-19617 > URL: https://issues.apache.org/jira/browse/SPARK-19617 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.2, 2.1.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu > Fix For: 2.1.1, 2.2.0 > > > The streaming thread in StreamExecution uses the following ways to check if > it should exit: > - Catch an InterruptException. > - `StreamExecution.state` is TERMINATED. > when starting and stopping a query quickly, the above two checks may both > fail. > - Hit [HADOOP-14084|https://issues.apache.org/jira/browse/HADOOP-14084] and > swallow InterruptException > - StreamExecution.stop is called before `state` becomes `ACTIVE`. Then > [runBatches|https://github.com/apache/spark/blob/dcc2d540a53f0bd04baead43fdee1c170ef2b9f3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L252] > changes the state from `TERMINATED` to `ACTIVE`. > If the above cases both happen, the query will hang forever. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
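The race described in SPARK-19617 above is a classic check-then-act bug: `stop()` can set the state to `TERMINATED` before `runBatches` runs, and `runBatches` then unconditionally overwrites it with `ACTIVE`, leaving a query that will never observe its own termination. The toy model below (plain Python, not Spark's Scala `StreamExecution`; all names invented) contrasts the buggy unconditional write with a compare-and-set style fix:

```python
import threading

class FakeQuery:
    """Toy model of the start/stop race described above (not Spark code)."""
    def __init__(self):
        self.state = "INITIALIZING"
        self._lock = threading.Lock()

    def stop(self):
        with self._lock:
            self.state = "TERMINATED"

    def run_batches_buggy(self):
        # Unconditionally flips to ACTIVE, even if stop() already ran,
        # so the stopped query looks alive forever.
        with self._lock:
            self.state = "ACTIVE"

    def run_batches_fixed(self):
        # Compare-and-set: only transition to ACTIVE if nothing has
        # terminated the query in the meantime.
        with self._lock:
            if self.state == "INITIALIZING":
                self.state = "ACTIVE"

# stop() wins the race, then the batch loop starts:
q = FakeQuery()
q.stop()
q.run_batches_buggy()
buggy_state = q.state   # "ACTIVE" -- the hang described in the issue

q2 = FakeQuery()
q2.stop()
q2.run_batches_fixed()
fixed_state = q2.state  # stays "TERMINATED"
```

The actual fix in Spark also has to cope with the swallowed `InterruptedException` from HADOOP-14084; this sketch only covers the state-transition half of the race.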
[jira] [Resolved] (SPARK-19645) structured streaming job restart bug
[ https://issues.apache.org/jira/browse/SPARK-19645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-19645. -- Resolution: Duplicate > structured streaming job restart bug > > > Key: SPARK-19645 > URL: https://issues.apache.org/jira/browse/SPARK-19645 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Feng Gui >Priority: Critical > > We are trying to use Structured Streaming in product, however currently > there exists a bug refer to the process of streaming job restart. > The following is the concrete error message: > {quote} >Caused by: java.lang.IllegalStateException: Error committing version 2 > into HDFSStateStore[id = (op=0, part=136), dir = > /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136] > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:162) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:173) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:123) > at > org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.IOException: Failed to rename > /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136/temp--5345709896617324284 > to /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136/2.delta > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:259) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:156) > ... 14 more > {quote} > The bug can be easily reproduced, just modify {color:red} val > metadataRoot = "hdfs://localhost:8020/tmp/checkpoint" {color} in StreamTest > and then run the test {color:red} sort after aggregate in complete mode > {color} in StreamingAggregationSuite. The main reason is that when restart > streaming job spark will recompute WAL offsets and generate the same hdfs > delta file(latest delta file generated before restart and named > "currentBatchId.delta") . In my opinion, this is a bug. If you guy consider > that this is a bug also, I can fix it. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
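The failure mode in SPARK-19645 above is that, after a restart, the recomputed batch produces a delta file with the same name as one already committed, and the HDFS-style rename fails because the target exists. A minimal sketch of an idempotent commit (plain Python with invented names, not `HDFSBackedStateStoreProvider`; note `os.replace` overwrites where HDFS rename does not) treats an existing target as "already committed":

```python
import os
import tempfile

def commit_delta(tmp_path, final_path):
    """Idempotent commit sketch: an existing target means a previous run
    already committed this batch, so discard the recomputed temp file."""
    if os.path.exists(final_path):
        os.remove(tmp_path)          # batch was committed before the crash
        return "skipped"
    os.replace(tmp_path, final_path)  # atomic rename on a local filesystem
    return "committed"

workdir = tempfile.mkdtemp()
final = os.path.join(workdir, "2.delta")

# First run commits the delta file:
tmp1 = os.path.join(workdir, "temp-1")
open(tmp1, "w").write("v2")
first = commit_delta(tmp1, final)

# After a restart, the same batch is recomputed and committed again:
tmp2 = os.path.join(workdir, "temp-2")
open(tmp2, "w").write("v2")
second = commit_delta(tmp2, final)
```

Whether skipping is actually safe depends on the recomputed delta being byte-identical to the committed one, which is the assumption the issue's recovery path relies on.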
[jira] [Updated] (SPARK-19677) HDFSBackedStateStoreProvider fails to overwrite existing file
[ https://issues.apache.org/jira/browse/SPARK-19677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19677: - Affects Version/s: 2.0.0 2.0.1 2.0.2 > HDFSBackedStateStoreProvider fails to overwrite existing file > - > > Key: SPARK-19677 > URL: https://issues.apache.org/jira/browse/SPARK-19677 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Roberto Agostino Vitillo >Assignee: Roberto Agostino Vitillo >Priority: Critical > Fix For: 2.0.3, 2.1.1, 2.2.0 > > > I got the exception below after restarting a crashed Structured Streaming > application. This seems to be due to the fact that > {{/tmp/checkpoint/state/0/0/214451.delta}} already exists in HDFS. > {code} > 17/02/20 14:14:26 ERROR StreamExecution: Query [id = > 5023231c-2433-4013-a8b9-d54bb5751445, runId = > 4168cf31-7d0b-4435-9b58-28919abd937b] terminated with error > org.apache.spark.SparkException: Job aborted. 
> at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:147) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121) > at > org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:78) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:503) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244) > at > 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244) > at > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177) > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in > stage 1.0 (TID 100, localhost, executor driver): > org.apache.spark.SparkException: Task failed while writing rows > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129) > at > org.apache.spark.sql.
[jira] [Resolved] (SPARK-19677) HDFSBackedStateStoreProvider fails to overwrite existing file
[ https://issues.apache.org/jira/browse/SPARK-19677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-19677. -- Resolution: Fixed Assignee: Roberto Agostino Vitillo Fix Version/s: 2.2.0 2.1.1 2.0.3 > HDFSBackedStateStoreProvider fails to overwrite existing file > - > > Key: SPARK-19677 > URL: https://issues.apache.org/jira/browse/SPARK-19677 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Roberto Agostino Vitillo >Assignee: Roberto Agostino Vitillo >Priority: Critical > Fix For: 2.0.3, 2.1.1, 2.2.0 > > > I got the exception below after restarting a crashed Structured Streaming > application. This seems to be due to the fact that > {{/tmp/checkpoint/state/0/0/214451.delta}} already exists in HDFS. > {code} > 17/02/20 14:14:26 ERROR StreamExecution: Query [id = > 5023231c-2433-4013-a8b9-d54bb5751445, runId = > 4168cf31-7d0b-4435-9b58-28919abd937b] terminated with error > org.apache.spark.SparkException: Job aborted. 
> at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:147) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121) > at > org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:78) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:503) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244) > at > 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244) > at > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177) > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in > stage 1.0 (TID 100, localhost, executor driver): > org.apache.spark.SparkException: Task failed while writing rows > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFo
[jira] [Resolved] (SPARK-19768) Error for both aggregate and non-aggregate queries in Structured Streaming - "This query does not support recovering from checkpoint location"
[ https://issues.apache.org/jira/browse/SPARK-19768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-19768. -- Resolution: Not A Bug > Error for both aggregate and non-aggregate queries in Structured Streaming > - "This query does not support recovering from checkpoint location" > - > > Key: SPARK-19768 > URL: https://issues.apache.org/jira/browse/SPARK-19768 > Project: Spark > Issue Type: Question > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Amit Baghel > > I am running JavaStructuredKafkaWordCount.java example with > checkpointLocation. Output mode is "complete". Below is relevant code. > {code} > // Generate running word count > Dataset wordCounts = lines.flatMap(new FlatMapFunction String>() { > @Override > public Iterator call(String x) { > return Arrays.asList(x.split(" ")).iterator(); > } > }, Encoders.STRING()).groupBy("value").count(); > // Start running the query that prints the running counts to the console > StreamingQuery query = wordCounts.writeStream() > .outputMode("complete") > .format("console") > .option("checkpointLocation", "/tmp/checkpoint-data") > .start(); > {code} > This example runs successfully and writes data in checkpoint directory. When > I re-run the program it throws below exception > {code} > Exception in thread "main" org.apache.spark.sql.AnalysisException: This query > does not support recovering from checkpoint location. 
Delete > /tmp/checkpoint-data/offsets to start over.; > at > org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219) > at > org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) > at > com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85) > {code} > Then I modified JavaStructuredKafkaWordCount.java to have non aggregate query > with output mode as "append". Please see the code below. > {code} > // no aggregations > Dataset wordCounts = lines.flatMap(new FlatMapFunction String>() { > @Override > public Iterator call(String x) { > return Arrays.asList(x.split(" ")).iterator(); > } > }, Encoders.STRING()).select("value"); > // append mode with console > StreamingQuery query = wordCounts.writeStream() > .outputMode("append") > .format("console") > .option("checkpointLocation", "/tmp/checkpoint-data") > .start(); > {code} > This modified code runs successfully and writes data in checkpoint directory. > When I re-run the program it throws same exception > {code} > Exception in thread "main" org.apache.spark.sql.AnalysisException: This query > does not support recovering from checkpoint location. Delete > /tmp/checkpoint-data/offsets to start over.; > at > org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219) > at > org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) > at > com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19768) Error for both aggregate and non-aggregate queries in Structured Streaming - "This query does not support recovering from checkpoint location"
[ https://issues.apache.org/jira/browse/SPARK-19768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888632#comment-15888632 ] Shixiong Zhu commented on SPARK-19768: -- You are using the console sink which doesn't support recovery. You can use a file sink if you want to test checkpointing. > Error for both aggregate and non-aggregate queries in Structured Streaming > - "This query does not support recovering from checkpoint location" > - > > Key: SPARK-19768 > URL: https://issues.apache.org/jira/browse/SPARK-19768 > Project: Spark > Issue Type: Question > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Amit Baghel > > I am running JavaStructuredKafkaWordCount.java example with > checkpointLocation. Output mode is "complete". Below is relevant code. > {code} > // Generate running word count > Dataset wordCounts = lines.flatMap(new FlatMapFunction String>() { > @Override > public Iterator call(String x) { > return Arrays.asList(x.split(" ")).iterator(); > } > }, Encoders.STRING()).groupBy("value").count(); > // Start running the query that prints the running counts to the console > StreamingQuery query = wordCounts.writeStream() > .outputMode("complete") > .format("console") > .option("checkpointLocation", "/tmp/checkpoint-data") > .start(); > {code} > This example runs successfully and writes data in checkpoint directory. When > I re-run the program it throws below exception > {code} > Exception in thread "main" org.apache.spark.sql.AnalysisException: This query > does not support recovering from checkpoint location. 
Delete > /tmp/checkpoint-data/offsets to start over.; > at > org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219) > at > org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) > at > com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85) > {code} > Then I modified JavaStructuredKafkaWordCount.java to have non aggregate query > with output mode as "append". Please see the code below. > {code} > // no aggregations > Dataset wordCounts = lines.flatMap(new FlatMapFunction String>() { > @Override > public Iterator call(String x) { > return Arrays.asList(x.split(" ")).iterator(); > } > }, Encoders.STRING()).select("value"); > // append mode with console > StreamingQuery query = wordCounts.writeStream() > .outputMode("append") > .format("console") > .option("checkpointLocation", "/tmp/checkpoint-data") > .start(); > {code} > This modified code runs successfully and writes data in checkpoint directory. > When I re-run the program it throws same exception > {code} > Exception in thread "main" org.apache.spark.sql.AnalysisException: This query > does not support recovering from checkpoint location. Delete > /tmp/checkpoint-data/offsets to start over.; > at > org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219) > at > org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) > at > com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19768) Error for both aggregate and non-aggregate queries in Structured Streaming - "This query does not support recovering from checkpoint location"
[ https://issues.apache.org/jira/browse/SPARK-19768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19768: - Issue Type: Question (was: Bug) > Error for both aggregate and non-aggregate queries in Structured Streaming > - "This query does not support recovering from checkpoint location" > - > > Key: SPARK-19768 > URL: https://issues.apache.org/jira/browse/SPARK-19768 > Project: Spark > Issue Type: Question > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Amit Baghel > > I am running JavaStructuredKafkaWordCount.java example with > checkpointLocation. Output mode is "complete". Below is relevant code. > {code} > // Generate running word count > Dataset wordCounts = lines.flatMap(new FlatMapFunction String>() { > @Override > public Iterator call(String x) { > return Arrays.asList(x.split(" ")).iterator(); > } > }, Encoders.STRING()).groupBy("value").count(); > // Start running the query that prints the running counts to the console > StreamingQuery query = wordCounts.writeStream() > .outputMode("complete") > .format("console") > .option("checkpointLocation", "/tmp/checkpoint-data") > .start(); > {code} > This example runs successfully and writes data in checkpoint directory. When > I re-run the program it throws below exception > {code} > Exception in thread "main" org.apache.spark.sql.AnalysisException: This query > does not support recovering from checkpoint location. 
Delete > /tmp/checkpoint-data/offsets to start over.; > at > org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219) > at > org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) > at > com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85) > {code} > Then I modified JavaStructuredKafkaWordCount.java to have non aggregate query > with output mode as "append". Please see the code below. > {code} > // no aggregations > Dataset wordCounts = lines.flatMap(new FlatMapFunction String>() { > @Override > public Iterator call(String x) { > return Arrays.asList(x.split(" ")).iterator(); > } > }, Encoders.STRING()).select("value"); > // append mode with console > StreamingQuery query = wordCounts.writeStream() > .outputMode("append") > .format("console") > .option("checkpointLocation", "/tmp/checkpoint-data") > .start(); > {code} > This modified code runs successfully and writes data in checkpoint directory. > When I re-run the program it throws same exception > {code} > Exception in thread "main" org.apache.spark.sql.AnalysisException: This query > does not support recovering from checkpoint location. Delete > /tmp/checkpoint-data/offsets to start over.; > at > org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219) > at > org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) > at > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) > at > com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19738) Consider adding error handler to DataStreamWriter
[ https://issues.apache.org/jira/browse/SPARK-19738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15887092#comment-15887092 ] Shixiong Zhu commented on SPARK-19738: -- [~gaaldornick] could you check if SPARK-18699 is enough? It will put all bad records to a column specified by the user. > Consider adding error handler to DataStreamWriter > - > > Key: SPARK-19738 > URL: https://issues.apache.org/jira/browse/SPARK-19738 > Project: Spark > Issue Type: Improvement > Components: SQL, Structured Streaming >Affects Versions: 2.1.0 >Reporter: Jayesh lalwani > > For Structured streaming implementations, it is important that the > applications stay always On. However, right now, errors stop the driver. In > some cases, this is not desirable behavior. For example, I have the following > application > {code} > import org.apache.spark.sql.types._ > val userSchema = new StructType().add("name", "string").add("age", "integer") > val csvDF = > spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/") > csvDF.writeStream.format("console").start() > {code} > I send the following input to it > {quote} > 1,Iron man > 2,SUperman > {quote} > Obviously, the data is bad. This causes the executor to throw an exception > that propogates to the driver, which promptly shuts down. The driver is > running in supervised mode, and it gets restarted. The application reads the > same bad input and shuts down again. This goes ad-infinitum. This behavior is > desirable, in cases, the error is recoverable. For example, if the executor > cannot talk to the database, we want the application to keep trying the same > input again and again till the database recovers. However, for some cases, > this behavior is undesirable. We do not want this to happen when the input is > bad. We want to put the bad record in some sort of dead letter queue. Or > maybe we want to kill the driver only when the number of errors have crossed > a certain threshold. 
Or maybe we want to email someone. > Proposal: > Add a error handler to the data stream. When the executor fails, it should > call the error handler and pass the Exception to the error handler. The error > handler could eat the exception, or transform it, or update counts in an > accumulator, etc > {code} > import org.apache.spark.sql.types._ > val userSchema = new StructType().add("name", "string").add("age", "integer") > val csvDF = > spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/") > csvDF.writeStream.format("console").errorhandler("com.jayesh.ErrorHandler").start() > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
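The `errorhandler(...)` hook proposed above does not exist in Spark; as a sketch of the contract being suggested, the handler would see each record-level failure and decide whether to swallow it (dead-letter it), count it, or escalate past a threshold. A minimal Python model (all names invented for illustration):

```python
class ThresholdErrorHandler:
    """Sketch of the proposed handler contract: collect bad records,
    escalate only after a configurable error threshold."""
    def __init__(self, max_errors):
        self.max_errors = max_errors
        self.dead_letters = []

    def handle(self, record, exc):
        self.dead_letters.append((record, str(exc)))
        if len(self.dead_letters) > self.max_errors:
            raise RuntimeError("error threshold exceeded")

def parse_row(record, handler):
    """Parse a 'name,age' CSV row; route failures to the handler."""
    try:
        name, age = record.split(",")
        return (name, int(age))
    except ValueError as exc:
        handler.handle(record, exc)
        return None

handler = ThresholdErrorHandler(max_errors=2)
# The bad input from the issue: the age column holds names, so int() fails.
rows = ["1,Iron man", "2,SUperman"]
parsed = [parse_row(r, handler) for r in rows]
```

This keeps the stream alive on bad records while preserving them for inspection, which is the dead-letter-queue behavior the issue asks for; SPARK-18699's bad-record column is the alternative Shixiong Zhu points to.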
[jira] [Updated] (SPARK-19738) Consider adding error handler to DataStreamWriter
[ https://issues.apache.org/jira/browse/SPARK-19738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19738: - Component/s: SQL > Consider adding error handler to DataStreamWriter > - > > Key: SPARK-19738 > URL: https://issues.apache.org/jira/browse/SPARK-19738 > Project: Spark > Issue Type: Improvement > Components: SQL, Structured Streaming >Affects Versions: 2.1.0 >Reporter: Jayesh lalwani > > For Structured streaming implementations, it is important that the > applications stay always On. However, right now, errors stop the driver. In > some cases, this is not desirable behavior. For example, I have the following > application > {code} > import org.apache.spark.sql.types._ > val userSchema = new StructType().add("name", "string").add("age", "integer") > val csvDF = > spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/") > csvDF.writeStream.format("console").start() > {code} > I send the following input to it > {quote} > 1,Iron man > 2,SUperman > {quote} > Obviously, the data is bad. This causes the executor to throw an exception > that propogates to the driver, which promptly shuts down. The driver is > running in supervised mode, and it gets restarted. The application reads the > same bad input and shuts down again. This goes ad-infinitum. This behavior is > desirable, in cases, the error is recoverable. For example, if the executor > cannot talk to the database, we want the application to keep trying the same > input again and again till the database recovers. However, for some cases, > this behavior is undesirable. We do not want this to happen when the input is > bad. We want to put the bad record in some sort of dead letter queue. Or > maybe we want to kill the driver only when the number of errors have crossed > a certain threshold. Or maybe we want to email someone. > Proposal: > Add a error handler to the data stream. 
When the executor fails, it should > call the error handler and pass the Exception to the error handler. The error > handler could eat the exception, or transform it, or update counts in an > accumulator, etc > {code} > import org.apache.spark.sql.types._ > val userSchema = new StructType().add("name", "string").add("age", "integer") > val csvDF = > spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/") > csvDF.writeStream.format("console").errorhandler("com.jayesh.ErrorHandler").start() > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19751) Create Data frame API fails with a self referencing bean
[ https://issues.apache.org/jira/browse/SPARK-19751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19751: - Component/s: (was: Spark Core) SQL > Create Data frame API fails with a self referencing bean > > > Key: SPARK-19751 > URL: https://issues.apache.org/jira/browse/SPARK-19751 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: Avinash Venkateshaiah >Priority: Minor > > The createDataset API throws a stack overflow exception when we try creating a > Dataset using a bean encoder. The bean is self-referencing. > BEAN: > public class HierObj implements Serializable { > String name; > List<HierObj> children; > public String getName() { > return name; > } > public void setName(String name) { > this.name = name; > } > public List<HierObj> getChildren() { > return children; > } > public void setChildren(List<HierObj> children) { > this.children = children; > } > } > // create an object > HierObj hierObj = new HierObj(); > hierObj.setName("parent"); > List<HierObj> children = new ArrayList<>(); > HierObj child1 = new HierObj(); > child1.setName("child1"); > HierObj child2 = new HierObj(); > child2.setName("child2"); > children.add(child1); > children.add(child2); > hierObj.setChildren(children); > // create a dataset > Dataset<HierObj> ds = sparkSession().createDataset(Arrays.asList(hierObj), > Encoders.bean(HierObj.class));
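The overflow is unsurprising: a bean encoder must walk the bean's properties recursively to derive a schema, and `children` points back to `HierObj` itself, so a naive walk never terminates. A minimal reflection sketch of the problem follows; it is not Spark's actual `Encoders.bean` implementation, and the `visiting`-set cycle check is the obvious mitigation rather than what Spark does:

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BeanCycleSketch {
    // Self-referencing bean shaped like the one in the report.
    public static class HierObj {
        public String name;
        public List<HierObj> children;
    }

    // A schema walk that recurses into field types. Without the `visiting` set
    // it would recurse forever on HierObj; with it, the cycle is detected.
    static boolean hasCycle(Class<?> cls, Set<Class<?>> visiting) {
        if (!visiting.add(cls)) {
            return true; // cls is already on the current path: self-reference
        }
        for (Field f : cls.getDeclaredFields()) {
            Class<?> t = f.getType();
            Type g = f.getGenericType();
            if (List.class.isAssignableFrom(t) && g instanceof ParameterizedType) {
                Type arg = ((ParameterizedType) g).getActualTypeArguments()[0];
                if (arg instanceof Class) {
                    t = (Class<?>) arg; // recurse into the list's element type
                }
            }
            if (!t.isPrimitive() && t != String.class && hasCycle(t, visiting)) {
                return true;
            }
        }
        visiting.remove(cls);
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hasCycle(HierObj.class, new HashSet<>()));
    }
}
```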
[jira] [Resolved] (SPARK-19749) Name socket source with a meaningful name
[ https://issues.apache.org/jira/browse/SPARK-19749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-19749. -- Resolution: Fixed Fix Version/s: 2.2.0 > Name socket source with a meaningful name > - > > Key: SPARK-19749 > URL: https://issues.apache.org/jira/browse/SPARK-19749 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.0.2, 2.1.0 >Reporter: Genmao Yu >Assignee: Genmao Yu >Priority: Trivial > Fix For: 2.2.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-19594) StreamingQueryListener fails to handle QueryTerminatedEvent if more then one listeners exists
[ https://issues.apache.org/jira/browse/SPARK-19594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu reassigned SPARK-19594: Assignee: Eyal Zituny > StreamingQueryListener fails to handle QueryTerminatedEvent if more then one > listeners exists > - > > Key: SPARK-19594 > URL: https://issues.apache.org/jira/browse/SPARK-19594 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Eyal Zituny >Assignee: Eyal Zituny >Priority: Minor > Fix For: 2.1.1, 2.2.0 > > > To reproduce: > *create a spark session > *add multiple streaming query listeners > *create a simple query > *stop the query > Result -> only the first listener handles the QueryTerminatedEvent. > This might happen because the query run id is being removed from > activeQueryRunIds once onQueryTerminated is called > (StreamingQueryListenerBus:115)
[jira] [Resolved] (SPARK-19594) StreamingQueryListener fails to handle QueryTerminatedEvent if more then one listeners exists
[ https://issues.apache.org/jira/browse/SPARK-19594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-19594. -- Resolution: Fixed Fix Version/s: 2.2.0 2.1.1 > StreamingQueryListener fails to handle QueryTerminatedEvent if more then one > listeners exists > - > > Key: SPARK-19594 > URL: https://issues.apache.org/jira/browse/SPARK-19594 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Eyal Zituny >Priority: Minor > Fix For: 2.1.1, 2.2.0 > > > To reproduce: > *create a spark session > *add multiple streaming query listeners > *create a simple query > *stop the query > Result -> only the first listener handles the QueryTerminatedEvent. > This might happen because the query run id is being removed from > activeQueryRunIds once onQueryTerminated is called > (StreamingQueryListenerBus:115)
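The suspected mechanism can be reproduced in miniature: if the dispatch loop removes the run id from the active set as soon as the first listener has handled the terminated event, the membership check fails for every later listener. A small sketch in plain Java (not the actual StreamingQueryListenerBus code; `deliver` and its `removeInsideLoop` flag are inventions of this sketch):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ListenerBusSketch {
    interface Listener {
        void onQueryTerminated(String runId);
    }

    // Events are only delivered for run ids present in activeRunIds. The buggy
    // variant removes the id inside the per-listener loop, so only the first
    // listener sees the event; the fixed variant removes it after the loop.
    static int deliver(List<Listener> listeners, String runId, boolean removeInsideLoop) {
        Set<String> activeRunIds = new HashSet<>(Collections.singleton(runId));
        int delivered = 0;
        for (Listener l : listeners) {
            if (activeRunIds.contains(runId)) {
                l.onQueryTerminated(runId);
                delivered++;
                if (removeInsideLoop) {
                    activeRunIds.remove(runId); // the bug: later listeners skipped
                }
            }
        }
        if (!removeInsideLoop) {
            activeRunIds.remove(runId); // the fix: clean up after all listeners
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Listener> ls = Arrays.asList(id -> {}, id -> {}, id -> {});
        System.out.println(deliver(ls, "run-1", true));  // buggy dispatch
        System.out.println(deliver(ls, "run-1", false)); // fixed dispatch
    }
}
```

With three registered listeners, the buggy variant delivers the event once; the fixed variant delivers it three times.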
[jira] [Updated] (SPARK-19718) Fix flaky test: org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite: stress test for failOnDataLoss=false
[ https://issues.apache.org/jira/browse/SPARK-19718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19718: - Description: SPARK-19617 changed HDFSMetadataLog to enable interrupts when using the local file system. However, now we hit HADOOP-12074: `Shell.runCommand` converts `InterruptedException` to `new IOException(ie.toString())` before Hadoop 2.8. Test failure: https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.6/2504/consoleFull {code} [info] - stress test for failOnDataLoss=false *** FAILED *** (1 minute, 1 second) [info] org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 27d45f4f-14dc-4c74-8b52-4bbd4f2b9bec, runId = 23b8c1ea-4da9-4096-967a-692933e4b319] terminated with exception: java.lang.InterruptedException [info] at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:304) [info] at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:190) [info] Cause: java.io.IOException: java.lang.InterruptedException [info] at org.apache.hadoop.util.Shell.runCommand(Shell.java:578) [info] at org.apache.hadoop.util.Shell.run(Shell.java:478) [info] at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:766) [info] at org.apache.hadoop.util.Shell.execCommand(Shell.java:859) [info] at org.apache.hadoop.util.Shell.execCommand(Shell.java:842) [info] at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:661) [info] at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:300) [info] at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1014) [info] at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:85) [info] at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.(ChecksumFs.java:354) [info] at 
org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:394) [info] at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:577) [info] at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:680) [info] at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:676) [info] at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) [info] at org.apache.hadoop.fs.FileContext.create(FileContext.java:676) {code} > Fix flaky test: > org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite: > stress test for failOnDataLoss=false > --- > > Key: SPARK-19718 > URL: https://issues.apache.org/jira/browse/SPARK-19718 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Shixiong Zhu > > SPARK-19617 changed HDFSMetadataLog to enable interrupts when using the local > file system. However, now we hit HADOOP-12074: `Shell.runCommand` converts > `InterruptedException` to `new IOException(ie.toString())` before Hadoop 2.8. 
> Test failure: > https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.6/2504/consoleFull > {code} > [info] - stress test for failOnDataLoss=false *** FAILED *** (1 minute, 1 > second) > [info] org.apache.spark.sql.streaming.StreamingQueryException: Query [id = > 27d45f4f-14dc-4c74-8b52-4bbd4f2b9bec, runId = > 23b8c1ea-4da9-4096-967a-692933e4b319] terminated with exception: > java.lang.InterruptedException > [info] at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:304) > [info] at > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:190) > [info] Cause: java.io.IOException: java.lang.InterruptedException > [info] at org.apache.hadoop.util.Shell.runCommand(Shell.java:578) > [info] at org.apache.hadoop.util.Shell.run(Shell.java:478) > [info] at > org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:766) > [info] at org.apache.hadoop.util.Shell.execCommand(Shell.java:859) > [info] at org.apache.hadoop.util.Shell.execCommand(Shell.java:842) > [info] at > org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:661) > [info] at > org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:300) > [info] at > org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1014) > [info] at > org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:85) > [info] at > org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.(ChecksumFs.java:354) > [info] at > org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.jav
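Why the usual interrupt handling fails is visible from HADOOP-12074 alone: before Hadoop 2.8, `Shell.runCommand` flattens the `InterruptedException` into an `IOException` message, so neither the exception type nor `getCause()` reveals the interrupt. A self-contained illustration (the `wrapPre28` helper imitates the old Hadoop behavior; it is not Hadoop code):

```java
import java.io.IOException;

public class InterruptWrapSketch {
    // Imitates pre-Hadoop-2.8 Shell.runCommand (HADOOP-12074): the interrupt is
    // flattened into the message string rather than attached as the cause.
    static IOException wrapPre28(InterruptedException ie) {
        return new IOException(ie.toString());
    }

    public static void main(String[] args) {
        IOException e = wrapPre28(new InterruptedException());
        // Type- or cause-based interrupt detection misses it entirely:
        System.out.println(e.getCause());
        // Only fragile string matching on the message still identifies it:
        System.out.println(e.getMessage().contains("InterruptedException"));
    }
}
```

This is why a query thread that expects to exit on `InterruptedException` instead surfaces `java.io.IOException: java.lang.InterruptedException`, as in the Jenkins failure above.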
[jira] [Created] (SPARK-19718) Fix flaky test: org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite: stress test for failOnDataLoss=false
Shixiong Zhu created SPARK-19718: Summary: Fix flaky test: org.apache.spark.sql.kafka010.KafkaSourceStressForDontFailOnDataLossSuite: stress test for failOnDataLoss=false Key: SPARK-19718 URL: https://issues.apache.org/jira/browse/SPARK-19718 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.2.0 Reporter: Shixiong Zhu
[jira] [Commented] (SPARK-19644) Memory leak in Spark Streaming
[ https://issues.apache.org/jira/browse/SPARK-19644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879397#comment-15879397 ] Shixiong Zhu commented on SPARK-19644: -- [~deenbandhu] Do you use Scala 2.10 or Scala 2.11? > Memory leak in Spark Streaming > -- > > Key: SPARK-19644 > URL: https://issues.apache.org/jira/browse/SPARK-19644 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.0.2 > Environment: 3 AWS EC2 c3.xLarge > Number of cores - 3 > Number of executors 3 > Memory to each executor 2GB >Reporter: Deenbandhu Agarwal >Priority: Critical > Labels: memory_leak, performance > Attachments: Dominator_tree.png, heapdump.png, Path2GCRoot.png > > > I am using streaming in production for some aggregation, fetching data > from Cassandra and saving data back to Cassandra. > I see a gradual increase in old-generation heap capacity from 1161216 bytes > to 1397760 bytes over a period of six hours. > After 50 hours of processing, instances of the class > scala.collection.immutable.$colon$colon increased to 12,811,793, which is a > huge number. > I think this is a clear case of a memory leak.
[jira] [Resolved] (SPARK-19617) Fix the race condition when starting and stopping a query quickly
[ https://issues.apache.org/jira/browse/SPARK-19617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-19617. -- Resolution: Fixed Fix Version/s: 2.1.1 > Fix the race condition when starting and stopping a query quickly > - > > Key: SPARK-19617 > URL: https://issues.apache.org/jira/browse/SPARK-19617 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.2, 2.1.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu > Fix For: 2.1.1, 2.2.0 > > > The streaming thread in StreamExecution uses the following ways to check if > it should exit: > - Catch an InterruptedException. > - `StreamExecution.state` is TERMINATED. > When starting and stopping a query quickly, the above two checks may both > fail: > - Hit [HADOOP-14084|https://issues.apache.org/jira/browse/HADOOP-14084] and > swallow the InterruptedException. > - StreamExecution.stop is called before `state` becomes `ACTIVE`. Then > [runBatches|https://github.com/apache/spark/blob/dcc2d540a53f0bd04baead43fdee1c170ef2b9f3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L252] > changes the state from `TERMINATED` to `ACTIVE`. > If the above cases both happen, the query will hang forever.
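The second half of the race comes down to an unconditional state write: if stop() sets TERMINATED before the stream thread starts, runBatches overwrites it with ACTIVE and the loop never sees the stop. A minimal model of the fix in plain Java (a hypothetical simplification; StreamExecution's real state machine has more states and members):

```java
import java.util.concurrent.atomic.AtomicReference;

public class StopRaceSketch {
    enum State { INITIALIZING, ACTIVE, TERMINATED }

    // Fixed runBatches entry: only become ACTIVE if stop() has not already
    // moved the query to TERMINATED. A plain state.set(ACTIVE) here is the bug.
    static boolean tryStart(AtomicReference<State> state) {
        return state.compareAndSet(State.INITIALIZING, State.ACTIVE);
    }

    public static void main(String[] args) {
        AtomicReference<State> state = new AtomicReference<>(State.INITIALIZING);
        state.set(State.TERMINATED); // stop() won the race before the thread ran
        // With the CAS, the stream thread notices and exits instead of hanging:
        System.out.println(tryStart(state));
        System.out.println(state.get());
    }
}
```

The compare-and-set makes the ordering explicit: whichever of stop() and the stream thread runs second observes the other's transition instead of silently clobbering it.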
[jira] [Commented] (SPARK-19675) ExecutorClassLoader loads classes from SystemClassLoader
[ https://issues.apache.org/jira/browse/SPARK-19675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15877276#comment-15877276 ] Shixiong Zhu commented on SPARK-19675: -- Just to clarify one thing: Executors in your case will run in a separate process and know nothing about the magic class loader in the driver. > ExecutorClassLoader loads classes from SystemClassLoader > > > Key: SPARK-19675 > URL: https://issues.apache.org/jira/browse/SPARK-19675 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0, 2.2.0 > Environment: sbt / Play Framework >Reporter: Kohki Nishio >Priority: Minor > > Spark Executor loads classes from the SystemClassLoader, which contains > sbt-launch.jar and its Scala 2.10 binaries; however, Spark itself is > built on Scala 2.11, thus it's throwing an InvalidClassException > java.io.InvalidClassException: scala.Option; local class incompatible: stream > classdesc serialVersionUID = -114498752079829388, local class > serialVersionUID = 5081326844987135632 > at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) > ExecutorClassLoader's desired class loader (parentLoader) actually contains > the correct path (scala-library-2.11.8.jar) but it is not being used. 
[jira] [Comment Edited] (SPARK-19675) ExecutorClassLoader loads classes from SystemClassLoader
[ https://issues.apache.org/jira/browse/SPARK-19675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15877262#comment-15877262 ] Shixiong Zhu edited comment on SPARK-19675 at 2/22/17 1:59 AM: --- [~taroplus] ExecutorClassLoader does try to load from its parent classloader: https://github.com/apache/spark/blob/master/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala#L77 The issue you reported is that it should load classes from the remote driver instead of the current parent classloader. Right? was (Author: zsxwing): [~taroplus] ExecutorClassLoader does try to load from its parent classloader: https://github.com/apache/spark/blob/master/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala#L77 The issue you reported is that it should load from the remote driver instead of the current parent classloader. Right? > ExecutorClassLoader loads classes from SystemClassLoader > > > Key: SPARK-19675 > URL: https://issues.apache.org/jira/browse/SPARK-19675 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0, 2.2.0 > Environment: sbt / Play Framework >Reporter: Kohki Nishio >Priority: Minor > > Spark Executor loads classes from SystemClassLoader which contains > sbt-launch.jar and it contains Scala2.10 binary, however Spark itself is > built on Scala2.11, thus it's throwing InvalidClassException > java.io.InvalidClassException: scala.Option; local class incompatible: stream > classdesc serialVersionUID = -114498752079829388, local class > serialVersionUID = 5081326844987135632 > at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at > 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) > ExecutorClassLoader's desired class loder (parentLoader) actually contains > the correct path (scala-library-2.11.8.jar) but it is not being used. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19675) ExecutorClassLoader loads classes from SystemClassLoader
[ https://issues.apache.org/jira/browse/SPARK-19675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15877262#comment-15877262 ] Shixiong Zhu commented on SPARK-19675: -- [~taroplus] ExecutorClassLoader does try to load from its parent classloader: https://github.com/apache/spark/blob/master/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala#L77 The issue you reported is that it should load from the remote driver instead of the current parent classloader. Right? > ExecutorClassLoader loads classes from SystemClassLoader > > > Key: SPARK-19675 > URL: https://issues.apache.org/jira/browse/SPARK-19675 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0, 2.2.0 > Environment: sbt / Play Framework >Reporter: Kohki Nishio >Priority: Minor > > Spark Executor loads classes from SystemClassLoader which contains > sbt-launch.jar and it contains Scala2.10 binary, however Spark itself is > built on Scala2.11, thus it's throwing InvalidClassException > java.io.InvalidClassException: scala.Option; local class incompatible: stream > classdesc serialVersionUID = -114498752079829388, local class > serialVersionUID = 5081326844987135632 > at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) 
> ExecutorClassLoader's desired class loader (parentLoader) actually contains > the correct path (scala-library-2.11.8.jar) but it is not being used.
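The delegation order behind this exchange can be seen with any parent-first loader: `findClass` (where ExecutorClassLoader would fetch class bytes from the driver) is only consulted when the parent cannot resolve the name, which is why a `scala.Option` already visible via sbt-launch.jar wins. A standalone sketch (`CountingLoader` is illustrative, not Spark code):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class DelegationSketch {
    // Stand-in for ExecutorClassLoader: findClass would fetch bytes from the
    // driver, but loadClass (parent-first by default) only reaches it when the
    // parent cannot resolve the name.
    static class CountingLoader extends ClassLoader {
        final AtomicInteger findCalls = new AtomicInteger();

        CountingLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> findClass(String name) throws ClassNotFoundException {
            findCalls.incrementAndGet();
            throw new ClassNotFoundException(name); // no remote fetch in this sketch
        }
    }

    public static void main(String[] args) throws Exception {
        CountingLoader loader = new CountingLoader(ClassLoader.getSystemClassLoader());
        // Already visible via the parent (like scala.Option under sbt-launch):
        Class.forName("java.util.ArrayList", false, loader);
        System.out.println(loader.findCalls.get()); // parent answered first
        try {
            Class.forName("no.such.Clazz", false, loader);
        } catch (ClassNotFoundException expected) {
            // only now was findClass consulted
        }
        System.out.println(loader.findCalls.get());
    }
}
```

The first lookup never reaches `findClass` at all; only the name the parent cannot resolve falls through to the child loader.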
[jira] [Comment Edited] (SPARK-19675) ExecutorClassLoader loads classes from SystemClassLoader
[ https://issues.apache.org/jira/browse/SPARK-19675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15877255#comment-15877255 ] Shixiong Zhu edited comment on SPARK-19675 at 2/22/17 1:53 AM: --- [~taroplus] Yeah, I should have checked `sbt run` myself. Looks like it does use some class loader magic to support multiple Scala versions. However, I don't see how to do this in Spark. In general, the recommended way to run a Spark application is using spark-submit rather than `sbt run`. I checked the local mode and it works. So the issue happens when launching executor processes in local-cluster, client and cluster modes. There are a couple of problems to support `sbt run`. E.g., - How to know if the driver is using `sbt run`? - How to launch a new executor process with the SBT ClassLoader automatically in a remote node? The remote node may not have SBT installed. This is really a low-priority feature. Feel free to reopen this ticket and submit a PR if you have time to work on this. was (Author: zsxwing): [~taroplus] Yeah, I should have checked `sbt run` myself. Looks like it does use some class loader magic to support multiple Scala versions. However, I don't see how to do this in Spark. In general, the recommended way to run a Spark application is using spark-submit rather than `sbt run`. I checked the local mode and it works. So the issue happens when launching executor processes in local-cluster, client and cluster modes. There are a couple of problems to support `sbt run`. E.g., - How to know if the driver is using `sbt run`? - How to launch a new executor process with the SBT ClassLoader automatically in a remote node? The remote node may not have SBT installed. This is really a low-priority feature. Feel free to submit a PR if you have time to work on this. 
> ExecutorClassLoader loads classes from SystemClassLoader > > > Key: SPARK-19675 > URL: https://issues.apache.org/jira/browse/SPARK-19675 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0, 2.2.0 > Environment: sbt / Play Framework >Reporter: Kohki Nishio >Priority: Minor > > Spark Executor loads classes from SystemClassLoader which contains > sbt-launch.jar and it contains Scala2.10 binary, however Spark itself is > built on Scala2.11, thus it's throwing InvalidClassException > java.io.InvalidClassException: scala.Option; local class incompatible: stream > classdesc serialVersionUID = -114498752079829388, local class > serialVersionUID = 5081326844987135632 > at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) > ExecutorClassLoader's desired class loder (parentLoader) actually contains > the correct path (scala-library-2.11.8.jar) but it is not being used. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19675) ExecutorClassLoader loads classes from SystemClassLoader
[ https://issues.apache.org/jira/browse/SPARK-19675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19675: - Issue Type: Improvement (was: Bug) > ExecutorClassLoader loads classes from SystemClassLoader > > > Key: SPARK-19675 > URL: https://issues.apache.org/jira/browse/SPARK-19675 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0, 2.2.0 > Environment: sbt / Play Framework >Reporter: Kohki Nishio >Priority: Minor > > Spark Executor loads classes from SystemClassLoader which contains > sbt-launch.jar and it contains Scala2.10 binary, however Spark itself is > built on Scala2.11, thus it's throwing InvalidClassException > java.io.InvalidClassException: scala.Option; local class incompatible: stream > classdesc serialVersionUID = -114498752079829388, local class > serialVersionUID = 5081326844987135632 > at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) > ExecutorClassLoader's desired class loder (parentLoader) actually contains > the correct path (scala-library-2.11.8.jar) but it is not being used. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19675) ExecutorClassLoader loads classes from SystemClassLoader
[ https://issues.apache.org/jira/browse/SPARK-19675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15877255#comment-15877255 ] Shixiong Zhu commented on SPARK-19675: -- [~taroplus] Yeah, I should have checked `sbt run` myself. Looks like it does use some class loader magic to support multiple Scala versions. However, I don't see how to do this in Spark. In general, the recommended way to run a Spark application is using spark-submit rather than `sbt run`. I checked the local mode and it works. So the issue happens when launching executor processes in local-cluster, client and cluster modes. There are a couple of problems to support `sbt run`. E.g., - How to know if the driver is using `sbt run`? - How to launch a new executor process with the SBT ClassLoader automatically in a remote node? The remote node may not have SBT installed. This is really a low-priority feature. Feel free to submit a PR if you have time to work on this. > ExecutorClassLoader loads classes from SystemClassLoader > > > Key: SPARK-19675 > URL: https://issues.apache.org/jira/browse/SPARK-19675 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0, 2.2.0 > Environment: sbt / Play Framework >Reporter: Kohki Nishio >Priority: Minor > > Spark Executor loads classes from the SystemClassLoader, which contains > sbt-launch.jar and its Scala 2.10 binaries; however, Spark itself is > built on Scala 2.11, thus it's throwing an InvalidClassException > java.io.InvalidClassException: scala.Option; local class incompatible: stream > classdesc serialVersionUID = -114498752079829388, local class > serialVersionUID = 5081326844987135632 > at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) > at 
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) > ExecutorClassLoader's desired class loader (parentLoader) actually contains > the correct path (scala-library-2.11.8.jar) but it is not being used.
[jira] [Created] (SPARK-19690) Join a streaming DataFrame with a batch DataFrame may not work
Shixiong Zhu created SPARK-19690: Summary: Join a streaming DataFrame with a batch DataFrame may not work Key: SPARK-19690 URL: https://issues.apache.org/jira/browse/SPARK-19690 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.1.0, 2.0.3, 2.1.1 Reporter: Shixiong Zhu When joining a streaming DataFrame with a batch DataFrame, if the batch DataFrame has an aggregation, it will be converted to a streaming physical aggregation. Then the query will crash.
[jira] [Comment Edited] (SPARK-19675) ExecutorClassLoader loads classes from SystemClassLoader
[ https://issues.apache.org/jira/browse/SPARK-19675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15876854#comment-15876854 ] Shixiong Zhu edited comment on SPARK-19675 at 2/21/17 10:17 PM: [~taroplus] If I understand correctly, SBT launches your application in a new JVM process *without any SBT code*. But your Spark application requires running Spark code, which is prebuilt with a specific Scala version, *before creating ExecutorClassLoader*. That's totally different. In your example, `Class.forName("scala.Option", false, executorLoader)` doesn't call `ExecutorClassLoader.findClass` because "scala.Option" is already loaded. {code} scala> :paste // Entering paste mode (ctrl-D to finish) import java.net._ import org.apache.spark._ import org.apache.spark.repl._ val desiredLoader = new URLClassLoader( Array(new URL("file:/tmp/scala-library-2.11.0.jar")), null) val executorLoader = new ExecutorClassLoader( new SparkConf(), null, "", desiredLoader, false) { override def findClass(name: String): Class[_] = { println("finding class: " + name) super.findClass(name) } } Class.forName("scala.Option", false, executorLoader).getClassLoader() // Exiting paste mode, now interpreting. import java.net._ import org.apache.spark._ import org.apache.spark.repl._ desiredLoader: java.net.URLClassLoader = java.net.URLClassLoader@37f41a81 executorLoader: org.apache.spark.repl.ExecutorClassLoader = $anon$1@1c3d9e28 res0: ClassLoader = sun.misc.Launcher$AppClassLoader@4d76f3f8 scala> {code} In the above example, you can see `findClass` is not called.
[jira] [Commented] (SPARK-19675) ExecutorClassLoader loads classes from SystemClassLoader
[ https://issues.apache.org/jira/browse/SPARK-19675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15876854#comment-15876854 ] Shixiong Zhu commented on SPARK-19675: -- [~taroplus] If I understand correctly, SBT launches your application in a new JVM process *without any SBT code*. But your Spark application requires running Spark code, which is prebuilt with a specific Scala version, *before creating ExecutorClassLoader*. That's totally different. In your example, `Class.forName("scala.Option", false, executorLoader)` doesn't call `ExecutorClassLoader.findClass` because "scala.Option" is already loaded. {code} scala> :paste // Entering paste mode (ctrl-D to finish) import java.net._ import org.apache.spark._ import org.apache.spark.repl._ val desiredLoader = new URLClassLoader( Array(new URL("file:/tmp/scala-library-2.11.0.jar")), null) val executorLoader = new ExecutorClassLoader( new SparkConf(), null, "", desiredLoader, false) { override def findClass(name: String): Class[_] = { println("finding class: " + name) super.findClass(name) } } Class.forName("scala.Option", false, executorLoader).getClassLoader() // Exiting paste mode, now interpreting. import java.net._ import org.apache.spark._ import org.apache.spark.repl._ desiredLoader: java.net.URLClassLoader = java.net.URLClassLoader@37f41a81 executorLoader: org.apache.spark.repl.ExecutorClassLoader = $anon$1@1c3d9e28 res0: ClassLoader = sun.misc.Launcher$AppClassLoader@4d76f3f8 scala> {code} In the above example, you can see `findClass` is not called.
> ExecutorClassLoader loads classes from SystemClassLoader > > > Key: SPARK-19675 > URL: https://issues.apache.org/jira/browse/SPARK-19675 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0, 2.2.0 > Environment: sbt / Play Framework >Reporter: Kohki Nishio >Priority: Minor > > Spark Executor loads classes from SystemClassLoader which contains > sbt-launch.jar and it contains Scala 2.10 binaries, however Spark itself is > built on Scala 2.11, thus it's throwing InvalidClassException > java.io.InvalidClassException: scala.Option; local class incompatible: stream > classdesc serialVersionUID = -114498752079829388, local class > serialVersionUID = 5081326844987135632 > at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1630) > at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) > ExecutorClassLoader's desired class loader (parentLoader) actually contains > the correct path (scala-library-2.11.8.jar) but it is not being used.
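The classloader behavior discussed in the comment above is standard JVM parent-first delegation and can be reproduced without Spark at all. The following is a minimal plain-Java sketch (not Spark's `ExecutorClassLoader`; `java.util.Optional` stands in for `scala.Option`): a child loader's `findClass` is only consulted after its entire parent chain fails, so a class the parent can already resolve never reaches the child.

```java
public class DelegationDemo {
    static boolean findClassCalled = false;

    // A child loader that records whether findClass was ever consulted.
    static class TracingLoader extends ClassLoader {
        TracingLoader(ClassLoader parent) { super(parent); }
        @Override
        protected Class<?> findClass(String name) throws ClassNotFoundException {
            findClassCalled = true; // reached only when the parent chain fails
            return super.findClass(name);
        }
    }

    public static void main(String[] args) throws Exception {
        TracingLoader child = new TracingLoader(DelegationDemo.class.getClassLoader());
        // loadClass delegates to the parent first; the parent (ultimately the
        // bootstrap loader) already resolves java.util.Optional, so the
        // child's findClass never runs -- just like scala.Option above.
        Class<?> cls = Class.forName("java.util.Optional", false, child);
        System.out.println("findClass called: " + findClassCalled);
        System.out.println("defining loader: " + cls.getClassLoader()); // null = bootstrap
    }
}
```

This is why passing `executorLoader` to `Class.forName` is not enough: the lookup succeeds in a parent before the child's `findClass` is ever invoked.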
[jira] [Issue Comment Deleted] (SPARK-19617) Fix the race condition when starting and stopping a query quickly
[ https://issues.apache.org/jira/browse/SPARK-19617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19617: - Comment: was deleted (was: User 'gf53520' has created a pull request for this issue: https://github.com/apache/spark/pull/16980) > Fix the race condition when starting and stopping a query quickly > - > > Key: SPARK-19617 > URL: https://issues.apache.org/jira/browse/SPARK-19617 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.2, 2.1.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu > Fix For: 2.2.0 > > > The streaming thread in StreamExecution uses the following ways to check if > it should exit: > - Catch an InterruptedException. > - `StreamExecution.state` is TERMINATED. > When starting and stopping a query quickly, the above two checks may both > fail: > - Hit [HADOOP-14084|https://issues.apache.org/jira/browse/HADOOP-14084] and > swallow the InterruptedException. > - StreamExecution.stop is called before `state` becomes `ACTIVE`. Then > [runBatches|https://github.com/apache/spark/blob/dcc2d540a53f0bd04baead43fdee1c170ef2b9f3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L252] > changes the state from `TERMINATED` to `ACTIVE`. > If the above cases both happen, the query will hang forever.
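The race described in the issue above can be sketched without any Spark classes. Below is a hedged, simplified model (the names `runBatches`, `stop`, and the ACTIVE/TERMINATED states are borrowed from the description, but this is not Spark's code): if `stop()` flips the state to TERMINATED before the streaming thread starts, an unconditional write of ACTIVE would revive the stopped query, whereas a compare-and-set transition lets the thread notice the stop and exit.

```java
import java.util.concurrent.atomic.AtomicReference;

public class StopRace {
    enum State { INITIALIZING, ACTIVE, TERMINATED }

    static final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZING);

    static void runBatches() {
        // Become ACTIVE only if nobody terminated the query first. A plain
        // state.set(ACTIVE) here would overwrite TERMINATED and hang forever.
        if (!state.compareAndSet(State.INITIALIZING, State.ACTIVE)) {
            return; // stop() already ran; exit instead of looping
        }
        while (state.get() == State.ACTIVE) {
            break; // placeholder for processing one batch
        }
    }

    static void stop() {
        state.set(State.TERMINATED);
    }

    public static void main(String[] args) {
        stop();       // stop() arrives before the streaming thread starts
        runBatches(); // with CAS, the thread exits immediately
        System.out.println(state.get()); // TERMINATED
    }
}
```

The conditional transition makes the stop win regardless of which thread runs first.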
[jira] [Updated] (SPARK-19680) Offsets out of range with no configured reset policy for partitions
[ https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19680: - Component/s: (was: Structured Streaming) DStreams > Offsets out of range with no configured reset policy for partitions > --- > > Key: SPARK-19680 > URL: https://issues.apache.org/jira/browse/SPARK-19680 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.1.0 >Reporter: Schakmann Rene > > I'm using Spark Streaming with Kafka to actually create a toplist. I want to > read all the messages in Kafka. So I set >"auto.offset.reset" -> "earliest" > Nevertheless, when I start the job on our Spark cluster it does not work; I > get: > Error: > {code:title=error.log|borderStyle=solid} > Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, > most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, > executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: > Offsets out of range with no configured reset policy for partitions: > {SearchEvents-2=161803385} > {code} > This is somehow wrong because I did set the auto.offset.reset property > Setup: > Kafka Parameter: > {code:title=Config.Scala|borderStyle=solid} > def getDefaultKafkaReceiverParameter(properties: Properties):Map[String, > Object] = { > Map( > "bootstrap.servers" -> > properties.getProperty("kafka.bootstrap.servers"), > "group.id" -> properties.getProperty("kafka.consumer.group"), > "auto.offset.reset" -> "earliest", > "spark.streaming.kafka.consumer.cache.enabled" -> "false", > "enable.auto.commit" -> "false", > "key.deserializer" -> classOf[StringDeserializer], > "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer") > } > {code} > Job: > {code:title=Job.Scala|borderStyle=solid} > def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, > Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: > Broadcast[KafkaSink[TopList]]): Unit = { > 
getFilteredStream(stream.map(_.value()), windowDuration, > slideDuration).foreachRDD(rdd => { > val topList = new TopList > topList.setCreated(new Date()) > topList.setTopListEntryList(rdd.take(TopListLength).toList) > CurrentLogger.info("TopList length: " + > topList.getTopListEntryList.size().toString) > kafkaSink.value.send(SendToTopicName, topList) > CurrentLogger.info("Last Run: " + System.currentTimeMillis()) > }) > } > def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, > slideDuration: Int): DStream[TopListEntry] = { > val Mapper = MapperObject.readerFor[SearchEventDTO] > result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s)) > .filter(s => s != null && s.getSearchRequest != null && > s.getSearchRequest.getSearchParameters != null && s.getVertical == > Vertical.BAP && > s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName)) > .map(row => { > val name = > row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase() > (name, new TopListEntry(name, 1, row.getResultCount)) > }) > .reduceByKeyAndWindow( > (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, > a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + > b.getMeanSearchHits), > (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, > a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - > b.getMeanSearchHits), > Minutes(windowDuration), > Seconds(slideDuration)) > .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L) > .map(row => (row._2.getSearchCount, row._2)) > .transform(rdd => rdd.sortByKey(ascending = false)) > .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount, > row._2.getMeanSearchHits / row._2.getSearchCount)) > } > def main(properties: Properties): Unit = { > val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName) > val kafkaSink = > 
sparkSession.sparkContext.broadcast(KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties))) > val kafkaParams: Map[String, Object] = > SparkUtil.getDefaultKafkaReceiverParameter(properties) > val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(30)) > ssc.checkpoint("/home/spark/checkpoints") > val adEventStream = > KafkaUtils.createDirectStream[String, Array[Byte]](ssc, > PreferConsistent, Subscribe[String, Array[Byte]](Array(ReadFromTopicName), > kafkaParams)) > processSearchKeyWords(adEventStream, > SparkUtil.getWindow
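A note on why setting `auto.offset.reset` did not prevent the error above, with a hedged, simplified model (not Kafka's or Spark's actual code): the reset policy is only consulted when the consumer has no valid position for a partition. The direct stream's executors fetch exact offset ranges computed by the driver, with the policy effectively forced to "none" on the executor side, so offsets that retention has already deleted raise `OffsetOutOfRangeException` even though the driver config says "earliest".

```java
public class OffsetReset {
    // Simplified model: a fetch at `requested` succeeds if it lies inside the
    // log's [earliest, latest) range; otherwise the reset policy decides.
    static long resolveFetch(long requested, long earliest, long latest, String policy) {
        if (requested >= earliest && requested < latest) {
            return requested; // position is valid: no reset needed
        }
        switch (policy) {
            case "earliest": return earliest; // jump back to the oldest record
            case "latest":   return latest;   // skip ahead to the newest
            default: throw new IllegalStateException(
                "Offsets out of range with no configured reset policy for partitions");
        }
    }

    public static void main(String[] args) {
        // A stored offset still inside the log is used unchanged.
        System.out.println(resolveFetch(250, 100, 500, "none"));
        // Offset 50 was deleted by retention: "earliest" recovers...
        System.out.println(resolveFetch(50, 100, 500, "earliest"));
        // ...but policy "none" fails, matching the stack trace above.
        try {
            resolveFetch(50, 100, 500, "none");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The range boundaries and method name are illustrative assumptions; the point is only that a reset policy never overrides an explicitly requested, now-deleted offset.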
[jira] [Updated] (SPARK-19497) dropDuplicates with watermark
[ https://issues.apache.org/jira/browse/SPARK-19497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19497: - Labels: release_notes (was: ) > dropDuplicates with watermark > - > > Key: SPARK-19497 > URL: https://issues.apache.org/jira/browse/SPARK-19497 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Michael Armbrust >Assignee: Shixiong Zhu >Priority: Critical > Labels: release_notes >
[jira] [Updated] (SPARK-19617) Fix the race condition when starting and stopping a query quickly
[ https://issues.apache.org/jira/browse/SPARK-19617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19617: - Fix Version/s: 2.2.0 > Fix the race condition when starting and stopping a query quickly > - > > Key: SPARK-19617 > URL: https://issues.apache.org/jira/browse/SPARK-19617 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.2, 2.1.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu > Fix For: 2.2.0 > > > The streaming thread in StreamExecution uses the following ways to check if > it should exit: > - Catch an InterruptedException. > - `StreamExecution.state` is TERMINATED. > When starting and stopping a query quickly, the above two checks may both > fail: > - Hit [HADOOP-14084|https://issues.apache.org/jira/browse/HADOOP-14084] and > swallow the InterruptedException. > - StreamExecution.stop is called before `state` becomes `ACTIVE`. Then > [runBatches|https://github.com/apache/spark/blob/dcc2d540a53f0bd04baead43fdee1c170ef2b9f3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L252] > changes the state from `TERMINATED` to `ACTIVE`. > If the above cases both happen, the query will hang forever.
[jira] [Commented] (SPARK-19525) Enable Compression of RDD Checkpoints
[ https://issues.apache.org/jira/browse/SPARK-19525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872717#comment-15872717 ] Shixiong Zhu commented on SPARK-19525: -- I see. This is RDD checkpointing. Sounds like a good idea. > Enable Compression of RDD Checkpoints > - > > Key: SPARK-19525 > URL: https://issues.apache.org/jira/browse/SPARK-19525 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Aaditya Ramesh > > In our testing, compressing partitions while writing them to checkpoints on > HDFS using snappy helped performance significantly while also reducing the > variability of the checkpointing operation. In our tests, checkpointing time > was reduced by 3X, and variability was reduced by 2X for data sets of > compressed size approximately 1 GB.
[jira] [Updated] (SPARK-19525) Enable Compression of Spark Streaming Checkpoints
[ https://issues.apache.org/jira/browse/SPARK-19525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19525: - Component/s: (was: Structured Streaming) Spark Core > Enable Compression of Spark Streaming Checkpoints > - > > Key: SPARK-19525 > URL: https://issues.apache.org/jira/browse/SPARK-19525 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Aaditya Ramesh > > In our testing, compressing partitions while writing them to checkpoints on > HDFS using snappy helped performance significantly while also reducing the > variability of the checkpointing operation. In our tests, checkpointing time > was reduced by 3X, and variability was reduced by 2X for data sets of > compressed size approximately 1 GB.
[jira] [Updated] (SPARK-19525) Enable Compression of RDD Checkpoints
[ https://issues.apache.org/jira/browse/SPARK-19525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19525: - Summary: Enable Compression of RDD Checkpoints (was: Enable Compression of Spark Streaming Checkpoints) > Enable Compression of RDD Checkpoints > - > > Key: SPARK-19525 > URL: https://issues.apache.org/jira/browse/SPARK-19525 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Aaditya Ramesh > > In our testing, compressing partitions while writing them to checkpoints on > HDFS using snappy helped performance significantly while also reducing the > variability of the checkpointing operation. In our tests, checkpointing time > was reduced by 3X, and variability was reduced by 2X for data sets of > compressed size approximately 1 GB.
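The proposal in SPARK-19525 amounts to wrapping the checkpoint output stream in a compression codec before serializing partition elements. Here is a hedged, self-contained illustration of that idea (not Spark's checkpoint writer): the JDK's GZIP stands in for Snappy, which would need an external library such as snappy-java, and an in-memory buffer stands in for the HDFS stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.zip.GZIPOutputStream;

public class CompressedCheckpoint {
    // Serialize a partition's records through a compressing stream. The sink
    // stands in for an HDFS checkpoint file stream.
    static byte[] writeCompressed(Iterable<String> partition) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(new GZIPOutputStream(sink))) {
            for (String record : partition) {
                out.writeObject(record);
            }
        }
        return sink.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        java.util.List<String> partition =
            java.util.Collections.nCopies(1000, "repetitive checkpoint record");
        int rawSize = 1000 * "repetitive checkpoint record".length();
        byte[] compressed = writeCompressed(partition);
        // Redundant records compress well, which is the effect the reporter
        // measured (3X faster checkpoints for ~1 GB compressed data).
        System.out.println("raw ~" + rawSize + " bytes, compressed " + compressed.length + " bytes");
    }
}
```

In Spark itself the codec would come from the configured `CompressionCodec`, but the layering (codec stream between serializer and file stream) is the same.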
[jira] [Commented] (SPARK-19644) Memory leak in Spark Streaming
[ https://issues.apache.org/jira/browse/SPARK-19644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872672#comment-15872672 ] Shixiong Zhu commented on SPARK-19644: -- [~deenbandhu] Could you check the GC root, please? These objects are from Scala reflection. Did you run the job in the Spark shell? > Memory leak in Spark Streaming > -- > > Key: SPARK-19644 > URL: https://issues.apache.org/jira/browse/SPARK-19644 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.0.2 > Environment: 3 AWS EC2 c3.xLarge > Number of cores - 3 > Number of executors 3 > Memory to each executor 2GB >Reporter: Deenbandhu Agarwal >Priority: Critical > Labels: memory_leak, performance > Attachments: heapdump.png > > > I am using streaming in production for some aggregation, fetching data > from Cassandra and saving data back to Cassandra. > I see a gradual increase in old generation heap capacity from 1161216 Bytes > to 1397760 Bytes over a period of six hours. > After 50 hours of processing, instances of the class > scala.collection.immutable.$colon$colon increased to 12,811,793, which is a > huge number. > I think this is a clear case of a memory leak.
[jira] [Commented] (SPARK-19525) Enable Compression of Spark Streaming Checkpoints
[ https://issues.apache.org/jira/browse/SPARK-19525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872652#comment-15872652 ] Shixiong Zhu commented on SPARK-19525: -- Hm, Spark should support compression for data in RDDs. In which code path did you find that data is not being compressed? > Enable Compression of Spark Streaming Checkpoints > - > > Key: SPARK-19525 > URL: https://issues.apache.org/jira/browse/SPARK-19525 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Aaditya Ramesh > > In our testing, compressing partitions while writing them to checkpoints on > HDFS using snappy helped performance significantly while also reducing the > variability of the checkpointing operation. In our tests, checkpointing time > was reduced by 3X, and variability was reduced by 2X for data sets of > compressed size approximately 1 GB.
[jira] [Assigned] (SPARK-19517) KafkaSource fails to initialize partition offsets
[ https://issues.apache.org/jira/browse/SPARK-19517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu reassigned SPARK-19517: Assignee: Roberto Agostino Vitillo > KafkaSource fails to initialize partition offsets > - > > Key: SPARK-19517 > URL: https://issues.apache.org/jira/browse/SPARK-19517 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Roberto Agostino Vitillo >Assignee: Roberto Agostino Vitillo >Priority: Blocker > Fix For: 2.1.1, 2.2.0 > > Attachments: SPARK-19517ProposalforfixingKafkaOffsetMetadata.pdf > > > A Kafka source with many partitions can cause the checkpointing logic to > fail on restart. I got the following exception when trying to restart a > Structured Streaming app that reads from a Kafka topic with a hundred > partitions. > {code} > 17/02/08 15:10:09 ERROR StreamExecution: Query [id = > 24e2a21a-4545-4a3e-80ea-bbe777d883ab, runId = > 025609c9-d59c-4de3-88b3-5d5f7eda4a66] terminated with error > java.lang.IllegalArgumentException: Expected e.g. > {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got {"telemetry":{"92":302854 > at > org.apache.spark.sql.kafka010.JsonUtils$.partitionOffsets(JsonUtils.scala:74) > at > org.apache.spark.sql.kafka010.KafkaSourceOffset$.apply(KafkaSourceOffset.scala:59) > at > org.apache.spark.sql.kafka010.KafkaSource$$anon$1.deserialize(KafkaSource.scala:134) > at > org.apache.spark.sql.kafka010.KafkaSource$$anon$1.deserialize(KafkaSource.scala:123) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:237) > at > org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets$lzycompute(KafkaSource.scala:138) > at > org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets(KafkaSource.scala:121) >… > {code}
[jira] [Resolved] (SPARK-19517) KafkaSource fails to initialize partition offsets
[ https://issues.apache.org/jira/browse/SPARK-19517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-19517. -- Resolution: Fixed Fix Version/s: 2.2.0 2.1.1 > KafkaSource fails to initialize partition offsets > - > > Key: SPARK-19517 > URL: https://issues.apache.org/jira/browse/SPARK-19517 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Roberto Agostino Vitillo >Priority: Blocker > Fix For: 2.1.1, 2.2.0 > > Attachments: SPARK-19517ProposalforfixingKafkaOffsetMetadata.pdf > > > A Kafka source with many partitions can cause the checkpointing logic to > fail on restart. I got the following exception when trying to restart a > Structured Streaming app that reads from a Kafka topic with a hundred > partitions. > {code} > 17/02/08 15:10:09 ERROR StreamExecution: Query [id = > 24e2a21a-4545-4a3e-80ea-bbe777d883ab, runId = > 025609c9-d59c-4de3-88b3-5d5f7eda4a66] terminated with error > java.lang.IllegalArgumentException: Expected e.g. > {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got {"telemetry":{"92":302854 > at > org.apache.spark.sql.kafka010.JsonUtils$.partitionOffsets(JsonUtils.scala:74) > at > org.apache.spark.sql.kafka010.KafkaSourceOffset$.apply(KafkaSourceOffset.scala:59) > at > org.apache.spark.sql.kafka010.KafkaSource$$anon$1.deserialize(KafkaSource.scala:134) > at > org.apache.spark.sql.kafka010.KafkaSource$$anon$1.deserialize(KafkaSource.scala:123) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:237) > at > org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets$lzycompute(KafkaSource.scala:138) > at > org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets(KafkaSource.scala:121) >… > {code}
[jira] [Commented] (SPARK-19645) structured streaming job restart bug
[ https://issues.apache.org/jira/browse/SPARK-19645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872319#comment-15872319 ] Shixiong Zhu commented on SPARK-19645: -- [~guifengl...@gmail.com] Thanks for reporting. Could you submit a PR to fix it? > structured streaming job restart bug > > > Key: SPARK-19645 > URL: https://issues.apache.org/jira/browse/SPARK-19645 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: guifeng >Priority: Critical > > We are trying to use Structured Streaming in production; however, there > currently exists a bug related to the process of restarting a streaming job. > The following is the concrete error message: > {quote} >Caused by: java.lang.IllegalStateException: Error committing version 2 > into HDFSStateStore[id = (op=0, part=136), dir = > /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136] > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:162) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:173) > at > org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:123) > at > org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) > at > 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.IOException: Failed to rename > /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136/temp--5345709896617324284 > to /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136/2.delta > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:259) > at > org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:156) > ... 14 more > {quote} > The bug can be easily reproduced when restarting the previous streaming job, and > the main reason is that on restart Spark will recompute WAL > offsets and generate the same HDFS delta file (the latest delta file generated > before restart, named "currentBatchId.delta"). In my opinion, this is a > bug. If you also consider this to be a bug, I can fix it.
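The failed rename above happens because a restarted job recomputes the batch and tries to commit `2.delta` again while a file with that name already exists. One hedged sketch of the fix direction the reporter suggests (plain `java.nio` on the local filesystem, not Spark's HDFS-backed state store): make the commit idempotent by treating an already-existing destination as an already-committed batch instead of failing the rename.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class IdempotentCommit {
    // Commit a temp file to its final delta name. If the destination already
    // exists (the batch was committed before a restart), discard the temp
    // file and report success instead of throwing "Failed to rename".
    static void commit(Path temp, Path dest) throws IOException {
        if (Files.exists(dest)) {
            Files.deleteIfExists(temp);
            return; // batch already committed by the previous run
        }
        Files.move(temp, dest, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("state");
        Path dest = dir.resolve("2.delta");
        Files.write(dest, new byte[]{1});             // delta left by the previous run
        Path temp = Files.createTempFile(dir, "temp-", "");
        commit(temp, dest);                           // succeeds instead of throwing
        System.out.println(Files.exists(dest) && !Files.exists(temp)); // true
    }
}
```

This assumes the recomputed delta is byte-identical to the committed one, which is exactly the situation the reporter describes (same WAL offsets recomputed on restart).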
[jira] [Commented] (SPARK-19497) dropDuplicates with watermark
[ https://issues.apache.org/jira/browse/SPARK-19497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15871016#comment-15871016 ] Shixiong Zhu commented on SPARK-19497: -- [~samelamin] Thanks! I just submitted a PR. Could you help review it, please? > dropDuplicates with watermark > - > > Key: SPARK-19497 > URL: https://issues.apache.org/jira/browse/SPARK-19497 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Michael Armbrust >Assignee: Shixiong Zhu >Priority: Critical >
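The feature requested in SPARK-19497 pairs deduplication with a watermark so that the dedup state cannot grow without bound. Below is a hedged, single-partition sketch of those semantics (not Spark's implementation; the 10-unit allowed lateness is an assumption): records older than the watermark are dropped, and remembered keys older than the watermark are evicted.

```java
import java.util.HashMap;
import java.util.Map;

public class WatermarkDedup {
    static final long DELAY = 10;                    // allowed lateness (assumption)
    final Map<String, Long> seen = new HashMap<>();  // key -> last event time
    long watermark = Long.MIN_VALUE;

    /** Returns true when the record should be emitted (first occurrence). */
    boolean offer(String key, long eventTime) {
        if (eventTime < watermark) {
            return false;                             // too late: drop outright
        }
        if (eventTime - DELAY > watermark) {
            watermark = eventTime - DELAY;            // advance the watermark
            seen.values().removeIf(t -> t < watermark); // evict expired keys
        }
        return seen.putIfAbsent(key, eventTime) == null; // duplicates return false
    }

    public static void main(String[] args) {
        WatermarkDedup d = new WatermarkDedup();
        System.out.println(d.offer("a", 100)); // true  (first sight)
        System.out.println(d.offer("a", 101)); // false (duplicate within the window)
        System.out.println(d.offer("a", 200)); // true  ("a" was evicted long ago)
        System.out.println(d.seen.size());     // 1     (state stays bounded)
    }
}
```

The trade-off this models: very late duplicates can be emitted again once their key has been evicted, which is the price of bounded state.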
[jira] [Updated] (SPARK-19628) Duplicate Spark jobs in 2.1.0
[ https://issues.apache.org/jira/browse/SPARK-19628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19628: - Component/s: (was: Spark Core) SQL > Duplicate Spark jobs in 2.1.0 > - > > Key: SPARK-19628 > URL: https://issues.apache.org/jira/browse/SPARK-19628 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: Jork Zijlstra > Fix For: 2.0.1 > > Attachments: spark2.0.1.png, spark2.1.0-examplecode.png, > spark2.1.0.png > > > After upgrading to Spark 2.1.0 we noticed that duplicate jobs are > executed. Going back to Spark 2.0.1, they are gone again. > {code} > import org.apache.spark.sql._ > object DoubleJobs { > def main(args: Array[String]) { > System.setProperty("hadoop.home.dir", "/tmp"); > val sparkSession: SparkSession = SparkSession.builder > .master("local[4]") > .appName("spark session example") > .config("spark.driver.maxResultSize", "6G") > .config("spark.sql.orc.filterPushdown", true) > .config("spark.sql.hive.metastorePartitionPruning", true) > .getOrCreate() > sparkSession.sqlContext.setConf("spark.sql.orc.filterPushdown", "true") > val paths = Seq( > ""//some orc source > ) > def dataFrame(path: String): DataFrame = { > sparkSession.read.orc(path) > } > paths.foreach(path => { > dataFrame(path).show(20) > }) > } > } > {code}
[jira] [Updated] (SPARK-19617) Fix the race condition when starting and stopping a query quickly
[ https://issues.apache.org/jira/browse/SPARK-19617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19617: - Description: The streaming thread in StreamExecution uses the following ways to check if it should exit: - Catch an InterruptedException. - `StreamExecution.state` is TERMINATED. When starting and stopping a query quickly, the above two checks may both fail: - Hit [HADOOP-14084|https://issues.apache.org/jira/browse/HADOOP-14084] and swallow the InterruptedException. - StreamExecution.stop is called before `state` becomes `ACTIVE`. Then [runBatches|https://github.com/apache/spark/blob/dcc2d540a53f0bd04baead43fdee1c170ef2b9f3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L252] changes the state from `TERMINATED` to `ACTIVE`. If the above cases both happen, the query will hang forever. was: Saw the following exception in some test log: {code} 17/02/14 21:20:10.987 stream execution thread for this_query [id = 09fd5d6d-bea3-4891-88c7-0d0f1909188d, runId = a564cb52-bc3d-47f1-8baf-7e0e4fa79a5e] WARN Shell: Interrupted while joining on: Thread[Thread-48,5,main] java.lang.InterruptedException at java.lang.Object.wait(Native Method) at java.lang.Thread.join(Thread.java:1249) at java.lang.Thread.join(Thread.java:1323) at org.apache.hadoop.util.Shell.joinThread(Shell.java:626) at org.apache.hadoop.util.Shell.runCommand(Shell.java:577) at org.apache.hadoop.util.Shell.run(Shell.java:479) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773) at org.apache.hadoop.util.Shell.execCommand(Shell.java:866) at org.apache.hadoop.util.Shell.execCommand(Shell.java:849) at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733) at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:491) at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:532) at 
org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:509) at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1066) at org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:176) at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:197) at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:730) at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:726) at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:733) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.mkdirs(HDFSMetadataLog.scala:385) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:75) at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:46) at org.apache.spark.sql.execution.streaming.FileStreamSourceLog.(FileStreamSourceLog.scala:36) at org.apache.spark.sql.execution.streaming.FileStreamSource.(FileStreamSource.scala:59) at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:246) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:145) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:141) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:257) at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:141) at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:136) at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191) {code} This is the cause of some test timeout failures on Jenkins. > Fix the race condition when starting and stopping a query quickly > - > > Key: SPARK-19617 > URL: https://issues.apache.org/jira/browse/SPARK-19617 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Ve
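The second failure mode in the description (stop() arriving before the state transition in runBatches) can be sketched outside Spark with a plain atomic state machine. The names below are illustrative, not Spark's actual implementation; the point is that an unconditional state write loses an early stop(), while a compare-and-set transition lets it win:

```scala
import java.util.concurrent.atomic.AtomicReference

// Illustrative states mirroring the INITIALIZING/ACTIVE/TERMINATED
// lifecycle described above (hypothetical names, not Spark's code).
sealed trait QueryState
case object INITIALIZING extends QueryState
case object ACTIVE extends QueryState
case object TERMINATED extends QueryState

class QuerySketch {
  val state = new AtomicReference[QueryState](INITIALIZING)

  // stop() may run before the stream thread has marked itself ACTIVE.
  def stop(): Unit = state.set(TERMINATED)

  // Buggy shape: the unconditional write resurrects a query that was
  // already stopped, so the batch loop never observes TERMINATED.
  def startBuggy(): Unit = state.set(ACTIVE)

  // Fixed shape: only transition INITIALIZING -> ACTIVE. If stop() already
  // ran, the CAS fails and the stream thread can exit immediately.
  def startFixed(): Boolean = state.compareAndSet(INITIALIZING, ACTIVE)
}
```

With `startBuggy`, a stop() that lands first is silently overwritten; with `startFixed`, the failed CAS tells the stream thread it was stopped before it ever started.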
[jira] [Updated] (SPARK-19617) Fix the race condition when starting and stopping a query quickly
[ https://issues.apache.org/jira/browse/SPARK-19617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19617: - Summary: Fix the race condition when starting and stopping a query quickly (was: Fix a case that a query may not stop due to HADOOP-14084) > Fix the race condition when starting and stopping a query quickly > - > > Key: SPARK-19617 > URL: https://issues.apache.org/jira/browse/SPARK-19617 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.2, 2.1.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu > > Saw the following exception in some test log: > {code} > 17/02/14 21:20:10.987 stream execution thread for this_query [id = > 09fd5d6d-bea3-4891-88c7-0d0f1909188d, runId = > a564cb52-bc3d-47f1-8baf-7e0e4fa79a5e] WARN Shell: Interrupted while joining > on: Thread[Thread-48,5,main] > java.lang.InterruptedException > at java.lang.Object.wait(Native Method) > at java.lang.Thread.join(Thread.java:1249) > at java.lang.Thread.join(Thread.java:1323) > at org.apache.hadoop.util.Shell.joinThread(Shell.java:626) > at org.apache.hadoop.util.Shell.runCommand(Shell.java:577) > at org.apache.hadoop.util.Shell.run(Shell.java:479) > at > org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773) > at org.apache.hadoop.util.Shell.execCommand(Shell.java:866) > at org.apache.hadoop.util.Shell.execCommand(Shell.java:849) > at > org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733) > at > org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:491) > at > org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:532) > at > org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:509) > at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1066) > at > org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:176) > at 
org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:197) > at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:730) > at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:726) > at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) > at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:733) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.mkdirs(HDFSMetadataLog.scala:385) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:75) > at > org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:46) > at > org.apache.spark.sql.execution.streaming.FileStreamSourceLog.(FileStreamSourceLog.scala:36) > at > org.apache.spark.sql.execution.streaming.FileStreamSource.(FileStreamSource.scala:59) > at > org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:246) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:145) > at > org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:141) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268) > at > org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:257) > at > org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:141) > at > org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:136) > at > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252) > at > 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191) > {code} > This is the cause of some test timeout failures on Jenkins.
[jira] [Resolved] (SPARK-19603) Fix StreamingQuery explain command
[ https://issues.apache.org/jira/browse/SPARK-19603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-19603. -- Resolution: Fixed Fix Version/s: 2.2.0 2.1.1 > Fix StreamingQuery explain command > -- > > Key: SPARK-19603 > URL: https://issues.apache.org/jira/browse/SPARK-19603 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.2, 2.1.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu > Fix For: 2.1.1, 2.2.0 > > > Right now StreamingQuery.explain doesn't show the correct streaming physical > plan.
[jira] [Created] (SPARK-19617) Fix a case that a query may not stop due to HADOOP-14084
Shixiong Zhu created SPARK-19617: Summary: Fix a case that a query may not stop due to HADOOP-14084 Key: SPARK-19617 URL: https://issues.apache.org/jira/browse/SPARK-19617 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.1.0, 2.0.2 Reporter: Shixiong Zhu Assignee: Shixiong Zhu Saw the following exception in some test log: {code} 17/02/14 21:20:10.987 stream execution thread for this_query [id = 09fd5d6d-bea3-4891-88c7-0d0f1909188d, runId = a564cb52-bc3d-47f1-8baf-7e0e4fa79a5e] WARN Shell: Interrupted while joining on: Thread[Thread-48,5,main] java.lang.InterruptedException at java.lang.Object.wait(Native Method) at java.lang.Thread.join(Thread.java:1249) at java.lang.Thread.join(Thread.java:1323) at org.apache.hadoop.util.Shell.joinThread(Shell.java:626) at org.apache.hadoop.util.Shell.runCommand(Shell.java:577) at org.apache.hadoop.util.Shell.run(Shell.java:479) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773) at org.apache.hadoop.util.Shell.execCommand(Shell.java:866) at org.apache.hadoop.util.Shell.execCommand(Shell.java:849) at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733) at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:491) at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:532) at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:509) at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1066) at org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:176) at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:197) at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:730) at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:726) at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:733) at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.mkdirs(HDFSMetadataLog.scala:385) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.(HDFSMetadataLog.scala:75) at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.(CompactibleFileStreamLog.scala:46) at org.apache.spark.sql.execution.streaming.FileStreamSourceLog.(FileStreamSourceLog.scala:36) at org.apache.spark.sql.execution.streaming.FileStreamSource.(FileStreamSource.scala:59) at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:246) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:145) at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:141) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:257) at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:141) at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:136) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:252) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191) {code} This is the cause of some test timeout failures on Jenkins.
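The HADOOP-14084 failure mode in the stack trace above (Shell catching InterruptedException while joining a thread) matters because catching the exception also clears the calling thread's interrupt flag, so a stream thread that relies on interruption to exit never sees the signal. A minimal, Spark-independent sketch (helper names are hypothetical, not Hadoop's or Spark's API):

```scala
object InterruptSketch {
  // Mirrors the problematic pattern: catching InterruptedException clears
  // the calling thread's interrupt flag, so the stop signal is lost.
  def joinSwallowing(t: Thread): Unit =
    try t.join()
    catch { case _: InterruptedException => () }

  // Defensive variant: re-assert the interrupt status so callers that poll
  // Thread.currentThread().isInterrupted can still shut down.
  def joinRestoring(t: Thread): Unit =
    try t.join()
    catch { case _: InterruptedException => Thread.currentThread().interrupt() }
}
```

This is why the query in SPARK-19617 needed a second exit check besides catching the interrupt: once a lower layer swallows the exception, only an explicit state flag can tell the thread to stop.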
[jira] [Resolved] (SPARK-19599) Clean up HDFSMetadataLog
[ https://issues.apache.org/jira/browse/SPARK-19599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu resolved SPARK-19599. -- Resolution: Fixed Assignee: Shixiong Zhu Fix Version/s: 2.2.0 2.1.1 > Clean up HDFSMetadataLog > > > Key: SPARK-19599 > URL: https://issues.apache.org/jira/browse/SPARK-19599 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu > Fix For: 2.1.1, 2.2.0 > > > SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some > cleanup for HDFSMetadataLog > Updated: Unfortunately, there is another issue HADOOP-14084 that prevents us > from removing the workaround code. Anyway, I still did some cleanup and also > updated the comments to point to HADOOP-14084.
[jira] [Updated] (SPARK-19599) Clean up HDFSMetadataLog
[ https://issues.apache.org/jira/browse/SPARK-19599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19599: - Description: SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup for HDFSMetadataLog Updated: Unfortunately, there is another issue HADOOP-14084 that prevents us from removing the workaround code. Anyway, I still did some cleanup to make HDFSMetadataLog simply. was: SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup for HDFSMetadataLog Updated: Unfortunately, there is another issue HADOOP-14084 that prevents us from removing the workaround code. > Clean up HDFSMetadataLog > > > Key: SPARK-19599 > URL: https://issues.apache.org/jira/browse/SPARK-19599 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Shixiong Zhu > > SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some > cleanup for HDFSMetadataLog > Updated: Unfortunately, there is another issue HADOOP-14084 that prevents us > from removing the workaround code. Anyway, I still did some cleanup to make > HDFSMetadataLog simply.
[jira] [Updated] (SPARK-19599) Clean up HDFSMetadataLog
[ https://issues.apache.org/jira/browse/SPARK-19599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19599: - Description: SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup for HDFSMetadataLog Updated: Unfortunately, there is another issue HADOOP-14084 that prevents us from removing the workaround code. Anyway, I still did some cleanup and also updated the comments to point to HADOOP-14084. was: SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup for HDFSMetadataLog Updated: Unfortunately, there is another issue HADOOP-14084 that prevents us from removing the workaround code. Anyway, I still did some cleanup to make HDFSMetadataLog simple. > Clean up HDFSMetadataLog > > > Key: SPARK-19599 > URL: https://issues.apache.org/jira/browse/SPARK-19599 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Shixiong Zhu > > SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some > cleanup for HDFSMetadataLog > Updated: Unfortunately, there is another issue HADOOP-14084 that prevents us > from removing the workaround code. Anyway, I still did some cleanup and also > updated the comments to point to HADOOP-14084.
[jira] [Updated] (SPARK-19599) Clean up HDFSMetadataLog
[ https://issues.apache.org/jira/browse/SPARK-19599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19599: - Description: SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup for HDFSMetadataLog Updated: Unfortunately, there is another issue HADOOP-14084 that prevents us from removing the workaround code. Anyway, I still did some cleanup to make HDFSMetadataLog simple. was: SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup for HDFSMetadataLog Updated: Unfortunately, there is another issue HADOOP-14084 that prevents us from removing the workaround code. Anyway, I still did some cleanup to make HDFSMetadataLog simply. > Clean up HDFSMetadataLog > > > Key: SPARK-19599 > URL: https://issues.apache.org/jira/browse/SPARK-19599 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Shixiong Zhu > > SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some > cleanup for HDFSMetadataLog > Updated: Unfortunately, there is another issue HADOOP-14084 that prevents us > from removing the workaround code. Anyway, I still did some cleanup to make > HDFSMetadataLog simple.
[jira] [Updated] (SPARK-19599) Clean up HDFSMetadataLog
[ https://issues.apache.org/jira/browse/SPARK-19599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19599: - Summary: Clean up HDFSMetadataLog (was: Clean up HDFSMetadataLog for Hadoop 2.6+) > Clean up HDFSMetadataLog > > > Key: SPARK-19599 > URL: https://issues.apache.org/jira/browse/SPARK-19599 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Shixiong Zhu > > SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some > cleanup for HDFSMetadataLog > Updated: Unfortunately, there is another issue HADOOP-14084 that prevents us > from removing the workaround code.
[jira] [Updated] (SPARK-19599) Clean up HDFSMetadataLog for Hadoop 2.6+
[ https://issues.apache.org/jira/browse/SPARK-19599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19599: - Description: SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup for HDFSMetadataLog Updated: Unfortunately, there is another issue HADOOP-14084 that prevents us from removing the workaround code. was: SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup for HDFSMetadataLog > Clean up HDFSMetadataLog for Hadoop 2.6+ > > > Key: SPARK-19599 > URL: https://issues.apache.org/jira/browse/SPARK-19599 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Shixiong Zhu > > SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some > cleanup for HDFSMetadataLog > Updated: Unfortunately, there is another issue HADOOP-14084 that prevents us > from removing the workaround code.
[jira] [Commented] (SPARK-19594) StreamingQueryListener fails to handle QueryTerminatedEvent if more then one listeners exists
[ https://issues.apache.org/jira/browse/SPARK-19594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15868469#comment-15868469 ] Shixiong Zhu commented on SPARK-19594: -- I suggest overriding "def postToAll(event: E)" and removing the query id after all listeners have processed the event. > StreamingQueryListener fails to handle QueryTerminatedEvent if more then one > listeners exists > - > > Key: SPARK-19594 > URL: https://issues.apache.org/jira/browse/SPARK-19594 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.1.0 >Reporter: Eyal Zituny >Priority: Minor > > reproduce: > *create a spark session > *add multiple streaming query listeners > *create a simple query > *stop the query > result -> only the first listener handles the QueryTerminatedEvent > this might happen because the query run id is being removed from > activeQueryRunIds once the onQueryTerminated is called > (StreamingQueryListenerBus:115)
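The suggestion in the comment above (defer the run-id removal to an overridden postToAll so every registered listener sees the QueryTerminatedEvent) can be sketched with a toy bus. Types and names here are illustrative, not Spark's StreamingQueryListenerBus API:

```scala
import scala.collection.mutable

// Toy bus illustrating the suggested fix: the terminated run id is removed
// exactly once, after the event has been dispatched to *all* listeners,
// instead of inside the per-listener handler (where the first listener's
// dispatch would deregister the id and starve the remaining listeners).
class ListenerBusSketch {
  private val listeners = mutable.ArrayBuffer.empty[Long => Unit]
  private val activeRunIds = mutable.Set.empty[Long]

  def addListener(l: Long => Unit): Unit = listeners += l
  def queryStarted(runId: Long): Unit = activeRunIds += runId

  def postTerminatedToAll(runId: Long): Unit =
    if (activeRunIds.contains(runId)) {
      listeners.foreach(l => l(runId)) // every listener handles the event
      activeRunIds -= runId            // removed once, at the end
    }
}
```

The design choice is simply ordering: membership in activeRunIds is checked once per event, and the cleanup happens after the full fan-out, so listener count no longer affects delivery.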
[jira] [Commented] (SPARK-17689) _temporary files breaks the Spark SQL streaming job.
[ https://issues.apache.org/jira/browse/SPARK-17689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15868466#comment-15868466 ] Shixiong Zhu commented on SPARK-17689: -- Just curious: who created "_temporary"? > _temporary files breaks the Spark SQL streaming job. > > > Key: SPARK-17689 > URL: https://issues.apache.org/jira/browse/SPARK-17689 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Reporter: Prashant Sharma > > Steps to reproduce: > 1) Start a streaming job which reads from HDFS location hdfs://xyz/* > 2) Write content to hdfs://xyz/a > . > . > repeat a few times. > And then job breaks as follows. > org.apache.spark.SparkException: Job aborted due to stage failure: Task 49 in > stage 304.0 failed 1 times, most recent failure: Lost task 49.0 in stage > 304.0 (TID 14794, localhost): java.io.FileNotFoundException: File does not > exist: hdfs://localhost:9000/input/t5/_temporary > at > org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309) > at > org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317) > at > org.apache.spark.sql.execution.datasources.HadoopFsRelation$$anonfun$7$$anonfun$apply$4.apply(fileSourceInterfaces.scala:464) > at > org.apache.spark.sql.execution.datasources.HadoopFsRelation$$anonfun$7$$anonfun$apply$4.apply(fileSourceInterfaces.scala:462) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at > 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) > at scala.collection.AbstractIterator.to(Iterator.scala:1336) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) > at > org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912) > at > org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:912) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1919) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1919) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745)
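Whatever created "_temporary" here, one common mitigation for this kind of listing failure is to skip Hadoop's hidden and in-progress entries (names starting with "_" or ".", such as _temporary and _SUCCESS) when enumerating input files, so a concurrent writer's scratch directory cannot surface mid-job and then vanish before getFileStatus runs. A hedged sketch of that filter (the object and method names are hypothetical):

```scala
object HiddenPathFilter {
  // Names beginning with '_' or '.' are a Hadoop convention for metadata
  // and in-progress output (_temporary, _SUCCESS, .crc checksum files):
  // skip them when listing input files for a streaming source.
  def isDataPath(name: String): Boolean =
    !name.startsWith("_") && !name.startsWith(".")
}
```

This only hides the race from the reader; a writer and reader sharing a directory still need an atomic publish step (e.g. rename into place) for correctness.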
[jira] [Updated] (SPARK-19603) Fix StreamingQuery explain command
[ https://issues.apache.org/jira/browse/SPARK-19603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shixiong Zhu updated SPARK-19603: - Summary: Fix StreamingQuery explain command (was: Fix the stream explain command) > Fix StreamingQuery explain command > -- > > Key: SPARK-19603 > URL: https://issues.apache.org/jira/browse/SPARK-19603 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.2, 2.1.0 >Reporter: Shixiong Zhu >Assignee: Shixiong Zhu > > Right now StreamingQuery.explain doesn't show the correct streaming physical > plan.
[jira] [Created] (SPARK-19603) Fix the stream explain command
Shixiong Zhu created SPARK-19603: Summary: Fix the stream explain command Key: SPARK-19603 URL: https://issues.apache.org/jira/browse/SPARK-19603 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.1.0, 2.0.2 Reporter: Shixiong Zhu Assignee: Shixiong Zhu Right now StreamingQuery.explain doesn't show the correct streaming physical plan.