[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-12-03 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r90755993
  
--- Diff: 
core/src/test/scala/org/apache/spark/DataPropertyAccumulatorSuite.scala ---
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.ref.WeakReference
+
+import org.scalatest.Matchers
+
+import org.apache.spark.scheduler._
+import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, 
AccumulatorV2, LongAccumulator}
+
+
+class DataPropertyAccumulatorSuite extends SparkFunSuite with Matchers 
with LocalSparkContext {
--- End diff --

That sounds like a good plan. I'll try to give the tests more descriptive names (and, where that isn't enough, add comments explaining the functionality they are testing).
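For instance (an illustrative rename only -- the description string below is invented; the body is the first test quoted elsewhere in this thread), the names could state what is being asserted:

```scala
// Hypothetical rename of the "two partition old and new" test; only the name changes.
test("data property accumulator sums every element exactly once across two partitions") {
  sc = new SparkContext("local[2]", "test")
  val acc = sc.longAccumulator(dataProperty = true, "l2")

  val a = sc.parallelize(1 to 20, 2)
  val b = a.map { x => acc.add(x); x }
  b.cache()
  b.count()
  acc.value should be (210)
}
```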





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-04 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86493902
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
--- End diff --

Or maybe just DataAccumulator, since V1 -> V2 was a big change and this really isn't all that big of a change.





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-04 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86493318
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
--- End diff --

drat :( Do we want to do AccumulatorV3 and deprecate AccumulatorV2's 
constructor?





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-04 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86492811
  
--- Diff: 
core/src/test/scala/org/apache/spark/DataPropertyAccumulatorSuite.scala ---
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.ref.WeakReference
+
+import org.scalatest.Matchers
+
+import org.apache.spark.scheduler._
+import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, 
AccumulatorV2, LongAccumulator}
+
+
+class DataPropertyAccumulatorSuite extends SparkFunSuite with Matchers 
with LocalSparkContext {
+  test("two partition old and new") {
+sc = new SparkContext("local[2]", "test")
+val acc = sc.longAccumulator(dataProperty = true, "l2")
+
+val a = sc.parallelize(1 to 20, 2)
+val b = a.map{x => acc.add(x); x}
+b.cache()
+b.count()
+acc.value should be (210)
+  }
+
+  test("single partition") {
+sc = new SparkContext("local[2]", "test")
+val acc = sc.longAccumulator(dataProperty = true)
+
+val a = sc.parallelize(1 to 20, 1)
+val b = a.map{x => acc.add(x); x}
+b.cache()
+b.count()
+acc.value should be (210)
+  }
+
+  test("adding only the first element per partition should work even if 
partition is empty") {
+sc = new SparkContext("local[2]", "test")
+val acc = sc.longAccumulator(dataProperty = true)
+val a = sc.parallelize(1 to 20, 30)
+val b = a.mapPartitions{itr =>
+  acc.add(1)
+  itr
+}
+b.count()
+acc.value should be (30)
+  }
+
+  test("shuffled (combineByKey)") {
+sc = new SparkContext("local[2]", "test")
+val a = sc.parallelize(1L to 40L, 5)
+val buckets = 4
+val b = a.map{x => ((x % buckets), x)}
+val inputs = List(b, b.repartition(10), b.partitionBy(new 
HashPartitioner(5))).map(_.cache())
+val mapSideCombines = List(true, false)
+inputs.foreach { input =>
+  mapSideCombines.foreach { mapSideCombine =>
+val accs = (1 to 4).map(x => sc.longAccumulator(dataProperty = 
true)).toList
+val raccs = (1 to 4).map(x => sc.longAccumulator(dataProperty = 
false)).toList
+val List(acc, acc1, acc2, acc3) = accs
+val List(racc, racc1, racc2, racc3) = raccs
+val c = input.combineByKey(
+  (x: Long) => {acc1.add(1); acc.add(1); racc1.add(1); 
racc.add(1); x},
+  {(a: Long, b: Long) => acc2.add(1); acc.add(1); racc2.add(1); 
racc.add(1); (a + b)},
+  {(a: Long, b: Long) => acc3.add(1); acc.add(1); racc3.add(1); 
racc.add(1); (a + b)},
+  new HashPartitioner(10),
+  mapSideCombine)
+val d = input.combineByKey(
+  (x: Long) => {acc1.add(1); acc.add(1); x},
+  {(a: Long, b: Long) => acc2.add(1); acc.add(1); (a + b)},
+  {(a: Long, b: Long) => acc3.add(1); acc.add(1); (a + b)},
+  new HashPartitioner(2),
+  mapSideCombine)
+val e = d.map{x => acc.add(1); x}
--- End diff --

Well, there are two ways it could fail: the `e.count()` updates could look like duplicates from `d`, or, if the `insertAll` wasn't happening, the updates from the read side of `d` would look like they were coming from `e` and we would get double counting.





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86491369
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyAdd(v)
+} else {
+  addImpl(v)
+}
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+// To allow the user to be able to access the current accumulated 
value from their process
+// worker side then we need to perform a "normal" add as well as the 
data property add.
+addImpl(v)
+// Add to the pending updates for data property
+val updateInfo = TaskContext.get().getRDDPartitionInfo()
+val base = pending.getOrElse(updateInfo, copyAndReset())
+// Since we may have constructed a new accumulator, set atDriverSide 
to false as the default
+// new accumulators will have atDriverSide equal to true.
+base.atDriverSide = false
+base.addImpl(v)
+pending(updateInfo) = base
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This 
is a noop for
+   * non-data property accumuables.
+   */
+  private[spark] def markFullyProcessed(taskOutputId: TaskOutputId): Unit 
= {
+if (metadata.dataProperty) {
+  completed += taskOutputId
+}
+  }
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
* Takes the inputs and accumulates.
*/
-  def add(v: IN): Unit
+  protected[spark] def addImpl(v: IN)
+
+  /**
+   * Merges another same-type accumulator into this one and update its 
state, i.e. this should be
+   * merge-in-place. Developers should extend mergeImpl to customize the 
merge functionality.
+   */
+  final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+assert(isAtDriverSide)
+// Handle data property accumulators
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyMerge _
+} else {
+  mergeImpl _
+}
+  }
+
+  final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, 
OUT]) = {
+// Apply all foreach partitions regardless - they can only be fully 
evaluated
+val unprocessed = other.pending.filter{
+  case (ForeachOutputId(), v) => mergeImpl(v); false
--- End diff --

Yes they would.





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86490667
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -42,18 +60,45 @@ private[spark] case class AccumulatorMetadata(
  * `OUT` should be a type that can be read atomically (e.g., Int, Long), 
or thread-safely
  * (e.g., synchronized collections) because it will be read from other 
threads.
  */
-abstract class AccumulatorV2[IN, OUT] extends Serializable {
+abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] 
extends Serializable {
   private[spark] var metadata: AccumulatorMetadata = _
-  private[this] var atDriverSide = true
+  private[spark] var atDriverSide = true
+
+  /**
+   * The following values are used for data property [[AccumulatorV2]]s.
+   * Data property [[AccumulatorV2]]s have only-once semantics. These 
semantics are implemented
+   * by keeping track of which RDD id, shuffle id, and partition id the 
current function is
+   * processing in. If a partition is fully processed the results for that 
partition/shuffle/rdd
+   * combination are sent back to the driver. The driver keeps track of 
which rdd/shuffle/partitions
+   * already have been applied, and only combines values into value_ if 
the rdd/shuffle/partition
+   * has not already been aggregated on the driver program
+   */
+  // For data property accumulators pending and processed updates.
+  // Pending and processed are keyed by (rdd id, shuffle id, partition id)
+  private[spark] lazy val pending =
+new mutable.HashMap[TaskOutputId, AccumulatorV2[IN, OUT]]()
+  // Completed contains the set of (rdd id, shuffle id, partition id) that 
have been
+  // fully processed on the worker side. This is used to determine if the 
updates should
+  // be merged on the driver for a particular rdd/shuffle/partition 
combination.
+  private[spark] lazy val completed = new mutable.HashSet[TaskOutputId]()
+  // rddProcessed is keyed by rdd id and the value is a bitset containing 
all partitions
+  // for the given key which have been merged into the value. This is used 
on the driver.
+  @transient private[spark] lazy val rddProcessed = new 
mutable.HashMap[Int, mutable.BitSet]()
+  // shuffleProcessed is the same as rddProcessed except keyed by shuffle 
id.
+  @transient private[spark] lazy val shuffleProcessed = new 
mutable.HashMap[Int, mutable.BitSet]()
--- End diff --

`completedTaskOutputsForOneTask` & `pendingAccumulatorUpdatesFromOneTask` 
seem like really long variable names - how about `completedOutputsForTask` and 
`pendingAccumulatorUpdatesForTask`?
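Applied to the declarations quoted in the diff, that would look roughly like this (a sketch using the shorter names proposed above):

```scala
// Pending data-property updates from one task, keyed by the task output
// (rdd id / shuffle id plus partition) that produced them.
private[spark] lazy val pendingAccumulatorUpdatesForTask =
  new mutable.HashMap[TaskOutputId, AccumulatorV2[IN, OUT]]()

// Task outputs this task computed in full. Entries in the pending map whose key is
// not in here (e.g. partitions only partially read by take()) are ignored by the driver.
private[spark] lazy val completedOutputsForTask = new mutable.HashSet[TaskOutputId]()
```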





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86490436
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -42,18 +60,45 @@ private[spark] case class AccumulatorMetadata(
  * `OUT` should be a type that can be read atomically (e.g., Int, Long), 
or thread-safely
  * (e.g., synchronized collections) because it will be read from other 
threads.
  */
-abstract class AccumulatorV2[IN, OUT] extends Serializable {
+abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] 
extends Serializable {
   private[spark] var metadata: AccumulatorMetadata = _
-  private[this] var atDriverSide = true
+  private[spark] var atDriverSide = true
+
+  /**
+   * The following values are used for data property [[AccumulatorV2]]s.
+   * Data property [[AccumulatorV2]]s have only-once semantics. These 
semantics are implemented
+   * by keeping track of which RDD id, shuffle id, and partition id the 
current function is
+   * processing in. If a partition is fully processed the results for that 
partition/shuffle/rdd
+   * combination are sent back to the driver. The driver keeps track of 
which rdd/shuffle/partitions
+   * already have been applied, and only combines values into value_ if 
the rdd/shuffle/partition
+   * has not already been aggregated on the driver program
+   */
+  // For data property accumulators pending and processed updates.
+  // Pending and processed are keyed by (rdd id, shuffle id, partition id)
+  private[spark] lazy val pending =
+new mutable.HashMap[TaskOutputId, AccumulatorV2[IN, OUT]]()
+  // Completed contains the set of (rdd id, shuffle id, partition id) that 
have been
+  // fully processed on the worker side. This is used to determine if the 
updates should
+  // be merged on the driver for a particular rdd/shuffle/partition 
combination.
+  private[spark] lazy val completed = new mutable.HashSet[TaskOutputId]()
+  // rddProcessed is keyed by rdd id and the value is a bitset containing 
all partitions
+  // for the given key which have been merged into the value. This is used 
on the driver.
+  @transient private[spark] lazy val rddProcessed = new 
mutable.HashMap[Int, mutable.BitSet]()
+  // shuffleProcessed is the same as rddProcessed except keyed by shuffle 
id.
+  @transient private[spark] lazy val shuffleProcessed = new 
mutable.HashMap[Int, mutable.BitSet]()
--- End diff --

That understanding seems correct; indeed, the merge function now asserts that it is at the driver side (added since the last time this comment was posted).

As an aside, I don't know what GitHub is doing/changing with the review features, but it seems to be really weird on this PR for some reason.





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86490194
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
--- End diff --

Yah, there isn't a way to do this for everyone without breaking binary compatibility. But given that the AccumulatorV2 API was only added in Spark 2.0, my argument is that I don't expect anyone to be extending it yet (given the current usability of accumulators), and maybe it would be OK if we mention the change in the release notes?

But if you think the best path forward is subclassing, that makes sense too.
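If the release-notes route were taken, the MiMa check would still need an explicit exclusion. A rough sketch of what that entry might look like in `project/MimaExcludes.scala` (assuming MiMa's `FinalMethodProblem` filter is the one that fires when an existing method is made final):

```scala
import com.typesafe.tools.mima.core._

// Hypothetical exclusion: AccumulatorV2.add becomes final; implementors must
// override addImpl instead (called out in the release notes).
ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.util.AccumulatorV2.add")
```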





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86489315
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyAdd(v)
+} else {
+  addImpl(v)
+}
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+// To allow the user to be able to access the current accumulated 
value from their process
+// worker side then we need to perform a "normal" add as well as the 
data property add.
+addImpl(v)
+// Add to the pending updates for data property
+val updateInfo = TaskContext.get().getRDDPartitionInfo()
+val base = pending.getOrElse(updateInfo, copyAndReset())
+// Since we may have constructed a new accumulator, set atDriverSide 
to false as the default
+// new accumulators will have atDriverSide equal to true.
+base.atDriverSide = false
+base.addImpl(v)
+pending(updateInfo) = base
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This 
is a noop for
+   * non-data property accumuables.
+   */
+  private[spark] def markFullyProcessed(taskOutputId: TaskOutputId): Unit 
= {
+if (metadata.dataProperty) {
+  completed += taskOutputId
+}
+  }
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
* Takes the inputs and accumulates.
*/
-  def add(v: IN): Unit
+  protected[spark] def addImpl(v: IN)
+
+  /**
+   * Merges another same-type accumulator into this one and update its 
state, i.e. this should be
+   * merge-in-place. Developers should extend mergeImpl to customize the 
merge functionality.
+   */
+  final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+assert(isAtDriverSide)
+// Handle data property accumulators
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyMerge _
+} else {
+  mergeImpl _
+}
+  }
+
+  final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, 
OUT]) = {
+// Apply all foreach partitions regardless - they can only be fully 
evaluated
+val unprocessed = other.pending.filter{
+  case (ForeachOutputId(), v) => mergeImpl(v); false
+  case _ => true
+}
+val term = unprocessed.filter{case (k, v) => 
other.completed.contains(k)}
+term.flatMap {
+  case (RDDOutputId(rddId, splitId), v) =>
+Some((rddProcessed, rddId, splitId, v))
+  case (ShuffleMapOutputId(shuffleWriteId, splitId), v) =>
+Some((shuffleProcessed, shuffleWriteId, splitId, v))
+  case _ => // We won't ever hit this case but avoid compiler warnings
+None
+}.foreach {
+  case (processed, id, splitId, v) =>
+  val splits = processed.getOrElseUpdate(id, new mutable.BitSet())
+  if (!splits.contains(splitId)) {
+splits += splitId
+mergeImpl(v)
+  }
+}
--- End diff --

this method feels more complicated than it needs to be -- what do you think of this version? There is a small amount of duplication, but I think it's easier to follow:

```scala
  final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, OUT]) = {
    def mergeAccumUpdateAndMarkOutputAsProcessed(
      partitionsAlreadyMerged: mutable.BitSet,
      outputId: TaskOutputId,
      accumUpdate: AccumulatorV2[IN, OUT]
    ): Unit = {
      // we don't merge in accumulator updates from an incomplete accumulator update,
      // eg. a take() which only partially reads an rdd partition
      if (other.completed.contains(outputId)) {
        // has this partition been processed before?
        if (!partitionsAlreadyMerged.contains(outputId.partition)) {
          partitionsAlreadyMerged += outputId.partition
          mergeImpl(accumUpdate)
        }
      }
    }
    other.pending.foreach {
      // Apply all foreach partitions regardless - they can only be fully evaluated
      case (ForeachOutputId, accumUpdate) =>
        mergeImpl(accumUpdate)
      // For RDDs & shuffles, apply the accumulator updates as long as the output is complete
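      // (The quoted sketch is cut off above. The remaining cases here are a hedged
      // guess, reusing the rddProcessed / shuffleProcessed bitsets from the existing
      // dataPropertyMerge and assuming the helper can reach the partition via the
      // TaskOutputId -- they are not the author's actual text.)
      case (id @ RDDOutputId(rddId, _), accumUpdate) =>
        mergeAccumUpdateAndMarkOutputAsProcessed(
          rddProcessed.getOrElseUpdate(rddId, new mutable.BitSet()), id, accumUpdate)
      case (id @ ShuffleMapOutputId(shuffleId, _), accumUpdate) =>
        mergeAccumUpdateAndMarkOutputAsProcessed(
          shuffleProcessed.getOrElseUpdate(shuffleId, new mutable.BitSet()), id, accumUpdate)
    }
  }
```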
 

[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86487454
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
@@ -69,6 +69,20 @@ object TaskContext {
   }
 }
 
+/**
+ * Identifies where an update for a data property accumulator update came 
from.  This is important
+ * to ensure that updates are not double-counted when rdds get recomputed. 
 When the executors send
+ * back data property accumulator updates, they separate out the updates 
per rdd or shuffle output
+ * generated by the task. That gives the driver sufficient info to ensure 
that each update is
+ * counted once. The shuffleId is important since two separate shuffle 
actions could happen on the
+ * same RDD and the RDD ID and partition ID with different accumulators.
+ * Any accumulators used inside of `runJob` directly are always counted 
because there is no
+ * resubmission of `runJob`/`foreach`.
+*/
+private[spark] sealed trait TaskOutputId
+private[spark] case class RDDOutputId(rddId: Int, partition: Int) extends 
TaskOutputId
+private[spark] case class ShuffleMapOutputId(shuffleId: Int, partition: 
Int) extends TaskOutputId
+private[spark] case class ForeachOutputId() extends TaskOutputId
--- End diff --

can be a `case object`
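i.e. something along these lines (a standalone sketch; the `private[spark]` modifiers from the diff are dropped so it compiles on its own):

```scala
sealed trait TaskOutputId
case class RDDOutputId(rddId: Int, partition: Int) extends TaskOutputId
case class ShuffleMapOutputId(shuffleId: Int, partition: Int) extends TaskOutputId
// No fields, so a case object avoids allocating a fresh instance at every use site.
case object ForeachOutputId extends TaskOutputId
```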





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86486890
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyAdd(v)
+} else {
+  addImpl(v)
+}
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+// To allow the user to be able to access the current accumulated 
value from their process
+// worker side then we need to perform a "normal" add as well as the 
data property add.
+addImpl(v)
+// Add to the pending updates for data property
+val updateInfo = TaskContext.get().getRDDPartitionInfo()
+val base = pending.getOrElse(updateInfo, copyAndReset())
+// Since we may have constructed a new accumulator, set atDriverSide 
to false as the default
+// new accumulators will have atDriverSide equal to true.
+base.atDriverSide = false
+base.addImpl(v)
+pending(updateInfo) = base
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This 
is a noop for
+   * non-data property accumuables.
+   */
+  private[spark] def markFullyProcessed(taskOutputId: TaskOutputId): Unit 
= {
+if (metadata.dataProperty) {
+  completed += taskOutputId
+}
+  }
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
* Takes the inputs and accumulates.
*/
-  def add(v: IN): Unit
+  protected[spark] def addImpl(v: IN)
+
+  /**
+   * Merges another same-type accumulator into this one and update its 
state, i.e. this should be
+   * merge-in-place. Developers should extend mergeImpl to customize the 
merge functionality.
+   */
+  final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+assert(isAtDriverSide)
+// Handle data property accumulators
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyMerge _
+} else {
+  mergeImpl _
+}
+  }
+
+  final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, 
OUT]) = {
+// Apply all foreach partitions regardless - they can only be fully 
evaluated
+val unprocessed = other.pending.filter{
+  case (ForeachOutputId(), v) => mergeImpl(v); false
--- End diff --

the idea is that if you use an accumulator in a foreach, you get the same 
behavior whether you use a data-property accumulator or not, right?  that makes 
sense.  before this last change, would data-property accumulators just get 
totally ignored in a foreach?





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86486267
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
--- End diff --

oh crap.  you are going to hate me ... making this `final` is going to 
break binary compatibility.

is there any other alternative? I will keep thinking but I can't come up 
with anything.





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86486502
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---
@@ -126,4 +126,14 @@ private[spark] class TaskContextImpl(
 taskMetrics.registerAccumulator(a)
   }
 
+  private var rddPartitionInfo: TaskOutputId = null
--- End diff --

rename to `taskOutputInfo`, since with the new naming this is sometimes an `RDDOutputId` and sometimes a `ShuffleMapOutputId`





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86487664
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyAdd(v)
+} else {
+  addImpl(v)
+}
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+// To allow the user to be able to access the current accumulated 
value from their process
+// worker side then we need to perform a "normal" add as well as the 
data property add.
+addImpl(v)
+// Add to the pending updates for data property
+val updateInfo = TaskContext.get().getRDDPartitionInfo()
+val base = pending.getOrElse(updateInfo, copyAndReset())
+// Since we may have constructed a new accumulator, set atDriverSide 
to false as the default
+// new accumulators will have atDriverSide equal to true.
+base.atDriverSide = false
+base.addImpl(v)
+pending(updateInfo) = base
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This 
is a noop for
+   * non-data property accumuables.
+   */
+  private[spark] def markFullyProcessed(taskOutputId: TaskOutputId): Unit 
= {
+if (metadata.dataProperty) {
+  completed += taskOutputId
+}
+  }
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
* Takes the inputs and accumulates.
*/
-  def add(v: IN): Unit
+  protected[spark] def addImpl(v: IN)
+
+  /**
+   * Merges another same-type accumulator into this one and update its 
state, i.e. this should be
+   * merge-in-place. Developers should extend mergeImpl to customize the 
merge functionality.
+   */
+  final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+assert(isAtDriverSide)
+// Handle data property accumulators
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyMerge _
+} else {
+  mergeImpl _
+}
+  }
+
+  final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, 
OUT]) = {
+// Apply all foreach partitions regardless - they can only be fully 
evaluated
+val unprocessed = other.pending.filter{
+  case (ForeachOutputId(), v) => mergeImpl(v); false
+  case _ => true
+}
+val term = unprocessed.filter{case (k, v) => 
other.completed.contains(k)}
--- End diff --

rename `term` to `accumUpdatesFromUnmergedTaskOutputs`, and include a comment like "Only take updates for task outputs we haven't seen before. So if this task computed two rdds, but one of them had been computed previously, only take the accumulator updates from the other one."

Obvious to you by now, but it's the most important part, so worth spelling it out :)





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86486618
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyAdd(v)
+} else {
+  addImpl(v)
+}
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+// To allow the user to be able to access the current accumulated 
value from their process
+// worker side then we need to perform a "normal" add as well as the 
data property add.
+addImpl(v)
+// Add to the pending updates for data property
+val updateInfo = TaskContext.get().getRDDPartitionInfo()
+val base = pending.getOrElse(updateInfo, copyAndReset())
+// Since we may have constructed a new accumulator, set atDriverSide 
to false as the default
+// new accumulators will have atDriverSide equal to true.
+base.atDriverSide = false
+base.addImpl(v)
+pending(updateInfo) = base
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This 
is a noop for
+   * non-data property accumuables.
--- End diff --

Maybe add something like "see [[TaskOutputId]] for an explanation of why 
this is important for data property accumulators"?  I worry the purpose of this 
may seem a little mysterious on its own ... figure we should at least point to 
the more complete description.
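For example, something like this on the method (a doc-only sketch combining the diff with the pointer suggested above):

```scala
/**
 * Mark a specific rdd/shuffle/partition as completely processed. This is a noop for
 * non-data-property accumulables. See [[TaskOutputId]] for an explanation of why
 * tracking fully processed outputs matters for data property accumulators.
 */
private[spark] def markFullyProcessed(taskOutputId: TaskOutputId): Unit = {
  if (metadata.dataProperty) {
    completed += taskOutputId
  }
}
```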





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86486722
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
--- End diff --

ok, maybe another subclass of `AccumulatorV2`? Sucks that we wouldn't get data property goodness in all accumulators automatically, though.
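One shape the subclass could take (purely a sketch of the idea, not code from the PR): leave `AccumulatorV2.add` non-final so existing accumulators are untouched, and layer the data-property bookkeeping in a subclass that users opt into.

```scala
import org.apache.spark.util.AccumulatorV2

// Hypothetical subclass: only accumulators extending this get the only-once
// (data property) semantics; AccumulatorV2 itself stays binary compatible.
abstract class DataPropertyAccumulatorV2[IN, OUT] extends AccumulatorV2[IN, OUT] {
  // Subclasses implement the raw accumulation step...
  protected def addImpl(v: IN): Unit

  // ...and add() is final only here, so the pending/completed tracking (as in
  // dataPropertyAdd above) can be layered on top of every add.
  final override def add(v: IN): Unit = {
    addImpl(v)
    // record the update against the current (rdd/shuffle, partition) output here
  }
}
```

The downside noted above still applies: built-in accumulators like LongAccumulator would only get the data property behavior if they were also moved onto the subclass.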





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86488263
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyAdd(v)
+} else {
+  addImpl(v)
+}
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+// To allow the user to be able to access the current accumulated 
value from their process
+// worker side then we need to perform a "normal" add as well as the 
data property add.
+addImpl(v)
+// Add to the pending updates for data property
+val updateInfo = TaskContext.get().getRDDPartitionInfo()
+val base = pending.getOrElse(updateInfo, copyAndReset())
+// Since we may have constructed a new accumulator, set atDriverSide 
to false as the default
+// new accumulators will have atDriverSide equal to true.
+base.atDriverSide = false
+base.addImpl(v)
+pending(updateInfo) = base
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This 
is a noop for
+   * non-data property accumuables.
+   */
+  private[spark] def markFullyProcessed(taskOutputId: TaskOutputId): Unit 
= {
+if (metadata.dataProperty) {
+  completed += taskOutputId
+}
+  }
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
* Takes the inputs and accumulates.
*/
-  def add(v: IN): Unit
+  protected[spark] def addImpl(v: IN)
+
+  /**
+   * Merges another same-type accumulator into this one and update its 
state, i.e. this should be
+   * merge-in-place. Developers should extend mergeImpl to customize the 
merge functionality.
+   */
+  final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+assert(isAtDriverSide)
+// Handle data property accumulators
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyMerge _
+} else {
+  mergeImpl _
+}
+  }
+
+  final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, 
OUT]) = {
--- End diff --

It's worth noting there is an asymmetry here that you might not expect from a `merge` method -- `this` has to be the accumulator that was created originally on the driver (so you access the right `rddProcessed` and `shuffleProcessed`), and `other` has to be the one from the task update (so you access the right `other.pending` and `other.completed`).





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86486359
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -42,18 +60,45 @@ private[spark] case class AccumulatorMetadata(
  * `OUT` should be a type that can be read atomically (e.g., Int, Long), 
or thread-safely
  * (e.g., synchronized collections) because it will be read from other 
threads.
  */
-abstract class AccumulatorV2[IN, OUT] extends Serializable {
+abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] 
extends Serializable {
   private[spark] var metadata: AccumulatorMetadata = _
-  private[this] var atDriverSide = true
+  private[spark] var atDriverSide = true
+
+  /**
+   * The following values are used for data property [[AccumulatorV2]]s.
+   * Data property [[AccumulatorV2]]s have only-once semantics. These 
semantics are implemented
+   * by keeping track of which RDD id, shuffle id, and partition id the 
current function is
+   * processing in. If a partition is fully processed the results for that 
partition/shuffle/rdd
+   * combination are sent back to the driver. The driver keeps track of 
which rdd/shuffle/partitions
+   * already have been applied, and only combines values into value_ if 
the rdd/shuffle/partition
+   * has not already been aggregated on the driver program
+   */
+  // For data property accumulators pending and processed updates.
+  // Pending and processed are keyed by (rdd id, shuffle id, partition id)
+  private[spark] lazy val pending =
+new mutable.HashMap[TaskOutputId, AccumulatorV2[IN, OUT]]()
+  // Completed contains the set of (rdd id, shuffle id, partition id) that 
have been
+  // fully processed on the worker side. This is used to determine if the 
updates should
+  // be merged on the driver for a particular rdd/shuffle/partition 
combination.
+  private[spark] lazy val completed = new mutable.HashSet[TaskOutputId]()
+  // rddProcessed is keyed by rdd id and the value is a bitset containing 
all partitions
+  // for the given key which have been merged into the value. This is used 
on the driver.
+  @transient private[spark] lazy val rddProcessed = new 
mutable.HashMap[Int, mutable.BitSet]()
+  // shuffleProcessed is the same as rddProcessed except keyed by shuffle 
id.
+  @transient private[spark] lazy val shuffleProcessed = new 
mutable.HashMap[Int, mutable.BitSet]()
--- End diff --

I had a comment here earlier to check my understanding of this -- github is 
being really weird with these comments so I'm going to repost:


I find the logic around pending, completed and processed really confusing. 
I'm still thinking this through -- before making concrete suggestions I want to 
check my understanding.

completed should always only reflect the update from one task. It should 
only be set on the executor. At first I was going to say it shouldn't be a set, 
it should just be one value -- but of course one task can get updates from 
multiple rdds. It will differ from the keyset of pending b/c pending can 
include updates from partitions that aren't completed (eg. those take and 
coalesce cases).

`processed` is only used on the driver. Since it has global knowledge, it can use this to determine whether each update is a repeat or not. (Now this is `rddProcessed` and `shuffleProcessed`.)

I think the comment "If a partition is fully processed the results for that 
partition/shuffle/rdd combination are sent back to the driver" is incorrect -- 
it looks to me like the updates are always sent back to the driver, and the 
driver always makes the call on whether or not to merge.

seems like merge (and thus dataPropertyMerge) is only called on the driver. 
I'm trying to figure out whether or not it matters. It seems like it would do 
something weird if it were called on the executor. maybe we should add an 
assert(isAtDriver).



Assuming that is correct, I'd suggest some renamings:

`completed` to `completedTaskOutputsForOneTask`
and include a comment that its keyset differs from `pending` b/c some of 
the updates in pending may be from incomplete partition processing.

`pending` to `pendingAccumulatorUpdatesFromOneTask`
and have the comment mention that this tracks updates all accumulator 
updates from one task, but some of them may be ignored if the output they 
represent isn't fully computed, eg. an rdd.take() leads to incomplete partition 
processing.





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86486669
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
@@ -69,6 +69,20 @@ object TaskContext {
   }
 }
 
+/**
+ * Identifies where an update for a data property accumulator update came 
from.  This is important
+ * to ensure that updates are not double-counted when rdds get recomputed. 
 When the executors send
+ * back data property accumulator updates, they separate out the updates 
per rdd or shuffle output
+ * generated by the task. That gives the driver sufficient info to ensure 
that each update is
+ * counted once. The shuffleId is important since two separate shuffle 
actions could happen on the
+ * same RDD and the RDD ID and partition ID with different accumulators.
+ * Any accumulators used inside of `runJob` directly are always counted 
because there is no
+ * resubmission of `runJob`/`foreach`.
--- End diff --

Can you add something here about why this is important for tracking incomplete partitions, e.g. when partitions are repeatedly only partially computed by `take()`?





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86486810
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyAdd(v)
+} else {
+  addImpl(v)
+}
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+// To allow the user to be able to access the current accumulated 
value from their process
+// worker side then we need to perform a "normal" add as well as the 
data property add.
+addImpl(v)
+// Add to the pending updates for data property
+val updateInfo = TaskContext.get().getRDDPartitionInfo()
+val base = pending.getOrElse(updateInfo, copyAndReset())
--- End diff --

rename `base` to `mergedAccumulatorForOneTaskOutput` (or something like that? I just find `base` way too generic.)





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86487362
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyAdd(v)
+} else {
+  addImpl(v)
+}
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+// To allow the user to be able to access the current accumulated 
value from their process
+// worker side then we need to perform a "normal" add as well as the 
data property add.
+addImpl(v)
+// Add to the pending updates for data property
+val updateInfo = TaskContext.get().getRDDPartitionInfo()
+val base = pending.getOrElse(updateInfo, copyAndReset())
+// Since we may have constructed a new accumulator, set atDriverSide 
to false as the default
+// new accumulators will have atDriverSide equal to true.
+base.atDriverSide = false
+base.addImpl(v)
+pending(updateInfo) = base
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This 
is a noop for
+   * non-data property accumuables.
+   */
+  private[spark] def markFullyProcessed(taskOutputId: TaskOutputId): Unit 
= {
+if (metadata.dataProperty) {
+  completed += taskOutputId
+}
+  }
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
* Takes the inputs and accumulates.
*/
-  def add(v: IN): Unit
+  protected[spark] def addImpl(v: IN)
+
+  /**
+   * Merges another same-type accumulator into this one and update its 
state, i.e. this should be
+   * merge-in-place. Developers should extend mergeImpl to customize the 
merge functionality.
+   */
+  final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+assert(isAtDriverSide)
+// Handle data property accumulators
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyMerge _
+} else {
+  mergeImpl _
+}
+  }
+
+  final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, 
OUT]) = {
+// Apply all foreach partitions regardless - they can only be fully 
evaluated
+val unprocessed = other.pending.filter{
--- End diff --

style nit: spaces around `{` (a couple other places in this method too)





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86487826
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyAdd(v)
+} else {
+  addImpl(v)
+}
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+// To allow the user to be able to access the current accumulated 
value from their process
+// worker side then we need to perform a "normal" add as well as the 
data property add.
+addImpl(v)
+// Add to the pending updates for data property
+val updateInfo = TaskContext.get().getRDDPartitionInfo()
+val base = pending.getOrElse(updateInfo, copyAndReset())
+// Since we may have constructed a new accumulator, set atDriverSide 
to false as the default
+// new accumulators will have atDriverSide equal to true.
+base.atDriverSide = false
+base.addImpl(v)
+pending(updateInfo) = base
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This 
is a noop for
+   * non-data property accumuables.
+   */
+  private[spark] def markFullyProcessed(taskOutputId: TaskOutputId): Unit 
= {
+if (metadata.dataProperty) {
+  completed += taskOutputId
+}
+  }
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
* Takes the inputs and accumulates.
*/
-  def add(v: IN): Unit
+  protected[spark] def addImpl(v: IN)
+
+  /**
+   * Merges another same-type accumulator into this one and update its 
state, i.e. this should be
+   * merge-in-place. Developers should extend mergeImpl to customize the 
merge functionality.
+   */
+  final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+assert(isAtDriverSide)
+// Handle data property accumulators
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyMerge _
+} else {
+  mergeImpl _
+}
+  }
+
+  final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, 
OUT]) = {
+// Apply all foreach partitions regardless - they can only be fully 
evaluated
+val unprocessed = other.pending.filter{
+  case (ForeachOutputId(), v) => mergeImpl(v); false
+  case _ => true
+}
+val term = unprocessed.filter{case (k, v) => 
other.completed.contains(k)}
+term.flatMap {
+  case (RDDOutputId(rddId, splitId), v) =>
+Some((rddProcessed, rddId, splitId, v))
+  case (ShuffleMapOutputId(shuffleWriteId, splitId), v) =>
+Some((shuffleProcessed, shuffleWriteId, splitId, v))
+  case _ => // We won't ever hit this case but avoid compiler warnings
+None
+}.foreach {
+  case (processed, id, splitId, v) =>
--- End diff --

indentation is off





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86484995
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala ---
@@ -104,10 +105,26 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: 
ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[(K, C)] = {
+// Use -1 for our Shuffle ID since we are on the read side of the 
shuffle.
+val shuffleWriteId = -1
+// If our task has data property accumulators we need to keep track of 
which partitions
+// we are processing.
+if (context.taskMetrics.hasDataPropertyAccumulators()) {
+  context.setRDDPartitionInfo(id, shuffleWriteId, split.index)
+}
 val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
-SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, 
split.index + 1, context)
+val itr = SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, 
split.index, split.index + 1,
+  context)
--- End diff --

Man, GitHub formatting for these comments is super weird. It doesn't show up in the comments view; I only saw it in an email notification.

Shuffled-read operations (aka reducers) get a bunch of blocks from the mappers and then merge those blocks together. But the mappers have already sorted the data, so really this only needs to be a merge of a bunch of sorted streams. It happens to do this now by just sorting the whole thing. But there isn't any reason why the implementation couldn't change -- it could do the merge on the fly instead, pulling just the next key from each stream. It would run the combiner for all the records with the same key, and then push that one record out to the iterator.

If it changed to do that, this would break. I think you are right that it's covered by a test case (another comment on that below), so that's fine. But I think it's worth expanding your comment to say that this is based on the *assumption* that the shuffle reader always processes the entire input, running all combiners, before returning an iterator.
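To make that assumption concrete, here is a minimal sketch (plain Scala, not Spark's actual reader code) of the eager behaviour being relied on: the combiner drains its whole input before handing an iterator back, so every accumulator update made inside the combine function has already happened by the time downstream code pulls the first record.

```scala
import scala.collection.mutable

// Eager combine: consumes the entire input up front, mirroring today's shuffle-read behaviour.
def combineAll[K, V](input: Iterator[(K, V)], combine: (V, V) => V): Iterator[(K, V)] = {
  val combined = mutable.Map.empty[K, V]
  input.foreach { case (k, v) =>
    combined(k) = combined.get(k).map(combine(_, v)).getOrElse(v)
  }
  // Only now is an iterator returned: every combine call (and any accumulator
  // update made inside it) has already run.
  combined.iterator
}

// e.g. combineAll(Iterator(("a", 1), ("a", 2), ("b", 3)), (x: Int, y: Int) => x + y)
```

A streaming merge that pulled one key at a time would interleave combine calls with downstream consumption, which is exactly the change that would invalidate the wrapping done here.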





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86484498
  
--- Diff: 
core/src/test/scala/org/apache/spark/DataPropertyAccumulatorSuite.scala ---
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.ref.WeakReference
+
+import org.scalatest.Matchers
+
+import org.apache.spark.scheduler._
+import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, 
AccumulatorV2, LongAccumulator}
+
+
+class DataPropertyAccumulatorSuite extends SparkFunSuite with Matchers 
with LocalSparkContext {
+  test("two partition old and new") {
+sc = new SparkContext("local[2]", "test")
+val acc = sc.longAccumulator(dataProperty = true, "l2")
+
+val a = sc.parallelize(1 to 20, 2)
+val b = a.map{x => acc.add(x); x}
+b.cache()
+b.count()
+acc.value should be (210)
+  }
+
+  test("single partition") {
+sc = new SparkContext("local[2]", "test")
+val acc = sc.longAccumulator(dataProperty = true)
+
+val a = sc.parallelize(1 to 20, 1)
+val b = a.map{x => acc.add(x); x}
+b.cache()
+b.count()
+acc.value should be (210)
+  }
+
+  test("adding only the first element per partition should work even if 
partition is empty") {
+sc = new SparkContext("local[2]", "test")
+val acc = sc.longAccumulator(dataProperty = true)
+val a = sc.parallelize(1 to 20, 30)
+val b = a.mapPartitions{itr =>
+  acc.add(1)
+  itr
+}
+b.count()
+acc.value should be (30)
+  }
+
+  test("shuffled (combineByKey)") {
+sc = new SparkContext("local[2]", "test")
+val a = sc.parallelize(1L to 40L, 5)
+val buckets = 4
+val b = a.map{x => ((x % buckets), x)}
+val inputs = List(b, b.repartition(10), b.partitionBy(new 
HashPartitioner(5))).map(_.cache())
+val mapSideCombines = List(true, false)
+inputs.foreach { input =>
+  mapSideCombines.foreach { mapSideCombine =>
+val accs = (1 to 4).map(x => sc.longAccumulator(dataProperty = 
true)).toList
+val raccs = (1 to 4).map(x => sc.longAccumulator(dataProperty = 
false)).toList
+val List(acc, acc1, acc2, acc3) = accs
+val List(racc, racc1, racc2, racc3) = raccs
+val c = input.combineByKey(
+  (x: Long) => {acc1.add(1); acc.add(1); racc1.add(1); 
racc.add(1); x},
+  {(a: Long, b: Long) => acc2.add(1); acc.add(1); racc2.add(1); 
racc.add(1); (a + b)},
+  {(a: Long, b: Long) => acc3.add(1); acc.add(1); racc3.add(1); 
racc.add(1); (a + b)},
+  new HashPartitioner(10),
+  mapSideCombine)
+val d = input.combineByKey(
+  (x: Long) => {acc1.add(1); acc.add(1); x},
+  {(a: Long, b: Long) => acc2.add(1); acc.add(1); (a + b)},
+  {(a: Long, b: Long) => acc3.add(1); acc.add(1); (a + b)},
+  new HashPartitioner(2),
+  mapSideCombine)
+val e = d.map{x => acc.add(1); x}
--- End diff --

Related to the discussion about the iterator wrapping in ShuffledRDD -- this looks like the only test case that covers chaining a ShuffledRDD with another rdd.map, both with data accumulators. Is this case sufficient?

I think it is sufficient, though it's not totally obvious to me, at least. If it didn't work, then after the call to `d.count()`, when you later do `e.count()`, the updates from `e` would look like duplicate updates from `d` and get ignored. Does that sound right?
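For what it's worth, a hypothetical assertion along these lines (reusing the `d`, `e`, and `acc` names from the test snippet; purely illustrative) would make that failure mode visible:

```scala
d.count()
val afterD = acc.value
e.count()
val afterE = acc.value
// If e's updates were wrongly treated as duplicates of d's, afterE would equal afterD.
assert(afterE != afterD)
```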



[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86483290
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -306,18 +428,19 @@ class LongAccumulator extends AccumulatorV2[jl.Long, 
jl.Long] {
 
   /**
* Adds v to the accumulator, i.e. increment sum by v and count by 1.
-   * @since 2.0.0
+   * Added for simplicity with adding non java Longs.
+   * @since 2.1.0
*/
-  override def add(v: jl.Long): Unit = {
-_sum += v
-_count += 1
+  def add(v: Long): Unit = {
--- End diff --

Yeah, I don't love the copied code either. The thing is, `@specialized` has bigger ramifications -- the actual classes that get used (e.g. the ones you see in stack traces) are different, it's trickier to access the right class from Java, and there are a ton of generated classes when there is multiple inheritance (not relevant here at the moment, but perhaps in the future). I think having copied code is the lesser of two evils.
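As a rough illustration of the trade-off (simplified names, not the PR's actual classes), the "copied code" approach just keeps a hand-written primitive overload next to the boxed one, which avoids boxing on the hot path without pulling in `@specialized`:

```scala
abstract class Acc[IN] extends Serializable {
  protected def addImpl(v: IN): Unit
  final def add(v: IN): Unit = addImpl(v)  // boxed entry point from the generic API
}

class LongAcc extends Acc[java.lang.Long] {
  private var sum = 0L
  override protected def addImpl(v: java.lang.Long): Unit = { sum += v }
  // Hand-written primitive overload: duplicated logic, but no boxing and no
  // extra generated classes or surprising class names.
  def add(v: Long): Unit = { sum += v }
  def value: Long = sum
}
```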





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86461055
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -42,18 +60,43 @@ private[spark] case class AccumulatorMetadata(
  * `OUT` should be a type that can be read atomically (e.g., Int, Long), 
or thread-safely
  * (e.g., synchronized collections) because it will be read from other 
threads.
  */
-abstract class AccumulatorV2[IN, OUT] extends Serializable {
+abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] 
extends Serializable {
--- End diff --

So we could do that, but would the `@specialized` annotation maybe be easier to maintain?





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86460868
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala ---
@@ -104,10 +105,26 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: 
ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[(K, C)] = {
+// Use -1 for our Shuffle ID since we are on the read side of the 
shuffle.
+val shuffleWriteId = -1
+// If our task has data property accumulators we need to keep track of 
which partitions
+// we are processing.
+if (context.taskMetrics.hasDataPropertyAccumulators()) {
+  context.setRDDPartitionInfo(id, shuffleWriteId, split.index)
+}
 val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
-SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, 
split.index + 1, context)
+val itr = SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, 
split.index, split.index + 1,
+  context)
--- End diff --

So unless the input was already sorted, we will always have to read the entire input first. Also, since we have tests for this, if the implementation ever does change it's a simple matter of updating the wrapper to be more like the rest.





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86460332
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -306,18 +428,19 @@ class LongAccumulator extends AccumulatorV2[jl.Long, 
jl.Long] {
 
   /**
* Adds v to the accumulator, i.e. increment sum by v and count by 1.
-   * @since 2.0.0
+   * Added for simplicity with adding non java Longs.
+   * @since 2.1.0
*/
-  override def add(v: jl.Long): Unit = {
-_sum += v
-_count += 1
+  def add(v: Long): Unit = {
--- End diff --

So we could do that, but it seems like maybe having the `@specialized` annotation is better than copying that code? I don't have super strong feelings about this; it's just something to consider.





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86381839
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -42,18 +60,43 @@ private[spark] case class AccumulatorMetadata(
  * `OUT` should be a type that can be read atomically (e.g., Int, Long), 
or thread-safely
  * (e.g., synchronized collections) because it will be read from other 
threads.
  */
-abstract class AccumulatorV2[IN, OUT] extends Serializable {
+abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] 
extends Serializable {
--- End diff --

Hmm, it doesn't look that way to me. If I look at the compiled code for AccumulatorV2 in the Spark 2 binaries, there are no specialized methods. It looks like boxing does happen.

I do see test failures in `AccumulatorV2Suite` without this change, but it looks like those can be fixed by just tweaking the specialized versions of these functions that are manually defined in `DoubleAccumulator` and `LongAccumulator`, e.g.:

```scala
  /**
   * Adds v to the accumulator, i.e. increment sum by v and count by 1.
   * @since 2.1.0
   */
  def add(v: Double): Unit = {
    if (metadata != null && metadata.dataProperty) {
      dataPropertyAdd(v: jl.Double)
    } else {
      addImpl(v)
    }
  }
```





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86384551
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -306,18 +428,19 @@ class LongAccumulator extends AccumulatorV2[jl.Long, 
jl.Long] {
 
   /**
* Adds v to the accumulator, i.e. increment sum by v and count by 1.
-   * @since 2.0.0
+   * Added for simplicity with adding non java Longs.
+   * @since 2.1.0
*/
-  override def add(v: jl.Long): Unit = {
-_sum += v
-_count += 1
+  def add(v: Long): Unit = {
--- End diff --

see my comment earlier about `@specialized`





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86384061
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -22,17 +22,36 @@ import java.io.ObjectInputStream
 import java.util.{ArrayList, Collections}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy
--- End diff --

unused





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86383782
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala ---
@@ -104,10 +105,26 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: 
ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[(K, C)] = {
+// Use -1 for our Shuffle ID since we are on the read side of the 
shuffle.
+val shuffleWriteId = -1
+// If our task has data property accumulators we need to keep track of 
which partitions
+// we are processing.
+if (context.taskMetrics.hasDataPropertyAccumulators()) {
+  context.setRDDPartitionInfo(id, shuffleWriteId, split.index)
+}
 val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
-SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, 
split.index + 1, context)
+val itr = SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, 
split.index, split.index + 1,
+  context)
--- End diff --

I am looking closely at the combiner code to try to confirm this. I think I believe it, but I don't think it's *guaranteed* to be true in the future. E.g., right now the combiners do an `insertAll` into the `ExternalAppendOnlyMap` before reading from it. But there is no reason Spark couldn't change so that it instead just inserts the *next* key from all incoming streams into the `ExternalAppendOnlyMap`, and then feeds that one key to the downstream iterators.

At the very least, we need a test to ensure this doesn't break if that internal implementation were to change. (Does a test like that already exist?)

Again, I'm still mulling over whether there is even a good enough use case to bother supporting this at all ...
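A sketch of the kind of guard test being asked for (assumed names and values, written in the suite's style): put a data property accumulator update inside the combine function and re-run the action, so a change to the shuffle-read internals that breaks the only-once bookkeeping shows up as a changed total.

```scala
test("combiner updates behind a shuffle read are only counted once (sketch)") {
  sc = new SparkContext("local[2]", "test")
  val acc = sc.longAccumulator(dataProperty = true)
  val reduced = sc.parallelize(1 to 100, 4)
    .map(x => (x % 10, 1L))
    .reduceByKey { (a, b) => acc.add(1); a + b }
  reduced.count()
  val firstRun = acc.value
  reduced.count()  // a second evaluation must not contribute new combiner updates
  acc.value should be (firstRun)
}
```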





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r84380686
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +179,76 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyAdd(v)
+} else {
+  addImpl(v)
+}
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+// Add first for localValue & AccumulatorInfo
--- End diff --

Sorry, I wasn't clear -- I meant: why is "first" important? But I guess first isn't actually important; you're saying that *both* are needed. Can you update the comment to reflect that?





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-11-03 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86384641
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -383,16 +506,7 @@ class DoubleAccumulator extends 
AccumulatorV2[jl.Double, jl.Double] {
* Adds v to the accumulator, i.e. increment sum by v and count by 1.
* @since 2.0.0
*/
-  override def add(v: jl.Double): Unit = {
-_sum += v
-_count += 1
-  }
-
-  /**
-   * Adds v to the accumulator, i.e. increment sum by v and count by 1.
-   * @since 2.0.0
-   */
-  def add(v: Double): Unit = {
--- End diff --

see comment on `@specialized`





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-31 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r85860913
  
--- Diff: 
core/src/test/scala/org/apache/spark/DataPropertyAccumulatorSuite.scala ---
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.ref.WeakReference
+
+import org.scalatest.Matchers
+
+import org.apache.spark.scheduler._
+
+
+class DataPropertyAccumulatorSuite extends SparkFunSuite with Matchers 
with LocalSparkContext {
+  test("single partition") {
+sc = new SparkContext("local[2]", "test")
+val acc : Accumulator[Int] = sc.accumulator(0, dataProperty = true)
+
+val a = sc.parallelize(1 to 20, 1)
+val b = a.map{x => acc += x; x}
+b.cache()
+b.count()
+acc.value should be (210)
+  }
+
+  test("adding only the first element per partition should work even if 
partition is empty") {
+sc = new SparkContext("local[2]", "test")
+val acc: Accumulator[Int] = sc.accumulator(0, dataProperty = true)
+val a = sc.parallelize(1 to 20, 30)
+val b = a.mapPartitions{itr =>
+  acc += 1
+  itr
+}
+b.count()
+acc.value should be (30)
+  }
+
+  test("shuffled (combineByKey)") {
+sc = new SparkContext("local[2]", "test")
+val a = sc.parallelize(1 to 40, 5)
+val buckets = 4
+val b = a.map{x => ((x % buckets), x)}
+val inputs = List(b, b.repartition(10), b.partitionBy(new 
HashPartitioner(5))).map(_.cache())
+val mapSideCombines = List(true, false)
+inputs.foreach { input =>
+  mapSideCombines.foreach { mapSideCombine =>
+val accs = (1 to 4).map(x => sc.accumulator(0, dataProperty = 
true)).toList
+val raccs = (1 to 4).map(x => sc.accumulator(0, dataProperty = 
false)).toList
+val List(acc, acc1, acc2, acc3) = accs
+val List(racc, racc1, racc2, racc3) = raccs
+val c = input.combineByKey(
+  (x: Int) => {acc1 += 1; acc += 1; racc1 += 1; racc += 1; x},
+  {(a: Int, b: Int) => acc2 += 1; acc += 1; racc2 += 1; racc += 1; 
(a + b)},
+  {(a: Int, b: Int) => acc3 += 1; acc += 1; racc3 += 1; racc += 1; 
(a + b)},
+  new HashPartitioner(10),
+  mapSideCombine)
+val d = input.combineByKey(
+  (x: Int) => {acc1 += 1; acc += 1; x},
+  {(a: Int, b: Int) => acc2 += 1; acc += 1; (a + b)},
+  {(a: Int, b: Int) => acc3 += 1; acc += 1; (a + b)},
+  new HashPartitioner(2),
+  mapSideCombine)
+val e = d.map{x => acc += 1; x}
+c.count()
+// If our partitioner is known then we should only create
+// one combiner for each key value. Otherwise we should
+// create at least that many combiners.
+if (input.partitioner.isDefined) {
+  acc1.value should be (buckets)
+} else {
+  acc1.value should be >= (buckets)
+}
+if (input.partitioner.isDefined) {
+  acc2.value should be > (0)
+} else if (mapSideCombine) {
+  acc3.value should be > (0)
+} else {
+  acc2.value should be > (0)
+  acc3.value should be (0)
+}
+acc.value should be (acc1.value + acc2.value + acc3.value)
+val oldValues = accs.map(_.value)
+// For one action the data property accumulators and regular 
should have the same value.
+accs.map(_.value) should be (raccs.map(_.value))
+c.count()
+accs.map(_.value) should be (oldValues)
--- End diff --

@squito That was a testing/exploratory implementation. It seems I didn't push it to a remote, and I can't find it now.



[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-31 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r85805218
  
--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
@@ -722,6 +722,7 @@ private[spark] object JsonProtocol {
 val value = Utils.jsonOption(json \ "Value").map { v => 
accumValueFromJson(name, v) }
 val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false)
 val countFailedValues = (json \ "Count Failed 
Values").extractOpt[Boolean].getOrElse(false)
+val dataProperty = (json \ 
"DataProperty").extractOpt[Boolean].getOrElse(false)
--- End diff --

Done :)





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-17 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83690354
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +179,76 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyAdd(v)
+} else {
+  addImpl(v)
+}
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+// Add first for localValue & AccumulatorInfo
--- End diff --

If we want the user to be able to access the current accumulated value from within their function on the worker side, then we need to perform a "normal" add as well as the data property add. Also, if we don't have a merge step (e.g. there is only one accumulator) this might make a difference (although I _think_ this won't be the case anymore after the refactor that happened, I haven't tested it).
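For example (illustrative only, assuming the behaviour described above where the accumulator's local value stays readable inside a task):

```scala
val acc = sc.longAccumulator(dataProperty = true)
sc.parallelize(1 to 10, 2).map { x =>
  acc.add(x)
  // Because add() also performs the "normal" addImpl, the task can observe
  // its running local total here.
  val runningLocalTotal = acc.value
  x
}.count()
```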





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-17 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83689480
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala ---
@@ -104,10 +105,26 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: 
ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[(K, C)] = {
+// Use -1 for our Shuffle ID since we are on the read side of the 
shuffle.
+val shuffleWriteId = -1
+// If our task has data property accumulators we need to keep track of 
which partitions
+// we are processing.
+if (context.taskMetrics.hasDataPropertyAccumulators()) {
+  context.setRDDPartitionInfo(id, shuffleWriteId, split.index)
+}
 val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
-SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, 
split.index + 1, context)
+val itr = SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, 
split.index, split.index + 1,
+  context)
--- End diff --

So the short version is no, because the only user-controlled code that runs underneath this is eagerly evaluated on the entire shuffle input -- namely, `combineValuesByKey` and `combineCombinersByKey` both consume the entire input before passing back control. I'll add a comment explaining this behaviour.





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-17 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83687095
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala 
---
@@ -34,8 +35,29 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: 
ClassTag](
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
-  override def compute(split: Partition, context: TaskContext): 
Iterator[U] =
-f(context, split.index, firstParent[T].iterator(split, context))
+  override def compute(split: Partition, context: TaskContext): 
Iterator[U] = {
+val input = firstParent[T].iterator(split, context)
+// If our task has data property accumulators we need to keep track of 
which partitions
+// are fully processed and what we are currently processing.
+if (context.taskMetrics.hasDataPropertyAccumulators()) {
+  // Use -1 for shuffle id since we are not in a shuffle.
+  val shuffleWriteId = -1
+  // Set the ID of the RDD and partition being processed. We need to 
do this per
+  // element since we chain the iterator transformations together
+  val data = input.map{x =>
+context.setRDDPartitionInfo(id, shuffleWriteId, split.index)
+x
+  }
+  val wrappedData = Utils.signalWhenEmpty(data,
+() => context.taskMetrics.markFullyProcessed(id, shuffleWriteId, 
split.index))
+  // We also set it before the first call to the user function in case 
the user provides a
+  // function which access a data property accumulator before 
accessing any elements.
+  context.setRDDPartitionInfo(id, shuffleWriteId, split.index)
+  f(context, split.index, wrappedData)
+} else {
+  f(context, split.index, input)
--- End diff --

Makes sense, I'll include this in the next update.





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-17 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83623396
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---
@@ -126,4 +126,14 @@ private[spark] class TaskContextImpl(
 taskMetrics.registerAccumulator(a)
   }
 
+  private var rddPartitionInfo: (Int, Int, Int) = null
--- End diff --

Yeah, that sounds pretty reasonable. I'll do this update next.





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-16 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83557436
  
--- Diff: core/src/main/scala/org/apache/spark/Accumulable.scala ---
@@ -56,15 +61,26 @@ class Accumulable[R, T] private (
 @transient private val initialValue: R,
 param: AccumulableParam[R, T],
 val name: Option[String],
-private[spark] val countFailedValues: Boolean)
+private[spark] val countFailedValues: Boolean,
+private[spark] val dataProperty: Boolean)
   extends Serializable {
 
   private[spark] def this(
   initialValue: R,
   param: AccumulableParam[R, T],
   name: Option[String],
+  countFailedValues: Boolean,
+  dataProperty: Boolean) = {
+this(AccumulatorContext.newId(), initialValue, param, name, 
countFailedValues, dataProperty)
+  }
+
+  private[spark] def this(
+  initialValue: R,
+  param: AccumulableParam[R, T],
+  name: Option[String],
   countFailedValues: Boolean) = {
-this(AccumulatorContext.newId(), initialValue, param, name, 
countFailedValues)
+this(AccumulatorContext.newId(), initialValue, param, name, 
countFailedValues,
+  false /* dataProperty */)
--- End diff --

So for reasons which aren't super clear, we can't specify it this way (you can see Josh ran into similar trouble in some other places) -- but since we aren't going to deal with the old API, I'll just rip this part out anyway.





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83287789
  
--- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---
@@ -722,6 +722,7 @@ private[spark] object JsonProtocol {
 val value = Utils.jsonOption(json \ "Value").map { v => 
accumValueFromJson(name, v) }
 val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false)
 val countFailedValues = (json \ "Count Failed 
Values").extractOpt[Boolean].getOrElse(false)
+val dataProperty = (json \ 
"DataProperty").extractOpt[Boolean].getOrElse(false)
--- End diff --

Can you add a test case to the backwards compatibility section in `JsonProtocolSuite`?





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83273608
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -42,18 +60,43 @@ private[spark] case class AccumulatorMetadata(
  * `OUT` should be a type that can be read atomically (e.g., Int, Long), 
or thread-safely
  * (e.g., synchronized collections) because it will be read from other 
threads.
  */
-abstract class AccumulatorV2[IN, OUT] extends Serializable {
+abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] 
extends Serializable {
--- End diff --

Unrelated to this change, right? If you think it's important, open another JIRA / PR.





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83290175
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala ---
@@ -104,10 +105,26 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: 
ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[(K, C)] = {
+// Use -1 for our Shuffle ID since we are on the read side of the 
shuffle.
+val shuffleWriteId = -1
+// If our task has data property accumulators we need to keep track of 
which partitions
+// we are processing.
+if (context.taskMetrics.hasDataPropertyAccumulators()) {
+  context.setRDDPartitionInfo(id, shuffleWriteId, split.index)
+}
 val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
-SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, 
split.index + 1, context)
+val itr = SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, 
split.index, split.index + 1,
+  context)
--- End diff --

Don't you need to wrap this iterator to reset the partition info per record, like you do in `MapPartitionsRDD`? What if there is `rdd.reduceByKey(...).map(...)` with accumulator updates in both?

The more I think about this, the more I wonder whether data property accumulators should only be supported in `MapPartitionsRDD`. (Sorry, I think long ago I argued for putting this in ... I am still not certain ...)
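A concrete version of the scenario being asked about (hypothetical, in the suite's style), where both the shuffle's combine function and a downstream map update data property accumulators:

```scala
val combinerAcc = sc.longAccumulator(dataProperty = true)
val mapAcc = sc.longAccumulator(dataProperty = true)
val chained = sc.parallelize(1 to 40, 4)
  .map(x => (x % 4, 1L))
  .reduceByKey { (a, b) => combinerAcc.add(1); a + b }
  .map { kv => mapAcc.add(1); kv }  // runs on top of the ShuffledRDD's iterator
chained.count()
// Each accumulator's updates need to be attributed to the right rdd/partition,
// which is why the per-record partition info matters here.
```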





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83280578
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +179,76 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyAdd(v)
+} else {
+  addImpl(v)
+}
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+// Add first for localValue & AccumulatorInfo
+addImpl(v)
+if (metadata != null && metadata.dataProperty) {
+  val updateInfo = TaskContext.get().getRDDPartitionInfo()
+  val base = pending.getOrElse(updateInfo, copyAndReset())
+  base.atDriverSide = false
+  base.addImpl(v)
+  pending(updateInfo) = base
+}
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This 
is a noop for
+   * non-data property accumuables.
+   */
+  private[spark] def markFullyProcessed(rddId: Int, shuffleWriteId: Int, 
partitionId: Int): Unit = {
+if (metadata.dataProperty) {
+  completed += ((rddId, shuffleWriteId, partitionId))
+}
+  }
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
* Takes the inputs and accumulates.
*/
-  def add(v: IN): Unit
+  protected[spark] def addImpl(v: IN)
 
   /**
* Merges another same-type accumulator into this one and update its 
state, i.e. this should be
-   * merge-in-place.
+   * merge-in-place. Developers should extend mergeImpl to customize the 
merge functionality.
*/
-  def merge(other: AccumulatorV2[IN, OUT]): Unit
+  final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+// Handle data property accumulators
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyMerge _
+} else {
+  mergeImpl _
+}
+  }
+
+  final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, 
OUT]) = {
+val term = other.pending.filter{case (k, v) => 
other.completed.contains(k)}
--- End diff --

fix indentation





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83271510
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1314,27 +1347,54 @@ class SparkContext(config: SparkConf) extends 
Logging with ExecutorAllocationCli
   }
 
   /**
-   * Register the given accumulator.  Note that accumulators must be 
registered before use, or it
+   * Register the given accumulator. Note that accumulators must be 
registered before use, or it
* will throw exception.
*/
   def register(acc: AccumulatorV2[_, _]): Unit = {
-acc.register(this)
+acc.register(this, dataProperty = false)
   }
 
   /**
-   * Register the given accumulator with given name.  Note that 
accumulators must be registered
+   * Register the given accumulator with given name. Note that 
accumulators must be registered
* before use, or it will throw exception.
+   *
+   * @param name The name of accumulator.
*/
   def register(acc: AccumulatorV2[_, _], name: String): Unit = {
 acc.register(this, name = Some(name))
   }
 
   /**
+   * Register the given accumulator. Note that accumulators must be 
registered before use, or it
+   * will throw exception.
+   *
+   * @param dataProperty If the accumulator should avoid re-counting 
multiple evaluations on the
+   * same RDD/partition. This adds some additional 
overhead for tracking and
+   * is an experimental feature.
+   */
+  def register(acc: AccumulatorV2[_, _], dataProperty: Boolean): Unit = {
+acc.register(this, dataProperty = dataProperty)
+  }
+
+  /**
+   * Register the given accumulator with given name. Note that 
accumulators must be registered
+   * before use, or it will throw exception.
+   *
+   * @param dataProperty If the accumulator should avoid re-counting 
multiple evaluations on the
+   * same RDD/partition. This adds some additional 
overhead for tracking and
+   * is an experimental feature.
+   * @param name The name of accumulator.
+   */
+  def register(acc: AccumulatorV2[_, _], dataProperty: Boolean, name: 
String): Unit = {
--- End diff --

I think both of these `register` methods which expose `dataProperty` should be marked `@Experimental`. Probably elsewhere too; I won't mention it in the other places.





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83287564
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +179,76 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyAdd(v)
+} else {
+  addImpl(v)
+}
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+// Add first for localValue & AccumulatorInfo
+addImpl(v)
+if (metadata != null && metadata.dataProperty) {
+  val updateInfo = TaskContext.get().getRDDPartitionInfo()
+  val base = pending.getOrElse(updateInfo, copyAndReset())
+  base.atDriverSide = false
+  base.addImpl(v)
+  pending(updateInfo) = base
+}
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This 
is a noop for
+   * non-data property accumuables.
+   */
+  private[spark] def markFullyProcessed(rddId: Int, shuffleWriteId: Int, 
partitionId: Int): Unit = {
+if (metadata.dataProperty) {
+  completed += ((rddId, shuffleWriteId, partitionId))
+}
+  }
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
* Takes the inputs and accumulates.
*/
-  def add(v: IN): Unit
+  protected[spark] def addImpl(v: IN)
 
   /**
* Merges another same-type accumulator into this one and update its 
state, i.e. this should be
-   * merge-in-place.
+   * merge-in-place. Developers should extend mergeImpl to customize the 
merge functionality.
*/
-  def merge(other: AccumulatorV2[IN, OUT]): Unit
+  final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+// Handle data property accumulators
--- End diff --

This comment is kind of pointless -- it's pretty obvious that's what the `if` is doing. What is really needed is an explanation of *why* -- maybe that belongs as a doc comment on `dataPropertyMerge`.





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83284254
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +179,76 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyAdd(v)
+} else {
+  addImpl(v)
+}
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+// Add first for localValue & AccumulatorInfo
--- End diff --

why is this important?





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83271825
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1343,7 +1403,25 @@ class SparkContext(config: SparkConf) extends 
Logging with ExecutorAllocationCli
*/
   def longAccumulator(name: String): LongAccumulator = {
 val acc = new LongAccumulator
-register(acc, name)
+register(acc, dataProperty = false, name)
+acc
+  }
+
+  /**
+   * Create and register a long accumulator, which starts with 0 and 
accumulates inputs by `+=`.
+   */
+  def dataPropertyLongAccumulator: LongAccumulator = {
--- End diff --

Super subjective, but I don't like having a special method for this. I think you should have another version of `longAccumulator` which takes a `dataProperty` argument (also marked `@Experimental`).





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83287419
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -42,18 +60,43 @@ private[spark] case class AccumulatorMetadata(
  * `OUT` should be a type that can be read atomically (e.g., Int, Long), 
or thread-safely
  * (e.g., synchronized collections) because it will be read from other 
threads.
  */
-abstract class AccumulatorV2[IN, OUT] extends Serializable {
+abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] 
extends Serializable {
   private[spark] var metadata: AccumulatorMetadata = _
-  private[this] var atDriverSide = true
+  private[spark] var atDriverSide = true
+
+  /**
+   * The following values are used for data property [[AccumulatorV2]]s.
+   * Data property [[AccumulatorV2]]s have only-once semantics. These 
semantics are implemented
+   * by keeping track of which RDD id, shuffle id, and partition id the 
current function is
+   * processing in. If a partition is fully processed the results for that 
partition/shuffle/rdd
+   * combination are sent back to the driver. The driver keeps track of 
which rdd/shuffle/partitions
+   * already have been applied, and only combines values into value_ if 
the rdd/shuffle/partition
+   * has not already been aggregated on the driver program
+   */
+  // For data property accumulators pending and processed updates.
+  // Pending and processed are keyed by (rdd id, shuffle id, partition id)
+  private[spark] lazy val pending =
+new mutable.HashMap[(Int, Int, Int), AccumulatorV2[IN, OUT]]()
+  // Completed contains the set of (rdd id, shuffle id, partition id) that 
have been
+  // fully processed on the worker side. This is used to determine if the 
updates should
+  // be sent back to the driver for a particular rdd/shuffle/partition 
combination.
+  private[spark] lazy val completed = new mutable.HashSet[(Int, Int, 
Int)]()
+  // Processed is keyed by (rdd id, shuffle id) and the value is a bitset 
containing all partitions
+  // for the given key which have been merged into the value. This is used 
on the driver.
+  @transient private[spark] lazy val processed = new mutable.HashMap[(Int, 
Int), mutable.BitSet]()
--- End diff --

I find the logic around `pending`, `completed` and `processed` really confusing. I'm still thinking this through -- before making concrete suggestions I want to check my understanding.

`completed` should only ever reflect the updates from one task. It should only be set on the executor. At first I was going to say it shouldn't be a set, it should just be one value -- but of course one task can get updates from multiple RDDs. It will differ from the key set of `pending` because pending can include updates from partitions that aren't completed (e.g. the `take` and `coalesce` cases).

`processed` is only used on the driver. Since the driver has global knowledge, it can use this to determine whether each update is a repeat or not.

I think the comment about whether updates should be sent back to the driver is incorrect -- it looks to me like the updates are always sent back to the driver, and the driver makes all of its determinations.

It seems like `merge` (and thus `dataPropertyMerge`) is only called on the driver. I'm trying to figure out whether or not that matters. It seems like it would do something weird if it were called on the executor. Maybe we should add an `assert(isAtDriver)`.
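To check my own reading, here is a condensed model (not the PR's code; names assumed) of the driver-side bookkeeping described above, where `processed` is what lets the driver drop repeated updates for the same rdd/shuffle/partition:

```scala
import scala.collection.mutable

// processed: (rdd id, shuffle id) -> partitions whose updates were already merged on the driver.
val processed = mutable.HashMap.empty[(Int, Int), mutable.BitSet]

// Returns true exactly once per (rddId, shuffleId, partition); later repeats are ignored.
def shouldMerge(rddId: Int, shuffleId: Int, partition: Int): Boolean = {
  val seen = processed.getOrElseUpdate((rddId, shuffleId), new mutable.BitSet)
  seen.add(partition)  // BitSet.add returns false when the partition was already present
}
```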





[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83271147
  
--- Diff: core/src/main/scala/org/apache/spark/Accumulable.scala ---
@@ -56,15 +61,26 @@ class Accumulable[R, T] private (
 @transient private val initialValue: R,
 param: AccumulableParam[R, T],
 val name: Option[String],
-private[spark] val countFailedValues: Boolean)
+private[spark] val countFailedValues: Boolean,
+private[spark] val dataProperty: Boolean)
   extends Serializable {
 
   private[spark] def this(
   initialValue: R,
   param: AccumulableParam[R, T],
   name: Option[String],
+  countFailedValues: Boolean,
+  dataProperty: Boolean) = {
+this(AccumulatorContext.newId(), initialValue, param, name, 
countFailedValues, dataProperty)
+  }
+
+  private[spark] def this(
+  initialValue: R,
+  param: AccumulableParam[R, T],
+  name: Option[String],
   countFailedValues: Boolean) = {
-this(AccumulatorContext.newId(), initialValue, param, name, 
countFailedValues)
+this(AccumulatorContext.newId(), initialValue, param, name, 
countFailedValues,
+  false /* dataProperty */)
--- End diff --

`dataProperty = false`

(though as I mentioned elsewhere, I think you shouldn't have this option on 
the old api)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83272894
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala 
---
@@ -34,8 +35,29 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: 
ClassTag](
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
-  override def compute(split: Partition, context: TaskContext): 
Iterator[U] =
-f(context, split.index, firstParent[T].iterator(split, context))
+  override def compute(split: Partition, context: TaskContext): 
Iterator[U] = {
+val input = firstParent[T].iterator(split, context)
+// If our task has data property accumulators we need to keep track of 
which partitions
+// are fully processed and what we are currently processing.
+if (context.taskMetrics.hasDataPropertyAccumulators()) {
+  // Use -1 for shuffle id since we are not in a shuffle.
+  val shuffleWriteId = -1
+  // Set the ID of the RDD and partition being processed. We need to 
do this per
+  // element since we chain the iterator transformations together
+  val data = input.map{x =>
+context.setRDDPartitionInfo(id, shuffleWriteId, split.index)
+x
+  }
+  val wrappedData = Utils.signalWhenEmpty(data,
+() => context.taskMetrics.markFullyProcessed(id, shuffleWriteId, 
split.index))
+  // We also set it before the first call to the user function in case 
the user provides a
+  // function which access a data property accumulator before 
accessing any elements.
+  context.setRDDPartitionInfo(id, shuffleWriteId, split.index)
+  f(context, split.index, wrappedData)
+} else {
+  f(context, split.index, input)
--- End diff --

I'd add a comment here: if there aren't any data property accumulators, we want to avoid the overhead that comes with the extra iterator wrapping.
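
For reference, the wrapping being skipped is roughly this shape -- just a generic sketch, not the actual `Utils.signalWhenEmpty` implementation:

```scala
// An iterator wrapper that fires a callback exactly once, when the underlying
// iterator has been fully consumed; this is what lets us mark a partition as
// fully processed only after every element has been seen.
class SignalOnEmptyIterator[A](underlying: Iterator[A], onEmpty: () => Unit) extends Iterator[A] {
  private var signalled = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !signalled) {
      signalled = true
      onEmpty()
    }
    more
  }

  override def next(): A = underlying.next()
}
```

Even a thin wrapper like this adds a virtual call per element, so branching around it when no data property accumulators are registered seems worthwhile.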


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83291225
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---
@@ -126,4 +126,14 @@ private[spark] class TaskContextImpl(
 taskMetrics.registerAccumulator(a)
   }
 
+  private var rddPartitionInfo: (Int, Int, Int) = null
--- End diff --

One thing I think really needs to be documented somewhere is *why* we need to use these compound ids to track things, why there is a shuffle id, etc.  Also, carrying around this triple is a little cumbersome, so I was thinking that maybe it makes sense to create a case class for it, which would also identify the purpose.  I'm thinking of something like:

```scala
/**
 * Identifies which task output a data property accumulator update came from.  This is
 * important to ensure that updates are not double-counted when rdds get recomputed.  When the
 * executors send back data property accumulator updates, they separate out the updates per rdd
 * or shuffle output generated by the task.  That gives the driver sufficient info to ensure
 * that each update is counted once.
 * [longer description of some of the tricky cases here, especially why we care about shuffle id].
 */
sealed trait TaskOutputId
case class RDDOutput(rddId: Int, partition: Int) extends TaskOutputId
case class ShuffleMapOutput(shuffleId: Int, partition: Int) extends TaskOutputId
```

It doesn't have to be a sealed trait; it could just be one class as well.  Do you think this would help?
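
If we did go that way, the bookkeeping maps could be keyed by it directly. A rough standalone sketch (repeating the definitions above, with a placeholder type `V` in place of `AccumulatorV2[IN, OUT]`):

```scala
import scala.collection.mutable

sealed trait TaskOutputId
case class RDDOutput(rddId: Int, partition: Int) extends TaskOutputId
case class ShuffleMapOutput(shuffleId: Int, partition: Int) extends TaskOutputId

// Hypothetical re-keying of the pending/completed/processed bookkeeping;
// V stands in for the accumulator's own type.
class OutputBookkeeping[V] {
  // Executor side: the update accumulated per output, plus which outputs
  // this task produced completely.
  val pending = new mutable.HashMap[TaskOutputId, V]()
  val completed = new mutable.HashSet[TaskOutputId]()
  // Driver side: outputs already merged into the final value, so repeats are dropped.
  val processed = new mutable.HashSet[TaskOutputId]()
}
```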


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83279340
  
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +179,76 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
   def reset(): Unit
 
   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
+   * Developers should extend addImpl to customize the adding 
functionality.
+   */
+  final def add(v: IN): Unit = {
+if (metadata != null && metadata.dataProperty) {
+  dataPropertyAdd(v)
+} else {
+  addImpl(v)
+}
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+// Add first for localValue & AccumulatorInfo
+addImpl(v)
+if (metadata != null && metadata.dataProperty) {
+  val updateInfo = TaskContext.get().getRDDPartitionInfo()
+  val base = pending.getOrElse(updateInfo, copyAndReset())
+  base.atDriverSide = false
--- End diff --

Why is this needed?

I see tests fail w/out it, but I don't understand why the old handling in `readObject` isn't working here.
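
To spell out my guess with a toy example (not the patch's code): `readObject` only runs for instances that actually go through Java deserialization, but the `copyAndReset()` copies built inside `dataPropertyAdd` are constructed fresh on the executor, so they keep the class default `atDriverSide = true` unless it is flipped explicitly:

```scala
import java.io._

// Toy demonstration of why a flag flipped inside readObject does not help for
// objects that are constructed locally rather than deserialized.
class Flagged extends Serializable {
  var atDriverSide = true

  // Java serialization invokes this only when an instance is deserialized.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    atDriverSide = false
  }

  // Analogous to copyAndReset(): a brand new local instance, so readObject
  // never runs and the default (true) sticks.
  def copyAndReset(): Flagged = new Flagged
}

object ReadObjectDemo extends App {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(new Flagged)
  out.close()
  val shipped = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[Flagged]

  println(shipped.atDriverSide)                 // false: came through readObject
  println(shipped.copyAndReset().atDriverSide)  // true: hence the explicit flip
}
```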


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-13 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83270945
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1251,7 +1251,22 @@ class SparkContext(config: SparkConf) extends 
Logging with ExecutorAllocationCli
*/
   @deprecated("use AccumulatorV2", "2.0.0")
   def accumulator[T](initialValue: T)(implicit param: 
AccumulatorParam[T]): Accumulator[T] = {
-val acc = new Accumulator(initialValue, param)
+accumulator(initialValue, dataProperty = false)
+  }
+
+  /**
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, 
which tasks can "add"
+   * values to using the `+=` method. Only the driver can access the 
accumulator's `value`.
+   *
+   * @param dataProperty If the accumulator should avoid re-counting 
multiple evaluations on the
+   * same RDD/partition. This adds some additional 
overhead for tracking and
+   * is an experimental feature.
+   */
+  @deprecated("use AccumulatorV2", "2.1.0")
--- End diff --

What's the argument for adding it to the old api?  Though I see the temptation, since it's a nice new feature and we want to support users who haven't migrated yet -- I really don't think it makes sense to be adding stuff to a deprecated api.  It will make users think it's not really deprecated.  Plus you provide a nice carrot for moving to the new api ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-10-12 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r83101779
  
--- Diff: 
core/src/test/scala/org/apache/spark/DataPropertyAccumulatorSuite.scala ---
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.ref.WeakReference
+
+import org.scalatest.Matchers
+
+import org.apache.spark.scheduler._
+
+
+class DataPropertyAccumulatorSuite extends SparkFunSuite with Matchers 
with LocalSparkContext {
+  test("single partition") {
+sc = new SparkContext("local[2]", "test")
+val acc : Accumulator[Int] = sc.accumulator(0, dataProperty = true)
+
+val a = sc.parallelize(1 to 20, 1)
+val b = a.map{x => acc += x; x}
+b.cache()
+b.count()
+acc.value should be (210)
+  }
+
+  test("adding only the first element per partition should work even if 
partition is empty") {
+sc = new SparkContext("local[2]", "test")
+val acc: Accumulator[Int] = sc.accumulator(0, dataProperty = true)
+val a = sc.parallelize(1 to 20, 30)
+val b = a.mapPartitions{itr =>
+  acc += 1
+  itr
+}
+b.count()
+acc.value should be (30)
+  }
+
+  test("shuffled (combineByKey)") {
+sc = new SparkContext("local[2]", "test")
+val a = sc.parallelize(1 to 40, 5)
+val buckets = 4
+val b = a.map{x => ((x % buckets), x)}
+val inputs = List(b, b.repartition(10), b.partitionBy(new 
HashPartitioner(5))).map(_.cache())
+val mapSideCombines = List(true, false)
+inputs.foreach { input =>
+  mapSideCombines.foreach { mapSideCombine =>
+val accs = (1 to 4).map(x => sc.accumulator(0, dataProperty = 
true)).toList
+val raccs = (1 to 4).map(x => sc.accumulator(0, dataProperty = 
false)).toList
+val List(acc, acc1, acc2, acc3) = accs
+val List(racc, racc1, racc2, racc3) = raccs
+val c = input.combineByKey(
+  (x: Int) => {acc1 += 1; acc += 1; racc1 += 1; racc += 1; x},
+  {(a: Int, b: Int) => acc2 += 1; acc += 1; racc2 += 1; racc += 1; 
(a + b)},
+  {(a: Int, b: Int) => acc3 += 1; acc += 1; racc3 += 1; racc += 1; 
(a + b)},
+  new HashPartitioner(10),
+  mapSideCombine)
+val d = input.combineByKey(
+  (x: Int) => {acc1 += 1; acc += 1; x},
+  {(a: Int, b: Int) => acc2 += 1; acc += 1; (a + b)},
+  {(a: Int, b: Int) => acc3 += 1; acc += 1; (a + b)},
+  new HashPartitioner(2),
+  mapSideCombine)
+val e = d.map{x => acc += 1; x}
+c.count()
+// If our partitioner is known then we should only create
+// one combiner for each key value. Otherwise we should
+// create at least that many combiners.
+if (input.partitioner.isDefined) {
+  acc1.value should be (buckets)
+} else {
+  acc1.value should be >= (buckets)
+}
+if (input.partitioner.isDefined) {
+  acc2.value should be > (0)
+} else if (mapSideCombine) {
+  acc3.value should be > (0)
+} else {
+  acc2.value should be > (0)
+  acc3.value should be (0)
+}
+acc.value should be (acc1.value + acc2.value + acc3.value)
+val oldValues = accs.map(_.value)
+// For one action the data property accumulators and regular 
should have the same value.
+accs.map(_.value) should be (raccs.map(_.value))
+c.count()
+accs.map(_.value) should be (oldValues)
--- End diff --

@viirya sorry I am commenting really late here.  Do you still have your 
implementation?  It would be interesting to compare.

If I understand right, you are proposing that `TaskResult` would include 
which RDDs had 

[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-08-03 Thread techaddict
Github user techaddict commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r73330940
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala 
---
@@ -220,8 +220,27 @@ class TaskMetrics private[spark] () extends 
Serializable {
*/
   @transient private[spark] lazy val externalAccums = new 
ArrayBuffer[AccumulatorV2[_, _]]
 
+   /**
+* All data property accumulators registered with this task.
+*/
+   @transient private lazy val dataPropertyAccums = new 
ArrayBuffer[AccumulatorV2[_, _]]
+
   private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
 externalAccums += a
+if (a.dataProperty) {
+  dataPropertyAccums += a
+}
+  }
+
+  private[spark] def hasDataPropertyAccumulators(): Boolean = {
+!dataPropertyAccums.isEmpty
--- End diff --

nit: could be nonEmpty
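
i.e. the same check, just reading a little better:

```scala
private[spark] def hasDataPropertyAccumulators(): Boolean = dataPropertyAccums.nonEmpty
```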


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...

2016-08-03 Thread techaddict
Github user techaddict commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r73330741
  
--- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala 
---
@@ -220,8 +220,27 @@ class TaskMetrics private[spark] () extends 
Serializable {
*/
   @transient private[spark] lazy val externalAccums = new 
ArrayBuffer[AccumulatorV2[_, _]]
 
+   /**
+* All data property accumulators registered with this task.
+*/
+   @transient private lazy val dataPropertyAccums = new 
ArrayBuffer[AccumulatorV2[_, _]]
+
   private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
 externalAccums += a
+if (a.dataProperty) {
+  dataPropertyAccums += a
+}
+  }
+
+  private[spark] def hasDataPropertyAccumulators(): Boolean = {
+!dataPropertyAccums.isEmpty
+  }
+
+  /**
+   * Mark an rdd/shuffle/and partition as fully processed for all 
dataProperty accumulators.
+   */
+  private[spark] def markFullyProcessed(rddId: Int, shuffleWriteId: Int, 
partitionId: Int) = {
+dataPropertyAccums.map(_.markFullyProcessed(rddId, shuffleWriteId, 
partitionId))
--- End diff --

should be `foreach` instead of `map`
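
i.e. since only the side effect matters and no result collection is needed:

```scala
private[spark] def markFullyProcessed(rddId: Int, shuffleWriteId: Int, partitionId: Int): Unit = {
  dataPropertyAccums.foreach(_.markFullyProcessed(rddId, shuffleWriteId, partitionId))
}
```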


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org