On Fri, Apr 2, 2010 at 3:10 PM, Owen O'Malley <[email protected]> wrote:
>
> On Apr 2, 2010, at 12:05 PM, Kris Nuttycombe wrote:
>
>> What I'm wondering is, is there any way to simply serialize a Mapper
>> or Reducer object, and have the serialized instance copied, passed
>> around and used everywhere instead of always having the Mapper and
>> Reducer instantiated by reflection? This would greatly simplify
>> library design in my case.
>
> Currently the best you can do is to make your Mapper or Reducer implement
> Configurable and use the values out of the configuration.
>
> Take a look at MAPREDUCE-1183. It should be exactly what you are asking for
> when it gets implemented.
>
> -- Owen
>

Thanks for the reference to that ticket, Owen. In the meantime, I
think I may have figured out a workaround. The following code
(completely untested as of yet, but a starting point) provides base
classes for an implementation based upon the distributed cache:


import org.apache.hadoop.conf._
import org.apache.hadoop.util._
import org.apache.hadoop.mapreduce._
import java.io._
import SerializingResourceToolRunner._

/** Constants shared by the runner and the task-side `Resources` trait. */
object SerializingResourceToolRunner {
  /**
   * Configuration key under which the runner stores the name of the
   * serialized resource file, and from which `Resources.init` reads it back.
   */
  val serializedResourceName: String = "socialmedia.mr_tool.serfile"
}

/**
 * Launches a `SerializingResourceTool` through Hadoop's `ToolRunner` after
 * writing the tool's `resource` to a local temp file and adding that file to
 * the `-files` generic option.
 *
 * @tparam T type of the serializable resource carried by the tool
 * @param tool the tool to run; its `resource` is what gets serialized
 */
class SerializingResourceToolRunner[T <: Serializable](tool: SerializingResourceTool[T]) {
  /**
   * Serializes `tool.resource`, records the temp file's name in the
   * configuration under `serializedResourceName`, and runs the tool.
   *
   * Any `-files <list>` pair found in `argv` is removed and re-emitted at the
   * front of the argument list with the temp file appended to the list. If
   * `-files` appears more than once, the last occurrence wins. All other
   * arguments are passed through in their original order.
   *
   * @param argv command-line arguments as received by the caller
   * @return the value returned by `ToolRunner.run`
   * @throws IllegalArgumentException if `-files` is the last argument and
   *                                  therefore has no value
   */
  def runWithToolRunner(argv: Array[String]): Int = {
    // Walks argv once, splitting the "-files" value off from everything else.
    // `l` is accumulated by prepending, so it is reversed when the walk ends
    // to hand the remaining arguments back in their original order.
    @scala.annotation.tailrec
    def stripFileArg(i: Int, l: List[String], f: Option[String]): (List[String], Option[String]) = {
      if (i >= argv.length) (l.reverse, f)
      else if (argv(i) == "-files") {
        // Fail with a clear message instead of an ArrayIndexOutOfBoundsException.
        require(i + 1 < argv.length, "-files option requires a value")
        stripFileArg(i + 2, l, Option(argv(i + 1)))
      }
      else stripFileArg(i + 1, argv(i) :: l, f)
    }

    val tempFile = File.createTempFile("mr_tool", ".ser")
    // The file is only needed while the job is being set up and run from this
    // JVM, so let the JVM remove it on exit rather than leaking temp files.
    tempFile.deleteOnExit()
    using(new FileOutputStream(tempFile)) {
      f => using(new ObjectOutputStream(f)) {
        out => out.writeObject(tool.resource)
      }
    }

    val (args, filesArg) = stripFileArg(0, Nil, None)

    // A Configured tool has no Configuration until one is set on it, so fall
    // back to a fresh one and pass it to ToolRunner explicitly; ToolRunner
    // installs it on the tool before calling run.
    val conf = Option(tool.getConf).getOrElse(new Configuration)
    // Only the bare file name is stored; tasks open it via a relative path.
    conf.set(serializedResourceName, tempFile.getName)

    val filesArgWithTempFile = filesArg
      .map(_ + "," + tempFile.getAbsolutePath)
      .getOrElse(tempFile.getAbsolutePath)
    ToolRunner.run(conf, tool, ("-files" :: filesArgWithTempFile :: args).toArray)
  }
}

/**
 * Mixin that holds a resource of type `T` deserialized from the file named by
 * the `serializedResourceName` configuration entry.
 *
 * @tparam T expected type of the deserialized object
 */
trait Resources[T] {
  // Holds the default value for T (null for reference types) until init runs.
  private var _resource: T = _

  /** The resource loaded by the most recent call to `init`. */
  def resource: T = _resource

  /**
   * Reads the file name stored under `serializedResourceName` in `conf`,
   * opens that file and deserializes a single object from it into `resource`.
   *
   * @param conf configuration expected to contain `serializedResourceName`
   * @throws IllegalStateException if the configuration entry is missing
   */
  def init(conf: Configuration): Unit = {
    // conf.get yields null for an unset key; report that explicitly rather
    // than letting `new File(null)` fail with a bare NullPointerException.
    val fileName = Option(conf.get(serializedResourceName)).getOrElse(
      throw new IllegalStateException(
        "Configuration entry '" + serializedResourceName +
          "' is not set; cannot load the serialized resource")
    )
    _resource = using(new FileInputStream(new File(fileName))) {
      f => using(new ObjectInputStream(f)) {
        // NOTE(review): the cast is unchecked because T is erased at runtime;
        // a mismatched type would only surface when the resource is used.
        in => in.readObject.asInstanceOf[T]
      }
    }
  }
}

/**
 * Base class for mappers that need the serialized resource: `setup` calls
 * `Resources.init` with the task's configuration, so `resource` is populated
 * from then on.
 *
 * @tparam T  type of the deserialized resource
 * @tparam KI input key type
 * @tparam VI input value type
 * @tparam KO output key type
 * @tparam VO output value type
 */
abstract class SerializedResourceMapper[T, KI, VI, KO, VO]
  extends Mapper[KI, VI, KO, VO] with Resources[T] {

  /** Runs the inherited setup, then loads the resource from the task configuration. */
  override def setup(context: Mapper[KI, VI, KO, VO]#Context): Unit = {
    super.setup(context)
    val taskConf = context.getConfiguration
    init(taskConf)
  }
}

/**
 * Base class for reducers that need the serialized resource: `setup` calls
 * `Resources.init` with the task's configuration, so `resource` is populated
 * from then on.
 *
 * @tparam T  type of the deserialized resource
 * @tparam KI input key type
 * @tparam VI input value type
 * @tparam KO output key type
 * @tparam VO output value type
 */
abstract class SerializedResourceReducer[T, KI, VI, KO, VO]
  extends Reducer[KI, VI, KO, VO] with Resources[T] {

  /** Runs the inherited setup, then loads the resource from the task configuration. */
  override def setup(context: Reducer[KI, VI, KO, VO]#Context): Unit = {
    super.setup(context)
    val taskConf = context.getConfiguration
    init(taskConf)
  }
}

/**
 * A Hadoop `Tool` that exposes a serializable resource; this is the value
 * `SerializingResourceToolRunner` writes to the temp file.
 *
 * @tparam T type of the resource; must be serializable
 */
abstract class SerializingResourceTool[T <: Serializable]
  extends Configured
  with Tool {

  /** The object to be serialized for the job's tasks. */
  def resource: T
}

Reply via email to