On Fri, Apr 2, 2010 at 3:10 PM, Owen O'Malley <[email protected]> wrote:
>
> On Apr 2, 2010, at 12:05 PM, Kris Nuttycombe wrote:
>
>> What I'm wondering is, is there any way to simply serialize a Mapper
>> or Reducer object, and have the serialized instance copied, passed
>> around and used everywhere instead of always having the Mapper and
>> Reducer instantiated by reflection? This would greatly simplify
>> library design in my case.
>
> Currently the best you can do is to make your Mapper or Reducer implement
> Configurable and use the values out of the configuration.
>
> Take a look at MAPREDUCE-1183. It should be exactly what you are asking for
> when it gets implemented.
>
> -- Owen
>
Thanks for the reference to that ticket, Owen. In the meantime, I
think I may have figured out a workaround. The following code
(completely untested as of yet, but a starting point) provides base
classes for an implementation based upon the distributed cache:
import org.apache.hadoop.conf._
import org.apache.hadoop.util._
import org.apache.hadoop.mapreduce._
import java.io._
import SerializingResourceToolRunner._
/** Constants shared by the runner and the task-side resource loader. */
object SerializingResourceToolRunner {
  /** Configuration key under which the name of the serialized resource file is stored. */
  val serializedResourceName: String = "socialmedia.mr_tool.serfile"
}
/**
 * Runs a [[SerializingResourceTool]] through Hadoop's `ToolRunner`, shipping the tool's
 * `resource` alongside the job as a Java-serialized temporary file passed via `-files`.
 *
 * @tparam T type of the serializable resource provided by the tool
 * @param tool the tool to run; its `resource` is serialized before the tool is launched
 */
class SerializingResourceToolRunner[T <: Serializable](tool: SerializingResourceTool[T]) {

  /**
   * Serializes `tool.resource` to a temp file, records the file's name in the tool's
   * configuration under `serializedResourceName`, and invokes `ToolRunner.run` with a
   * `-files` option that includes the temp file.
   *
   * @param argv the command-line arguments; an existing `-files` value is extended with the
   *             temp file rather than replaced (if `-files` occurs more than once, the last
   *             value wins, as before)
   * @return the value returned by `ToolRunner.run`
   */
  def runWithToolRunner(argv: Array[String]): Int = {
    // Separates a "-files <value>" pair from the remaining arguments. The kept arguments are
    // accumulated by prepending, so they are reversed at the end to restore caller order.
    // A trailing "-files" with no value is passed through untouched instead of indexing
    // past the end of the argument list.
    def stripFileArg(
      remaining: List[String],
      kept: List[String],
      files: Option[String]
    ): (List[String], Option[String]) =
      remaining match {
        case "-files" :: value :: rest => stripFileArg(rest, kept, Option(value))
        case arg :: rest => stripFileArg(rest, arg :: kept, files)
        case Nil => (kept.reverse, files)
      }

    val tempFile = File.createTempFile("mr_tool", ".ser")
    // The serialized copy is only needed for this launch; do not leave it behind.
    tempFile.deleteOnExit()
    using(new FileOutputStream(tempFile)) { f =>
      using(new ObjectOutputStream(f)) { out =>
        out.writeObject(tool.resource)
      }
    }

    val (args, filesArg) = stripFileArg(argv.toList, Nil, None)

    // The tool may not have been given a Configuration yet; create one so that the
    // resource file name can be recorded before the tool runs.
    if (tool.getConf == null) tool.setConf(new Configuration())
    tool.getConf.set(serializedResourceName, tempFile.getName)

    val tempPath = tempFile.getAbsolutePath
    val filesArgWithTempFile = filesArg.map(_ + "," + tempPath).getOrElse(tempPath)
    ToolRunner.run(tool, ("-files" :: filesArgWithTempFile :: args).toArray)
  }
}
/**
 * Mixin that loads a Java-serialized resource from the file named by the
 * `serializedResourceName` configuration entry.
 *
 * @tparam T the type the deserialized object is cast to (the cast is unchecked)
 */
trait Resources[T] {
  // Holds the deserialized resource; stays at the type's default value until init is called.
  private var _resource: T = _

  /** The resource loaded by `init`, or the default value of `T` if `init` has not run. */
  def resource: T = _resource

  /**
   * Reads the serialized resource from the file named in `conf` and stores it.
   *
   * @param conf configuration expected to carry the `serializedResourceName` entry
   * @throws IllegalStateException if the configuration entry is missing
   */
  def init(conf: Configuration): Unit = {
    val fileName = conf.get(serializedResourceName)
    // Fail with a descriptive message rather than a NullPointerException from new File(null).
    if (fileName == null) {
      throw new IllegalStateException(
        "Configuration entry '" + serializedResourceName + "' is not set; " +
          "cannot locate the serialized resource file")
    }
    _resource = using(new FileInputStream(new File(fileName))) { f =>
      using(new ObjectInputStream(f)) { in =>
        in.readObject.asInstanceOf[T]
      }
    }
  }
}
/**
 * Base `Mapper` that loads its resource of type `T` from the task configuration
 * during `setup`.
 */
abstract class SerializedResourceMapper[T, KI, VI, KO, VO]
  extends Mapper[KI, VI, KO, VO]
  with Resources[T] {

  /** Runs the parent setup, then initializes the resource from the context's configuration. */
  override def setup(context: Mapper[KI, VI, KO, VO]#Context): Unit = {
    super.setup(context)
    val taskConf = context.getConfiguration
    init(taskConf)
  }
}
/**
 * Base `Reducer` that loads its resource of type `T` from the task configuration
 * during `setup`.
 */
abstract class SerializedResourceReducer[T, KI, VI, KO, VO]
  extends Reducer[KI, VI, KO, VO]
  with Resources[T] {

  /** Runs the parent setup, then initializes the resource from the context's configuration. */
  override def setup(context: Reducer[KI, VI, KO, VO]#Context): Unit = {
    super.setup(context)
    val taskConf = context.getConfiguration
    init(taskConf)
  }
}
/**
 * A configured Hadoop `Tool` that exposes a serializable resource.
 *
 * @tparam T type of the serializable resource
 */
abstract class SerializingResourceTool[T <: Serializable] extends Configured with Tool {
  /** The resource object supplied by the concrete tool. */
  def resource: T
}