[jira] [Updated] (SPARK-15943) Spark driver hangs up periodically (cannot receive any reply in 120 seconds)
[ https://issues.apache.org/jira/browse/SPARK-15943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-15943: --- Attachment: executor-threaddump.log driver-stacktrace.log > Spark driver hangs up periodically (cannot receive any reply in 120 seconds) > > > Key: SPARK-15943 > URL: https://issues.apache.org/jira/browse/SPARK-15943 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 > Environment: Ubuntu Server 14.04 > Spark 1.6.0 > Hadoop 2.6.0 >Reporter: Sergey Zhemzhitsky >Priority: Critical > Labels: spark > Attachments: driver-stacktrace.log, executor-threaddump.log > > > Spark driver hangs up periodically with the following stacktrace on driver > [^driver-stacktrace.log] and threaddump on the executor that causes this > issue [^executor-threaddump.log] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-15943) Spark driver hangs up periodically (cannot receive any reply in 120 seconds)
[ https://issues.apache.org/jira/browse/SPARK-15943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-15943: --- Attachment: executor-threaddump-jstack.log > Spark driver hangs up periodically (cannot receive any reply in 120 seconds) > > > Key: SPARK-15943 > URL: https://issues.apache.org/jira/browse/SPARK-15943 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 > Environment: Ubuntu Server 14.04 > Spark 1.6.0 > Hadoop 2.6.0 >Reporter: Sergey Zhemzhitsky >Priority: Critical > Labels: spark > Attachments: driver-stacktrace.log, executor-threaddump-jstack.log, > executor-threaddump.log > > > Spark driver hangs up periodically with the following stacktrace on driver > [^driver-stacktrace.log] and threaddump on the executor that causes this > issue [^executor-threaddump.log]
[jira] [Updated] (SPARK-15943) Spark driver hangs up periodically (cannot receive any reply in 120 seconds)
[ https://issues.apache.org/jira/browse/SPARK-15943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-15943: --- Description: Spark driver hangs up periodically with the following stacktrace on driver [^driver-stacktrace.log] and threaddump on the executor that causes this issue [^executor-threaddump.log] The attached [^executor-threaddump.log] has been retrieved by means of spark user interface. Trying to dump the threads by means of jstack allowed us to retrieve only the following stacktrace [^] was: Spark driver hangs up periodically with the following stacktrace on driver [^driver-stacktrace.log] and threaddump on the executor that causes this issue [^executor-threaddump.log] > Spark driver hangs up periodically (cannot receive any reply in 120 seconds) > > > Key: SPARK-15943 > URL: https://issues.apache.org/jira/browse/SPARK-15943 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 > Environment: Ubuntu Server 14.04 > Spark 1.6.0 > Hadoop 2.6.0 >Reporter: Sergey Zhemzhitsky >Priority: Critical > Labels: spark > Attachments: driver-stacktrace.log, executor-threaddump-jstack.log, > executor-threaddump.log > > > Spark driver hangs up periodically with the following stacktrace on driver > [^driver-stacktrace.log] and threaddump on the executor that causes this > issue [^executor-threaddump.log] > The attached [^executor-threaddump.log] has been retrieved by means of spark > user interface. Trying to dump the threads by means of jstack allowed us to > retrieve only the following stacktrace [^]
[jira] [Commented] (SPARK-15943) Spark driver hangs up periodically (cannot receive any reply in 120 seconds)
[ https://issues.apache.org/jira/browse/SPARK-15943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15329581#comment-15329581 ] Sergey Zhemzhitsky commented on SPARK-15943: I will try *spark.memory.useLegacyMode* to see whether it helps until Spark 1.6.2 is released. > Spark driver hangs up periodically (cannot receive any reply in 120 seconds) > > > Key: SPARK-15943 > URL: https://issues.apache.org/jira/browse/SPARK-15943 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 > Environment: Ubuntu Server 14.04 > Spark 1.6.0 > Hadoop 2.6.0 >Reporter: Sergey Zhemzhitsky >Priority: Critical > Labels: spark > Attachments: driver-stacktrace.log, executor-threaddump-jstack.log, > executor-threaddump.log > > > Spark driver hangs up periodically with the following stacktrace on driver > [^driver-stacktrace.log] and threaddump on the executor that causes this > issue [^executor-threaddump.log] > The attached [^executor-threaddump.log] has been retrieved by means of spark > user interface. Trying to dump the threads by means of *jstack -l -F * > allowed us to retrieve only the following stacktrace > [^executor-threaddump-jstack.log]
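The workaround mentioned in the comment above can be sketched as a configuration fragment. This is a minimal sketch, assuming the application builds its own `SparkConf` and is launched with `spark-submit` (which supplies the master URL); the object name and application name are hypothetical. Only the key `spark.memory.useLegacyMode` comes from the thread, and the thread does not confirm that the setting resolves the hang. In Spark 1.6 this key switches back to the pre-1.6 static memory manager.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical wrapper object; only the configuration key comes from the thread.
object LegacyMemoryModeSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("legacy-memory-mode-sketch") // hypothetical application name
      // Spark 1.6.x setting: revert to the pre-1.6 static memory manager.
      .set("spark.memory.useLegacyMode", "true")

    // The master URL is assumed to be provided by spark-submit.
    val sc = new SparkContext(conf)
    try {
      // Read the setting back, only to show that the context carries it.
      println(sc.getConf.get("spark.memory.useLegacyMode"))
    } finally {
      sc.stop()
    }
  }
}
```

The same key can also be passed at submit time with `--conf spark.memory.useLegacyMode=true` instead of being set in code.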
[jira] [Created] (SPARK-15943) Spark driver hangs up periodically (cannot receive any reply in 120 seconds)
Sergey Zhemzhitsky created SPARK-15943: -- Summary: Spark driver hangs up periodically (cannot receive any reply in 120 seconds) Key: SPARK-15943 URL: https://issues.apache.org/jira/browse/SPARK-15943 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.6.0 Environment: Ubuntu Server 14.04 Spark 1.6.0 Hadoop 2.6.0 Reporter: Sergey Zhemzhitsky Priority: Critical Spark driver hangs up periodically with the following stacktrace on driver [^driver-stacktrace.log] and threaddump on the executor that causes this issue [^executor-threaddump.log]
[jira] [Updated] (SPARK-15943) Spark driver hangs up periodically (cannot receive any reply in 120 seconds)
[ https://issues.apache.org/jira/browse/SPARK-15943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-15943: --- Description: Spark driver hangs up periodically with the following stacktrace on driver [^driver-stacktrace.log] and threaddump on the executor that causes this issue [^executor-threaddump.log] The attached [^executor-threaddump.log] has been retrieved by means of spark user interface. Trying to dump the threads by means of *jstack -l -F * allowed us to retrieve only the following stacktrace [^executor-threaddump-jstack.log] was: Spark driver hangs up periodically with the following stacktrace on driver [^driver-stacktrace.log] and threaddump on the executor that causes this issue [^executor-threaddump.log] The attached [^executor-threaddump.log] has been retrieved by means of spark user interface. Trying to dump the threads by means of jstack allowed us to retrieve only the following stacktrace [^] > Spark driver hangs up periodically (cannot receive any reply in 120 seconds) > > > Key: SPARK-15943 > URL: https://issues.apache.org/jira/browse/SPARK-15943 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 > Environment: Ubuntu Server 14.04 > Spark 1.6.0 > Hadoop 2.6.0 >Reporter: Sergey Zhemzhitsky >Priority: Critical > Labels: spark > Attachments: driver-stacktrace.log, executor-threaddump-jstack.log, > executor-threaddump.log > > > Spark driver hangs up periodically with the following stacktrace on driver > [^driver-stacktrace.log] and threaddump on the executor that causes this > issue [^executor-threaddump.log] > The attached [^executor-threaddump.log] has been retrieved by means of spark > user interface. 
Trying to dump the threads by means of *jstack -l -F * > allowed us to retrieve only the following stacktrace > [^executor-threaddump-jstack.log]
[jira] [Updated] (SPARK-15943) Spark driver hangs up periodically (cannot receive any reply in 120 seconds)
[ https://issues.apache.org/jira/browse/SPARK-15943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-15943: --- Priority: Major (was: Critical) > Spark driver hangs up periodically (cannot receive any reply in 120 seconds) > > > Key: SPARK-15943 > URL: https://issues.apache.org/jira/browse/SPARK-15943 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 > Environment: Ubuntu Server 14.04 > Spark 1.6.0 > Hadoop 2.6.0 >Reporter: Sergey Zhemzhitsky > Labels: spark > Attachments: driver-stacktrace.log, executor-threaddump-jstack.log, > executor-threaddump.log > > > Spark driver hangs up periodically with the following stacktrace on driver > [^driver-stacktrace.log] and threaddump on the executor that causes this > issue [^executor-threaddump.log] > The attached [^executor-threaddump.log] has been retrieved by means of spark > user interface. Trying to dump the threads by means of *jstack -l -F * > allowed us to retrieve only the following stacktrace > [^executor-threaddump-jstack.log]
[jira] [Updated] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x
[ https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-20404: --- Environment: Spark: 2.1 Scala: 2.11 Spark master: local > Regression with accumulator names when migrating from 1.6 to 2.x > > > Key: SPARK-20404 > URL: https://issues.apache.org/jira/browse/SPARK-20404 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 > Environment: Spark: 2.1 > Scala: 2.11 > Spark master: local >Reporter: Sergey Zhemzhitsky > Attachments: spark-context-accum-option.patch > > > Creating accumulator with explicitly specified name equal to _null_, like the > following > {code:java} > sparkContext.accumulator(0, null) > {code} > throws exception at runtime > {code:none} > ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update > accumulators for task 0 > java.lang.NullPointerException > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at scala.Option.exists(Option.scala:240) > at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) > at > 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > {code} > The issue is in wrapping name into _Some_ instead of _Option_ when creating > accumulators. > Patch is available. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x
[ https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976506#comment-15976506 ] Sergey Zhemzhitsky commented on SPARK-20404: I would agree if the error occurred at the moment the accumulator is created, but in this case it may occur at any time (possibly hours into the job), whenever the accumulator is updated. So to me the current behaviour seems misleading and confusing rather than expected. > Regression with accumulator names when migrating from 1.6 to 2.x > > > Key: SPARK-20404 > URL: https://issues.apache.org/jira/browse/SPARK-20404 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 > Environment: Spark: 2.1 > Scala: 2.11 > Spark master: local >Reporter: Sergey Zhemzhitsky > Attachments: spark-context-accum-option.patch > > > Creating accumulator with explicitly specified name equal to _null_, like the > following > {code:java} > sparkContext.accumulator(0, null) > {code} > throws exception at runtime > {code:none} > ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update > accumulators for task 0 > java.lang.NullPointerException > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at scala.Option.exists(Option.scala:240) > at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080) > at > 
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > {code} > The issue is in wrapping name into _Some_ instead of _Option_ when creating > accumulators. > Patch is available.
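The timing concern raised in this comment can be illustrated without Spark. The following is a minimal sketch, not Spark's implementation: `NamedCounter`, `describe`, and the `"internal."` prefix are hypothetical stand-ins for an accumulator whose name is wrapped with `Some` at creation and inspected through `Option.exists` only later, when the value is reported.

```scala
object DeferredFailureSketch {
  // Hypothetical stand-in for an accumulator; the name is wrapped eagerly with Some.
  final class NamedCounter(rawName: String) {
    val name: Option[String] = Some(rawName) // Some(null) when rawName is null
    private var total = 0L

    def add(n: Long): Unit = total += n

    // Inspects the name only at reporting time, through Option.exists,
    // mirroring the Option.exists frame in the stack trace.
    def describe: String = {
      val internal = name.exists(_.startsWith("internal."))
      s"internal=$internal total=$total"
    }
  }

  // Creation and updates succeed even when the name is null.
  def createAndUpdate(rawName: String): NamedCounter = {
    val counter = new NamedCounter(rawName)
    counter.add(1)
    counter
  }

  // True when reporting the counter fails with a NullPointerException.
  def failsOnReport(counter: NamedCounter): Boolean =
    try { counter.describe; false } catch { case _: NullPointerException => true }

  def main(args: Array[String]): Unit = {
    val unnamed = createAndUpdate(null)        // no error at creation or update
    println(failsOnReport(unnamed))            // the failure surfaces only here
    println(failsOnReport(createAndUpdate("records")))
  }
}
```

In this sketch nothing at the creation site hints at the problem; the exception appears only when the name is finally read, which is the behaviour the comment describes as misleading.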
[jira] [Commented] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x
[ https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976400#comment-15976400 ] Sergey Zhemzhitsky commented on SPARK-20404: Is this normal behavior for Spark 2+? It seems that wrapping a value that may or may not be null in Option is more appropriate than using Some. > Regression with accumulator names when migrating from 1.6 to 2.x > > > Key: SPARK-20404 > URL: https://issues.apache.org/jira/browse/SPARK-20404 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 > Environment: Spark: 2.1 > Scala: 2.11 > Spark master: local >Reporter: Sergey Zhemzhitsky > Attachments: spark-context-accum-option.patch > > > Creating accumulator with explicitly specified name equal to _null_, like the > following > {code:java} > sparkContext.accumulator(0, null) > {code} > throws exception at runtime > {code:none} > ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update > accumulators for task 0 > java.lang.NullPointerException > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at scala.Option.exists(Option.scala:240) > at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183) > at > 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > {code} > The issue is in wrapping name into _Some_ instead of _Option_ when creating > accumulators. > Patch is available.
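The difference between the two wrappers that this comment refers to is plain Scala standard-library behaviour and can be shown in isolation. This is a minimal sketch; the object and helper names are hypothetical and it is not taken from the attached patch.

```scala
object OptionVsSome {
  // Some(x) always yields a defined Option, even when x is null.
  def wrapWithSome(name: String): Option[String] = Some(name)

  // Option(x) maps null to None and any other value to Some(x).
  def wrapWithOption(name: String): Option[String] = Option(name)

  def main(args: Array[String]): Unit = {
    // A null name is still "defined" when wrapped with Some.
    println(wrapWithSome(null).isDefined)
    // The same null name becomes None when wrapped with Option.
    println(wrapWithOption(null).isDefined)
    // For non-null names the two wrappers agree.
    println(wrapWithOption("counter") == wrapWithSome("counter"))
    // On None, exists never invokes the predicate, so no null is dereferenced.
    println(wrapWithOption(null).exists(_.nonEmpty))
  }
}
```

With `Option(name)`, later calls such as `exists` skip the predicate for a null name, whereas `Some(name)` passes the null through to it.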
[jira] [Updated] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x
[ https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-20404: --- Attachment: (was: spark-context-accum-option.patch) > Regression with accumulator names when migrating from 1.6 to 2.x > > > Key: SPARK-20404 > URL: https://issues.apache.org/jira/browse/SPARK-20404 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 > Environment: Spark: 2.1 > Scala: 2.11 > Spark master: local >Reporter: Sergey Zhemzhitsky > Attachments: spark-context-accum-option.patch > > > Creating accumulator with explicitly specified name equal to _null_, like the > following > {code:java} > sparkContext.accumulator(0, null) > {code} > throws exception at runtime > {code:none} > ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update > accumulators for task 0 > java.lang.NullPointerException > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at scala.Option.exists(Option.scala:240) > at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) > at > 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > {code} > The issue is in wrapping name into _Some_ instead of _Option_ when creating > accumulators. > Patch is available.
[jira] [Updated] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x
[ https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-20404: --- Attachment: spark-context-accum-option.patch > Regression with accumulator names when migrating from 1.6 to 2.x > > > Key: SPARK-20404 > URL: https://issues.apache.org/jira/browse/SPARK-20404 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 > Environment: Spark: 2.1 > Scala: 2.11 > Spark master: local >Reporter: Sergey Zhemzhitsky > Attachments: spark-context-accum-option.patch > > > Creating accumulator with explicitly specified name equal to _null_, like the > following > {code:java} > sparkContext.accumulator(0, null) > {code} > throws exception at runtime > {code:none} > ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update > accumulators for task 0 > java.lang.NullPointerException > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at scala.Option.exists(Option.scala:240) > at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) > at > 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > {code} > The issue is in wrapping name into _Some_ instead of _Option_ when creating > accumulators. > Patch is available.
[jira] [Commented] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x
[ https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976726#comment-15976726 ] Sergey Zhemzhitsky commented on SPARK-20404: Thanks for drawing attention to this. Just updated the patch. > Regression with accumulator names when migrating from 1.6 to 2.x > > > Key: SPARK-20404 > URL: https://issues.apache.org/jira/browse/SPARK-20404 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 > Environment: Spark: 2.1 > Scala: 2.11 > Spark master: local >Reporter: Sergey Zhemzhitsky > Attachments: spark-context-accum-option.patch > > > Creating accumulator with explicitly specified name equal to _null_, like the > following > {code:java} > sparkContext.accumulator(0, null) > {code} > throws exception at runtime > {code:none} > ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update > accumulators for task 0 > java.lang.NullPointerException > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at scala.Option.exists(Option.scala:240) > at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647) > at > 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > {code} > The issue is in wrapping name into _Some_ instead of _Option_ when creating > accumulators. > Patch is available.
[jira] [Created] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x
Sergey Zhemzhitsky created SPARK-20404: -- Summary: Regression with accumulator names when migrating from 1.6 to 2.x Key: SPARK-20404 URL: https://issues.apache.org/jira/browse/SPARK-20404 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0 Reporter: Sergey Zhemzhitsky Creating accumulator with explicitly specified name equal to _null_, like the following {code:java} sparkContext.accumulator(0, null) {code} throws exception at runtime {code:none} ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update accumulators for task 0 java.lang.NullPointerException at org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) at org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) at scala.Option.exists(Option.scala:240) at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106) at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091) at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080) at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) {code} The issue is in wrapping name into `Some` instead of `Option` when creating accumulators. 
[jira] [Updated] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x
[ https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-20404: --- Description: Creating accumulator with explicitly specified name equal to _null_, like the following {code:java} sparkContext.accumulator(0, null) {code} throws exception at runtime {code:none} ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update accumulators for task 0 java.lang.NullPointerException at org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) at org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) at scala.Option.exists(Option.scala:240) at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106) at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091) at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080) at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) {code} The issue is in wrapping name into _Some_ instead of _Option_ when creating accumulators. Patch is available. 
was: Creating accumulator with explicitly specified name equal to _null_, like the following {code:java} sparkContext.accumulator(0, null) {code} throws exception at runtime {code:none} ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update accumulators for task 0 java.lang.NullPointerException at org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) at org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) at scala.Option.exists(Option.scala:240) at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106) at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091) at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080) at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) {code} The issue is in wrapping name into `Some` instead of `Option` when creating accumulators. Patch is available. 
> Regression with accumulator names when migrating from 1.6 to 2.x > > > Key: SPARK-20404 > URL: https://issues.apache.org/jira/browse/SPARK-20404 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Sergey Zhemzhitsky > Attachments: spark-context-accum-option.patch > > > Creating accumulator with explicitly specified name equal to _null_, like the > following > {code:java} > sparkContext.accumulator(0, null) > {code} > throws exception at runtime > {code:none} > ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update > accumulators for task 0 > java.lang.NullPointerException > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at scala.Option.exists(Option.scala:240) > at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091) > at >
[jira] [Updated] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x
[ https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-20404: --- Attachment: spark-context-accum-option.patch > Regression with accumulator names when migrating from 1.6 to 2.x > > > Key: SPARK-20404 > URL: https://issues.apache.org/jira/browse/SPARK-20404 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Sergey Zhemzhitsky > Attachments: spark-context-accum-option.patch > > > Creating accumulator with explicitly specified name equal to _null_, like the > following > {code:java} > sparkContext.accumulator(0, null) > {code} > throws exception at runtime > {code:none} > ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update > accumulators for task 0 > java.lang.NullPointerException > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at scala.Option.exists(Option.scala:240) > at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) > at > 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > {code} > The issue is in wrapping name into _Some_ instead of _Option_ when creating > accumulators. > Patch is available. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
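The difference between _Some_ and _Option_ described above can be reproduced without Spark. The following is a minimal sketch, not the attached patch: the object name, the `isInternal` helper, and the `internal.` prefix are assumptions made for illustration. It only relies on the fact that the stack trace passes through `Option.exists`, and that `Some(null)` is a non-empty Option while `Option(null)` is `None`.

```scala
// Standalone illustration; it does not use Spark.
// `AccumulatorNameSketch`, `isInternal` and `InternalPrefix` are hypothetical names.
object AccumulatorNameSketch {
  // Hypothetical prefix, used only to have a predicate that dereferences the name.
  val InternalPrefix = "internal."

  // Mimics a check of the form name.exists(...) on an optional accumulator name.
  def isInternal(name: Option[String]): Boolean =
    name.exists(_.startsWith(InternalPrefix))

  def main(args: Array[String]): Unit = {
    val rawName: String = null

    // Option(null) collapses to None, so the predicate is never evaluated.
    println(isInternal(Option(rawName))) // prints "false"

    // Some(null) is non-empty, so the predicate dereferences null.
    try {
      isInternal(Some(rawName))
    } catch {
      case _: NullPointerException =>
        println("NullPointerException from Some(null)")
    }
  }
}
```

Wrapping a possibly-null name with `Option(...)` at the point where the accumulator is created keeps later `exists`, `map`, or `foreach` calls safe, which is consistent with the fix the report proposes.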
[jira] [Updated] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x
[ https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-20404: --- Flags: Patch Description: Creating accumulator with explicitly specified name equal to _null_, like the following {code:java} sparkContext.accumulator(0, null) {code} throws exception at runtime {code:none} ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update accumulators for task 0 java.lang.NullPointerException at org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) at org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) at scala.Option.exists(Option.scala:240) at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106) at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091) at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080) at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) {code} The issue is in wrapping name into `Some` instead of `Option` when creating accumulators. Patch is available. 
was: Creating accumulator with explicitly specified name equal to _null_, like the following {code:java} sparkContext.accumulator(0, null) {code} throws exception at runtime {code:none} ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update accumulators for task 0 java.lang.NullPointerException at org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) at org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) at scala.Option.exists(Option.scala:240) at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106) at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091) at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080) at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) {code} The issue is in wrapping name into `Some` instead of `Option` when creating accumulators. 
> Regression with accumulator names when migrating from 1.6 to 2.x > > > Key: SPARK-20404 > URL: https://issues.apache.org/jira/browse/SPARK-20404 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 >Reporter: Sergey Zhemzhitsky > Attachments: spark-context-accum-option.patch > > > Creating accumulator with explicitly specified name equal to _null_, like the > following > {code:java} > sparkContext.accumulator(0, null) > {code} > throws exception at runtime > {code:none} > ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update > accumulators for task 0 > java.lang.NullPointerException > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at scala.Option.exists(Option.scala:240) > at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091) > at >
[jira] [Updated] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs
[ https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-21549: --- Priority: Blocker (was: Critical) > Spark fails to complete job correctly in case of OutputFormat which do not > write into hdfs > -- > > Key: SPARK-21549 > URL: https://issues.apache.org/jira/browse/SPARK-21549 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 > Environment: spark 2.2.0 > scala 2.11 >Reporter: Sergey Zhemzhitsky >Priority: Blocker > > Spark fails to complete job correctly in case of custom OutputFormat > implementations. > There are OutputFormat implementations which do not need to use > *mapreduce.output.fileoutputformat.outputdir* standard hadoop property. > [But spark reads this property from the > configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79] > while setting up an OutputCommitter > {code:javascript} > val committer = FileCommitProtocol.instantiate( > className = classOf[HadoopMapReduceCommitProtocol].getName, > jobId = stageId.toString, > outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"), > isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] > committer.setupJob(jobContext) > {code} > ... 
and then uses this property later on while [committing the > job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132], > [aborting the > job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141], and > [creating the task's temp > path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]. > In those cases, the following exception is thrown when the job completes: > {code} > Can not create a Path from a null string > java.lang.IllegalArgumentException: Can not create a Path from a null string > at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123) > at org.apache.hadoop.fs.Path.<init>(Path.java:135) > at org.apache.hadoop.fs.Path.<init>(Path.java:89) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141) > at > org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) > at > org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084) > ... 
> {code} > So it seems that all the jobs which use OutputFormats which don't write data > into HDFS-compatible file systems are broken. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
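The failure mode described above, a missing property read as null and turned into a path only later, can be sketched without Hadoop or Spark. This is an illustration under stated assumptions, not Spark's code and not a proposed patch: `java.util.Properties` stands in for the Hadoop configuration (its `getProperty` also returns null for a missing key), `makePath` is a hypothetical stand-in for constructing a Hadoop `Path`, and `stagingDir` together with the `/_staging` suffix is invented for the example.

```scala
import java.util.Properties

// Standalone illustration; all names except the property key are hypothetical.
object OutputDirSketch {
  val OutputDirKey = "mapreduce.output.fileoutputformat.outputdir"

  // Stand-in for building a Hadoop Path: rejects null the same way the
  // exception message in the report does.
  def makePath(s: String): String = {
    if (s == null) {
      throw new IllegalArgumentException("Can not create a Path from a null string")
    }
    s
  }

  // Guarded lookup: the staging directory exists only if the property is set.
  def stagingDir(conf: Properties): Option[String] =
    Option(conf.getProperty(OutputDirKey)).map(dir => makePath(dir) + "/_staging")

  def main(args: Array[String]): Unit = {
    val conf = new Properties()

    // Property missing: the unguarded form makePath(conf.getProperty(OutputDirKey))
    // would throw here, while the guarded form yields None.
    println(stagingDir(conf)) // prints "None"

    conf.setProperty(OutputDirKey, "/tmp/out")
    println(stagingDir(conf)) // prints "Some(/tmp/out/_staging)"
  }
}
```

With a guard of this kind, an OutputFormat that never sets the output directory would skip the path-dependent steps instead of failing when a path is built from null.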
[jira] [Created] (SPARK-21549) Spark fails to abort job correctly in case of custom OutputFormat implementations
Sergey Zhemzhitsky created SPARK-21549: -- Summary: Spark fails to abort job correctly in case of custom OutputFormat implementations Key: SPARK-21549 URL: https://issues.apache.org/jira/browse/SPARK-21549 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.2.0 Environment: spark 2.2.0 scala 2.11 Reporter: Sergey Zhemzhitsky Priority: Critical Spark fails to abort job correctly in case of custom OutputFormat implementations. There are OutputFormat implementations which do not need to use *mapreduce.output.fileoutputformat.outputdir* standard hadoop property. [But spark reads this property from the configuration.|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79] while setting up an OutputCommitter {code:javascript} val committer = FileCommitProtocol.instantiate( className = classOf[HadoopMapReduceCommitProtocol].getName, jobId = stageId.toString, outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"), isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] committer.setupJob(jobContext) {code} In that case if job fails Spark executes [committer.abortJob|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L106] {code:javascript} committer.abortJob(jobContext) {code} ... 
and fails with the following exception {code} Can not create a Path from a null string java.lang.IllegalArgumentException: Can not create a Path from a null string at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123) at org.apache.hadoop.fs.Path.<init>(Path.java:135) at org.apache.hadoop.fs.Path.<init>(Path.java:89) at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58) at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141) at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084) {code}
[jira] [Updated] (SPARK-21549) Spark fails to abort job correctly in case of custom OutputFormat implementations
[ https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-21549: --- Description: Spark fails to abort job correctly in case of custom OutputFormat implementations. There are OutputFormat implementations which do not need to use *mapreduce.output.fileoutputformat.outputdir* standard hadoop property. [But spark reads this property from the configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79] while setting up an OutputCommitter {code:javascript} val committer = FileCommitProtocol.instantiate( className = classOf[HadoopMapReduceCommitProtocol].getName, jobId = stageId.toString, outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"), isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] committer.setupJob(jobContext) {code} In that case if job fails Spark executes [committer.abortJob|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L106] {code:javascript} committer.abortJob(jobContext) {code} ... 
and fails with the following exception {code} Can not create a Path from a null string java.lang.IllegalArgumentException: Can not create a Path from a null string at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123) at org.apache.hadoop.fs.Path.<init>(Path.java:135) at org.apache.hadoop.fs.Path.<init>(Path.java:89) at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58) at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141) at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084) {code} was: Spark fails to abort job correctly in case of custom OutputFormat implementations. There are OutputFormat implementations which do not need to use *mapreduce.output.fileoutputformat.outputdir* standard hadoop property. 
[But spark reads this property from the configuration.|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79] while setting up an OutputCommitter {code:javascript} val committer = FileCommitProtocol.instantiate( className = classOf[HadoopMapReduceCommitProtocol].getName, jobId = stageId.toString, outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"), isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] committer.setupJob(jobContext) {code} In that case if job fails Spark executes [committer.abortJob|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L106] {code:javascript} committer.abortJob(jobContext) {code} ... and fails with the following exception {code} Can not create a Path from a null string java.lang.IllegalArgumentException: Can not create a Path from a null string at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123) at org.apache.hadoop.fs.Path.<init>(Path.java:135) at org.apache.hadoop.fs.Path.<init>(Path.java:89) at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58) at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141) at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at 
org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
[jira] [Updated] (SPARK-21549) Spark fails to complete job correctly in case of custom OutputFormat implementations
[ https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-21549: --- Summary: Spark fails to complete job correctly in case of custom OutputFormat implementations (was: Spark fails to abort job correctly in case of custom OutputFormat implementations) > Spark fails to complete job correctly in case of custom OutputFormat > implementations > > > Key: SPARK-21549 > URL: https://issues.apache.org/jira/browse/SPARK-21549 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 > Environment: spark 2.2.0 > scala 2.11 >Reporter: Sergey Zhemzhitsky >Priority: Critical > > Spark fails to abort job correctly in case of custom OutputFormat > implementations. > There are OutputFormat implementations which do not need to use > *mapreduce.output.fileoutputformat.outputdir* standard hadoop property. > [But spark reads this property from the > configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79] > while setting up an OutputCommitter > {code:javascript} > val committer = FileCommitProtocol.instantiate( > className = classOf[HadoopMapReduceCommitProtocol].getName, > jobId = stageId.toString, > outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"), > isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] > committer.setupJob(jobContext) > {code} > In that case if job fails Spark executes > [committer.abortJob|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L106] > {code:javascript} > committer.abortJob(jobContext) > {code} > ... 
and fails with the following exception > {code} > Can not create a Path from a null string > java.lang.IllegalArgumentException: Can not create a Path from a null string > at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123) > at org.apache.hadoop.fs.Path.<init>(Path.java:135) > at org.apache.hadoop.fs.Path.<init>(Path.java:89) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141) > at > org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) > at > org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084) > {code}
[jira] [Updated] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs
[ https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-21549: --- Priority: Critical (was: Blocker) > Spark fails to complete job correctly in case of OutputFormat which do not > write into hdfs > -- > > Key: SPARK-21549 > URL: https://issues.apache.org/jira/browse/SPARK-21549 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 > Environment: spark 2.2.0 > scala 2.11 >Reporter: Sergey Zhemzhitsky >Priority: Critical > > Spark fails to complete job correctly in case of custom OutputFormat > implementations. > There are OutputFormat implementations which do not need to use > *mapreduce.output.fileoutputformat.outputdir* standard hadoop property. > [But spark reads this property from the > configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79] > while setting up an OutputCommitter > {code:javascript} > val committer = FileCommitProtocol.instantiate( > className = classOf[HadoopMapReduceCommitProtocol].getName, > jobId = stageId.toString, > outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"), > isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] > committer.setupJob(jobContext) > {code} > ... 
and then uses this property later on while [committing the > job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132], > [aborting the > job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141], and > [creating the task's temp > path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]. > In those cases, the following exception is thrown when the job completes: > {code} > Can not create a Path from a null string > java.lang.IllegalArgumentException: Can not create a Path from a null string > at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123) > at org.apache.hadoop.fs.Path.<init>(Path.java:135) > at org.apache.hadoop.fs.Path.<init>(Path.java:89) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141) > at > org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) > at > org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084) > ... 
> {code} > So it seems that all the jobs which use OutputFormats which don't write data > into HDFS-compatible file systems are broken.
[jira] [Updated] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs
[ https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-21549: --- Summary: Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs (was: Spark fails to complete job correctly in case of custom OutputFormat implementations) > Spark fails to complete job correctly in case of OutputFormat which do not > write into hdfs > -- > > Key: SPARK-21549 > URL: https://issues.apache.org/jira/browse/SPARK-21549 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 > Environment: spark 2.2.0 > scala 2.11 >Reporter: Sergey Zhemzhitsky >Priority: Critical > > Spark fails to complete job correctly in case of custom OutputFormat > implementations. > There are OutputFormat implementations which do not need to use > *mapreduce.output.fileoutputformat.outputdir* standard hadoop property. > [But spark reads this property from the > configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79] > while setting up an OutputCommitter > {code:javascript} > val committer = FileCommitProtocol.instantiate( > className = classOf[HadoopMapReduceCommitProtocol].getName, > jobId = stageId.toString, > outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"), > isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] > committer.setupJob(jobContext) > {code} > ... 
and then uses this property later on while [committing the > job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132], > [aborting the > job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141], and > [creating the task's temp > path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]. > In those cases, the following exception is thrown when the job completes: > {code} > Can not create a Path from a null string > java.lang.IllegalArgumentException: Can not create a Path from a null string > at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123) > at org.apache.hadoop.fs.Path.<init>(Path.java:135) > at org.apache.hadoop.fs.Path.<init>(Path.java:89) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141) > at > org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) > at > org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084) > ... 
> {code} > So it seems that all the jobs which use OutputFormats which don't write data > into HDFS-compatible file systems are broken.
[jira] [Updated] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs
[ https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-21549: --- Priority: Blocker (was: Critical) > Spark fails to complete job correctly in case of OutputFormat which do not > write into hdfs > -- > > Key: SPARK-21549 > URL: https://issues.apache.org/jira/browse/SPARK-21549 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 > Environment: spark 2.2.0 > scala 2.11 >Reporter: Sergey Zhemzhitsky >Priority: Blocker > > Spark fails to complete job correctly in case of custom OutputFormat > implementations. > There are OutputFormat implementations which do not need to use > *mapreduce.output.fileoutputformat.outputdir* standard hadoop property. > [But spark reads this property from the > configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79] > while setting up an OutputCommitter > {code:javascript} > val committer = FileCommitProtocol.instantiate( > className = classOf[HadoopMapReduceCommitProtocol].getName, > jobId = stageId.toString, > outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"), > isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] > committer.setupJob(jobContext) > {code} > ... 
and then uses this property later on while [committing the > job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132], > [aborting the > job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141], and > [creating the task's temp > path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]. > In those cases, the following exception is thrown when the job completes: > {code} > Can not create a Path from a null string > java.lang.IllegalArgumentException: Can not create a Path from a null string > at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123) > at org.apache.hadoop.fs.Path.<init>(Path.java:135) > at org.apache.hadoop.fs.Path.<init>(Path.java:89) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141) > at > org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) > at > org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084) > ... 
> {code} > So it seems that all the jobs which use OutputFormats which don't write data > into HDFS-compatible file systems are broken. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-21549) Spark fails to complete job correctly in case of custom OutputFormat implementations
[ https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-21549: --- Description: Spark fails to complete job correctly in case of custom OutputFormat implementations. There are OutputFormat implementations which do not need to use the standard Hadoop property *mapreduce.output.fileoutputformat.outputdir*. [But spark reads this property from the configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79] while setting up an OutputCommitter {code:javascript} val committer = FileCommitProtocol.instantiate( className = classOf[HadoopMapReduceCommitProtocol].getName, jobId = stageId.toString, outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"), isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] committer.setupJob(jobContext) {code} ... and then uses this property later on while [committing the job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132], [aborting the job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141], [creating task's temp path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95] In such cases, when the job completes, the following exception is thrown {code} Can not create a Path from a null string java.lang.IllegalArgumentException: Can not create a Path from a null string at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123) at org.apache.hadoop.fs.Path.<init>(Path.java:135) at org.apache.hadoop.fs.Path.<init>(Path.java:89) at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58) at 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141) at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084) ... {code} So it seems that all the jobs which use OutputFormats which don't write data into HDFS-compatible file systems are broken. was: Spark fails to abort job correctly in case of custom OutputFormat implementations. There are OutputFormat implementations which do not need to use *mapreduce.output.fileoutputformat.outputdir* standard hadoop property. 
[But spark reads this property from the configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79] while setting up an OutputCommitter {code:javascript} val committer = FileCommitProtocol.instantiate( className = classOf[HadoopMapReduceCommitProtocol].getName, jobId = stageId.toString, outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"), isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] committer.setupJob(jobContext) {code} In that case, if the job fails, Spark executes [committer.abortJob|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L106] {code:javascript} committer.abortJob(jobContext) {code} ... and fails with the following exception {code} Can not create a Path from a null string java.lang.IllegalArgumentException: Can not create a Path from a null string at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123) at org.apache.hadoop.fs.Path.<init>(Path.java:135) at org.apache.hadoop.fs.Path.<init>(Path.java:89) at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58) at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141) at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) at
[jira] [Commented] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x
[ https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980901#comment-15980901 ] Sergey Zhemzhitsky commented on SPARK-20404: [~srowen] Here it is: https://github.com/apache/spark/pull/17740 > Regression with accumulator names when migrating from 1.6 to 2.x > > > Key: SPARK-20404 > URL: https://issues.apache.org/jira/browse/SPARK-20404 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0 > Environment: Spark: 2.1 > Scala: 2.11 > Spark master: local >Reporter: Sergey Zhemzhitsky > Attachments: spark-context-accum-option.patch > > > Creating accumulator with explicitly specified name equal to _null_, like the > following > {code:java} > sparkContext.accumulator(0, null) > {code} > throws exception at runtime > {code:none} > ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update > accumulators for task 0 > java.lang.NullPointerException > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at > org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106) > at scala.Option.exists(Option.scala:240) > at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647) > at > 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > {code} > The issue is in wrapping name into _Some_ instead of _Option_ when creating > accumulators. > Patch is available. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
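The root cause named in the SPARK-20404 description above, wrapping the name into _Some_ instead of _Option_, can be illustrated without Spark. The following is a minimal sketch in plain Scala; `AccumulatorNameSketch`, `isInternal`, and the `"internal."` prefix are hypothetical stand-ins and not Spark's actual code.

```scala
object AccumulatorNameSketch {
  // Hypothetical stand-in for a check that inspects an optional accumulator name.
  def isInternal(name: Option[String]): Boolean =
    name.exists(_.startsWith("internal."))

  // Some(null) is a non-empty Option that holds null,
  // so exists() invokes the predicate on null.
  val wrappedWithSome: Option[String] = Some(null: String)

  // Option(null) collapses to None, so exists() never invokes the predicate.
  val wrappedWithOption: Option[String] = Option(null: String)
}
```

With these definitions, `isInternal(wrappedWithOption)` returns `false`, while `isInternal(wrappedWithSome)` throws a `NullPointerException` from inside `Option.exists`, which matches the shape of the stack trace quoted in the issue.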
[jira] [Commented] (SPARK-20886) HadoopMapReduceCommitProtocol to fail with message if FileOutputCommitter.getWorkPath==null
[ https://issues.apache.org/jira/browse/SPARK-20886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171227#comment-16171227 ] Sergey Zhemzhitsky commented on SPARK-20886: [~ste...@apache.org], [~hyukjin.kwon], does this patch also fix the NPE described in SPARK-21549? > HadoopMapReduceCommitProtocol to fail with message if > FileOutputCommitter.getWorkPath==null > --- > > Key: SPARK-20886 > URL: https://issues.apache.org/jira/browse/SPARK-20886 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.3.0 >Reporter: Steve Loughran >Assignee: Steve Loughran >Priority: Trivial > Fix For: 2.3.0 > > > This is minor, and the root cause is my fault *elsewhere*, but it's the patch > I used to track down the problem. > If {{HadoopMapReduceCommitProtocol}} has a {{FileOutputCommitter}} for > committing things, and *somehow* that's been configured with a > {{JobAttemptContext}}, not a {{TaskAttemptContext}}, then the committer NPEs. > A {{require()}} statement can validate the working path and so point the > blame at whoever's code is confused. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
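The {{require()}} check mentioned in the SPARK-20886 description above can be sketched in plain Scala. `WorkPathCheck` and `validatedWorkPath` are hypothetical names, and a `String` stands in for the committer's work path; this is an illustration of the idea, not the actual patch.

```scala
object WorkPathCheck {
  // Fails fast with a descriptive message instead of a later NullPointerException.
  // `workPath` stands in for the value a committer would return (an assumption).
  def validatedWorkPath(workPath: String, committerName: String): String = {
    require(workPath != null, s"Committer $committerName has no work path (null)")
    workPath
  }
}
```

Scala's `require` throws an `IllegalArgumentException` whose message names the offending committer, so the failure points at the misconfigured caller rather than at an unrelated line further down.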
[jira] [Commented] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs
[ https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173755#comment-16173755 ] Sergey Zhemzhitsky commented on SPARK-21549: [~ste...@apache.org] I've updated the PR to avoid using FileSystems at all. Instead, there is just an additional check for whether there are absolute files to rename during commit. > Spark fails to complete job correctly in case of OutputFormat which do not > write into hdfs > -- > > Key: SPARK-21549 > URL: https://issues.apache.org/jira/browse/SPARK-21549 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 > Environment: spark 2.2.0 > scala 2.11 >Reporter: Sergey Zhemzhitsky > > Spark fails to complete job correctly in case of custom OutputFormat > implementations. > There are OutputFormat implementations which do not need to use > *mapreduce.output.fileoutputformat.outputdir* standard hadoop property. > [But spark reads this property from the > configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79] > while setting up an OutputCommitter > {code:javascript} > val committer = FileCommitProtocol.instantiate( > className = classOf[HadoopMapReduceCommitProtocol].getName, > jobId = stageId.toString, > outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"), > isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] > committer.setupJob(jobContext) > {code} > ... 
and then uses this property later on while [committing the > job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132], > [aborting the > job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141], > [creating task's temp > path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95] > In such cases, when the job completes, the following exception is thrown > {code} > Can not create a Path from a null string > java.lang.IllegalArgumentException: Can not create a Path from a null string > at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123) > at org.apache.hadoop.fs.Path.<init>(Path.java:135) > at org.apache.hadoop.fs.Path.<init>(Path.java:89) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141) > at > org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) > at > org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084) > ... 
> {code} > So it seems that all the jobs which use OutputFormats which don't write data > into HDFS-compatible file systems are broken. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs
[ https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173184#comment-16173184 ] Sergey Zhemzhitsky commented on SPARK-21549: [~mridulm80], [~WeiqingYang], [~ste...@apache.org], I've implemented the fix in this PR (https://github.com/apache/spark/pull/19294), which sets the user's current working directory (which is typically her home directory in case of distributed filesystems) as the output directory. The patch allows using OutputFormats which write to external systems, databases, etc. by means of the RDD API. As far as I understand, the requirement for output paths to be specified is only necessary to allow files to be committed to an absolute output location, which is not the case for output formats that write data to external systems. So using the user's working directory in such situations seems to be OK. > Spark fails to complete job correctly in case of OutputFormat which do not > write into hdfs > -- > > Key: SPARK-21549 > URL: https://issues.apache.org/jira/browse/SPARK-21549 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 > Environment: spark 2.2.0 > scala 2.11 >Reporter: Sergey Zhemzhitsky > > Spark fails to complete job correctly in case of custom OutputFormat > implementations. > There are OutputFormat implementations which do not need to use > *mapreduce.output.fileoutputformat.outputdir* standard hadoop property. 
> [But spark reads this property from the > configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79] > while setting up an OutputCommitter > {code:javascript} > val committer = FileCommitProtocol.instantiate( > className = classOf[HadoopMapReduceCommitProtocol].getName, > jobId = stageId.toString, > outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"), > isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol] > committer.setupJob(jobContext) > {code} > ... and then uses this property later on while [committing the > job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132], > [aborting the > job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141], > [creating task's temp > path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95] > In such cases, when the job completes, the following exception is thrown > {code} > Can not create a Path from a null string > java.lang.IllegalArgumentException: Can not create a Path from a null string > at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123) > at org.apache.hadoop.fs.Path.<init>(Path.java:135) > at org.apache.hadoop.fs.Path.<init>(Path.java:89) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58) > at > org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141) > at > org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085) > at > 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) > at > org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084) > ... > {code} > So it seems that all the jobs which use OutputFormats which don't write data > into HDFS-compatible file systems are broken. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
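The fallback described in the SPARK-21549 comment above (use the user's working directory when no output directory is configured) can be sketched as follows. A plain `Map` stands in for the Hadoop configuration, and `OutputDirFallback` and `resolveOutputDir` are hypothetical names; the actual change is the one in the linked PR.

```scala
object OutputDirFallback {
  val OutputDirKey = "mapreduce.output.fileoutputformat.outputdir"

  // `conf` is a plain Map standing in for a Hadoop Configuration (an assumption).
  // Returns the configured output directory, or `workingDir` when none is set,
  // so that no path is ever built from a null string.
  def resolveOutputDir(conf: Map[String, String], workingDir: String): String =
    conf.get(OutputDirKey).filter(_ != null).getOrElse(workingDir)
}
```

For an OutputFormat that writes to an external system the map has no entry for the key, so the helper returns the working directory; jobs that do configure an output directory are unaffected.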
[jira] [Created] (SPARK-22184) GraphX fails in case of insufficient memory and checkpoints enabled
Sergey Zhemzhitsky created SPARK-22184: -- Summary: GraphX fails in case of insufficient memory and checkpoints enabled Key: SPARK-22184 URL: https://issues.apache.org/jira/browse/SPARK-22184 Project: Spark Issue Type: Bug Components: GraphX Affects Versions: 2.2.0 Environment: spark 2.2.0 scala 2.11 Reporter: Sergey Zhemzhitsky Fix For: 2.2.1 GraphX fails with FileNotFoundException in case of insufficient memory when checkpoints are enabled. Here is the stacktrace {code} Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0 does not exist java.io.FileNotFoundException: File file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0 does not exist at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752) at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529) at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409) at org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89) at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274) at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274) at scala.Option.map(Option.scala:146) at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697) ... {code} As GraphX uses cached RDDs intensively, the issue is only reproducible when previously cached and checkpointed Vertex and Edge RDDs are evicted from memory and forced to be read from disk. 
For testing purposes the following parameters may be set to emulate low memory environment {code} val sparkConf = new SparkConf() .set("spark.graphx.pregel.checkpointInterval", "2") // set testing memory to evict cached RDDs from it and force // reading checkpointed RDDs from disk .set("spark.testing.reservedMemory", "128") .set("spark.testing.memory", "256") {code} This issue also includes SPARK-22150 and cannot be fixed until SPARK-22150 is fixed too. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-22184) GraphX fails in case of insufficient memory and checkpoints enabled
[ https://issues.apache.org/jira/browse/SPARK-22184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188306#comment-16188306 ] Sergey Zhemzhitsky commented on SPARK-22184: Hi [~sowen], Should I merge changes of PR that fixes this issue into SPARK-22150 and provide a single PR which includes fixes for standard checkpointers as well as GraphX ones? I'm asking because in case of GraphX there are also changes in Pregel and I believe that PR with just changes of PeriodicCheckpointer (without GraphX) can be reviewed and merged into master faster than the one which includes all the changes. What do you think? > GraphX fails in case of insufficient memory and checkpoints enabled > --- > > Key: SPARK-22184 > URL: https://issues.apache.org/jira/browse/SPARK-22184 > Project: Spark > Issue Type: Bug > Components: GraphX >Affects Versions: 2.2.0 > Environment: spark 2.2.0 > scala 2.11 >Reporter: Sergey Zhemzhitsky > > GraphX fails with FileNotFoundException in case of insufficient memory when > checkpoints are enabled. 
> Here is the stacktrace > {code} > Job aborted due to stage failure: Task creation failed: > java.io.FileNotFoundException: File > file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0 > does not exist > java.io.FileNotFoundException: File > file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0 > does not exist > at > org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529) > at > org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409) > at > org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89) > at > org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274) > at > org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274) > at scala.Option.map(Option.scala:146) > at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697) > ... > {code} > As GraphX uses cached RDDs intensively, the issue is only reproducible when > previously cached and checkpointed Vertex and Edge RDDs are evicted from > memory and forced to be read from disk. > For testing purposes the following parameters may be set to emulate low > memory environment > {code} > val sparkConf = new SparkConf() > .set("spark.graphx.pregel.checkpointInterval", "2") > // set testing memory to evict cached RDDs from it and force > // reading checkpointed RDDs from disk > .set("spark.testing.reservedMemory", "128") > .set("spark.testing.memory", "256") > {code} > This issue also includes SPARK-22150 and cannot be fixed until SPARK-22150 is > fixed too. 
[jira] [Reopened] (SPARK-22184) GraphX fails in case of insufficient memory and checkpoints enabled
[ https://issues.apache.org/jira/browse/SPARK-22184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky reopened SPARK-22184: > GraphX fails in case of insufficient memory and checkpoints enabled > --- > > Key: SPARK-22184 > URL: https://issues.apache.org/jira/browse/SPARK-22184 > Project: Spark > Issue Type: Bug > Components: GraphX >Affects Versions: 2.2.0 > Environment: spark 2.2.0 > scala 2.11 >Reporter: Sergey Zhemzhitsky > > GraphX fails with FileNotFoundException in case of insufficient memory when > checkpoints are enabled. > Here is the stacktrace > {code} > Job aborted due to stage failure: Task creation failed: > java.io.FileNotFoundException: File > file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0 > does not exist > java.io.FileNotFoundException: File > file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0 > does not exist > at > org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529) > at > org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409) > at > org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89) > at > org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274) > at > org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274) > at scala.Option.map(Option.scala:146) > at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697) > ... 
> {code} > As GraphX uses cached RDDs intensively, the issue is only reproducible when > previously cached and checkpointed Vertex and Edge RDDs are evicted from > memory and forced to be read from disk. > For testing purposes the following parameters may be set to emulate low > memory environment > {code} > val sparkConf = new SparkConf() > .set("spark.graphx.pregel.checkpointInterval", "2") > // set testing memory to evict cached RDDs from it and force > // reading checkpointed RDDs from disk > .set("spark.testing.reservedMemory", "128") > .set("spark.testing.memory", "256") > {code} > This issue also includes SPARK-22150 and cannot be fixed until SPARK-22150 is > fixed too. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-22184) GraphX fails in case of insufficient memory and checkpoints enabled
[ https://issues.apache.org/jira/browse/SPARK-22184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188306#comment-16188306 ] Sergey Zhemzhitsky edited comment on SPARK-22184 at 10/2/17 5:27 PM: - Hi [~sowen], Would you mind if I reopen this issue, until it is clear enough whether it is better to include GraphX fixes into SPARK-22150 or keep the changes separate from each other? Could you please suggest, should I merge changes of PR that fixes this issue into SPARK-22150 and provide a single PR which includes fixes for standard checkpointers as well as GraphX ones? I'm just asking because in case of GraphX there are also changes in Pregel and I believe that PR (SPARK-22150) with just changes of PeriodicCheckpointer (without GraphX) can be reviewed and merged into master faster than the one which includes all the changes. What do you think? was (Author: szhemzhitsky): Hi [~sowen], Should I merge changes of PR that fixes this issue into SPARK-22150 and provide a single PR which includes fixes for standard checkpointers as well as GraphX ones? I'm asking because in case of GraphX there are also changes in Pregel and I believe that PR with just changes of PeriodicCheckpointer (without GraphX) can be reviewed and merged into master faster than the one which includes all the changes. What do you think? > GraphX fails in case of insufficient memory and checkpoints enabled > --- > > Key: SPARK-22184 > URL: https://issues.apache.org/jira/browse/SPARK-22184 > Project: Spark > Issue Type: Bug > Components: GraphX >Affects Versions: 2.2.0 > Environment: spark 2.2.0 > scala 2.11 >Reporter: Sergey Zhemzhitsky > > GraphX fails with FileNotFoundException in case of insufficient memory when > checkpoints are enabled. 
> Here is the stacktrace > {code} > Job aborted due to stage failure: Task creation failed: > java.io.FileNotFoundException: File > file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0 > does not exist > java.io.FileNotFoundException: File > file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0 > does not exist > at > org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529) > at > org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409) > at > org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89) > at > org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274) > at > org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274) > at scala.Option.map(Option.scala:146) > at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274) > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697) > ... > {code} > As GraphX uses cached RDDs intensively, the issue is only reproducible when > previously cached and checkpointed Vertex and Edge RDDs are evicted from > memory and forced to be read from disk. > For testing purposes the following parameters may be set to emulate low > memory environment > {code} > val sparkConf = new SparkConf() > .set("spark.graphx.pregel.checkpointInterval", "2") > // set testing memory to evict cached RDDs from it and force > // reading checkpointed RDDs from disk > .set("spark.testing.reservedMemory", "128") > .set("spark.testing.memory", "256") > {code} > This issue also includes SPARK-22150 and cannot be fixed until SPARK-22150 is > fixed too. 
[jira] [Created] (SPARK-22150) PeriodicCheckpointer fails with FileNotFoundException in case of dependant RDDs
Sergey Zhemzhitsky created SPARK-22150:
--

Summary: PeriodicCheckpointer fails with FileNotFoundException in case of dependant RDDs
Key: SPARK-22150
URL: https://issues.apache.org/jira/browse/SPARK-22150
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.2.0, 2.1.1, 2.1.0, 2.0.2, 2.0.1, 2.0.0, 1.6.3, 1.6.2, 1.6.1, 1.6.0, 1.5.2, 1.5.1, 1.5.0
Environment: spark 2.2.0
             scala 2.11
Reporter: Sergey Zhemzhitsky

PeriodicCheckpointer fails with FileNotFoundException when checkpointing dependent RDDs (consider iterative algorithms), i.e. when the RDD to checkpoint depends on an already checkpointed RDD.

Here is the exception
{code}
Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-0 does not exist
java.io.FileNotFoundException: File file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-0 does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
    at org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1708)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1707)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1705)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1705)
    at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1671)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:989)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:987)
{code}

The issue seems to be in this [piece of code|https://github.com/apache/spark/blob/0a7f5f2798b6e8b2ba15e8b3aa07d5953ad1c695/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala#L94]
{code:java}
if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
  && sc.getCheckpointDir.nonEmpty) {
  // Add new checkpoint before removing old checkpoints.
  checkpoint(newData)
  checkpointQueue.enqueue(newData)
  // Remove checkpoints before the latest one.
  var canDelete = true
  while (checkpointQueue.size > 1 && canDelete) {
    // Delete the oldest checkpoint only if the next checkpoint exists.
    if (isCheckpointed(checkpointQueue.head)) {
      removeCheckpointFile()
    } else {
      canDelete = false
    }
  }
}
{code}

Given that _checkpointQueue.head_ is checkpointed and materialized and _newData_ depends on _checkpointQueue.head_, the exception happens when an action is run on the RDDs representing _newData_.
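The failure mode described in this report can be illustrated without Spark. The following is only a minimal sketch under stated assumptions: `Dataset`, `CheckpointDir`, `runAction` and both cleanup policies are hypothetical stand-ins invented for illustration, not Spark classes, and `conservativeCleanup` is one possible alternative rather than the fix adopted by the project. It models a checkpoint that is only marked until an action runs, and shows that removing the head's file while the newest entry still depends on it makes the next action fail.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an RDD: `parent` is the dataset it is derived from.
final class Dataset(val id: Int, val parent: Option[Dataset]) {
  var markedForCheckpoint = false // set when a checkpoint is requested
  var materialized = false        // set once an action has written the file
}

// Hypothetical stand-in for the checkpoint directory.
final class CheckpointDir {
  private val files = mutable.Set.empty[Int]
  def write(d: Dataset): Unit = { files += d.id; d.materialized = true }
  def delete(d: Dataset): Unit = files -= d.id
  def contains(d: Dataset): Boolean = files.contains(d.id)
}

object CheckpointModel {
  // An action on `d` reads its parent's checkpoint file if the parent was
  // materialized; it fails when that file has already been removed.
  def runAction(d: Dataset, dir: CheckpointDir): Either[String, Unit] =
    d.parent.filter(p => p.materialized && !dir.contains(p)) match {
      case Some(p) => Left(s"File rdd-${p.id}/part-0 does not exist")
      case None =>
        if (d.markedForCheckpoint) dir.write(d)
        Right(())
    }

  // Policy modelled on the quoted snippet: drop the head as soon as the head
  // itself is checkpointed.
  def eagerCleanup(queue: mutable.Queue[Dataset], dir: CheckpointDir): Unit = {
    var canDelete = true
    while (queue.size > 1 && canDelete) {
      if (queue.head.materialized) dir.delete(queue.dequeue())
      else canDelete = false
    }
  }

  // Assumed alternative: drop the head only once its successor is materialized.
  def conservativeCleanup(queue: mutable.Queue[Dataset], dir: CheckpointDir): Unit =
    while (queue.size > 1 && queue(1).materialized) dir.delete(queue.dequeue())

  // Checkpoints a first dataset, then a second one that depends on it.
  def scenario(cleanup: (mutable.Queue[Dataset], CheckpointDir) => Unit): Either[String, Unit] = {
    val dir = new CheckpointDir
    val queue = mutable.Queue.empty[Dataset]

    val first = new Dataset(1, None)
    first.markedForCheckpoint = true
    queue.enqueue(first)
    require(runAction(first, dir).isRight) // materializes the first checkpoint

    val second = new Dataset(2, Some(first))
    second.markedForCheckpoint = true // marked only; no action has run yet
    queue.enqueue(second)
    cleanup(queue, dir)

    runAction(second, dir)
  }
}
```

In this model `CheckpointModel.scenario(CheckpointModel.eagerCleanup)` yields a `Left` carrying a "does not exist" message, while the conservative policy lets the second action complete and only then makes the first checkpoint eligible for removal.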
[jira] [Updated] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs
[ https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Zhemzhitsky updated SPARK-21549:
---
Fix Version/s: 2.2.1

> Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs
> --
>
> Key: SPARK-21549
> URL: https://issues.apache.org/jira/browse/SPARK-21549
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.0
> Environment: spark 2.2.0
>              scala 2.11
> Reporter: Sergey Zhemzhitsky
> Fix For: 2.2.1
>
> Spark fails to complete a job correctly when a custom OutputFormat implementation is used.
> There are OutputFormat implementations which do not need the standard hadoop property *mapreduce.output.fileoutputformat.outputdir*.
> [But spark reads this property from the configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79] while setting up an OutputCommitter
> {code:javascript}
> val committer = FileCommitProtocol.instantiate(
>   className = classOf[HadoopMapReduceCommitProtocol].getName,
>   jobId = stageId.toString,
>   outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
>   isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
> committer.setupJob(jobContext)
> {code}
> ... and then uses this property later on while [committing the job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132], [aborting the job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141], [creating task's temp path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]
> In those cases, when the job completes, the following exception is thrown
> {code}
> Can not create a Path from a null string
> java.lang.IllegalArgumentException: Can not create a Path from a null string
>     at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>     at org.apache.hadoop.fs.Path.<init>(Path.java:135)
>     at org.apache.hadoop.fs.Path.<init>(Path.java:89)
>     at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
>     at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
>     at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>     at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
>     ...
> {code}
> So it seems that all the jobs which use OutputFormats which don't write data into HDFS-compatible file systems are broken.
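The null path in the trace above comes from the output directory property being unset. A workaround based on a valid, writable directory has been discussed for this issue; the following is only a minimal sketch of that idea under stated assumptions. A plain `Map` stands in for the Hadoop configuration so that the example runs without Hadoop on the classpath, and `OutputDirFallback` and `resolveOutputDir` are hypothetical names, not part of Spark or Hadoop.

```scala
import java.nio.file.{Files, Path, Paths}

object OutputDirFallback {
  // Property name quoted in the issue.
  val OutputDirKey = "mapreduce.output.fileoutputformat.outputdir"

  // `conf` is a plain Map standing in for a Hadoop Configuration (assumption).
  // Returns the configured directory, or a fresh temporary directory when the
  // property is missing or blank, so that callers never see a null path.
  def resolveOutputDir(conf: Map[String, String]): Path =
    conf.get(OutputDirKey).map(_.trim).filter(_.nonEmpty) match {
      case Some(dir) => Paths.get(dir)
      case None      => Files.createTempDirectory("spark-output-")
    }
}
```

Calling `OutputDirFallback.resolveOutputDir(Map.empty)` creates and returns a temporary directory, whereas a configuration that already carries the property is passed through unchanged.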
[jira] [Commented] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs
[ https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135545#comment-16135545 ]

Sergey Zhemzhitsky commented on SPARK-21549:
---

[~mridulm80], [~WeiqingYang], does it make sense to implement the provided workaround (a valid and writable directory) directly within SparkHadoopMapReduceWriter when the needed property is not set, so that jobs which don't write to hdfs are not affected, at least until there is a better solution?

> Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs
> --
>
> Key: SPARK-21549
> URL: https://issues.apache.org/jira/browse/SPARK-21549
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.0
> Environment: spark 2.2.0
>              scala 2.11
> Reporter: Sergey Zhemzhitsky
[jira] [Commented] (SPARK-22150) PeriodicCheckpointer fails with FileNotFoundException in case of dependant RDDs
[ https://issues.apache.org/jira/browse/SPARK-22150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522015#comment-16522015 ]

Sergey Zhemzhitsky commented on SPARK-22150:
---

Just a kind reminder...

> PeriodicCheckpointer fails with FileNotFoundException in case of dependant RDDs
> ---
>
> Key: SPARK-22150
> URL: https://issues.apache.org/jira/browse/SPARK-22150
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.2.0
> Environment: spark 2.2.0
>              scala 2.11
> Reporter: Sergey Zhemzhitsky
> Priority: Major
[jira] [Commented] (SPARK-22184) GraphX fails in case of insufficient memory and checkpoints enabled
[ https://issues.apache.org/jira/browse/SPARK-22184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522017#comment-16522017 ]

Sergey Zhemzhitsky commented on SPARK-22184:
---

Just a kind reminder...

> GraphX fails in case of insufficient memory and checkpoints enabled
> ---
>
> Key: SPARK-22184
> URL: https://issues.apache.org/jira/browse/SPARK-22184
> Project: Spark
> Issue Type: Bug
> Components: GraphX
> Affects Versions: 2.2.0
> Environment: spark 2.2.0
>              scala 2.11
> Reporter: Sergey Zhemzhitsky
> Priority: Major
>
> GraphX fails with FileNotFoundException in case of insufficient memory when checkpoints are enabled.
> Here is the stacktrace
> {code}
> Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0 does not exist
> java.io.FileNotFoundException: File file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0 does not exist
>     at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
>     at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
>     at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
>     at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
>     at org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
>     at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>     at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>     at scala.Option.map(Option.scala:146)
>     at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
>     ...
> {code}
> As GraphX uses cached RDDs intensively, the issue is only reproducible when previously cached and checkpointed Vertex and Edge RDDs are evicted from memory and forced to be read from disk.
> For testing purposes the following parameters may be set to emulate a low memory environment
> {code}
> val sparkConf = new SparkConf()
>   .set("spark.graphx.pregel.checkpointInterval", "2")
>   // set testing memory to evict cached RDDs from it and force
>   // reading checkpointed RDDs from disk
>   .set("spark.testing.reservedMemory", "128")
>   .set("spark.testing.memory", "256")
> {code}
> This issue also includes SPARK-22150 and cannot be fixed until SPARK-22150 is fixed too.
[jira] [Created] (SPARK-24154) AccumulatorV2 loses type information during serialization
Sergey Zhemzhitsky created SPARK-24154:
--

Summary: AccumulatorV2 loses type information during serialization
Key: SPARK-24154
URL: https://issues.apache.org/jira/browse/SPARK-24154
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.3.0, 2.2.1, 2.2.0, 2.3.1
Environment: Scala 2.11
             Spark 2.2.0
Reporter: Sergey Zhemzhitsky

AccumulatorV2 loses type information during serialization.
It happens [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L164] during *writeReplace* call
{code:scala}
final protected def writeReplace(): Any = {
  if (atDriverSide) {
    if (!isRegistered) {
      throw new UnsupportedOperationException(
        "Accumulator must be registered before send to executor")
    }
    val copyAcc = copyAndReset()
    assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
    val isInternalAcc = name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
    if (isInternalAcc) {
      // Do not serialize the name of internal accumulator and send it to executor.
      copyAcc.metadata = metadata.copy(name = None)
    } else {
      // For non-internal accumulators, we still need to send the name because users may need to
      // access the accumulator name at executor side, or they may keep the accumulators sent from
      // executors and access the name when the registered accumulator is already garbage
      // collected(e.g. SQLMetrics).
      copyAcc.metadata = metadata
    }
    copyAcc
  } else {
    this
  }
}
{code}

It means that it is hardly possible to create new accumulators easily by adding new behaviour to existing ones by means of mix-ins or inheritance (without overriding *copy*).

For example the following snippet ...
{code:scala}
trait TripleCount {
  self: LongAccumulator =>
  abstract override def add(v: jl.Long): Unit = {
    self.add(v * 3)
  }
}
val acc = new LongAccumulator with TripleCount
sc.register(acc)
val data = 1 to 10
val rdd = sc.makeRDD(data, 5)
rdd.foreach(acc.add(_))
acc.value shouldBe 3 * data.sum
{code}

... fails with
{code:none}
org.scalatest.exceptions.TestFailedException: 55 was not equal to 165
    at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340)
    at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864)
{code}
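The effect reported here can be reproduced in miniature without Spark. The following is only a minimal sketch under stated assumptions: `Counter`, `Tripling`, `TriplingCounter` and `AccumulatorCopyDemo` are hypothetical names, and the model assumes that the copy handed to a task is created by naming the base class explicitly, so any behaviour mixed into the original instance is absent from the copy unless the copy method is overridden as well.

```scala
// Hypothetical, Spark-free model of an accumulator whose copy is built by
// naming the base class explicitly.
class Counter {
  protected var sum = 0L
  def add(v: Long): Unit = sum += v
  def value: Long = sum
  // Returns a zero-valued copy; note that it is always a plain Counter.
  def copyAndReset(): Counter = new Counter
}

// Stackable trait that triples every added value.
trait Tripling extends Counter {
  override def add(v: Long): Unit = super.add(v * 3)
}

// Subclass that also overrides the copy, so the behaviour survives copying.
class TriplingCounter extends Counter with Tripling {
  override def copyAndReset(): Counter = new TriplingCounter
}

object AccumulatorCopyDemo {
  // Adds 1..10 to a copy of `acc`, the way a task would use the instance it
  // receives instead of the original one.
  def sumOnCopy(acc: Counter): Long = {
    val copy = acc.copyAndReset()
    (1 to 10).foreach(i => copy.add(i.toLong))
    copy.value
  }
}
```

With this model, `AccumulatorCopyDemo.sumOnCopy(new Counter with Tripling)` gives 55 because the copy is a plain `Counter`, while `AccumulatorCopyDemo.sumOnCopy(new TriplingCounter)` gives 165. These are the same two numbers that appear in the test failure quoted in the report.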
[jira] [Updated] (SPARK-24154) AccumulatorV2 loses type information during serialization
[ https://issues.apache.org/jira/browse/SPARK-24154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-24154: --- Description: AccumulatorV2 loses type information during serialization. It happens [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L164] during *writeReplace* call {code:scala} final protected def writeReplace(): Any = { if (atDriverSide) { if (!isRegistered) { throw new UnsupportedOperationException( "Accumulator must be registered before send to executor") } val copyAcc = copyAndReset() assert(copyAcc.isZero, "copyAndReset must return a zero value copy") val isInternalAcc = name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX) if (isInternalAcc) { // Do not serialize the name of internal accumulator and send it to executor. copyAcc.metadata = metadata.copy(name = None) } else { // For non-internal accumulators, we still need to send the name because users may need to // access the accumulator name at executor side, or they may keep the accumulators sent from // executors and access the name when the registered accumulator is already garbage // collected(e.g. SQLMetrics). copyAcc.metadata = metadata } copyAcc } else { this } } {code} It means that it is hardly possible to create new accumulators easily by adding new behaviour to existing ones by means of mix-ins or inheritance (without overriding *copy*). For example the following snippet ... {code:scala} trait TripleCount { self: LongAccumulator => abstract override def add(v: jl.Long): Unit = { self.add(v * 3) } } val acc = new LongAccumulator with TripleCount sc.register(acc) val data = 1 to 10 val rdd = sc.makeRDD(data, 5) rdd.foreach(acc.add(_)) acc.value shouldBe 3 * data.sum {code} ... 
fails with {code:none} org.scalatest.exceptions.TestFailedException: 55 was not equal to 165 at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340) at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864) {code} Also such a behaviour seems to be error prone and confusing because an implementor gets not the same thing as he/she sees in the code. was: AccumulatorV2 loses type information during serialization. It happens [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L164] during *writeReplace* call {code:scala} final protected def writeReplace(): Any = { if (atDriverSide) { if (!isRegistered) { throw new UnsupportedOperationException( "Accumulator must be registered before send to executor") } val copyAcc = copyAndReset() assert(copyAcc.isZero, "copyAndReset must return a zero value copy") val isInternalAcc = name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX) if (isInternalAcc) { // Do not serialize the name of internal accumulator and send it to executor. copyAcc.metadata = metadata.copy(name = None) } else { // For non-internal accumulators, we still need to send the name because users may need to // access the accumulator name at executor side, or they may keep the accumulators sent from // executors and access the name when the registered accumulator is already garbage // collected(e.g. SQLMetrics). copyAcc.metadata = metadata } copyAcc } else { this } } {code} It means that it is hardly possible to create new accumulators easily by adding new behaviour to existing ones by means of mix-ins or inheritance (without overriding *copy*). For example the following snippet ... 
{code:scala} trait TripleCount { self: LongAccumulator => abstract override def add(v: jl.Long): Unit = { self.add(v * 3) } } val acc = new LongAccumulator with TripleCount sc.register(acc) val data = 1 to 10 val rdd = sc.makeRDD(data, 5) rdd.foreach(acc.add(_)) acc.value shouldBe 3 * data.sum {code} ... fails with {code:none} org.scalatest.exceptions.TestFailedException: 55 was not equal to 165 at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340) at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864) {code} > AccumulatorV2 loses type information during serialization > - > > Key: SPARK-24154 > URL: https://issues.apache.org/jira/browse/SPARK-24154 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0, 2.2.1, 2.3.0, 2.3.1 > Environment: Scala 2.11 > Spark 2.2.0 >Reporter: Sergey Zhemzhitsky >Priority: Major > > AccumulatorV2 loses
[jira] [Commented] (SPARK-24154) AccumulatorV2 loses type information during serialization
[ https://issues.apache.org/jira/browse/SPARK-24154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462630#comment-16462630 ]

Sergey Zhemzhitsky commented on SPARK-24154:
---

> If users have to support mixin traits, they can still use accumulator v1.

But accumulators V1 are deprecated and, I believe, will be removed one day.

> AccumulatorV2 loses type information during serialization
> -
>
> Key: SPARK-24154
> URL: https://issues.apache.org/jira/browse/SPARK-24154
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.0, 2.2.1, 2.3.0, 2.3.1
> Environment: Scala 2.11
>              Spark 2.2.0
> Reporter: Sergey Zhemzhitsky
> Priority: Major
[jira] [Commented] (SPARK-22150) PeriodicCheckpointer fails with FileNotFoundException in case of dependant RDDs
[ https://issues.apache.org/jira/browse/SPARK-22150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221018#comment-16221018 ]

Sergey Zhemzhitsky commented on SPARK-22150:
---

Hello guys, is there a chance for this issue, as well as the corresponding PR, to be looked at? It would be really great for the fix to be included in spark 2.2.1/2.3.0.

> PeriodicCheckpointer fails with FileNotFoundException in case of dependant RDDs
> ---
>
> Key: SPARK-22150
> URL: https://issues.apache.org/jira/browse/SPARK-22150
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.2.0
> Environment: spark 2.2.0
>              scala 2.11
> Reporter: Sergey Zhemzhitsky
[jira] [Commented] (SPARK-22184) GraphX fails in case of insufficient memory and checkpoints enabled
[ https://issues.apache.org/jira/browse/SPARK-22184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221019#comment-16221019 ] Sergey Zhemzhitsky commented on SPARK-22184: Hello guys, is there a chance for this issue to be looked through as well as the corresponding PR? It would be really great for the fix to be included into spark 2.2.1/2.3.0. > GraphX fails in case of insufficient memory and checkpoints enabled > --- > > Key: SPARK-22184 > URL: https://issues.apache.org/jira/browse/SPARK-22184 > Project: Spark > Issue Type: Bug > Components: GraphX >Affects Versions: 2.2.0 > Environment: spark 2.2.0 > scala 2.11 >Reporter: Sergey Zhemzhitsky > > GraphX fails with FileNotFoundException in case of insufficient memory when > checkpoints are enabled. > Here is the stacktrace > {code} > Job aborted due to stage failure: Task creation failed: > java.io.FileNotFoundException: File > file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0 > does not exist > java.io.FileNotFoundException: File > file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0 > does not exist > at > org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752) > at > org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529) > at > org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409) > at > org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89) > at > org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274) > at > org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274) > at scala.Option.map(Option.scala:146) > at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274) > at > 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697) > ... > {code} > As GraphX uses cached RDDs intensively, the issue is only reproducible when > previously cached and checkpointed Vertex and Edge RDDs are evicted from > memory and forced to be read from disk. > For testing purposes the following parameters may be set to emulate low > memory environment > {code} > val sparkConf = new SparkConf() > .set("spark.graphx.pregel.checkpointInterval", "2") > // set testing memory to evict cached RDDs from it and force > // reading checkpointed RDDs from disk > .set("spark.testing.reservedMemory", "128") > .set("spark.testing.memory", "256") > {code} > This issue also includes SPARK-22150 and cannot be fixed until SPARK-22150 is > fixed too. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-23697) Accumulators of Spark 1.x no longer work with Spark 2.x
[ https://issues.apache.org/jira/browse/SPARK-23697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-23697: --- Description: I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x failing with java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy It happens while serializing an accumulator [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165] {code:java} val copyAcc = copyAndReset() assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} ... although copyAndReset returns zero-value copy for sure, just consider the accumulator below {code:java} val concatParam = new AccumulatorParam[jl.StringBuilder] { override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new jl.StringBuilder() override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder = r1.append(r2) }{code} So, Spark treats zero value as non-zero due to how [isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489] is implemented in LegacyAccumulatorWrapper. {code:java} override def isZero: Boolean = _value == param.zero(initialValue){code} All this means that the values to be accumulated must implement equals and hashCode, otherwise isZero is very likely to always return false. So I'm wondering whether the assertion {code:java} assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} is really necessary and whether it can be safely removed from there? If not - is it ok to just override writeReplace for LegacyAccumulatorWrapperto prevent such failures? 
was: I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x failing with java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy It happens while serializing an accumulator [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165] {code:java} val copyAcc = copyAndReset() assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} ... although copyAndReset returns zero-value copy for sure, just consider the accumulator below {code:java} val concatParam = new AccumulatorParam[jl.StringBuilder] { override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new jl.StringBuilder() override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder = r1.append(r2) }{code} So, Spark treats zero value as non-zero due to how [isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489] is implemented in LegacyAccumulatorWrapper. {code:java} override def isZero: Boolean = _value == param.zero(initialValue){code} All this means that the values to be accumulated must implement equals and hashCode, otherwise isZero is very likely to always return false. So I'm wondering whether the assertion {code:java} assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} is really necessary and whether it can be safely removed from there. 
> Accumulators of Spark 1.x no longer work with Spark 2.x > --- > > Key: SPARK-23697 > URL: https://issues.apache.org/jira/browse/SPARK-23697 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0, 2.2.1 > Environment: Spark 2.2.0 > Scala 2.11 >Reporter: Sergey Zhemzhitsky >Priority: Major > > I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x > failing with > java.lang.AssertionError: assertion failed: copyAndReset must return a zero > value copy > It happens while serializing an accumulator > [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165] > {code:java} > val copyAcc = copyAndReset() > assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} > ... although copyAndReset returns zero-value copy for sure, just consider the > accumulator below > {code:java} > val concatParam = new AccumulatorParam[jl.StringBuilder] { > override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new > jl.StringBuilder() > override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): > jl.StringBuilder = r1.append(r2) > }{code} > So, Spark treats zero value as non-zero due to how >
[jira] [Updated] (SPARK-23697) Accumulators of Spark 1.x no longer work with Spark 2.x
[ https://issues.apache.org/jira/browse/SPARK-23697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-23697: --- Description: I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x failing with java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy It happens while serializing an accumulator [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165] {code:java} val copyAcc = copyAndReset() assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} ... although copyAndReset returns zero-value copy for sure, just consider the accumulator below {code:java} val concatParam = new AccumulatorParam[jl.StringBuilder] { override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new jl.StringBuilder() override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder = r1.append(r2) }{code} So, Spark treats zero value as non-zero due to how [isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489] is implemented in LegacyAccumulatorWrapper. {code:java} override def isZero: Boolean = _value == param.zero(initialValue){code} All this means that the values to be accumulated must implement equals and hashCode, otherwise isZero is very likely to always return false. So I'm wondering whether the assertion {code:java} assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} is really necessary and whether it can be safely removed from there. 
was: I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x failing with java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy It happens while serializing an accumulator [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165] {code:java} val copyAcc = copyAndReset() assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} ... although copyAndReset returns zero-value copy for sure, just consider the accumulator below {code:java} val concatParam = new AccumulatorParam[jl.StringBuilder] { override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new jl.StringBuilder() override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder = r1.append(r2) }{code} So, Spark treats zero value as non-zero due to how [isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489] is implemented in LegacyAccumulatorWrapper {code:java} override def isZero: Boolean = _value == param.zero(initialValue){code} All this means that the values to be accumulated must implement equals and hashCode, otherwise isZero is very likely to always return false. So I'm wondering whether the assertion {code:java} assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} is really necessary and whether it can be safely removed from there. 
[jira] [Updated] (SPARK-23697) Accumulators of Spark 1.x no longer work with Spark 2.x
[ https://issues.apache.org/jira/browse/SPARK-23697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-23697: --- Description: I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x failing with java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy It happens while serializing an accumulator [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165] {code:java} val copyAcc = copyAndReset() assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} ... although copyAndReset returns zero-value copy for sure, just consider the accumulator below {code:java} val concatParam = new AccumulatorParam[jl.StringBuilder] { override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new jl.StringBuilder() override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder = r1.append(r2) }{code} So, Spark treats zero value as non-zero due to how [isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489] is implemented in LegacyAccumulatorWrapper. {code:java} override def isZero: Boolean = _value == param.zero(initialValue){code} All this means that the values to be accumulated must implement equals and hashCode, otherwise isZero is very likely to always return false. So I'm wondering whether the assertion {code:java} assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} is really necessary and whether it can be safely removed from there? If not - is it ok to just override writeReplace for LegacyAccumulatorWrapper to prevent such failures? 
was: I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x failing with java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy It happens while serializing an accumulator [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165] {code:java} val copyAcc = copyAndReset() assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} ... although copyAndReset returns zero-value copy for sure, just consider the accumulator below {code:java} val concatParam = new AccumulatorParam[jl.StringBuilder] { override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new jl.StringBuilder() override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder = r1.append(r2) }{code} So, Spark treats zero value as non-zero due to how [isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489] is implemented in LegacyAccumulatorWrapper. {code:java} override def isZero: Boolean = _value == param.zero(initialValue){code} All this means that the values to be accumulated must implement equals and hashCode, otherwise isZero is very likely to always return false. So I'm wondering whether the assertion {code:java} assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} is really necessary and whether it can be safely removed from there? If not - is it ok to just override writeReplace for LegacyAccumulatorWrapperto prevent such failures? 
[jira] [Created] (SPARK-23697) Accumulators of Spark 1.x no longer work with Spark 2.x
Sergey Zhemzhitsky created SPARK-23697: -- Summary: Accumulators of Spark 1.x no longer work with Spark 2.x Key: SPARK-23697 URL: https://issues.apache.org/jira/browse/SPARK-23697 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.2.1, 2.2.0 Environment: Spark 2.2.0 Scala 2.11 Reporter: Sergey Zhemzhitsky I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x failing with java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy It happens while serializing an accumulator [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165] {code:java} val copyAcc = copyAndReset() assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} ... although copyAndReset returns zero-value copy for sure, just consider the accumulator below {code:java} val concatParam = new AccumulatorParam[jl.StringBuilder] { override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new jl.StringBuilder() override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder = r1.append(r2) }{code} So, Spark treats zero value as non-zero due to how [isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489] is implemented in LegacyAccumulatorWrapper {code:java} override def isZero: Boolean = _value == param.zero(initialValue){code} All this means that the values to be accumulated must implement equals and hashCode, otherwise `isZero` is very likely to always return false. So I'm wondering whether the assertion {code:java} assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} is really necessary and whether it can be safely removed from there. 
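The equality problem described above can be reproduced without Spark. The following is a minimal sketch, assuming only the JDK: java.lang.StringBuilder does not override equals, so comparing a value against a freshly created zero with == falls back to reference equality and is always false. The names ZeroEqualityDemo, isZeroByEquals and isZeroByContent are illustrative only and are not part of Spark.

```scala
import java.{lang => jl}

object ZeroEqualityDemo {
  // Same shape as the isZero check quoted above: compare the current value
  // with a newly created zero value using ==. For StringBuilder this is
  // reference equality, because StringBuilder does not override equals.
  def isZeroByEquals(value: jl.StringBuilder): Boolean =
    value == new jl.StringBuilder()

  // Hypothetical content-based check, shown only for contrast.
  def isZeroByContent(value: jl.StringBuilder): Boolean =
    value.length() == 0

  def main(args: Array[String]): Unit = {
    val zero = new jl.StringBuilder()
    println(isZeroByEquals(zero))  // false: two distinct instances
    println(isZeroByContent(zero)) // true: the builder is empty
  }
}
```

An empty builder is therefore reported as non-zero by the equality-based check, which is consistent with the assertion failure above.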
[jira] [Updated] (SPARK-23697) Accumulators of Spark 1.x no longer work with Spark 2.x
[ https://issues.apache.org/jira/browse/SPARK-23697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-23697: --- Description: I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x failing with java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy It happens while serializing an accumulator [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165] {code:java} val copyAcc = copyAndReset() assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} ... although copyAndReset returns zero-value copy for sure, just consider the accumulator below {code:java} val concatParam = new AccumulatorParam[jl.StringBuilder] { override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new jl.StringBuilder() override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder = r1.append(r2) }{code} So, Spark treats zero value as non-zero due to how [isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489] is implemented in LegacyAccumulatorWrapper {code:java} override def isZero: Boolean = _value == param.zero(initialValue){code} All this means that the values to be accumulated must implement equals and hashCode, otherwise isZero is very likely to always return false. So I'm wondering whether the assertion {code:java} assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} is really necessary and whether it can be safely removed from there. 
was: I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x failing with java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy It happens while serializing an accumulator [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165] {code:java} val copyAcc = copyAndReset() assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} ... although copyAndReset returns zero-value copy for sure, just consider the accumulator below {code:java} val concatParam = new AccumulatorParam[jl.StringBuilder] { override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new jl.StringBuilder() override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder = r1.append(r2) }{code} So, Spark treats zero value as non-zero due to how [isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489] is implemented in LegacyAccumulatorWrapper {code:java} override def isZero: Boolean = _value == param.zero(initialValue){code} All this means that the values to be accumulated must implement equals and hashCode, otherwise `isZero` is very likely to always return false. So I'm wondering whether the assertion {code:java} assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} is really necessary and whether it can be safely removed from there. 
[jira] [Updated] (SPARK-23697) Accumulators of Spark 1.x no longer work with Spark 2.x
[ https://issues.apache.org/jira/browse/SPARK-23697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-23697: --- Description: I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x failing with {code:java} java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy{code} It happens while serializing an accumulator [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165] {code:java} val copyAcc = copyAndReset() assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} ... although copyAndReset returns zero-value copy for sure, just consider the accumulator below {code:java} val concatParam = new AccumulatorParam[jl.StringBuilder] { override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new jl.StringBuilder() override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder = r1.append(r2) }{code} So, Spark treats zero value as non-zero due to how [isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489] is implemented in LegacyAccumulatorWrapper. {code:java} override def isZero: Boolean = _value == param.zero(initialValue){code} All this means that the values to be accumulated must implement equals and hashCode, otherwise isZero is very likely to always return false. So I'm wondering whether the assertion {code:java} assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} is really necessary and whether it can be safely removed from there? If not - is it ok to just override writeReplace for LegacyAccumulatorWrapper to prevent such failures? 
was: I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x failing with java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy It happens while serializing an accumulator [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165] {code:java} val copyAcc = copyAndReset() assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} ... although copyAndReset returns zero-value copy for sure, just consider the accumulator below {code:java} val concatParam = new AccumulatorParam[jl.StringBuilder] { override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new jl.StringBuilder() override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder = r1.append(r2) }{code} So, Spark treats zero value as non-zero due to how [isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489] is implemented in LegacyAccumulatorWrapper. {code:java} override def isZero: Boolean = _value == param.zero(initialValue){code} All this means that the values to be accumulated must implement equals and hashCode, otherwise isZero is very likely to always return false. So I'm wondering whether the assertion {code:java} assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} is really necessary and whether it can be safely removed from there? If not - is it ok to just override writeReplace for LegacyAccumulatorWrapper to prevent such failures? 
> Accumulators of Spark 1.x no longer work with Spark 2.x > --- > > Key: SPARK-23697 > URL: https://issues.apache.org/jira/browse/SPARK-23697 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0, 2.2.1 > Environment: Spark 2.2.0 > Scala 2.11 >Reporter: Sergey Zhemzhitsky >Priority: Major > > I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x > failing with > {code:java} > java.lang.AssertionError: assertion failed: copyAndReset must return a zero > value copy{code} > It happens while serializing an accumulator > [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165] > {code:java} > val copyAcc = copyAndReset() > assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} > ... although copyAndReset returns zero-value copy for sure, just consider the > accumulator below > {code:java} > val concatParam = new AccumulatorParam[jl.StringBuilder] { > override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new > jl.StringBuilder() > override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): > jl.StringBuilder = r1.append(r2) > }{code} > So, Spark treats zero value as non-zero due to how
[jira] [Updated] (SPARK-23697) Accumulators of Spark 1.x no longer work with Spark 2.x
[ https://issues.apache.org/jira/browse/SPARK-23697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Zhemzhitsky updated SPARK-23697: --- Affects Version/s: 2.3.0 > Accumulators of Spark 1.x no longer work with Spark 2.x > --- > > Key: SPARK-23697 > URL: https://issues.apache.org/jira/browse/SPARK-23697 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0, 2.2.1, 2.3.0 > Environment: Spark 2.2.0 > Scala 2.11 >Reporter: Sergey Zhemzhitsky >Priority: Major > > I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x > failing with > {code:java} > java.lang.AssertionError: assertion failed: copyAndReset must return a zero > value copy{code} > It happens while serializing an accumulator > [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165] > {code:java} > val copyAcc = copyAndReset() > assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} > ... although copyAndReset returns zero-value copy for sure, just consider the > accumulator below > {code:java} > val concatParam = new AccumulatorParam[jl.StringBuilder] { > override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new > jl.StringBuilder() > override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): > jl.StringBuilder = r1.append(r2) > }{code} > So, Spark treats zero value as non-zero due to how > [isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489] > is implemented in LegacyAccumulatorWrapper. > {code:java} > override def isZero: Boolean = _value == param.zero(initialValue){code} > All this means that the values to be accumulated must implement equals and > hashCode, otherwise isZero is very likely to always return false. 
> So I'm wondering whether the assertion > {code:java} > assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code} > is really necessary and whether it can be safely removed from there? > If not - is it ok to just override writeReplace for LegacyAccumulatorWrapper > to prevent such failures?
[jira] [Created] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions
Sergey Zhemzhitsky created SPARK-26114:
--

Summary: Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions
Key: SPARK-26114
URL: https://issues.apache.org/jira/browse/SPARK-26114
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.4.0, 2.3.2, 2.2.2
Environment: Spark 3.0.0-SNAPSHOT (master branch)
Scala 2.11
Yarn 2.7
Reporter: Sergey Zhemzhitsky

Trying to use _coalesce_ after shuffle-oriented transformations leads to OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X GB of Y GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead_

The error happens when trying to specify a pretty small number of partitions in the _coalesce_ call.

*How to reproduce?*

# Start spark-shell
{code:bash}
spark-shell \
  --num-executors=5 \
  --executor-cores=2 \
  --master=yarn \
  --deploy-mode=client \
  --conf spark.executor.memory=1g \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
{code}
Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing off-heap memory usage currently seems to be the only way to control the amount of memory used for transferring shuffle data.
Also note that the total number of cores allocated for the job is 5x2=10.
# Then generate some test data
{code:scala}
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress._
import org.apache.commons.lang._
import org.apache.spark._

// generate 100M records of sample data
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 10)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) -> new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
{code}
# Run the sample job
{code:scala}
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(10,false)
  .count
{code}
Note that the number of partitions is equal to the total number of cores allocated to the job.
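To make the numbers in the reproduction concrete: coalesce(10, false) on 1000 shuffled partitions adds no extra shuffle, so each of the 10 resulting tasks reads many parent partitions one after another. Below is a rough sketch of that arithmetic. It assumes an even split, whereas Spark's actual coalescer also takes locality into account, so the real grouping may differ. The helper groupParents is hypothetical.

```scala
object CoalesceArithmetic {
  // Hypothetical helper: split parent partition indices evenly across the coalesced partitions.
  def groupParents(numParents: Int, numCoalesced: Int): Seq[Seq[Int]] =
    (0 until numCoalesced).map { i =>
      val start = (i.toLong * numParents / numCoalesced).toInt
      val end = ((i + 1).toLong * numParents / numCoalesced).toInt
      start until end
    }

  def main(args: Array[String]): Unit = {
    val groups = groupParents(numParents = 1000, numCoalesced = 10)
    println(groups.size)      // 10
    println(groups.head.size) // 100
  }
}
```

Under this assumption each task walks through about 100 sorted parent partitions, so any per-partition buffer that stays reachable until the task ends would add up within a single task.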
[jira] [Updated] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions
[ https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Zhemzhitsky updated SPARK-26114:
---
Attachment: run1-noparams-dominator-tree.png

> Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions
>
> Key: SPARK-26114
> URL: https://issues.apache.org/jira/browse/SPARK-26114
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.2, 2.3.2, 2.4.0
> Environment: Spark 3.0.0-SNAPSHOT (master branch)
> Scala 2.11
> Yarn 2.7
> Reporter: Sergey Zhemzhitsky
> Priority: Major
> Attachments: run1-noparams-dominator-tree.png
[jira] [Updated] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions
[ https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Zhemzhitsky updated SPARK-26114:
---
Attachment: run1-noparams-dominator-tree-externalsorter.png

> Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions
>
> Key: SPARK-26114
> URL: https://issues.apache.org/jira/browse/SPARK-26114
> Attachments: run1-noparams-dominator-tree-externalsorter.png, run1-noparams-dominator-tree.png
[jira] [Updated] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions
[ https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Zhemzhitsky updated SPARK-26114:
---
Attachment: run1-noparams-dominator-tree-externalsorter-gc-root.png

> Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions
>
> Key: SPARK-26114
> URL: https://issues.apache.org/jira/browse/SPARK-26114
> Attachments: run1-noparams-dominator-tree-externalsorter-gc-root.png, run1-noparams-dominator-tree-externalsorter.png, run1-noparams-dominator-tree.png
[jira] [Updated] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions
[ https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Zhemzhitsky updated SPARK-26114:
---
Description:
Trying to use _coalesce_ after shuffle-oriented transformations leads to OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X GB of Y GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead_.
Discussion is [here|http://apache-spark-developers-list.1001551.n3.nabble.com/Coalesce-behaviour-td25289.html].

The error happens when trying to specify a pretty small number of partitions in the _coalesce_ call.

*How to reproduce?*

# Start spark-shell
{code:bash}
spark-shell \
  --num-executors=5 \
  --executor-cores=2 \
  --master=yarn \
  --deploy-mode=client \
  --conf spark.executor.memory=1g \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
{code}
Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing off-heap memory usage currently seems to be the only way to control the amount of memory used for transferring shuffle data.
Also note that the total number of cores allocated for the job is 5x2=10.
# Then generate some test data
{code:scala}
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress._
import org.apache.commons.lang._
import org.apache.spark._

// generate 100M records of sample data
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 10)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) -> new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
{code}
# Run the sample job
{code:scala}
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(10,false)
  .count
{code}
Note that the number of partitions is equal to the total number of cores allocated to the job.
[jira] [Updated] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions
[ https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Zhemzhitsky updated SPARK-26114:
---
Description:
Trying to use _coalesce_ after shuffle-oriented transformations leads to OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X GB of Y GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead_.
Discussion is [here|http://apache-spark-developers-list.1001551.n3.nabble.com/Coalesce-behaviour-td25289.html].

The error happens when trying to specify a pretty small number of partitions in the _coalesce_ call.

*How to reproduce?*

# Start spark-shell
{code:bash}
spark-shell \
  --num-executors=5 \
  --executor-cores=2 \
  --master=yarn \
  --deploy-mode=client \
  --conf spark.executor.memory=1g \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
{code}
Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing off-heap memory usage currently seems to be the only way to control the amount of memory used for transferring shuffle data.
Also note that the total number of cores allocated for the job is 5x2=10.
# Then generate some test data
{code:scala}
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress._
import org.apache.commons.lang._
import org.apache.spark._

// generate 100M records of sample data
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 10)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) -> new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
{code}
# Run the sample job
{code:scala}
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(10,false)
  .count
{code}
Note that the number of partitions is equal to the total number of cores allocated to the job.

Here is the dominator tree from the heap dump
!run1-noparams-dominator-tree.png|width=500!
[jira] [Updated] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions
[ https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Zhemzhitsky updated SPARK-26114:
---
Description:
Trying to use _coalesce_ after shuffle-oriented transformations leads to OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X GB of Y GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead_.
Discussion is [here|http://apache-spark-developers-list.1001551.n3.nabble.com/Coalesce-behaviour-td25289.html].

The error happens when trying to specify a pretty small number of partitions in the _coalesce_ call.

*How to reproduce?*

# Start spark-shell
{code:bash}
spark-shell \
  --num-executors=5 \
  --executor-cores=2 \
  --master=yarn \
  --deploy-mode=client \
  --conf spark.executor.memory=1g \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
{code}
Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing off-heap memory usage currently seems to be the only way to control the amount of memory used for transferring shuffle data.
Also note that the total number of cores allocated for the job is 5x2=10.
# Then generate some test data
{code:scala}
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress._
import org.apache.commons.lang._
import org.apache.spark._

// generate 100M records of sample data
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 10)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) -> new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
{code}
# Run the sample job
{code:scala}
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(10,false)
  .count
{code}
Note that the number of partitions is equal to the total number of cores allocated to the job.

Here is the dominator tree from the heap dump
!run1-noparams-dominator-tree.png|width=700!

There are 4 instances of ExternalSorter, although there are only 2 concurrently running tasks per executor.
!run1-noparams-dominator-tree-externalsorter.png|width=700!

And here are the paths to the GC root of an already stopped ExternalSorter.
!run1-noparams-dominator-tree-externalsorter-gc-root.png|width=700!
[jira] [Updated] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions
[ https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Zhemzhitsky updated SPARK-26114:
---
Description:
Trying to use _coalesce_ after shuffle-oriented transformations leads to OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X GB of Y GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead_.
Discussion is [here|http://apache-spark-developers-list.1001551.n3.nabble.com/Coalesce-behaviour-td25289.html].

The error happens when trying to specify a pretty small number of partitions in the _coalesce_ call.

*How to reproduce?*

# Start spark-shell
{code:bash}
spark-shell \
  --num-executors=5 \
  --executor-cores=2 \
  --master=yarn \
  --deploy-mode=client \
  --conf spark.executor.memoryOverhead=512 \
  --conf spark.executor.memory=1g \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
{code}
Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing off-heap memory usage currently seems to be the only way to control the amount of memory used for transferring shuffle data.
Also note that the total number of cores allocated for the job is 5x2=10.
# Then generate some test data
{code:scala}
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress._
import org.apache.commons.lang._
import org.apache.spark._

// generate 100M records of sample data
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 10)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) -> new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
{code}
# Run the sample job
{code:scala}
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(10,false)
  .count
{code}
Note that the number of partitions is equal to the total number of cores allocated to the job.

Here is the dominator tree from the heap dump
!run1-noparams-dominator-tree.png|width=700!

There are 4 instances of ExternalSorter, although there are only 2 concurrently running tasks per executor.
!run1-noparams-dominator-tree-externalsorter.png|width=700!

And here are the paths to the GC root of an already stopped ExternalSorter.
!run1-noparams-dominator-tree-externalsorter-gc-root.png|width=700!
[jira] [Created] (SPARK-27641) Unregistering a single Metrics Source with no metrics leads to removing all the from other sources with the same name
Sergey Zhemzhitsky created SPARK-27641:
--

Summary: Unregistering a single Metrics Source with no metrics leads to removing all the from other sources with the same name
Key: SPARK-27641
URL: https://issues.apache.org/jira/browse/SPARK-27641
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.4.2, 2.3.3, 2.2.3
Reporter: Sergey Zhemzhitsky

Currently Spark allows registering multiple Metric Sources with the same source name, like the following
{code:scala}
val acc1 = sc.longAccumulator
LongAccumulatorSource.register(sc, Map("acc1" -> acc1))

val acc2 = sc.longAccumulator
LongAccumulatorSource.register(sc, Map("acc2" -> acc2))
{code}
In that case there are two metric sources registered and both of these sources have the same name - [AccumulatorSource|https://github.com/apache/spark/blob/6ef45301a46c47c12fbc74bb9ceaffea685ed944/core/src/main/scala/org/apache/spark/metrics/source/AccumulatorSource.scala#L47]

If you try to unregister a source with no accumulators and no metrics registered, like the following
{code:scala}
SparkEnv.get.metricsSystem.removeSource(new LongAccumulatorSource)
{code}
... then all the metrics for all the sources with the same name will be unregistered because of the [following|https://github.com/apache/spark/blob/6ef45301a46c47c12fbc74bb9ceaffea685ed944/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L171] snippet, which removes every record whose name starts with the registry prefix. That prefix includes the source name but not the name of the metric to be removed.
{code:scala}
def removeSource(source: Source) {
  sources -= source
  val regName = buildRegistryName(source)
  registry.removeMatching((name: String, _: Metric) => name.startsWith(regName))
}
{code}
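The effect of the prefix match in removeSource can be illustrated without Spark. The sketch below uses a plain mutable map as a stand-in for the metric registry. The registry keys and the removeByPrefix helper are hypothetical; only the startsWith predicate mirrors the removeSource snippet quoted in the report.

```scala
import scala.collection.mutable

object PrefixRemovalDemo {
  // Stand-in for a metric registry: full metric name -> value (names are made up).
  def newRegistry(): mutable.Map[String, Long] = mutable.Map(
    "app.driver.AccumulatorSource.acc1" -> 1L,
    "app.driver.AccumulatorSource.acc2" -> 2L,
    "app.driver.OtherSource.metric" -> 3L
  )

  // Same predicate shape as in removeSource: drop every entry whose name starts with the prefix.
  def removeByPrefix(registry: mutable.Map[String, Long], prefix: String): Unit = {
    val matching = registry.keys.filter(_.startsWith(prefix)).toList
    matching.foreach(key => registry.remove(key))
  }

  def main(args: Array[String]): Unit = {
    val registry = newRegistry()
    // A source that owns no metrics still shares the name prefix with the other sources.
    removeByPrefix(registry, "app.driver.AccumulatorSource")
    println(registry.keys.toList.sorted) // List(app.driver.OtherSource.metric)
  }
}
```

Both acc1 and acc2 disappear although the source being removed owned neither of them, because the prefix identifies the source name rather than an individual metric.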
[jira] [Updated] (SPARK-27641) Unregistering a single Metrics Source with no metrics leads to removing all the metrics from other sources with the same name
[ https://issues.apache.org/jira/browse/SPARK-27641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sergey Zhemzhitsky updated SPARK-27641:
---------------------------------------
    Summary: Unregistering a single Metrics Source with no metrics leads to removing all the metrics from other sources with the same name  (was: Unregistering a single Metrics Source with no metrics leads to removing all the from other sources with the same name)

> Unregistering a single Metrics Source with no metrics leads to removing all the metrics from other sources with the same name
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-27641
>                 URL: https://issues.apache.org/jira/browse/SPARK-27641
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.3, 2.3.3, 2.4.2
>            Reporter: Sergey Zhemzhitsky
>            Priority: Major
>
> Currently Spark allows registering multiple Metric Sources with the same source name, like the following:
> {code:scala}
> import org.apache.spark.metrics.source.LongAccumulatorSource
>
> // register expects a Map of metric name to accumulator
> val acc1 = sc.longAccumulator
> LongAccumulatorSource.register(sc, Map("acc1" -> acc1))
> val acc2 = sc.longAccumulator
> LongAccumulatorSource.register(sc, Map("acc2" -> acc2))
> {code}
> In that case two metric sources are registered, and both have the same name: [AccumulatorSource|https://github.com/apache/spark/blob/6ef45301a46c47c12fbc74bb9ceaffea685ed944/core/src/main/scala/org/apache/spark/metrics/source/AccumulatorSource.scala#L47]
>
> If you then try to unregister a source that has no accumulators or metrics registered, like the following
> {code:scala}
> SparkEnv.get.metricsSystem.removeSource(new LongAccumulatorSource)
> {code}
> ... then all the metrics for all the sources with the same name are unregistered. This happens because the [following|https://github.com/apache/spark/blob/6ef45301a46c47c12fbc74bb9ceaffea685ed944/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L171] snippet removes every registry record whose name starts with the corresponding prefix, and that prefix includes the source name but not the name of the metric to be removed.
> {code:scala}
> def removeSource(source: Source) {
>   sources -= source
>   val regName = buildRegistryName(source)
>   registry.removeMatching((name: String, _: Metric) => name.startsWith(regName))
> }
> {code}