[GitHub] spark issue #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to hold wea...

2018-11-16 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22995
  
ok to test


---




[GitHub] spark issue #22609: [SPARK-25594] [Core] Avoid maintaining task information ...

2018-10-02 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22609
  
I will close the PR and document in our tests the use of spark.ui.retainedTasks 
= 1 (or some other very low value): in effect this will be equivalent to this PR 
- and lose information anyway - but it will be specific to the applications that 
choose to do it (and which do not care about the api).
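
For reference, a minimal sketch of the documented workaround (the exact value is an assumption; any very low setting has the same effect):

```scala
import org.apache.spark.SparkConf

// Keep almost no per-task data in the live store: trades task-level
// UI/API detail for bounded memory, mirroring the intent of this PR.
val conf = new SparkConf().set("spark.ui.retainedTasks", "1")
```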


---




[GitHub] spark pull request #22609: [SPARK-25594] [Core] Avoid maintaining task infor...

2018-10-02 Thread mridulm
Github user mridulm closed the pull request at:

https://github.com/apache/spark/pull/22609


---




[GitHub] spark issue #22609: [SPARK-25594] [Core] Avoid maintaining task information ...

2018-10-02 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22609
  


> I tried that a lot while writing this code, but the disk store is just 
too slow

That is unfortunate ... I was actually hoping that would be a good 
compromise if the expanded api needs to be preserved.

>  if UI is disabled, any task related metrics were reported as zero:

I was incorrect in this regard - they were not zero; JobProgressListener 
had stage/job counters which maintained this.
This was limited to the job and stage level though - keeping the actual 
overhead fairly minimal and honoring 
spark.ui.retainedStages/spark.ui.retainedJobs.


Given this, the current PR would be a functionality regression - since we 
end up losing information we would otherwise expose.


---




[GitHub] spark issue #22609: [SPARK-25594] [Core] Avoid maintaining task information ...

2018-10-02 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22609
  
@vanzin any reason why liveStore is hardcoded to be in-memory? Any 
implications of making it disk backed? That might be another option to unblock 
- it requires a config change for existing applications, but is way better than 
an OOM.


---




[GitHub] spark issue #22609: [SPARK-25594] [Core] Avoid maintaining task information ...

2018-10-02 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22609
  
@vanzin , @gengliangwang To clarify the earlier behavior: if the UI is 
disabled, any task-related metrics were reported as zero; the granularity of 
updates was at the stage level.
The UI is typically disabled when an application does not want the additional 
overheads associated with it (users leverage the history server to see the 
application state), particularly in multi-tenant settings (like the thrift 
server) or very large applications (processing PBs of data).
If the UI is enabled (the default), or if the application is not `live` 
(history server), task events would be processed.

Unfortunately, application + config combinations which used to work before 2.3 
are now throwing OOMs (typically, the long-running tests are run for 7+ days to 
find any kerberos related issues - here an LR stress test failed within 28 
hours) - and a large part of the heap was occupied by TaskDataWrapper, making 
this a regression.
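
As a concrete illustration of the setup being discussed (the exact values are assumptions, not taken from the failing test):

```scala
import org.apache.spark.SparkConf

// Long-running application that relies on the history server instead of the live UI.
val conf = new SparkConf()
  .set("spark.ui.enabled", "false")       // drop live UI overheads
  .set("spark.eventLog.enabled", "true")  // history server still sees the application
```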


---




[GitHub] spark issue #22609: [SPARK-25594] [Core] Avoid maintaining task information ...

2018-10-02 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22609
  
This is a regression introduced in 2.3; unfortunately we did not notice it
until now.


On Tue, Oct 2, 2018 at 4:56 AM Shahid  wrote:

> Hi @mridulm <https://github.com/mridulm> , Can't we limit the task
> information by setting 'spark.ui.retainedTasks' lesser, to avoid OOM?
> correct me if I am wrong.



---




[GitHub] spark issue #22609: [SPARK-25594] [Core] Avoid maintaining task information ...

2018-10-02 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22609
  
ok to test


---




[GitHub] spark issue #22609: [SPARK-25594] [Core] Avoid maintaining task information ...

2018-10-02 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22609
  
+CC @vanzin, @tgravescs 


---




[GitHub] spark pull request #22609: [SPARK-25594] [Core] Avoid maintaining task infor...

2018-10-02 Thread mridulm
GitHub user mridulm opened a pull request:

https://github.com/apache/spark/pull/22609

[SPARK-25594] [Core] Avoid maintaining task information when UI is disabled

## What changes were proposed in this pull request?

Avoid maintaining task information in a live spark application when the UI is 
disabled.
For long-running applications with a large number of tasks, this ended up 
causing OOMs in our tests.

## How was this patch tested?

A long-running test ran successfully for 34 hours with steady memory usage, 
where it used to fail with an OOM within 28 hours.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mridulm/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22609.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22609


commit c2f6b4a50ef3693292f600a2d4d7743ea870b96e
Author: Mridul Muralidharan 
Date:   2018-10-02T08:03:41Z

SPARK-25594: Avoid maintaining task information when UI is disabled




---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r22067
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +397,20 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
+      // Create an instance of external append only map which ignores values.
+      val map = new ExternalAppendOnlyMap[T, Null, Null](
+        createCombiner = value => null,
+        mergeValue = (a, b) => a,
+        mergeCombiners = (a, b) => a)
--- End diff --

scratch that - does not matter


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r220672896
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -95,6 +95,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(!deserial.toString().isEmpty())
   }
 
+  test("distinct with known partitioner preserves partitioning") {
+    val rdd = sc.parallelize(1.to(100), 10).map(x => (x % 10, x % 10)).sortByKey()
+    val initialPartitioner = rdd.partitioner
+    val distinctRdd = rdd.distinct()
+    val resultingPartitioner = distinctRdd.partitioner
+    assert(initialPartitioner === resultingPartitioner)
+    val distinctRddDifferent = rdd.distinct(5)
+    val distinctRddDifferentPartitioner = distinctRddDifferent.partitioner
+    assert(initialPartitioner != distinctRddDifferentPartitioner)
+    assert(distinctRdd.collect().sorted === distinctRddDifferent.collect().sorted)
--- End diff --

We could also check that the number of stages is what we expect; a rough 
sketch follows.
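
A hedged sketch of such an assertion (the expected count and the use of the private[spark] listenerBus are assumptions that only hold inside Spark's own test suites):

```scala
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

// Count submitted stages and check that distinct() on an already-partitioned
// RDD does not introduce an extra shuffle stage.
val stageCount = new AtomicInteger(0)
sc.addSparkListener(new SparkListener {
  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    stageCount.incrementAndGet()
  }
})
rdd.distinct().collect()
sc.listenerBus.waitUntilEmpty(10000)  // private[spark]; accessible from this suite
assert(stageCount.get() == 2)         // expected count is an assumption
```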


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r220672579
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +397,20 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
+      // Create an instance of external append only map which ignores values.
+      val map = new ExternalAppendOnlyMap[T, Null, Null](
+        createCombiner = value => null,
+        mergeValue = (a, b) => a,
+        mergeCombiners = (a, b) => a)
--- End diff --

nit: clean them ?


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-19 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r218946996
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -35,16 +35,24 @@ import org.apache.spark.{Partition, TaskContext}
  * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
  *                         sensitive, it may return totally different result when the input order
  *                         is changed. Mostly stateful functions are order-sensitive.
+ * @param knownPartitioner If the result has a known partitioner.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
-    isOrderSensitive: Boolean = false)
+    isOrderSensitive: Boolean = false,
+    knownPartitioner: Option[Partitioner] = None)
   extends RDD[U](prev) {
 
-  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+  override val partitioner = {
+    if (preservesPartitioning) {
+      firstParent[T].partitioner
+    } else {
+      knownPartitioner
+    }
+  }
--- End diff --

yes


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-19 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r218946895
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    partitioner match {
+      case Some(p) if numPartitions == partitions.length =>
+        def key(x: T): (T, Null) = (x, null)
+        val cleanKey = sc.clean(key _)
+        val keyed = new MapPartitionsRDD[(T, Null), T](
+          this,
+          (context, pid, iter) => iter.map(cleanKey),
+          knownPartitioner = Some(new WrappedPartitioner(p)))
+        val duplicatesRemoved = keyed.reduceByKey((x, y) => x)
--- End diff --

Yes, it would use the right partitioner in this case.


---




[GitHub] spark issue #22433: [SPARK-25442][SQL][K8S] Support STS to run in k8s deploy...

2018-09-16 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22433
  
> As this script is common start point for all the resource 
managers(k8s/yarn/mesos/standalone/local), i guess changing this to fit for all 
the cases has a value add, instead of doing at each resource manager level. 
Thoughts?

Please note that I am specifically referring only to the need for changing 
the application `name`.
The rationale given - that `name` should be DNS compliant - is a restriction 
specific to k8s, not spark.
Instead of doing one-off renames, the right approach would be to handle this 
name translation such that it benefits not just STS, but any user 
application.


---




[GitHub] spark issue #22433: [SPARK-25442][SQL][K8S] Support STS to run in k8s deploy...

2018-09-16 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22433
  
It is an implementation detail of the k8s integration that the application 
name is expected to be DNS compliant ... spark does not have that requirement; 
and yarn/mesos/standalone/local work without this restriction.
The right fix in the k8s integration would be to sanitize the name specified 
by the user/application to be compliant with its requirements. This will help 
not just the thrift server, but any spark application (a hedged sketch follows).
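
A hedged sketch of the kind of sanitization meant here (the function name and rules are assumptions, loosely following the DNS-1123 label convention of lowercase alphanumerics and '-'):

```scala
// Make a user-supplied app name safe for k8s resource names.
def sanitizeAppName(name: String): String = {
  name.toLowerCase
    .replaceAll("[^a-z0-9-]", "-")  // replace unsupported characters
    .replaceAll("^-+|-+$", "")      // must start/end with an alphanumeric
    .take(63)                       // DNS labels are limited to 63 characters
}

sanitizeAppName("Thrift JDBC/ODBC Server")  // => "thrift-jdbc-odbc-server"
```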


---




[GitHub] spark issue #22433: [SPARK-25442][SQL][K8S] Support STS to run in k8s deploy...

2018-09-16 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22433
  
Does it fail in k8s itself, or does the spark k8s code error out?
If the former, why not fix “name” handling in k8s to replace unsupported 
characters?


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-15 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r217901185
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    partitioner match {
+      case Some(p) if numPartitions == partitions.length =>
+        def key(x: T): (T, Null) = (x, null)
+        val cleanKey = sc.clean(key _)
+        val keyed = new MapPartitionsRDD[(T, Null), T](
+          this,
+          (context, pid, iter) => iter.map(cleanKey),
+          knownPartitioner = Some(new WrappedPartitioner(p)))
+        val duplicatesRemoved = keyed.reduceByKey((x, y) => x)
--- End diff --

Ah yes, no partitioner specified => use parent's partitioner.
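
For reference, a hedged sketch of the equivalent explicit form (WrappedPartitioner is the wrapper introduced by this PR):

```scala
// reduceByKey without a partitioner falls back to Partitioner.defaultPartitioner,
// which reuses the parent's partitioner here; the explicit equivalent would be:
val duplicatesRemoved = keyed.reduceByKey(new WrappedPartitioner(p), (x, y) => x)
```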


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-15 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r217901179
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -35,16 +35,24 @@ import org.apache.spark.{Partition, TaskContext}
  * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
  *                         sensitive, it may return totally different result when the input order
  *                         is changed. Mostly stateful functions are order-sensitive.
+ * @param knownPartitioner If the result has a known partitioner.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
-    isOrderSensitive: Boolean = false)
+    isOrderSensitive: Boolean = false,
+    knownPartitioner: Option[Partitioner] = None)
   extends RDD[U](prev) {
 
-  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+  override val partitioner = {
+    if (preservesPartitioning) {
+      firstParent[T].partitioner
+    } else {
+      knownPartitioner
+    }
+  }
--- End diff --

Since we are already creating a `MapPartitionsRDD` in distinct, overriding 
`partitioner` should be trivial.


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-14 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r217876215
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    partitioner match {
+      case Some(p) if numPartitions == partitions.length =>
+        def key(x: T): (T, Null) = (x, null)
+        val cleanKey = sc.clean(key _)
+        val keyed = new MapPartitionsRDD[(T, Null), T](
+          this,
+          (context, pid, iter) => iter.map(cleanKey),
+          knownPartitioner = Some(new WrappedPartitioner(p)))
+        val duplicatesRemoved = keyed.reduceByKey((x, y) => x)
--- End diff --

Don't you need to specify the partitioner here?


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-14 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r217876184
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -35,16 +35,24 @@ import org.apache.spark.{Partition, TaskContext}
  * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
  *                         sensitive, it may return totally different result when the input order
  *                         is changed. Mostly stateful functions are order-sensitive.
+ * @param knownPartitioner If the result has a known partitioner.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
-    isOrderSensitive: Boolean = false)
+    isOrderSensitive: Boolean = false,
+    knownPartitioner: Option[Partitioner] = None)
   extends RDD[U](prev) {
 
-  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+  override val partitioner = {
+    if (preservesPartitioning) {
+      firstParent[T].partitioner
+    } else {
+      knownPartitioner
+    }
+  }
--- End diff --

We don't need to modify the public api to add support for this.
Create a subclass of MapPartitionsRDD which has the `partitioner` method 
overridden to specify what you need (a sketch follows).
Did I miss something here?
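
A minimal sketch of that alternative (the class name is an assumption; it would have to live in org.apache.spark.rdd since MapPartitionsRDD is private[spark]):

```scala
package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.{Partitioner, TaskContext}

// Pins the partitioner without widening MapPartitionsRDD's constructor.
private[spark] class MapPartitionsWithPartitionerRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],
    knownPartitioner: Option[Partitioner])
  extends MapPartitionsRDD[U, T](prev, f) {

  override val partitioner: Option[Partitioner] = knownPartitioner
}
```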


---




[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API

2018-09-07 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22192#discussion_r216118263
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -136,6 +136,26 @@ private[spark] class Executor(
   // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
   env.serializerManager.setDefaultClassLoader(replClassLoader)
 
+  private val executorPlugins: Seq[ExecutorPlugin] = {
+    val pluginNames = conf.get(EXECUTOR_PLUGINS)
+    if (pluginNames.nonEmpty) {
+      logDebug(s"Initializing the following plugins: ${pluginNames.mkString(", ")}")
+
+      // Plugins need to load using a class loader that includes the executor's user classpath
+      val pluginList: Seq[ExecutorPlugin] =
+        Utils.withContextClassLoader(replClassLoader) {
+          val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginNames, conf)
+          plugins.foreach(_.init())
--- End diff --

On second thoughts, I am not sure the latter should fail the executor ... for 
example, if a plugin is unable to write a file, etc.


---




[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API

2018-09-07 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22192#discussion_r216116857
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -136,6 +136,26 @@ private[spark] class Executor(
   // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
   env.serializerManager.setDefaultClassLoader(replClassLoader)
 
+  private val executorPlugins: Seq[ExecutorPlugin] = {
+    val pluginNames = conf.get(EXECUTOR_PLUGINS)
+    if (pluginNames.nonEmpty) {
+      logDebug(s"Initializing the following plugins: ${pluginNames.mkString(", ")}")
+
+      // Plugins need to load using a class loader that includes the executor's user classpath
+      val pluginList: Seq[ExecutorPlugin] =
+        Utils.withContextClassLoader(replClassLoader) {
+          val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginNames, conf)
+          plugins.foreach(_.init())
--- End diff --

Just to confirm - the behavior we are going for is: if `init()` fails, then 
the executor does not come up - right?
This could be invalid plugins being specified, or the initialization of one or 
more plugins failing.


---




[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API

2018-09-07 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22192#discussion_r216116706
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -240,6 +240,19 @@ private[spark] object Utils extends Logging {
     // scalastyle:on classforname
   }
 
+  /**
+   * Run a segment of code using a different context class loader in the current thread
+   */
+  def withContextClassLoader[T](ctxClassLoader: ClassLoader)(fn: => T): T = {
+    val oldClassLoader = getContextOrSparkClassLoader
--- End diff --

Curious, why `getContextOrSparkClassLoader` and not just 
`Thread.getContextClassLoader` ?


---




[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API

2018-09-06 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22192#discussion_r215750952
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -218,6 +244,8 @@ private[spark] class Executor(
     env.metricsSystem.report()
     heartbeater.shutdown()
     heartbeater.awaitTermination(10, TimeUnit.SECONDS)
+    executorPluginThread.interrupt()
+    executorPluginThread.join()
--- End diff --

We should be careful about the semantics of `interrupt()` when overriding it - 
here we are using it to trigger something in addition to thread interruption.
For example, the actual shutdown of the plugins is getting triggered in this 
caller thread - and not in the plugin init thread: which can cause issues, 
since the plugins are using a different thread context classloader.

This usage is actually a bit confusing.
A better option, if we want to follow the current design, would be (see the 
sketch after this message):
* Subclass Thread - expose methods to trigger state changes.
* Once init completes, wait+notify/condition wait until plugin shutdown is 
required.
* On stop(), invoke plugin shutdown in the plugin thread (so that the right 
classloader is in use).

IIRC VM shutdown results in `executor.stop` - and so plugin shutdown should 
get invoked even in that case.
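
A rough sketch of that design (names are assumptions): both init() and shutdown() run inside the plugin thread, so the context classloader set before start() applies to both.

```scala
import java.util.concurrent.CountDownLatch

import org.apache.spark.ExecutorPlugin

private class ExecutorPluginThread(loadPlugins: () => Seq[ExecutorPlugin])
  extends Thread("executor-plugins") {

  private val shutdownRequested = new CountDownLatch(1)

  override def run(): Unit = {
    val plugins = loadPlugins()
    plugins.foreach(_.init())
    shutdownRequested.await()      // park until Executor.stop() asks us to quit
    plugins.foreach(_.shutdown())  // shutdown also runs in this thread
  }

  // Called from Executor.stop(); avoids overloading Thread.interrupt() semantics.
  def requestShutdown(): Unit = shutdownRequested.countDown()
}
```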


---




[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API

2018-09-06 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22192#discussion_r215749556
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -136,6 +136,32 @@ private[spark] class Executor(
   // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
   env.serializerManager.setDefaultClassLoader(replClassLoader)
 
+  private val pluginList = conf.get(EXECUTOR_PLUGINS)
+  if (pluginList.nonEmpty) {
+    logDebug(s"Initializing the following plugins: ${pluginList.mkString(", ")}")
+  }
+
+  val executorPluginThread = new Thread {
+    var plugins: Seq[ExecutorPlugin] = Nil
+
+    override def run(): Unit = {
+      plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginList, conf)
+      plugins.foreach(_.init())
+    }
+
+    override def interrupt(): Unit = {
+      plugins.foreach(_.shutdown())
+      super.interrupt()
+    }
+  }
+
+  executorPluginThread.setContextClassLoader(replClassLoader)
+  executorPluginThread.start()
+
+  if (pluginList.nonEmpty) {
+    logDebug("Finished initializing plugins")
+  }
+
--- End diff --

We should wait for the plugins to finish initialization (join() on the thread) 
before moving on (particularly if we are emitting a message to indicate the 
same).


---




[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API

2018-09-06 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22192#discussion_r215749405
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -136,6 +136,32 @@ private[spark] class Executor(
   // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
   env.serializerManager.setDefaultClassLoader(replClassLoader)
 
+  private val pluginList = conf.get(EXECUTOR_PLUGINS)
+  if (pluginList.nonEmpty) {
+    logDebug(s"Initializing the following plugins: ${pluginList.mkString(", ")}")
+  }
+
+  val executorPluginThread = new Thread {
+    var plugins: Seq[ExecutorPlugin] = Nil
+
+    override def run(): Unit = {
+      plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginList, conf)
+      plugins.foreach(_.init())
+    }
+
+    override def interrupt(): Unit = {
+      plugins.foreach(_.shutdown())
+      super.interrupt()
--- End diff --

super.interrupt should be in a try/finally.
It would also be good to isolate the shutdown of each plugin from the others 
(so that an exception thrown in one plugin does not prevent another plugin 
from shutting down); a sketch follows.
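
A sketch of both points, as a drop-in variant of the `interrupt()` override in the diff above (logWarning assumed available via Logging):

```scala
import scala.util.control.NonFatal

override def interrupt(): Unit = {
  try {
    plugins.foreach { plugin =>
      try {
        plugin.shutdown()
      } catch {
        // Isolate failures so one bad plugin does not block the others.
        case NonFatal(e) => logWarning(s"Plugin failed to shut down: $plugin", e)
      }
    }
  } finally {
    super.interrupt()  // always runs, even if a shutdown throws
  }
}
```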


---




[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API

2018-09-05 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22192
  
@vanzin @NiharS I am uncomfortable with that change as well - which is why 
I wanted the initialization to be pushed into a separate thread (and then join) 
- if we really need to set the context classloader.


---




[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API

2018-08-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22192#discussion_r213819091
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -130,6 +130,14 @@ private[spark] class Executor(
   private val urlClassLoader = createClassLoader()
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
 
+  // Load plugins in the current thread, they are expected to not block.
+  // Heavy computation in plugin initialization should be done async.
+  Thread.currentThread().setContextClassLoader(replClassLoader)
+  conf.get(EXECUTOR_PLUGINS).foreach { classes =>
--- End diff --

IMO we should do this in a different thread.
It will isolate the plugin creation/instantiation from executor 
initialization.
Also, we should do this after `Executor` initialization has completed.


---




[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API

2018-08-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22192#discussion_r213818362
  
--- Diff: core/src/main/java/org/apache/spark/ExecutorPlugin.java ---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * A plugin which can be automatically instantiated within each Spark executor.  Users can specify
+ * plugins which should be created with the "spark.executor.plugins" configuration.  An instance
+ * of each plugin will be created for every executor, including those created by dynamic allocation,
+ * before the executor starts running any tasks.
+ *
+ * The specific api exposed to the end users is still considered to be very unstable.  We will
+ * *hopefully* be able to keep compatibility by providing default implementations for any methods
+ * added, but make no guarantees this will always be possible across all spark releases.
+ *
+ * Spark does nothing to verify the plugin is doing legitimate things, or to manage the resources
+ * it uses.  A plugin acquires the same privileges as the user running the task.  A bad plugin
+ * could also interfere with task execution and make the executor fail in unexpected ways.
+ */
+@DeveloperApi
+public interface ExecutorPlugin {
--- End diff --

I agree, explicit start/stop is a good idea; it also makes the lifecycle 
more concrete.
With the explicit caveat that class loading need not result in 
instantiation, and object creation need not necessarily result in a start(), or 
a `stop()` being invoked for every `start()` (in case of VM failures).
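
A hedged Scala sketch of such a lifecycle (the actual interface in this PR is Java; the names and default no-op bodies are assumptions that keep future additions source-compatible):

```scala
trait ExecutorPluginLifecycle {
  /** Called once after the executor is initialized. Class loading or
   *  instantiation may fail before this is ever invoked. */
  def start(): Unit = {}

  /** Called on executor shutdown; not guaranteed to run for every start()
   *  (e.g. on VM failure). */
  def stop(): Unit = {}
}
```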


---




[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API

2018-08-29 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22192#discussion_r213821565
  
--- Diff: core/src/test/java/org/apache/spark/ExecutorPluginSuite.java ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+// Tests loading plugins into executors
+public class ExecutorPluginSuite {
+  // Static value modified by testing plugin to ensure plugin loaded correctly.
+  public static int numSuccessfulPlugins = 0;
+  private JavaSparkContext sc;
+
+  private String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins";
+  private String testPluginName = "org.apache.spark.ExecutorPluginSuite$TestExecutorPlugin";
+
+  @Before
+  public void setUp() {
+    sc = null;
+    numSuccessfulPlugins = 0;
+  }
+
+  private SparkConf initializeSparkConf(String pluginNames) {
+    return new SparkConf()
+        .setMaster("local")
+        .setAppName("test")
+        .set(EXECUTOR_PLUGIN_CONF_NAME, pluginNames);
+  }
+
+  @Test
+  public void testPluginClassDoesNotExist() {
+    SparkConf conf = initializeSparkConf("nonexistant.plugin");
+    try {
+      sc = new JavaSparkContext(conf);
+    } catch (Exception e) {
+      // We cannot catch ClassNotFoundException directly because Java doesn't think it'll be thrown
+      assertTrue(e.toString().startsWith("java.lang.ClassNotFoundException"));
+    } finally {
+      if (sc != null) {
+        sc.stop();
+        sc = null;
+      }
+    }
+  }
+
+  @Test
+  public void testAddPlugin() throws InterruptedException {
+    // Load the sample TestExecutorPlugin, which will change the value of pluginExecutionSuccessful
+    SparkConf conf = initializeSparkConf(testPluginName);
+
+    try {
+      sc = new JavaSparkContext(conf);
+    } catch (Exception e) {
+      fail("Failed to start SparkContext with exception " + e.toString());
+    }
+
+    // Wait a moment since plugins run on separate threads
+    Thread.sleep(500);
+
+    assertEquals(1, numSuccessfulPlugins);
--- End diff --

After introducing an explicit start(), we can replace the `sleep` with a 
timed wait on a condition.
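
A sketch of that change (names are assumptions): the test plugin counts down from its start(), and the test awaits with a timeout instead of a fixed sleep.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object PluginStartSignal {
  val started = new CountDownLatch(1)  // counted down by the test plugin's start()
}

// In the test, replacing Thread.sleep(500):
assert(PluginStartSignal.started.await(10, TimeUnit.SECONDS),
  "plugin did not finish start() in time")
```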


---




[GitHub] spark pull request #22209: [SPARK-24415][Core] Fixed the aggregated stage me...

2018-08-24 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22209#discussion_r212772176
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -350,11 +350,16 @@ private[spark] class AppStatusListener(
         val e = it.next()
         if (job.stageIds.contains(e.getKey()._1)) {
           val stage = e.getValue()
-          stage.status = v1.StageStatus.SKIPPED
-          job.skippedStages += stage.info.stageId
-          job.skippedTasks += stage.info.numTasks
-          it.remove()
-          update(stage, now)
+          // Only update the stage if it has not finished already
+          if (v1.StageStatus.ACTIVE.equals(stage.status) ||
+              v1.StageStatus.PENDING.equals(stage.status)) {
+            stage.status = v1.StageStatus.SKIPPED
+            job.skippedStages += stage.info.stageId
+            job.skippedTasks += stage.info.numTasks
+            job.activeStages -= 1
+            it.remove()
--- End diff --

To clarify, I was referring to 'this' being the job end event received before 
the stage end (for a stage which is part of a job).

I was not referring to task end events (those can come in after stage or job 
ends).

Thanks for clarifying @vanzin ... given the snippet is not trying to recover 
from dropped events, I am wondering why "non"-skipped stages would even be in 
the list: I would expect all of them to be skipped?


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-24 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212705976
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1865,6 +1871,57 @@ abstract class RDD[T: ClassTag](
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean =
     dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
+
+  /**
+   * Returns the random level of this RDD's output. Please refer to [[RandomLevel]] for the
+   * definition.
+   *
+   * By default, a reliably checkpointed RDD, or an RDD without parents (root RDD), is IDEMPOTENT.
+   * For RDDs with parents, we will generate a random level candidate per parent according to the
+   * dependency. The random level of the current RDD is the random level candidate that is random
+   * most. Please override [[getOutputRandomLevel]] to provide custom logic of calculating output
+   * random level.
+   */
+  // TODO: make it public so users can set random level to their custom RDDs.
+  // TODO: this can be per-partition. e.g. UnionRDD can have different random level for different
+  // partitions.
+  private[spark] final lazy val outputRandomLevel: RandomLevel.Value = {
+    if (checkpointData.exists(_.isInstanceOf[ReliableRDDCheckpointData[_]])) {
+      RandomLevel.IDEMPOTENT
+    } else {
+      getOutputRandomLevel
+    }
+  }
+
+  @DeveloperApi
+  protected def getOutputRandomLevel: RandomLevel.Value = {
+    val randomLevelCandidates = dependencies.map {
+      case dep: ShuffleDependency[_, _, _] =>
--- End diff --

`dep.rdd.partitioner` sorry


---




[GitHub] spark pull request #22209: [SPARK-24415][Core] Fixed the issue where stage p...

2018-08-24 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22209#discussion_r212705634
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -350,11 +350,16 @@ private[spark] class AppStatusListener(
         val e = it.next()
         if (job.stageIds.contains(e.getKey()._1)) {
           val stage = e.getValue()
-          stage.status = v1.StageStatus.SKIPPED
-          job.skippedStages += stage.info.stageId
-          job.skippedTasks += stage.info.numTasks
-          it.remove()
-          update(stage, now)
+          // Only update the stage if it has not finished already
+          if (v1.StageStatus.ACTIVE.equals(stage.status) ||
+              v1.StageStatus.PENDING.equals(stage.status)) {
+            stage.status = v1.StageStatus.SKIPPED
+            job.skippedStages += stage.info.stageId
+            job.skippedTasks += stage.info.numTasks
+            job.activeStages -= 1
+            it.remove()
--- End diff --

This can happen when events get dropped ...
Spark makes a best-effort attempt to deliver events in order; but when events 
get dropped, the UI becomes inconsistent. I assume this might be an effort to 
recover in that case? Any thoughts @tgravescs, @vanzin?


---




[GitHub] spark pull request #22209: [SPARK-24415][Core] Fixed the issue where stage p...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22209#discussion_r212491921
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -350,11 +350,16 @@ private[spark] class AppStatusListener(
         val e = it.next()
         if (job.stageIds.contains(e.getKey()._1)) {
           val stage = e.getValue()
-          stage.status = v1.StageStatus.SKIPPED
-          job.skippedStages += stage.info.stageId
-          job.skippedTasks += stage.info.numTasks
-          it.remove()
-          update(stage, now)
+          // Only update the stage if it has not finished already
+          if (v1.StageStatus.ACTIVE.equals(stage.status) ||
+              v1.StageStatus.PENDING.equals(stage.status)) {
+            stage.status = v1.StageStatus.SKIPPED
+            job.skippedStages += stage.info.stageId
+            job.skippedTasks += stage.info.numTasks
+            job.activeStages -= 1
+            it.remove()
--- End diff --

Btw, there is an existing bug: we are not updating the pool, etc. here, which 
we do in onStageCompleted ...


---




[GitHub] spark pull request #22209: [SPARK-24415][Core] Fixed the issue where stage p...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22209#discussion_r212490964
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -350,11 +350,16 @@ private[spark] class AppStatusListener(
         val e = it.next()
         if (job.stageIds.contains(e.getKey()._1)) {
           val stage = e.getValue()
-          stage.status = v1.StageStatus.SKIPPED
-          job.skippedStages += stage.info.stageId
-          job.skippedTasks += stage.info.numTasks
-          it.remove()
-          update(stage, now)
+          // Only update the stage if it has not finished already
+          if (v1.StageStatus.ACTIVE.equals(stage.status) ||
+              v1.StageStatus.PENDING.equals(stage.status)) {
+            stage.status = v1.StageStatus.SKIPPED
+            job.skippedStages += stage.info.stageId
+            job.skippedTasks += stage.info.numTasks
+            job.activeStages -= 1
+            it.remove()
--- End diff --

I am not sure I follow - if that is the case, why are we doing this for 
active stages here ? onStageCompleted/onTaskEnd would be fired for active 
stages as well.



---




[GitHub] spark pull request #22209: [SPARK-24415][Core] Fixed the issue where stage p...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22209#discussion_r212468772
  
--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -350,11 +350,16 @@ private[spark] class AppStatusListener(
         val e = it.next()
         if (job.stageIds.contains(e.getKey()._1)) {
           val stage = e.getValue()
-          stage.status = v1.StageStatus.SKIPPED
-          job.skippedStages += stage.info.stageId
-          job.skippedTasks += stage.info.numTasks
-          it.remove()
-          update(stage, now)
+          // Only update the stage if it has not finished already
+          if (v1.StageStatus.ACTIVE.equals(stage.status) ||
+              v1.StageStatus.PENDING.equals(stage.status)) {
+            stage.status = v1.StageStatus.SKIPPED
+            job.skippedStages += stage.info.stageId
+            job.skippedTasks += stage.info.numTasks
+            job.activeStages -= 1
+            it.remove()
--- End diff --

Removal from the iterator should always happen; a sketch of the intended 
structure follows.
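
A minimal sketch, gating only the status update rather than the iterator removal:

```scala
if (job.stageIds.contains(e.getKey()._1)) {
  val stage = e.getValue()
  if (v1.StageStatus.ACTIVE.equals(stage.status) ||
      v1.StageStatus.PENDING.equals(stage.status)) {
    stage.status = v1.StageStatus.SKIPPED
    job.skippedStages += stage.info.stageId
    job.skippedTasks += stage.info.numTasks
    job.activeStages -= 1
    update(stage, now)
  }
  it.remove()  // always remove, regardless of the stage's prior status
}
```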


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212462874
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
             failedStages += failedStage
             failedStages += mapStage
             if (noResubmitEnqueued) {
+              // If the map stage is INDETERMINATE, which means the map tasks may return
+              // different result when re-try, we need to re-try all the tasks of the failed
+              // stage and its succeeding stages, because the input data will be changed after the
+              // map tasks are re-tried.
+              // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+              // guaranteed to be idempotent, so the input data of the reducers will not change even
+              // if the map tasks are re-tried.
+              if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+                def rollBackStage(stage: Stage): Unit = stage match {
+                  case mapStage: ShuffleMapStage =>
+                    val numMissingPartitions = mapStage.findMissingPartitions().length
+                    if (numMissingPartitions < mapStage.numTasks) {
+                      markStageAsFinished(
+                        mapStage,
+                        Some("preceding shuffle map stage with random output gets retried."),
+                        willRetry = true)
+                      mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+                      failedStages += mapStage
+                    }
+
+                  case resultStage =>
+                    val numMissingPartitions = resultStage.findMissingPartitions().length
+                    if (numMissingPartitions < resultStage.numTasks) {
+                      // TODO: support to rollback result tasks.
+                      val errorMessage = "A shuffle map stage with random output was failed and " +
+                        s"retried. However, Spark cannot rollback the result stage $resultStage " +
+                        "to re-process the input data, and has to fail this job. Please " +
+                        "eliminate the randomness by checkpointing the RDD before " +
+                        "repartition/zip and try again."
+                      abortStage(failedStage, errorMessage, None)
+                    }
+                }
+
+                def rollbackSucceedingStages(stageChain: List[Stage]): Unit = {
+                  if (stageChain.head.id == failedStage.id) {
+                    stageChain.foreach { stage =>
+                      if (!failedStages.contains(stage)) rollBackStage(stage)
+                    }
+                  } else {
+                    stageChain.head.parents.foreach(s => rollbackSucceedingStages(s :: stageChain))
+                  }
+                }
--- End diff --


Something like this sketch was what I meant:
```
def rollbackSucceedingStages(stageChain: List[Stage], alreadyProcessed: Set[Int]): Set[Int] = {

  var processed = alreadyProcessed
  val stage = stageChain.head

  if (stage.id == failedStage.id) {
    stageChain.foreach { stage =>
      if (!failedStages.contains(stage)) rollBackStage(stage)
    }
  } else {
    stage.parents.foreach(s =>
      if (!processed.contains(s.id)) {
        processed = rollbackSucceedingStages(s :: stageChain, processed)
      })
  }

  processed + failedStage.id
}
```

(or perhaps with a mutable Set to make it simpler?)
This will reduce the need to reprocess stages we have already handled in large 
dags, where a stage subtree figures in multiple places in the dag.


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212452014
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2627,6 +2632,81 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(countSubmittedMapStageAttempts() === 2)
   }
 
+  test("SPARK-23207: retry all the succeeding stages when the map stage is random") {
--- End diff --

We need to add a test where the final result stage has not started (yet) but 
intermediate stages are retried on failure.
This will validate that if the result stage has not yet started, the retry 
won't fail the job.


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212451081
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala ---
@@ -95,6 +99,18 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]
     rdd2 = null
     f = null
   }
+
+  private def isRandomOrder(rdd: RDD[_]): Boolean = {
+    rdd.computingRandomLevel == RDD.RandomLevel.UNORDERED
--- End diff --

`isRandomOrder` gives the impression that both cases (INDETERMINATE and 
UNORDERED) are 'random'.


---




[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-23 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/21698
  
@jiangxb1987 I am guessing we should close this PR ?


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212386645
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1865,6 +1876,39 @@ abstract class RDD[T: ClassTag](
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean =
 dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, 
_]]).exists(_.rdd.isBarrier())
+
+  /**
+   * Returns the random level of this RDD's computing function. Please 
refer to [[RDD.RandomLevel]]
+   * for the definition of random level.
+   *
+   * By default, an RDD without parents(root RDD) is IDEMPOTENT. For RDDs 
with parents, the random
+   * level of current RDD is the random level of the parent which is 
random most.
+   */
+  // TODO: make it public so users can set random level to their custom 
RDDs.
+  // TODO: this can be per-partition. e.g. UnionRDD can have different 
random level for different
+  // partitions.
+  private[spark] def computingRandomLevel: RDD.RandomLevel.Value = {
+val parentRandomLevels = dependencies.map {
+  case dep: ShuffleDependency[_, _, _] =>
+if (dep.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) 
{
+  RDD.RandomLevel.INDETERMINATE
--- End diff --

Crap, brain fart ... you are right it is UNORDERED and not INDETERMINATE 
... I am still getting my head around the terms :-(


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212385688
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1865,6 +1876,39 @@ abstract class RDD[T: ClassTag](
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean =
     dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
+
+  /**
+   * Returns the random level of this RDD's computing function. Please refer to [[RDD.RandomLevel]]
+   * for the definition of random level.
+   *
+   * By default, an RDD without parents (root RDD) is IDEMPOTENT. For RDDs with parents, the random
+   * level of the current RDD is the random level of the parent which is random most.
+   */
+  // TODO: make it public so users can set random level to their custom RDDs.
+  // TODO: this can be per-partition. e.g. UnionRDD can have different random level for different
+  // partitions.
+  private[spark] def computingRandomLevel: RDD.RandomLevel.Value = {
+    val parentRandomLevels = dependencies.map {
+      case dep: ShuffleDependency[_, _, _] =>
+        if (dep.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+          RDD.RandomLevel.INDETERMINATE
--- End diff --

RE: checkpoint.

I wanted to handle two cases.
* Checkpoint is being done as part of the current job (and not a previous job 
which forced materialization of the checkpointed RDD).
* Checkpoint is happening to a reliable store, not local - where we are 
subject to failures on node failures.

Looks like `dep.rdd.isCheckpointed` is the wrong way to go about it (relying 
on `dependencies` is insufficient for both cases).

A better option seems to be:
```
  // If checkpointed already - then always same order
  case dep: Dependency if dep.rdd.getCheckpointFile.isDefined => RDD.RandomLevel.IDEMPOTENT
```

> Actually we know. As long as the shuffle map stage RDD is IDEMPOTENT or UNORDERED, the reduce RDD is UNORDERED instead of INDETERMINATE.

It does not matter what the output order of the map stage was; after we 
shuffle the map output, it is always in indeterminate order except for the 
specific cases I referred to above.



---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212192600
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -855,16 +858,17 @@ abstract class RDD[T: ClassTag](
   * a map on the other).
   */
  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
-    zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
-      new Iterator[(T, U)] {
-        def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
-          case (true, true) => true
-          case (false, false) => false
-          case _ => throw new SparkException("Can only zip RDDs with " +
-            "same number of elements in each partition")
+    zipPartitionsInternal(other, preservesPartitioning = false, orderSensitiveFunc = true) {
+      (thisIter, otherIter) =>
+        new Iterator[(T, U)] {
+          def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
+            case (true, true) => true
+            case (false, false) => false
+            case _ => throw new SparkException("Can only zip RDDs with " +
+              "same number of elements in each partition")
+          }
+          def next(): (T, U) = (thisIter.next(), otherIter.next())
         }
-        def next(): (T, U) = (thisIter.next(), otherIter.next())
-      }
--- End diff --

The bulk of the change here is simply indentation, right (except for 
zipPartitions -> zipPartitionsInternal and the flag)? I want to make sure I 
did not miss something here.


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212199604
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
             failedStages += failedStage
             failedStages += mapStage
             if (noResubmitEnqueued) {
+              // If the map stage is INDETERMINATE, which means the map tasks may return
+              // different result when re-try, we need to re-try all the tasks of the failed
+              // stage and its succeeding stages, because the input data will be changed after the
+              // map tasks are re-tried.
+              // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+              // guaranteed to be idempotent, so the input data of the reducers will not change even
+              // if the map tasks are re-tried.
+              if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+                def rollBackStage(stage: Stage): Unit = stage match {
+                  case mapStage: ShuffleMapStage =>
+                    val numMissingPartitions = mapStage.findMissingPartitions().length
+                    if (numMissingPartitions < mapStage.numTasks) {
+                      markStageAsFinished(
+                        mapStage,
+                        Some("preceding shuffle map stage with random output gets retried."),
+                        willRetry = true)
+                      mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+                      failedStages += mapStage
+                    }
+
+                  case resultStage =>
+                    val numMissingPartitions = resultStage.findMissingPartitions().length
+                    if (numMissingPartitions < resultStage.numTasks) {
+                      // TODO: support to rollback result tasks.
+                      val errorMessage = "A shuffle map stage with random output was failed and " +
+                        s"retried. However, Spark cannot rollback the result stage $resultStage " +
+                        "to re-process the input data, and has to fail this job. Please " +
+                        "eliminate the randomness by checkpointing the RDD before " +
+                        "repartition/zip and try again."
+                      abortStage(failedStage, errorMessage, None)

This ends up aborting the job if I am not wrong, while we want to retry 
the stage.
+CC @markhamstra, @tgravescs  if there is a better way to accomplish result 
stage rollback.


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212195284
  
--- Diff: 
core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala ---
@@ -95,6 +99,18 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, 
B: ClassTag, V: ClassTag]
 rdd2 = null
 f = null
   }
+
+  private def isRandomOrder(rdd: RDD[_]): Boolean = {
+rdd.computingRandomLevel == RDD.RandomLevel.UNORDERED
--- End diff --

Instead of this, check if order is not IDEMPOTENT.
In both the unordered and indeterminate cases, we should return indeterminate 
for this zip rdd from computingRandomLevel.


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212199007
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
             failedStages += failedStage
             failedStages += mapStage
             if (noResubmitEnqueued) {
+              // If the map stage is INDETERMINATE, which means the map tasks may return
+              // different result when re-try, we need to re-try all the tasks of the failed
+              // stage and its succeeding stages, because the input data will be changed after the
+              // map tasks are re-tried.
+              // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+              // guaranteed to be idempotent, so the input data of the reducers will not change even
+              // if the map tasks are re-tried.
+              if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+                def rollBackStage(stage: Stage): Unit = stage match {
+                  case mapStage: ShuffleMapStage =>
+                    val numMissingPartitions = mapStage.findMissingPartitions().length
+                    if (numMissingPartitions < mapStage.numTasks) {
+                      markStageAsFinished(
+                        mapStage,
+                        Some("preceding shuffle map stage with random output gets retried."),
+                        willRetry = true)
+                      mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+                      failedStages += mapStage
+                    }
+
+                  case resultStage =>
+                    val numMissingPartitions = resultStage.findMissingPartitions().length
+                    if (numMissingPartitions < resultStage.numTasks) {
+                      // TODO: support to rollback result tasks.
+                      val errorMessage = "A shuffle map stage with random output was failed and " +
+                        s"retried. However, Spark cannot rollback the result stage $resultStage " +
+                        "to re-process the input data, and has to fail this job. Please " +
+                        "eliminate the randomness by checkpointing the RDD before " +
+                        "repartition/zip and try again."
+                      abortStage(failedStage, errorMessage, None)
+                    }
+                }
+
+                def rollbackSucceedingStages(stageChain: List[Stage]): Unit = {
+                  if (stageChain.head.id == failedStage.id) {
+                    stageChain.foreach { stage =>
+                      if (!failedStages.contains(stage)) rollBackStage(stage)
+                    }
+                  } else {
+                    stageChain.head.parents.foreach(s => rollbackSucceedingStages(s :: stageChain))
+                  }
+                }
+
+                rollBackStage(failedStage)
--- End diff --

This would be implicitly handled anyway, right ? Any reason to specifically 
do it here ?


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212193814
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1876,6 +1920,22 @@ abstract class RDD[T: ClassTag](
  */
 object RDD {
 
+  /**
+   * The random level of RDD's computing function, which indicates the behavior when rerun the
+   * computing function. There are 3 random levels, ordered by the randomness from low to high:
+   * 1. IDEMPOTENT: The computing function always return the same result with same order when rerun.
+   * 2. UNORDERED: The computing function returns same data set in potentially a different order
+   *    when rerun.
+   * 3. INDETERMINATE. The computing function may return totally different result when rerun.
+   *
+   * Note that, the output of the computing function usually relies on parent RDDs. When a
+   * parent RDD's computing function is random, it's very likely this computing function is also
+   * random.
+   */
+  object RandomLevel extends Enumeration {
--- End diff --

While reviewing the PR, this enumeration looks a bit unclear.
We are modelling two things here:
* the behavior of `compute` in RDD - whether it is idempotent, order sensitive 
or indeterminate; and
* the input order of the tuples coming in from the partition to 
`RDD.compute`.

For example, shuffle output of a sorted RDD would be `IDEMPOTENT` by this 
definition - but idempotent is a functional behavior of a closure, not input 
order. Perhaps we need to convey this better ? Or does 'idempotent' order make 
sense ?
Thoughts @cloud-fan, @markhamstra ?


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212198632
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -70,7 +70,8 @@ class MyRDD(
     numPartitions: Int,
     dependencies: List[Dependency[_]],
     locations: Seq[Seq[String]] = Nil,
-    @(transient @param) tracker: MapOutputTrackerMaster = null)
+    @(transient @param) tracker: MapOutputTrackerMaster = null,
+    isRandom: Boolean = false)
--- End diff --

isRandom -> unordered


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212197939
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
             failedStages += failedStage
             failedStages += mapStage
             if (noResubmitEnqueued) {
+              // If the map stage is INDETERMINATE, which means the map tasks may return
+              // different result when re-try, we need to re-try all the tasks of the failed
+              // stage and its succeeding stages, because the input data will be changed after the
+              // map tasks are re-tried.
+              // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+              // guaranteed to be idempotent, so the input data of the reducers will not change even
+              // if the map tasks are re-tried.
+              if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+                def rollBackStage(stage: Stage): Unit = stage match {
+                  case mapStage: ShuffleMapStage =>
+                    val numMissingPartitions = mapStage.findMissingPartitions().length
+                    if (numMissingPartitions < mapStage.numTasks) {
+                      markStageAsFinished(
+                        mapStage,
+                        Some("preceding shuffle map stage with random output gets retried."),
+                        willRetry = true)
+                      mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+                      failedStages += mapStage
+                    }
+
+                  case resultStage =>
+                    val numMissingPartitions = resultStage.findMissingPartitions().length
+                    if (numMissingPartitions < resultStage.numTasks) {
+                      // TODO: support to rollback result tasks.
+                      val errorMessage = "A shuffle map stage with random output was failed and " +
+                        s"retried. However, Spark cannot rollback the result stage $resultStage " +
+                        "to re-process the input data, and has to fail this job. Please " +
+                        "eliminate the randomness by checkpointing the RDD before " +
+                        "repartition/zip and try again."
+                      abortStage(failedStage, errorMessage, None)
+                    }
+                }
+
+                def rollbackSucceedingStages(stageChain: List[Stage]): Unit = {
+                  if (stageChain.head.id == failedStage.id) {
+                    stageChain.foreach { stage =>
+                      if (!failedStages.contains(stage)) rollBackStage(stage)
+                    }
+                  } else {
+                    stageChain.head.parents.foreach(s => rollbackSucceedingStages(s :: stageChain))
+                  }
+                }
--- End diff --

This method looks expensive for large DAGs; memoization should help 
reduce the cost.
Compute the set of stages to roll back once, then roll back the stages found 
(see the sketch below).
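For illustration only - a self-contained sketch (toy `Node` type standing in
for `Stage`; all names here are invented, not DAGScheduler code) of computing
the rollback set once, with memoized reachability so each stage is classified
at most once:

```
case class Node(id: Int, parents: List[Node])

// Collect ids of all stages reachable from `last` that can reach `failed`.
def rollbackSet(failed: Node, last: Node): Set[Int] = {
  val memo = scala.collection.mutable.Map[Int, Boolean]()
  def reachesFailed(n: Node): Boolean = memo.get(n.id) match {
    case Some(r) => r
    case None =>
      val r = n.id == failed.id || n.parents.exists(reachesFailed)
      memo(n.id) = r
      r
  }
  // Walk the DAG once, visiting each node a single time.
  val seen = scala.collection.mutable.Set[Int]()
  val all = scala.collection.mutable.ListBuffer[Node]()
  def walk(n: Node): Unit = if (seen.add(n.id)) { all += n; n.parents.foreach(walk) }
  walk(last)
  all.filter(reachesFailed).map(_.id).toSet
}
```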



---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212192065
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1876,6 +1920,22 @@ abstract class RDD[T: ClassTag](
  */
 object RDD {
 
+  /**
+   * The random level of RDD's computing function, which indicates the behavior when rerun the
+   * computing function. There are 3 random levels, ordered by the randomness from low to high:
+   * 1. IDEMPOTENT: The computing function always return the same result with same order when rerun.
+   * 2. UNORDERED: The computing function returns same data set in potentially a different order
+   *    when rerun.
+   * 3. INDETERMINATE. The computing function may return totally different result when rerun.
+   *
+   * Note that, the output of the computing function usually relies on parent RDDs. When a
+   * parent RDD's computing function is random, it's very likely this computing function is also
+   * random.
+   */
+  object RandomLevel extends Enumeration {
--- End diff --

RandomLevel is not very descriptive; particularly after rename of the 
levels to UNORDERED/INDETERMINATE.
OrderSensitivity or something else more descriptive would be better.


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212192772
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1865,6 +1876,39 @@ abstract class RDD[T: ClassTag](
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean =
     dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
+
+  /**
+   * Returns the random level of this RDD's computing function. Please refer to [[RDD.RandomLevel]]
+   * for the definition of random level.
+   *
+   * By default, an RDD without parents (root RDD) is IDEMPOTENT. For RDDs with parents, the random
+   * level of current RDD is the random level of the parent which is random most.
+   */
+  // TODO: make it public so users can set random level to their custom RDDs.
+  // TODO: this can be per-partition. e.g. UnionRDD can have different random level for different
+  // partitions.
+  private[spark] def computingRandomLevel: RDD.RandomLevel.Value = {
--- End diff --

We will need to expose this with an `@Experimental` tag - can't keep it 
`private[spark]` given the implications for custom RDDs.


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212193206
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1865,6 +1876,39 @@ abstract class RDD[T: ClassTag](
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean =
     dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
+
+  /**
+   * Returns the random level of this RDD's computing function. Please refer to [[RDD.RandomLevel]]
+   * for the definition of random level.
+   *
+   * By default, an RDD without parents (root RDD) is IDEMPOTENT. For RDDs with parents, the random
+   * level of current RDD is the random level of the parent which is random most.
+   */
+  // TODO: make it public so users can set random level to their custom RDDs.
+  // TODO: this can be per-partition. e.g. UnionRDD can have different random level for different
+  // partitions.
+  private[spark] def computingRandomLevel: RDD.RandomLevel.Value = {
+    val parentRandomLevels = dependencies.map {
+      case dep: ShuffleDependency[_, _, _] =>
+        if (dep.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+          RDD.RandomLevel.INDETERMINATE
--- End diff --

It does not matter what the parent RDD's order was - after shuffle, 
currently, it is always going to be UNORDERED - unless an Aggregator and key 
ordering are specified in the dep.
Given [this 
comment](https://github.com/apache/spark/pull/22112#issuecomment-414034703), 
and adding a few missing cases, this becomes:
```
  // If checkpointed already - then always same order
  case dep: Dependency[_] if dep.rdd.isCheckpointed =>
    RDD.RandomLevel.IDEMPOTENT
  // if same partitioner, then shuffle not done.
  case dep: ShuffleDependency[_, _, _] if dep.partitioner == partitioner =>
    dep.rdd.computingRandomLevel
  // if aggregator specified (and so unique keys) and key ordering specified -
  // then consistent ordering.
  case dep: ShuffleDependency[_, _, _] if dep.keyOrdering.isDefined &&
      dep.aggregator.isDefined =>
    RDD.RandomLevel.IDEMPOTENT
  // All other shuffle cases, we don't know the output order in spark.
  case dep: ShuffleDependency[_, _, _] => RDD.RandomLevel.INDETERMINATE
```



---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212192261
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -54,4 +58,12 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
 
   @transient protected lazy override val isBarrier_ : Boolean =
     isFromBarrier || dependencies.exists(_.rdd.isBarrier())
+
+  override private[spark] def computingRandomLevel = {
--- End diff --

computingRandomLevel -> computeOrderSensitivity
(suffix to match what `RandomLevel` gets renamed to)


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212190267
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -32,12 +32,16 @@ import org.apache.spark.{Partition, TaskContext}
  *  doesn't modify the keys.
  * @param isFromBarrier Indicates whether this RDD is transformed from an RDDBarrier, a stage
  *  containing at least one RDDBarrier shall be turned into a barrier stage.
+ * @param orderSensitiveFunc whether or not the zip function is order-sensitive. If it's order
--- End diff --

remove 'zip'


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212196598
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
             failedStages += failedStage
             failedStages += mapStage
             if (noResubmitEnqueued) {
+              // If the map stage is INDETERMINATE, which means the map tasks may return
+              // different result when re-try, we need to re-try all the tasks of the failed
+              // stage and its succeeding stages, because the input data will be changed after the
+              // map tasks are re-tried.
+              // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+              // guaranteed to be idempotent, so the input data of the reducers will not change even
+              // if the map tasks are re-tried.
+              if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+                def rollBackStage(stage: Stage): Unit = stage match {
+                  case mapStage: ShuffleMapStage =>
+                    val numMissingPartitions = mapStage.findMissingPartitions().length
+                    if (numMissingPartitions < mapStage.numTasks) {
+                      markStageAsFinished(
+                        mapStage,
+                        Some("preceding shuffle map stage with random output gets retried."),
+                        willRetry = true)
+                      mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+                      failedStages += mapStage
+                    }
+
+                  case resultStage =>
+                    val numMissingPartitions = resultStage.findMissingPartitions().length
+                    if (numMissingPartitions < resultStage.numTasks) {
--- End diff --

IIRC this can be a valid case - for example if result stage is being run on 
only a subset of partitions (first() for example) : so number of missing 
partitions can be legitimately > numTasks and still have missing relevant 
partitions.
I might be a bit rusty here though, +CC @markhamstra 


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-22 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22112
  

@tgravescs:
> The shuffle simply transfers the bytes its supposed to. Sparks shuffle of 
those bytes is not consistent in that the order it fetches from can change and 
without the sort happening on that data the order can be different on rerun. I 
guess maybe you mean the ShuffledRDD as a whole or do you mean something else 
here?


By shuffle, I am referring to the output of shuffle which is consumed by an 
RDD with `ShuffleDependency` as input.
More specifically, the output of 
`SparkEnv.get.shuffleManager.getReader(...).read()` which RDDs (user and spark 
impl's) use to fetch the output of the shuffle machinery.
This output is not just the deserialized shuffle bytes, but has 
aggregation applied (if specified) and ordering imposed (if specified).

ShuffledRDD is one such usage within spark core, but others exist within 
spark core and in user code.

> All I'm saying is zip is just another variant of this, you could document 
it as such and do nothing internal to spark to "fix it".

I agree; repartition + shuffle, zip, sample, mllib usages are all variants 
of the same problem - of shuffle output order being inconsistent.

> I guess we can separate out these 2 discussions. I think the point of 
this pr is to temporarily workaround the data loss/corruption issue with 
repartition by failing. So if everyone agrees on that lets move the discussion 
to a jira about what to do with the rest of the operators and fix repartition 
here. thoughts?

Sounds good to me.


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-22 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22112
  
Catching up on discussion ...

@cloud-fan
> shuffled RDD will never be deterministic unless the shuffle key is the 
entire record and key ordering is specified. 

Let me rephrase that - key ordering with aggregator specified.
Unfortunately this will then mean it is applicable only to custom user code 
- since default spark api's do not set both.

> The reduce task fetches multiple remote shuffle blocks at the same time, 
so the order is always random.

This is not a characteristic of shuffle in MR based systems, but an 
implementation detail of shuffle in spark.
In hadoop mapreduce, for example, shuffle output is always ordered and this 
problem does not occur.

>  In Addition, Spark SQL never specifies key ordering.

Spark SQL has re-implemented a lot of the spark core primitives - given 
this, I would expect spark sql to :
* When an rdd view is generated off a dataframe, introduce a local sort 
where appropriate - as has already been done in SPARK-23207 for the 
repartition case; and/or
* appropriately expose IDEMPOTENT, UNORDERED and INDETERMINATE in the RDD view.

@tgravescs 
> I don't agree that " We actually cannot support random output". Users can 
do this now in MR and spark and we can't really stop them other then say we 
don't support and if you do failure handling will cause different results.

What I mentioned was not specific to spark, but general to any MR like 
system.
This applies even in hadoop mapreduce and used to be a bug in some of our 
pig udf's :-)
For example, if there is random output generated in mapper and there are 
node failures during reducer phase (after all mapper's have completed), the 
exact same problem would occur with random mapper output.
We cannot, of course, stop users from doing it - but we do not guarantee 
correct results (just as hadoop mapreduce does not in this scenario).

>  I don't want us to document it away now and then change our mind in next 
release. Our end decision should be final.

My current thought is as follows:

Without making shuffle output order repeatable, we do not have a way to 
properly fix this.
My understanding from @jiangxb1987, who has looked at it in detail with 
@sameeragarwal and others, is that this is a very difficult invariant to 
achieve in current spark codebase for shuffle in general.
(Please holler if I am off base @jiangxb1987 !)

With the assumption that we cannot currently fix this - explicitly warning the 
user and/or rescheduling all tasks/stages for correctness might be a good stop 
gap.
Users could mitigate the performance impact via checkpointing [1] (a sketch 
follows the footnote below) - I would expect this to be the go-to solution; for 
any non trivial job, the perf characteristics and SLA violations are going to 
be terrible after this patch is applied when failures occur : but we should not 
have any data loss.

In future, we might resolve this issue in a more principled manner.

[1] As @cloud-fan pointed out 
[here](https://github.com/apache/spark/pull/22112#issuecomment-414034703), sort 
is not guaranteed to work unless keys are unique, since ordering is defined 
only on the key and not the value (and so value re-ordering can occur).

@cloud-fan 
> This is the problem we are resolving here. This assumption is incorrect, 
and the RDD closure should handle it, or use what I proposed in this PR: the 
retry strategy.

I would disagree with this - this is an artifact of implementation detail 
of spark shuffle - and is not the expected behavior for a MR based system.
Unfortunately, this has been the behavior since the beginning (at least 
since 0.6).
IMO this was not a conscious design choice, but rather an oversight.

> IIRC @mridulm didn't agree with it. One problem is that, it's hard for 
users to realize that Spark returns wrong result, so they don't know when to 
handle it.

Actually I would expect users to end up doing either of these two - the 
perf characteristics and lack of predictability in SLA after this patch are 
going to force users to choose one of the two.




---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-17 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22112
  
@tgravescs Please see 
https://github.com/apache/spark/pull/22112#discussion_r210788359 for a further 
elaboration. We actually cannot support random order (except for small subset 
of cases like map-only jobs for example).
Ideally, I would like to see order sensitive closure's fixed - and fixing 
repartition + shuffle would fix this for a general case for all order sensitive 
closures.
This PR is not fixing the problem, but rather failing and re-trying the job 
as a workaround - which, as you mention, can be terribly expensive for large 
jobs. Of course, data correctness trumps performance, so I am fine with this as 
a stop-gap. I would expect most non trivial applications will simply work around 
this by checkpointing to hdfs like what we did in YST.


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-17 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r210963665
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1864,6 +1877,22 @@ abstract class RDD[T: ClassTag](
   // From performance concern, cache the value to avoid repeatedly compute `isBarrier()` on a long
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
+
+  /**
+   * Whether the RDD's computing function is idempotent. Idempotent means the computing function
+   * not only satisfies the requirement, but also produce the same output sequence (the output order
+   * can't vary) given the same input sequence. Spark assumes all the RDDs are idempotent, except
+   * for the shuffle RDD and RDDs derived from non-idempotent RDD.
+   */
--- End diff --

This will mean all RDDs which are directly or indirectly reading from an 
unsorted shuffle output are not 'idempotent'.


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-17 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r210963213
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -853,6 +861,11 @@ abstract class RDD[T: ClassTag](
    * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
    * partitions* and the *same number of elements in each partition* (e.g. one was made through
    * a map on the other).
+   *
+   * Note that, `zip` violates the requirement of the RDD computing function. If the order of input
+   * data changes, `zip` will return different result. Because of this, Spark may return unexpected
+   * result if there is a shuffle after `zip`, and the shuffle failed and retried. To workaround
+   * this, users can call `zipPartitions` and sort the input data before zip.
--- End diff --

All zip methods are affected by it, not just this one.
I added a list of other affected methods from memory (though 
unfortunately it is not exhaustive).
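
For reference, a minimal sketch of the workaround the doc suggests (sorting
inside `zipPartitions` before pairing); `deterministicZip` is a made-up helper
name, and the sketch assumes each partition fits in memory and elements are
orderable:

```
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def deterministicZip[A: Ordering: ClassTag, B: Ordering: ClassTag](
    left: RDD[A], right: RDD[B]): RDD[(A, B)] = {
  left.zipPartitions(right) { (leftIter, rightIter) =>
    // Impose a stable per-partition order before pairing, so a recomputed
    // partition pairs elements the same way even if the fetch order changed.
    leftIter.toSeq.sorted.iterator.zip(rightIter.toSeq.sorted.iterator)
  }
}
```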


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-17 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r210967814
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1441,6 +1441,44 @@ class DAGScheduler(
             failedStages += failedStage
             failedStages += mapStage
             if (noResubmitEnqueued) {
+              // If the map stage is not idempotent (produces data in a different order when retry)
+              // and the shuffle partitioner is order sensitive, we have to retry all the tasks of
+              // the failed stage and its succeeding stages, because the input data of the failed
+              // stage will be changed after the map tasks are re-tried.
+              if (!mapStage.rdd.isIdempotent && mapStage.shuffleDep.orderSensitivePartitioner) {
+                def rollBackStage(stage: Stage): Unit = stage match {
+                  case mapStage: ShuffleMapStage =>
+                    if (mapStage.findMissingPartitions().length < mapStage.numPartitions) {
--- End diff --

This is making an assumption that all partitions of a stage are getting 
computed - which is not necessarily true in a general case (see numTasks vs 
numPartitions).



---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-17 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r210964794
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1864,6 +1877,22 @@ abstract class RDD[T: ClassTag](
   // From performance concern, cache the value to avoid repeatedly compute `isBarrier()` on a long
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
+
+  /**
+   * Whether the RDD's computing function is idempotent. Idempotent means the computing function
+   * not only satisfies the requirement, but also produce the same output sequence (the output order
+   * can't vary) given the same input sequence. Spark assumes all the RDDs are idempotent, except
+   * for the shuffle RDD and RDDs derived from non-idempotent RDD.
+   */
+  // TODO: Add public APIs to allow users to mark their RDD as non-idempotent.
+  // TODO: this can be per-partition. e.g. UnionRDD can have part of its partitions idempotent.
+  private[spark] def isIdempotent: Boolean = {
+    dependencies.forall { dep =>
+      // Shuffle RDD is always considered as non-idempotent, because its computing function needs
+      // to fetch remote shuffle blocks, and these fetched blocks may arrive in a random order.
+      !dep.isInstanceOf[ShuffleDependency[_, _, _]] && dep.rdd.isIdempotent
--- End diff --

This is too strict.
As I discussed with @jiangxb1987 , something like this would be better:
```
  dep =>
    dep match {
      case shuffleDep: ShuffleDependency[_, _, _] => shuffleDep.keyOrdering.isDefined
      // IIRC this is not comprehensive if checkpoint is happening as part of this job.
      case checkpointedDep: Dependency[_] if checkpointedDep.rdd.isCheckpointed => true
      case _ => dep.rdd.isIdempotent
    }
```

Note that this method can end up with stack overflow errors - please refer 
to `DAGScheduler.stageDependsOn` which does a similar dependency traversal (but 
for a different purpose).



---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-17 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r210788359
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -112,6 +112,11 @@ abstract class RDD[T: ClassTag](
   /**
    * :: DeveloperApi ::
    * Implemented by subclasses to compute a given partition.
+   *
+   * Spark requires the computing function to always output the same data set (the order can vary)
+   * given the same input data set. For example, a computing function that increases each input
+   * value by 1 is valid, a computing function that increases each input by a random value is
+   * invalid.
--- End diff --

"a computing function that increases each input by a random value is 
invalid." is too stringent a restriction imo and not in line with the preceding 
requirement of essentially generating same output given same input data 
iterator.

As I mentioned before, in typical MR based systems there are three types of 
closures in general (toy sketches of each follow the list):
* Truly random output - this is something MR systems do not support well 
except in some very specific cases: primarily due to repeatability of 
computation not being guaranteed.
  * I am not considering pseudo random repeatable computations here.
* order sensitive output - task output is dependent on both input order and 
computation.
  * In spark core we have:
* coalesce with shuffle (repartition uses this)
* \*sample\*
* zip*
* random*
* glom, take, etc.
* I am ignoring the \*approx\* variants - since they are 
approximation's by definition.
  * There should be a lot of usage within mllib (at least yahoo's internal 
BigML library did have a lot of it).
  * I vaguely remember a bunch of usages in graphx when I had last checked 
it a few years back.
  * I don't have much context into sql, but you should know about that 
better :-)
* record dependent - output for a single record depends only on current 
input record.
  * per record filter, map, etc.
  * A variant of this is where the entire partition is processed by closure 
- without regard to order : that is, the input iterator is consumed in 
mapPartitions, but closure is agnostic to tuple order (count() in spark core is 
a trivial example).
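
Toy illustrations of the three categories, assuming an existing SparkContext
`sc` (these are sketches, not Spark internals):

```
val rdd = sc.parallelize(1 to 100, 4)

// record dependent: each output record depends only on its input record
val recordDependent = rdd.map(_ + 1)

// order sensitive: output depends on the order the iterator delivers records
val orderSensitive = rdd.mapPartitionsWithIndex { (pid, iter) =>
  iter.zipWithIndex.map { case (v, i) => (pid, i, v) }   // indices shift if order changes
}

// truly random: not reproducible on re-execution (unsupported in general)
val trulyRandom = rdd.map(v => v + scala.util.Random.nextInt(10))
```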

Order sensitive output is supported by hadoop mapreduce systems due to the 
sort which is done as part of shuffle (all keys are comparable) - but in spark, 
this requires specifying keyOrdering to explicitly enable shuffle output sort 
(as part of global sort or per partition sort). 
This allows spark to avoid the cost when it is not applicable (if shuffle 
map task is only record dependent) : but in case of order sensitive inputs, 
this assumption breaks.
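
For reference, the opt-in per-partition sort looks like this (a sketch
assuming an existing SparkContext `sc` and a pair RDD with orderable keys;
with duplicate keys, value order is still not pinned down):

```
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(3 -> "c", 1 -> "a", 2 -> "b"), 2)
// Shuffle with keyOrdering set: each output partition is delivered in key
// order on every (re)computation, mimicking the always-sorted MR shuffle.
val stableOrder = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))
```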



---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-17 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r210756079
  
--- Diff: core/src/main/scala/org/apache/spark/Dependency.scala ---
@@ -94,6 +94,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     shuffleId, _rdd.partitions.length, this)
 
   _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
+
+  /**
+   * whether the partitioner is order sensitive to the input data and will partition shuffle output
+   * differently if the input data order changes. For example, hash and range partitioners are
+   * order insensitive, round-robin partitioner is order sensitive. This is a property of
+   * `ShuffleDependency` instead of `Partitioner`, because it's common that a map task partitions
+   * its output by itself and use a dummy partitioner later.
+   */
+  // This is defined as a `var` here instead of the constructor, to pass the mima check.
+  private[spark] var orderSensitivePartitioner: Boolean = false
--- End diff --

Output order is orthogonal to what the partitioner is - but enforced by 
whether `keyOrdering` is defined in shuffle dependency. We should not associate 
order sensitivity to partitioner (which has no influence on order).

> "For example, hash and range partitioners are order insensitive, 
round-robin partitioner is order sensitive".
There is no round robin partitioner in spark core.

Similarly 
> "... because it's common that a map task partitions its output by itself 
and use a dummy partitioner later." 

Tasks do not partition - that is the responsibility of partitioner.
There can be implementations where tasks and partitioner work in tandem - 
but that is an impl detail of user code (`distributePartition` is essentially 
user code which happens to be bundled as part of spark).

In general, something like this would suffice here (whether rdd is ordered 
or not):

`def isOrdered = keyOrdering.isDefined`


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-16 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22112
  
@tgravescs To understand better, are you suggesting that we do not support 
any api and/or user closure which depends on input order ?
If yes, that would break not just repartition + shuffle, but also other 
publicly exposed api in spark core and (my guess) non trivial aspects of 
mllib.

Or is it that we support repartition and possibly a few other high priority 
cases (sampling in mllib for example ?) and not support the rest ?

My (unproven) contention is that solution for repartition + shuffle would 
be a general solution (or very close to it) : which will then work for all 
other cases with suitable modifications as required.
By "expand solution to cover all later.", I was referring to these changes 
to leverage whatever we build for repartition in other usecases- for example 
set appropriate parameters, etc in interest of time.



---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-16 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22112
  
@tgravescs I was specifically in agreement with
> Personally I don't want to talk about implementation until we decide what 
we want our semantics to be around the unordered operations because that 
affects any implementation.

and

> I would propose we fix the things that are using the round robin type 
partitioning (repartition) but then unordered things like zip/MapPartitions 
(via user code) we document or perhaps give the user the option to sort.

IMO a fix in spark core for repartition should work for most (if not all) 
order dependent closures - we might choose not to implement for others due to 
time constraints; but basic idea should be fairly similar.
Given this, I am fine with documenting the potential issue for others and 
fixing for a core subset - with the assumption that we will expand the solution 
to cover all later.



---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-16 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22112
  
I agree @tgravescs, I was looking at the implementation to understand what 
the expectations are wrt newly introduced methods/fields and whether they make 
sense : I did not see any details furnished.
I don’t think we can hack our way out of this.

I would expect a solution for repartition to also be applicable to other 
order dependent closures as well - though we might choose to fix them later, 
the basic approach ideally should be transferable.


---




[GitHub] spark issue #22112: [WIP][SPARK-23243][Core] Fix RDD.repartition() data corr...

2018-08-16 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22112
  
You are perfectly correct @jiangxb1987, that was a silly mistake on my part 
- and not trivial at all !
It should be the shuffle dependency we rely on when traversing the 
dependency tree, not ShuffledRDD.


---




[GitHub] spark issue #22112: [WIP][SPARK-23243][Core] Fix RDD.repartition() data corr...

2018-08-16 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22112
  
I am not sure what the definition of `isIdempotent` here is.

For example, from MapPartitionsRDD :
```
  override private[spark] def isIdempotent = {
    if (inputOrderSensitive) {
      prev.isIdempotent
    } else {
      true
    }
  }
```

Consider:
`val rdd1 = rdd.groupBy().map(...).repartition(...).filter(...)`.
By definition above, this would make rdd1 idempotent.
Depending on what the definition of idempotent is (partition level, record 
level, etc) - this can be correct or wrong code.


Similarly, I am not sure why idempotency or ordering depends on 
`Partitioner`.
IMO we should traverse the dependency graph and rely on how `ShuffledRDD` 
is configured - whether there is a key ordering specified (applies to both 
global sort and per partition sort), whether it is from a checkpoint or marked 
for checkpoint, whether it is from a stable input source, etc.
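
A hypothetical sketch of that dependency-graph walk (the name and the
simplified rules are mine, not from the PR), done iteratively to avoid the
stack overflow risk on long lineages; per the caveat above, a sorted shuffle
only pins down order fully when keys are unique:

```
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

// Returns true if recomputing `rdd` replays the same input: every path to a
// shuffle either passes a checkpoint or the shuffle has a key ordering set.
def hasStableLineage(rdd: RDD[_]): Boolean = {
  val stack = scala.collection.mutable.Stack[RDD[_]](rdd)
  val seen = scala.collection.mutable.Set[Int]()
  var stable = true
  while (stack.nonEmpty && stable) {
    val current = stack.pop()
    if (seen.add(current.id) && current.getCheckpointFile.isEmpty) {
      current.dependencies.foreach {
        case dep: ShuffleDependency[_, _, _] =>
          // unsorted shuffle output: fetch order may vary between runs
          if (dep.keyOrdering.isEmpty) stable = false
        case dep => stack.push(dep.rdd)
      }
    }
  }
  stable
}
```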






---




[GitHub] spark issue #22101: [SPARK-25114][Core] Fix RecordBinaryComparator when subt...

2018-08-14 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/22101
  
LGTM pending Xiao Li's excellent suggestion :-)


---




[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-14 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/21698
  
>  I guess on the RDD side its not called RoundRobinPartitioner

Thanks for clarifying @tgravescs ! I was looking at `RangePartitioner` and 
variants and was wondering what I was missing - did not make the obvious 
connection with sql :-)

> If we can't come up with another solution, I would actually be ok with 
failing short term, its better then corruption

If I understand correctly, the proposal is 
* In `ShuffledRDD`, add a flag `orderSensitiveReducer` (?) - to track 
specific patterns identified which are order sensitive (repartition with shuffle 
= true, zip, etc).
* If a task is getting re-executed as part of stage re-execution, if the 
flag is true, fail job.
  * Task re-execution as part of same stage, speculative execution, etc 
should not be an issue - since only one task completes.
  * ResultStage should not be affected.
  * I am unsure about how cache'ing data interacts here - might need some 
investigation.

This looks like a reasonable stop gap until we fix the issue.

It also allows for users to make progress by inserting a checkpoint before 
the order sensitive closure to unblock them.





---




[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-14 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/21698
  
@cloud-fan I think we have to be clear on the boundaries of the solution we 
can provide in spark.

> RDD#mapPartitions and its friends can take arbitrary user functions, 
which may produce random result

As stated above, this is something we do not support in spark.
Spark, MR, etc assume that computation is idempotent - we do not support 
non determinism in computation : but computation could be sensitive to input 
order. For a given input partition (iterator of tuples), the closure must 
always generate the same output partition (iterator of tuples).

Which is why 'randomness' (or rather pseudo-randomness) is seeded using 
invariants like partition id, which results in the same output partition on 
task re-execution.
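
For example (a sketch, assuming some existing `rdd`), sampling seeded by the
partition id reproduces the same output when a task is re-executed over the
same input:

```
import scala.util.Random

val sampled = rdd.mapPartitionsWithIndex { (partitionId, iter) =>
  val rng = new Random(42L + partitionId)   // same partition => same seed => same output
  iter.filter(_ => rng.nextDouble() < 0.1)
}
```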

The problem we have here is : even if user code satisfies this constraint, 
due to non determinism in input order, the output changes when closure is order 
sensitive.

Given this, analyzing the statement below :

> Step 3 is problematic: assuming we have 5 map tasks and 5 reduce tasks, 
and the input data is random. Let's say reduce task 1,2,3,4 are finished, 
reduce task 5 failed with FetchFailed, and Spark rerun map task 3 and 4. Map 
task 3 and 4 reprocess the random data and create diffrent shuffle blocks for 
reduce task 3, 4, 5. So not only reduce task 5 needs to rerun, reduce task 3, 
4, 5 all need to rerun, because their input data changed.

Here - map task 3 and 4 will always produce the same output partition for 
supported closures - if the input partition remains same.
When reading off checkpoint's, immutable hdfs files, etc - this invariant 
is satisfied.
With @squito's suggestion implemented, this invariant will be satisfied for 
shuffle input as well.

With deterministic input partition - we can see that output of map task 3 
and 4 will always be the same - and reduce task input's for 3/4/5 will be the 
same. So only reduce task 5 will need to be rerun and none of the other input's 
will change.



---




[GitHub] spark pull request #22101: [SPARK-23207][Core][FOLLOWUP] Fix RecordBinaryCom...

2018-08-14 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22101#discussion_r209862610
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java ---
@@ -42,16 +42,16 @@ public int compare(
       while ((leftOff + i) % 8 != 0 && i < leftLen) {
         res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
             (Platform.getByte(rightObj, rightOff + i) & 0xff);
-        if (res != 0) return res;
+        if (res != 0) return (int) res;
         i += 1;
       }
     }
     // for architectures that support unaligned accesses, chew it up 8 bytes at a time
     if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
       while (i <= leftLen - 8) {
-        res = (int) ((Platform.getLong(leftObj, leftOff + i) -
-            Platform.getLong(rightObj, rightOff + i)) % Integer.MAX_VALUE);
-        if (res != 0) return res;
+        res = Platform.getLong(leftObj, leftOff + i) -
+            Platform.getLong(rightObj, rightOff + i);
+        if (res != 0) return res > 0 ? 1 : -1;
--- End diff --

The subtraction is buggy due to potential overflow.
Why not simply use:
```
  final long v1 = Platform.getLong(leftObj, leftOff + i);
  final long v2 = Platform.getLong(rightObj, rightOff + i);
  if (v1 != v2) {
    return v1 > v2 ? 1 : -1;
  }
```


---




[GitHub] spark pull request #22101: [SPARK-23207][Core][FOLLOWUP] Fix RecordBinaryCom...

2018-08-14 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22101#discussion_r209860397
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java ---
@@ -42,16 +42,16 @@ public int compare(
       while ((leftOff + i) % 8 != 0 && i < leftLen) {
         res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
             (Platform.getByte(rightObj, rightOff + i) & 0xff);
-        if (res != 0) return res;
+        if (res != 0) return (int) res;
--- End diff --

We can restrict the scope of 'res' to an 'int' and avoid the type 
promotions/conversions.


---




[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-13 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/21698
  
@squito @tgravescs I am probably missing something about why hash 
partitioner helps, can you please clarify ?
IIRC the partitioner for CoalescedRDD when shuffle is enabled is 
HashPartitioner ... the issue is the `distributePartition` before the shuffle 
which is order sensitive but is not deterministic since its input is not 
deterministic if it is derived from one or more shuffle outputs.

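Roughly what the 2.3-era `distributePartition` does (a simplified sketch, not 
the exact Spark source): keys are assigned from a position seeded by the 
partition index, so the key a record gets depends on the position at which it 
arrives - the order sensitivity described above.

```
import scala.util.Random

def distributePartition[T](numPartitions: Int)(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
  var position = new Random(index).nextInt(numPartitions)
  items.map { t =>
    position += 1
    (position % numPartitions, t)   // target partition depends on arrival order
  }
}
```
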
Btw, when shuffle = false, it does not suffer from the problem - mentally I 
had assumed that had an issue too - on a recheck now, I find it interesting 
that it does not (I never used that, so had never checked in detail !)


---




[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-13 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/21698
  
@tgravescs I vaguely remember someone at y! labs telling me (more than a 
decade back) about MR always doing a sort as part of its shuffle to avoid a 
variant of this problem by design.
Essentially it boils down to Imran's suggestion even for arbitrary byte 
writables [1], [2] ... 

[1] 
https://hadoop.apache.org/docs/r0.23.11/api/src-html/org/apache/hadoop/io/BytesWritable.html
[2] 
https://hadoop.apache.org/docs/r0.23.11/api/src-html/org/apache/hadoop/io/WritableComparator.html#line.154


---




[GitHub] spark pull request #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartit...

2018-08-13 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22079#discussion_r209705612
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution;
+
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+
+public final class RecordBinaryComparator extends RecordComparator {
+
+  // TODO(jiangxb) Add test suite for this.
+  @Override
+  public int compare(
+      Object leftObj, long leftOff, int leftLen, Object rightObj, long rightOff, int rightLen) {
+    int i = 0;
+    int res = 0;
+
+    // If the arrays have different length, the longer one is larger.
+    if (leftLen != rightLen) {
+      return leftLen - rightLen;
+    }
+
+    // The following logic uses `leftLen` as the length for both `leftObj` and `rightObj`, since
+    // we have guaranteed `leftLen` == `rightLen`.
+
+    // check if stars align and we can get both offsets to be aligned
+    if ((leftOff % 8) == (rightOff % 8)) {
+      while ((leftOff + i) % 8 != 0 && i < leftLen) {
+        res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
+            (Platform.getByte(rightObj, rightOff + i) & 0xff);
+        if (res != 0) return res;
+        i += 1;
+      }
+    }
+    // for architectures that support unaligned accesses, chew it up 8 bytes at a time
+    if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
+      while (i <= leftLen - 8) {
+        res = (int) ((Platform.getLong(leftObj, leftOff + i) -
+            Platform.getLong(rightObj, rightOff + i)) % Integer.MAX_VALUE);
+        if (res != 0) return res;
--- End diff --

It is possible for two objects to be unequal and yet be considered equal by 
this code, if the long values differ by a multiple of Integer.MAX_VALUE.
Any particular reason why this code is written like this with the 
inaccuracy and not simply the idiomatic comparator comparison ?
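
Concretely (a small demonstration of both failure modes):

```
// A nonzero difference that is a multiple of Integer.MAX_VALUE compares as "equal".
val left = Int.MaxValue.toLong                       // 2147483647
val right = 0L
assert(((left - right) % Int.MaxValue).toInt == 0)   // unequal longs, comparator says 0

// Plain subtraction can also overflow: a huge positive difference looks negative.
assert((Long.MaxValue - (-1L)) < 0)                  // wraps to Long.MinValue
```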


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21895: [SPARK-24948][SHS] Delegate check access permissions to ...

2018-08-06 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/21895
  
I merged to master, thanks for the work @mgaido91 !



---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-03 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r207641341
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -779,6 +808,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         listing.delete(classOf[LogInfo], log.logPath)
       }
     }
+    // Clean the blacklist from the expired entries.
+    clearBlacklist(CLEAN_INTERVAL_S)
--- End diff --

I misread it as MAX_LOG_AGE_S ... CLEAN_INTERVAL_S should be fine here, you 
are right.


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-03 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r207493280
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -161,6 +162,29 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     new HistoryServerDiskManager(conf, path, listing, clock)
   }
 
+  private val blacklist = new ConcurrentHashMap[String, Long]
+
+  // Visible for testing
+  private[history] def isBlacklisted(path: Path): Boolean = {
+    blacklist.containsKey(path.getName)
+  }
+
+  private def blacklist(path: Path): Unit = {
+    blacklist.put(path.getName, clock.getTimeMillis())
+  }
+
+  /**
+   * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
+   */
+  private def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+    val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    val expired = new mutable.ArrayBuffer[String]
+    blacklist.asScala.foreach {
+      case (path, creationTime) if creationTime < expiredThreshold => expired += path
+    }
+    expired.foreach(blacklist.remove(_))
--- End diff --

Instead of this, why not simply:
```
blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
```


---




[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-08-03 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r207493081
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -779,6 +808,8 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 listing.delete(classOf[LogInfo], log.logPath)
   }
 }
+// Clean the blacklist from the expired entries.
+clearBlacklist(CLEAN_INTERVAL_S)
--- End diff --

My only concern is that, if there happens to be a transient ACL issue when 
initially accessing the file, we will never see it in the application list even 
after the ACL is fixed - not without a SHS restart.
Wondering if the clean interval here could be a fraction of CLEAN_INTERVAL_S, 
so that these files have a chance of making it to the app list without much of 
an overhead on the NN.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21895: [SPARK-24948][SHS] Delegate check access permissions to ...

2018-07-28 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/21895
  
+CC @jerryshao 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...

2018-07-28 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21895#discussion_r205948923
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -973,6 +973,38 @@ private[history] object FsHistoryProvider {
   private[history] val CURRENT_LISTING_VERSION = 1L
 }
 
+private[history] trait CachedFileSystemHelper extends Logging {
+  protected def fs: FileSystem
+
+  /**
+   * Cache containing the result for the already checked files.
+   */
+  // Visible for testing.
+  private[history] val cache = new mutable.HashMap[String, Boolean]
--- End diff --

For a long-running history server in busy clusters (particularly where 
`spark.history.fs.cleaner.maxAge` is configured to be low), this unbounded Map 
will eventually cause an OOM.
Either an LRU cache or a disk-backed map with periodic cleanup (based on 
maxAge) might be better?
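
For instance, a bounded Guava cache (Guava already ships with Spark) would cap 
the footprint - a minimal sketch, with illustrative size/TTL values:
```
import java.util.concurrent.TimeUnit
import com.google.common.cache.{Cache, CacheBuilder}

// Bounded replacement for the unbounded HashMap: entries are evicted
// LRU-style once the maximum size is hit, and expire after the given age.
private[history] val cache: Cache[String, java.lang.Boolean] =
  CacheBuilder.newBuilder()
    .maximumSize(100000)                  // illustrative cap on tracked files
    .expireAfterWrite(1, TimeUnit.HOURS)  // align with the cleaner's maxAge
    .build[String, java.lang.Boolean]()
```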


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21653: [SPARK-13343] speculative tasks that didn't commi...

2018-07-20 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21653#discussion_r204199580
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -723,6 +723,21 @@ private[spark] class TaskSetManager(
  def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
 val info = taskInfos(tid)
 val index = info.index
+// Check if any other attempt succeeded before this and this attempt has not been handled
+if (successful(index) && killedByOtherAttempt.contains(tid)) {
+  calculatedTasks -= 1
+
+  val resultSizeAcc = result.accumUpdates.find(a =>
+a.name == Some(InternalAccumulator.RESULT_SIZE))
+  if (resultSizeAcc.isDefined) {
+totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value
--- End diff --

I agree, I don't see a better option.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...

2018-07-18 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/21589
  
@MaxGekk We are going in circles.
I don't think this is a good API to expose currently - the data is available 
through multiple other means, as I detailed, and while not a succinct 
one-liner, it is usable.
Not to mention @markhamstra's comment.
Unless there is some other compelling reason for introducing this which I have 
missed, I am -1 on introducing this change.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...

2018-07-18 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/21589
  
@MaxGekk The example you cite is literally one of a handful of usages which is 
not easily overridden - and is prefixed with a 'HACK ALERT'! A few others are 
in mllib, typically for reading schema.

I will reiterate the solutions available to users currently:
* Rely on `defaultParallelism` - this gives the expected result, unless 
explicitly overridden by the user.
* If you need fine-grained information about executors, use a Spark listener - 
it is trivial to keep a count with `onExecutorAdded`/`onExecutorRemoved`, as in 
the sketch after this list.
* If you simply want a current value without your own listener - use the REST 
API to query for the current executors.
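
A minimal listener sketch (the class and field names here are illustrative, 
not existing Spark code):
```
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

// Tracks the current number of live executors as they come and go.
class ExecutorCountListener extends SparkListener {
  val executorCount = new AtomicInteger(0)

  override def onExecutorAdded(added: SparkListenerExecutorAdded): Unit =
    executorCount.incrementAndGet()

  override def onExecutorRemoved(removed: SparkListenerExecutorRemoved): Unit =
    executorCount.decrementAndGet()
}

// Usage:
//   val listener = new ExecutorCountListener
//   sc.addSparkListener(listener)
//   ... later: listener.executorCount.get()
```
For the REST route, `/api/v1/applications/{appId}/executors` returns the 
currently registered executors.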

Having said this, I will caution against this approach if you are concerned 
about performance. `defaultParallelism` exists to give a default when the user 
does not explicitly override it when creating an `RDD`, and it reflects the 
current number of executors.
Particularly when dynamic resource allocation is enabled, this value is not 
optimal: Spark will acquire or release resources based on pending tasks.

Using available cluster resources (from the cluster manager - not Spark) as a 
way to model parallelism would be a better approach: externalize your configs 
and populate them based on the resources available to the application (in your 
example: the difference between test/staging/production).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r203489913
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -160,11 +160,29 @@ case class 
SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, 
accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
 execId: String,
-accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+executorUpdates: Option[Array[Long]] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the 
history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by 
MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+execId: String,
+stageId: Int,
+stageAttemptId: Int,
+executorMetrics: Array[Long])
--- End diff --

I am completely sold on the idea of an enum-like approach.
My main concern was around avoiding `MatchError`s in Scala and the other 
potential failures you elaborated on above.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...

2018-07-18 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/21589
  
+CC @markhamstra since you were looking at API stability.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...

2018-07-18 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/21589
  

I am not convinced by the rationale given for adding the new APIs in the JIRA.
The examples given there can be easily modeled using `defaultParallelism` (to 
get the current state) and executor events (to get numCores and memory per 
executor).
For example: `df.repartition(5 * sc.defaultParallelism)`

The other argument seems to be that users can override this value and set it 
to a static constant.
Users are not expected to override it unless they want fine-grained control 
over the value, and Spark is expected to honor it when specified.

One thing to keep in mind is that dynamic resource allocation will kick in 
after tasks are submitted (when there are insufficient resources available) - 
so trying to fine-tune this for an application using these APIs, in the 
presence of DRA, is not going to be effective anyway.

If there are corner cases where `defaultParallelism` is not accurate, we 
should fix those to reflect the current value.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r203322643
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -163,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
* to a new position (in the new data array).
*/
   def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
-if (_size > _growThreshold) {
+if (_occupied > _growThreshold) {
--- End diff --

For accuracy's sake - my example snippet above will fail much earlier, due to 
OpenHashSet.MAX_CAPACITY. Though that is probably not the point anyway :-)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r203322056
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -163,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
* to a new position (in the new data array).
*/
   def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
-if (_size > _growThreshold) {
+if (_occupied > _growThreshold) {
--- End diff --

There is no explicit entry here - these are simply unoccupied slots in an 
array.
The slot is free; it can be used by some other (new) entry when insert is 
called.

It is easy to see how very bad behavior can happen with the actual size of the 
set remaining very small - a series of adds/removes results in unending growth 
of the set.

Something like this, for example, is enough to cause the set to blow up to 2B 
entries:
```
var i = 0
while (i < Int.MaxValue) {
  set.add(1)     // occupies a slot
  set.remove(1)  // frees the entry, but the slot still counts towards _occupied
  assert (0 == set.size)  // net size never exceeds 1...
  i += 1         // ...yet _occupied keeps growing, triggering rehash after rehash
}
```



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21758: [SPARK-24795][CORE] Implement barrier execution mode

2018-07-18 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/21758
  
I had left a few comments on SPARK-24375 @jiangxb1987 ... unfortunately the 
JIRAs have moved around a bit.
If this is the active PR for introducing the feature, it would be great to get 
clarity on them.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-07-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r203319952
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -160,11 +160,29 @@ case class 
SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, 
accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
 execId: String,
-accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+executorUpdates: Option[Array[Long]] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the 
history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by 
MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+execId: String,
+stageId: Int,
+stageAttemptId: Int,
+executorMetrics: Array[Long])
--- End diff --

+1 on enums @squito!
The only concern would be evolving the enums in a later release - changing an 
enum could result in source incompatibility.
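
To illustrate the concern - a hypothetical sketch (these type names are 
illustrative, not the PR's actual types):
```
// Enum-like metric identifiers via a sealed trait.
sealed trait ExecutorMetricType
case object JvmHeapMemory extends ExecutorMetricType
case object OffHeapMemory extends ExecutorMetricType

// Client code written against today's values compiles cleanly:
def describe(m: ExecutorMetricType): String = m match {
  case JvmHeapMemory => "JVM heap"
  case OffHeapMemory => "off-heap"
}

// If a later release adds another case object, this match is no longer
// exhaustive: existing sources get a warning on recompilation and can
// throw a MatchError at runtime - the incompatibility being discussed.
```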


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21729: [SPARK-24755][Core] Executor loss can cause task to not ...

2018-07-18 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/21729
  
Looks good to me, thanks for fixing this @hthuynh2 !


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...

2018-07-18 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/21589
  
I am not seeing the utility of these two methods.
`defaultParallelism` already captures the current number of cores.

For monitoring use cases, the existing events fired via the listener can be 
used to keep track of the current executor population (if that is the intended 
use case).

Given that this is duplicating information already exposed, I am not very keen 
on adding an additional API.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r203311755
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -85,9 +85,13 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
   protected var _capacity = nextPowerOf2(initialCapacity)
   protected var _mask = _capacity - 1
   protected var _size = 0
+  protected var _occupied = 0
   protected var _growThreshold = (loadFactor * _capacity).toInt
+  def g: Int = _growThreshold
+  def o: Int = _occupied
 
   protected var _bitset = new BitSet(_capacity)
+  protected var _bitsetDeleted: BitSet = null
--- End diff --

Why protected ? Make it private instead.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r203314045
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -163,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
* to a new position (in the new data array).
*/
   def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
-if (_size > _growThreshold) {
+if (_occupied > _growThreshold) {
--- End diff --

I don't see any value in _occupied - on the contrary, it can cause very bad 
behavior if a lot of removes are expected.
`_size` is a better metric for deciding when to rehash and grow.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r203311109
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -85,9 +85,13 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
   protected var _capacity = nextPowerOf2(initialCapacity)
   protected var _mask = _capacity - 1
   protected var _size = 0
+  protected var _occupied = 0
   protected var _growThreshold = (loadFactor * _capacity).toInt
+  def g: Int = _growThreshold
+  def o: Int = _occupied
--- End diff --

Also, please use more descriptive and comprehensible names


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


