[jira] [Updated] (SPARK-15943) Spark driver hangs up periodically (cannot receive any reply in 120 seconds)

2016-06-14 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-15943:
---
Attachment: executor-threaddump.log
driver-stacktrace.log

> Spark driver hangs up periodically (cannot receive any reply in 120 seconds)
> 
>
> Key: SPARK-15943
> URL: https://issues.apache.org/jira/browse/SPARK-15943
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: Ubuntu Server 14.04
> Spark 1.6.0
> Hadoop 2.6.0
>Reporter: Sergey Zhemzhitsky
>Priority: Critical
>  Labels: spark
> Attachments: driver-stacktrace.log, executor-threaddump.log
>
>
> Spark driver hangs up periodically with the following stacktrace on driver 
> [^driver-stacktrace.log] and threaddump on the executor that causes this 
> issue [^executor-threaddump.log]






[jira] [Updated] (SPARK-15943) Spark driver hangs up periodically (cannot receive any reply in 120 seconds)

2016-06-14 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-15943:
---
Attachment: executor-threaddump-jstack.log

> Spark driver hangs up periodically (cannot receive any reply in 120 seconds)
> 
>
> Key: SPARK-15943
> URL: https://issues.apache.org/jira/browse/SPARK-15943
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: Ubuntu Server 14.04
> Spark 1.6.0
> Hadoop 2.6.0
>Reporter: Sergey Zhemzhitsky
>Priority: Critical
>  Labels: spark
> Attachments: driver-stacktrace.log, executor-threaddump-jstack.log, 
> executor-threaddump.log
>
>
> Spark driver hangs up periodically with the following stacktrace on driver 
> [^driver-stacktrace.log] and threaddump on the executor that causes this 
> issue [^executor-threaddump.log]






[jira] [Updated] (SPARK-15943) Spark driver hangs up periodically (cannot receive any reply in 120 seconds)

2016-06-14 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-15943:
---
Description: 
Spark driver hangs up periodically with the following stacktrace on driver 
[^driver-stacktrace.log] and threaddump on the executor that causes this issue 
[^executor-threaddump.log]

The attached [^executor-threaddump.log] has been retrieved via the Spark web 
UI. Trying to dump the threads with jstack allowed us to retrieve only the 
following stacktrace [^]


  was:
Spark driver hangs up periodically with the following stacktrace on driver 
[^driver-stacktrace.log] and threaddump on the executor that causes this issue 
[^executor-threaddump.log]




> Spark driver hangs up periodically (cannot receive any reply in 120 seconds)
> 
>
> Key: SPARK-15943
> URL: https://issues.apache.org/jira/browse/SPARK-15943
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: Ubuntu Server 14.04
> Spark 1.6.0
> Hadoop 2.6.0
>Reporter: Sergey Zhemzhitsky
>Priority: Critical
>  Labels: spark
> Attachments: driver-stacktrace.log, executor-threaddump-jstack.log, 
> executor-threaddump.log
>
>
> Spark driver hangs up periodically with the following stacktrace on driver 
> [^driver-stacktrace.log] and threaddump on the executor that causes this 
> issue [^executor-threaddump.log]
> The attached [^executor-threaddump.log] has been retrieved via the Spark web 
> UI. Trying to dump the threads with jstack allowed us to retrieve only the 
> following stacktrace [^]






[jira] [Commented] (SPARK-15943) Spark driver hangs up periodically (cannot receive any reply in 120 seconds)

2016-06-14 Thread Sergey Zhemzhitsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15329581#comment-15329581
 ] 

Sergey Zhemzhitsky commented on SPARK-15943:


Will try to use *spark.memory.useLegacyMode* to see whether it helps until 
Spark 1.6.2 is released.
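For reference, switching back to the legacy memory manager is a single flag; a 
minimal sketch (the fractions below are simply the legacy defaults, shown for 
completeness):
{code:java}
import org.apache.spark.{SparkConf, SparkContext}

// Fall back to the pre-1.6 static memory manager while the issue is investigated.
val conf = new SparkConf()
  .setAppName("legacy-memory-mode")
  .set("spark.memory.useLegacyMode", "true")
  // With legacy mode enabled the old fractions take effect again
  // (0.6 and 0.2 are their defaults).
  .set("spark.storage.memoryFraction", "0.6")
  .set("spark.shuffle.memoryFraction", "0.2")
val sc = new SparkContext(conf)
{code}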

> Spark driver hangs up periodically (cannot receive any reply in 120 seconds)
> 
>
> Key: SPARK-15943
> URL: https://issues.apache.org/jira/browse/SPARK-15943
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: Ubuntu Server 14.04
> Spark 1.6.0
> Hadoop 2.6.0
>Reporter: Sergey Zhemzhitsky
>Priority: Critical
>  Labels: spark
> Attachments: driver-stacktrace.log, executor-threaddump-jstack.log, 
> executor-threaddump.log
>
>
> Spark driver hangs up periodically with the following stacktrace on driver 
> [^driver-stacktrace.log] and threaddump on the executor that causes this 
> issue [^executor-threaddump.log]
> The attached [^executor-threaddump.log] has been retrieved via the Spark web 
> UI. Trying to dump the threads with *jstack -l -F * allowed us to retrieve 
> only the following stacktrace [^executor-threaddump-jstack.log]






[jira] [Created] (SPARK-15943) Spark driver hangs up periodically (cannot receive any reply in 120 seconds)

2016-06-14 Thread Sergey Zhemzhitsky (JIRA)
Sergey Zhemzhitsky created SPARK-15943:
--

 Summary: Spark driver hangs up periodically (cannot receive any 
reply in 120 seconds)
 Key: SPARK-15943
 URL: https://issues.apache.org/jira/browse/SPARK-15943
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.6.0
 Environment: Ubuntu Server 14.04
Spark 1.6.0
Hadoop 2.6.0
Reporter: Sergey Zhemzhitsky
Priority: Critical


Spark driver hangs up periodically with the following stacktrace on driver 
[^driver-stacktrace.log] and threaddump on the executor that causes this issue 
[^executor-threaddump.log]
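(Not part of the original report.) The 120-second figure in the summary matches 
Spark's default network timeout, which the RPC ask timeout falls back to. A 
minimal sketch of raising both while the root cause is investigated; the 600s 
value is purely illustrative and this only masks the hang rather than fixing it:
{code:java}
import org.apache.spark.{SparkConf, SparkContext}

// Raise the timeouts behind "Cannot receive any reply in 120 seconds".
val conf = new SparkConf()
  .setAppName("rpc-timeout-workaround")
  .set("spark.network.timeout", "600s")   // general fallback, default is 120s
  .set("spark.rpc.askTimeout", "600s")    // timeout for driver <-> executor replies
val sc = new SparkContext(conf)
{code}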








[jira] [Updated] (SPARK-15943) Spark driver hangs up periodically (cannot receive any reply in 120 seconds)

2016-06-14 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-15943:
---
Description: 
Spark driver hangs up periodically with the following stacktrace on driver 
[^driver-stacktrace.log] and threaddump on the executor that causes this issue 
[^executor-threaddump.log]

The attached [^executor-threaddump.log] has been retrieved via the Spark web 
UI. Trying to dump the threads with *jstack -l -F * allowed us to retrieve 
only the following stacktrace [^executor-threaddump-jstack.log]


  was:
Spark driver hangs up periodically with the following stacktrace on driver 
[^driver-stacktrace.log] and threaddump on the executor that causes this issue 
[^executor-threaddump.log]

The attached [^executor-threaddump.log] has been retrieved via the Spark web 
UI. Trying to dump the threads with jstack allowed us to retrieve only the 
following stacktrace [^]



> Spark driver hangs up periodically (cannot receive any reply in 120 seconds)
> 
>
> Key: SPARK-15943
> URL: https://issues.apache.org/jira/browse/SPARK-15943
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: Ubuntu Server 14.04
> Spark 1.6.0
> Hadoop 2.6.0
>Reporter: Sergey Zhemzhitsky
>Priority: Critical
>  Labels: spark
> Attachments: driver-stacktrace.log, executor-threaddump-jstack.log, 
> executor-threaddump.log
>
>
> Spark driver hangs up periodically with the following stacktrace on driver 
> [^driver-stacktrace.log] and threaddump on the executor that causes this 
> issue [^executor-threaddump.log]
> The attached [^executor-threaddump.log] has been retrieved via the Spark web 
> UI. Trying to dump the threads with *jstack -l -F * allowed us to retrieve 
> only the following stacktrace [^executor-threaddump-jstack.log]






[jira] [Updated] (SPARK-15943) Spark driver hangs up periodically (cannot receive any reply in 120 seconds)

2016-06-14 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-15943:
---
Priority: Major  (was: Critical)

> Spark driver hangs up periodically (cannot receive any reply in 120 seconds)
> 
>
> Key: SPARK-15943
> URL: https://issues.apache.org/jira/browse/SPARK-15943
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: Ubuntu Server 14.04
> Spark 1.6.0
> Hadoop 2.6.0
>Reporter: Sergey Zhemzhitsky
>  Labels: spark
> Attachments: driver-stacktrace.log, executor-threaddump-jstack.log, 
> executor-threaddump.log
>
>
> Spark driver hangs up periodically with the following stacktrace on driver 
> [^driver-stacktrace.log] and threaddump on the executor that causes this 
> issue [^executor-threaddump.log]
> The attached [^executor-threaddump.log] has been retrieved via the Spark web 
> UI. Trying to dump the threads with *jstack -l -F * allowed us to retrieve 
> only the following stacktrace [^executor-threaddump-jstack.log]






[jira] [Updated] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x

2017-04-20 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-20404:
---
Environment: 
Spark: 2.1
Scala: 2.11
Spark master: local

> Regression with accumulator names when migrating from 1.6 to 2.x
> 
>
> Key: SPARK-20404
> URL: https://issues.apache.org/jira/browse/SPARK-20404
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
> Environment: Spark: 2.1
> Scala: 2.11
> Spark master: local
>Reporter: Sergey Zhemzhitsky
> Attachments: spark-context-accum-option.patch
>
>
> Creating accumulator with explicitly specified name equal to _null_, like the 
> following
> {code:java}
> sparkContext.accumulator(0, null)
> {code}
> throws exception at runtime
> {code:none}
> ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update 
> accumulators for task 0
> java.lang.NullPointerException
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at scala.Option.exists(Option.scala:240)
>   at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> The issue is in wrapping name into _Some_ instead of _Option_ when creating 
> accumulators.
> Patch is available.






[jira] [Commented] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x

2017-04-20 Thread Sergey Zhemzhitsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976506#comment-15976506
 ] 

Sergey Zhemzhitsky commented on SPARK-20404:


I would agree if the error occurred right at the time the accumulator is 
created, but in this case the error may occur at any time (possibly after 
hours of running the job), whenever the accumulator is updated. So to me the 
current behaviour seems misleading and confusing rather than expected.

> Regression with accumulator names when migrating from 1.6 to 2.x
> 
>
> Key: SPARK-20404
> URL: https://issues.apache.org/jira/browse/SPARK-20404
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
> Environment: Spark: 2.1
> Scala: 2.11
> Spark master: local
>Reporter: Sergey Zhemzhitsky
> Attachments: spark-context-accum-option.patch
>
>
> Creating accumulator with explicitly specified name equal to _null_, like the 
> following
> {code:java}
> sparkContext.accumulator(0, null)
> {code}
> throws exception at runtime
> {code:none}
> ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update 
> accumulators for task 0
> java.lang.NullPointerException
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at scala.Option.exists(Option.scala:240)
>   at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> The issue is in wrapping name into _Some_ instead of _Option_ when creating 
> accumulators.
> Patch is available.






[jira] [Commented] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x

2017-04-20 Thread Sergey Zhemzhitsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976400#comment-15976400
 ] 

Sergey Zhemzhitsky commented on SPARK-20404:


Is this the expected behavior for Spark 2+? It seems that using Option to wrap 
a value that may or may not be null is more appropriate than using Some.
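To illustrate the difference the comment refers to (this snippet is mine, not 
from the ticket; the "internal." prefix is only a stand-in for whatever prefix 
AccumulatorV2.toInfo actually checks):
{code:java}
// Option(null) collapses to None; Some(null) stays a "defined" value carrying null.
val explicit: Option[String] = Some(null)   // Some(null)
val safe: Option[String]     = Option(null) // None

safe.exists(_.startsWith("internal."))      // false, predicate never runs
explicit.exists(_.startsWith("internal."))  // NullPointerException inside the predicate
{code}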

> Regression with accumulator names when migrating from 1.6 to 2.x
> 
>
> Key: SPARK-20404
> URL: https://issues.apache.org/jira/browse/SPARK-20404
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
> Environment: Spark: 2.1
> Scala: 2.11
> Spark master: local
>Reporter: Sergey Zhemzhitsky
> Attachments: spark-context-accum-option.patch
>
>
> Creating accumulator with explicitly specified name equal to _null_, like the 
> following
> {code:java}
> sparkContext.accumulator(0, null)
> {code}
> throws exception at runtime
> {code:none}
> ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update 
> accumulators for task 0
> java.lang.NullPointerException
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at scala.Option.exists(Option.scala:240)
>   at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> The issue is in wrapping name into _Some_ instead of _Option_ when creating 
> accumulators.
> Patch is available.






[jira] [Updated] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x

2017-04-20 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-20404:
---
Attachment: (was: spark-context-accum-option.patch)

> Regression with accumulator names when migrating from 1.6 to 2.x
> 
>
> Key: SPARK-20404
> URL: https://issues.apache.org/jira/browse/SPARK-20404
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
> Environment: Spark: 2.1
> Scala: 2.11
> Spark master: local
>Reporter: Sergey Zhemzhitsky
> Attachments: spark-context-accum-option.patch
>
>
> Creating accumulator with explicitly specified name equal to _null_, like the 
> following
> {code:java}
> sparkContext.accumulator(0, null)
> {code}
> throws exception at runtime
> {code:none}
> ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update 
> accumulators for task 0
> java.lang.NullPointerException
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at scala.Option.exists(Option.scala:240)
>   at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> The issue is in wrapping name into _Some_ instead of _Option_ when creating 
> accumulators.
> Patch is available.






[jira] [Updated] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x

2017-04-20 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-20404:
---
Attachment: spark-context-accum-option.patch

> Regression with accumulator names when migrating from 1.6 to 2.x
> 
>
> Key: SPARK-20404
> URL: https://issues.apache.org/jira/browse/SPARK-20404
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
> Environment: Spark: 2.1
> Scala: 2.11
> Spark master: local
>Reporter: Sergey Zhemzhitsky
> Attachments: spark-context-accum-option.patch
>
>
> Creating accumulator with explicitly specified name equal to _null_, like the 
> following
> {code:java}
> sparkContext.accumulator(0, null)
> {code}
> throws exception at runtime
> {code:none}
> ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update 
> accumulators for task 0
> java.lang.NullPointerException
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at scala.Option.exists(Option.scala:240)
>   at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> The issue is in wrapping name into _Some_ instead of _Option_ when creating 
> accumulators.
> Patch is available.






[jira] [Commented] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x

2017-04-20 Thread Sergey Zhemzhitsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976726#comment-15976726
 ] 

Sergey Zhemzhitsky commented on SPARK-20404:


Thanks for drawing attention to this. Just updated the patch.

> Regression with accumulator names when migrating from 1.6 to 2.x
> 
>
> Key: SPARK-20404
> URL: https://issues.apache.org/jira/browse/SPARK-20404
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
> Environment: Spark: 2.1
> Scala: 2.11
> Spark master: local
>Reporter: Sergey Zhemzhitsky
> Attachments: spark-context-accum-option.patch
>
>
> Creating accumulator with explicitly specified name equal to _null_, like the 
> following
> {code:java}
> sparkContext.accumulator(0, null)
> {code}
> throws exception at runtime
> {code:none}
> ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update 
> accumulators for task 0
> java.lang.NullPointerException
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at scala.Option.exists(Option.scala:240)
>   at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> The issue is in wrapping name into _Some_ instead of _Option_ when creating 
> accumulators.
> Patch is available.






[jira] [Created] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x

2017-04-20 Thread Sergey Zhemzhitsky (JIRA)
Sergey Zhemzhitsky created SPARK-20404:
--

 Summary: Regression with accumulator names when migrating from 1.6 
to 2.x
 Key: SPARK-20404
 URL: https://issues.apache.org/jira/browse/SPARK-20404
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0
Reporter: Sergey Zhemzhitsky


Creating accumulator with explicitly specified name equal to _null_, like the 
following
{code:java}
sparkContext.accumulator(0, null)
{code}
throws exception at runtime
{code:none}
ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update accumulators 
for task 0
java.lang.NullPointerException
at 
org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
at 
org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
at scala.Option.exists(Option.scala:240)
at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}

The issue is in wrapping name into `Some` instead of `Option` when creating 
accumulators.
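A minimal reproduction (my snippet, assuming a local SparkContext {{sc}} on an 
affected 2.x version):
{code:java}
// Name explicitly null, exactly as in the description above.
val acc = sc.accumulator(0, null)

// Any action that completes a task makes the driver translate the accumulator
// via AccumulatorV2.toInfo, which is where the NullPointerException shown above surfaces.
sc.parallelize(1 to 10).foreach(x => acc += x)
{code}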






[jira] [Updated] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x

2017-04-20 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-20404:
---
Description: 
Creating accumulator with explicitly specified name equal to _null_, like the 
following
{code:java}
sparkContext.accumulator(0, null)
{code}
throws exception at runtime
{code:none}
ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update accumulators 
for task 0
java.lang.NullPointerException
at 
org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
at 
org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
at scala.Option.exists(Option.scala:240)
at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}

The issue is in wrapping name into _Some_ instead of _Option_ when creating 
accumulators.

Patch is available.

  was:
Creating accumulator with explicitly specified name equal to _null_, like the 
following
{code:java}
sparkContext.accumulator(0, null)
{code}
throws exception at runtime
{code:none}
ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update accumulators 
for task 0
java.lang.NullPointerException
at 
org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
at 
org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
at scala.Option.exists(Option.scala:240)
at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}

The issue is in wrapping name into `Some` instead of `Option` when creating 
accumulators.

Patch is available.


> Regression with accumulator names when migrating from 1.6 to 2.x
> 
>
> Key: SPARK-20404
> URL: https://issues.apache.org/jira/browse/SPARK-20404
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Sergey Zhemzhitsky
> Attachments: spark-context-accum-option.patch
>
>
> Creating accumulator with explicitly specified name equal to _null_, like the 
> following
> {code:java}
> sparkContext.accumulator(0, null)
> {code}
> throws exception at runtime
> {code:none}
> ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update 
> accumulators for task 0
> java.lang.NullPointerException
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at scala.Option.exists(Option.scala:240)
>   at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091)
>   at 
> 

[jira] [Updated] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x

2017-04-20 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-20404:
---
Attachment: spark-context-accum-option.patch

> Regression with accumulator names when migrating from 1.6 to 2.x
> 
>
> Key: SPARK-20404
> URL: https://issues.apache.org/jira/browse/SPARK-20404
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Sergey Zhemzhitsky
> Attachments: spark-context-accum-option.patch
>
>
> Creating accumulator with explicitly specified name equal to _null_, like the 
> following
> {code:java}
> sparkContext.accumulator(0, null)
> {code}
> throws exception at runtime
> {code:none}
> ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update 
> accumulators for task 0
> java.lang.NullPointerException
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at scala.Option.exists(Option.scala:240)
>   at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> The issue is in wrapping name into _Some_ instead of _Option_ when creating 
> accumulators.
> Patch is available.






[jira] [Updated] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x

2017-04-20 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-20404:
---
  Flags: Patch
Description: 
Creating accumulator with explicitly specified name equal to _null_, like the 
following
{code:java}
sparkContext.accumulator(0, null)
{code}
throws exception at runtime
{code:none}
ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update accumulators 
for task 0
java.lang.NullPointerException
at 
org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
at 
org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
at scala.Option.exists(Option.scala:240)
at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}

The issue is in wrapping name into `Some` instead of `Option` when creating 
accumulators.

Patch is available.

  was:
Creating accumulator with explicitly specified name equal to _null_, like the 
following
{code:java}
sparkContext.accumulator(0, null)
{code}
throws exception at runtime
{code:none}
ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update accumulators 
for task 0
java.lang.NullPointerException
at 
org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
at 
org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
at scala.Option.exists(Option.scala:240)
at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
{code}

The issue is in wrapping name into `Some` instead of `Option` when creating 
accumulators.


> Regression with accumulator names when migrating from 1.6 to 2.x
> 
>
> Key: SPARK-20404
> URL: https://issues.apache.org/jira/browse/SPARK-20404
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Sergey Zhemzhitsky
> Attachments: spark-context-accum-option.patch
>
>
> Creating accumulator with explicitly specified name equal to _null_, like the 
> following
> {code:java}
> sparkContext.accumulator(0, null)
> {code}
> throws exception at runtime
> {code:none}
> ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update 
> accumulators for task 0
> java.lang.NullPointerException
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at scala.Option.exists(Option.scala:240)
>   at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091)
>   at 
> 

[jira] [Updated] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs

2017-07-28 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-21549:
---
Priority: Blocker  (was: Critical)

> Spark fails to complete job correctly in case of OutputFormat which do not 
> write into hdfs
> --
>
> Key: SPARK-21549
> URL: https://issues.apache.org/jira/browse/SPARK-21549
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>Priority: Blocker
>
> Spark fails to complete job correctly in case of custom OutputFormat 
> implementations.
> There are OutputFormat implementations which do not need to use 
> *mapreduce.output.fileoutputformat.outputdir* standard hadoop property.
> [But Spark reads this property from the 
> configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
>  while setting up an OutputCommitter
> {code:javascript}
> val committer = FileCommitProtocol.instantiate(
>   className = classOf[HadoopMapReduceCommitProtocol].getName,
>   jobId = stageId.toString,
>   outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
>   isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
> committer.setupJob(jobContext)
> {code}
> ... and then uses this property later on while [committing the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132],
>  [aborting the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141],
>  [creating task's temp 
> path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]
> In those cases, when the job completes, the following exception is thrown
> {code}
> Can not create a Path from a null string
> java.lang.IllegalArgumentException: Can not create a Path from a null string
>   at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>   at org.apache.hadoop.fs.Path.<init>(Path.java:135)
>   at org.apache.hadoop.fs.Path.<init>(Path.java:89)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
>   at 
> org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
>   ...
> {code}
> So it seems that all jobs using OutputFormats that don't write data into 
> HDFS-compatible file systems are broken.






[jira] [Created] (SPARK-21549) Spark fails to abort job correctly in case of custom OutputFormat implementations

2017-07-27 Thread Sergey Zhemzhitsky (JIRA)
Sergey Zhemzhitsky created SPARK-21549:
--

 Summary: Spark fails to abort job correctly in case of custom 
OutputFormat implementations
 Key: SPARK-21549
 URL: https://issues.apache.org/jira/browse/SPARK-21549
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0
 Environment: spark 2.2.0
scala 2.11
Reporter: Sergey Zhemzhitsky
Priority: Critical


Spark fails to abort job correctly in case of custom OutputFormat 
implementations.

There are OutputFormat implementations which do not need to use 
*mapreduce.output.fileoutputformat.outputdir* standard hadoop property.

[But Spark reads this property from the 
configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
 while setting up an OutputCommitter
{code:javascript}
val committer = FileCommitProtocol.instantiate(
  className = classOf[HadoopMapReduceCommitProtocol].getName,
  jobId = stageId.toString,
  outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
  isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
committer.setupJob(jobContext)
{code}

In that case, if the job fails, Spark executes 
[committer.abortJob|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L106]
{code:javascript}
committer.abortJob(jobContext)
{code}
... and fails with the following exception
{code}
Can not create a Path from a null string
java.lang.IllegalArgumentException: Can not create a Path from a null string
  at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
  at org.apache.hadoop.fs.Path.<init>(Path.java:135)
  at org.apache.hadoop.fs.Path.<init>(Path.java:89)
  at 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
  at 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
  at 
org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
  at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
  at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
  at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at 
org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
{code}
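For context, a bare-bones example of the kind of OutputFormat the description 
has in mind: it ships records to an external sink (here it just prints them), 
so it neither reads nor sets *mapreduce.output.fileoutputformat.outputdir*. The 
class is hypothetical and only sketches the idea:
{code:java}
import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, OutputFormat, RecordWriter, TaskAttemptContext}

// Hypothetical OutputFormat writing to an external system instead of HDFS.
class ExternalSinkOutputFormat[K, V] extends OutputFormat[K, V] {

  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, V] =
    new RecordWriter[K, V] {
      override def write(key: K, value: V): Unit = println(s"$key -> $value")
      override def close(context: TaskAttemptContext): Unit = ()
    }

  // Nothing to validate: there is no output directory.
  override def checkOutputSpecs(context: JobContext): Unit = ()

  // No-op committer: no staging directory to set up, commit or abort.
  override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter =
    new OutputCommitter {
      override def setupJob(jobContext: JobContext): Unit = ()
      override def setupTask(taskContext: TaskAttemptContext): Unit = ()
      override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
      override def commitTask(taskContext: TaskAttemptContext): Unit = ()
      override def abortTask(taskContext: TaskAttemptContext): Unit = ()
    }
}
{code}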






[jira] [Updated] (SPARK-21549) Spark fails to abort job correctly in case of custom OutputFormat implementations

2017-07-27 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-21549:
---
Description: 
Spark fails to abort job correctly in case of custom OutputFormat 
implementations.

There are OutputFormat implementations which do not need to use 
*mapreduce.output.fileoutputformat.outputdir* standard hadoop property.

[But Spark reads this property from the 
configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
 while setting up an OutputCommitter
{code:javascript}
val committer = FileCommitProtocol.instantiate(
  className = classOf[HadoopMapReduceCommitProtocol].getName,
  jobId = stageId.toString,
  outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
  isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
committer.setupJob(jobContext)
{code}

In that case, if the job fails, Spark executes 
[committer.abortJob|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L106]
{code:javascript}
committer.abortJob(jobContext)
{code}
... and fails with the following exception
{code}
Can not create a Path from a null string
java.lang.IllegalArgumentException: Can not create a Path from a null string
  at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
  at org.apache.hadoop.fs.Path.<init>(Path.java:135)
  at org.apache.hadoop.fs.Path.<init>(Path.java:89)
  at 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
  at 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
  at 
org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
  at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
  at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
  at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at 
org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
{code}

  was:
Spark fails to abort job correctly in case of custom OutputFormat 
implementations.

There are OutputFormat implementations which do not need to use 
*mapreduce.output.fileoutputformat.outputdir* standard hadoop property.

[But Spark reads this property from the 
configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
 while setting up an OutputCommitter
{code:javascript}
val committer = FileCommitProtocol.instantiate(
  className = classOf[HadoopMapReduceCommitProtocol].getName,
  jobId = stageId.toString,
  outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
  isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
committer.setupJob(jobContext)
{code}

In that case, if the job fails, Spark executes 
[committer.abortJob|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L106]
{code:javascript}
committer.abortJob(jobContext)
{code}
... and fails with the following exception
{code}
Can not create a Path from a null string
java.lang.IllegalArgumentException: Can not create a Path from a null string
  at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
  at org.apache.hadoop.fs.Path.<init>(Path.java:135)
  at org.apache.hadoop.fs.Path.<init>(Path.java:89)
  at 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
  at 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
  at 
org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
  at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
  at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
  at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at 
org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)

[jira] [Updated] (SPARK-21549) Spark fails to complete job correctly in case of custom OutputFormat implementations

2017-07-28 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-21549:
---
Summary: Spark fails to complete job correctly in case of custom 
OutputFormat implementations  (was: Spark fails to abort job correctly in case 
of custom OutputFormat implementations)

> Spark fails to complete job correctly in case of custom OutputFormat 
> implementations
> 
>
> Key: SPARK-21549
> URL: https://issues.apache.org/jira/browse/SPARK-21549
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>Priority: Critical
>
> Spark fails to abort job correctly in case of custom OutputFormat 
> implementations.
> There are OutputFormat implementations which do not need to use 
> *mapreduce.output.fileoutputformat.outputdir* standard hadoop property.
> [But Spark reads this property from the 
> configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
>  while setting up an OutputCommitter
> {code:javascript}
> val committer = FileCommitProtocol.instantiate(
>   className = classOf[HadoopMapReduceCommitProtocol].getName,
>   jobId = stageId.toString,
>   outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
>   isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
> committer.setupJob(jobContext)
> {code}
> In that case, if the job fails, Spark executes 
> [committer.abortJob|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L106]
> {code:javascript}
> committer.abortJob(jobContext)
> {code}
> ... and fails with the following exception
> {code}
> Can not create a Path from a null string
> java.lang.IllegalArgumentException: Can not create a Path from a null string
>   at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>   at org.apache.hadoop.fs.Path.<init>(Path.java:135)
>   at org.apache.hadoop.fs.Path.<init>(Path.java:89)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
>   at 
> org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
> {code}






[jira] [Updated] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs

2017-07-28 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-21549:
---
Priority: Critical  (was: Blocker)

> Spark fails to complete job correctly in case of OutputFormat which do not 
> write into hdfs
> --
>
> Key: SPARK-21549
> URL: https://issues.apache.org/jira/browse/SPARK-21549
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>Priority: Critical
>
> Spark fails to complete job correctly in case of custom OutputFormat 
> implementations.
> There are OutputFormat implementations which do not need to use the standard 
> Hadoop property *mapreduce.output.fileoutputformat.outputdir*.
> [But Spark reads this property from the 
> configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
>  while setting up an OutputCommitter
> {code:javascript}
> val committer = FileCommitProtocol.instantiate(
>   className = classOf[HadoopMapReduceCommitProtocol].getName,
>   jobId = stageId.toString,
>   outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
>   isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
> committer.setupJob(jobContext)
> {code}
> ... and then uses this property later on while [committing the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132],
>  [aborting the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141],
>  [creating the task's temp 
> path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]
> In these cases, when the job completes, the following exception is thrown
> {code}
> Can not create a Path from a null string
> java.lang.IllegalArgumentException: Can not create a Path from a null string
>   at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>   at org.apache.hadoop.fs.Path.(Path.java:135)
>   at org.apache.hadoop.fs.Path.(Path.java:89)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
>   at 
> org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
>   ...
> {code}
> So it seems that all the jobs which use OutputFormats which don't write data 
> into HDFS-compatible file systems are broken.
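
To make the failure mode above concrete, here is a minimal, self-contained sketch (not taken from the report) that hits this code path, using Hadoop's NullOutputFormat as an example of an OutputFormat that never needs mapreduce.output.fileoutputformat.outputdir; the master, app name and sample data are placeholders:

{code:scala}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object OutputFormatWithoutOutputDir {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("SPARK-21549-repro").setMaster("local[2]"))

    val job = Job.getInstance(new Configuration(sc.hadoopConfiguration))
    job.setOutputKeyClass(classOf[NullWritable])
    job.setOutputValueClass(classOf[Text])
    // NullOutputFormat discards all records, so it never needs
    // mapreduce.output.fileoutputformat.outputdir to be set.
    job.setOutputFormatClass(classOf[NullOutputFormat[NullWritable, Text]])

    sc.parallelize(Seq("a", "b", "c"))
      .map(v => (NullWritable.get(), new Text(v)))
      // On Spark 2.2.0 this is expected to fail with
      // "Can not create a Path from a null string" because the commit
      // protocol is instantiated with a null output path.
      .saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()
  }
}
{code}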



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs

2017-07-28 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-21549:
---
Summary: Spark fails to complete job correctly in case of OutputFormat 
which do not write into hdfs  (was: Spark fails to complete job correctly in 
case of custom OutputFormat implementations)

> Spark fails to complete job correctly in case of OutputFormat which do not 
> write into hdfs
> --
>
> Key: SPARK-21549
> URL: https://issues.apache.org/jira/browse/SPARK-21549
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>Priority: Critical
>
> Spark fails to complete job correctly in case of custom OutputFormat 
> implementations.
> There are OutputFormat implementations which do not need to use the standard 
> Hadoop property *mapreduce.output.fileoutputformat.outputdir*.
> [But Spark reads this property from the 
> configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
>  while setting up an OutputCommitter
> {code:javascript}
> val committer = FileCommitProtocol.instantiate(
>   className = classOf[HadoopMapReduceCommitProtocol].getName,
>   jobId = stageId.toString,
>   outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
>   isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
> committer.setupJob(jobContext)
> {code}
> ... and then uses this property later on while [committing the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132],
>  [aborting the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141],
>  [creating the task's temp 
> path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]
> In these cases, when the job completes, the following exception is thrown
> {code}
> Can not create a Path from a null string
> java.lang.IllegalArgumentException: Can not create a Path from a null string
>   at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>   at org.apache.hadoop.fs.Path.(Path.java:135)
>   at org.apache.hadoop.fs.Path.(Path.java:89)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
>   at 
> org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
>   ...
> {code}
> So it seems that all the jobs which use OutputFormats which don't write data 
> into HDFS-compatible file systems are broken.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs

2017-07-28 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-21549:
---
Priority: Blocker  (was: Critical)

> Spark fails to complete job correctly in case of OutputFormat which do not 
> write into hdfs
> --
>
> Key: SPARK-21549
> URL: https://issues.apache.org/jira/browse/SPARK-21549
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>Priority: Blocker
>
> Spark fails to complete job correctly in case of custom OutputFormat 
> implementations.
> There are OutputFormat implementations which do not need to use the standard 
> Hadoop property *mapreduce.output.fileoutputformat.outputdir*.
> [But Spark reads this property from the 
> configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
>  while setting up an OutputCommitter
> {code:javascript}
> val committer = FileCommitProtocol.instantiate(
>   className = classOf[HadoopMapReduceCommitProtocol].getName,
>   jobId = stageId.toString,
>   outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
>   isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
> committer.setupJob(jobContext)
> {code}
> ... and then uses this property later on while [committing the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132],
>  [aborting the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141],
>  [creating the task's temp 
> path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]
> In these cases, when the job completes, the following exception is thrown
> {code}
> Can not create a Path from a null string
> java.lang.IllegalArgumentException: Can not create a Path from a null string
>   at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>   at org.apache.hadoop.fs.Path.(Path.java:135)
>   at org.apache.hadoop.fs.Path.(Path.java:89)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
>   at 
> org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
>   ...
> {code}
> So it seems that all the jobs which use OutputFormats which don't write data 
> into HDFS-compatible file systems are broken.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21549) Spark fails to complete job correctly in case of custom OutputFormat implementations

2017-07-28 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-21549:
---
Description: 
Spark fails to complete job correctly in case of custom OutputFormat 
implementations.

There are OutputFormat implementations which do not need to use the standard 
Hadoop property *mapreduce.output.fileoutputformat.outputdir*.

[But Spark reads this property from the 
configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
 while setting up an OutputCommitter
{code:javascript}
val committer = FileCommitProtocol.instantiate(
  className = classOf[HadoopMapReduceCommitProtocol].getName,
  jobId = stageId.toString,
  outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
  isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
committer.setupJob(jobContext)
{code}
... and then uses this property later on while [committing the 
job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132],
 [aborting the 
job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141],
 [creating the task's temp 
path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]

In these cases, when the job completes, the following exception is thrown
{code}
Can not create a Path from a null string
java.lang.IllegalArgumentException: Can not create a Path from a null string
  at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
  at org.apache.hadoop.fs.Path.(Path.java:135)
  at org.apache.hadoop.fs.Path.(Path.java:89)
  at 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
  at 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
  at 
org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
  at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
  at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
  at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at 
org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
  ...
{code}

So it seems that all the jobs which use OutputFormats which don't write data 
into HDFS-compatible file systems are broken.

  was:
Spark fails to abort job correctly in case of custom OutputFormat 
implementations.

There are OutputFormat implementations which do not need to use the standard 
Hadoop property *mapreduce.output.fileoutputformat.outputdir*.

[But Spark reads this property from the 
configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
 while setting up an OutputCommitter
{code:javascript}
val committer = FileCommitProtocol.instantiate(
  className = classOf[HadoopMapReduceCommitProtocol].getName,
  jobId = stageId.toString,
  outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
  isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
committer.setupJob(jobContext)
{code}

In that case, if the job fails, Spark executes 
[committer.abortJob|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L106]
{code:javascript}
committer.abortJob(jobContext)
{code}
... and fails with the following exception
{code}
Can not create a Path from a null string
java.lang.IllegalArgumentException: Can not create a Path from a null string
  at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
  at org.apache.hadoop.fs.Path.(Path.java:135)
  at org.apache.hadoop.fs.Path.(Path.java:89)
  at 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
  at 
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
  at 
org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
  at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
  at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
  at 

[jira] [Commented] (SPARK-20404) Regression with accumulator names when migrating from 1.6 to 2.x

2017-04-24 Thread Sergey Zhemzhitsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980901#comment-15980901
 ] 

Sergey Zhemzhitsky commented on SPARK-20404:


[~srowen] 

Here it is: https://github.com/apache/spark/pull/17740

> Regression with accumulator names when migrating from 1.6 to 2.x
> 
>
> Key: SPARK-20404
> URL: https://issues.apache.org/jira/browse/SPARK-20404
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.0.0, 2.0.1, 2.0.2, 2.1.0
> Environment: Spark: 2.1
> Scala: 2.11
> Spark master: local
>Reporter: Sergey Zhemzhitsky
> Attachments: spark-context-accum-option.patch
>
>
> Creating an accumulator with an explicitly specified name equal to _null_, like 
> the following
> {code:java}
> sparkContext.accumulator(0, null)
> {code}
> throws exception at runtime
> {code:none}
> ERROR | DAGScheduler | dag-scheduler-event-loop | Failed to update 
> accumulators for task 0
> java.lang.NullPointerException
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.util.AccumulatorV2$$anonfun$1.apply(AccumulatorV2.scala:106)
>   at scala.Option.exists(Option.scala:240)
>   at org.apache.spark.util.AccumulatorV2.toInfo(AccumulatorV2.scala:106)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1091)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {code}
> The issue is in wrapping the name into _Some_ instead of _Option_ when creating 
> accumulators.
> A patch is available.
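
As a side note on the _Some_ vs _Option_ distinction the report refers to, here is a tiny standalone sketch (nothing in it comes from the patch itself) showing why wrapping a possibly-null name in _Some_ blows up while _Option_ does not:

{code:scala}
import scala.util.Try

object SomeVsOption {
  def main(args: Array[String]): Unit = {
    val name: String = null

    // Option(null) normalizes to None, so callers can safely map/exists over it.
    val asOption: Option[String] = Option(name)
    println(asOption.exists(_.startsWith("acc"))) // false, no exception

    // Some(null) keeps the null inside, so dereferencing the value NPEs,
    // which mirrors the failure in AccumulatorV2.toInfo described above.
    val asSome: Option[String] = Some(name)
    println(Try(asSome.exists(_.startsWith("acc")))) // Failure(java.lang.NullPointerException)
  }
}
{code}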



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20886) HadoopMapReduceCommitProtocol to fail with message if FileOutputCommitter.getWorkPath==null

2017-09-19 Thread Sergey Zhemzhitsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16171227#comment-16171227
 ] 

Sergey Zhemzhitsky commented on SPARK-20886:


[~ste...@apache.org], [~hyukjin.kwon], does this patch also fix the NPE described 
in SPARK-21549?

> HadoopMapReduceCommitProtocol to fail with message if 
> FileOutputCommitter.getWorkPath==null
> ---
>
> Key: SPARK-20886
> URL: https://issues.apache.org/jira/browse/SPARK-20886
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Steve Loughran
>Assignee: Steve Loughran
>Priority: Trivial
> Fix For: 2.3.0
>
>
> This is minor, and the root cause is my fault *elsewhere*, but it's the patch 
> I used to track down the problem.
> If {{HadoopMapReduceCommitProtocol}} has a {{FileOutputCommitter}} for 
> committing things, and *somehow* that's been configured with a 
> {{JobAttemptContext}}, not a {{TaskAttemptContext}}, then the committer NPEs.
> A {{require()}} statement can validate the working path and so point the 
> blame at whoever's code is confused.
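
Purely as an illustration of the kind of {{require()}} guard described above (this is not the actual patch; the helper method and message are made up for the sketch, and a FileOutputCommitter obtained from the output format is assumed):

{code:scala}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Hypothetical guard: fail fast with a clear message instead of an NPE
// when the committer's work path is missing.
def validatedWorkPath(committer: FileOutputCommitter): Path = {
  val workPath = committer.getWorkPath
  require(workPath != null,
    "FileOutputCommitter has a null work path; it was probably set up with a " +
      "JobAttemptContext instead of a TaskAttemptContext")
  workPath
}
{code}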



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs

2017-09-20 Thread Sergey Zhemzhitsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173755#comment-16173755
 ] 

Sergey Zhemzhitsky commented on SPARK-21549:


[~ste...@apache.org] I've updated the PR so that it does not use FileSystems at all. 
Instead, there is just an additional check for whether there are absolute files to 
rename during commit.

> Spark fails to complete job correctly in case of OutputFormat which do not 
> write into hdfs
> --
>
> Key: SPARK-21549
> URL: https://issues.apache.org/jira/browse/SPARK-21549
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>
> Spark fails to complete job correctly in case of custom OutputFormat 
> implementations.
> There are OutputFormat implementations which do not need to use the standard 
> Hadoop property *mapreduce.output.fileoutputformat.outputdir*.
> [But Spark reads this property from the 
> configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
>  while setting up an OutputCommitter
> {code:javascript}
> val committer = FileCommitProtocol.instantiate(
>   className = classOf[HadoopMapReduceCommitProtocol].getName,
>   jobId = stageId.toString,
>   outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
>   isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
> committer.setupJob(jobContext)
> {code}
> ... and then uses this property later on while [committing the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132],
>  [aborting the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141],
>  [creating the task's temp 
> path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]
> In these cases, when the job completes, the following exception is thrown
> {code}
> Can not create a Path from a null string
> java.lang.IllegalArgumentException: Can not create a Path from a null string
>   at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>   at org.apache.hadoop.fs.Path.(Path.java:135)
>   at org.apache.hadoop.fs.Path.(Path.java:89)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
>   at 
> org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
>   ...
> {code}
> So it seems that all the jobs which use OutputFormats which don't write data 
> into HDFS-compatible file systems are broken.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs

2017-09-20 Thread Sergey Zhemzhitsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16173184#comment-16173184
 ] 

Sergey Zhemzhitsky commented on SPARK-21549:


[~mridulm80], [~WeiqingYang], [~ste...@apache.org], 

I've implemented the fix in this PR 
(https://github.com/apache/spark/pull/19294), which sets the user's current working 
directory (typically the home directory in the case of distributed 
filesystems) as the output directory.

The patch allows using OutputFormats which write to external systems, 
databases, etc. by means of the RDD API.
As far as I understand, the requirement for output paths to be specified is only 
necessary to allow files to be committed to an absolute output location, which 
is not the case for output formats that write data to external systems. 
So using the user's working directory in such situations seems to be OK.
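
For illustration only, the idea could be sketched roughly as follows (this is not the PR code; the helper name is made up, and the property is assumed to be set on the Hadoop Configuration that is later passed to saveAsNewAPIHadoopDataset):

{code:scala}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Hedged sketch of the fallback idea: use the filesystem's working directory
// when no output directory has been configured.
def ensureOutputDir(conf: Configuration): Unit = {
  if (conf.get("mapreduce.output.fileoutputformat.outputdir") == null) {
    val workingDir = FileSystem.get(conf).getWorkingDirectory
    conf.set("mapreduce.output.fileoutputformat.outputdir", workingDir.toString)
  }
}
{code}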
 


> Spark fails to complete job correctly in case of OutputFormat which do not 
> write into hdfs
> --
>
> Key: SPARK-21549
> URL: https://issues.apache.org/jira/browse/SPARK-21549
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>
> Spark fails to complete job correctly in case of custom OutputFormat 
> implementations.
> There are OutputFormat implementations which do not need to use the standard 
> Hadoop property *mapreduce.output.fileoutputformat.outputdir*.
> [But Spark reads this property from the 
> configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
>  while setting up an OutputCommitter
> {code:javascript}
> val committer = FileCommitProtocol.instantiate(
>   className = classOf[HadoopMapReduceCommitProtocol].getName,
>   jobId = stageId.toString,
>   outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
>   isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
> committer.setupJob(jobContext)
> {code}
> ... and then uses this property later on while [committing the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132],
>  [aborting the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141],
>  [creating the task's temp 
> path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]
> In these cases, when the job completes, the following exception is thrown
> {code}
> Can not create a Path from a null string
> java.lang.IllegalArgumentException: Can not create a Path from a null string
>   at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>   at org.apache.hadoop.fs.Path.(Path.java:135)
>   at org.apache.hadoop.fs.Path.(Path.java:89)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
>   at 
> org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
>   ...
> {code}
> So it seems that all the jobs which use OutputFormats which don't write data 
> into HDFS-compatible file systems are broken.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22184) GraphX fails in case of insufficient memory and checkpoints enabled

2017-10-02 Thread Sergey Zhemzhitsky (JIRA)
Sergey Zhemzhitsky created SPARK-22184:
--

 Summary: GraphX fails in case of insufficient memory and 
checkpoints enabled
 Key: SPARK-22184
 URL: https://issues.apache.org/jira/browse/SPARK-22184
 Project: Spark
  Issue Type: Bug
  Components: GraphX
Affects Versions: 2.2.0
 Environment: spark 2.2.0
scala 2.11
Reporter: Sergey Zhemzhitsky
 Fix For: 2.2.1


GraphX fails with FileNotFoundException in case of insufficient memory when 
checkpoints are enabled.

Here is the stacktrace 
{code}
Job aborted due to stage failure: Task creation failed: 
java.io.FileNotFoundException: File 
file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0
 does not exist
java.io.FileNotFoundException: File 
file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0
 does not exist
at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
at 
org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
at 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
at 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
at scala.Option.map(Option.scala:146)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
...
{code}

As GraphX uses cached RDDs intensively, the issue is only reproducible when 
previously cached and checkpointed Vertex and Edge RDDs are evicted from memory 
and forced to be read from disk. 

For testing purposes the following parameters may be set to emulate a 
low-memory environment
{code}
val sparkConf = new SparkConf()
  .set("spark.graphx.pregel.checkpointInterval", "2")
  // set testing memory to evict cached RDDs from it and force
  // reading checkpointed RDDs from disk
  .set("spark.testing.reservedMemory", "128")
  .set("spark.testing.memory", "256")
{code}

This issue also includes SPARK-22150 and cannot be fixed until SPARK-22150 is 
fixed too.
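
For what it's worth, a self-contained sketch of a job exercising this path might look as follows (not taken from the report; the checkpoint directory, graph size and ShortestPaths workload are placeholders, and whether the FileNotFoundException actually reproduces depends on how aggressively cached partitions get evicted):

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.graphx.lib.ShortestPaths

object GraphxCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SPARK-22184-sketch")
      .setMaster("local[2]")
      .set("spark.graphx.pregel.checkpointInterval", "2")
      // Tiny memory pools so cached partitions get evicted and the
      // checkpointed RDDs have to be re-read from disk.
      .set("spark.testing.reservedMemory", "128")
      .set("spark.testing.memory", "256")

    val sc = new SparkContext(conf)
    // Placeholder checkpoint directory for this sketch.
    sc.setCheckpointDir("/tmp/spark-22184-checkpoints")

    val edges = sc.parallelize((1L until 1000L).map(i => Edge(i, i + 1, 1)))
    val graph = Graph.fromEdges(edges, defaultValue = 0)

    // ShortestPaths runs Pregel iterations internally, which is where the
    // periodic checkpointing (and the reported FileNotFoundException) kicks in.
    val result = ShortestPaths.run(graph, landmarks = Seq(1L))
    println(result.vertices.count())

    sc.stop()
  }
}
{code}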



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22184) GraphX fails in case of insufficient memory and checkpoints enabled

2017-10-02 Thread Sergey Zhemzhitsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188306#comment-16188306
 ] 

Sergey Zhemzhitsky commented on SPARK-22184:


Hi [~sowen], 

Should I merge the changes of the PR that fixes this issue into SPARK-22150 and provide 
a single PR which includes fixes for the standard checkpointers as well as the GraphX 
ones? I'm asking because, in the case of GraphX, there are also changes in Pregel, and 
I believe that a PR with just the PeriodicCheckpointer changes (without GraphX) 
can be reviewed and merged into master faster than one which includes all 
the changes.

What do you think?

> GraphX fails in case of insufficient memory and checkpoints enabled
> ---
>
> Key: SPARK-22184
> URL: https://issues.apache.org/jira/browse/SPARK-22184
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>
> GraphX fails with FileNotFoundException in case of insufficient memory when 
> checkpoints are enabled.
> Here is the stacktrace 
> {code}
> Job aborted due to stage failure: Task creation failed: 
> java.io.FileNotFoundException: File 
> file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0
>  does not exist
> java.io.FileNotFoundException: File 
> file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0
>  does not exist
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
>   at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
>   at 
> org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at scala.Option.map(Option.scala:146)
>   at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
> ...
> {code}
> As GraphX uses cached RDDs intensively, the issue is only reproducible when 
> previously cached and checkpointed Vertex and Edge RDDs are evicted from 
> memory and forced to be read from disk. 
> For testing purposes the following parameters may be set to emulate a 
> low-memory environment
> {code}
> val sparkConf = new SparkConf()
>   .set("spark.graphx.pregel.checkpointInterval", "2")
>   // set testing memory to evict cached RDDs from it and force
>   // reading checkpointed RDDs from disk
>   .set("spark.testing.reservedMemory", "128")
>   .set("spark.testing.memory", "256")
> {code}
> This issue also includes SPARK-22150 and cannot be fixed until SPARK-22150 is 
> fixed too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-22184) GraphX fails in case of insufficient memory and checkpoints enabled

2017-10-02 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky reopened SPARK-22184:


> GraphX fails in case of insufficient memory and checkpoints enabled
> ---
>
> Key: SPARK-22184
> URL: https://issues.apache.org/jira/browse/SPARK-22184
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>
> GraphX fails with FileNotFoundException in case of insufficient memory when 
> checkpoints are enabled.
> Here is the stacktrace 
> {code}
> Job aborted due to stage failure: Task creation failed: 
> java.io.FileNotFoundException: File 
> file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0
>  does not exist
> java.io.FileNotFoundException: File 
> file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0
>  does not exist
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
>   at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
>   at 
> org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at scala.Option.map(Option.scala:146)
>   at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
> ...
> {code}
> As GraphX uses cached RDDs intensively, the issue is only reproducible when 
> previously cached and checkpointed Vertex and Edge RDDs are evicted from 
> memory and forced to be read from disk. 
> For testing purposes the following parameters may be set to emulate a 
> low-memory environment
> {code}
> val sparkConf = new SparkConf()
>   .set("spark.graphx.pregel.checkpointInterval", "2")
>   // set testing memory to evict cached RDDs from it and force
>   // reading checkpointed RDDs from disk
>   .set("spark.testing.reservedMemory", "128")
>   .set("spark.testing.memory", "256")
> {code}
> This issue also includes SPARK-22150 and cannot be fixed until SPARK-22150 is 
> fixed too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22184) GraphX fails in case of insufficient memory and checkpoints enabled

2017-10-02 Thread Sergey Zhemzhitsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188306#comment-16188306
 ] 

Sergey Zhemzhitsky edited comment on SPARK-22184 at 10/2/17 5:27 PM:
-

Hi [~sowen], 

Would you mind if I reopen this issue until it is clear whether it is 
better to include the GraphX fixes into SPARK-22150 or keep the changes separate 
from each other?

Could you please suggest whether I should merge the changes of the PR that fixes this 
issue into SPARK-22150 and provide a single PR which includes fixes for the standard 
checkpointers as well as the GraphX ones? I'm just asking because, in the case of 
GraphX, there are also changes in Pregel, and I believe that the PR (SPARK-22150) 
with just the PeriodicCheckpointer changes (without GraphX) can be reviewed and 
merged into master faster than one which includes all the changes.

What do you think?


was (Author: szhemzhitsky):
Hi [~sowen], 

Should I merge changes of PR that fixes this issue into SPARK-22150 and provide 
a single PR which includes fixes for standard checkpointers as well as  GraphX 
ones? I'm asking because in case of GraphX there are also changes in Pregel and 
I believe that PR with just changes of PeriodicCheckpointer (without GraphX) 
can be reviewed and merged into master faster than the one which includes all 
the changes.

What do you think?

> GraphX fails in case of insufficient memory and checkpoints enabled
> ---
>
> Key: SPARK-22184
> URL: https://issues.apache.org/jira/browse/SPARK-22184
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>
> GraphX fails with FileNotFoundException in case of insufficient memory when 
> checkpoints are enabled.
> Here is the stacktrace 
> {code}
> Job aborted due to stage failure: Task creation failed: 
> java.io.FileNotFoundException: File 
> file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0
>  does not exist
> java.io.FileNotFoundException: File 
> file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0
>  does not exist
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
>   at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
>   at 
> org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at scala.Option.map(Option.scala:146)
>   at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
> ...
> {code}
> As GraphX uses cached RDDs intensively, the issue is only reproducible when 
> previously cached and checkpointed Vertex and Edge RDDs are evicted from 
> memory and forced to be read from disk. 
> For testing purposes the following parameters may be set to emulate a 
> low-memory environment
> {code}
> val sparkConf = new SparkConf()
>   .set("spark.graphx.pregel.checkpointInterval", "2")
>   // set testing memory to evict cached RDDs from it and force
>   // reading checkpointed RDDs from disk
>   .set("spark.testing.reservedMemory", "128")
>   .set("spark.testing.memory", "256")
> {code}
> This issue also includes SPARK-22150 and cannot be fixed until SPARK-22150 is 
> fixed too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-22150) PeriodicCheckpointer fails with FileNotFoundException in case of dependant RDDs

2017-09-27 Thread Sergey Zhemzhitsky (JIRA)
Sergey Zhemzhitsky created SPARK-22150:
--

 Summary: PeriodicCheckpointer fails with FileNotFoundException in 
case of dependant RDDs
 Key: SPARK-22150
 URL: https://issues.apache.org/jira/browse/SPARK-22150
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0, 2.1.1, 2.1.0, 2.0.2, 2.0.1, 2.0.0, 1.6.3, 1.6.2, 
1.6.1, 1.6.0, 1.5.2, 1.5.1, 1.5.0
 Environment: spark 2.2.0
scala 2.11
Reporter: Sergey Zhemzhitsky


PeriodicCheckpointer fails with FileNotFoundException in case of checkpointing 
dependant RDDs (consider iterative algorithms), i.e. when the RDD to checkpoint 
depends on an already checkpointed RDD.

Here is the exception
{code}
Job aborted due to stage failure: Task creation failed: 
java.io.FileNotFoundException: File 
file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-0
 does not exist
java.io.FileNotFoundException: File 
file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-0
 does not exist
at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
at 
org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
at 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
at 
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
at scala.Option.map(Option.scala:146)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1708)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1707)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1705)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1705)
at 
org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1671)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:989)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:987)
{code}

The issue seems to be in this [piece of 
code|https://github.com/apache/spark/blob/0a7f5f2798b6e8b2ba15e8b3aa07d5953ad1c695/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala#L94]

{code:java}
if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
  && sc.getCheckpointDir.nonEmpty) {
  // Add new checkpoint before removing old checkpoints.
  checkpoint(newData)
  checkpointQueue.enqueue(newData)
  // Remove checkpoints before the latest one.
  var canDelete = true
  while (checkpointQueue.size > 1 && canDelete) {
    // Delete the oldest checkpoint only if the next checkpoint exists.
    if (isCheckpointed(checkpointQueue.head)) {
      removeCheckpointFile()
    } else {
      canDelete = false
    }
  }
}
{code}
Given that _checkpointQueue.head_ is checkpointed and materialized and 
_newData_ depends on _checkpointQueue.head_, the exception happens on an 
action over the RDDs representing _newData_
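
A minimal sketch of the scenario described above (not from the report; the checkpoint directory and data are placeholders, and the deletion is done manually here to mimic what PeriodicCheckpointer does internally):

{code:scala}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object DependentCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("SPARK-22150-sketch").setMaster("local[2]"))
    // Placeholder checkpoint directory for this sketch.
    sc.setCheckpointDir("/tmp/spark-22150-checkpoints")

    // First RDD: checkpointed and materialized, so its lineage now points
    // at the checkpoint files on disk.
    val first = sc.parallelize(1 to 1000).map(_ * 2)
    first.checkpoint()
    first.count()

    // Second RDD depends on the first (think one iteration of an iterative
    // algorithm); it is marked for checkpointing but not materialized yet.
    val second = first.map(_ + 1)
    second.checkpoint()

    // Mimic what PeriodicCheckpointer does: drop the older checkpoint before
    // `second` has actually been computed...
    val fs = FileSystem.get(sc.hadoopConfiguration)
    first.getCheckpointFile.foreach(dir => fs.delete(new Path(dir), true))

    // ...so the first action on `second` has to read the deleted files and
    // fails in the same way as the stack trace above.
    second.count()

    sc.stop()
  }
}
{code}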



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs

2017-09-27 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-21549:
---
Fix Version/s: 2.2.1

> Spark fails to complete job correctly in case of OutputFormat which do not 
> write into hdfs
> --
>
> Key: SPARK-21549
> URL: https://issues.apache.org/jira/browse/SPARK-21549
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
> Fix For: 2.2.1
>
>
> Spark fails to complete job correctly in case of custom OutputFormat 
> implementations.
> There are OutputFormat implementations which do not need to use the standard 
> Hadoop property *mapreduce.output.fileoutputformat.outputdir*.
> [But Spark reads this property from the 
> configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
>  while setting up an OutputCommitter
> {code:javascript}
> val committer = FileCommitProtocol.instantiate(
>   className = classOf[HadoopMapReduceCommitProtocol].getName,
>   jobId = stageId.toString,
>   outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
>   isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
> committer.setupJob(jobContext)
> {code}
> ... and then uses this property later on while [committing the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132],
>  [aborting the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141],
>  [creating the task's temp 
> path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]
> In these cases, when the job completes, the following exception is thrown
> {code}
> Can not create a Path from a null string
> java.lang.IllegalArgumentException: Can not create a Path from a null string
>   at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>   at org.apache.hadoop.fs.Path.(Path.java:135)
>   at org.apache.hadoop.fs.Path.(Path.java:89)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
>   at 
> org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
>   ...
> {code}
> So it seems that all the jobs which use OutputFormats which don't write data 
> into HDFS-compatible file systems are broken.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21549) Spark fails to complete job correctly in case of OutputFormat which do not write into hdfs

2017-08-21 Thread Sergey Zhemzhitsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16135545#comment-16135545
 ] 

Sergey Zhemzhitsky commented on SPARK-21549:


[~mridulm80], [~WeiqingYang] does it make sense to implement the provided 
workaround with a valid and writable directory just within 
SparkHadoopMapReduceWriter if the needed property is not set, to avoid 
affecting all the jobs which don't write to HDFS, at least until there is a 
better solution?
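
Just to illustrate the workaround being discussed (the scratch path below is a placeholder, not something prescribed by this issue), a job can avoid the NPE by pointing the property at any valid, writable location before saving:

{code:scala}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object OutputDirWorkaroundSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("SPARK-21549-workaround").setMaster("local[2]"))

    val job = Job.getInstance(new Configuration(sc.hadoopConfiguration))
    job.setOutputKeyClass(classOf[NullWritable])
    job.setOutputValueClass(classOf[Text])
    job.setOutputFormatClass(classOf[NullOutputFormat[NullWritable, Text]])
    // Point the property at any valid, writable scratch location; the format
    // never writes there, but the commit protocol no longer sees a null path.
    // "/tmp/spark-21549-scratch" is just a placeholder for this sketch.
    job.getConfiguration.set(
      "mapreduce.output.fileoutputformat.outputdir", "/tmp/spark-21549-scratch")

    sc.parallelize(Seq("a", "b", "c"))
      .map(v => (NullWritable.get(), new Text(v)))
      .saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()
  }
}
{code}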

> Spark fails to complete job correctly in case of OutputFormat which do not 
> write into hdfs
> --
>
> Key: SPARK-21549
> URL: https://issues.apache.org/jira/browse/SPARK-21549
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>
> Spark fails to complete job correctly in case of custom OutputFormat 
> implementations.
> There are OutputFormat implementations which do not need to use the standard 
> Hadoop property *mapreduce.output.fileoutputformat.outputdir*.
> [But Spark reads this property from the 
> configuration|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala#L79]
>  while setting up an OutputCommitter
> {code:javascript}
> val committer = FileCommitProtocol.instantiate(
>   className = classOf[HadoopMapReduceCommitProtocol].getName,
>   jobId = stageId.toString,
>   outputPath = conf.value.get("mapreduce.output.fileoutputformat.outputdir"),
>   isAppend = false).asInstanceOf[HadoopMapReduceCommitProtocol]
> committer.setupJob(jobContext)
> {code}
> ... and then uses this property later on while [committing the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L132],
>  [aborting the 
> job|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L141],
>  [creating the task's temp 
> path|https://github.com/apache/spark/blob/v2.2.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L95]
> In these cases, when the job completes, the following exception is thrown
> {code}
> Can not create a Path from a null string
> java.lang.IllegalArgumentException: Can not create a Path from a null string
>   at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
>   at org.apache.hadoop.fs.Path.(Path.java:135)
>   at org.apache.hadoop.fs.Path.(Path.java:89)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
>   at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
>   at 
> org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
>   ...
> {code}
> So it seems that all the jobs which use OutputFormats which don't write data 
> into HDFS-compatible file systems are broken.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22150) PeriodicCheckpointer fails with FileNotFoundException in case of dependant RDDs

2018-06-25 Thread Sergey Zhemzhitsky (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522015#comment-16522015
 ] 

Sergey Zhemzhitsky commented on SPARK-22150:


Just a kind reminder...

> PeriodicCheckpointer fails with FileNotFoundException in case of dependant 
> RDDs
> ---
>
> Key: SPARK-22150
> URL: https://issues.apache.org/jira/browse/SPARK-22150
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 
> 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>Priority: Major
>
> PeriodicCheckpointer fails with FileNotFoundException in case of 
> checkpointing dependant RDDs (consider iterative algorithms), i.e. when the 
> RDD to checkpoint depends on an already checkpointed RDD.
> Here is the exception
> {code}
> Job aborted due to stage failure: Task creation failed: 
> java.io.FileNotFoundException: File 
> file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-0
>  does not exist
> java.io.FileNotFoundException: File 
> file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-0
>  does not exist
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
>   at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
>   at 
> org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at scala.Option.map(Option.scala:146)
>   at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1708)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1707)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1705)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1705)
>   at 
> org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1671)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:989)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:987)
> {code}
> The issue seems to be in this [piece of 
> code|https://github.com/apache/spark/blob/0a7f5f2798b6e8b2ba15e8b3aa07d5953ad1c695/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala#L94]
> {code:java}
> if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
>   && sc.getCheckpointDir.nonEmpty) {
>   // Add new checkpoint before removing old checkpoints.
>   checkpoint(newData)
>   checkpointQueue.enqueue(newData)
>   // Remove checkpoints before the latest one.
>   var canDelete = true
>   while (checkpointQueue.size > 1 && canDelete) {
> // Delete the oldest checkpoint only if the next checkpoint exists.
> if (isCheckpointed(checkpointQueue.head)) {
>   removeCheckpointFile()
> } else {
>   canDelete = false
> }
>   }
> }
> {code}
> Given that _checkpointQueue.head_ is checkpointed and materialized, and 
> _newData_ depends on _checkpointQueue.head_, the exception happens when an 
> action is run on the RDDs representing _newData_.
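
To make the failure mode concrete, below is a minimal sketch of that dependant-RDD pattern 
(written for this note, not taken from the reporter's job; the master, checkpoint directory 
and RDD contents are made up, and the delete step emulates what PeriodicCheckpointer does 
when it drops the head of its queue).
{code:scala}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("checkpoint-deps"))
sc.setCheckpointDir("/tmp/checkpoints")

val first = sc.parallelize(1 to 100, 4)
first.checkpoint()
first.count()                  // materializes the checkpoint; lineage now points at the checkpoint files

val second = first.map(_ * 2)  // depends on the already checkpointed 'first'
second.checkpoint()

// emulate PeriodicCheckpointer removing the older checkpoint before 'second' is materialized
val fs = FileSystem.get(sc.hadoopConfiguration)
first.getCheckpointFile.foreach(dir => fs.delete(new Path(dir), true))

// task creation for this action needs the deleted files
// (ReliableCheckpointRDD.getPreferredLocations) and fails with FileNotFoundException
second.count()
{code}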



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (SPARK-22184) GraphX fails in case of insufficient memory and checkpoints enabled

2018-06-25 Thread Sergey Zhemzhitsky (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522017#comment-16522017
 ] 

Sergey Zhemzhitsky commented on SPARK-22184:


Just a kind reminder...

> GraphX fails in case of insufficient memory and checkpoints enabled
> ---
>
> Key: SPARK-22184
> URL: https://issues.apache.org/jira/browse/SPARK-22184
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>Priority: Major
>
> GraphX fails with FileNotFoundException in case of insufficient memory when 
> checkpoints are enabled.
> Here is the stacktrace 
> {code}
> Job aborted due to stage failure: Task creation failed: 
> java.io.FileNotFoundException: File 
> file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0
>  does not exist
> java.io.FileNotFoundException: File 
> file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0
>  does not exist
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
>   at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
>   at 
> org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at scala.Option.map(Option.scala:146)
>   at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
> ...
> {code}
> As GraphX uses cached RDDs intensively, the issue is only reproducible when 
> previously cached and checkpointed Vertex and Edge RDDs are evicted from 
> memory and forced to be read from disk. 
> For testing purposes, the following parameters may be set to emulate a 
> low-memory environment
> {code}
> val sparkConf = new SparkConf()
>   .set("spark.graphx.pregel.checkpointInterval", "2")
>   // set testing memory to evict cached RDDs from it and force
>   // reading checkpointed RDDs from disk
>   .set("spark.testing.reservedMemory", "128")
>   .set("spark.testing.memory", "256")
> {code}
> This issue also includes SPARK-22150 and cannot be fixed until SPARK-22150 is 
> fixed too.
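
For a concrete starting point, here is a sketch of a small Pregel-based job run with the 
low-memory settings quoted above. The graph generator, sizes and checkpoint directory are 
mine, for illustration, and are not taken from this ticket.
{code:scala}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.util.GraphGenerators

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("graphx-low-memory")
  .set("spark.graphx.pregel.checkpointInterval", "2")
  // emulate a low-memory environment so cached RDDs are evicted and
  // checkpointed vertex/edge RDDs have to be re-read from disk
  .set("spark.testing.reservedMemory", "128")
  .set("spark.testing.memory", "256")
val sc = new SparkContext(conf)
sc.setCheckpointDir("/tmp/checkpoints")

// connectedComponents is implemented on top of Pregel, so the periodic
// graph checkpointer is exercised once checkpointInterval is set
val graph = GraphGenerators.logNormalGraph(sc, numVertices = 100000)
println(graph.connectedComponents().vertices.count())
{code}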



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24154) AccumulatorV2 loses type information during serialization

2018-05-02 Thread Sergey Zhemzhitsky (JIRA)
Sergey Zhemzhitsky created SPARK-24154:
--

 Summary: AccumulatorV2 loses type information during serialization
 Key: SPARK-24154
 URL: https://issues.apache.org/jira/browse/SPARK-24154
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0, 2.2.1, 2.2.0, 2.3.1
 Environment: Scala 2.11
Spark 2.2.0
Reporter: Sergey Zhemzhitsky


AccumulatorV2 loses type information during serialization.
It happens 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L164]
 during *writeReplace* call
{code:scala}
final protected def writeReplace(): Any = {
  if (atDriverSide) {
    if (!isRegistered) {
      throw new UnsupportedOperationException(
        "Accumulator must be registered before send to executor")
    }
    val copyAcc = copyAndReset()
    assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
    val isInternalAcc = name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
    if (isInternalAcc) {
      // Do not serialize the name of internal accumulator and send it to executor.
      copyAcc.metadata = metadata.copy(name = None)
    } else {
      // For non-internal accumulators, we still need to send the name because users may need to
      // access the accumulator name at executor side, or they may keep the accumulators sent from
      // executors and access the name when the registered accumulator is already garbage
      // collected(e.g. SQLMetrics).
      copyAcc.metadata = metadata
    }
    copyAcc
  } else {
    this
  }
}
{code}

This means that it is hardly possible to create new accumulators by adding 
new behaviour to existing ones by means of mix-ins or inheritance (without 
overriding *copy*).

For example the following snippet ...
{code:scala}
trait TripleCount {
  self: LongAccumulator =>
  abstract override def add(v: jl.Long): Unit = {
self.add(v * 3)
  }
}
val acc = new LongAccumulator with TripleCount
sc.register(acc)

val data = 1 to 10
val rdd = sc.makeRDD(data, 5)

rdd.foreach(acc.add(_))
acc.value shouldBe 3 * data.sum
{code}

... fails with

{code:none}
org.scalatest.exceptions.TestFailedException: 55 was not equal to 165
  at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340)
  at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864)
{code}
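
For completeness, here is a sketch of the workaround implied by the remark about overriding 
*copy*: keep the extra behaviour in a named subclass that also overrides *copy* and 
*copyAndReset*, so the instance produced by *writeReplace* keeps the concrete type. The class 
name below is mine, for illustration only.
{code:scala}
import java.{lang => jl}
import org.apache.spark.util.LongAccumulator

class TripleCountAccumulator extends LongAccumulator {
  // same behaviour as the TripleCount mix-in above
  override def add(v: jl.Long): Unit = super.add(v * 3)

  // writeReplace() serializes the result of copyAndReset(), so returning an instance
  // of this class keeps the overridden add() on the executor side
  override def copyAndReset(): TripleCountAccumulator = new TripleCountAccumulator

  override def copy(): TripleCountAccumulator = {
    val acc = new TripleCountAccumulator
    acc.merge(this) // carry the current sum/count into the copy
    acc
  }
}

// registration and usage stay the same:
// val acc = new TripleCountAccumulator
// sc.register(acc)
{code}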



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24154) AccumulatorV2 loses type information during serialization

2018-05-02 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-24154:
---
Description: 
AccumulatorV2 loses type information during serialization.
It happens 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L164]
 during *writeReplace* call
{code:scala}
final protected def writeReplace(): Any = {
  if (atDriverSide) {
    if (!isRegistered) {
      throw new UnsupportedOperationException(
        "Accumulator must be registered before send to executor")
    }
    val copyAcc = copyAndReset()
    assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
    val isInternalAcc = name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
    if (isInternalAcc) {
      // Do not serialize the name of internal accumulator and send it to executor.
      copyAcc.metadata = metadata.copy(name = None)
    } else {
      // For non-internal accumulators, we still need to send the name because users may need to
      // access the accumulator name at executor side, or they may keep the accumulators sent from
      // executors and access the name when the registered accumulator is already garbage
      // collected(e.g. SQLMetrics).
      copyAcc.metadata = metadata
    }
    copyAcc
  } else {
    this
  }
}
{code}

It means that it is hardly possible to create new accumulators easily by adding 
new behaviour to existing ones by means of mix-ins or inheritance (without 
overriding *copy*).

For example the following snippet ...
{code:scala}
trait TripleCount {
  self: LongAccumulator =>
  abstract override def add(v: jl.Long): Unit = {
self.add(v * 3)
  }
}
val acc = new LongAccumulator with TripleCount
sc.register(acc)

val data = 1 to 10
val rdd = sc.makeRDD(data, 5)

rdd.foreach(acc.add(_))
acc.value shouldBe 3 * data.sum
{code}

... fails with

{code:none}
org.scalatest.exceptions.TestFailedException: 55 was not equal to 165
  at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340)
  at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864)
{code}

Also, such behaviour seems to be error-prone and confusing because an 
implementor does not get the same thing that he/she sees in the code.

  was:
AccumulatorV2 loses type information during serialization.
It happens 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L164]
 during *writeReplace* call
{code:scala}
final protected def writeReplace(): Any = {
  if (atDriverSide) {
if (!isRegistered) {
  throw new UnsupportedOperationException(
"Accumulator must be registered before send to executor")
}
val copyAcc = copyAndReset()
assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
val isInternalAcc = name.isDefined && 
name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
if (isInternalAcc) {
  // Do not serialize the name of internal accumulator and send it to 
executor.
  copyAcc.metadata = metadata.copy(name = None)
} else {
  // For non-internal accumulators, we still need to send the name because 
users may need to
  // access the accumulator name at executor side, or they may keep the 
accumulators sent from
  // executors and access the name when the registered accumulator is 
already garbage
  // collected(e.g. SQLMetrics).
  copyAcc.metadata = metadata
}
copyAcc
  } else {
this
  }
}
{code}

It means that it is hardly possible to create new accumulators easily by adding 
new behaviour to existing ones by means of mix-ins or inheritance (without 
overriding *copy*).

For example the following snippet ...
{code:scala}
trait TripleCount {
  self: LongAccumulator =>
  abstract override def add(v: jl.Long): Unit = {
self.add(v * 3)
  }
}
val acc = new LongAccumulator with TripleCount
sc.register(acc)

val data = 1 to 10
val rdd = sc.makeRDD(data, 5)

rdd.foreach(acc.add(_))
acc.value shouldBe 3 * data.sum
{code}

... fails with

{code:none}
org.scalatest.exceptions.TestFailedException: 55 was not equal to 165
  at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340)
  at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864)
{code}


> AccumulatorV2 loses type information during serialization
> -
>
> Key: SPARK-24154
> URL: https://issues.apache.org/jira/browse/SPARK-24154
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0, 2.2.1, 2.3.0, 2.3.1
> Environment: Scala 2.11
> Spark 2.2.0
>Reporter: Sergey Zhemzhitsky
>Priority: Major
>
> AccumulatorV2 loses 

[jira] [Commented] (SPARK-24154) AccumulatorV2 loses type information during serialization

2018-05-03 Thread Sergey Zhemzhitsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462630#comment-16462630
 ] 

Sergey Zhemzhitsky commented on SPARK-24154:


> If users have to support mixin traits, they can still use accumulator v1.

But V1 accumulators are deprecated and will be removed one day, I believe.





> AccumulatorV2 loses type information during serialization
> -
>
> Key: SPARK-24154
> URL: https://issues.apache.org/jira/browse/SPARK-24154
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0, 2.2.1, 2.3.0, 2.3.1
> Environment: Scala 2.11
> Spark 2.2.0
>Reporter: Sergey Zhemzhitsky
>Priority: Major
>
> AccumulatorV2 loses type information during serialization.
> It happens 
> [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L164]
>  during *writeReplace* call
> {code:scala}
> final protected def writeReplace(): Any = {
>   if (atDriverSide) {
> if (!isRegistered) {
>   throw new UnsupportedOperationException(
> "Accumulator must be registered before send to executor")
> }
> val copyAcc = copyAndReset()
> assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
> val isInternalAcc = name.isDefined && 
> name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
> if (isInternalAcc) {
>   // Do not serialize the name of internal accumulator and send it to 
> executor.
>   copyAcc.metadata = metadata.copy(name = None)
> } else {
>   // For non-internal accumulators, we still need to send the name 
> because users may need to
>   // access the accumulator name at executor side, or they may keep the 
> accumulators sent from
>   // executors and access the name when the registered accumulator is 
> already garbage
>   // collected(e.g. SQLMetrics).
>   copyAcc.metadata = metadata
> }
> copyAcc
>   } else {
> this
>   }
> }
> {code}
> It means that it is hardly possible to create new accumulators easily by 
> adding new behaviour to existing ones by means of mix-ins or inheritance 
> (without overriding *copy*).
> For example the following snippet ...
> {code:scala}
> trait TripleCount {
>   self: LongAccumulator =>
>   abstract override def add(v: jl.Long): Unit = {
> self.add(v * 3)
>   }
> }
> val acc = new LongAccumulator with TripleCount
> sc.register(acc)
> val data = 1 to 10
> val rdd = sc.makeRDD(data, 5)
> rdd.foreach(acc.add(_))
> acc.value shouldBe 3 * data.sum
> {code}
> ... fails with
> {code:none}
> org.scalatest.exceptions.TestFailedException: 55 was not equal to 165
>   at org.scalatest.MatchersHelper$.indicateFailure(MatchersHelper.scala:340)
>   at org.scalatest.Matchers$AnyShouldWrapper.shouldBe(Matchers.scala:6864)
> {code}
> Also, such behaviour seems to be error-prone and confusing because an 
> implementor does not get the same thing that he/she sees in the code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22150) PeriodicCheckpointer fails with FileNotFoundException in case of dependant RDDs

2017-10-26 Thread Sergey Zhemzhitsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221018#comment-16221018
 ] 

Sergey Zhemzhitsky commented on SPARK-22150:


Hello guys, is there a chance for this issue, as well as the corresponding PR, 
to be reviewed? 
It would be really great for the fix to be included in Spark 2.2.1/2.3.0.

> PeriodicCheckpointer fails with FileNotFoundException in case of dependant 
> RDDs
> ---
>
> Key: SPARK-22150
> URL: https://issues.apache.org/jira/browse/SPARK-22150
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 
> 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>
> PeriodicCheckpointer fails with FileNotFoundException in case of 
> checkpointing dependant RDDs (consider iterative algorithms), i.e. when the 
> RDD to checkpoint depends on an already checkpointed RDD.
> Here is the exception
> {code}
> Job aborted due to stage failure: Task creation failed: 
> java.io.FileNotFoundException: File 
> file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-0
>  does not exist
> java.io.FileNotFoundException: File 
> file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-0
>  does not exist
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
>   at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
>   at 
> org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at scala.Option.map(Option.scala:146)
>   at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1708)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1707)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1705)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1705)
>   at 
> org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1671)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:989)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:987)
> {code}
> The issue seems to be in this [piece of 
> code|https://github.com/apache/spark/blob/0a7f5f2798b6e8b2ba15e8b3aa07d5953ad1c695/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala#L94]
> {code:java}
> if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
>   && sc.getCheckpointDir.nonEmpty) {
>   // Add new checkpoint before removing old checkpoints.
>   checkpoint(newData)
>   checkpointQueue.enqueue(newData)
>   // Remove checkpoints before the latest one.
>   var canDelete = true
>   while (checkpointQueue.size > 1 && canDelete) {
> // Delete the oldest checkpoint only if the next checkpoint exists.
> if (isCheckpointed(checkpointQueue.head)) {
>   removeCheckpointFile()
> } else {
>   canDelete = false
> }
>   }
> }
> {code}
> Given that _checkpointQueue.head_ is checkpointed and materialized and 
> _newData_ depends on _checkpointQueue.head_, then the exception happens on 
> action of 

[jira] [Commented] (SPARK-22184) GraphX fails in case of insufficient memory and checkpoints enabled

2017-10-26 Thread Sergey Zhemzhitsky (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221019#comment-16221019
 ] 

Sergey Zhemzhitsky commented on SPARK-22184:


Hello guys, is there a chance for this issue, as well as the corresponding PR, 
to be reviewed? 
It would be really great for the fix to be included in Spark 2.2.1/2.3.0.

> GraphX fails in case of insufficient memory and checkpoints enabled
> ---
>
> Key: SPARK-22184
> URL: https://issues.apache.org/jira/browse/SPARK-22184
> Project: Spark
>  Issue Type: Bug
>  Components: GraphX
>Affects Versions: 2.2.0
> Environment: spark 2.2.0
> scala 2.11
>Reporter: Sergey Zhemzhitsky
>
> GraphX fails with FileNotFoundException in case of insufficient memory when 
> checkpoints are enabled.
> Here is the stacktrace 
> {code}
> Job aborted due to stage failure: Task creation failed: 
> java.io.FileNotFoundException: File 
> file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0
>  does not exist
> java.io.FileNotFoundException: File 
> file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-0
>  does not exist
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
>   at 
> org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
>   at 
> org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
>   at 
> org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>   at scala.Option.map(Option.scala:146)
>   at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
> ...
> {code}
> As GraphX uses cached RDDs intensively, the issue is only reproducible when 
> previously cached and checkpointed Vertex and Edge RDDs are evicted from 
> memory and forced to be read from disk. 
> For testing purposes, the following parameters may be set to emulate a 
> low-memory environment
> {code}
> val sparkConf = new SparkConf()
>   .set("spark.graphx.pregel.checkpointInterval", "2")
>   // set testing memory to evict cached RDDs from it and force
>   // reading checkpointed RDDs from disk
>   .set("spark.testing.reservedMemory", "128")
>   .set("spark.testing.memory", "256")
> {code}
> This issue also includes SPARK-22150 and cannot be fixed until SPARK-22150 is 
> fixed too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23697) Accumulators of Spark 1.x no longer work with Spark 2.x

2018-03-15 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-23697:
---
Description: 
I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
failing with
 java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
value copy

It happens while serializing an accumulator 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
{code:java}
val copyAcc = copyAndReset()
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
... although copyAndReset returns zero-value copy for sure, just consider the 
accumulator below
{code:java}
val concatParam = new AccumulatorParam[jl.StringBuilder] {
  override def zero(initialValue: jl.StringBuilder): jl.StringBuilder =
    new jl.StringBuilder()
  override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder =
    r1.append(r2)
}{code}
So, Spark treats zero value as non-zero due to how 
[isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489]
 is implemented in LegacyAccumulatorWrapper.
{code:java}
override def isZero: Boolean = _value == param.zero(initialValue){code}
All this means that the values to be accumulated must implement equals and 
hashCode, otherwise isZero is very likely to always return false.

So I'm wondering whether the assertion 
{code:java}
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
is really necessary and whether it can be safely removed from there?

If not, is it ok to just override writeReplace for LegacyAccumulatorWrapper to 
prevent such failures?

  was:
I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
failing with
 java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
value copy

It happens while serializing an accumulator 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
{code:java}
val copyAcc = copyAndReset()
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
... although copyAndReset returns zero-value copy for sure, just consider the 
accumulator below
{code:java}
val concatParam = new AccumulatorParam[jl.StringBuilder] {
  override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new 
jl.StringBuilder()
  override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): 
jl.StringBuilder = r1.append(r2)
}{code}
So, Spark treats zero value as non-zero due to how 
[isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489]
 is implemented in LegacyAccumulatorWrapper.
{code:java}
override def isZero: Boolean = _value == param.zero(initialValue){code}
All this means that the values to be accumulated must implement equals and 
hashCode, otherwise isZero is very likely to always return false.

So I'm wondering whether the assertion 
{code:java}
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
is really necessary and whether it can be safely removed from there. 


> Accumulators of Spark 1.x no longer work with Spark 2.x
> ---
>
> Key: SPARK-23697
> URL: https://issues.apache.org/jira/browse/SPARK-23697
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0, 2.2.1
> Environment: Spark 2.2.0
> Scala 2.11
>Reporter: Sergey Zhemzhitsky
>Priority: Major
>
> I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
> failing with
>  java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
> value copy
> It happens while serializing an accumulator 
> [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
> {code:java}
> val copyAcc = copyAndReset()
> assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
> ... although copyAndReset returns zero-value copy for sure, just consider the 
> accumulator below
> {code:java}
> val concatParam = new AccumulatorParam[jl.StringBuilder] {
>   override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new 
> jl.StringBuilder()
>   override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): 
> jl.StringBuilder = r1.append(r2)
> }{code}
> So, Spark treats zero value as non-zero due to how 
> 

[jira] [Updated] (SPARK-23697) Accumulators of Spark 1.x no longer work with Spark 2.x

2018-03-15 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-23697:
---
Description: 
I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
failing with
 java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
value copy

It happens while serializing an accumulator 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
{code:java}
val copyAcc = copyAndReset()
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
... although copyAndReset returns zero-value copy for sure, just consider the 
accumulator below
{code:java}
val concatParam = new AccumulatorParam[jl.StringBuilder] {
  override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new 
jl.StringBuilder()
  override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): 
jl.StringBuilder = r1.append(r2)
}{code}
So, Spark treats zero value as non-zero due to how 
[isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489]
 is implemented in LegacyAccumulatorWrapper.
{code:java}
override def isZero: Boolean = _value == param.zero(initialValue){code}
All this means that the values to be accumulated must implement equals and 
hashCode, otherwise isZero is very likely to always return false.

So I'm wondering whether the assertion 
{code:java}
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
is really necessary and whether it can be safely removed from there. 

  was:
I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
failing with
 java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
value copy

It happens while serializing an accumulator 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
{code:java}
val copyAcc = copyAndReset()
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
... although copyAndReset returns zero-value copy for sure, just consider the 
accumulator below
{code:java}
val concatParam = new AccumulatorParam[jl.StringBuilder] {
  override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new 
jl.StringBuilder()
  override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): 
jl.StringBuilder = r1.append(r2)
}{code}
So, Spark treats zero value as non-zero due to how 
[isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489]
 is implemented in LegacyAccumulatorWrapper
{code:java}
override def isZero: Boolean = _value == param.zero(initialValue){code}
All this means that the values to be accumulated must implement equals and 
hashCode, otherwise isZero is very likely to always return false.

So I'm wondering whether the assertion 
{code:java}
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
is really necessary and whether it can be safely removed from there. 


> Accumulators of Spark 1.x no longer work with Spark 2.x
> ---
>
> Key: SPARK-23697
> URL: https://issues.apache.org/jira/browse/SPARK-23697
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0, 2.2.1
> Environment: Spark 2.2.0
> Scala 2.11
>Reporter: Sergey Zhemzhitsky
>Priority: Major
>
> I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
> failing with
>  java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
> value copy
> It happens while serializing an accumulator 
> [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
> {code:java}
> val copyAcc = copyAndReset()
> assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
> ... although copyAndReset returns zero-value copy for sure, just consider the 
> accumulator below
> {code:java}
> val concatParam = new AccumulatorParam[jl.StringBuilder] {
>   override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new 
> jl.StringBuilder()
>   override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): 
> jl.StringBuilder = r1.append(r2)
> }{code}
> So, Spark treats zero value as non-zero due to how 
> [isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489]
>  is implemented in LegacyAccumulatorWrapper.
> {code:java}
> override def isZero: 

[jira] [Updated] (SPARK-23697) Accumulators of Spark 1.x no longer work with Spark 2.x

2018-03-15 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-23697:
---
Description: 
I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
failing with
 java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
value copy

It happens while serializing an accumulator 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
{code:java}
val copyAcc = copyAndReset()
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
... although copyAndReset returns zero-value copy for sure, just consider the 
accumulator below
{code:java}
val concatParam = new AccumulatorParam[jl.StringBuilder] {
  override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new 
jl.StringBuilder()
  override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): 
jl.StringBuilder = r1.append(r2)
}{code}
So, Spark treats zero value as non-zero due to how 
[isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489]
 is implemented in LegacyAccumulatorWrapper.
{code:java}
override def isZero: Boolean = _value == param.zero(initialValue){code}
All this means that the values to be accumulated must implement equals and 
hashCode, otherwise isZero is very likely to always return false.

So I'm wondering whether the assertion 
{code:java}
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
is really necessary and whether it can be safely removed from there?

If not - is it ok to just override writeReplace for LegacyAccumulatorWrapper to 
prevent such failures?

  was:
I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
failing with
 java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
value copy

It happens while serializing an accumulator 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
{code:java}
val copyAcc = copyAndReset()
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
... although copyAndReset returns zero-value copy for sure, just consider the 
accumulator below
{code:java}
val concatParam = new AccumulatorParam[jl.StringBuilder] {
  override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new 
jl.StringBuilder()
  override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): 
jl.StringBuilder = r1.append(r2)
}{code}
So, Spark treats zero value as non-zero due to how 
[isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489]
 is implemented in LegacyAccumulatorWrapper.
{code:java}
override def isZero: Boolean = _value == param.zero(initialValue){code}
All this means that the values to be accumulated must implement equals and 
hashCode, otherwise isZero is very likely to always return false.

So I'm wondering whether the assertion 
{code:java}
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
is really necessary and whether it can be safely removed from there?

If not, is it ok to just override writeReplace for LegacyAccumulatorWrapper to 
prevent such failures?


> Accumulators of Spark 1.x no longer work with Spark 2.x
> ---
>
> Key: SPARK-23697
> URL: https://issues.apache.org/jira/browse/SPARK-23697
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0, 2.2.1
> Environment: Spark 2.2.0
> Scala 2.11
>Reporter: Sergey Zhemzhitsky
>Priority: Major
>
> I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
> failing with
>  java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
> value copy
> It happens while serializing an accumulator 
> [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
> {code:java}
> val copyAcc = copyAndReset()
> assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
> ... although copyAndReset returns zero-value copy for sure, just consider the 
> accumulator below
> {code:java}
> val concatParam = new AccumulatorParam[jl.StringBuilder] {
>   override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new 
> jl.StringBuilder()
>   override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): 
> jl.StringBuilder = r1.append(r2)
> }{code}
> So, Spark treats zero value as non-zero due to how 
> 

[jira] [Created] (SPARK-23697) Accumulators of Spark 1.x no longer work with Spark 2.x

2018-03-15 Thread Sergey Zhemzhitsky (JIRA)
Sergey Zhemzhitsky created SPARK-23697:
--

 Summary: Accumulators of Spark 1.x no longer work with Spark 2.x
 Key: SPARK-23697
 URL: https://issues.apache.org/jira/browse/SPARK-23697
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.1, 2.2.0
 Environment: Spark 2.2.0
Scala 2.11
Reporter: Sergey Zhemzhitsky


I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
failing with
java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
value copy

It happens while serializing an accumulator 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
{code:java}
val copyAcc = copyAndReset()
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
... although copyAndReset returns zero-value copy for sure, just consider the 
accumulator below
{code:java}
val concatParam = new AccumulatorParam[jl.StringBuilder] {
  override def zero(initialValue: jl.StringBuilder): jl.StringBuilder =
    new jl.StringBuilder()
  override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): jl.StringBuilder =
    r1.append(r2)
}{code}

So, Spark treats zero value as non-zero due to how 
[isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489]
 is implemented in LegacyAccumulatorWrapper
{code:java}
override def isZero: Boolean = _value == param.zero(initialValue){code}

All this means that the values to be accumulated must implement equals and 
hashCode, otherwise `isZero` is very likely to always return false.

So I'm wondering whether the assertion 
{code:java}
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
is really necessary and whether it can be safely removed from there. 
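
For illustration (this snippet is mine, not part of the report): java.lang.StringBuilder does 
not override equals, so the comparison that LegacyAccumulatorWrapper.isZero relies on boils 
down to reference equality, and two distinct empty builders never compare equal.
{code:scala}
import java.{lang => jl}

val zero1 = new jl.StringBuilder()
val zero2 = new jl.StringBuilder()

// StringBuilder inherits Object.equals, so == compares references here
println(zero1 == zero2)                     // false, even though both are "zero" values
println(zero1.toString == zero2.toString)   // true, the contents are both empty
{code}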



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23697) Accumulators of Spark 1.x no longer work with Spark 2.x

2018-03-15 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-23697:
---
Description: 
I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
failing with
 java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
value copy

It happens while serializing an accumulator 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
{code:java}
val copyAcc = copyAndReset()
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
... although copyAndReset returns zero-value copy for sure, just consider the 
accumulator below
{code:java}
val concatParam = new AccumulatorParam[jl.StringBuilder] {
  override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new 
jl.StringBuilder()
  override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): 
jl.StringBuilder = r1.append(r2)
}{code}
So, Spark treats zero value as non-zero due to how 
[isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489]
 is implemented in LegacyAccumulatorWrapper
{code:java}
override def isZero: Boolean = _value == param.zero(initialValue){code}
All this means that the values to be accumulated must implement equals and 
hashCode, otherwise isZero is very likely to always return false.

So I'm wondering whether the assertion 
{code:java}
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
is really necessary and whether it can be safely removed from there. 

  was:
I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
failing with
java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
value copy

It happens while serializing an accumulator 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
{code:java}
val copyAcc = copyAndReset()
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
... although copyAndReset returns zero-value copy for sure, just consider the 
accumulator below
{code:java}
val concatParam = new AccumulatorParam[jl.StringBuilder] {
  override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new 
jl.StringBuilder()
  override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): 
jl.StringBuilder = r1.append(r2)
}{code}

So, Spark treats zero value as non-zero due to how 
[isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489]
 is implemented in LegacyAccumulatorWrapper
{code:java}
override def isZero: Boolean = _value == param.zero(initialValue){code}

All this means that the values to be accumulated must implement equals and 
hashCode, otherwise `isZero` is very likely to always return false.

So I'm wondering whether the assertion 
{code:java}
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
is really necessary and whether it can be safely removed from there. 


> Accumulators of Spark 1.x no longer work with Spark 2.x
> ---
>
> Key: SPARK-23697
> URL: https://issues.apache.org/jira/browse/SPARK-23697
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0, 2.2.1
> Environment: Spark 2.2.0
> Scala 2.11
>Reporter: Sergey Zhemzhitsky
>Priority: Major
>
> I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
> failing with
>  java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
> value copy
> It happens while serializing an accumulator 
> [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
> {code:java}
> val copyAcc = copyAndReset()
> assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
> ... although copyAndReset returns zero-value copy for sure, just consider the 
> accumulator below
> {code:java}
> val concatParam = new AccumulatorParam[jl.StringBuilder] {
>   override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new 
> jl.StringBuilder()
>   override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): 
> jl.StringBuilder = r1.append(r2)
> }{code}
> So, Spark treats zero value as non-zero due to how 
> [isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489]
>  is implemented in LegacyAccumulatorWrapper
> {code:java}
> override def isZero: 

[jira] [Updated] (SPARK-23697) Accumulators of Spark 1.x no longer work with Spark 2.x

2018-03-16 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-23697:
---
Description: 
I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
failing with
{code:java}
java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
value copy{code}

 It happens while serializing an accumulator 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
{code:java}
val copyAcc = copyAndReset()
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
... although copyAndReset returns zero-value copy for sure, just consider the 
accumulator below
{code:java}
val concatParam = new AccumulatorParam[jl.StringBuilder] {
  override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new 
jl.StringBuilder()
  override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): 
jl.StringBuilder = r1.append(r2)
}{code}
So, Spark treats zero value as non-zero due to how 
[isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489]
 is implemented in LegacyAccumulatorWrapper.
{code:java}
override def isZero: Boolean = _value == param.zero(initialValue){code}
All this means that the values to be accumulated must implement equals and 
hashCode, otherwise isZero is very likely to always return false.

So I'm wondering whether the assertion 
{code:java}
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
is really necessary and whether it can be safely removed from there?

If not - is it ok to just override writeReplace for LegacyAccumulatorWrapper to 
prevent such failures?

  was:
I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
failing with
 java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
value copy

It happens while serializing an accumulator 
[here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
{code:java}
val copyAcc = copyAndReset()
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
... although copyAndReset returns zero-value copy for sure, just consider the 
accumulator below
{code:java}
val concatParam = new AccumulatorParam[jl.StringBuilder] {
  override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new 
jl.StringBuilder()
  override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): 
jl.StringBuilder = r1.append(r2)
}{code}
So, Spark treats zero value as non-zero due to how 
[isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489]
 is implemented in LegacyAccumulatorWrapper.
{code:java}
override def isZero: Boolean = _value == param.zero(initialValue){code}
All this means that the values to be accumulated must implement equals and 
hashCode, otherwise isZero is very likely to always return false.

So I'm wondering whether the assertion 
{code:java}
assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
is really necessary and whether it can be safely removed from there?

If not - is it ok to just override writeReplace for LegacyAccumulatorWrapper to 
prevent such failures?


> Accumulators of Spark 1.x no longer work with Spark 2.x
> ---
>
> Key: SPARK-23697
> URL: https://issues.apache.org/jira/browse/SPARK-23697
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0, 2.2.1
> Environment: Spark 2.2.0
> Scala 2.11
>Reporter: Sergey Zhemzhitsky
>Priority: Major
>
> I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
> failing with
> {code:java}
> java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
> value copy{code}
>  It happens while serializing an accumulator 
> [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
> {code:java}
> val copyAcc = copyAndReset()
> assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
> ... although copyAndReset returns zero-value copy for sure, just consider the 
> accumulator below
> {code:java}
> val concatParam = new AccumulatorParam[jl.StringBuilder] {
>   override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new 
> jl.StringBuilder()
>   override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): 
> jl.StringBuilder = r1.append(r2)
> }{code}
> So, Spark treats zero value as non-zero due to how 

[jira] [Updated] (SPARK-23697) Accumulators of Spark 1.x no longer work with Spark 2.x

2018-03-16 Thread Sergey Zhemzhitsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-23697:
---
Affects Version/s: 2.3.0

> Accumulators of Spark 1.x no longer work with Spark 2.x
> ---
>
> Key: SPARK-23697
> URL: https://issues.apache.org/jira/browse/SPARK-23697
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0, 2.2.1, 2.3.0
> Environment: Spark 2.2.0
> Scala 2.11
>Reporter: Sergey Zhemzhitsky
>Priority: Major
>
> I've noticed that accumulators of Spark 1.x no longer work with Spark 2.x 
> failing with
> {code:java}
> java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
> value copy{code}
>  It happens while serializing an accumulator 
> [here|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L165]
> {code:java}
> val copyAcc = copyAndReset()
> assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
> ... although copyAndReset returns zero-value copy for sure, just consider the 
> accumulator below
> {code:java}
> val concatParam = new AccumulatorParam[jl.StringBuilder] {
>   override def zero(initialValue: jl.StringBuilder): jl.StringBuilder = new 
> jl.StringBuilder()
>   override def addInPlace(r1: jl.StringBuilder, r2: jl.StringBuilder): 
> jl.StringBuilder = r1.append(r2)
> }{code}
> So, Spark treats zero value as non-zero due to how 
> [isZero|https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L489]
>  is implemented in LegacyAccumulatorWrapper.
> {code:java}
> override def isZero: Boolean = _value == param.zero(initialValue){code}
> All this means that the values to be accumulated must implement equals and 
> hashCode, otherwise isZero is very likely to always return false.
> So I'm wondering whether the assertion 
> {code:java}
> assert(copyAcc.isZero, "copyAndReset must return a zero value copy"){code}
> is really necessary and whether it can be safely removed from there?
> If not - is it ok to just override writeReplace for LegacyAccumulatorWrapper 
> to prevent such failures?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions

2018-11-19 Thread Sergey Zhemzhitsky (JIRA)
Sergey Zhemzhitsky created SPARK-26114:
--

 Summary: Memory leak of PartitionedPairBuffer when coalescing 
after repartitionAndSortWithinPartitions
 Key: SPARK-26114
 URL: https://issues.apache.org/jira/browse/SPARK-26114
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0, 2.3.2, 2.2.2
 Environment: Spark 3.0.0-SNAPSHOT (master branch)
Scala 2.11
Yarn 2.7
Reporter: Sergey Zhemzhitsky


Trying to use _coalesce_ after shuffle-oriented transformations leads to 
OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X 
GB of Y GB physical memory used. Consider 
boosting spark.yarn.executor.memoryOverhead_

The error happens when trying to specify a pretty small number of partitions 
in the _coalesce_ call.

*How to reproduce?*

# Start spark-shell
{code:bash}
spark-shell \ 
  --num-executors=5 \ 
  --executor-cores=2 \ 
  --master=yarn \
  --deploy-mode=client \ 
  --conf spark.executor.memory=1g \ 
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
{code}
Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing off-heap 
memory usage seems to be the only way to control the amount of memory used for 
transferring shuffle data for now.
Also note that the total number of cores allocated for the job is 5x2=10
# Then generate some test data
{code:scala}
import org.apache.hadoop.io._ 
import org.apache.hadoop.io.compress._ 
import org.apache.commons.lang._ 
import org.apache.spark._ 

// generate 100M records of sample data 
sc.makeRDD(1 to 1000, 1000) 
  .flatMap(item => (1 to 10) 
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) -> 
      new Text(RandomStringUtils.randomAlphanumeric(1024)))) 
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) 
{code}
# Run the sample job
{code:scala}
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd 
  .map(item => item._1.toString -> item._2.toString) 
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) 
  .coalesce(10,false) 
  .count 
{code}
Note that the number of partitions is equal to the total number of cores 
allocated to the job.
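
For contrast, the snippet below sketches the same pipeline with a shuffling coalesce. This is 
an assumption on my side rather than something verified for this ticket: with shuffle = true 
the coalesce sits behind its own shuffle boundary, so the repartitionAndSortWithinPartitions 
stage still runs with 1000 tasks instead of being folded into the 10 coalesced ones.
{code:scala}
import org.apache.hadoop.io._
import org.apache.spark._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  // shuffle = true keeps the 1000 sorted partitions in their own stage;
  // whether this avoids the reported PartitionedPairBuffer growth is an assumption here
  .coalesce(10, shuffle = true)
  .count
{code}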






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions

2018-11-19 Thread Sergey Zhemzhitsky (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-26114:
---
Attachment: run1-noparams-dominator-tree.png

> Memory leak of PartitionedPairBuffer when coalescing after 
> repartitionAndSortWithinPartitions
> -
>
> Key: SPARK-26114
> URL: https://issues.apache.org/jira/browse/SPARK-26114
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.2, 2.3.2, 2.4.0
> Environment: Spark 3.0.0-SNAPSHOT (master branch)
> Scala 2.11
> Yarn 2.7
>Reporter: Sergey Zhemzhitsky
>Priority: Major
> Attachments: run1-noparams-dominator-tree.png
>
>
> Trying to use _coalesce_ after shuffle-oriented transformations leads to 
> OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X 
> GB of Y GB physical memory used. Consider 
> boosting spark.yarn.executor.memoryOverhead_
> The error happens when trying to specify a pretty small number of partitions 
> in the _coalesce_ call.
> *How to reproduce?*
> # Start spark-shell
> {code:bash}
> spark-shell \ 
>   --num-executors=5 \ 
>   --executor-cores=2 \ 
>   --master=yarn \
>   --deploy-mode=client \ 
>   --conf spark.executor.memory=1g \ 
>   --conf spark.dynamicAllocation.enabled=false \
>   --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError 
> -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
> {code}
> Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing 
> off-heap memory usage currently seems to be the only way to control the 
> amount of memory used for transferring shuffle data.
> Also note that the total number of cores allocated to the job is 5x2=10.
> # Then generate some test data
> {code:scala}
> import org.apache.hadoop.io._ 
> import org.apache.hadoop.io.compress._ 
> import org.apache.commons.lang._ 
> import org.apache.spark._ 
> // generate 100M records of sample data 
> sc.makeRDD(1 to 1000, 1000)
>   .flatMap(item => (1 to 10)
>     .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
>       new Text(RandomStringUtils.randomAlphanumeric(1024))))
>   .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) 
> {code}
> # Run the sample job
> {code:scala}
> import org.apache.hadoop.io._
> import org.apache.spark._
> import org.apache.spark.storage._
> val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
> rdd 
>   .map(item => item._1.toString -> item._2.toString) 
>   .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) 
>   .coalesce(10,false) 
>   .count 
> {code}
> Note that the number of partitions is equal to the total number of cores 
> allocated to the job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions

2018-11-19 Thread Sergey Zhemzhitsky (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-26114:
---
Attachment: run1-noparams-dominator-tree-externalsorter.png

> Memory leak of PartitionedPairBuffer when coalescing after 
> repartitionAndSortWithinPartitions
> -
>
> Key: SPARK-26114
> URL: https://issues.apache.org/jira/browse/SPARK-26114
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.2, 2.3.2, 2.4.0
> Environment: Spark 3.0.0-SNAPSHOT (master branch)
> Scala 2.11
> Yarn 2.7
>Reporter: Sergey Zhemzhitsky
>Priority: Major
> Attachments: run1-noparams-dominator-tree-externalsorter.png, 
> run1-noparams-dominator-tree.png
>
>
> Trying to use _coalesce_ after shuffle-oriented transformations leads to 
> OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X 
> GB of Y GB physical memory used. Consider 
> boosting spark.yarn.executor.memoryOverhead_
> The error happens when trying to specify a pretty small number of partitions 
> in the _coalesce_ call.
> *How to reproduce?*
> # Start spark-shell
> {code:bash}
> spark-shell \ 
>   --num-executors=5 \ 
>   --executor-cores=2 \ 
>   --master=yarn \
>   --deploy-mode=client \ 
>   --conf spark.executor.memory=1g \ 
>   --conf spark.dynamicAllocation.enabled=false \
>   --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError 
> -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
> {code}
> Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing 
> off-heap memory usage currently seems to be the only way to control the 
> amount of memory used for transferring shuffle data.
> Also note that the total number of cores allocated to the job is 5x2=10.
> # Then generate some test data
> {code:scala}
> import org.apache.hadoop.io._ 
> import org.apache.hadoop.io.compress._ 
> import org.apache.commons.lang._ 
> import org.apache.spark._ 
> // generate 100M records of sample data 
> sc.makeRDD(1 to 1000, 1000)
>   .flatMap(item => (1 to 10)
>     .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
>       new Text(RandomStringUtils.randomAlphanumeric(1024))))
>   .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) 
> {code}
> # Run the sample job
> {code:scala}
> import org.apache.hadoop.io._
> import org.apache.spark._
> import org.apache.spark.storage._
> val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
> rdd 
>   .map(item => item._1.toString -> item._2.toString) 
>   .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) 
>   .coalesce(10,false) 
>   .count 
> {code}
> Note that the number of partitions is equal to the total number of cores 
> allocated to the job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions

2018-11-19 Thread Sergey Zhemzhitsky (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-26114:
---
Attachment: run1-noparams-dominator-tree-externalsorter-gc-root.png

> Memory leak of PartitionedPairBuffer when coalescing after 
> repartitionAndSortWithinPartitions
> -
>
> Key: SPARK-26114
> URL: https://issues.apache.org/jira/browse/SPARK-26114
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.2, 2.3.2, 2.4.0
> Environment: Spark 3.0.0-SNAPSHOT (master branch)
> Scala 2.11
> Yarn 2.7
>Reporter: Sergey Zhemzhitsky
>Priority: Major
> Attachments: run1-noparams-dominator-tree-externalsorter-gc-root.png, 
> run1-noparams-dominator-tree-externalsorter.png, 
> run1-noparams-dominator-tree.png
>
>
> Trying to use _coalesce_ after shuffle-oriented transformations leads to 
> OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X 
> GB of Y GB physical memory used. Consider 
> boosting spark.yarn.executor.memoryOverhead_
> The error happens when trying to specify a pretty small number of partitions 
> in the _coalesce_ call.
> *How to reproduce?*
> # Start spark-shell
> {code:bash}
> spark-shell \ 
>   --num-executors=5 \ 
>   --executor-cores=2 \ 
>   --master=yarn \
>   --deploy-mode=client \ 
>   --conf spark.executor.memory=1g \ 
>   --conf spark.dynamicAllocation.enabled=false \
>   --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError 
> -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
> {code}
> Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing 
> off-heap memory usage currently seems to be the only way to control the 
> amount of memory used for transferring shuffle data.
> Also note that the total number of cores allocated to the job is 5x2=10.
> # Then generate some test data
> {code:scala}
> import org.apache.hadoop.io._ 
> import org.apache.hadoop.io.compress._ 
> import org.apache.commons.lang._ 
> import org.apache.spark._ 
> // generate 100M records of sample data 
> sc.makeRDD(1 to 1000, 1000)
>   .flatMap(item => (1 to 10)
>     .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
>       new Text(RandomStringUtils.randomAlphanumeric(1024))))
>   .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) 
> {code}
> # Run the sample job
> {code:scala}
> import org.apache.hadoop.io._
> import org.apache.spark._
> import org.apache.spark.storage._
> val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
> rdd 
>   .map(item => item._1.toString -> item._2.toString) 
>   .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) 
>   .coalesce(10,false) 
>   .count 
> {code}
> Note that the number of partitions is equal to the total number of cores 
> allocated to the job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions

2018-11-19 Thread Sergey Zhemzhitsky (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-26114:
---
Description: 
Trying to use _coalesce_ after shuffle-oriented transformations leads to 
OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X 
GB of Y GB physical memory used. Consider 
boosting spark.yarn.executor.memoryOverhead_.
Discussion is 
[here|http://apache-spark-developers-list.1001551.n3.nabble.com/Coalesce-behaviour-td25289.html].

The error happens when trying to specify a pretty small number of partitions 
in the _coalesce_ call.

*How to reproduce?*

# Start spark-shell
{code:bash}
spark-shell \ 
  --num-executors=5 \ 
  --executor-cores=2 \ 
  --master=yarn \
  --deploy-mode=client \ 
  --conf spark.executor.memory=1g \ 
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
{code}
Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing 
off-heap memory usage currently seems to be the only way to control the amount 
of memory used for transferring shuffle data.
Also note that the total number of cores allocated to the job is 5x2=10.
# Then generate some test data
{code:scala}
import org.apache.hadoop.io._ 
import org.apache.hadoop.io.compress._ 
import org.apache.commons.lang._ 
import org.apache.spark._ 

// generate 100M records of sample data 
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 10)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
      new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) 
{code}
# Run the sample job
{code:scala}
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd 
  .map(item => item._1.toString -> item._2.toString) 
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) 
  .coalesce(10,false) 
  .count 
{code}
Note that the number of partitions is equal to the total number of cores 
allocated to the job.


  was:
Trying to use _coalesce_ after shuffle-oriented transformations leads to 
OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X 
GB of Y GB physical memory used. Consider 
boosting spark.yarn.executor.memoryOverhead_

The error happens when trying to specify a pretty small number of partitions 
in the _coalesce_ call.

*How to reproduce?*

# Start spark-shell
{code:bash}
spark-shell \ 
  --num-executors=5 \ 
  --executor-cores=2 \ 
  --master=yarn \
  --deploy-mode=client \ 
  --conf spark.executor.memory=1g \ 
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
{code}
Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing 
off-heap memory usage currently seems to be the only way to control the amount 
of memory used for transferring shuffle data.
Also note that the total number of cores allocated to the job is 5x2=10.
# Then generate some test data
{code:scala}
import org.apache.hadoop.io._ 
import org.apache.hadoop.io.compress._ 
import org.apache.commons.lang._ 
import org.apache.spark._ 

// generate 100M records of sample data 
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 10)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
      new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) 
{code}
# Run the sample job
{code:scala}
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd 
  .map(item => item._1.toString -> item._2.toString) 
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) 
  .coalesce(10,false) 
  .count 
{code}
Note that the number of partitions is equal to the total number of cores 
allocated to the job.





> Memory leak of PartitionedPairBuffer when coalescing after 
> repartitionAndSortWithinPartitions
> -
>
> Key: SPARK-26114
> URL: https://issues.apache.org/jira/browse/SPARK-26114
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.2, 2.3.2, 2.4.0
> Environment: Spark 3.0.0-SNAPSHOT (master branch)
> Scala 2.11
> Yarn 2.7
>Reporter: Sergey Zhemzhitsky
>Priority: Major
> Attachments: run1-noparams-dominator-tree-externalsorter-gc-root.png, 
> run1-noparams-dominator-tree-externalsorter.png, 
> run1-noparams-dominator-tree.png
>
>
> Trying to use 

[jira] [Updated] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions

2018-11-19 Thread Sergey Zhemzhitsky (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-26114:
---
Description: 
Trying to use _coalesce_ after shuffle-oriented transformations leads to 
OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X 
GB of Y GB physical memory used. Consider 
boosting spark.yarn.executor.memoryOverhead_.
Discussion is 
[here|http://apache-spark-developers-list.1001551.n3.nabble.com/Coalesce-behaviour-td25289.html].

The error happens when trying to specify a pretty small number of partitions 
in the _coalesce_ call.

*How to reproduce?*

# Start spark-shell
{code:bash}
spark-shell \ 
  --num-executors=5 \ 
  --executor-cores=2 \ 
  --master=yarn \
  --deploy-mode=client \ 
  --conf spark.executor.memory=1g \ 
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
{code}
Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing 
off-heap memory usage currently seems to be the only way to control the amount 
of memory used for transferring shuffle data.
Also note that the total number of cores allocated to the job is 5x2=10.
# Then generate some test data
{code:scala}
import org.apache.hadoop.io._ 
import org.apache.hadoop.io.compress._ 
import org.apache.commons.lang._ 
import org.apache.spark._ 

// generate 100M records of sample data 
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 10)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
      new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) 
{code}
# Run the sample job
{code:scala}
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd 
  .map(item => item._1.toString -> item._2.toString) 
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) 
  .coalesce(10,false) 
  .count 
{code}
Note that the number of partitions is equal to the total number of cores 
allocated to the job.

Here is the dominator tree from the heap dump:
 !run1-noparams-dominator-tree.png|width=500!
 

  was:
Trying to use _coalesce_ after shuffle-oriented transformations leads to 
OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X 
GB of Y GB physical memory used. Consider 
boosting spark.yarn.executor.memoryOverhead_.
Discussion is 
[here|http://apache-spark-developers-list.1001551.n3.nabble.com/Coalesce-behaviour-td25289.html].

The error happens when trying to specify a pretty small number of partitions 
in the _coalesce_ call.

*How to reproduce?*

# Start spark-shell
{code:bash}
spark-shell \ 
  --num-executors=5 \ 
  --executor-cores=2 \ 
  --master=yarn \
  --deploy-mode=client \ 
  --conf spark.executor.memory=1g \ 
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
{code}
Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing 
off-heap memory usage currently seems to be the only way to control the amount 
of memory used for transferring shuffle data.
Also note that the total number of cores allocated to the job is 5x2=10.
# Then generate some test data
{code:scala}
import org.apache.hadoop.io._ 
import org.apache.hadoop.io.compress._ 
import org.apache.commons.lang._ 
import org.apache.spark._ 

// generate 100M records of sample data 
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 10)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
      new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) 
{code}
# Run the sample job
{code:scala}
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd 
  .map(item => item._1.toString -> item._2.toString) 
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) 
  .coalesce(10,false) 
  .count 
{code}
Note that the number of partitions is equal to the total number of cores 
allocated to the job.



> Memory leak of PartitionedPairBuffer when coalescing after 
> repartitionAndSortWithinPartitions
> -
>
> Key: SPARK-26114
> URL: https://issues.apache.org/jira/browse/SPARK-26114
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.2, 2.3.2, 2.4.0
> Environment: Spark 3.0.0-SNAPSHOT (master branch)
> Scala 2.11
> Yarn 2.7
>Reporter: Sergey Zhemzhitsky
>

[jira] [Updated] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions

2018-11-19 Thread Sergey Zhemzhitsky (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-26114:
---
Description: 
Trying to use _coalesce_ after shuffle-oriented transformations leads to 
OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X 
GB of Y GB physical memory used. Consider 
boosting spark.yarn.executor.memoryOverhead_.
Discussion is 
[here|http://apache-spark-developers-list.1001551.n3.nabble.com/Coalesce-behaviour-td25289.html].

The error happens when trying to specify a pretty small number of partitions 
in the _coalesce_ call.

*How to reproduce?*

# Start spark-shell
{code:bash}
spark-shell \ 
  --num-executors=5 \ 
  --executor-cores=2 \ 
  --master=yarn \
  --deploy-mode=client \ 
  --conf spark.executor.memory=1g \ 
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
{code}
Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing 
off-heap memory usage currently seems to be the only way to control the amount 
of memory used for transferring shuffle data.
Also note that the total number of cores allocated to the job is 5x2=10.
# Then generate some test data
{code:scala}
import org.apache.hadoop.io._ 
import org.apache.hadoop.io.compress._ 
import org.apache.commons.lang._ 
import org.apache.spark._ 

// generate 100M records of sample data 
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 10)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
      new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) 
{code}
# Run the sample job
{code:scala}
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd 
  .map(item => item._1.toString -> item._2.toString) 
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) 
  .coalesce(10,false) 
  .count 
{code}
Note that the number of partitions is equal to the total number of cores 
allocated to the job.

Here is the dominator tree from the heap dump:
 !run1-noparams-dominator-tree.png|width=700!

There are 4 instances of ExternalSorter, although there are only 2 concurrently 
running tasks per executor.
 !run1-noparams-dominator-tree-externalsorter.png|width=700! 

And here are the paths to the GC root of an already stopped ExternalSorter.
 !run1-noparams-dominator-tree-externalsorter-gc-root.png|width=700! 

  was:
Trying to use _coalesce_ after shuffle-oriented transformations leads to 
OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X 
GB of Y GB physical memory used. Consider 
boosting spark.yarn.executor.memoryOverhead_.
Discussion is 
[here|http://apache-spark-developers-list.1001551.n3.nabble.com/Coalesce-behaviour-td25289.html].

The error happens when trying to specify a pretty small number of partitions 
in the _coalesce_ call.

*How to reproduce?*

# Start spark-shell
{code:bash}
spark-shell \ 
  --num-executors=5 \ 
  --executor-cores=2 \ 
  --master=yarn \
  --deploy-mode=client \ 
  --conf spark.executor.memory=1g \ 
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
{code}
Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing 
off-heap memory usage currently seems to be the only way to control the amount 
of memory used for transferring shuffle data.
Also note that the total number of cores allocated to the job is 5x2=10.
# Then generate some test data
{code:scala}
import org.apache.hadoop.io._ 
import org.apache.hadoop.io.compress._ 
import org.apache.commons.lang._ 
import org.apache.spark._ 

// generate 100M records of sample data 
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 10)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
      new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) 
{code}
# Run the sample job
{code:scala}
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd 
  .map(item => item._1.toString -> item._2.toString) 
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) 
  .coalesce(10,false) 
  .count 
{code}
Note that the number of partitions is equal to the total number of cores 
allocated to the job.

Here is the dominator tree from the heap dump:
 !run1-noparams-dominator-tree.png|width=500!
 


> Memory leak of PartitionedPairBuffer when coalescing after 
> repartitionAndSortWithinPartitions
> -
>
>

[jira] [Updated] (SPARK-26114) Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions

2018-11-19 Thread Sergey Zhemzhitsky (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-26114:
---
Description: 
Trying to use _coalesce_ after shuffle-oriented transformations leads to 
OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X 
GB of Y GB physical memory used. Consider 
boosting spark.yarn.executor.memoryOverhead_.
Discussion is 
[here|http://apache-spark-developers-list.1001551.n3.nabble.com/Coalesce-behaviour-td25289.html].

The error happens when trying to specify a pretty small number of partitions 
in the _coalesce_ call.

*How to reproduce?*

# Start spark-shell
{code:bash}
spark-shell \ 
  --num-executors=5 \ 
  --executor-cores=2 \ 
  --master=yarn \
  --deploy-mode=client \ 
  --conf spark.executor.memoryOverhead=512 \
  --conf spark.executor.memory=1g \ 
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
{code}
Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing 
off-heap memory usage currently seems to be the only way to control the amount 
of memory used for transferring shuffle data.
Also note that the total number of cores allocated to the job is 5x2=10.
# Then generate some test data
{code:scala}
import org.apache.hadoop.io._ 
import org.apache.hadoop.io.compress._ 
import org.apache.commons.lang._ 
import org.apache.spark._ 

// generate 100M records of sample data 
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 10)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
      new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) 
{code}
# Run the sample job
{code:scala}
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd 
  .map(item => item._1.toString -> item._2.toString) 
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) 
  .coalesce(10,false) 
  .count 
{code}
Note that the number of partitions is equal to the total number of cores 
allocated to the job.

Here is the dominator tree from the heap dump:
 !run1-noparams-dominator-tree.png|width=700!

There are 4 instances of ExternalSorter, although there are only 2 concurrently 
running tasks per executor.
 !run1-noparams-dominator-tree-externalsorter.png|width=700! 

And here are the paths to the GC root of an already stopped ExternalSorter.
 !run1-noparams-dominator-tree-externalsorter-gc-root.png|width=700! 

  was:
Trying to use _coalesce_ after shuffle-oriented transformations leads to 
OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X 
GB of Y GB physical memory used. Consider 
boosting spark.yarn.executor.memoryOverhead_.
Discussion is 
[here|http://apache-spark-developers-list.1001551.n3.nabble.com/Coalesce-behaviour-td25289.html].

The error happens when trying to specify a pretty small number of partitions 
in the _coalesce_ call.

*How to reproduce?*

# Start spark-shell
{code:bash}
spark-shell \ 
  --num-executors=5 \ 
  --executor-cores=2 \ 
  --master=yarn \
  --deploy-mode=client \ 
  --conf spark.executor.memory=1g \ 
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
{code}
Please note the use of the _-Dio.netty.noUnsafe=true_ property. Preventing 
off-heap memory usage currently seems to be the only way to control the amount 
of memory used for transferring shuffle data.
Also note that the total number of cores allocated to the job is 5x2=10.
# Then generate some test data
{code:scala}
import org.apache.hadoop.io._ 
import org.apache.hadoop.io.compress._ 
import org.apache.commons.lang._ 
import org.apache.spark._ 

// generate 100M records of sample data 
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 10)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
      new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) 
{code}
# Run the sample job
{code:scala}
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd 
  .map(item => item._1.toString -> item._2.toString) 
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) 
  .coalesce(10,false) 
  .count 
{code}
Note that the number of partitions is equal to the total number of cores 
allocated to the job.

Here is the dominator tree from the heap dump:
 !run1-noparams-dominator-tree.png|width=700!

There are 4 instances of ExternalSorter, although there are only 2 concurrently 
running tasks per executor.
 !run1-noparams-dominator-tree-externalsorter.png|width=700! 

[jira] [Created] (SPARK-27641) Unregistering a single Metrics Source with no metrics leads to removing all the from other sources with the same name

2019-05-06 Thread Sergey Zhemzhitsky (JIRA)
Sergey Zhemzhitsky created SPARK-27641:
--

 Summary: Unregistering a single Metrics Source with no metrics 
leads to removing all the from other sources with the same name
 Key: SPARK-27641
 URL: https://issues.apache.org/jira/browse/SPARK-27641
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.2, 2.3.3, 2.2.3
Reporter: Sergey Zhemzhitsky


Currently Spark allows registering multiple Metric Sources with the same source 
name like the following

{code:scala}
val acc1 = sc.longAccumulator
LongAccumulatorSource.register(sc, Map("acc1" -> acc1))

val acc2 = sc.longAccumulator
LongAccumulatorSource.register(sc, Map("acc2" -> acc2))
{code}

In that case there are two metric sources registered and both of these sources 
have the same name - 
[AccumulatorSource|https://github.com/apache/spark/blob/6ef45301a46c47c12fbc74bb9ceaffea685ed944/core/src/main/scala/org/apache/spark/metrics/source/AccumulatorSource.scala#L47]

If you try to unregister a source that has no accumulators or metrics 
registered, like the following
{code:scala}
SparkEnv.get.metricsSystem.removeSource(new LongAccumulatorSource)
{code}
... then all the metrics of all the sources with the same name will be 
unregistered, because the 
[following|https://github.com/apache/spark/blob/6ef45301a46c47c12fbc74bb9ceaffea685ed944/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L171]
 snippet removes every registered metric whose name starts with the prefix 
containing the source name, without taking the names of the metrics to be 
removed into account.
{code:scala}
def removeSource(source: Source) {
  sources -= source
  val regName = buildRegistryName(source)
  registry.removeMatching((name: String, _: Metric) => name.startsWith(regName))
}
{code}
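
For illustration only, a minimal sketch of one possible direction (an 
assumption, not an actual Spark patch): instead of matching on the shared 
source-name prefix, remove only the metrics that belong to the source instance 
being unregistered, by rebuilding each of its fully qualified metric names. The 
sketch reuses _sources_, _registry_ and _buildRegistryName_ from the 
MetricsSystem snippet above and assumes the metrics were exposed as 
<registry name>.<metric name> when the source was registered; 
_Source.metricRegistry_ and _MetricRegistry.name_ come from the Spark Source 
trait and Dropwizard metrics respectively.
{code:scala}
import scala.collection.JavaConverters._
import com.codahale.metrics.MetricRegistry
import org.apache.spark.metrics.source.Source

// Illustrative sketch only: unregister just this source's own metrics by their
// fully qualified names rather than every metric sharing the source-name prefix.
def removeSourceMetricsOnly(source: Source): Unit = {
  sources -= source
  val regName = buildRegistryName(source)
  source.metricRegistry.getNames.asScala.foreach { metricName =>
    // Assumes registration exposed the metric as "<regName>.<metricName>",
    // which is what MetricRegistry.name(regName, metricName) produces.
    registry.remove(MetricRegistry.name(regName, metricName))
  }
}
{code}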



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27641) Unregistering a single Metrics Source with no metrics leads to removing all the metrics from other sources with the same name

2019-05-06 Thread Sergey Zhemzhitsky (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-27641:
---
Summary: Unregistering a single Metrics Source with no metrics leads to 
removing all the metrics from other sources with the same name  (was: 
Unregistering a single Metrics Source with no metrics leads to removing all the 
from other sources with the same name)

> Unregistering a single Metrics Source with no metrics leads to removing all 
> the metrics from other sources with the same name
> -
>
> Key: SPARK-27641
> URL: https://issues.apache.org/jira/browse/SPARK-27641
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.3, 2.3.3, 2.4.2
>Reporter: Sergey Zhemzhitsky
>Priority: Major
>
> Currently Spark allows registering multiple Metric Sources with the same 
> source name like the following
> {code:scala}
> val acc1 = sc.longAccumulator
> LongAccumulatorSource.register(sc, Map("acc1" -> acc1))
> val acc2 = sc.longAccumulator
> LongAccumulatorSource.register(sc, Map("acc2" -> acc2))
> {code}
> In that case there are two metric sources registered and both of these 
> sources have the same name - 
> [AccumulatorSource|https://github.com/apache/spark/blob/6ef45301a46c47c12fbc74bb9ceaffea685ed944/core/src/main/scala/org/apache/spark/metrics/source/AccumulatorSource.scala#L47]
> If you try to unregister a source that has no accumulators or metrics 
> registered, like the following
> {code:scala}
> SparkEnv.get.metricsSystem.removeSource(new LongAccumulatorSource)
> {code}
> ... then all the metrics of all the sources with the same name will be 
> unregistered, because the 
> [following|https://github.com/apache/spark/blob/6ef45301a46c47c12fbc74bb9ceaffea685ed944/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L171]
>  snippet removes every registered metric whose name starts with the prefix 
> containing the source name, without taking the names of the metrics to be 
> removed into account.
> {code:scala}
> def removeSource(source: Source) {
>   sources -= source
>   val regName = buildRegistryName(source)
>   registry.removeMatching((name: String, _: Metric) => 
> name.startsWith(regName))
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org