Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/3262#issuecomment-63055331
  
    > What's the distinction for intToIntWritable/writableConverters?
    
    `writableConverters` can work; that part is already done. Here is the code I used to test binary compatibility.
    ```Scala
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.SparkContext._
    
    object ImplicitBackforwardCompatibilityApp {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("ImplicitBackforwardCompatibilityApp")
        val sc = new SparkContext(conf)
    
        val rdd = sc.parallelize(1 to 100).map(i => (i, i))
        val rdd2 = rdd.groupByKey() // rddToPairRDDFunctions
        val rdd3 = rdd2.sortByKey() // rddToOrderedRDDFunctions
        val s1 = rdd3.map(_._1).stats() // numericRDDToDoubleRDDFunctions
        println(s1)
        val s2 = rdd3.map(_._1.toDouble).stats() // doubleRDDToDoubleRDDFunctions
        println(s2)
        val f = rdd2.countAsync() // rddToAsyncRDDActions
        println(f.get())
        rdd2.map { case (k, v) => (k, v.size)} saveAsSequenceFile ("/tmp/implicit_test_path") // rddToSequenceFileRDDFunctions
    
        val a1 = sc.accumulator(123.4) // DoubleAccumulatorParam
        a1.add(1.0)
        println(a1.value)
        val a2 = sc.accumulator(123) // IntAccumulatorParam
        a2.add(3)
        println(a2.value)
        val a3 = sc.accumulator(123L) // LongAccumulatorParam
        a3.add(11L)
        println(a3.value)
        val a4 = sc.accumulator(123F) // FloatAccumulatorParam
        a4.add(1.1F)
        println(a4.value)
    
        {
          sc.parallelize(1 to 10).map(i => (i, i)).saveAsSequenceFile("/tmp/implicit_test_int")
          val r = sc.sequenceFile[Int, Int]("/tmp/implicit_test_int")
          r.map { case (k, v) => (k.toString, v.toString)} foreach (println)
        }
    
        {
          sc.parallelize(1 to 10).map(i => (i.toLong, i.toLong)).saveAsSequenceFile("/tmp/implicit_test_long")
          val r = sc.sequenceFile[Long, Long]("/tmp/implicit_test_long")
          r.map { case (k, v) => (k.toString, v.toString)} foreach (println)
        }
    
        {
          sc.parallelize(1 to 10).map(i => (i.toDouble, i.toDouble)).saveAsSequenceFile("/tmp/implicit_test_double")
          val r = sc.sequenceFile[Double, Double]("/tmp/implicit_test_double")
          r.map { case (k, v) => (k.toString, v.toString)} foreach (println)
        }
    
        {
          sc.parallelize(1 to 10).map(i => (i.toFloat, i.toFloat)).saveAsSequenceFile("/tmp/implicit_test_float")
          val r = sc.sequenceFile[Float, Float]("/tmp/implicit_test_float")
          r.map { case (k, v) => (k.toString, v.toString)} foreach (println)
        }
    
        {
          sc.parallelize(1 to 10).map(i => (i.toString, i.toString)).saveAsSequenceFile("/tmp/implicit_test_string")
          val r = sc.sequenceFile[String, String]("/tmp/implicit_test_string")
          r.map { case (k, v) => (k.toString, v.toString)} foreach (println)
        }
    
        {
          sc.parallelize(1 to 10).map(i => (true, false)).saveAsSequenceFile("/tmp/implicit_test_boolean")
          val r = sc.sequenceFile[Boolean, Boolean]("/tmp/implicit_test_boolean")
          r.map { case (k, v) => (k.toString, v.toString)} foreach (println)
        }
    
        {
          sc.parallelize(1 to 10).map(i => (Array(i.toByte), Array(i.toByte))).saveAsSequenceFile("/tmp/implicit_test_bytes")
          val r = sc.sequenceFile[Array[Byte], Array[Byte]]("/tmp/implicit_test_bytes")
          r.map { case (k, v) => (k.toString, v.toString)} foreach (println)
        }
    
        {
          sc.parallelize(1 to 10).map(i => (i.toString, i.toString)).saveAsSequenceFile("/tmp/implicit_test_writable")
          val r = sc.sequenceFile[org.apache.hadoop.io.Text, org.apache.hadoop.io.Text]("/tmp/implicit_test_writable")
          r.map { case (k, v) => (k.toString, v.toString)} foreach (println)
        }
    
        sc.stop()
      }
    }
    
    ```
    I compiled the above code with Spark 1.1.0 and ran it against the new Spark built from this PR, and it works correctly.
    
    For `intToIntWritable`, the problem is that the implicit parameter of `SequenceFileRDDFunctions` is a function of type `T => Writable`. However, we cannot add the `xxxToXXXWritable` methods to the implicit scope of `T => Writable`, because that scope lies outside Spark's code. The definition of `implicit scope` is:
    
    > implicit scope, which contains all sort of companion objects and package object that bear some relation to the implicit's type which we search for (i.e. package object of the type, companion object of the type itself, of its type constructor if any, of its parameters if any, and also of its supertype and supertraits).
    
    Ref: http://eed3si9n.com/revisiting-implicits-without-import-tax
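
    To make this concrete, here is a small standalone sketch (not Spark code; `ThirdPartyLib`, `ImplicitScopeSketch` and `needsConverter` are made up for illustration). For an implicit of type `Int => Writable`, the compiler only searches the companions of `Function1`, `Int` and `Writable`/`IntWritable`, so a conversion defined in any other object is only found after an explicit import:
    ```Scala
    import org.apache.hadoop.io.{IntWritable, Writable}

    // Plays the role of the SparkContext object: an object that is NOT in the
    // implicit scope of `Int => Writable`.
    object ThirdPartyLib {
      implicit def intToIntWritable(i: Int): IntWritable = new IntWritable(i)
    }

    object ImplicitScopeSketch {
      // Same shape as the evidence parameter generated by `K <% Writable`.
      def needsConverter(implicit ev: Int => Writable): Unit = ()

      def main(args: Array[String]): Unit = {
        // needsConverter      // does not compile: no implicit found via implicit scope
        import ThirdPartyLib._ // an explicit import is still required
        needsConverter         // compiles now
      }
    }
    ```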
    
    A possible solution is to create a new class wrapping `T => Writable`, similar to `WritableConverter`, and change the implicit parameter type of `SequenceFileRDDFunctions` to this class, e.g.:
    
    ```Scala
    class SequenceFileRDDFunctions[K, V](
        self: RDD[(K, V)])(implicit keyConverter: NewWritableConverter[K], valueConverter: NewWritableConverter[V])
    ```
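
    For completeness, a hypothetical sketch of why this would work (all names below are made up, following the `NewWritableConverter` name above): because the implicit instances live in the companion object of `NewWritableConverter`, they are inside the implicit scope of `NewWritableConverter[T]` and are found without any import at the call site.
    ```Scala
    import org.apache.hadoop.io.{IntWritable, Writable}

    // Hypothetical wrapper class replacing the raw `T => Writable` evidence.
    class NewWritableConverter[T](val convert: T => Writable)

    object NewWritableConverter {
      // Defined in the companion, hence inside the implicit scope of
      // NewWritableConverter[Int]: no import is needed by callers.
      implicit val intConverter: NewWritableConverter[Int] =
        new NewWritableConverter[Int](i => new IntWritable(i))
      // ... one implicit per supported key/value type
    }

    object CompanionScopeSketch {
      def needsConverter[T](implicit c: NewWritableConverter[T]): Unit = ()

      def main(args: Array[String]): Unit = {
        needsConverter[Int] // resolved via the companion object, no import needed
      }
    }
    ```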
    
    However, since this is a breaking change (of course, we could also add a new `SequenceFileRDDFunctions` class to avoid breaking old code), I don't think it's worth changing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to