[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter's readingItera...

2018-11-25 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/23083#discussion_r236083618
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -727,9 +727,10 @@ private[spark] class ExternalSorter[K, V, C](
 spills.clear()
 forceSpillFiles.foreach(s => s.file.delete())
 forceSpillFiles.clear()
-if (map != null || buffer != null) {
+if (map != null || buffer != null || readingIterator != null) {
   map = null // So that the memory can be garbage-collected
   buffer = null // So that the memory can be garbage-collected
+  readingIterator = null // So that the memory can be garbage-collected
--- End diff --

I've added a [test case for `CompletionIterator`](https://github.com/apache/spark/pull/23083/files#diff-444ed6b5e5333c3359cecec7d082396dR50).
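For reference, here is a rough sketch (not the exact test added in this PR) of the kind of check that asserts the patched `CompletionIterator` no longer pins its sub-iterator once it completes; it assumes it lives in a Spark test under the `org.apache.spark` package (since `CompletionIterator` is `private[spark]`), and GC-based assertions like this are best-effort by nature:

```scala
import java.lang.ref.WeakReference

import org.apache.spark.util.CompletionIterator

// Keep only a weak reference to the sub-iterator and let the CompletionIterator
// wrap it; after exhausting the wrapper nothing should keep the sub-iterator alive.
var sub: Iterator[Int] = Iterator(1, 2, 3)
val weakSub = new WeakReference(sub)
val it = CompletionIterator[Int, Iterator[Int]](sub, ())
sub = null // drop our own strong reference

while (it.hasNext) it.next() // exhausting the iterator triggers the completion callback

System.gc()
assert(weakSub.get() == null, "sub-iterator is still strongly reachable")
```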

Regarding `ExternalSorter` - taking into account that only the private API has been changed and there are no similar test cases verifying that the private `map` and `buffer` fields are set to `null` after the sorter stops, don't you think that the already existing tests cover the situation with `readingIterator` too?


---




[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter's readingIterator fie...

2018-11-24 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/23083
  
Hi @cloud-fan 

> Looking at the code, we are trying to fix 2 memory leaks: the task 
completion listener in ShuffleBlockFetcherIterator, and the CompletionIterator. 
If that's case, can you say that in the PR description?

I've updated the description and the title of this PR correspondingly.


---




[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter Leak

2018-11-24 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/23083#discussion_r236057101
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -727,9 +727,10 @@ private[spark] class ExternalSorter[K, V, C](
 spills.clear()
 forceSpillFiles.foreach(s => s.file.delete())
 forceSpillFiles.clear()
-if (map != null || buffer != null) {
+if (map != null || buffer != null || readingIterator != null) {
   map = null // So that the memory can be garbage-collected
   buffer = null // So that the memory can be garbage-collected
+  readingIterator = null // So that the memory can be garbage-collected
--- End diff --

@advancedxy I've tried to remove all the modifications except for this one 
and got OutOfMemoryErrors once again. Here are the details:

1. Now there are 4 `ExternalSorter` instances remaining.
2 of them are not closed ...

![1_readingiterator_isnull_nonclosed_externalsorter](https://user-images.githubusercontent.com/1523889/48973288-2218d180-f04d-11e8-9329-27b3edf33c48.png)
... and 2 of them are closed ...

![2_readingiterator_isnull_closed_externalsorter](https://user-images.githubusercontent.com/1523889/48973295-483e7180-f04d-11e8-83cf-23361515363f.png)
... as expected.
2. There are 2 `SpillableIterator`s (which consume a significant part of the memory) of already closed `ExternalSorter`s remaining

![4_readingiterator_isnull_spillableiterator_of_closed_externalsorter](https://user-images.githubusercontent.com/1523889/48973318-cf8be500-f04d-11e8-912f-74be7420ca95.png)
3. These `SpillableIterator`s are referenced by `CompletionIterator`s ...

![6_completioniterator_of_blockstoreshufflereader](https://user-images.githubusercontent.com/1523889/48973357-a6b81f80-f04e-11e8-810f-dc8941430f34.png)
... which in turn seem to be referenced by the `cur` field ...

![7_coalescedrdd_compute_flatmap](https://user-images.githubusercontent.com/1523889/48973491-7e7df000-f051-11e8-8864-7e9e7f3f994b.png)
... of the standard `Iterator`'s `flatMap` that is used in the `compute` method of `CoalescedRDD`

![image](https://user-images.githubusercontent.com/1523889/48973401-7fae1d80-f04f-11e8-8cf2-043c808173d9.png)

The standard `Iterator`'s `flatMap` does not clean up its `cur` field before obtaining the next value for it, which in turn will consume quite a lot of memory too

![image](https://user-images.githubusercontent.com/1523889/48973418-dfa4c400-f04f-11e8-8f0e-b464567d43de.png)
... and in case of Spark that means that the previous, memory-consuming iterator stays alive while the next value for it is being fetched

![8_coalescedrdd_compute_flatmap_cur_isnotassigned](https://user-images.githubusercontent.com/1523889/48974089-0000-f05f-11e8-8319-f7d1f778f381.png)

So I've brought back the changes to `CompletionIterator` that reassign the reference to its sub-iterator to the `empty` iterator ...

![image](https://user-images.githubusercontent.com/1523889/48973472-27781b00-f051-11e8-86e1-cd6b87cd114b.png)

... and that has helped. 
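For clarity, here is a simplified stand-alone sketch of that idea (not Spark's actual `CompletionIterator`, which differs in details): once the wrapped iterator is exhausted, it is swapped for an empty one, so whoever still references the wrapper (`flatMap`'s `cur`, task listeners, etc.) no longer pins the sub-iterator and everything it holds.

```scala
// Simplified illustration only.
class ReleasingCompletionIterator[A](private var sub: Iterator[A], completion: () => Unit)
  extends Iterator[A] {

  private var completed = false

  override def next(): A = sub.next()

  override def hasNext: Boolean = {
    val r = sub.hasNext
    if (!r && !completed) {
      completed = true
      sub = Iterator.empty // release the exhausted sub-iterator right away
      completion()
    }
    r
  }
}
```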

P.S. I believe that cleaning up the standard `flatMap` iterator's `cur` field before calling `nextCur` could help too:
```scala
  def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
    private var cur: Iterator[B] = empty
    private def nextCur() { cur = f(self.next()).toIterator }
    def hasNext: Boolean = {
      // Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
      // but slightly shorter bytecode (better JVM inlining!)
      while (!cur.hasNext) {
        cur = empty
        if (!self.hasNext) return false
        nextCur()
      }
      true
    }
    def next(): B = (if (hasNext) cur else empty).next()
  }
```




---




[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter Leak

2018-11-22 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/23083#discussion_r235769204
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
@@ -103,11 +116,26 @@ private[spark] class BlockStoreShuffleReader[K, C](
     context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
     context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
     context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
+    val taskListener = new TaskCompletionListener {
+      override def onTaskCompletion(context: TaskContext): Unit = sorter.stop()
+    }
     // Use completion callback to stop sorter if task was finished/cancelled.
-    context.addTaskCompletionListener[Unit](_ => {
-      sorter.stop()
-    })
-    CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
+    context.addTaskCompletionListener(taskListener)
+    CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](
+      sorter.iterator,
+      {
+        sorter.stop()
+        // remove task completion listener as soon as the sorter stops to prevent holding
+        // its references till the end of the task which may lead to memory leaks, for
+        // example, in case of processing multiple ShuffledRDDPartitions by a single task
+        // like in case of CoalescedRDD occurred after the ShuffledRDD in the same stage
+        // (e.g. rdd.repartition(1000).coalesce(10));
+        // note that holding sorter references till the end of the task also holds
+        // references to PartitionedAppendOnlyMap and PartitionedPairBuffer too and these
+        // ones may consume a significant part of the available memory
+        context.remoteTaskCompletionListener(taskListener)
--- End diff --

Great question! Honestly speaking, I don't have a pretty good solution right now.
The TaskCompletionListener stops the sorter in case of task failures, cancellations, etc., i.e. in case of abnormal termination. In the "happy path" case the task completion listener is not needed.
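To restate the pattern, here is a self-contained toy model (illustrative names only, not Spark's real API): the listener is a safety net for abnormal termination, and the completion callback of the iterator removes it again on the happy path.

```scala
import scala.collection.mutable

// Toy stand-ins for TaskContext/ExternalSorter, just to show the shape of the pattern.
class ToyTaskContext {
  private val listeners = mutable.LinkedHashSet[() => Unit]()
  def addListener(l: () => Unit): Unit = listeners += l
  def removeListener(l: () => Unit): Unit = listeners -= l
  def fireCompletion(): Unit = listeners.toSeq.reverse.foreach(_.apply())
}

class ToySorter {
  var stopped = false
  def stop(): Unit = stopped = true
}

val context = new ToyTaskContext
val sorter = new ToySorter

// Safety net: fires only if the task ends before the iterator completes.
val safetyNet: () => Unit = () => sorter.stop()
context.addListener(safetyNet)

// Happy path: clean up immediately and stop holding sorter references until task end.
def onIteratorCompleted(): Unit = {
  sorter.stop()
  context.removeListener(safetyNet)
}
```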


---




[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter Leak

2018-11-22 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/23083#discussion_r235759169
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -72,7 +73,8 @@ final class ShuffleBlockFetcherIterator(
 maxBlocksInFlightPerAddress: Int,
 maxReqSizeShuffleToMem: Long,
 detectCorrupt: Boolean)
-  extends Iterator[(BlockId, InputStream)] with DownloadFileManager with Logging {
+  extends Iterator[(BlockId, InputStream)] with DownloadFileManager with TaskCompletionListener
--- End diff --

Introduced [the corresponding 
field](https://github.com/apache/spark/pull/23083/files#diff-27109eb30a77542d377c936e0d134420R156).
```scala
  /**
   * Task completion callback to be called in both success as well as failure cases to cleanup.
   * It may not be called at all in case the `cleanup` method has already been called before
   * task completion.
   */
  private[this] val cleanupTaskCompletionListener = (_: TaskContext) => cleanup()
```


---




[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter Leak

2018-11-22 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/23083#discussion_r235757779
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---
@@ -99,6 +99,13 @@ private[spark] class TaskContextImpl(
 this
   }
 
+  override def remoteTaskCompletionListener(listener: TaskCompletionListener)
+    : this.type = synchronized {
+    onCompleteCallbacks -= listener
--- End diff --

Replaced ArrayBuffer with LinkedHashSet. Thank you!

Another interesting question is why these collections are traversed in the reverse order in [invokeListeners](https://github.com/apache/spark/pull/23083/files#diff-df60223d4208aff4c67335528a55154cR135) like the following
```scala
  private def invokeListeners[T](
      listeners: Seq[T],
      name: String,
      error: Option[Throwable])(
      callback: T => Unit): Unit = {
    val errorMsgs = new ArrayBuffer[String](2)
    // Process callbacks in the reverse order of registration
    listeners.reverse.foreach { listener =>
      ...
    }
  }
```

I believe @hvanhovell could help us understand. @hvanhovell, could you please remind us why the task completion and error listeners are traversed in the reverse order (you seem to be the one who added the corresponding line)?
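For context, here is a tiny stand-alone sketch of what reverse-order (i.e. LIFO, stack-like) invocation looks like in practice; whether that is the actual rationale is exactly the question above:

```scala
import scala.collection.mutable.ArrayBuffer

// Listeners registered in the order resources were acquired...
val listeners = ArrayBuffer("close network connection", "close spill file", "free buffer")

// ...are invoked in reverse order of registration, like unwinding a stack:
// the most recently registered listener runs first.
listeners.reverse.foreach(name => println(s"invoking listener: $name"))
// invoking listener: free buffer
// invoking listener: close spill file
// invoking listener: close network connection
```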


---




[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter Leak

2018-11-21 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/23083
  
> So do you mean CoGroupRDDs with multiple input sources will have similar 
problems?

Yep, but a little bit different ones

> If so, can you create another Jira?

Will do it shortly.


---




[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter Leak

2018-11-21 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/23083#discussion_r235319165
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -72,7 +73,8 @@ final class ShuffleBlockFetcherIterator(
 maxBlocksInFlightPerAddress: Int,
 maxReqSizeShuffleToMem: Long,
 detectCorrupt: Boolean)
-  extends Iterator[(BlockId, InputStream)] with DownloadFileManager with Logging {
+  extends Iterator[(BlockId, InputStream)] with DownloadFileManager with TaskCompletionListener
--- End diff --

The main reason is that the TaskCompletionListener is added in one place (in the `initialize` method) and needs to be removed in another one (in the `cleanup` method).

![image](https://user-images.githubusercontent.com/1523889/48833554-abe64780-ed8c-11e8-9da0-ef826918a275.png)
Will introduce a field for TaskCompletionListener instead. Thank you!


---




[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter Leak

2018-11-21 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/23083#discussion_r235314949
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---
@@ -99,6 +99,13 @@ private[spark] class TaskContextImpl(
 this
   }
 
+  override def remoteTaskCompletionListener(listener: TaskCompletionListener)
+    : this.type = synchronized {
+    onCompleteCallbacks -= listener
--- End diff --

Should we do the same thing (i.e. changing ArrayBuffer to LinkedHashSet) for onFailureCallbacks too?
```scala
  /** List of callback functions to execute when the task completes. */
  @transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]

  /** List of callback functions to execute when the task fails. */
  @transient private val onFailureCallbacks = new ArrayBuffer[TaskFailureListener]
```
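A small stand-alone sketch of why `LinkedHashSet` fits both lists (assuming the same change were applied to `onFailureCallbacks`): it keeps registration order for the reverse-order traversal discussed above while making removal of a specific listener cheap, at the cost of silently ignoring duplicate registrations.

```scala
import scala.collection.mutable

val callbacks = mutable.LinkedHashSet[String]()
callbacks += "listenerA"
callbacks += "listenerB"
callbacks += "listenerA"         // duplicate registration is ignored by a set

callbacks -= "listenerB"         // removing a specific listener is straightforward
println(callbacks.toSeq.reverse) // List(listenerA) -- insertion order is preserved
```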


---




[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter Leak

2018-11-21 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/23083#discussion_r235308451
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
@@ -127,9 +127,21 @@ abstract class TaskContext extends Serializable {
     // Note that due to this scala bug: https://github.com/scala/bug/issues/11016, we need to make
     // this function polymorphic for every scala version >= 2.12, otherwise an overloaded method
     // resolution error occurs at compile time.
-    addTaskCompletionListener(new TaskCompletionListener {
-      override def onTaskCompletion(context: TaskContext): Unit = f(context)
-    })
+    addTaskCompletionListener(TaskCompletionListenerWrapper(f))
+  }
+
+  /**
+   * Removes a (Java friendly) listener that is no longer needed to be executed on task completion.
+   */
+  def remoteTaskCompletionListener(listener: TaskCompletionListener): TaskContext
--- End diff --

Yep, seems that `v` was replaced with `t` on my keyboard)
Thanks a lot!


---




[GitHub] spark issue #23083: [SPARK-26114][CORE] ExternalSorter Leak

2018-11-20 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/23083
  
Hi @davies, @advancedxy, @rxin,
You seem to be the last ones who touched the corresponding parts of the 
files in this PR.
Could you be so kind as to take a look at it?


---




[GitHub] spark pull request #23083: [SPARK-26114][CORE] ExternalSorter Leak

2018-11-19 Thread szhem
GitHub user szhem opened a pull request:

https://github.com/apache/spark/pull/23083

[SPARK-26114][CORE] ExternalSorter Leak

## What changes were proposed in this pull request?

This pull request fixes the [SPARK-26114](https://issues.apache.org/jira/browse/SPARK-26114) issue, which occurs when trying to reduce the number of partitions by means of `coalesce` without shuffling after shuffle-based transformations.

For the following data
```scala
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress._
import org.apache.commons.lang._
import org.apache.spark._

// generate 100M records of sample data
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 10)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase)
      -> new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
```

and the following job
```scala
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(10, false)
  .count
```

... executed like the following
```bash
spark-shell \ 
  --num-executors=5 \ 
  --executor-cores=2 \ 
  --master=yarn \
  --deploy-mode=client \ 
  --conf spark.executor.memory=1g \ 
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
```

... executors are always failing with OutOfMemoryErrors.

The main issue is multiple leaks of ExternalSorter references.
For example, in case of 2 tasks per executor there are expected to be 2 simultaneous instances of ExternalSorter per executor, but a heap dump generated on OutOfMemoryError shows that there are more.


![run1-noparams-dominator-tree-externalsorter](https://user-images.githubusercontent.com/1523889/48703665-782ce580-ec05-11e8-95a9-d6c94e8285ab.png)


P.S. This PR does not cover cases with CoGroupedRDDs which use 
ExternalAppendOnlyMap internally, which itself can lead to OutOfMemoryErrors in 
many places. 

## How was this patch tested?

- Existing unit tests
- New unit tests
- Job executions on the live environment

Here is the screenshot before applying this patch

![run3-noparams-failure-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700395-f769eb80-ebfc-11e8-831b-e94c757d416c.png)

Here is the screenshot after applying this patch

![run3-noparams-success-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700610-7a8b4180-ebfd-11e8-9761-baaf38a58e66.png)
And in case of reducing the number of executors even more the job is still 
stable

![run3-noparams-success-ui-2x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700619-82e37c80-ebfd-11e8-98ed-a38e1f1f1fd9.png)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/szhem/spark SPARK-26114-externalsorter-leak

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23083.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23083


commit 9ca5ddbeb2022259ec79e60b3e81332466947d90
Author: Sergey Zhemzhitsky 
Date:   2018-11-05T22:31:59Z

Allow memory consumers and spillables to be optionally unregistered and not 
trackable on freeing up the memory

commit bf57de2c9d23c3cbac23817a3d0b6158739ae8aa
Author: Sergey Zhemzhitsky 
Date:   2018-11-05T22:34:15Z

allow to remove already registered task completion listeners if they dont 
need to be fired at task completion

commit e3531acf2751d4ba63e7fa353b66c70d534271df
Author: Sergey Zhemzhitsky 
Date:   2018-11-05T22:35:31Z

clean up readingIterator on stop

commit baa9656e9d9fa2d006ed6fb98257a213bcace588
Author: Sergey Zhemzhitsky 
Date:   2018-11-05T22:37:07Z

prevent capturing and holding sub-iterators after completion

commit 3cc54522545f43e0ee9ff9443ba3ea67e5fb9d5b
Author: Sergey Zhemzhitsky 
Date:   2018-11-05T22:40:13Z

cleaning up resourses as soon as they no longer needed, dont waiting till 
the end of the task

commit d36323e7db3d8dc5ec01a2ae46752342ff01fac5
Author: Sergey Zhemzhitsky 
Date:   2018-11-18T01:01:15Z

adding some unit tests

commit 12075ec265f0d09cd52865bb91155898b9ede523
Author: Sergey Zhemzhitsky 
Date:   2018

[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...

2018-09-30 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410
  
Hello @mallman, @sujithjay, @felixcheung, @jkbradley, @mengxr, it has already been about a year since this pull request was opened.
I'm just wondering whether there is any chance to get any feedback for this PR (understanding that all of you have little or probably no time, having your own more important activities) and get it either rejected or merged?


---




[GitHub] spark issue #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in case o...

2018-09-30 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19373
  
Hello @sujithjay, @felixcheung, @jkbradley, @mengxr, it has already been more than a year since this pull request was opened.
I'm just wondering whether there is any chance for this PR to be reviewed by someone (understanding that all of you have little or probably no time, having your own more important activities) and either rejected or merged.


---




[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...

2018-09-03 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410
  
I've tested the mentioned checkpointers with `spark.cleaner.referenceTracking.cleanCheckpoints` set to `true` and without explicit checkpoint file removal.

It seems that there are hard references remaining somewhere - old checkpoint files are not deleted at all, and it seems that `ContextCleaner.doCleanCheckpoint` is never called.
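For reference, a minimal sketch of how that experiment is presumably configured (the exact setup is not shown here; the master, app name and checkpoint directory are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("checkpoint-cleanup-experiment")
  // ask ContextCleaner to delete checkpoint files once the RDD gets garbage collected
  .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")

val sc = new SparkContext(conf)
sc.setCheckpointDir("/tmp/checkpoints") // illustrative path
```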


---




[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...

2018-09-01 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410
  
Hi @asolimando,

I believe that the solution with weak references will work and probably 
with `ContextCleaner` too, but there are some points I'd like to discuss if you 
don't mind

- Let's take `ContextCleaner` first. In that case we should have a pretty 
simple solution, but the backward compatibility of `PeriodicRDDCheckpointer` 
and `PeriodicGraphCheckpointer` will be lost, because 
  - it will be necessary to update these classes to prevent deleting 
checkpoint files 
  - users will always have to provide the `spark.cleaner.referenceTracking.cleanCheckpoints` property in order to clean unnecessary checkpoints.
  - the users who haven't specified `spark.cleaner.referenceTracking.cleanCheckpoints` previously (and I believe they are the majority) will be affected by this new unexpected behaviour

- In case of a custom solution based on weak references
  - it will be necessary to poll a reference queue at some place and moment to remove unnecessary checkpoint files.
  - polling the reference queue in a separate thread will complicate the code
  - polling the reference queue synchronously does not guarantee deleting all the unnecessary checkpoint files.

In case of the reference queue approach, could you please recommend a convenient place in the source code to do it?

As for me 
- setting `spark.cleaner.referenceTracking.cleanCheckpoints` to `true` by 
default should work 
- setting `spark.cleaner.referenceTracking.cleanCheckpoints` to `true` by 
default will allow us to simplify `PeriodicRDDCheckpointer` and 
`PeriodicGraphCheckpointer` too by deleting unnecessary code that cleans up 
checkpoint files
- setting `spark.cleaner.referenceTracking.cleanCheckpoints` to `true` by 
default sounds reasonable and should not break the code of those users who 
didn't do it previously
- but setting `spark.cleaner.referenceTracking.cleanCheckpoints` to `true` 
will probably break the backward compatibility (although this PR tries to 
preserve it)

What do you think? Will the community accept setting `spark.cleaner.referenceTracking.cleanCheckpoints` to `true` by default?
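To make the custom weak-reference option above more concrete, here is a rough sketch of what such a cleaner could look like (a hypothetical helper, not code from Spark or this PR); it mainly shows where the reference-queue polling would have to live:

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

// Hypothetical helper: remembers checkpoint directories per RDD and deletes
// them once the RDD object becomes weakly reachable.
class CheckpointCleaner {
  private val queue = new ReferenceQueue[AnyRef]()
  private val dirs = mutable.Map[WeakReference[AnyRef], String]()

  def track(rdd: AnyRef, checkpointDir: String): Unit =
    dirs(new WeakReference(rdd, queue)) = checkpointDir

  // Has to be polled somewhere -- either from a dedicated thread (more code,
  // more moving parts) or synchronously (no guarantee everything gets cleaned).
  def pollOnce(): Unit = {
    var ref = queue.poll()
    while (ref != null) {
      dirs.remove(ref.asInstanceOf[WeakReference[AnyRef]]).foreach { dir =>
        println(s"deleting no longer referenced checkpoint dir: $dir")
      }
      ref = queue.poll()
    }
  }
}
```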


---




[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...

2018-07-09 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410
  
Hi @mallman,

I believe that `ContextCleaner` currently does not delete checkpoint data in case of unexpected failures.
Also, as it works at the end of the job, there is still a chance that a job processing quite a big graph with a lot of iterations can affect other running jobs by consuming a lot of disk during its run.


---




[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...

2018-07-03 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410
  
@mallman 

Just my two cents regarding built-in solutions:

The periodic checkpointer deletes checkpoint files so as not to pollute the hard drive. Although disk storage is cheap, it's not free.

For example, in my case (a graph with >1B vertices and about the same number of edges) a checkpoint directory with a single checkpoint took about 150-200GB.
The checkpoint interval was set to 5, and the job was able to complete in about 100 iterations.
So without cleaning up unnecessary checkpoints, the checkpoint directory could grow up to 6TB (which is quite a lot) in my case.



---




[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...

2018-06-27 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410
  
Hi @mallman!

In case of 
```
StorageLevel.MEMORY_AND_DISK
StorageLevel.MEMORY_AND_DISK_SER_2
```
... tests pass.

They still fail in case of 
```
StorageLevel.MEMORY_ONLY
StorageLevel.MEMORY_ONLY_SER
```
Although it works, I'm not sure that changing the caching level of the 
graph is really a good option to go with as Spark starts complaining 
[here](https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala#L111)
 and 
[here](https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala#L131)
 
```
18/06/27 16:08:46.802 Executor task launch worker for task 3 WARN 
ShippableVertexPartitionOps: Diffing two VertexPartitions with different 
indexes is slow.
18/06/27 16:08:47.000 Executor task launch worker for task 4 WARN 
ShippableVertexPartitionOps: Diffing two VertexPartitions with different 
indexes is slow.
18/06/27 16:08:47.164 Executor task launch worker for task 5 WARN 
ShippableVertexPartitionOps: Diffing two VertexPartitions with different 
indexes is slow.
18/06/27 16:08:48.724 Executor task launch worker for task 18 WARN 
ShippableVertexPartitionOps: Joining two VertexPartitions with different 
indexes is slow.
18/06/27 16:08:48.749 Executor task launch worker for task 18 WARN 
ShippableVertexPartitionOps: Diffing two VertexPartitions with different 
indexes is slow.
18/06/27 16:08:48.868 Executor task launch worker for task 19 WARN 
ShippableVertexPartitionOps: Joining two VertexPartitions with different 
indexes is slow.
18/06/27 16:08:48.899 Executor task launch worker for task 19 WARN 
ShippableVertexPartitionOps: Diffing two VertexPartitions with different 
indexes is slow.
18/06/27 16:08:49.008 Executor task launch worker for task 20 WARN 
ShippableVertexPartitionOps: Joining two VertexPartitions with different 
indexes is slow.
18/06/27 16:08:49.028 Executor task launch worker for task 20 WARN 
ShippableVertexPartitionOps: Diffing two VertexPartitions with different 
indexes is slow.
```


P.S. To emulate the lack of memory I just set the following options, like [here](https://github.com/apache/spark/pull/19410/files?utf8=%E2%9C%93=unified#diff-c2823ca69af75fc6cdfd1ebbf25c2aefR85):
```
spark.testing.reservedMemory
spark.testing.memory
```
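i.e. something along these lines (a sketch with made-up values; the actual numbers used in the test are in the linked diff):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("graphx-low-memory-test")
  // shrink the memory Spark believes it has, so cached blocks get evicted
  .set("spark.testing.memory", (10 * 1024 * 1024).toString)        // hypothetical value
  .set("spark.testing.reservedMemory", (1 * 1024 * 1024).toString) // hypothetical value
```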


---




[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...

2018-06-25 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410
  
Just a kind reminder...


---




[GitHub] spark issue #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in case o...

2018-06-25 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19373
  
Just a kind reminder...


---




[GitHub] spark issue #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in case o...

2018-04-02 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19373
  
> so the fix might be to change to call checkpoint() to checkpoint(eager: 
true) - this ensures by the time checkpoint call is returned the checkpointing 
is completed.

Even if checkpoint is completed, `PeriodicRDDCheckpointer` removes files of 
the checkpointed and materialized RDDs later on, so it may happen that another 
RDD depends on the already removed files.


---




[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...

2018-03-29 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410
  
Hello @viirya, @mallman, @felixcheung,

You were reviewing graph checkpointing, introduced here #15125, and this PR 
changes the behaviour a little bit.

Could you please review this PR too if possible?


---




[GitHub] spark issue #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in case o...

2018-03-29 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19373
  
@felixcheung, 
Unfortunately, RDDs, which `PeriodicRDDCheckpointer` is based on, do not have `checkpoint(eager: true)` yet.
It's a functionality of Datasets.

I've experimented with the similar method for RDDs ...

```scala
def checkpoint(eager: Boolean): RDD[T] = {
  checkpoint()
  if (eager) {
    count()
  }
  this
}
```

... and it does not work for `PeriodicRDDCheckpointer` in some scenarios.
Please, consider the following example

```scala
val checkpointInterval = 2

val checkpointer = new PeriodicRDDCheckpointer[(Int, Int)](checkpointInterval, sc)
val rdd1 = sc.makeRDD((0 until 10).map(i => i -> i))

// rdd1 is not materialized yet, checkpointer(update=1, checkpointInterval=2)
checkpointer.update(rdd1)
// rdd2 depends on rdd1
val rdd2 = rdd1.filter(_ => true)

// rdd1 is materialized, checkpointer(update=2, checkpointInterval=2)
checkpointer.update(rdd1)
// rdd3 depends on rdd1
val rdd3 = rdd1.filter(_ => true)

// rdd3 is not materialized yet, checkpointer(update=3, checkpointInterval=2)
checkpointer.update(rdd3)
// rdd3 is materialized, rdd1's files are removed, checkpointer(update=4, checkpointInterval=2)
checkpointer.update(rdd3)

// fails with FileNotFoundException because
// rdd1's files were removed on the previous step and
// rdd2 depends on rdd1
rdd2.count()
```
It fails with `FileNotFoundException` even in case of `eager` 
checkpointing, and passes in case of preserving parent checkpointed RDDs like 
it's done in this PR.


---




[GitHub] spark issue #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in case o...

2018-03-27 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19373
  
BTW, what do you think guys, maybe it would be better to merge the changes from #19410 into this one?
#19410 is about almost the same issue and fixes the described behaviour for GraphX.


---




[GitHub] spark pull request #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in...

2018-03-27 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/19373#discussion_r177484476
  
--- Diff: 
core/src/main/scala/org/apache/spark/rdd/util/PeriodicRDDCheckpointer.scala ---
@@ -73,8 +76,6 @@ import org.apache.spark.util.PeriodicCheckpointer
  *
  * @param checkpointInterval  RDDs will be checkpointed at this interval
  * @tparam T  RDD element type
- *
- * TODO: Move this out of MLlib?
  */
 private[spark] class PeriodicRDDCheckpointer[T](
--- End diff --

@sujithjay, thanks a lot for noticing! 
Just updated the docs a little bit to clarify the new behaviour.


---




[GitHub] spark issue #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in case o...

2018-03-27 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19373
  
@felixcheung

> It is deleting earlier checkpoint after the current checkpoint is called 
though?

Currently PeriodicCheckpointer can fail in case of checkpointing RDDs which 
depend on each other like in the sample below.
```
// create a periodic checkpointer with interval of 2
val checkpointer = new PeriodicRDDCheckpointer[Double](2, sc)
val rdd1 = createRDD(sc)

// rdd2 depends on rdd1
val rdd2 = rdd1.filter(_ => true)
checkpointer.update(rdd2)
// on the second update rdd2 is checkpointed and checkpoint files of rdd1 are deleted
checkpointer.update(rdd2)
// on action it's necessary to read already removed checkpoint files of rdd1
rdd2.count()
```
It's about deleting the files of an already checkpointed and materialized RDD in case another RDD depends on it.

If RDDs are cached before checkpointing (as is often recommended) then this issue is likely not to be visible, because the checkpointed RDD will be read from the cache and not from the materialized files.

A good example of such behaviour is described in this PR - #19410, where GraphX fails with `FileNotFoundException` in case of insufficient memory resources, when cached blocks of checkpointed and materialized RDDs are evicted from memory, causing them to be read from already deleted files.

> is this just an issue with DataSet.checkpoint(eager = true)?

This PR does not include modifications to the Dataset API and mainly affects `PeriodicCheckpointer` and `PeriodicRDDCheckpointer`.
It was created as a preliminary PR to this one - #19410 (where GraphX fails in case of reading cached RDDs already evicted from memory).



---




[GitHub] spark issue #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case of insu...

2017-10-16 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19410
  
I would be happy if anyone could take a look at this PR.


---




[GitHub] spark issue #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in case o...

2017-10-16 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19373
  
I would be happy if anyone could take a look at this PR.


---




[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...

2017-10-06 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/19294#discussion_r143306215
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -57,6 +60,15 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: 
String)
*/
   private def absPathStagingDir: Path = new Path(path, "_temporary-" + 
jobId)
 
+  /**
+   * Checks whether there are files to be committed to an absolute output 
location.
+   *
+   * As the committing and aborting the job occurs on driver where 
`addedAbsPathFiles` is always
+   * null, it is necessary to check whether the output path is specified, 
that may not be the case
+   * for committers not writing to distributed file systems.
--- End diff --

Thanks a lot, guys! I've just updated the comment 


---




[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...

2017-10-06 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/19294#discussion_r143305853
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -57,6 +60,15 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: 
String)
*/
   private def absPathStagingDir: Path = new Path(path, "_temporary-" + 
jobId)
 
+  /**
+   * Checks whether there are files to be committed to an absolute output 
location.
+   *
+   * As the committing and aborting the job occurs on driver where 
`addedAbsPathFiles` is always
--- End diff --

done


---




[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...

2017-10-06 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19294
  
@mridulm sql-related tests were removed.


---




[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...

2017-10-03 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/19294#discussion_r142319273
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -57,6 +57,15 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: 
String)
*/
   private def absPathStagingDir: Path = new Path(path, "_temporary-" + 
jobId)
 
+  /**
+   * Checks whether there are files to be committed to an absolute output 
location.
+   *
+   * As the committing and aborting the job occurs on driver where 
`addedAbsPathFiles` is always
+   * null, it is necessary to check whether the output path is specified, 
that may not be the case
+   * for committers not writing to distributed file systems.
+   */
+  private def hasAbsPathFiles: Boolean = path != null
--- End diff --

done


---




[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...

2017-10-02 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19294
  
@gatorsmile I believe that in the Spark SQL code path `path` cannot be null, because in that case `FileFormatWriter` [fails even before](https://github.com/apache/spark/blob/3f958a99921d149fb9fdf7ba7e78957afdad1405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L118) `setupJob` ([which in turn calls setupCommitter](https://github.com/apache/spark/blob/e47f48c737052564e92903de16ff16707fae32c3/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L124)) is called on the committer.

The interesting part is that the [FileOutputCommitter allows null output 
paths](https://github.com/apache/hadoop/blob/5af572b6443715b7a741296c1bd520a1840f9a7c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java#L96)
 and the line you highlighted is executed only in that case.


---




[GitHub] spark pull request #19410: [SPARK-22184][CORE][GRAPHX] GraphX fails in case ...

2017-10-02 Thread szhem
GitHub user szhem opened a pull request:

https://github.com/apache/spark/pull/19410

[SPARK-22184][CORE][GRAPHX] GraphX fails in case of insufficient memory and 
checkpoints enabled

## What changes were proposed in this pull request?

Fix for [SPARK-22184](https://issues.apache.org/jira/browse/SPARK-22184) 
JIRA issue (and also includes the related #19373).

In case of GraphX jobs, when checkpoints are enabled, GraphX can fail with 
`FileNotFoundException`.

The failure can happen during Pregel iterations or when Pregel completes, but only in case of insufficient memory, when checkpointed RDDs are evicted from memory and have to be read from disk (from which they have already been removed).

This PR proposes to preserve, during the iterations, all the checkpoints that the last checkpoint of `messages` and `graph` depends on, and, at the end of the Pregel iterations, all the checkpoints of `messages` and `graph` that the resulting `graph` depends on.

## How was this patch tested?

Unit tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/szhem/spark 
SPARK-22184-graphx-early-checkpoints-removal

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19410.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19410


commit f2386b61a47abf19b8ca6cea7e0e5c7da9baf7d6
Author: Sergey Zhemzhitsky <szhemzhit...@gmail.com>
Date:   2017-09-27T21:33:18Z

[SPARK-22150][CORE] preventing too early removal of checkpoints in case of 
dependant RDDs

commit aa2bedae74999694b0a9992986e85d3f9feab5f6
Author: Sergey Zhemzhitsky <szhemzhit...@gmail.com>
Date:   2017-10-02T13:10:48Z

[SPARK-22150][CORE] checking whether two checkpoints have the same 
checkpointed RDD as their parent to prevent early removal

commit 6406aea3bc87c1f3a9460bbc2ae1af67d7c0c294
Author: Sergey Zhemzhitsky <szhemzhit...@gmail.com>
Date:   2017-10-02T13:22:19Z

[SPARK-22150][CORE] respecting scala style settings

commit 4a55cda79e61e7eec67ae9545beb0c38eca7b11b
Author: Sergey Zhemzhitsky <szhemzhit...@gmail.com>
Date:   2017-10-02T14:43:27Z

[SPARK-22184][CORE][GRAPHX] retain all the checkpoints the last one depends 
on




---




[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...

2017-09-28 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19294
  
@mridulm Regarding FileFormatWriter I've implemented some basic tests which 
show that

1. [FileFormatWriter 
fails](https://github.com/apache/spark/blob/3f958a99921d149fb9fdf7ba7e78957afdad1405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L118)
 even before setupJob on the committer is called [if the path is 
null](https://github.com/apache/spark/pull/19294/files#diff-bc98a3d91cf4f95f4f473146400044aaR40)

   FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))

2. [FileFormatWriter 
succeeds](https://github.com/apache/spark/pull/19294/files#diff-bc98a3d91cf4f95f4f473146400044aaR70)
 in case of default partitioning [when customPath is not 
defined](https://github.com/apache/spark/blob/3f958a99921d149fb9fdf7ba7e78957afdad1405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L501)
 (the second branch of the `if` statement)

    val currentPath = if (customPath.isDefined) {
      committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext)
    } else {
      committer.newTaskTempFile(taskAttemptContext, partDir, ext)
    }

3. [FileFormatWriter 
succeeds](https://github.com/apache/spark/pull/19294/files#diff-bc98a3d91cf4f95f4f473146400044aaR107)
 in case of custom partitioning [when customPath is 
defined](https://github.com/apache/spark/blob/3f958a99921d149fb9fdf7ba7e78957afdad1405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L499)
 (the first branch of the `if` statement)

    val currentPath = if (customPath.isDefined) {
      committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext)
    } else {
      committer.newTaskTempFile(taskAttemptContext, partDir, ext)
    }

Is there anything else I can help with to be sure nothing else was affected?


---




[GitHub] spark pull request #19373: [SPARK-22150][CORE] PeriodicCheckpointer fails in...

2017-09-27 Thread szhem
GitHub user szhem opened a pull request:

https://github.com/apache/spark/pull/19373

[SPARK-22150][CORE] PeriodicCheckpointer fails in case of dependant RDDs

## What changes were proposed in this pull request?

Fix for [SPARK-22150](https://issues.apache.org/jira/browse/SPARK-22150) 
JIRA issue.

In case of checkpointing RDDs which depend on previously checkpointed RDDs 
(for example in iterative algorithms) PeriodicCheckpointer removes already 
checkpointed materialized RDDs too early leading to FileNotFoundExceptions.

Consider the following snippet

// create a periodic checkpointer with interval of 2
val checkpointer = new PeriodicRDDCheckpointer[Double](2, sc)

val rdd1 = createRDD(sc)
checkpointer.update(rdd1)
// on the second update rdd1 is checkpointed
checkpointer.update(rdd1)
// on action checkpointed rdd is materialized and its lineage is 
truncated
rdd1.count() 

// rdd2 depends on rdd1
val rdd2 = rdd1.filter(_ => true)
checkpointer.update(rdd2)
// on the second update rdd2 is checkpointed and checkpoint files of 
rdd1 are deleted
checkpointer.update(rdd2)
// on action it's necessary to read already removed checkpoint files of 
rdd1
rdd2.count()

## How was this patch tested?

Unit tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/szhem/spark SPARK-22150-early-checkpoints

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19373.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19373


commit 0c3338cd645f5824f08fe37fd7174e25c416529b
Author: Sergey Zhemzhitsky <szhemzhit...@gmail.com>
Date:   2017-09-27T21:33:18Z

[SPARK-22150][CORE] preventing too early removal of checkpoints in case of 
dependant RDDs




---




[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...

2017-09-26 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19294
  
Hello guys, is there a chance for this patch to be merged to master?


---




[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...

2017-09-24 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19294
  
@mridulm Updated `FileFormatWriterSuite` [to 
cover](https://github.com/apache/spark/pull/19294/files#diff-bc98a3d91cf4f95f4f473146400044aa)
 both branches of the [committer 
calling](https://github.com/apache/spark/blob/3f958a99921d149fb9fdf7ba7e78957afdad1405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L498)
 - for `newTaskTempFile` as well as for `newTaskTempFileAbsPath`.


---




[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...

2017-09-24 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/19294#discussion_r140654204
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -57,6 +57,11 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: 
String)
*/
   private def absPathStagingDir: Path = new Path(path, "_temporary-" + 
jobId)
 
+  /**
+   * Checks whether there are files to be committed to an absolute output 
location.
+   */
+  private def hasAbsPathFiles: Boolean = addedAbsPathFiles != null && 
addedAbsPathFiles.nonEmpty
--- End diff --

Good catch, thank you!
According to the `FileCommitProtocol`, `addedAbsPathFiles` is always null on the driver, so we will not be able to commit or remove these files.

Replaced it with

 private def hasAbsPathFiles: Boolean = path != null



---




[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...

2017-09-24 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/19294#discussion_r140652214
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -130,17 +135,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
     val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
       .foldLeft(Map[String, String]())(_ ++ _)
     logDebug(s"Committing files staged for absolute locations $filesToMove")
-    val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
-    for ((src, dst) <- filesToMove) {
-      fs.rename(new Path(src), new Path(dst))
+    if (hasAbsPathFiles) {
+      val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+      for ((src, dst) <- filesToMove) {
+        fs.rename(new Path(src), new Path(dst))
+      }
+      fs.delete(absPathStagingDir, true)
     }
-    fs.delete(absPathStagingDir, true)
--- End diff --

Wouldn't it be better to fix it in a separate PR?


---




[GitHub] spark issue #19294: [SPARK-21549][CORE] Respect OutputFormats with no output...

2017-09-23 Thread szhem
Github user szhem commented on the issue:

https://github.com/apache/spark/pull/19294
  
@mridulm 
> incorporating a test for the sql part will also help in this matter.

What should be the expected behaviour in case of SQL?
I'm asking because [the SQL part seems to fail even before `setupJob` is called on the committer](https://github.com/apache/spark/blob/3f958a99921d149fb9fdf7ba7e78957afdad1405/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L118).

    FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))


---




[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...

2017-09-21 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/19294#discussion_r140165731
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -130,17 +130,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
     val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
       .foldLeft(Map[String, String]())(_ ++ _)
     logDebug(s"Committing files staged for absolute locations $filesToMove")
-    val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
-    for ((src, dst) <- filesToMove) {
-      fs.rename(new Path(src), new Path(dst))
+    if (addedAbsPathFiles != null && addedAbsPathFiles.nonEmpty) {
--- End diff --

Introduced a method:

    /**
     * Checks whether there are files to be committed to an absolute output location.
     */
    private def hasAbsPathFiles: Boolean = addedAbsPathFiles != null && addedAbsPathFiles.nonEmpty


---




[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...

2017-09-20 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/19294#discussion_r140076614
  
--- Diff: 
core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -568,6 +568,51 @@ class PairRDDFunctionsSuite extends SparkFunSuite with 
SharedSparkContext {
 assert(FakeWriterWithCallback.exception.getMessage contains "failed to 
write")
   }
 
+  test("saveAsNewAPIHadoopDataset should use current working directory " +
+"for files to be committed to an absolute output location when empty 
output path specified") {
+val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1)
+
+val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration))
+job.setOutputKeyClass(classOf[Integer])
+job.setOutputValueClass(classOf[Integer])
+job.setOutputFormatClass(classOf[NewFakeFormat])
+val jobConfiguration = job.getConfiguration
+
+val fs = FileSystem.get(jobConfiguration)
+fs.setWorkingDirectory(new 
Path(getClass.getResource(".").toExternalForm))
+try {
+  // just test that the job does not fail with
+  // java.lang.IllegalArgumentException: Can not create a Path from a 
null string
+  pairs.saveAsNewAPIHadoopDataset(jobConfiguration)
+} finally {
+  // close to prevent filesystem caching across different tests
+  fs.close()
+}
+  }
+
+  test("saveAsHadoopDataset should use current working directory " +
+"for files to be committed to an absolute output location when empty 
output path specified") {
+val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1)
+
+val conf = new JobConf()
+conf.setOutputKeyClass(classOf[Integer])
+conf.setOutputValueClass(classOf[Integer])
+conf.setOutputFormat(classOf[FakeOutputFormat])
+conf.setOutputCommitter(classOf[FakeOutputCommitter])
+
+val fs = FileSystem.get(conf)
+fs.setWorkingDirectory(new 
Path(getClass.getResource(".").toExternalForm))
+try {
+  FakeOutputCommitter.ran = false
+  pairs.saveAsHadoopDataset(conf)
+} finally {
+  // close to prevent filesystem caching across different tests
+  fs.close()
--- End diff --

I've updated the PR not to use the filesystem at all.


---




[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...

2017-09-20 Thread szhem
Github user szhem commented on a diff in the pull request:

https://github.com/apache/spark/pull/19294#discussion_r140076564
  
--- Diff: 
core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -568,6 +568,51 @@ class PairRDDFunctionsSuite extends SparkFunSuite with 
SharedSparkContext {
 assert(FakeWriterWithCallback.exception.getMessage contains "failed to 
write")
   }
 
+  test("saveAsNewAPIHadoopDataset should use current working directory " +
+"for files to be committed to an absolute output location when empty 
output path specified") {
+val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1)
+
+val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration))
+job.setOutputKeyClass(classOf[Integer])
+job.setOutputValueClass(classOf[Integer])
+job.setOutputFormatClass(classOf[NewFakeFormat])
+val jobConfiguration = job.getConfiguration
+
+val fs = FileSystem.get(jobConfiguration)
+fs.setWorkingDirectory(new 
Path(getClass.getResource(".").toExternalForm))
+try {
+  // just test that the job does not fail with
+  // java.lang.IllegalArgumentException: Can not create a Path from a 
null string
+  pairs.saveAsNewAPIHadoopDataset(jobConfiguration)
+} finally {
+  // close to prevent filesystem caching across different tests
+  fs.close()
--- End diff --

I was counting on indirect filesystem caching, so that it was exactly the same both in the tests and in `SparkHadoopWriter`; calling `newInstance` prevents us from such a possibility. Currently I've updated the PR not to use the filesystem at all.


---




[GitHub] spark pull request #19294: [SPARK-21549][CORE] Respect OutputFormats with no...

2017-09-20 Thread szhem
GitHub user szhem opened a pull request:

https://github.com/apache/spark/pull/19294

[SPARK-21549][CORE] Respect OutputFormats with no output directory provided

## What changes were proposed in this pull request?

Fix for https://issues.apache.org/jira/browse/SPARK-21549 JIRA issue.

Since version 2.2 Spark does not respect OutputFormats with no output paths provided.
Examples of such formats are [Cassandra OutputFormat](https://github.com/finn-no/cassandra-hadoop/blob/08dfa3a7ac727bb87269f27a1c82ece54e3f67e6/src/main/java/org/apache/cassandra/hadoop2/AbstractColumnFamilyOutputFormat.java), [Aerospike OutputFormat](https://github.com/aerospike/aerospike-hadoop/blob/master/mapreduce/src/main/java/com/aerospike/hadoop/mapreduce/AerospikeOutputFormat.java), etc., which do not have the ability to roll back results already written to external systems on job failure.

The provided output directory is required by Spark to allow files to be committed to an absolute output location, which is not the case for output formats that write data to external systems.

This pull request proposes to use the filesystem's working directory, which is usually the user's home directory in case of distributed file systems, if no output directory is provided by means of the job configuration.
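A hypothetical condensed sketch of that fallback (illustrative only, not the literal diff of this PR):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.JobContext

// Hypothetical helper: resolve the staging dir for absolute-path files, falling
// back to the filesystem's working directory (usually the user's home directory
// on distributed file systems) when no output path is configured.
def absPathStagingDir(jobContext: JobContext, path: String, jobId: String): Path = {
  val base =
    if (path != null) new Path(path)
    else FileSystem.get(jobContext.getConfiguration).getWorkingDirectory
  new Path(base, "_temporary-" + jobId)
}
```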

## How was this patch tested?

Unit tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/szhem/spark SPARK-21549-abs-output-commits

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19294.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19294


commit b99344845a73b33d4ec319b6484c3104306c34ee
Author: Sergey Zhemzhitsky <szhemzhit...@gmail.com>
Date:   2017-09-20T13:07:20Z

[SPARK-21549][CORE] Respect empty output paths for files to be committed to 
an absolute output location in case of custom output formats

commit 5c1474ab78f46a73236d971a23d9b112d8613405
Author: Sergey Zhemzhitsky <szhemzhit...@gmail.com>
Date:   2017-09-20T13:13:58Z

[SPARK-21549][CORE] Respect empty output paths for files to be committed to 
an absolute location - reformatting imports




---




[GitHub] spark pull request #17740: [SPARK-SPARK-20404][CORE] Using Option(name) inst...

2017-04-24 Thread szhem
GitHub user szhem opened a pull request:

https://github.com/apache/spark/pull/17740

[SPARK-SPARK-20404][CORE] Using Option(name) instead of Some(name)

Using Option(name) instead of Some(name) to prevent runtime failures when using accumulators created like the following:
```
sparkContext.accumulator(0, null)
```
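The difference in a nutshell (plain Scala, independent of Spark):

```scala
val name: String = null

Some(name)   // Some(null): the null sneaks in and blows up later, e.g. on name.length
Option(name) // None: the null is normalized away at creation time
```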


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/szhem/spark SPARK-20404-null-acc-names

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17740.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17740


commit 9c6e4d685cebe034d12c085ae9f97f5187cec36b
Author: Sergey Zhemzhitsky <szhemzhit...@gmail.com>
Date:   2017-04-24T08:58:12Z

[SPARK-SPARK-20404][CORE] Using Option(name) instead of Some(name) when 
creating accumulators to prevent failures at runtime when using null names




---