[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r90755993

--- Diff: core/src/test/scala/org/apache/spark/DataPropertyAccumulatorSuite.scala ---
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.ref.WeakReference
+
+import org.scalatest.Matchers
+
+import org.apache.spark.scheduler._
+import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, AccumulatorV2, LongAccumulator}
+
+
+class DataPropertyAccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext {
--- End diff --

That sounds like a good plan. I'll try to give the tests more descriptive names (or, where that isn't enough, explain in comments more about the functionality they are testing).

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86493902

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   def reset(): Unit

   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
+   */
+  final def add(v: IN): Unit = {
--- End diff --

Or maybe just `DataAccumulator`, since V1 to V2 was a big change and this really isn't that big of a change.
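To make the subclassing idea concrete, here is a minimal sketch under stated assumptions: `BaseAccumulator`, `DataAccumulator`, `LegacySum`, and `DataSum` are hypothetical simplified classes, not Spark's API, and the `updates` counter only stands in for the per-output bookkeeping. The point it illustrates is that existing subclasses which override `add` keep working, because only the new layer marks `add` as `final`.

```scala
// Hypothetical stand-in for the existing base class: `add` stays abstract
// and overridable, so subclasses written against it are unaffected.
abstract class BaseAccumulator[IN, OUT] {
  def add(v: IN): Unit
  def value: OUT
}

// Hypothetical new layer: it takes over `add` (final here) and asks its own
// subclasses for `addImpl` instead. Only code that extends it opts in.
abstract class DataAccumulator[IN, OUT] extends BaseAccumulator[IN, OUT] {
  protected def addImpl(v: IN): Unit

  private var updates = 0

  final override def add(v: IN): Unit = {
    updates += 1 // placeholder for the data-property bookkeeping
    addImpl(v)
  }

  def updateCount: Int = updates
}

// Existing-style user code: overrides `add` directly and still compiles.
class LegacySum extends BaseAccumulator[Long, Long] {
  private var sum = 0L
  def add(v: Long): Unit = sum += v
  def value: Long = sum
}

// New-style user code: implements `addImpl` only.
class DataSum extends DataAccumulator[Long, Long] {
  private var sum = 0L
  protected def addImpl(v: Long): Unit = sum += v
  def value: Long = sum
}
```

The trade-off is an extra type in the public API in exchange for leaving `AccumulatorV2.add` untouched.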
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86493318

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   def reset(): Unit

   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
+   */
+  final def add(v: IN): Unit = {
--- End diff --

Drat :( Do we want to add an AccumulatorV3 and deprecate AccumulatorV2's constructor?
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86492811

--- Diff: core/src/test/scala/org/apache/spark/DataPropertyAccumulatorSuite.scala ---
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.ref.WeakReference
+
+import org.scalatest.Matchers
+
+import org.apache.spark.scheduler._
+import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, AccumulatorV2, LongAccumulator}
+
+
+class DataPropertyAccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext {
+  test("two partition old and new") {
+    sc = new SparkContext("local[2]", "test")
+    val acc = sc.longAccumulator(dataProperty = true, "l2")
+
+    val a = sc.parallelize(1 to 20, 2)
+    val b = a.map{x => acc.add(x); x}
+    b.cache()
+    b.count()
+    acc.value should be (210)
+  }
+
+  test("single partition") {
+    sc = new SparkContext("local[2]", "test")
+    val acc = sc.longAccumulator(dataProperty = true)
+
+    val a = sc.parallelize(1 to 20, 1)
+    val b = a.map{x => acc.add(x); x}
+    b.cache()
+    b.count()
+    acc.value should be (210)
+  }
+
+  test("adding only the first element per partition should work even if partition is empty") {
+    sc = new SparkContext("local[2]", "test")
+    val acc = sc.longAccumulator(dataProperty = true)
+    val a = sc.parallelize(1 to 20, 30)
+    val b = a.mapPartitions{itr =>
+      acc.add(1)
+      itr
+    }
+    b.count()
+    acc.value should be (30)
+  }
+
+  test("shuffled (combineByKey)") {
+    sc = new SparkContext("local[2]", "test")
+    val a = sc.parallelize(1L to 40L, 5)
+    val buckets = 4
+    val b = a.map{x => ((x % buckets), x)}
+    val inputs = List(b, b.repartition(10), b.partitionBy(new HashPartitioner(5))).map(_.cache())
+    val mapSideCombines = List(true, false)
+    inputs.foreach { input =>
+      mapSideCombines.foreach { mapSideCombine =>
+        val accs = (1 to 4).map(x => sc.longAccumulator(dataProperty = true)).toList
+        val raccs = (1 to 4).map(x => sc.longAccumulator(dataProperty = false)).toList
+        val List(acc, acc1, acc2, acc3) = accs
+        val List(racc, racc1, racc2, racc3) = raccs
+        val c = input.combineByKey(
+          (x: Long) => {acc1.add(1); acc.add(1); racc1.add(1); racc.add(1); x},
+          {(a: Long, b: Long) =>
+            acc2.add(1); acc.add(1); racc2.add(1); racc.add(1); (a + b)},
+          {(a: Long, b: Long) => acc3.add(1); acc.add(1); racc3.add(1); racc.add(1); (a + b)},
+          new HashPartitioner(10),
+          mapSideCombine)
+        val d = input.combineByKey(
+          (x: Long) => {acc1.add(1); acc.add(1); x},
+          {(a: Long, b: Long) => acc2.add(1); acc.add(1); (a + b)},
+          {(a: Long, b: Long) => acc3.add(1); acc.add(1); (a + b)},
+          new HashPartitioner(2),
+          mapSideCombine)
+        val e = d.map{x => acc.add(1); x}
--- End diff --

Well, there are two ways it could fail: the updates made during `e.count()` could look like duplicates of the updates from `d`, or, if the `insertAll` wasn't happening, the updates from the read side of `d` would look like they were coming from `e`, and we would get double counting.
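The two failure modes can be illustrated with a small, Spark-free simulation. This is a sketch under simplifying assumptions: `OutputId` and `RecomputeDemo` are hypothetical names, each report is an (output id, delta) pair, and the read side of `d` is modeled as plain RDD outputs rather than a shuffle. A regular accumulator counts recomputed partitions twice; deduplicating by output id fixes that only as long as recomputed updates keep `d`'s output id, which is what the mislabeled case shows.

```scala
import scala.collection.mutable

// Hypothetical, simplified model (not Spark code): which (rdd id, partition)
// output a task was producing when it reported an accumulator delta.
final case class OutputId(rddId: Int, partition: Int)

object RecomputeDemo {
  // Regular accumulator semantics: every reported delta is merged,
  // including deltas from recomputed partitions.
  def regularTotal(reports: Seq[(OutputId, Long)]): Long =
    reports.map(_._2).sum

  // Data-property semantics: only the first report per output id is merged.
  def dataPropertyTotal(reports: Seq[(OutputId, Long)]): Long = {
    val merged = mutable.HashSet.empty[OutputId]
    reports.foldLeft(0L) { case (total, (id, delta)) =>
      // HashSet.add returns true only if the id was not already present
      if (merged.add(id)) total + delta else total
    }
  }
}
```

For example, if `d` has two partitions that each add 5, and `d.count()` followed by an uncached `e.count()` computes them twice, the regular total is 20 while the deduplicated total is 10. If the second round of updates were attributed to `e`'s output ids instead, deduplication would not catch them and the total would again be 20.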
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86491369

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   def reset(): Unit

   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
+   */
+  final def add(v: IN): Unit = {
+    if (metadata != null && metadata.dataProperty) {
+      dataPropertyAdd(v)
+    } else {
+      addImpl(v)
+    }
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+    // To allow the user to be able to access the current accumulated value from their process
+    // worker side then we need to perform a "normal" add as well as the data property add.
+    addImpl(v)
+    // Add to the pending updates for data property
+    val updateInfo = TaskContext.get().getRDDPartitionInfo()
+    val base = pending.getOrElse(updateInfo, copyAndReset())
+    // Since we may have constructed a new accumulator, set atDriverSide to false as the default
+    // new accumulators will have atDriverSide equal to true.
+    base.atDriverSide = false
+    base.addImpl(v)
+    pending(updateInfo) = base
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This is a noop for
+   * non-data property accumuables.
+   */
+  private[spark] def markFullyProcessed(taskOutputId: TaskOutputId): Unit = {
+    if (metadata.dataProperty) {
+      completed += taskOutputId
+    }
+  }
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
    * Takes the inputs and accumulates.
    */
-  def add(v: IN): Unit
+  protected[spark] def addImpl(v: IN)
+
+  /**
+   * Merges another same-type accumulator into this one and update its state, i.e. this should be
+   * merge-in-place. Developers should extend mergeImpl to customize the merge functionality.
+   */
+  final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+    assert(isAtDriverSide)
+    // Handle data property accumulators
+    if (metadata != null && metadata.dataProperty) {
+      dataPropertyMerge _
+    } else {
+      mergeImpl _
+    }
+  }
+
+  final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, OUT]) = {
+    // Apply all foreach partitions regardless - they can only be fully evaluated
+    val unprocessed = other.pending.filter{
+      case (ForeachOutputId(), v) => mergeImpl(v); false
--- End diff --

Yes, they would.
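A stripped-down sketch of the `add`/`addImpl` split in the diff above, assuming a plain string key in place of `TaskOutputId` and a settable `currentOutput` field in place of `TaskContext.get()`; `SketchAccumulator` and `SketchLongAccumulator` are hypothetical. The final `add` always updates the local value (so the task can still read it) and, in data-property mode, also adds into a fresh per-output copy created with `copyAndReset()`.

```scala
import scala.collection.mutable

// Hypothetical simplified accumulator illustrating the final add -> addImpl pattern.
abstract class SketchAccumulator[IN, OUT] {
  var dataProperty: Boolean = false
  // Stand-in for the task's current output id (rdd/shuffle/partition).
  var currentOutput: String = "unknown"
  // Per-output partial accumulators, like `pending` in the diff.
  val pending: mutable.HashMap[String, SketchAccumulator[IN, OUT]] =
    mutable.HashMap.empty[String, SketchAccumulator[IN, OUT]]

  protected def addImpl(v: IN): Unit
  def copyAndReset(): SketchAccumulator[IN, OUT]
  def value: OUT

  // Subclasses customize addImpl; they cannot override add.
  final def add(v: IN): Unit = {
    addImpl(v) // the "normal" add, visible to the running task
    if (dataProperty) {
      val base = pending.getOrElseUpdate(currentOutput, copyAndReset())
      base.addImpl(v) // the data-property add, tracked per output
    }
  }
}

class SketchLongAccumulator extends SketchAccumulator[Long, Long] {
  private var sum = 0L
  protected def addImpl(v: Long): Unit = sum += v
  def copyAndReset(): SketchAccumulator[Long, Long] = new SketchLongAccumulator
  def value: Long = sum
}
```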
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86490667

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -42,18 +60,45 @@ private[spark] case class AccumulatorMetadata(
  * `OUT` should be a type that can be read atomically (e.g., Int, Long), or thread-safely
  * (e.g., synchronized collections) because it will be read from other threads.
  */
-abstract class AccumulatorV2[IN, OUT] extends Serializable {
+abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] extends Serializable {
   private[spark] var metadata: AccumulatorMetadata = _
-  private[this] var atDriverSide = true
+  private[spark] var atDriverSide = true
+
+  /**
+   * The following values are used for data property [[AccumulatorV2]]s.
+   * Data property [[AccumulatorV2]]s have only-once semantics. These semantics are implemented
+   * by keeping track of which RDD id, shuffle id, and partition id the current function is
+   * processing in. If a partition is fully processed the results for that partition/shuffle/rdd
+   * combination are sent back to the driver. The driver keeps track of which rdd/shuffle/partitions
+   * already have been applied, and only combines values into value_ if the rdd/shuffle/partition
+   * has not already been aggregated on the driver program
+   */
+  // For data property accumulators pending and processed updates.
+  // Pending and processed are keyed by (rdd id, shuffle id, partition id)
+  private[spark] lazy val pending =
+    new mutable.HashMap[TaskOutputId, AccumulatorV2[IN, OUT]]()
+  // Completed contains the set of (rdd id, shuffle id, partition id) that have been
+  // fully processed on the worker side. This is used to determine if the updates should
+  // be merged on the driver for a particular rdd/shuffle/partition combination.
+  private[spark] lazy val completed = new mutable.HashSet[TaskOutputId]()
+  // rddProcessed is keyed by rdd id and the value is a bitset containing all partitions
+  // for the given key which have been merged into the value. This is used on the driver.
+  @transient private[spark] lazy val rddProcessed = new mutable.HashMap[Int, mutable.BitSet]()
+  // shuffleProcessed is the same as rddProcessed except keyed by shuffle id.
+  @transient private[spark] lazy val shuffleProcessed = new mutable.HashMap[Int, mutable.BitSet]()
--- End diff --

`completedTaskOutputsForOneTask` and `pendingAccumulatorUpdatesFromOneTask` seem like really long variable names; how about `completedOutputsForTask` and `pendingAccumulatorUpdatesForTask`?
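The driver-side `rddProcessed` bookkeeping in the diff (RDD id mapped to a `BitSet` of already-merged partition ids) can be sketched on its own. `ProcessedTracker` and `markIfNew` are hypothetical names; the sketch relies only on `mutable.BitSet.add` returning `true` when the partition was not already present.

```scala
import scala.collection.mutable

// Hypothetical sketch of the driver-side "already merged?" check.
class ProcessedTracker {
  // rdd id -> partition ids whose updates have already been merged
  private val rddProcessed = mutable.HashMap.empty[Int, mutable.BitSet]

  /** Returns true the first time (rddId, partition) is seen, false on repeats. */
  def markIfNew(rddId: Int, partition: Int): Boolean = {
    val splits = rddProcessed.getOrElseUpdate(rddId, mutable.BitSet.empty)
    splits.add(partition)
  }
}
```

A `BitSet` keeps this compact when partition ids are small, dense integers, which is the usual case for RDD partitions.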
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86490436

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -42,18 +60,45 @@ private[spark] case class AccumulatorMetadata(
  * `OUT` should be a type that can be read atomically (e.g., Int, Long), or thread-safely
  * (e.g., synchronized collections) because it will be read from other threads.
  */
-abstract class AccumulatorV2[IN, OUT] extends Serializable {
+abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] extends Serializable {
   private[spark] var metadata: AccumulatorMetadata = _
-  private[this] var atDriverSide = true
+  private[spark] var atDriverSide = true
+
+  /**
+   * The following values are used for data property [[AccumulatorV2]]s.
+   * Data property [[AccumulatorV2]]s have only-once semantics. These semantics are implemented
+   * by keeping track of which RDD id, shuffle id, and partition id the current function is
+   * processing in. If a partition is fully processed the results for that partition/shuffle/rdd
+   * combination are sent back to the driver. The driver keeps track of which rdd/shuffle/partitions
+   * already have been applied, and only combines values into value_ if the rdd/shuffle/partition
+   * has not already been aggregated on the driver program
+   */
+  // For data property accumulators pending and processed updates.
+  // Pending and processed are keyed by (rdd id, shuffle id, partition id)
+  private[spark] lazy val pending =
+    new mutable.HashMap[TaskOutputId, AccumulatorV2[IN, OUT]]()
+  // Completed contains the set of (rdd id, shuffle id, partition id) that have been
+  // fully processed on the worker side. This is used to determine if the updates should
+  // be merged on the driver for a particular rdd/shuffle/partition combination.
+  private[spark] lazy val completed = new mutable.HashSet[TaskOutputId]()
+  // rddProcessed is keyed by rdd id and the value is a bitset containing all partitions
+  // for the given key which have been merged into the value. This is used on the driver.
+  @transient private[spark] lazy val rddProcessed = new mutable.HashMap[Int, mutable.BitSet]()
+  // shuffleProcessed is the same as rddProcessed except keyed by shuffle id.
+  @transient private[spark] lazy val shuffleProcessed = new mutable.HashMap[Int, mutable.BitSet]()
--- End diff --

That understanding seems correct; indeed, the merge function now asserts that it is on the driver side (since the last time this comment was posted). As an aside, I don't know what GitHub is changing with the review features, but it seems to be behaving really weirdly on this PR for some reason.
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86490194

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   def reset(): Unit

   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
+   */
+  final def add(v: IN): Unit = {
--- End diff --

Yeah, there isn't a way to do this for everyone without breaking binary compatibility. My only argument is that the AccumulatorV2 API was added in Spark 2.0, so I don't expect anyone would be using it yet (given the current usability of accumulators); maybe it would be OK if we mention the change in the release notes? But if you think the best path forward is subclassing, that makes sense too.
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86489315

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   def reset(): Unit

   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
+   */
+  final def add(v: IN): Unit = {
+    if (metadata != null && metadata.dataProperty) {
+      dataPropertyAdd(v)
+    } else {
+      addImpl(v)
+    }
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+    // To allow the user to be able to access the current accumulated value from their process
+    // worker side then we need to perform a "normal" add as well as the data property add.
+    addImpl(v)
+    // Add to the pending updates for data property
+    val updateInfo = TaskContext.get().getRDDPartitionInfo()
+    val base = pending.getOrElse(updateInfo, copyAndReset())
+    // Since we may have constructed a new accumulator, set atDriverSide to false as the default
+    // new accumulators will have atDriverSide equal to true.
+    base.atDriverSide = false
+    base.addImpl(v)
+    pending(updateInfo) = base
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This is a noop for
+   * non-data property accumuables.
+   */
+  private[spark] def markFullyProcessed(taskOutputId: TaskOutputId): Unit = {
+    if (metadata.dataProperty) {
+      completed += taskOutputId
+    }
+  }
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
    * Takes the inputs and accumulates.
    */
-  def add(v: IN): Unit
+  protected[spark] def addImpl(v: IN)
+
+  /**
+   * Merges another same-type accumulator into this one and update its state, i.e. this should be
+   * merge-in-place. Developers should extend mergeImpl to customize the merge functionality.
+   */
+  final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+    assert(isAtDriverSide)
+    // Handle data property accumulators
+    if (metadata != null && metadata.dataProperty) {
+      dataPropertyMerge _
+    } else {
+      mergeImpl _
+    }
+  }
+
+  final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, OUT]) = {
+    // Apply all foreach partitions regardless - they can only be fully evaluated
+    val unprocessed = other.pending.filter{
+      case (ForeachOutputId(), v) => mergeImpl(v); false
+      case _ => true
+    }
+    val term = unprocessed.filter{case (k, v) => other.completed.contains(k)}
+    term.flatMap {
+      case (RDDOutputId(rddId, splitId), v) =>
+        Some((rddProcessed, rddId, splitId, v))
+      case (ShuffleMapOutputId(shuffleWriteId, splitId), v) =>
+        Some((shuffleProcessed, shuffleWriteId, splitId, v))
+      case _ => // We won't ever hit this case but avoid compiler warnings
+        None
+    }.foreach {
+      case (processed, id, splitId, v) =>
+        val splits = processed.getOrElseUpdate(id, new mutable.BitSet())
+        if (!splits.contains(splitId)) {
+          splits += splitId
+          mergeImpl(v)
+        }
+    }
--- End diff --

This method feels more complicated than it needs to be. What do you think of this version? There is a small amount of duplication, but I think it's easier to follow:

```scala
final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, OUT]) = {
  def mergeAccumUpdateAndMarkOutputAsProcessed(
      partitionsAlreadyMerged: mutable.BitSet,
      outputId: TaskOutputId,
      accumUpdate: AccumulatorV2[IN, OUT]
  ): Unit = {
    // we don't merge in accumulator updates from an incomplete accumulator update, eg. a take()
    // which only partially reads an rdd partition
    if (other.completed.contains(outputId)) {
      // has this partition been processed before?
      if (!partitionsAlreadyMerged.contains(outputId.partition)) {
        partitionsAlreadyMerged += outputId.partition
        mergeImpl(accumUpdate)
      }
    }
  }

  other.pending.foreach {
    // Apply all foreach partitions regardless - they can only be fully evaluated
    case (ForeachOutputId, accumUpdate) => mergeImpl(accumUpdate)
    // For RDDs & shuffles, apply the accumulator updates as long as the output is complete
```
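Since the suggested version above is cut off, here is a separate, self-contained sketch of the same merge rules rather than a completion of it. Assumptions: updates are plain `Long` deltas instead of `AccumulatorV2` copies, `DriverSideSum` is a hypothetical class, and `ForeachOutputId` is a `case object`. The rules come from the diff: foreach output is always merged, outputs not marked complete are skipped, and RDD and shuffle outputs are each merged at most once per (id, partition), tracked in separate `BitSet` maps.

```scala
import scala.collection.mutable

// Hypothetical standalone model of the output ids discussed in this thread.
sealed trait TaskOutputId
final case class RDDOutputId(rddId: Int, partition: Int) extends TaskOutputId
final case class ShuffleMapOutputId(shuffleId: Int, partition: Int) extends TaskOutputId
case object ForeachOutputId extends TaskOutputId

// Simplified driver-side merger for a Long sum (not Spark's AccumulatorV2).
class DriverSideSum {
  var value = 0L
  private val rddProcessed = mutable.HashMap.empty[Int, mutable.BitSet]
  private val shuffleProcessed = mutable.HashMap.empty[Int, mutable.BitSet]

  // Merge a delta unless this (id, partition) has already been merged.
  private def mergeOnce(
      processed: mutable.HashMap[Int, mutable.BitSet],
      id: Int,
      partition: Int,
      delta: Long): Unit = {
    val alreadyMerged = processed.getOrElseUpdate(id, mutable.BitSet.empty)
    if (alreadyMerged.add(partition)) value += delta
  }

  /** pending: per-output deltas from one task; completed: outputs it fully produced. */
  def merge(pending: Map[TaskOutputId, Long], completed: Set[TaskOutputId]): Unit =
    pending.foreach {
      // foreach/runJob output is never resubmitted, so always merge it
      case (ForeachOutputId, delta) => value += delta
      // partially read outputs (e.g. from take()) are skipped entirely
      case (id, _) if !completed.contains(id) => ()
      case (RDDOutputId(rddId, p), delta) => mergeOnce(rddProcessed, rddId, p, delta)
      case (ShuffleMapOutputId(sid, p), delta) => mergeOnce(shuffleProcessed, sid, p, delta)
    }
}
```

Matching directly on `pending` avoids the intermediate `filter`/`flatMap` collections and the unreachable `case _` from the original.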
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86487454

--- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
@@ -69,6 +69,20 @@ object TaskContext {
   }
 }

+/**
+ * Identifies where an update for a data property accumulator update came from. This is important
+ * to ensure that updates are not double-counted when rdds get recomputed. When the executors send
+ * back data property acucmualtor updates, they seperate out the updates per rdd or shuffle output
+ * generated by the task. That gives the driver sufficient info to ensure that each update is
+ * counted once. The shuffleId is important since two seperate shuffle actions could happen on the
+ * same RDD and the RDD ID and partition ID with different accumulators.
+ * Any accumulators used inside of `runJob` directly are always counted because there is no
+ * resubmition of `runJob`/`foreach`.
+*/
+private[spark] sealed trait TaskOutputId
+private[spark] case class RDDOutputId(rddId: Int, partition: Int) extends TaskOutputId
+private[spark] case class ShuffleMapOutputId(shuffleId: Int, partition: Int) extends TaskOutputId
+private[spark] case class ForeachOutputId() extends TaskOutputId
--- End diff --

This can be a `case object`.
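With `ForeachOutputId` as a `case object`, pattern matches drop the `()` and the compiler can still check exhaustiveness over the sealed trait. A minimal sketch (the `TaskOutputIds.describe` helper is hypothetical); it also shows that an RDD output and a shuffle output with the same numbers are distinct keys, which is why the shuffle id matters.

```scala
sealed trait TaskOutputId
final case class RDDOutputId(rddId: Int, partition: Int) extends TaskOutputId
final case class ShuffleMapOutputId(shuffleId: Int, partition: Int) extends TaskOutputId
// A singleton: it carries no fields, so a zero-argument case class is unnecessary.
case object ForeachOutputId extends TaskOutputId

object TaskOutputIds {
  // Exhaustive over the sealed trait; no `()` needed for the case object.
  def describe(id: TaskOutputId): String = id match {
    case RDDOutputId(r, p)        => s"rdd $r partition $p"
    case ShuffleMapOutputId(s, p) => s"shuffle $s partition $p"
    case ForeachOutputId          => "foreach/runJob output"
  }
}
```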
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86486890

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   def reset(): Unit

   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
+   */
+  final def add(v: IN): Unit = {
+    if (metadata != null && metadata.dataProperty) {
+      dataPropertyAdd(v)
+    } else {
+      addImpl(v)
+    }
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+    // To allow the user to be able to access the current accumulated value from their process
+    // worker side then we need to perform a "normal" add as well as the data property add.
+    addImpl(v)
+    // Add to the pending updates for data property
+    val updateInfo = TaskContext.get().getRDDPartitionInfo()
+    val base = pending.getOrElse(updateInfo, copyAndReset())
+    // Since we may have constructed a new accumulator, set atDriverSide to false as the default
+    // new accumulators will have atDriverSide equal to true.
+    base.atDriverSide = false
+    base.addImpl(v)
+    pending(updateInfo) = base
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This is a noop for
+   * non-data property accumuables.
+   */
+  private[spark] def markFullyProcessed(taskOutputId: TaskOutputId): Unit = {
+    if (metadata.dataProperty) {
+      completed += taskOutputId
+    }
+  }
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
    * Takes the inputs and accumulates.
    */
-  def add(v: IN): Unit
+  protected[spark] def addImpl(v: IN)
+
+  /**
+   * Merges another same-type accumulator into this one and update its state, i.e. this should be
+   * merge-in-place. Developers should extend mergeImpl to customize the merge functionality.
+   */
+  final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+    assert(isAtDriverSide)
+    // Handle data property accumulators
+    if (metadata != null && metadata.dataProperty) {
+      dataPropertyMerge _
+    } else {
+      mergeImpl _
+    }
+  }
+
+  final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, OUT]) = {
+    // Apply all foreach partitions regardless - they can only be fully evaluated
+    val unprocessed = other.pending.filter{
+      case (ForeachOutputId(), v) => mergeImpl(v); false
--- End diff --

The idea is that if you use an accumulator in a foreach, you get the same behavior whether you use a data-property accumulator or not, right? That makes sense. Before this last change, would data-property accumulators just get totally ignored in a foreach?
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86486267

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   def reset(): Unit

   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
+   */
+  final def add(v: IN): Unit = {
--- End diff --

Oh crap, you are going to hate me... making this `final` is going to break binary compatibility. Is there any other alternative? I will keep thinking, but I can't come up with anything.
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/11105#discussion_r86486502

--- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---
@@ -126,4 +126,14 @@ private[spark] class TaskContextImpl(
     taskMetrics.registerAccumulator(a)
   }

+  private var rddPartitionInfo: TaskOutputId = null
--- End diff --

Rename to `taskOutputInfo`, since with the new naming this is sometimes an `RDDPartitionInfo` and sometimes a `ShuffleMapOutputId`.
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86487664

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   def reset(): Unit

   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
+   */
+  final def add(v: IN): Unit = {
+    if (metadata != null && metadata.dataProperty) {
+      dataPropertyAdd(v)
+    } else {
+      addImpl(v)
+    }
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+    // To allow the user to be able to access the current accumulated value from their process
+    // worker side then we need to perform a "normal" add as well as the data property add.
+    addImpl(v)
+    // Add to the pending updates for data property
+    val updateInfo = TaskContext.get().getRDDPartitionInfo()
+    val base = pending.getOrElse(updateInfo, copyAndReset())
+    // Since we may have constructed a new accumulator, set atDriverSide to false as the default
+    // new accumulators will have atDriverSide equal to true.
+    base.atDriverSide = false
+    base.addImpl(v)
+    pending(updateInfo) = base
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This is a noop for
+   * non-data property accumuables.
+   */
+  private[spark] def markFullyProcessed(taskOutputId: TaskOutputId): Unit = {
+    if (metadata.dataProperty) {
+      completed += taskOutputId
+    }
+  }
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
    * Takes the inputs and accumulates.
    */
-  def add(v: IN): Unit
+  protected[spark] def addImpl(v: IN)
+
+  /**
+   * Merges another same-type accumulator into this one and update its state, i.e. this should be
+   * merge-in-place. Developers should extend mergeImpl to customize the merge functionality.
+   */
+  final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+    assert(isAtDriverSide)
+    // Handle data property accumulators
+    if (metadata != null && metadata.dataProperty) {
+      dataPropertyMerge _
+    } else {
+      mergeImpl _
+    }
+  }
+
+  final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, OUT]) = {
+    // Apply all foreach partitions regardless - they can only be fully evaluated
+    val unprocessed = other.pending.filter{
+      case (ForeachOutputId(), v) => mergeImpl(v); false
+      case _ => true
+    }
+    val term = unprocessed.filter{case (k, v) => other.completed.contains(k)}
--- End diff --

Rename `term` to `accumUpdatesFromUnmergedTaskOutputs`, and include a comment like "Only take updates for task outputs we haven't seen before. So if this task computed two RDDs, but one of them had been computed previously, only take the accumulator updates from the other one." It's obvious to you by now, but it's the most important part, so it's worth spelling out :)
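To illustrate the suggested rename and comment, here is a minimal, self-contained sketch of the driver-side filtering step. It is not the PR's code: `TaskOutputId`, `RDDOutputId` and `ShuffleMapOutputId` are simplified stand-ins, `DriverSideSum` is a hypothetical toy that merges plain `Long` updates instead of `AccumulatorV2` instances, and `ForeachOutputId` handling is omitted. The PR checks `completed` first and consults `rddProcessed`/`shuffleProcessed` afterwards; this sketch folds both checks into one filter so that the new name is literally accurate.

```scala
import scala.collection.mutable

// Simplified stand-ins for the PR's TaskOutputId subclasses (ForeachOutputId omitted).
sealed trait TaskOutputId
final case class RDDOutputId(rddId: Int, splitId: Int) extends TaskOutputId
final case class ShuffleMapOutputId(shuffleWriteId: Int, splitId: Int) extends TaskOutputId

// Hypothetical driver-side accumulator that merges plain Long updates.
final class DriverSideSum {
  private val rddProcessed = mutable.HashMap[Int, mutable.BitSet]()
  private val shuffleProcessed = mutable.HashMap[Int, mutable.BitSet]()
  var value = 0L

  // Returns the bitset of already-merged partitions for this output, plus its partition index.
  private def mergedPartitions(id: TaskOutputId): (mutable.BitSet, Int) = id match {
    case RDDOutputId(rddId, splitId) =>
      (rddProcessed.getOrElseUpdate(rddId, mutable.BitSet()), splitId)
    case ShuffleMapOutputId(shuffleWriteId, splitId) =>
      (shuffleProcessed.getOrElseUpdate(shuffleWriteId, mutable.BitSet()), splitId)
  }

  // `pending` and `completed` come from a single task's update.
  def merge(pending: Map[TaskOutputId, Long], completed: Set[TaskOutputId]): Unit = {
    // Only take updates for task outputs we haven't seen before. So if this task computed
    // two RDDs, but one of them had been computed previously, only take the accumulator
    // updates from the other one. Outputs that were not fully computed are skipped too.
    val accumUpdatesFromUnmergedTaskOutputs = pending.filter { case (outputId, _) =>
      val (merged, splitId) = mergedPartitions(outputId)
      completed.contains(outputId) && !merged.contains(splitId)
    }
    accumUpdatesFromUnmergedTaskOutputs.foreach { case (outputId, update) =>
      val (merged, splitId) = mergedPartitions(outputId)
      merged += splitId
      value += update
    }
  }
}
```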
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86486618

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   def reset(): Unit

   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
+   */
+  final def add(v: IN): Unit = {
+    if (metadata != null && metadata.dataProperty) {
+      dataPropertyAdd(v)
+    } else {
+      addImpl(v)
+    }
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+    // To allow the user to be able to access the current accumulated value from their process
+    // worker side then we need to perform a "normal" add as well as the data property add.
+    addImpl(v)
+    // Add to the pending updates for data property
+    val updateInfo = TaskContext.get().getRDDPartitionInfo()
+    val base = pending.getOrElse(updateInfo, copyAndReset())
+    // Since we may have constructed a new accumulator, set atDriverSide to false as the default
+    // new accumulators will have atDriverSide equal to true.
+    base.atDriverSide = false
+    base.addImpl(v)
+    pending(updateInfo) = base
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This is a noop for
+   * non-data property accumuables.
--- End diff --

Maybe add something like "see [[TaskOutputId]] for an explanation of why this is important for data property accumulators"? I worry the purpose of this may seem a little mysterious on its own; I figure we should at least point to the more complete description.
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86486669

--- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
@@ -69,6 +69,20 @@ object TaskContext {
   }
 }

+/**
+ * Identifies where an update for a data property accumulator update came from. This is important
+ * to ensure that updates are not double-counted when rdds get recomputed. When the executors send
+ * back data property acucmualtor updates, they seperate out the updates per rdd or shuffle output
+ * generated by the task. That gives the driver sufficient info to ensure that each update is
+ * counted once. The shuffleId is important since two seperate shuffle actions could happen on the
+ * same RDD and the RDD ID and partition ID with different accumulators.
+ * Any accumulators used inside of `runJob` directly are always counted because there is no
+ * resubmition of `runJob`/`foreach`.
--- End diff --

Can you add something here about why this is important for tracking incomplete partitions, e.g. when partitions are repeatedly only partially computed by `take()`?
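To make the `take()` case concrete, here is a minimal toy model, assuming a single task output identified by a hypothetical `(rddId, partitionId)` pair: every element adds to the pending update, but the output is only marked completed once the partition iterator is exhausted. `TaskSideState` and `trackedIterator` are illustrative names, not Spark APIs.

```scala
import scala.collection.mutable

// Hypothetical per-task bookkeeping, mirroring `pending` / `completed` from the PR
// but keyed by a plain (rddId, partitionId) pair and holding Long sums.
final class TaskSideState {
  val pending = mutable.HashMap[(Int, Int), Long]()
  val completed = mutable.HashSet[(Int, Int)]()
}

// Wraps a partition's iterator: each element adds to the pending update for `outputId`,
// and the output is marked completed only once the iterator reports it is exhausted.
def trackedIterator(
    outputId: (Int, Int),
    data: Iterator[Long],
    state: TaskSideState): Iterator[Long] = new Iterator[Long] {
  def hasNext: Boolean = {
    val more = data.hasNext
    if (!more) state.completed += outputId
    more
  }
  def next(): Long = {
    val x = data.next()
    state.pending(outputId) = state.pending.getOrElse(outputId, 0L) + x
    x
  }
}
```

A `take(2)`-style consumer pulls two elements and stops, leaving a pending update of 3 with no completion mark, so a driver following the scheme above would ignore it; a later full pass (like `count()`) completes the output and its update can then be merged once.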
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86486810

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   def reset(): Unit

   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
+   */
+  final def add(v: IN): Unit = {
+    if (metadata != null && metadata.dataProperty) {
+      dataPropertyAdd(v)
+    } else {
+      addImpl(v)
+    }
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+    // To allow the user to be able to access the current accumulated value from their process
+    // worker side then we need to perform a "normal" add as well as the data property add.
+    addImpl(v)
+    // Add to the pending updates for data property
+    val updateInfo = TaskContext.get().getRDDPartitionInfo()
+    val base = pending.getOrElse(updateInfo, copyAndReset())
--- End diff --

Rename `base` to `mergedAccumulatorForOneTaskOutput` (or something like that?). I just find `base` way too generic.
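A minimal sketch of the add path with the suggested name, assuming a hypothetical toy accumulator whose pending entries are plain `Long` sums keyed by a string task-output label that the caller passes in. In the PR, the key comes from `TaskContext` and each entry is a copied `AccumulatorV2`.

```scala
import scala.collection.mutable

// Hypothetical toy accumulator: `localValue` plays the role of the "normal" addImpl path,
// and `pending` keeps one partial result per task output (a Long sum instead of a copied
// accumulator).
final class ToyDataPropertyAccumulator {
  var localValue = 0L
  val pending = mutable.HashMap[String, Long]()

  def add(v: Long, currentTaskOutput: String): Unit = {
    // The "normal" add, so user code on the executor can still read the running value.
    localValue += v
    // The partial result accumulated so far for the output this record belongs to.
    val mergedAccumulatorForOneTaskOutput = pending.getOrElse(currentTaskOutput, 0L)
    pending(currentTaskOutput) = mergedAccumulatorForOneTaskOutput + v
  }
}
```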
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86486722

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   def reset(): Unit

   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
+   */
+  final def add(v: IN): Unit = {
--- End diff --

OK, maybe another subclass of `AccumulatorV2`? It sucks that we wouldn't get data property goodness in all accumulators automatically, though.
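One possible shape for the subclass idea, sketched with hypothetical classes rather than Spark's `AccumulatorV2`: the base class keeps an ordinary overridable `add`, and only accumulators that extend the opt-in subclass get per-output bookkeeping. This only illustrates the trade-off raised above; it is not a proposed API.

```scala
import scala.collection.mutable

// Hypothetical base class standing in for AccumulatorV2, with an ordinary overridable add.
abstract class BaseAccumulator[IN] {
  def add(v: IN): Unit
}

// Opt-in subclass: only accumulators extending it get per-output bookkeeping.
abstract class DataPropertyAccumulatorBase[IN] extends BaseAccumulator[IN] {
  private val pendingByOutput = mutable.HashMap[Int, List[IN]]()

  // Which task output the current record belongs to (TaskContext-derived in the PR).
  protected def currentOutputId: Int
  protected def addImpl(v: IN): Unit

  final override def add(v: IN): Unit = {
    addImpl(v)
    pendingByOutput(currentOutputId) = v :: pendingByOutput.getOrElse(currentOutputId, Nil)
  }

  def pendingFor(outputId: Int): List[IN] = pendingByOutput.getOrElse(outputId, Nil).reverse
}

// A plain accumulator keeps working unchanged, but gets no data-property semantics.
final class PlainSum extends BaseAccumulator[Long] {
  var sum = 0L
  def add(v: Long): Unit = sum += v
}

final class DataPropertySum(var outputId: Int) extends DataPropertyAccumulatorBase[Long] {
  var sum = 0L
  protected def currentOutputId: Int = outputId
  protected def addImpl(v: Long): Unit = sum += v
}
```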
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86488263

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   def reset(): Unit

   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
+   */
+  final def add(v: IN): Unit = {
+    if (metadata != null && metadata.dataProperty) {
+      dataPropertyAdd(v)
+    } else {
+      addImpl(v)
+    }
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+    // To allow the user to be able to access the current accumulated value from their process
+    // worker side then we need to perform a "normal" add as well as the data property add.
+    addImpl(v)
+    // Add to the pending updates for data property
+    val updateInfo = TaskContext.get().getRDDPartitionInfo()
+    val base = pending.getOrElse(updateInfo, copyAndReset())
+    // Since we may have constructed a new accumulator, set atDriverSide to false as the default
+    // new accumulators will have atDriverSide equal to true.
+    base.atDriverSide = false
+    base.addImpl(v)
+    pending(updateInfo) = base
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This is a noop for
+   * non-data property accumuables.
+   */
+  private[spark] def markFullyProcessed(taskOutputId: TaskOutputId): Unit = {
+    if (metadata.dataProperty) {
+      completed += taskOutputId
+    }
+  }
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
    * Takes the inputs and accumulates.
    */
-  def add(v: IN): Unit
+  protected[spark] def addImpl(v: IN)
+
+  /**
+   * Merges another same-type accumulator into this one and update its state, i.e. this should be
+   * merge-in-place. Developers should extend mergeImpl to customize the merge functionality.
+   */
+  final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+    assert(isAtDriverSide)
+    // Handle data property accumulators
+    if (metadata != null && metadata.dataProperty) {
+      dataPropertyMerge _
+    } else {
+      mergeImpl _
+    }
+  }
+
+  final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, OUT]) = {
--- End diff --

It's worth noting that there is an asymmetry here that you might not expect from a `merge` method: `this` has to be the accumulator that was originally created on the driver (so you access the right `rddProcessed` and `shuffleProcessed`), and `other` has to be the one from the task update (so you access the right `other.pending` and `other.completed`).
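The asymmetry can be made explicit with preconditions. In this hypothetical toy (not the PR's code), `processed` lives on the driver-side instance while `pending` and `completed` come from the task-side update, so calling the merge the other way round fails fast instead of silently reading the wrong state.

```scala
import scala.collection.mutable

// Hypothetical toy accumulator with both roles made explicit.
final class RoleAwareAccumulator(val atDriverSide: Boolean) {
  var value = 0L
  // Task-side state: per-output partial sums and the outputs that were fully computed.
  val pending = mutable.HashMap[Int, Long]()
  val completed = mutable.HashSet[Int]()
  // Driver-side state: outputs whose updates have already been merged.
  val processed = mutable.HashSet[Int]()

  // Not symmetric: `this` must be the original driver-side accumulator (it owns `processed`),
  // and `other` must be a task update (it owns `pending` and `completed`).
  def dataPropertyMerge(other: RoleAwareAccumulator): Unit = {
    require(atDriverSide, "merge must be called on the driver-side accumulator")
    require(!other.atDriverSide, "the argument must be a task-side update")
    other.pending.foreach { case (outputId, update) =>
      // `processed.add` returns false if this output was already merged.
      if (other.completed.contains(outputId) && processed.add(outputId)) value += update
    }
  }
}
```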
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86486359

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -42,18 +60,45 @@ private[spark] case class AccumulatorMetadata(
  * `OUT` should be a type that can be read atomically (e.g., Int, Long), or thread-safely
  * (e.g., synchronized collections) because it will be read from other threads.
  */
-abstract class AccumulatorV2[IN, OUT] extends Serializable {
+abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] extends Serializable {
   private[spark] var metadata: AccumulatorMetadata = _
-  private[this] var atDriverSide = true
+  private[spark] var atDriverSide = true
+
+  /**
+   * The following values are used for data property [[AccumulatorV2]]s.
+   * Data property [[AccumulatorV2]]s have only-once semantics. These semantics are implemented
+   * by keeping track of which RDD id, shuffle id, and partition id the current function is
+   * processing in. If a partition is fully processed the results for that partition/shuffle/rdd
+   * combination are sent back to the driver. The driver keeps track of which rdd/shuffle/partitions
+   * already have been applied, and only combines values into value_ if the rdd/shuffle/partition
+   * has not already been aggregated on the driver program
+   */
+  // For data property accumulators pending and processed updates.
+  // Pending and processed are keyed by (rdd id, shuffle id, partition id)
+  private[spark] lazy val pending =
+    new mutable.HashMap[TaskOutputId, AccumulatorV2[IN, OUT]]()
+  // Completed contains the set of (rdd id, shuffle id, partition id) that have been
+  // fully processed on the worker side. This is used to determine if the updates should
+  // be merged on the driver for a particular rdd/shuffle/partition combination.
+  private[spark] lazy val completed = new mutable.HashSet[TaskOutputId]()
+  // rddProcessed is keyed by rdd id and the value is a bitset containing all partitions
+  // for the given key which have been merged into the value. This is used on the driver.
+  @transient private[spark] lazy val rddProcessed = new mutable.HashMap[Int, mutable.BitSet]()
+  // shuffleProcessed is the same as rddProcessed except keyed by shuffle id.
+  @transient private[spark] lazy val shuffleProcessed = new mutable.HashMap[Int, mutable.BitSet]()
--- End diff --

I had a comment here earlier to check my understanding of this. GitHub is being really weird with these comments, so I'm going to repost:

I find the logic around `pending`, `completed` and `processed` really confusing. I'm still thinking this through; before making concrete suggestions I want to check my understanding.

`completed` should always reflect the updates from only one task. It should only be set on the executor. At first I was going to say it shouldn't be a set, just a single value, but of course one task can get updates from multiple RDDs. It will differ from the keyset of `pending` because `pending` can include updates from partitions that aren't completed (e.g. the `take` and `coalesce` cases).

`processed` is only used on the driver. Since it has global knowledge, the driver can use it to determine whether each update is a repeat or not. (This is now `rddProcessed` and `shuffleProcessed`.)

I think the comment "If a partition is fully processed the results for that partition/shuffle/rdd combination are sent back to the driver" is incorrect: it looks to me like the updates are always sent back to the driver, and the driver always makes the call on whether or not to merge.

It seems like `merge` (and thus `dataPropertyMerge`) is only called on the driver. I'm trying to figure out whether or not that matters. It seems like it would do something weird if it were called on the executor. Maybe we should add an `assert(isAtDriver)`.

Assuming that is correct, I'd suggest some renamings:

`completed` to `completedTaskOutputsForOneTask`, with a comment that it can differ from the keyset of `pending` because some of the updates in `pending` may come from incomplete partition processing.

`pending` to `pendingAccumulatorUpdatesFromOneTask`, with a comment mentioning that it tracks all accumulator updates from one task, but some of them may be ignored if the output they represent isn't fully computed, e.g. an `rdd.take()` leads to incomplete partition processing.
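A sketch of the data flow described in this comment, using the suggested names on a hypothetical `TaskReport` case class with string output labels (not Spark types): the executor always ships every update, and only the driver decides what to merge, using its global view of which outputs have already been merged.

```scala
// Hypothetical shape of one task's data-property report, using the suggested names.
final case class TaskReport(
    // Every update this task made, keyed by task output, including partially computed ones.
    pendingAccumulatorUpdatesFromOneTask: Map[String, Long],
    // Only the outputs this task fully computed: possibly fewer than the keys above.
    completedTaskOutputsForOneTask: Set[String])

// The executor always ships the whole report; the driver decides what to merge using its
// global knowledge of which outputs were already merged. Returns (delta, updated processed).
def driverMerge(processed: Set[String], report: TaskReport): (Long, Set[String]) = {
  val mergeable = report.pendingAccumulatorUpdatesFromOneTask.filter { case (output, _) =>
    report.completedTaskOutputsForOneTask.contains(output) && !processed.contains(output)
  }
  (mergeable.values.sum, processed ++ mergeable.keys)
}
```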
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86487826

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   def reset(): Unit

   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
+   */
+  final def add(v: IN): Unit = {
+    if (metadata != null && metadata.dataProperty) {
+      dataPropertyAdd(v)
+    } else {
+      addImpl(v)
+    }
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+    // To allow the user to be able to access the current accumulated value from their process
+    // worker side then we need to perform a "normal" add as well as the data property add.
+    addImpl(v)
+    // Add to the pending updates for data property
+    val updateInfo = TaskContext.get().getRDDPartitionInfo()
+    val base = pending.getOrElse(updateInfo, copyAndReset())
+    // Since we may have constructed a new accumulator, set atDriverSide to false as the default
+    // new accumulators will have atDriverSide equal to true.
+    base.atDriverSide = false
+    base.addImpl(v)
+    pending(updateInfo) = base
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This is a noop for
+   * non-data property accumuables.
+   */
+  private[spark] def markFullyProcessed(taskOutputId: TaskOutputId): Unit = {
+    if (metadata.dataProperty) {
+      completed += taskOutputId
+    }
+  }
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
    * Takes the inputs and accumulates.
    */
-  def add(v: IN): Unit
+  protected[spark] def addImpl(v: IN)
+
+  /**
+   * Merges another same-type accumulator into this one and update its state, i.e. this should be
+   * merge-in-place. Developers should extend mergeImpl to customize the merge functionality.
+   */
+  final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+    assert(isAtDriverSide)
+    // Handle data property accumulators
+    if (metadata != null && metadata.dataProperty) {
+      dataPropertyMerge _
+    } else {
+      mergeImpl _
+    }
+  }
+
+  final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, OUT]) = {
+    // Apply all foreach partitions regardless - they can only be fully evaluated
+    val unprocessed = other.pending.filter{
+      case (ForeachOutputId(), v) => mergeImpl(v); false
+      case _ => true
+    }
+    val term = unprocessed.filter{case (k, v) => other.completed.contains(k)}
+    term.flatMap {
+      case (RDDOutputId(rddId, splitId), v) =>
+        Some((rddProcessed, rddId, splitId, v))
+      case (ShuffleMapOutputId(shuffleWriteId, splitId), v) =>
+        Some((shuffleProcessed, shuffleWriteId, splitId, v))
+      case _ => // We won't ever hit this case but avoid compiler warnings
+        None
+    }.foreach {
+      case (processed, id, splitId, v) =>
--- End diff --

Indentation is off.
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86487362

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
   def reset(): Unit

   /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
+   */
+  final def add(v: IN): Unit = {
+    if (metadata != null && metadata.dataProperty) {
+      dataPropertyAdd(v)
+    } else {
+      addImpl(v)
+    }
+  }
+
+  private def dataPropertyAdd(v: IN): Unit = {
+    // To allow the user to be able to access the current accumulated value from their process
+    // worker side then we need to perform a "normal" add as well as the data property add.
+    addImpl(v)
+    // Add to the pending updates for data property
+    val updateInfo = TaskContext.get().getRDDPartitionInfo()
+    val base = pending.getOrElse(updateInfo, copyAndReset())
+    // Since we may have constructed a new accumulator, set atDriverSide to false as the default
+    // new accumulators will have atDriverSide equal to true.
+    base.atDriverSide = false
+    base.addImpl(v)
+    pending(updateInfo) = base
+  }
+
+  /**
+   * Mark a specific rdd/shuffle/partition as completely processed. This is a noop for
+   * non-data property accumuables.
+   */
+  private[spark] def markFullyProcessed(taskOutputId: TaskOutputId): Unit = {
+    if (metadata.dataProperty) {
+      completed += taskOutputId
+    }
+  }
+
+  /**
+   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+   * Developers should extend addImpl to customize the adding functionality.
    * Takes the inputs and accumulates.
    */
-  def add(v: IN): Unit
+  protected[spark] def addImpl(v: IN)
+
+  /**
+   * Merges another same-type accumulator into this one and update its state, i.e. this should be
+   * merge-in-place. Developers should extend mergeImpl to customize the merge functionality.
+   */
+  final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+    assert(isAtDriverSide)
+    // Handle data property accumulators
+    if (metadata != null && metadata.dataProperty) {
+      dataPropertyMerge _
+    } else {
+      mergeImpl _
+    }
+  }
+
+  final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, OUT]) = {
+    // Apply all foreach partitions regardless - they can only be fully evaluated
+    val unprocessed = other.pending.filter{
--- End diff --

Style nit: spaces around `{` (in a couple of other places in this method too).
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86484995

--- Diff: core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala ---
@@ -104,10 +105,26 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
   }

   override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
+    // Use -1 for our Shuffle ID since we are on the read side of the shuffle.
+    val shuffleWriteId = -1
+    // If our task has data property accumulators we need to keep track of which partitions
+    // we are processing.
+    if (context.taskMetrics.hasDataPropertyAccumulators()) {
+      context.setRDDPartitionInfo(id, shuffleWriteId, split.index)
+    }
     val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
-    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
+    val itr = SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1,
+      context)
--- End diff --

Man, GitHub formatting for these comments is super weird. This one doesn't show up in the comments view; I only saw it in an email notification.

Shuffle-read operations (aka reducers) get a bunch of blocks from the mappers and then merge those blocks together. But the mappers have already sorted the data, so really this only needs to be a merge of a bunch of sorted streams. It happens to do this now by just sorting the whole thing. But there isn't any reason why the implementation couldn't change: it could do the merge on the fly instead, pulling just the next key from each stream. It would run the combiner for all the records with the same key, and then push that one record out to the iterator. If it changed to do that, this would break.

I think you are right that it's covered by a test case (another comment on that below), so that's fine. But I think it's worth expanding your comment to say that this is based on the *assumption* that the shuffle reader always processes the entire input, running all combiners, before returning an iterator.
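The assumption can be illustrated with a toy reader, under stated simplifications: records are `(key, value)` pairs, and `combinerCalls` stands in for a data-property accumulator's side effects. `eagerRead` models the current behaviour, where all combining happens before the first record is returned. `streamingRead` is a hypothetical on-the-fly merge over key-sorted input, in which only part of the combining has run after the first `next()`, so wrapping code could no longer treat "iterator returned" as "all combiners ran".

```scala
// Counts combiner invocations; stands in for a data-property accumulator's side effects.
var combinerCalls = 0
def combine(a: Long, b: Long): Long = { combinerCalls += 1; a + b }

// Eager reader (the behaviour assumed by the wrapper): combine everything, then iterate.
def eagerRead(records: Seq[(Int, Long)]): Iterator[(Int, Long)] = {
  val combined = records.groupBy(_._1).map { case (key, kvs) =>
    key -> kvs.map(_._2).reduce((a, b) => combine(a, b))
  }
  combined.toSeq.sortBy(_._1).iterator
}

// Hypothetical streaming reader over key-sorted input: combines one key at a time,
// only when the caller asks for the next record.
def streamingRead(sortedRecords: Iterator[(Int, Long)]): Iterator[(Int, Long)] = {
  val buffered = sortedRecords.buffered
  new Iterator[(Int, Long)] {
    def hasNext: Boolean = buffered.hasNext
    def next(): (Int, Long) = {
      val (key, first) = buffered.next()
      var acc = first
      while (buffered.hasNext && buffered.head._1 == key) {
        acc = combine(acc, buffered.next()._2)
      }
      (key, acc)
    }
  }
}
```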
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86484498

--- Diff: core/src/test/scala/org/apache/spark/DataPropertyAccumulatorSuite.scala ---
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.ref.WeakReference
+
+import org.scalatest.Matchers
+
+import org.apache.spark.scheduler._
+import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, AccumulatorV2, LongAccumulator}
+
+
+class DataPropertyAccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext {
+  test("two partition old and new") {
+    sc = new SparkContext("local[2]", "test")
+    val acc = sc.longAccumulator(dataProperty = true, "l2")
+
+    val a = sc.parallelize(1 to 20, 2)
+    val b = a.map{x => acc.add(x); x}
+    b.cache()
+    b.count()
+    acc.value should be (210)
+  }
+
+  test("single partition") {
+    sc = new SparkContext("local[2]", "test")
+    val acc = sc.longAccumulator(dataProperty = true)
+
+    val a = sc.parallelize(1 to 20, 1)
+    val b = a.map{x => acc.add(x); x}
+    b.cache()
+    b.count()
+    acc.value should be (210)
+  }
+
+  test("adding only the first element per partition should work even if partition is empty") {
+    sc = new SparkContext("local[2]", "test")
+    val acc = sc.longAccumulator(dataProperty = true)
+    val a = sc.parallelize(1 to 20, 30)
+    val b = a.mapPartitions{itr =>
+      acc.add(1)
+      itr
+    }
+    b.count()
+    acc.value should be (30)
+  }
+
+  test("shuffled (combineByKey)") {
+    sc = new SparkContext("local[2]", "test")
+    val a = sc.parallelize(1L to 40L, 5)
+    val buckets = 4
+    val b = a.map{x => ((x % buckets), x)}
+    val inputs = List(b, b.repartition(10), b.partitionBy(new HashPartitioner(5))).map(_.cache())
+    val mapSideCombines = List(true, false)
+    inputs.foreach { input =>
+      mapSideCombines.foreach { mapSideCombine =>
+        val accs = (1 to 4).map(x => sc.longAccumulator(dataProperty = true)).toList
+        val raccs = (1 to 4).map(x => sc.longAccumulator(dataProperty = false)).toList
+        val List(acc, acc1, acc2, acc3) = accs
+        val List(racc, racc1, racc2, racc3) = raccs
+        val c = input.combineByKey(
+          (x: Long) => {acc1.add(1); acc.add(1); racc1.add(1); racc.add(1); x},
+          {(a: Long, b: Long) =>
+            acc2.add(1); acc.add(1); racc2.add(1); racc.add(1); (a + b)},
+          {(a: Long, b: Long) => acc3.add(1); acc.add(1); racc3.add(1); racc.add(1); (a + b)},
+          new HashPartitioner(10),
+          mapSideCombine)
+        val d = input.combineByKey(
+          (x: Long) => {acc1.add(1); acc.add(1); x},
+          {(a: Long, b: Long) => acc2.add(1); acc.add(1); (a + b)},
+          {(a: Long, b: Long) => acc3.add(1); acc.add(1); (a + b)},
+          new HashPartitioner(2),
+          mapSideCombine)
+        val e = d.map{x => acc.add(1); x}
--- End diff --

Related to the discussion about the iterator wrapping in `ShuffledRDD`: this looks like the only test case that covers chaining a `ShuffledRDD` with another `rdd.map`, both with data property accumulators. Is this case sufficient? I think it is, though it's not totally obvious to me, at least. If it didn't work, then after the call to `d.count()`, when you later do `e.count()`, the updates from `e` would look like duplicate updates from `d` and get ignored. Does that sound right?
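Why the keying matters for the `d`/`e` chain can be shown with a toy dedup set. The RDD ids 5 and 6 and the `Dedup` class are hypothetical; the point is only that if updates were keyed by partition alone, `e`'s updates would collide with `d`'s and be dropped, which is the failure the test would need to catch.

```scala
import scala.collection.mutable

// Hypothetical driver-side dedup: an update is merged only the first time its key is seen.
final class Dedup[K] {
  private val seen = mutable.HashSet[K]()
  var total = 0L
  def merge(key: K, update: Long): Unit = if (seen.add(key)) total += update
}

// Toy version of the chain: d is the shuffled RDD (hypothetical id 5) and e = d.map (id 6),
// each with two partitions; every partition of each RDD reports an update of 1.
val dOutputs = Seq((5, 0), (5, 1))
val eOutputs = Seq((6, 0), (6, 1))

// Keyed by (rddId, partitionId): e's updates stay distinct from d's.
val byRddAndPartition = new Dedup[(Int, Int)]
(dOutputs ++ eOutputs).foreach(output => byRddAndPartition.merge(output, 1L))

// Keyed by partition id only: e's updates look like repeats of d's and are dropped.
val byPartitionOnly = new Dedup[Int]
(dOutputs ++ eOutputs).foreach { case (_, partition) => byPartitionOnly.merge(partition, 1L) }
```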
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86483290

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -306,18 +428,19 @@ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {

   /**
    * Adds v to the accumulator, i.e. increment sum by v and count by 1.
-   * @since 2.0.0
+   * Added for simplicity with adding non java Longs.
+   * @since 2.1.0
    */
-  override def add(v: jl.Long): Unit = {
-    _sum += v
-    _count += 1
+  def add(v: Long): Unit = {
--- End diff --

Yeah, I don't love the copied code either. The thing is, `@specialized` has bigger ramifications: the actual classes that get used (e.g. the ones you see in stack traces) are different, it's trickier to access the right class from Java, and a ton of classes get generated when there is multiple inheritance (not relevant here at the moment, but perhaps in the future). I think having copied code is the lesser of two evils.
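For comparison, the "copied code" option looks roughly like this: a hypothetical, trimmed-down `SimpleLongAccumulator` (not Spark's `LongAccumulator`) with a boxed `java.lang.Long` overload for Java callers and a duplicated primitive `Long` overload. The two overloads erase to different JVM signatures, so they can coexist without any extra generated classes.

```scala
import java.{lang => jl}

// Hypothetical, trimmed-down accumulator illustrating the "copied code" option.
final class SimpleLongAccumulator {
  private var _sum = 0L
  private var _count = 0L

  /** Adds v to the accumulator, i.e. increments sum by v and count by 1 (boxed, Java-friendly). */
  def add(v: jl.Long): Unit = {
    _sum += v
    _count += 1
  }

  /** Same body, duplicated for primitive Longs so Scala callers avoid boxing. */
  def add(v: Long): Unit = {
    _sum += v
    _count += 1
  }

  def sum: Long = _sum
  def count: Long = _count
}
```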
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86461055

--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -42,18 +60,43 @@ private[spark] case class AccumulatorMetadata(
  * `OUT` should be a type that can be read atomically (e.g., Int, Long), or thread-safely
  * (e.g., synchronized collections) because it will be read from other threads.
  */
-abstract class AccumulatorV2[IN, OUT] extends Serializable {
+abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] extends Serializable {
--- End diff --

So we could do that, but wouldn't the specialized annotation maybe be easier to maintain?
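The `@specialized` alternative, sketched as a hypothetical minimal class rather than `AccumulatorV2` itself. In Scala 2, scalac generates extra specialized variants of the class for the listed primitive types, which is why the concrete runtime classes can differ from the declared one; as far as we know, Scala 3 does not specialize user classes, so there the annotation compiles but has no effect.

```scala
// Hypothetical minimal accumulator using @specialized; not AccumulatorV2 itself.
abstract class SpecializedAccumulator[@specialized(Int, Long, Double) IN] {
  def add(v: IN): Unit
}

// In Scala 2, calls through SpecializedAccumulator[Long] can use a specialized
// variant taking a primitive long instead of a boxed value.
final class LongSum extends SpecializedAccumulator[Long] {
  var sum = 0L
  def add(v: Long): Unit = sum += v
}
```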
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86460868 --- Diff: core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala --- @@ -104,10 +105,26 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]( } override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { +// Use -1 for our Shuffle ID since we are on the read side of the shuffle. +val shuffleWriteId = -1 +// If our task has data property accumulators we need to keep track of which partitions +// we are processing. +if (context.taskMetrics.hasDataPropertyAccumulators()) { + context.setRDDPartitionInfo(id, shuffleWriteId, split.index) +} val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] -SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) +val itr = SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, + context) --- End diff -- So unless the input was already sorted, we will always have to read the entire input first. Also, since we have tests for this, if we ever do change it, it's a simple matter of updating the wrapper to be more like the rest.
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86460332 --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala --- @@ -306,18 +428,19 @@ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] { /** * Adds v to the accumulator, i.e. increment sum by v and count by 1. - * @since 2.0.0 + * Added for simplicity with adding non java Longs. + * @since 2.1.0 */ - override def add(v: jl.Long): Unit = { -_sum += v -_count += 1 + def add(v: Long): Unit = { --- End diff -- So we could do that, but it seems like having `@specialized` might be better than copying that code? I don't have super strong feelings about this, just something to consider.
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86381839 --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala --- @@ -42,18 +60,43 @@ private[spark] case class AccumulatorMetadata( * `OUT` should be a type that can be read atomically (e.g., Int, Long), or thread-safely * (e.g., synchronized collections) because it will be read from other threads. */ -abstract class AccumulatorV2[IN, OUT] extends Serializable { +abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] extends Serializable { --- End diff -- Hmm, it doesn't look that way to me. If I look at the compiled code for AccumulatorV2 in the Spark 2 binaries, there are no specialized methods; it looks like boxing does happen. I do see test failures in `AccumulatorV2Suite` without this change, but it looks like those can be fixed by just tweaking the specialized versions of these functions that are manually defined in `DoubleAccumulator` and `LongAccumulator`, e.g.:
```scala
/**
 * Adds v to the accumulator, i.e. increment sum by v and count by 1.
 * @since 2.1.0
 */
def add(v: Double): Unit = {
  if (metadata != null && metadata.dataProperty) {
    dataPropertyAdd(v: jl.Double)
  } else {
    addImpl(v)
  }
}
```
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86384551 --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala --- @@ -306,18 +428,19 @@ class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] { /** * Adds v to the accumulator, i.e. increment sum by v and count by 1. - * @since 2.0.0 + * Added for simplicity with adding non java Longs. + * @since 2.1.0 */ - override def add(v: jl.Long): Unit = { -_sum += v -_count += 1 + def add(v: Long): Unit = { --- End diff -- See my earlier comment about `@specialized`.
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86384061 --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala --- @@ -22,17 +22,36 @@ import java.io.ObjectInputStream import java.util.{ArrayList, Collections} import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong +import javax.annotation.concurrent.GuardedBy --- End diff -- unused
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86383782 --- Diff: core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala --- @@ -104,10 +105,26 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]( } override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { +// Use -1 for our Shuffle ID since we are on the read side of the shuffle. +val shuffleWriteId = -1 +// If our task has data property accumulators we need to keep track of which partitions +// we are processing. +if (context.taskMetrics.hasDataPropertyAccumulators()) { + context.setRDDPartitionInfo(id, shuffleWriteId, split.index) +} val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] -SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) +val itr = SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, + context) --- End diff -- I am looking closely at the combiner code to try to confirm this. I think I believe it, but I don't think it's *guaranteed* to be true in the future. E.g., right now the combiners do an `insertAll` into the `ExternalAppendOnlyMap` before reading from it. But there is no reason Spark couldn't change so that it just inserts the *next* key from all incoming streams into the `ExternalAppendOnlyMap`, and then feeds that one key to the downstream iterators. At the very least, we need a test to ensure this doesn't break if that internal implementation were to change. (Does a test like that already exist?) Again, I'm still mulling over whether there is even a good enough use case to bother supporting this at all...
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r84380686 --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala --- @@ -136,15 +179,76 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { def reset(): Unit /** + * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. + * Developers should extend addImpl to customize the adding functionality. + */ + final def add(v: IN): Unit = { +if (metadata != null && metadata.dataProperty) { + dataPropertyAdd(v) +} else { + addImpl(v) +} + } + + private def dataPropertyAdd(v: IN): Unit = { +// Add first for localValue & AccumulatorInfo --- End diff -- Sorry, I wasn't clear: I meant, why is "first" important? But I guess "first" isn't actually important; you're saying that *both* are needed. Can you update the comment to reflect that?
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r86384641 --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala --- @@ -383,16 +506,7 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] { * Adds v to the accumulator, i.e. increment sum by v and count by 1. * @since 2.0.0 */ - override def add(v: jl.Double): Unit = { -_sum += v -_count += 1 - } - - /** - * Adds v to the accumulator, i.e. increment sum by v and count by 1. - * @since 2.0.0 - */ - def add(v: Double): Unit = { --- End diff -- see comment on `@specialized`
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r85860913 --- Diff: core/src/test/scala/org/apache/spark/DataPropertyAccumulatorSuite.scala --- @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.ref.WeakReference + +import org.scalatest.Matchers + +import org.apache.spark.scheduler._ + + +class DataPropertyAccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext { + test("single partition") { +sc = new SparkContext("local[2]", "test") +val acc : Accumulator[Int] = sc.accumulator(0, dataProperty = true) + +val a = sc.parallelize(1 to 20, 1) +val b = a.map{x => acc += x; x} +b.cache() +b.count() +acc.value should be (210) + } + + test("adding only the first element per partition should work even if partition is empty") { +sc = new SparkContext("local[2]", "test") +val acc: Accumulator[Int] = sc.accumulator(0, dataProperty = true) +val a = sc.parallelize(1 to 20, 30) +val b = a.mapPartitions{itr => + acc += 1 + itr +} +b.count() +acc.value should be (30) + } + + test("shuffled (combineByKey)") { +sc = new SparkContext("local[2]", "test") +val a = sc.parallelize(1 to 40, 5) +val buckets = 4 +val b = a.map{x => ((x % buckets), x)} +val inputs = List(b, b.repartition(10), b.partitionBy(new HashPartitioner(5))).map(_.cache()) +val mapSideCombines = List(true, false) +inputs.foreach { input => + mapSideCombines.foreach { mapSideCombine => +val accs = (1 to 4).map(x => sc.accumulator(0, dataProperty = true)).toList +val raccs = (1 to 4).map(x => sc.accumulator(0, dataProperty = false)).toList +val List(acc, acc1, acc2, acc3) = accs +val List(racc, racc1, racc2, racc3) = raccs +val c = input.combineByKey( + (x: Int) => {acc1 += 1; acc += 1; racc1 += 1; racc += 1; x}, + {(a: Int, b: Int) => acc2 += 1; acc += 1; racc2 += 1; racc += 1; (a + b)}, + {(a: Int, b: Int) => acc3 += 1; acc += 1; racc3 += 1; racc += 1; (a + b)}, + new HashPartitioner(10), + mapSideCombine) +val d = input.combineByKey( + (x: Int) => {acc1 += 1; acc += 1; x}, + {(a: Int, b: Int) => acc2 += 1; acc += 1; (a + b)}, + {(a: Int, b: Int) => acc3 += 1; acc += 1; (a + b)}, + new 
HashPartitioner(2), + mapSideCombine) +val e = d.map{x => acc += 1; x} +c.count() +// If our partitioner is known then we should only create +// one combiner for each key value. Otherwise we should +// create at least that many combiners. +if (input.partitioner.isDefined) { + acc1.value should be (buckets) +} else { + acc1.value should be >= (buckets) +} +if (input.partitioner.isDefined) { + acc2.value should be > (0) +} else if (mapSideCombine) { + acc3.value should be > (0) +} else { + acc2.value should be > (0) + acc3.value should be (0) +} +acc.value should be (acc1.value + acc2.value + acc3.value) +val oldValues = accs.map(_.value) +// For one action the data property accumulators and regular should have the same value. +accs.map(_.value) should be (raccs.map(_.value)) +c.count() +accs.map(_.value) should be (oldValues) --- End diff -- @squito That was just an implementation for testing and experimenting. It seems I didn't push it to the remote, and I can't find it now.
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r85805218 --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala --- @@ -722,6 +722,7 @@ private[spark] object JsonProtocol { val value = Utils.jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) } val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false) val countFailedValues = (json \ "Count Failed Values").extractOpt[Boolean].getOrElse(false) +val dataProperty = (json \ "DataProperty").extractOpt[Boolean].getOrElse(false) --- End diff -- Done :)
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83690354 --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala --- @@ -136,15 +179,76 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { def reset(): Unit /** + * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. + * Developers should extend addImpl to customize the adding functionality. + */ + final def add(v: IN): Unit = { +if (metadata != null && metadata.dataProperty) { + dataPropertyAdd(v) +} else { + addImpl(v) +} + } + + private def dataPropertyAdd(v: IN): Unit = { +// Add first for localValue & AccumulatorInfo --- End diff -- If we want the user to be able to access the current accumulated value from the worker side, then we need to perform a "normal" add as well as the data property add. Also, if we don't have a merge step (e.g. only one accumulator), this might make a difference (although I _think_ this won't be the case anymore after the refactor, I haven't tested it).
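A simplified, hypothetical model of the two add paths described above may help. The `TaskAcc`/`DriverAcc` classes and the `(rddId, shuffleWriteId, partitionId)` key are illustrative assumptions that loosely follow the names in the diff; this is not the PR's code. Every add updates the worker-visible `localValue` (the "normal" add) and also buffers the update per partition in `pending` (the data property add); the driver only merges buffered updates for partitions that were fully processed and that it has not counted before.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a data property accumulator; not Spark's AccumulatorV2.
object DataPropertyModel {
  // Illustrative key: (rddId, shuffleWriteId, partitionId).
  type PartitionKey = (Int, Int, Int)

  // Task-side copy of the accumulator.
  final class TaskAcc {
    // "Normal" add path: lets user code on the worker read the running value.
    var localValue = 0L
    // Data property add path: updates buffered per RDD/shuffle/partition.
    val pending = mutable.Map.empty[PartitionKey, Long]
    // Partitions whose iterator was fully consumed in this task.
    val completed = mutable.Set.empty[PartitionKey]

    def add(v: Long, key: PartitionKey): Unit = {
      localValue += v
      pending(key) = pending.getOrElse(key, 0L) + v
    }

    def markFullyProcessed(key: PartitionKey): Unit = completed += key
  }

  // Driver-side accumulator.
  final class DriverAcc {
    var value = 0L
    private val counted = mutable.Set.empty[PartitionKey]

    // Merge only fully processed partitions that have not been counted yet,
    // so re-evaluating the same partition adds nothing the second time.
    def merge(task: TaskAcc): Unit =
      task.pending.foreach { case (key, v) =>
        if (task.completed.contains(key) && !counted.contains(key)) {
          value += v
          counted += key
        }
      }
  }
}
```

In this model, two tasks that each fully process the same partition (say, summing `1 to 20`) leave the driver's `value` at the single-pass total of 210, while each task's `localValue` still reflects its own adds; a task that never marks its partition complete contributes nothing.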
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83689480 --- Diff: core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala --- @@ -104,10 +105,26 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]( } override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { +// Use -1 for our Shuffle ID since we are on the read side of the shuffle. +val shuffleWriteId = -1 +// If our task has data property accumulators we need to keep track of which partitions +// we are processing. +if (context.taskMetrics.hasDataPropertyAccumulators()) { + context.setRDDPartitionInfo(id, shuffleWriteId, split.index) +} val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] -SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) +val itr = SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, + context) --- End diff -- So the short version is no, because the only user-controlled code that runs underneath this is eagerly evaluated on the entire shuffle input: namely, `combineValuesByKey` and `combineCombinersByKey` both consume the entire input before passing back control. I'll add a comment explaining this behaviour.
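To make the eager-evaluation argument concrete, here is a stand-alone sketch that counts how many user merge calls have already run by the time the first combined record is handed downstream. It does not use Spark's `Aggregator` or `ExternalAppendOnlyMap`; `eagerCombine`, `streamingCombine`, and `mergesBeforeFirstOutput` are hypothetical helpers. The eager version runs every merge up front, so no user merge code interleaves with downstream iterators. The streaming version, which assumes key-sorted input, models the kind of future change discussed in this thread and does interleave.

```scala
import scala.collection.mutable

object CombineOrderDemo {
  type Combine = (Iterator[(String, Int)], () => Unit) => Iterator[(String, Int)]

  // Eager: insert all input into a map first (like an insertAll), then return an iterator.
  def eagerCombine(input: Iterator[(String, Int)], onMerge: () => Unit): Iterator[(String, Int)] = {
    val combined = mutable.LinkedHashMap.empty[String, Int]
    input.foreach { case (k, v) =>
      onMerge() // stands in for user combine code with accumulator updates
      combined(k) = combined.getOrElse(k, 0) + v
    }
    combined.iterator
  }

  // Hypothetical streaming alternative: assumes input sorted by key and
  // combines one key at a time, only when the caller pulls the next record.
  def streamingCombine(input: Iterator[(String, Int)], onMerge: () => Unit): Iterator[(String, Int)] = {
    val buffered = input.buffered
    new Iterator[(String, Int)] {
      def hasNext: Boolean = buffered.hasNext
      def next(): (String, Int) = {
        val (key, first) = buffered.next()
        onMerge()
        var sum = first
        while (buffered.hasNext && buffered.head._1 == key) {
          onMerge()
          sum += buffered.next()._2
        }
        (key, sum)
      }
    }
  }

  // How many merge calls have happened when the first output record is produced?
  def mergesBeforeFirstOutput(combine: Combine): Int = {
    var merges = 0
    val out = combine(Iterator("a" -> 1, "a" -> 2, "b" -> 3, "b" -> 4), () => merges += 1)
    out.next()
    merges
  }
}
```

With these four input records, the eager version reports 4 merges before the first output and the streaming version reports 2. An assertion on that count is one way to catch a switch from eager to streaming combining.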
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83687095 --- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala --- @@ -34,8 +35,29 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( override def getPartitions: Array[Partition] = firstParent[T].partitions - override def compute(split: Partition, context: TaskContext): Iterator[U] = -f(context, split.index, firstParent[T].iterator(split, context)) + override def compute(split: Partition, context: TaskContext): Iterator[U] = { +val input = firstParent[T].iterator(split, context) +// If our task has data property accumulators we need to keep track of which partitions +// are fully processed and what we are currently processing. +if (context.taskMetrics.hasDataPropertyAccumulators()) { + // Use -1 for shuffle id since we are not in a shuffle. + val shuffleWriteId = -1 + // Set the ID of the RDD and partition being processed. We need to do this per + // element since we chain the iterator transformations together + val data = input.map{x => +context.setRDDPartitionInfo(id, shuffleWriteId, split.index) +x + } + val wrappedData = Utils.signalWhenEmpty(data, +() => context.taskMetrics.markFullyProcessed(id, shuffleWriteId, split.index)) + // We also set it before the first call to the user function in case the user provides a + // function which access a data property accumulator before accessing any elements. + context.setRDDPartitionInfo(id, shuffleWriteId, split.index) + f(context, split.index, wrappedData) +} else { + f(context, split.index, input) --- End diff -- Makes sense, I'll include this in the next update.
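The `Utils.signalWhenEmpty` wrapper in this diff is what lets a partition be marked fully processed only after its iterator is exhausted. The sketch below is a hypothetical stand-alone re-implementation of that idea; the PR's helper may differ in detail. The callback fires once, the first time `hasNext` returns `false`, so a partially consumed iterator (for example, one cut short by `take`) is never reported as complete.

```scala
object SignalWhenEmptyDemo {
  // Hypothetical stand-alone helper; not Spark's Utils.signalWhenEmpty.
  // Runs `onEmpty` exactly once, the first time the wrapped iterator reports no more elements.
  def signalWhenEmpty[T](it: Iterator[T], onEmpty: () => Unit): Iterator[T] = new Iterator[T] {
    private var signalled = false

    def hasNext: Boolean = {
      val more = it.hasNext
      if (!more && !signalled) {
        signalled = true
        onEmpty()
      }
      more
    }

    def next(): T = it.next()
  }
}
```

Fully consuming the wrapped iterator (e.g. with `sum`) triggers the callback once, even if `hasNext` is called again afterwards; `take(2)` on a three-element iterator never triggers it.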
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83623396 --- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala --- @@ -126,4 +126,14 @@ private[spark] class TaskContextImpl( taskMetrics.registerAccumulator(a) } + private var rddPartitionInfo: (Int, Int, Int) = null --- End diff -- Yeah, that sounds pretty reasonable. I'll do this update next.
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83557436 --- Diff: core/src/main/scala/org/apache/spark/Accumulable.scala --- @@ -56,15 +61,26 @@ class Accumulable[R, T] private ( @transient private val initialValue: R, param: AccumulableParam[R, T], val name: Option[String], -private[spark] val countFailedValues: Boolean) +private[spark] val countFailedValues: Boolean, +private[spark] val dataProperty: Boolean) extends Serializable { private[spark] def this( initialValue: R, param: AccumulableParam[R, T], name: Option[String], + countFailedValues: Boolean, + dataProperty: Boolean) = { +this(AccumulatorContext.newId(), initialValue, param, name, countFailedValues, dataProperty) + } + + private[spark] def this( + initialValue: R, + param: AccumulableParam[R, T], + name: Option[String], countFailedValues: Boolean) = { -this(AccumulatorContext.newId(), initialValue, param, name, countFailedValues) +this(AccumulatorContext.newId(), initialValue, param, name, countFailedValues, + false /* dataProperty */) --- End diff -- So for reasons that aren't super clear, we can't specify it this way (you can see Josh ran into similar trouble in some other places), but since we aren't going to deal with the old API, I'll just rip this part out anyway.
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83287789 --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala --- @@ -722,6 +722,7 @@ private[spark] object JsonProtocol { val value = Utils.jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) } val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false) val countFailedValues = (json \ "Count Failed Values").extractOpt[Boolean].getOrElse(false) +val dataProperty = (json \ "DataProperty").extractOpt[Boolean].getOrElse(false) --- End diff -- Can you add a test case to the backwards compatibility section in `JsonProtocolSuite`?
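The `extractOpt[Boolean].getOrElse(false)` pattern keeps older event logs readable: a log written before this change has no `"DataProperty"` key, so the flag falls back to `false`. A backwards-compatibility test would pin that behaviour down. The sketch below models it without json4s, assuming the accumulator fields have already been decoded into a `Map[String, Any]`; `DataPropertyJson.readDataProperty` is a hypothetical helper, not part of `JsonProtocol`.

```scala
object DataPropertyJson {
  // Hypothetical helper mirroring `(json \ "DataProperty").extractOpt[Boolean].getOrElse(false)`
  // on an already-decoded field map.
  def readDataProperty(fields: Map[String, Any]): Boolean =
    fields.get("DataProperty") match {
      case Some(flag: Boolean) => flag
      case _                   => false // key absent (older logs) or not a boolean
    }
}
```

The test shape is then: an "old" event map without the key must read as `false`, and the same map with `"DataProperty" -> true` added must read as `true`.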
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83273608 --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala --- @@ -42,18 +60,43 @@ private[spark] case class AccumulatorMetadata( * `OUT` should be a type that can be read atomically (e.g., Int, Long), or thread-safely * (e.g., synchronized collections) because it will be read from other threads. */ -abstract class AccumulatorV2[IN, OUT] extends Serializable { +abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] extends Serializable { --- End diff -- Unrelated to this change, right? If you think it's important, open another JIRA / PR.
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83290175 --- Diff: core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala --- @@ -104,10 +105,26 @@ class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]( } override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { +// Use -1 for our Shuffle ID since we are on the read side of the shuffle. +val shuffleWriteId = -1 +// If our task has data property accumulators we need to keep track of which partitions +// we are processing. +if (context.taskMetrics.hasDataPropertyAccumulators()) { + context.setRDDPartitionInfo(id, shuffleWriteId, split.index) +} val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] -SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) +val itr = SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, + context) --- End diff -- Don't you need to wrap this iterator to reset the partitionInfo per record, like you do in `MapPartitionsRDD`? What if there is `rdd.reduceByKey(...).map(...)` with accumulator updates in both? The more I think about this, the more I wonder whether data property accumulators should only be supported in `MapPartitionsRDD`. (Sorry, I think long ago I argued for putting this in... I am still not certain.)
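Why the tag has to be reset per record when transformations are chained: iterators are lazy, so pulling one element from a downstream stage first runs the upstream wrapper, which overwrites a shared "current partition" slot. In the stand-alone sketch below, a mutable `current` field stands in for the task context's RDD/partition info; `tagPerElement`, `tagOnce`, and `run` are hypothetical names. The upstream stage always re-tags per element. A downstream stage that tags only once then sees the upstream tag on every record, while a per-element tag attributes each update to the right stage.

```scala
import scala.collection.mutable.ArrayBuffer

object PartitionTagDemo {
  // Stand-in for the task context's "which RDD/partition am I processing" slot.
  var current: String = "none"

  // Re-set the tag as each element flows through (the MapPartitionsRDD approach in the diff).
  def tagPerElement[T](tag: String, it: Iterator[T]): Iterator[T] =
    it.map { x => current = tag; x }

  // Set the tag once, before any element is pulled.
  def tagOnce[T](tag: String, it: Iterator[T]): Iterator[T] = {
    current = tag
    it
  }

  // Stage "A" (upstream) always tags per element; returns the tags seen by A's and B's user functions.
  def run(downstreamPerElement: Boolean): (List[String], List[String]) = {
    current = "none"
    val seenByA = ArrayBuffer.empty[String]
    val seenByB = ArrayBuffer.empty[String]
    val stageA = tagPerElement("A", Iterator(1, 2, 3)).map { x => seenByA += current; x * 10 }
    val tagged =
      if (downstreamPerElement) tagPerElement("B", stageA) else tagOnce("B", stageA)
    val stageB = tagged.map { x => seenByB += current; x + 1 }
    stageB.foreach(_ => ())
    (seenByA.toList, seenByB.toList)
  }
}
```

With `run(downstreamPerElement = false)`, stage B's user function records `"A"` for all three records. With `true`, it records `"B"`. A `reduceByKey(...).map(...)` pipeline therefore attributes updates correctly only if each stage re-tags before its user code runs, or if all upstream user code has already finished before the first record is handed downstream.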
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83280578 --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala --- @@ -136,15 +179,76 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { def reset(): Unit /** + * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. + * Developers should extend addImpl to customize the adding functionality. + */ + final def add(v: IN): Unit = { +if (metadata != null && metadata.dataProperty) { + dataPropertyAdd(v) +} else { + addImpl(v) +} + } + + private def dataPropertyAdd(v: IN): Unit = { +// Add first for localValue & AccumulatorInfo +addImpl(v) +if (metadata != null && metadata.dataProperty) { + val updateInfo = TaskContext.get().getRDDPartitionInfo() + val base = pending.getOrElse(updateInfo, copyAndReset()) + base.atDriverSide = false + base.addImpl(v) + pending(updateInfo) = base +} + } + + /** + * Mark a specific rdd/shuffle/partition as completely processed. This is a noop for + * non-data property accumuables. + */ + private[spark] def markFullyProcessed(rddId: Int, shuffleWriteId: Int, partitionId: Int): Unit = { +if (metadata.dataProperty) { + completed += ((rddId, shuffleWriteId, partitionId)) +} + } + + /** + * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. + * Developers should extend addImpl to customize the adding functionality. * Takes the inputs and accumulates. */ - def add(v: IN): Unit + protected[spark] def addImpl(v: IN) /** * Merges another same-type accumulator into this one and update its state, i.e. this should be - * merge-in-place. + * merge-in-place. Developers should extend mergeImpl to customize the merge functionality. 
*/ - def merge(other: AccumulatorV2[IN, OUT]): Unit + final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = { +// Handle data property accumulators +if (metadata != null && metadata.dataProperty) { + dataPropertyMerge _ +} else { + mergeImpl _ +} + } + + final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, OUT]) = { +val term = other.pending.filter{case (k, v) => other.completed.contains(k)} --- End diff -- fix indentation
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83271510 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1314,27 +1347,54 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** - * Register the given accumulator. Note that accumulators must be registered before use, or it + * Register the given accumulator. Note that accumulators must be registered before use, or it * will throw exception. */ def register(acc: AccumulatorV2[_, _]): Unit = { -acc.register(this) +acc.register(this, dataProperty = false) } /** - * Register the given accumulator with given name. Note that accumulators must be registered + * Register the given accumulator with given name. Note that accumulators must be registered * before use, or it will throw exception. + * + * @param name The name of accumulator. */ def register(acc: AccumulatorV2[_, _], name: String): Unit = { acc.register(this, name = Some(name)) } /** + * Register the given accumulator. Note that accumulators must be registered before use, or it + * will throw exception. + * + * @param dataProperty If the accumulator should avoid re-counting multiple evaluations on the + * same RDD/partition. This adds some additional overhead for tracking and + * is an experimental feature. + */ + def register(acc: AccumulatorV2[_, _], dataProperty: Boolean): Unit = { +acc.register(this, dataProperty = dataProperty) + } + + /** + * Register the given accumulator with given name. Note that accumulators must be registered + * before use, or it will throw exception. + * + * @param dataProperty If the accumulator should avoid re-counting multiple evaluations on the + * same RDD/partition. This adds some additional overhead for tracking and + * is an experimental feature. + * @param name The name of accumulator. 
+ */ + def register(acc: AccumulatorV2[_, _], dataProperty: Boolean, name: String): Unit = { --- End diff -- I think both of these `register` methods, which expose `dataProperty` at all, should be marked `@Experimental`. Probably elsewhere too; I won't mention it in the other places.
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83287564 --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala --- @@ -136,15 +179,76 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { def reset(): Unit /** + * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. + * Developers should extend addImpl to customize the adding functionality. + */ + final def add(v: IN): Unit = { +if (metadata != null && metadata.dataProperty) { + dataPropertyAdd(v) +} else { + addImpl(v) +} + } + + private def dataPropertyAdd(v: IN): Unit = { +// Add first for localValue & AccumulatorInfo +addImpl(v) +if (metadata != null && metadata.dataProperty) { + val updateInfo = TaskContext.get().getRDDPartitionInfo() + val base = pending.getOrElse(updateInfo, copyAndReset()) + base.atDriverSide = false + base.addImpl(v) + pending(updateInfo) = base +} + } + + /** + * Mark a specific rdd/shuffle/partition as completely processed. This is a noop for + * non-data property accumuables. + */ + private[spark] def markFullyProcessed(rddId: Int, shuffleWriteId: Int, partitionId: Int): Unit = { +if (metadata.dataProperty) { + completed += ((rddId, shuffleWriteId, partitionId)) +} + } + + /** + * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. + * Developers should extend addImpl to customize the adding functionality. * Takes the inputs and accumulates. */ - def add(v: IN): Unit + protected[spark] def addImpl(v: IN) /** * Merges another same-type accumulator into this one and update its state, i.e. this should be - * merge-in-place. + * merge-in-place. Developers should extend mergeImpl to customize the merge functionality. 
*/ - def merge(other: AccumulatorV2[IN, OUT]): Unit + final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = { +// Handle data property accumulators --- End diff -- this comment is kinda pointless -- it's pretty obvious that is what the `if` is doing. What is really needed is an explanation of *why* -- maybe that belongs as a doc comment on `dataPropertyMerge`.
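As an illustration of the *why* the reviewer asks for, here is a minimal standalone sketch (plain Scala, no Spark) of the double-counting problem. If the same partition is evaluated twice (for example by two actions on an uncached RDD, or when a lost partition is recomputed), a plain accumulator adds the same contribution twice, whereas a driver that remembers which (rdd id, shuffle id, partition id) keys it has already merged counts it once. `PartitionUpdate`, `NaiveDriver` and `DedupDriver` are hypothetical names, not the PR's classes.

```scala
import scala.collection.mutable

// Hypothetical update reported by one task: which partition it covered and what it added.
final case class PartitionUpdate(rddId: Int, shuffleId: Int, partition: Int, value: Long)

// Plain accumulator semantics: every reported update is added, including repeats.
final class NaiveDriver {
  var total = 0L
  def merge(u: PartitionUpdate): Unit = total += u.value
}

// Simplified data-property semantics: each (rdd, shuffle, partition) key is merged at most once.
final class DedupDriver {
  var total = 0L
  private val processed = mutable.HashSet.empty[(Int, Int, Int)]
  def merge(u: PartitionUpdate): Unit =
    // HashSet.add returns true only when the key was not already present.
    if (processed.add((u.rddId, u.shuffleId, u.partition))) total += u.value
}

// The same partition (rdd 7, no shuffle, partition 0) is computed by two separate actions.
val updates = Seq(
  PartitionUpdate(7, -1, 0, 210L),
  PartitionUpdate(7, -1, 0, 210L))

val naive = new NaiveDriver
val dedup = new DedupDriver
updates.foreach { u => naive.merge(u); dedup.merge(u) }
```

The naive total ends up at 420 while the deduplicating one stays at 210, which is the behaviour a doc comment on `dataPropertyMerge` would need to justify.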
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83284254 --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala --- @@ -136,15 +179,76 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { def reset(): Unit /** + * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. + * Developers should extend addImpl to customize the adding functionality. + */ + final def add(v: IN): Unit = { +if (metadata != null && metadata.dataProperty) { + dataPropertyAdd(v) +} else { + addImpl(v) +} + } + + private def dataPropertyAdd(v: IN): Unit = { +// Add first for localValue & AccumulatorInfo --- End diff -- why is this important?
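A minimal sketch of the two-level bookkeeping that `dataPropertyAdd` performs, assuming the intent of the "Add first for localValue & AccumulatorInfo" comment is that the accumulator instance itself must see every add (so task-local reads reflect all inputs), while a separate per-partition copy in `pending` records the same add for the driver's deduplication. `SketchLongAcc` and the explicit `current` parameter (standing in for `TaskContext.get().getRDDPartitionInfo()`) are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical, simplified long accumulator; not Spark's LongAccumulator.
final class SketchLongAcc(val dataProperty: Boolean) {
  private var sum = 0L
  // Per (rdd id, shuffle id, partition id) copies, mirroring the PR's `pending` map.
  val pending = mutable.HashMap.empty[(Int, Int, Int), SketchLongAcc]

  def value: Long = sum
  def copyAndReset(): SketchLongAcc = new SketchLongAcc(dataProperty)
  private def addImpl(v: Long): Unit = sum += v

  // `current` stands in for the partition info the PR reads from the TaskContext.
  def add(v: Long, current: (Int, Int, Int)): Unit = {
    // 1. Update this instance, so the task-local value covers every add,
    //    whether or not the partition is eventually fully processed.
    addImpl(v)
    // 2. Record the same add in a per-partition copy for driver-side deduplication.
    if (dataProperty) {
      val base = pending.getOrElseUpdate(current, copyAndReset())
      base.addImpl(v)
    }
  }
}

val acc = new SketchLongAcc(dataProperty = true)
acc.add(1, (3, -1, 0))
acc.add(2, (3, -1, 0))
acc.add(5, (3, -1, 1))
```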
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83271825 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1343,7 +1403,25 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def longAccumulator(name: String): LongAccumulator = { val acc = new LongAccumulator -register(acc, name) +register(acc, dataProperty = false, name) +acc + } + + /** + * Create and register a long accumulator, which starts with 0 and accumulates inputs by `+=`. + */ + def dataPropertyLongAccumulator: LongAccumulator = { --- End diff -- super subjective, but I don't like having a special method for this. I think you should have another version of `longAccumulator` which takes a `dataProperty` argument (also marked `@Experimental`).
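A minimal sketch of the overload shape suggested here, using hypothetical stand-ins (`SketchContext`, `SketchLongAccumulator`) rather than Spark's `SparkContext` and `LongAccumulator`. The existing single-argument factory delegates to a two-argument overload, so no separately named `dataPropertyLongAccumulator` method is needed. In Spark the new overload would additionally carry `@Experimental`, which is omitted here to keep the sketch dependency-free.

```scala
// Hypothetical stand-in for LongAccumulator.
final class SketchLongAccumulator(val name: String, val dataProperty: Boolean) {
  private var sum = 0L
  def add(v: Long): Unit = sum += v
  def value: Long = sum
}

// Hypothetical stand-in for the accumulator factories on SparkContext.
final class SketchContext {
  private var registered = Vector.empty[SketchLongAccumulator]
  def accumulators: Vector[SketchLongAccumulator] = registered

  private def register(acc: SketchLongAccumulator): Unit = registered :+= acc

  /** Existing entry point: a plain (non data-property) accumulator. */
  def longAccumulator(name: String): SketchLongAccumulator =
    longAccumulator(name, dataProperty = false)

  /** Overload suggested in review instead of a specially named method. */
  def longAccumulator(name: String, dataProperty: Boolean): SketchLongAccumulator = {
    val acc = new SketchLongAccumulator(name, dataProperty)
    register(acc)
    acc
  }
}

val ctx = new SketchContext
val plain = ctx.longAccumulator("records")
val once = ctx.longAccumulator("recordsOnce", dataProperty = true)
```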
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83287419 --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala --- @@ -42,18 +60,43 @@ private[spark] case class AccumulatorMetadata( * `OUT` should be a type that can be read atomically (e.g., Int, Long), or thread-safely * (e.g., synchronized collections) because it will be read from other threads. */ -abstract class AccumulatorV2[IN, OUT] extends Serializable { +abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] extends Serializable { private[spark] var metadata: AccumulatorMetadata = _ - private[this] var atDriverSide = true + private[spark] var atDriverSide = true + + /** + * The following values are used for data property [[AccumulatorV2]]s. + * Data property [[AccumulatorV2]]s have only-once semantics. These semantics are implemented + * by keeping track of which RDD id, shuffle id, and partition id the current function is + * processing in. If a partition is fully processed the results for that partition/shuffle/rdd + * combination are sent back to the driver. The driver keeps track of which rdd/shuffle/partitions + * already have been applied, and only combines values into value_ if the rdd/shuffle/partition + * has not already been aggregated on the driver program + */ + // For data property accumulators pending and processed updates. + // Pending and processed are keyed by (rdd id, shuffle id, partition id) + private[spark] lazy val pending = +new mutable.HashMap[(Int, Int, Int), AccumulatorV2[IN, OUT]]() + // Completed contains the set of (rdd id, shuffle id, partition id) that have been + // fully processed on the worker side. This is used to determine if the updates should + // be sent back to the driver for a particular rdd/shuffle/partition combination. 
+ private[spark] lazy val completed = new mutable.HashSet[(Int, Int, Int)]() + // Processed is keyed by (rdd id, shuffle id) and the value is a bitset containing all partitions + // for the given key which have been merged into the value. This is used on the driver. + @transient private[spark] lazy val processed = new mutable.HashMap[(Int, Int), mutable.BitSet]() --- End diff -- I find the logic around `pending`, `completed` and `processed` really confusing. I'm still thinking this through; before making concrete suggestions I want to check my understanding. `completed` should always only reflect the updates from one task, and it should only be set on the executor. At first I was going to say it shouldn't be a set, just one value, but of course one task can get updates from multiple rdds. It will differ from the keyset of `pending` b/c `pending` can include updates from partitions that aren't completed (e.g. the `take` and `coalesce` cases). `processed` is only used on the driver. Since the driver has global knowledge, it can use this to determine whether each update is a repeat or not. I think the comment about whether updates should be sent back to the driver is incorrect: it looks to me like the updates are always sent back to the driver, and the driver makes all of the determinations. It seems like `merge` (and thus `dataPropertyMerge`) is only called on the driver. I'm trying to figure out whether or not that matters; it seems like it would do something weird if it were called on the executor. Maybe we should add an `assert(isAtDriver)`.
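A simplified model of the understanding described above, under the assumption that each task reports both its per-key partial sums (`pending`) and the keys whose input it fully consumed (`completed`), and that only the driver merges. The merge keeps only completed keys (dropping partial reads such as `take`), uses a per-(rdd, shuffle) `BitSet` as `processed` to skip repeats, and asserts that it runs on the driver. `TaskUpdateSketch` and `DriverSideAcc` are hypothetical names; this is not the PR's code.

```scala
import scala.collection.mutable

// Hypothetical per-task report: partial sums per key, plus keys whose iterator was exhausted.
final case class TaskUpdateSketch(
    pending: Map[(Int, Int, Int), Long],
    completed: Set[(Int, Int, Int)])

final class DriverSideAcc(isAtDriver: Boolean = true) {
  var value = 0L
  // (rdd id, shuffle id) -> partitions already merged into `value`.
  private val processed = mutable.HashMap.empty[(Int, Int), mutable.BitSet]

  def dataPropertyMerge(update: TaskUpdateSketch): Unit = {
    // The assertion suggested in review: this merge only makes sense on the driver.
    assert(isAtDriver, "dataPropertyMerge should only run on the driver")
    // Drop updates from partitions that were only partially consumed.
    val finished = update.pending.filter { case (key, _) => update.completed.contains(key) }
    finished.foreach { case ((rddId, shuffleId, partition), v) =>
      val seen = processed.getOrElseUpdate((rddId, shuffleId), mutable.BitSet.empty)
      if (!seen.contains(partition)) {
        seen += partition
        value += v
      }
    }
  }
}

val driver = new DriverSideAcc()
// A take()-style task read partition 0 of rdd 1 only partially: reported but not completed.
driver.dataPropertyMerge(TaskUpdateSketch(Map((1, -1, 0) -> 4L), Set.empty))
// Full passes over partitions 0 and 1.
driver.dataPropertyMerge(TaskUpdateSketch(Map((1, -1, 0) -> 10L), Set((1, -1, 0))))
driver.dataPropertyMerge(TaskUpdateSketch(Map((1, -1, 1) -> 5L), Set((1, -1, 1))))
// A recomputation of partition 0 is recognised as a repeat and ignored.
driver.dataPropertyMerge(TaskUpdateSketch(Map((1, -1, 0) -> 10L), Set((1, -1, 0))))
```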
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83271147 --- Diff: core/src/main/scala/org/apache/spark/Accumulable.scala --- @@ -56,15 +61,26 @@ class Accumulable[R, T] private ( @transient private val initialValue: R, param: AccumulableParam[R, T], val name: Option[String], -private[spark] val countFailedValues: Boolean) +private[spark] val countFailedValues: Boolean, +private[spark] val dataProperty: Boolean) extends Serializable { private[spark] def this( initialValue: R, param: AccumulableParam[R, T], name: Option[String], + countFailedValues: Boolean, + dataProperty: Boolean) = { +this(AccumulatorContext.newId(), initialValue, param, name, countFailedValues, dataProperty) + } + + private[spark] def this( + initialValue: R, + param: AccumulableParam[R, T], + name: Option[String], countFailedValues: Boolean) = { -this(AccumulatorContext.newId(), initialValue, param, name, countFailedValues) +this(AccumulatorContext.newId(), initialValue, param, name, countFailedValues, + false /* dataProperty */) --- End diff -- `dataProperty = false` (though as I mentioned elsewhere, I think you shouldn't have this option on the old API)
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83291225 --- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala --- @@ -126,4 +126,14 @@ private[spark] class TaskContextImpl( taskMetrics.registerAccumulator(a) } + private var rddPartitionInfo: (Int, Int, Int) = null --- End diff -- One thing I think really needs to be documented somewhere is *why* we need to use these compound ids to track things, why there is a shuffle id, etc. Also, carrying around this triple is a little cumbersome, so I was thinking that maybe it makes sense to create a case class for it, which would also identify the purpose. I'm thinking of something like
```scala
/**
 * Identifies where an update for a data property accumulator came from. This is important to
 * ensure that updates are not double-counted when rdds get recomputed. When the executors send
 * back data property accumulator updates, they separate out the updates per rdd or shuffle
 * output generated by the task. That gives the driver sufficient info to ensure that each
 * update is counted once.
 * [longer description of some of the tricky cases here, especially why we care about shuffle id].
 */
sealed trait TaskOutputId
case class RDDOutput(rddId: Int, partition: Int) extends TaskOutputId
case class ShuffleMapOutput(shuffleId: Int, partition: Int) extends TaskOutputId
```
It doesn't have to be a sealed trait; it could just be one class as well. Think this would help?
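A runnable variant of the `TaskOutputId` idea proposed in this comment, with the case classes extending the sealed trait so they can be used interchangeably as keys. It shows one benefit of typed ids over a bare `(Int, Int, Int)` triple: an RDD output and a shuffle-map output with the same numbers are distinct keys, and matches over the trait are checked for exhaustiveness. `OutputLedger` and `describe` are hypothetical helpers added for illustration.

```scala
import scala.collection.mutable

// Hypothetical; mirrors the shape proposed in review, not merged Spark code.
sealed trait TaskOutputId
final case class RDDOutput(rddId: Int, partition: Int) extends TaskOutputId
final case class ShuffleMapOutput(shuffleId: Int, partition: Int) extends TaskOutputId

// Driver-side record of which outputs have already been counted.
final class OutputLedger {
  private val seen = mutable.HashSet.empty[TaskOutputId]
  /** Returns true the first time an output is reported, false for repeats. */
  def firstTime(id: TaskOutputId): Boolean = seen.add(id)
}

// Exhaustive match: the compiler warns if a new kind of output is added but not handled.
def describe(id: TaskOutputId): String = id match {
  case RDDOutput(rdd, p)         => s"rdd $rdd, partition $p"
  case ShuffleMapOutput(shuf, p) => s"shuffle $shuf, partition $p"
}

val ledger = new OutputLedger
val first = ledger.firstTime(RDDOutput(3, 0))
val repeat = ledger.firstTime(RDDOutput(3, 0))
// Same numbers but a different kind of output, so it is not mistaken for the RDD output.
val shuffle = ledger.firstTime(ShuffleMapOutput(3, 0))
```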
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83272894 --- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala --- @@ -34,8 +35,29 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( override def getPartitions: Array[Partition] = firstParent[T].partitions - override def compute(split: Partition, context: TaskContext): Iterator[U] = -f(context, split.index, firstParent[T].iterator(split, context)) + override def compute(split: Partition, context: TaskContext): Iterator[U] = { +val input = firstParent[T].iterator(split, context) +// If our task has data property accumulators we need to keep track of which partitions +// are fully processed and what we are currently processing. +if (context.taskMetrics.hasDataPropertyAccumulators()) { + // Use -1 for shuffle id since we are not in a shuffle. + val shuffleWriteId = -1 + // Set the ID of the RDD and partition being processed. We need to do this per + // element since we chain the iterator transformations together + val data = input.map{x => +context.setRDDPartitionInfo(id, shuffleWriteId, split.index) +x + } + val wrappedData = Utils.signalWhenEmpty(data, +() => context.taskMetrics.markFullyProcessed(id, shuffleWriteId, split.index)) + // We also set it before the first call to the user function in case the user provides a + // function which access a data property accumulator before accessing any elements. + context.setRDDPartitionInfo(id, shuffleWriteId, split.index) + f(context, split.index, wrappedData) +} else { + f(context, split.index, input) --- End diff -- I'd add a comment here: if there aren't any data property accumulators, we want to avoid the overhead that comes with the extra iterator wrapping.
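A minimal sketch of the wrapping pattern discussed here: a helper that runs a callback exactly once, the first time the wrapped iterator reports it is exhausted, applied only when tracking is needed so that ordinary jobs skip the extra per-element indirection. This `signalWhenEmpty` is a hypothetical re-implementation written for illustration; the PR's `Utils.signalWhenEmpty` may differ. It also shows why a `take`-style partial read never marks the partition as fully processed.

```scala
// Hypothetical helper in the spirit of the PR's Utils.signalWhenEmpty.
def signalWhenEmpty[T](underlying: Iterator[T], onEmpty: () => Unit): Iterator[T] =
  new Iterator[T] {
    private var signalled = false
    override def hasNext: Boolean = {
      val more = underlying.hasNext
      // Fire the callback only once, on the first hasNext that returns false.
      if (!more && !signalled) {
        signalled = true
        onEmpty()
      }
      more
    }
    override def next(): T = underlying.next()
  }

// Only wrap when tracking is needed; otherwise return the input untouched (no extra overhead).
def computeSketch[T](input: Iterator[T], tracking: Boolean, markDone: () => Unit): Iterator[T] =
  if (tracking) signalWhenEmpty(input, markDone) else input

var fullyProcessed = 0
val fullSum = computeSketch(Iterator(1, 2, 3), tracking = true, () => fullyProcessed += 1).sum

// A take-style partial read never exhausts the iterator, so the partition is not marked.
var partial = 0
val prefix = computeSketch(Iterator(1, 2, 3), tracking = true, () => partial += 1).take(2).toList
```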
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83279340 --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala --- @@ -136,15 +179,76 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { def reset(): Unit /** + * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. + * Developers should extend addImpl to customize the adding functionality. + */ + final def add(v: IN): Unit = { +if (metadata != null && metadata.dataProperty) { + dataPropertyAdd(v) +} else { + addImpl(v) +} + } + + private def dataPropertyAdd(v: IN): Unit = { +// Add first for localValue & AccumulatorInfo +addImpl(v) +if (metadata != null && metadata.dataProperty) { + val updateInfo = TaskContext.get().getRDDPartitionInfo() + val base = pending.getOrElse(updateInfo, copyAndReset()) + base.atDriverSide = false --- End diff -- why is this needed? I see tests fail without it, but I don't understand why the old handling in `readObject` isn't working here.
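One plausible explanation, offered as an assumption rather than a confirmed answer: in Spark 2.x, `AccumulatorV2.readObject` clears `atDriverSide` only on instances that were deserialized on the executor, while `writeReplace` ships an instance whose flag is still `true` as a zeroed copy (after checking registration). A per-partition copy created on the executor with `copyAndReset()` never went through `readObject`, so its flag keeps the default `true`, and its value would be lost when it is sent back. The toy model below captures only that flag-dependent behaviour; `ToyAcc` and `send` are hypothetical stand-ins, and no Java serialization is involved.

```scala
// Toy model, not Spark code: a serialization hook whose behaviour depends on a flag.
final class ToyAcc {
  var atDriverSide = true // new instances default to "driver side"
  var sum = 0L
  def add(v: Long): Unit = sum += v
  def copyAndReset(): ToyAcc = new ToyAcc

  // Stand-in for writeReplace: a "driver-side" instance is shipped as a zeroed copy,
  // an executor-side instance is shipped as-is, with its accumulated value.
  def send(): ToyAcc = if (atDriverSide) copyAndReset() else this
}

// Executor side: the task's own accumulator was deserialized, so its flag was cleared...
val taskAcc = new ToyAcc
taskAcc.atDriverSide = false

// ...but a per-partition copy made with copyAndReset() starts with the default flag.
val withoutFix = taskAcc.copyAndReset()
withoutFix.add(42)

val withFix = taskAcc.copyAndReset()
withFix.atDriverSide = false // the equivalent of `base.atDriverSide = false` in the diff
withFix.add(42)

val shippedWithoutFix = withoutFix.send().sum
val shippedWithFix = withFix.send().sum
```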
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83270945 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1251,7 +1251,22 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @deprecated("use AccumulatorV2", "2.0.0") def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] = { -val acc = new Accumulator(initialValue, param) +accumulator(initialValue, dataProperty = false) + } + + /** + * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" + * values to using the `+=` method. Only the driver can access the accumulator's `value`. + * + * @param dataProperty If the accumulator should avoid re-counting multiple evaluations on the + * same RDD/partition. This adds some additional overhead for tracking and + * is an experimental feature. + */ + @deprecated("use AccumulatorV2", "2.1.0") --- End diff -- What's the argument for adding it to the old API? I see the temptation, since it's a nice new feature and we want to support users who haven't migrated yet, but I really don't think it makes sense to be adding stuff to a deprecated API. It will make users think it's not really deprecated. Plus, leaving it out provides a nice carrot for moving to the new API ;)
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83101779 --- Diff: core/src/test/scala/org/apache/spark/DataPropertyAccumulatorSuite.scala --- @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.ref.WeakReference + +import org.scalatest.Matchers + +import org.apache.spark.scheduler._ + + +class DataPropertyAccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext { + test("single partition") { +sc = new SparkContext("local[2]", "test") +val acc : Accumulator[Int] = sc.accumulator(0, dataProperty = true) + +val a = sc.parallelize(1 to 20, 1) +val b = a.map{x => acc += x; x} +b.cache() +b.count() +acc.value should be (210) + } + + test("adding only the first element per partition should work even if partition is empty") { +sc = new SparkContext("local[2]", "test") +val acc: Accumulator[Int] = sc.accumulator(0, dataProperty = true) +val a = sc.parallelize(1 to 20, 30) +val b = a.mapPartitions{itr => + acc += 1 + itr +} +b.count() +acc.value should be (30) + } + + test("shuffled (combineByKey)") { +sc = new SparkContext("local[2]", "test") +val a = sc.parallelize(1 to 40, 5) +val buckets = 4 +val b = a.map{x => ((x % buckets), x)} +val inputs = List(b, b.repartition(10), b.partitionBy(new HashPartitioner(5))).map(_.cache()) +val mapSideCombines = List(true, false) +inputs.foreach { input => + mapSideCombines.foreach { mapSideCombine => +val accs = (1 to 4).map(x => sc.accumulator(0, dataProperty = true)).toList +val raccs = (1 to 4).map(x => sc.accumulator(0, dataProperty = false)).toList +val List(acc, acc1, acc2, acc3) = accs +val List(racc, racc1, racc2, racc3) = raccs +val c = input.combineByKey( + (x: Int) => {acc1 += 1; acc += 1; racc1 += 1; racc += 1; x}, + {(a: Int, b: Int) => acc2 += 1; acc += 1; racc2 += 1; racc += 1; (a + b)}, + {(a: Int, b: Int) => acc3 += 1; acc += 1; racc3 += 1; racc += 1; (a + b)}, + new HashPartitioner(10), + mapSideCombine) +val d = input.combineByKey( + (x: Int) => {acc1 += 1; acc += 1; x}, + {(a: Int, b: Int) => acc2 += 1; acc += 1; (a + b)}, + {(a: Int, b: Int) => acc3 += 1; acc += 1; (a + b)}, + new 
HashPartitioner(2), + mapSideCombine) +val e = d.map{x => acc += 1; x} +c.count() +// If our partitioner is known then we should only create +// one combiner for each key value. Otherwise we should +// create at least that many combiners. +if (input.partitioner.isDefined) { + acc1.value should be (buckets) +} else { + acc1.value should be >= (buckets) +} +if (input.partitioner.isDefined) { + acc2.value should be > (0) +} else if (mapSideCombine) { + acc3.value should be > (0) +} else { + acc2.value should be > (0) + acc3.value should be (0) +} +acc.value should be (acc1.value + acc2.value + acc3.value) +val oldValues = accs.map(_.value) +// For one action the data property accumulators and regular should have the same value. +accs.map(_.value) should be (raccs.map(_.value)) +c.count() +accs.map(_.value) should be (oldValues) --- End diff -- @viirya sorry I am commenting really late here. Do you still have your implementation? It would be interesting to compare. If I understand right, you are proposing that `TaskResult` would include which RDDs had been
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user techaddict commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r73330940 --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala --- @@ -220,8 +220,27 @@ class TaskMetrics private[spark] () extends Serializable { */ @transient private[spark] lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]] + /** +* All data property accumulators registered with this task. +*/ + @transient private lazy val dataPropertyAccums = new ArrayBuffer[AccumulatorV2[_, _]] + private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = { externalAccums += a +if (a.dataProperty) { + dataPropertyAccums += a +} + } + + private[spark] def hasDataPropertyAccumulators(): Boolean = { +!dataPropertyAccums.isEmpty --- End diff -- nit: could be `nonEmpty`
[GitHub] spark pull request #11105: [SPARK-12469][CORE] Data Property accumulators fo...
Github user techaddict commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r73330741 --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala --- @@ -220,8 +220,27 @@ class TaskMetrics private[spark] () extends Serializable { */ @transient private[spark] lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]] + /** +* All data property accumulators registered with this task. +*/ + @transient private lazy val dataPropertyAccums = new ArrayBuffer[AccumulatorV2[_, _]] + private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = { externalAccums += a +if (a.dataProperty) { + dataPropertyAccums += a +} + } + + private[spark] def hasDataPropertyAccumulators(): Boolean = { +!dataPropertyAccums.isEmpty + } + + /** + * Mark an rdd/shuffle/and partition as fully processed for all dataProperty accumulators. + */ + private[spark] def markFullyProcessed(rddId: Int, shuffleWriteId: Int, partitionId: Int) = { +dataPropertyAccums.map(_.markFullyProcessed(rddId, shuffleWriteId, partitionId)) --- End diff -- should be `foreach` instead of `map`
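A small sketch applying both `TaskMetrics` nits from these two comments, using hypothetical stand-ins (`TrackedAcc`, `MetricsSketch`) rather than Spark's classes. `nonEmpty` replaces `!isEmpty`, and `foreach` replaces `map` for a call made only for its side effect. With `map` and no declared return type, the method in the diff would build and return an `ArrayBuffer[Unit]`, so the sketch also declares `: Unit` explicitly.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an accumulator that can be marked as fully processed.
final class TrackedAcc(val dataProperty: Boolean) {
  val completed = mutable.HashSet.empty[(Int, Int, Int)]
  def markFullyProcessed(rddId: Int, shuffleWriteId: Int, partitionId: Int): Unit =
    if (dataProperty) completed += ((rddId, shuffleWriteId, partitionId))
}

final class MetricsSketch {
  private val dataPropertyAccums = mutable.ArrayBuffer.empty[TrackedAcc]

  def registerAccumulator(a: TrackedAcc): Unit =
    if (a.dataProperty) dataPropertyAccums += a

  // `nonEmpty` reads more directly than `!isEmpty`.
  def hasDataPropertyAccumulators: Boolean = dataPropertyAccums.nonEmpty

  // `foreach` for side effects; `map` would allocate a throwaway collection of Units.
  def markFullyProcessed(rddId: Int, shuffleWriteId: Int, partitionId: Int): Unit =
    dataPropertyAccums.foreach(_.markFullyProcessed(rddId, shuffleWriteId, partitionId))
}

val metrics = new MetricsSketch
val before = metrics.hasDataPropertyAccumulators
val tracked = new TrackedAcc(dataProperty = true)
metrics.registerAccumulator(tracked)
metrics.markFullyProcessed(4, -1, 2)
```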