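Have a look at org.apache.hadoop.io.Stringifier (and its implementation DefaultStringifier), which may also be helpful: DefaultStringifier can store an object in the job Configuration as a string and load it back, which is essentially the base-64 idea below without the hand-rolled encoding.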
Cheers,
Tom

On Fri, Apr 2, 2010 at 3:35 PM, Kris Nuttycombe <[email protected]> wrote:
> Or heck... I could just base-64 encode the serialized byte arrays and
> pass them as strings in the configuration. If it's going to be a hack,
> might as well go all the way.
>
> On Fri, Apr 2, 2010 at 4:10 PM, Kris Nuttycombe
> <[email protected]> wrote:
>> On Fri, Apr 2, 2010 at 3:10 PM, Owen O'Malley <[email protected]> wrote:
>>>
>>> On Apr 2, 2010, at 12:05 PM, Kris Nuttycombe wrote:
>>>
>>>> What I'm wondering is, is there any way to simply serialize a Mapper
>>>> or Reducer object, and have the serialized instance copied, passed
>>>> around and used everywhere instead of always having the Mapper and
>>>> Reducer instantiated by reflection? This would greatly simplify
>>>> library design in my case.
>>>
>>> Currently the best you can do is to make your Mapper or Reducer implement
>>> Configurable and use the values out of the configuration.
>>>
>>> Take a look at MAPREDUCE-1183. It should be exactly what you are asking for
>>> when it gets implemented.
>>>
>>> -- Owen
>>>
>>
>> Thanks for the reference to that ticket, Owen. In the meantime, I
>> think I may have figured out a workaround.
// The following code (completely untested as of yet, but a starting point)
// provides base classes for an implementation based upon the distributed cache:

import org.apache.hadoop.conf._
import org.apache.hadoop.util._
import org.apache.hadoop.mapreduce._
import java.io._
import SerializingResourceToolRunner._

/** Shared constants and helpers for the serialized-resource tool classes. */
object SerializingResourceToolRunner {
  /** Configuration key under which the serialized resource's file name is stored. */
  val serializedResourceName = "socialmedia.mr_tool.serfile"

  /**
   * Loan pattern: applies `f` to `resource` and always closes the resource
   * afterwards, even if `f` throws.
   *
   * @param resource the closeable to lend to `f`
   * @param f        the work to perform with the open resource
   * @return whatever `f` returns
   */
  def using[A <: Closeable, B](resource: A)(f: A => B): B =
    try f(resource) finally resource.close()
}

/**
 * Ships a tool's serializable resource to its tasks.
 *
 * The runner Java-serializes `tool.resource` into a local temporary file. It
 * adds that file to the generic `-files` option and records the file's name
 * in the tool's configuration under `serializedResourceName`. Finally it runs
 * the tool through `ToolRunner`.
 *
 * @tparam T type of the serializable resource provided by the tool
 */
class SerializingResourceToolRunner[T <: Serializable](tool: SerializingResourceTool[T]) {

  /**
   * Runs `tool` with `argv`, adding the serialized resource to the files
   * distributed with the job.
   *
   * Every `-files <list>` pair in `argv` is removed, and the lists are merged
   * with the temp file into a single `-files` option placed first. The
   * remaining arguments keep their original order. A trailing `-files` with
   * no value is passed through untouched so the option parser can report it.
   *
   * @param argv command-line arguments as given to `main`
   * @return the exit code returned by `ToolRunner.run`
   */
  def runWithToolRunner(argv: Array[String]): Int = {
    // Splits argv into (remaining args in original order, merged -files value).
    def stripFileArg(i: Int, acc: List[String], files: Option[String]): (List[String], Option[String]) =
      if (i >= argv.length) (acc.reverse) -> files // acc was built by prepending
      else if (argv(i) == "-files" && i + 1 < argv.length) {
        val value = argv(i + 1)
        stripFileArg(i + 2, acc, Some(files.map(_ + "," + value).getOrElse(value)))
      } else stripFileArg(i + 1, argv(i) :: acc, files)

    val tempFile = File.createTempFile("mr_tool", ".ser")
    tempFile.deleteOnExit()
    using(new FileOutputStream(tempFile)) { f =>
      using(new ObjectOutputStream(f)) { out => out.writeObject(tool.resource) }
    }

    val (args, filesArg) = stripFileArg(0, Nil, None)

    // A tool built with Configured's no-arg constructor may have no
    // Configuration yet; give it one so the key below is not lost to an NPE.
    if (tool.getConf == null) tool.setConf(new Configuration())
    // Only the bare name is stored: tasks look the file up relative to their
    // working directory (see Resources.init).
    tool.getConf.set(serializedResourceName, tempFile.getName)

    val filesArgWithTempFile =
      filesArg.map(_ + "," + tempFile.getAbsolutePath).getOrElse(tempFile.getAbsolutePath)
    // Generic options must precede the tool's own arguments.
    ToolRunner.run(tool, ("-files" :: filesArgWithTempFile :: args).toArray)
  }
}

/**
 * Mixin that loads the resource shipped by SerializingResourceToolRunner
 * when `init` is called.
 *
 * @tparam T type of the deserialized resource
 */
trait Resources[T] {
  private var _resource: T = _

  /** The deserialized resource; holds the type's default value until `init` runs. */
  def resource: T = _resource

  /**
   * Deserializes the resource from the file named by `serializedResourceName`
   * in `conf`. The name is resolved relative to the current working directory.
   *
   * @param conf the task's configuration
   * @throws IllegalStateException if `conf` does not contain the key
   */
  def init(conf: Configuration): Unit = {
    val fileName = conf.get(serializedResourceName)
    if (fileName == null)
      throw new IllegalStateException(
        "Configuration key " + serializedResourceName + " is not set; " +
          "was the job launched via SerializingResourceToolRunner?")
    // NOTE(review): relies on -files making the cached file visible under its
    // plain name in the task's working directory. Confirm for your Hadoop version.
    _resource = using(new FileInputStream(new File(fileName))) { f =>
      // The cast is unchecked (erasure); a wrong type surfaces at the use site.
      using(new ObjectInputStream(f)) { in => in.readObject.asInstanceOf[T] }
    }
  }
}

/** Base Mapper that loads `resource` from the task configuration during `setup`. */
abstract class SerializedResourceMapper[T, KI, VI, KO, VO]
    extends Mapper[KI, VI, KO, VO] with Resources[T] {
  override def setup(context: Mapper[KI, VI, KO, VO]#Context): Unit = {
    super.setup(context)
    init(context.getConfiguration)
  }
}

/** Base Reducer that loads `resource` from the task configuration during `setup`. */
abstract class SerializedResourceReducer[T, KI, VI, KO, VO]
    extends Reducer[KI, VI, KO, VO] with Resources[T] {
  override def setup(context: Reducer[KI, VI, KO, VO]#Context): Unit = {
    super.setup(context)
    init(context.getConfiguration)
  }
}

/** A Hadoop Tool that supplies the serializable resource to ship to its tasks. */
abstract class SerializingResourceTool[T <: Serializable] extends Configured with Tool {
  /** The value serialized on the client and deserialized in each task. */
  def resource: T
}
