Nice work tracking down the problems with the codec getting applied
consistently. I think you're close to the fix; you just need to understand
Scala's implicit resolution rules.
I'm not entirely sure what you mean when you say "I simply copy/pasted the
saveAsObject() body to my function:" -- where does your function live?
Are you trying to modify SequenceFileRDDFunctions and then recompile your own
version of Spark? Or are you trying to leave the Spark package alone and
add your own helper function elsewhere?
If you are modifying SequenceFileRDDFunctions, then you should be able
to drop another function in there, no problem. Just be sure you have the
implicit conversions in scope when you try to apply them. The way to do
that is to "import org.apache.spark.SparkContext._". In the SparkContext
*object*, you'll notice a bunch of "implicit def"s -- by importing those,
you are telling the Scala compiler that it should try to apply those
conversions when it resolves method calls.
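For example, a minimal sketch (the exact set of implicit defs in the
SparkContext object varies a bit across Spark versions, so check yours):

import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.SparkContext._   // brings the implicit RDD conversions into scope
import org.apache.spark.rdd.RDD

def save(pairs: RDD[(NullWritable, BytesWritable)]) {
  // this compiles only because an implicit conversion to
  // SequenceFileRDDFunctions is now in scope
  pairs.saveAsSequenceFile("output")
}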
Your attempt at *explicit* conversion doesn't work because you aren't actually
doing a conversion -- you are attempting to apply a function. What you
have gets desugared to:
org.apache.spark.rdd.SequenceFileRDDFunctions*.apply*[(NullWritable,
BytesWritable)](...)
You'll notice that the compiler even told you it was looking for an object
called "SequenceFileRDDFunctions" but didn't find one. What you want is to
create a new instance of the class, which you do by adding *new* in front:
*new* org.apache.spark.rdd.SequenceFileRDDFunctions[(NullWritable,
BytesWritable)](...)
This is really confusing when you are new to Scala -- lots of companion
objects have an "apply" method that acts like *new*, but
SequenceFileRDDFunctions doesn't have one.
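Here's a toy example of the difference (Foo and Bar are made-up names, purely
for illustration):

class Foo(x: Int)                      // no companion object

class Bar(x: Int)
object Bar {
  def apply(x: Int) = new Bar(x)       // companion with an apply method
}

val b = Bar(1)       // fine: desugars to Bar.apply(1)
val f = new Foo(1)   // fine
// val f2 = Foo(1)   // error: not found: value Foo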
Then the second part: making your newly added function available on all
RDDs by implicit conversion. First, define a wrapper class with your new
function, and a companion object with an implicit conversion:
import org.apache.spark.rdd.RDD

class AwesomeRDD[T](self: RDD[T]) {
  def saveAwesomeObjectFile(path: String) {
    // put your def here
  }
}

object AwesomeRDD {
  implicit def addAwesomeFunctions[T](rdd: RDD[T]) = new AwesomeRDD(rdd)
}
Then, just import the implicit conversion wherever you want it:
class Demo {
  val rdd: RDD[String] = ...
  import AwesomeRDD._
  rdd.saveAwesomeObjectFile("/path")
}
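Putting it all together, here's a rough, self-contained sketch of what the
wrapper might look like with the body you pasted from Spark dropped in. The
serialize helper is my own stand-in (Spark's Utils.serialize is private to the
spark package), and depending on your Spark/Scala version you may need
ClassManifest instead of ClassTag -- so treat the details as assumptions, not a
drop-in implementation:

import scala.reflect.ClassTag

import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.SparkContext._   // implicit conversions, incl. saveAsSequenceFile
import org.apache.spark.rdd.RDD

class AwesomeRDD[T: ClassTag](self: RDD[T]) {

  // roughly what saveAsObjectFile does: batch the elements, serialize each
  // batch, and write a SequenceFile of (NullWritable, BytesWritable)
  def saveAwesomeObjectFile(path: String) {
    self.mapPartitions(iter => iter.grouped(10).map(_.toArray))
      .map(x => (NullWritable.get(), new BytesWritable(serialize(x))))
      .saveAsSequenceFile(path)
  }

  // stand-in for Spark's private Utils.serialize: plain Java serialization
  private def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new java.io.ByteArrayOutputStream()
    val out = new java.io.ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.toByteArray
  }
}

object AwesomeRDD {
  implicit def addAwesomeFunctions[T: ClassTag](rdd: RDD[T]) = new AwesomeRDD(rdd)
}

With that in place, the Demo snippet above works as-is, since the import pulls
addAwesomeFunctions into scope.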
On Sat, Jan 4, 2014 at 9:42 AM, Aureliano Buendia <[email protected]> wrote:
> Hi,
>
> I'm trying to create a custom version of saveAsObject(). However, I do not
> seem to be able to use SequenceFileRDDFunctions in my package.
>
> I simply copy/pasted the saveAsObject() body to my function:
>
> out.mapPartitions(iter => iter.grouped(10).map(_.toArray))
> .map(x => (NullWritable.get(), new BytesWritable(serialize(x))))
> .*saveAsSequenceFile*("output")
>
> But that gives me this error:
>
> value saveAsSequenceFile is not a member of
> org.apache.spark.rdd.RDD[(org.apache.hadoop.io.NullWritable,
> org.apache.hadoop.io.BytesWritable)]
> possible cause: maybe a semicolon is missing before `value
> saveAsSequenceFile'?
> .saveAsSequenceFile("output")
> ^
>
> The Scala implicit conversion error is not of any help here, so I tried to
> apply an explicit conversion:
>
> *org.apache.spark.rdd.SequenceFileRDDFunctions[(NullWritable,
> BytesWritable)](*out.mapPartitions(iter =>
> iter.grouped(10).map(_.toArray))
> .map(x => (NullWritable.get(), new BytesWritable(serialize(x))))*)*
> .saveAsSequenceFile("output")
>
> Giving me this error:
>
> object SequenceFileRDDFunctions is not a member of package
> org.apache.spark.rdd
> *Note: class SequenceFileRDDFunctions exists, but it has no companion
> object.*
> org.apache.spark.rdd.SequenceFileRDDFunctions[(NullWritable,
> BytesWritable)](out.mapPartitions(iter => iter.grouped(10).map(_.toArray))
> ^
>
> Is this Scala compiler version mismatch hell?
>