Or heck... I could just base-64 encode the serialized byte arrays and pass them as strings in the configuration. If it's going to be a hack, might as well go all the way.
On Fri, Apr 2, 2010 at 4:10 PM, Kris Nuttycombe <[email protected]> wrote: > On Fri, Apr 2, 2010 at 3:10 PM, Owen O'Malley <[email protected]> wrote: >> >> On Apr 2, 2010, at 12:05 PM, Kris Nuttycombe wrote: >> >>> What I'm wondering is, is there any way to simply serialize a Mapper >>> or Reducer object, and have the serialized instance copied, passed >>> around and used everywhere instead of always having the Mapper and >>> Reducer instantiated by reflection? This would greatly simplify >>> library design in my case. >> >> Currently the best you can do is to make your Mapper or Reducer implement >> Configurable and use the values out of the configuration. >> >> Take a look at MAPREDUCE-1183. It should be exactly what you are asking for >> when it gets implemented. >> >> -- Owen >> > > Thanks for the reference to that ticket, Owen. In the meantime, I > think I may have figured out a workaround. The following code > (completely untested as of yet, but a starting point) provides base > classes for an implementation based upon the distributed cache: > > > import org.apache.hadoop.conf._ > import org.apache.hadoop.util._ > import org.apache.hadoop.mapreduce._ > import java.io._ > import SerializingResourceToolRunner._ > > object SerializingResourceToolRunner { > val serializedResourceName = "socialmedia.mr_tool.serfile" > } > > class SerializingResourceToolRunner[T <: Serializable](tool: > SerializingResourceTool[T]) { > def runWithToolRunner(argv: Array[String]) = { > def stripFileArg(i: Int, l: List[String], f: Option[String]): > (List[String], Option[String]) = { > if (i >= argv.length) (l, f) > else if (argv(i) == "-files") stripFileArg(i + 2, l, option(argv(i + 1))) > else stripFileArg(i + 1, argv(i) :: l, f) > } > > val tempFile = File.createTempFile("mr_tool", ".ser") > using(new FileOutputStream(tempFile)) { > f => using(new ObjectOutputStream(f)) { > out => out.writeObject(tool.resource) > } > } > > val (args, filesArg) = stripFileArg(0, Nil, None) > > 
tool.getConf.set(serializedResourceName, tempFile.getName) > val filesArgWithTempFile = filesArg.map(_ + "," + > tempFile.getAbsolutePath).getOrElse(tempFile.getAbsolutePath) > ToolRunner.run(tool, ("-files" :: filesArgWithTempFile :: args).toArray) > } > } > > trait Resources[T] { > private var _resource: T = _ > def resource: T = _resource > > def init(conf: Configuration): Unit = { > _resource = using(new FileInputStream(new > File(conf.get(serializedResourceName)))) { > f => using(new ObjectInputStream(f)) { > in => in.readObject.asInstanceOf[T] > } > } > } > } > > abstract class SerializedResourceMapper[T, KI, VI, KO, VO] extends > Mapper[KI, VI, KO, VO] with Resources[T] { > override def setup(context: Mapper[KI, VI, KO, VO]#Context): Unit = { > super.setup(context) > init(context.getConfiguration) > } > } > > abstract class SerializedResourceReducer[T, KI, VI, KO, VO] extends > Reducer[KI, VI, KO, VO] with Resources[T] { > override def setup(context: Reducer[KI, VI, KO, VO]#Context): Unit = { > super.setup(context) > init(context.getConfiguration) > } > } > > abstract class SerializingResourceTool[T <: Serializable] extends > Configured with Tool { > def resource: T > } >
