Having been down this road recently, the best results I've been able
to obtain came from using the Distributed Cache and Java serialization;
if your LexicalizedParser is not serializable then you'll have to jump
through more hoops, but the basic approach can be the same.
Here's some Scala code that I use to set things up and retrieve them
properly through the Distributed Cache:
package socialmedia.common.hadoop
import socialmedia.common.util.Util._
import org.apache.hadoop.conf._
import org.apache.hadoop.filecache._
import org.apache.hadoop.fs._
import org.apache.hadoop.io._
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable
import java.util.UUID
import net.lag.logging.Logger
import DistCacheResources._
/**
 * Job-side helpers for shipping a Java-serialized object to tasks through
 * Hadoop's DistributedCache.
 */
object DistCacheResources {
  private val log = Logger.get

  /** Directory (relative to the user's home directory) holding serialized resources. */
  val serializedResourceDir: String = "dist_cache_resource.serfile"

  /** Configuration key under which the UUID of the serialized resource is stored. */
  val serializedResourceUUID: String = "dist_cache_resource.uuid"

  /**
   * Serializes `resource` to `<home>/<serializedResourceDir>/<uuid>.ser` on the
   * file system configured in `conf`, records `uuid` in `conf` under
   * [[serializedResourceUUID]], and registers the file with the DistributedCache.
   *
   * @param conf     job configuration; it is mutated (UUID key + cache file entry)
   * @param uuid     identifier used to name the serialized file
   * @param resource the object to serialize with Java serialization
   * @tparam T       any `java.io.Serializable` type
   */
  def addToDistCache[T <: Serializable](conf: Configuration, uuid: UUID, resource: T): Unit = {
    // FileSystem.get hands back a cached instance that is shared with every other
    // caller using the same configuration (including the job client). It must NOT
    // be closed here, otherwise later users can fail with "Filesystem closed".
    // NOTE(review): this assumes `using` closes its argument — confirm in Util.
    val fs = FileSystem.get(conf)
    val path = new Path(
      fs.getHomeDirectory,
      new Path(serializedResourceDir, uuid.toString + ".ser"))
    log.info("Writing serialized resource to " + path)

    // Only the streams are scoped resources; closing the ObjectOutputStream
    // flushes it before the underlying file stream is closed.
    using(fs.create(path)) { out =>
      using(new ObjectOutputStream(out)) { oout =>
        oout.writeObject(resource)
      }
    }

    conf.set(serializedResourceUUID, uuid.toString)
    DistributedCache.addCacheFile(path.toUri, conf)
  }
}
/**
 * Task-side mixin that loads a Java-serialized object of type `T` from the
 * local DistributedCache files. Call [[init]] once (e.g. from a mapper's
 * `setup`) and then read the value through [[resource]].
 *
 * @tparam T expected type of the deserialized object. The cast is unchecked
 *           (erased), so a mismatch only surfaces when the value is used.
 */
trait DistCacheResources[T] extends Configurable {
  // Start from None rather than the null default of `= _`, so that `resource`
  // is safe to call before `init` has run.
  private var _resource: Option[T] = None
  private var _conf: Configuration = _

  /** The deserialized resource, or None if `init` has not run or found nothing. */
  def resource: Option[T] = _resource

  override def setConf(conf: Configuration): Unit = { _conf = conf }
  override def getConf: Configuration = _conf

  /**
   * Looks up the resource UUID in `conf`, finds the matching `<uuid>.ser` file
   * among the local cache files and deserializes it into [[resource]].
   * Leaves [[resource]] as None when the UUID key is absent, when no cache
   * files are available, or when no file matches.
   *
   * @param conf task configuration; must not be null
   */
  def init(conf: Configuration): Unit = {
    if (conf == null) error("Configuration was null.")

    // conf.get returns null for a missing key; wrap it instead of letting
    // UUID.fromString fail with a NullPointerException.
    _resource = Option(conf.get(serializedResourceUUID)) match {
      case None =>
        log.info("No serialized resource uuid configured under " + serializedResourceUUID)
        None

      case Some(rawUuid) =>
        val uuid = UUID.fromString(rawUuid)
        log.info("Will attempt to obtain serialized resource for uuid: " + uuid)

        // getLocalCacheFiles may return null when nothing was cached.
        val cacheFiles =
          Option(DistributedCache.getLocalCacheFiles(conf)).map(_.toList).getOrElse(Nil)
        log.info("Available cache files: " + cacheFiles.mkString(", "))

        cacheFiles.find(_.getName == uuid.toString + ".ser").map { resPath =>
          // FileSystem.getLocal returns a cached, shared instance; closing it
          // here could break other users inside the running task, so only the
          // streams are closed.
          // NOTE(review): this assumes `using` closes its argument — confirm in Util.
          val localFs = FileSystem.getLocal(conf)
          using(localFs.open(resPath)) { in =>
            using(new ObjectInputStream(in)) { oin =>
              oin.readObject.asInstanceOf[T]
            }
          }
        }
    }
  }
}
Then you can have something like:
/**
 * Example mapper that mixes in DistCacheResources to obtain a
 * LexicalizedParser shipped through the DistributedCache.
 */
class MyMapper
    extends Mapper[BW, BW, Text, BW]
    with DistCacheResources[LexicalizedParser] {

  /**
   * Loads the cached resource before any records are mapped, using the
   * Configuration held by this Configurable instance.
   * NOTE(review): presumably the framework has called setConf by this point — confirm.
   */
  override protected def setup(context: HMapper[BW, BW, Text, BW]#Context): Unit =
    init(getConf)

  // ...
}
And, in your job configuration, you just call
DistCacheResources.addToDistCache(...)
Kris
On Tue, Apr 20, 2010 at 5:16 AM, Rishav <[email protected]> wrote:
> Hello
>
> I am trying to pass an object of type LexicalizedParser (which is from an
> imported jar file stanford-parser) from the main method to the Mapper Class.
> This is to load the trained model first and then apply on every file offset
> in the map function in the Mapper Class.
> Could you tell me the best way to pass an object of any type to the Mapper?
>
> Thanking you
> Rishav
>
>
> --
> Rishav Bhowmick
> Computer Science -2010
> Carnegie Mellon University
> Tel: +974-561-4371
> e-mail: [email protected]
>
>