Github user zsxwing commented on the pull request:
https://github.com/apache/spark/pull/3262#issuecomment-63055331
> What's the distinction for intToIntWritable/writableConverters?
`writableConverters` works and is already done. Here is the code I used to test binary compatibility:
```Scala
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

object ImplicitBackforwardCompatibilityApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ImplicitBackforwardCompatibilityApp")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 100).map(i => (i, i))
    val rdd2 = rdd.groupByKey()                // rddToPairRDDFunctions
    val rdd3 = rdd2.sortByKey()                // rddToOrderedRDDFunctions
    val s1 = rdd3.map(_._1).stats()            // numericRDDToDoubleRDDFunctions
    println(s1)
    val s2 = rdd3.map(_._1.toDouble).stats()   // doubleRDDToDoubleRDDFunctions
    println(s2)
    val f = rdd2.countAsync()                  // rddToAsyncRDDActions
    println(f.get())
    rdd2.map { case (k, v) => (k, v.size) }.saveAsSequenceFile("/tmp/implicit_test_path") // rddToSequenceFileRDDFunctions

    val a1 = sc.accumulator(123.4)             // DoubleAccumulatorParam
    a1.add(1.0)
    println(a1.value)
    val a2 = sc.accumulator(123)               // IntAccumulatorParam
    a2.add(3)
    println(a2.value)
    val a3 = sc.accumulator(123L)              // LongAccumulatorParam
    a3.add(11L)
    println(a3.value)
    val a4 = sc.accumulator(123F)              // FloatAccumulatorParam
    a4.add(1.1F)
    println(a4.value)

    {
      sc.parallelize(1 to 10).map(i => (i, i)).saveAsSequenceFile("/tmp/implicit_test_int")
      val r = sc.sequenceFile[Int, Int]("/tmp/implicit_test_int")
      r.map { case (k, v) => (k.toString, v.toString) }.foreach(println)
    }
    {
      sc.parallelize(1 to 10).map(i => (i.toLong, i.toLong)).saveAsSequenceFile("/tmp/implicit_test_long")
      val r = sc.sequenceFile[Long, Long]("/tmp/implicit_test_long")
      r.map { case (k, v) => (k.toString, v.toString) }.foreach(println)
    }
    {
      sc.parallelize(1 to 10).map(i => (i.toDouble, i.toDouble)).saveAsSequenceFile("/tmp/implicit_test_double")
      val r = sc.sequenceFile[Double, Double]("/tmp/implicit_test_double")
      r.map { case (k, v) => (k.toString, v.toString) }.foreach(println)
    }
    {
      sc.parallelize(1 to 10).map(i => (i.toFloat, i.toFloat)).saveAsSequenceFile("/tmp/implicit_test_float")
      val r = sc.sequenceFile[Float, Float]("/tmp/implicit_test_float")
      r.map { case (k, v) => (k.toString, v.toString) }.foreach(println)
    }
    {
      sc.parallelize(1 to 10).map(i => (i.toString, i.toString)).saveAsSequenceFile("/tmp/implicit_test_string")
      val r = sc.sequenceFile[String, String]("/tmp/implicit_test_string")
      r.map { case (k, v) => (k.toString, v.toString) }.foreach(println)
    }
    {
      sc.parallelize(1 to 10).map(i => (true, false)).saveAsSequenceFile("/tmp/implicit_test_boolean")
      val r = sc.sequenceFile[Boolean, Boolean]("/tmp/implicit_test_boolean")
      r.map { case (k, v) => (k.toString, v.toString) }.foreach(println)
    }
    {
      sc.parallelize(1 to 10).map(i => (Array(i.toByte), Array(i.toByte))).saveAsSequenceFile("/tmp/implicit_test_bytes")
      val r = sc.sequenceFile[Array[Byte], Array[Byte]]("/tmp/implicit_test_bytes")
      r.map { case (k, v) => (k.toString, v.toString) }.foreach(println)
    }
    {
      sc.parallelize(1 to 10).map(i => (i.toString, i.toString)).saveAsSequenceFile("/tmp/implicit_test_writable")
      val r = sc.sequenceFile[org.apache.hadoop.io.Text, org.apache.hadoop.io.Text]("/tmp/implicit_test_writable")
      r.map { case (k, v) => (k.toString, v.toString) }.foreach(println)
    }

    sc.stop()
  }
}
```
I compiled the above code against Spark 1.1.0 and ran it with the new Spark
build from this PR, and it works correctly.
For `intToIntWritable`, the problem is that the implicit parameter required by
`SequenceFileRDDFunctions` is a conversion function `T => Writable`. However, we cannot
put these `xxxToXXXWritable` methods into the implicit scope of `T => Writable`,
because that type is defined outside Spark's codebase. The definition of `implicit
scope` is:
> implicit scope, which contains all sort of companion objects and package
object that bear some relation to the implicit's type which we search for (i.e.
package object of the type, companion object of the type itself, of its type
constructor if any, of its parameters if any, and also of its supertype and
supertraits).
Ref: http://eed3si9n.com/revisiting-implicits-without-import-tax
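To make the constraint concrete, here is a minimal sketch (using a hypothetical `Celsius` type, nothing from Spark) of how implicit scope works: a conversion declared in a type's companion object is found without any import, but there is no companion object under Spark's control for `Function1` or `Writable`, which is why the `xxxToXXXWritable` conversions cannot be moved into the implicit scope of `T => Writable`.
```Scala
// Minimal sketch of implicit scope (hypothetical types, unrelated to Spark).
class Celsius(val degrees: Double)

object Celsius {
  // Lives in the companion object of the target type, so it is part of the
  // implicit scope of Celsius and is found without any import.
  implicit def fromDouble(d: Double): Celsius = new Celsius(d)
}

object ImplicitScopeDemo {
  def report(c: Celsius): String = s"${c.degrees} C"

  def main(args: Array[String]): Unit = {
    // The compiler searches Celsius's companion for a Double => Celsius view.
    println(report(20.5)) // prints "20.5 C"
  }
}
```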
A possible solution is to create a new class wrapping `T => Writable`, similar to
`WritableConverter`, and to change the implicit parameter type of
`SequenceFileRDDFunctions` to this class, e.g.
```Scala
class SequenceFileRDDFunctions[K, V](self: RDD[(K, V)])(
    implicit keyConverter: NewWritableConverter[K],
    valueConverter: NewWritableConverter[V])
```
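For completeness, a hedged sketch of what such a wrapper and its companion might look like (the name `NewWritableConverter` is hypothetical, as above); putting the implicit instances in the companion object is exactly what places them in the implicit scope, so no import of `SparkContext._` would be needed:
```Scala
import org.apache.hadoop.io.{IntWritable, Text, Writable}

// Hypothetical wrapper class: its companion object carries the implicit
// instances, so they sit in the implicit scope of NewWritableConverter[T]
// and are resolved without any import.
class NewWritableConverter[T](val convert: T => Writable)

object NewWritableConverter {
  implicit val intConverter: NewWritableConverter[Int] =
    new NewWritableConverter[Int](i => new IntWritable(i))

  implicit val stringConverter: NewWritableConverter[String] =
    new NewWritableConverter[String](s => new Text(s))
}
```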
However, since this is a breaking change (of course, we could also add a new
`SequenceFileRDDFunctions` class to avoid breaking existing code), I don't think
it's worth changing.