Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11105#discussion_r83101779
  
    --- Diff: core/src/test/scala/org/apache/spark/DataPropertyAccumulatorSuite.scala ---
    @@ -0,0 +1,361 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark
    +
    +import scala.concurrent.ExecutionContext.Implicits.global
    +import scala.ref.WeakReference
    +
    +import org.scalatest.Matchers
    +
    +import org.apache.spark.scheduler._
    +
    +
    +class DataPropertyAccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext {
    +  test("single partition") {
    +    sc = new SparkContext("local[2]", "test")
    +    val acc : Accumulator[Int] = sc.accumulator(0, dataProperty = true)
    +
    +    val a = sc.parallelize(1 to 20, 1)
    +    val b = a.map{x => acc += x; x}
    +    b.cache()
    +    b.count()
    +    acc.value should be (210)
    +  }
    +
    +  test("adding only the first element per partition should work even if partition is empty") {
    +    sc = new SparkContext("local[2]", "test")
    +    val acc: Accumulator[Int] = sc.accumulator(0, dataProperty = true)
    +    val a = sc.parallelize(1 to 20, 30)
    +    val b = a.mapPartitions{itr =>
    +      acc += 1
    +      itr
    +    }
    +    b.count()
    +    acc.value should be (30)
    +  }
    +
    +  test("shuffled (combineByKey)") {
    +    sc = new SparkContext("local[2]", "test")
    +    val a = sc.parallelize(1 to 40, 5)
    +    val buckets = 4
    +    val b = a.map{x => ((x % buckets), x)}
    +    val inputs = List(b, b.repartition(10), b.partitionBy(new HashPartitioner(5))).map(_.cache())
    +    val mapSideCombines = List(true, false)
    +    inputs.foreach { input =>
    +      mapSideCombines.foreach { mapSideCombine =>
    +        val accs = (1 to 4).map(x => sc.accumulator(0, dataProperty = true)).toList
    +        val raccs = (1 to 4).map(x => sc.accumulator(0, dataProperty = false)).toList
    +        val List(acc, acc1, acc2, acc3) = accs
    +        val List(racc, racc1, racc2, racc3) = raccs
    +        val c = input.combineByKey(
    +          (x: Int) => {acc1 += 1; acc += 1; racc1 += 1; racc += 1; x},
    +          {(a: Int, b: Int) => acc2 += 1; acc += 1; racc2 += 1; racc += 1; (a + b)},
    +          {(a: Int, b: Int) => acc3 += 1; acc += 1; racc3 += 1; racc += 1; (a + b)},
    +          new HashPartitioner(10),
    +          mapSideCombine)
    +        val d = input.combineByKey(
    +          (x: Int) => {acc1 += 1; acc += 1; x},
    +          {(a: Int, b: Int) => acc2 += 1; acc += 1; (a + b)},
    +          {(a: Int, b: Int) => acc3 += 1; acc += 1; (a + b)},
    +          new HashPartitioner(2),
    +          mapSideCombine)
    +        val e = d.map{x => acc += 1; x}
    +        c.count()
    +        // If our partitioner is known then we should only create
    +        // one combiner for each key value. Otherwise we should
    +        // create at least that many combiners.
    +        if (input.partitioner.isDefined) {
    +          acc1.value should be (buckets)
    +        } else {
    +          acc1.value should be >= (buckets)
    +        }
    +        if (input.partitioner.isDefined) {
    +          acc2.value should be > (0)
    +        } else if (mapSideCombine) {
    +          acc3.value should be > (0)
    +        } else {
    +          acc2.value should be > (0)
    +          acc3.value should be (0)
    +        }
    +        acc.value should be (acc1.value + acc2.value + acc3.value)
    +        val oldValues = accs.map(_.value)
    +        // For one action the data property accumulators and regular should have the same value.
    +        accs.map(_.value) should be (raccs.map(_.value))
    +        c.count()
    +        accs.map(_.value) should be (oldValues)
    --- End diff --
    
    @viirya sorry I am commenting really late here.  Do you still have your 
implementation?  It would be interesting to compare.
    
    If I understand right, you are proposing that `TaskResult` would include 
which RDDs had been computed.  Then when the results are merged on the driver, 
you would know which updates to count and which not to.  Because of corner 
cases w/ things like `rdd.take()` and `rdd.coalesce()`, I think you'd still 
need to handle some of the trickier details done here, e.g. tracking whether a 
partition is fully read or not with iterator wrapping (and the performance 
effect it has).
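
    To make the iterator-wrapping point concrete, here is a minimal sketch in 
plain Scala.  It is not the code from this PR: `CompletionTrackingIterator` and 
`fullyRead` are hypothetical names.  The only idea shown is that the wrapper 
notices when the consumer reaches the end, so a partial read (as `rdd.take()` 
can cause) can be told apart from a full one.  The extra `hasNext` bookkeeping 
on every element is the kind of per-record cost I mean.

```scala
// Hypothetical helper, not Spark API: wraps a partition's iterator and
// records whether the consumer has read it to the end.
final class CompletionTrackingIterator[A](underlying: Iterator[A]) extends Iterator[A] {
  private var exhausted = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more) exhausted = true // the end was observed, so the read was complete
    more
  }

  override def next(): A = underlying.next()

  // True only once hasNext has returned false at least once.
  def fullyRead: Boolean = exhausted
}

object CompletionTrackingDemo {
  def main(args: Array[String]): Unit = {
    val partial = new CompletionTrackingIterator(Iterator(1, 2, 3))
    partial.next() // a take(1)-style consumer stops early
    println(partial.fullyRead)

    val full = new CompletionTrackingIterator(Iterator(1, 2, 3))
    full.foreach(_ => ()) // a count()-style consumer drains everything
    println(full.fullyRead)
  }
}
```

    Under this sketch, updates from a task would only be trusted as 
"data-property" updates for a partition when `fullyRead` is true at the end of 
the task.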
    
    I'm pretty sure you'd also lose the ability to handle an accumulator that 
was shared across multiple RDDs:
    
    ```scala
    
    val acc = ...
    val rddA = ...
    val rddB = rddA.map {...} // use acc in here
    rddB.take(1000)
    val rddC = rddB.map {...} // use acc here also
    rddC.count()
    ```
    
    The problem is, if you only get the final accumulator value in your task 
updates, you won't know how much of that value to increment by.  You won't know 
which parts were already counted, and which ones weren't.  That example is 
somewhat contrived, but there are lots of trickier cases, like concurrently 
executing jobs w/ shared RDDs.  A major goal is to provide very consistent 
semantics, and I think we need to make sure you get sane values out in all 
those cases.
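
    As a rough illustration of why per-RDD, per-partition updates matter, here 
is a small plain-Scala sketch.  It is an assumption-laden toy, not the 
bookkeeping in this PR: `DedupingCounter`, the integer RDD ids, and the 
`(rddId, partitionId)` key are all made up for the example, and it simplifies 
the `take(1000)` case above to a full read of one partition.

```scala
import scala.collection.mutable

// Hypothetical driver-side bookkeeping: remembers which (rddId, partitionId)
// pairs have already contributed, so a recomputation is not counted twice.
final class DedupingCounter {
  private val counted = mutable.Set.empty[(Int, Int)]
  private var total = 0L

  // Merge one task's updates, keyed by the RDD and partition that produced them.
  def merge(updates: Map[(Int, Int), Long]): Unit =
    updates.foreach { case (key, delta) =>
      // Set.add returns true only if the key was not present before.
      if (counted.add(key)) total += delta
    }

  def value: Long = total
}

object DedupingCounterDemo {
  def main(args: Array[String]): Unit = {
    val acc = new DedupingCounter
    // Job 1: rddB (id 1) computes partition 0 and adds 10.
    acc.merge(Map((1, 0) -> 10L))
    // Job 2: rddC (id 2) recomputes rddB's partition 0, then adds its own 10.
    acc.merge(Map((1, 0) -> 10L, (2, 0) -> 10L))
    println(acc.value) // prints 20
  }
}
```

    If the second task reported only its final value of 20, the driver could 
not tell that 10 of it had already been counted, and a naive merge would give 
30.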
    
    Honestly I can't think of a great use case for data-property accumulators 
inside a combineByKey.  All the uses I can think of inside combineByKey are 
really compute-property (e.g., measuring how much map-side combining I am doing, 
say by having an accumulator track how many times the record size > X on the 
map side).  And the data-property accumulators I'd use would be immediately 
before or after the combineByKey.  But if we can add it, it's nice to have, 
since I'd hate to have weird restrictions on where you can use them.  There are 
probably good use cases which I can't think of at the moment.
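
    For what I mean by a compute-property measurement, here is a plain-Scala 
stand-in for map-side combining on a single partition.  It does not use Spark 
at all; `MapSideCombineStats`, `Stats`, and `combinePartition` are hypothetical 
names, and the two counters play the role that accumulators would play inside 
`createCombiner` and `mergeValue`.

```scala
// Plain-Scala stand-in for map-side combining on one partition.
// The counters mimic compute-property accumulators; all names are hypothetical.
object MapSideCombineStats {
  final case class Stats(created: Int, merged: Int)

  // Sums values per key and reports how many combiners were created and
  // how many values were merged into an existing combiner.
  def combinePartition(records: Seq[(Int, Int)]): (Map[Int, Int], Stats) = {
    var created = 0
    var merged = 0
    val combined = records.foldLeft(Map.empty[Int, Int]) { case (acc, (k, v)) =>
      acc.get(k) match {
        case Some(sum) =>
          merged += 1 // analogous to an accumulator bumped in mergeValue
          acc.updated(k, sum + v)
        case None =>
          created += 1 // analogous to an accumulator bumped in createCombiner
          acc.updated(k, v)
      }
    }
    (combined, Stats(created, merged))
  }

  def main(args: Array[String]): Unit = {
    val records = (1 to 8).map(x => (x % 4, x))
    val (combined, stats) = combinePartition(records)
    println(s"combined=$combined stats=$stats")
  }
}
```

    The counts describe how the computation happened to run (they change if 
the partition is recomputed or split differently), which is why I'd call them 
compute-property rather than data-property.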

