Multiple threads per JVM
------------------------
Key: MAPREDUCE-2123
URL: https://issues.apache.org/jira/browse/MAPREDUCE-2123
Project: Hadoop Map/Reduce
Issue Type: Improvement
Reporter: Randy Wilson
I have a process that standardizes name and place strings, and requires access
to java objects that require a lot of RAM (800MB). Hadoop (via Amazon elastic
mapreduce) was running out of memory, because it was firing up one JVM per task
per slave. Each JVM needed 1.5GB, and 6 of those blew out memory.
In this case, we don't need 6 different JVMs running--we only need one, but
with multiple threads. I tried using a MultithreadedMapper, but it doesn't
have a thread-safe "run()" method: it makes 3 calls to the input source to read
one "line", which doesn't work if multiple threads are doing that. So I had to
override the run() method. I ended up having to do so much work to override
the run() method that it was simpler to skip using the MultithreadedMapper at
all. Instead, I took my original mapper and just overrode the run() method
there directly. I fired up n threads, each of which called a method that had a
synchronized(mutex) around the part of the process that made the three calls to
an input source to get the next line to operate on. Then, outside of the
synchronized block, it called the map() method, which made use of the large,
shared (singleton) standardization object.
All of this made me wonder why hadoop fires up multiple JVMs per slave in the
first place--that is a lot of overhead to use per thread. I've also been
warned that doing continual reuse of JVMs instead of restarting one per task
will use up more memory. That seems like it should only be true if hadoop (or
our mapper) is leaking memory. If that's the case, let's get it fixed.
My guess is that since hadoop can run tasks in languages other than Java--and
since other languages may have less overhead per process--that firing up a JVM
per task (or per thread) simplifies hadoop. But the multithreaded solution we
did was very general-purpose. It seems like it ought to be pretty much the
default solution in java, and that a "...map.threads" property should be all
that is required to fire up that many threads to help with each task, rather
than have to jump through the hoops we had to.
Below is the implementation that seems to be working:
In the main class:
Configuration config = getConf();
config.set("num_threads_per_jvm", Integer.toString(numThreads));
Job job = new Job(config, "Standardize stuff");
In the Mapper class:
public void run(final Context context) throws IOException,
InterruptedException {
int numThreads =
Integer.parseInt(context.getConfiguration().get("num_threads_per_jvm");
setup(context); // setup and cleanup just once, rather than once per thread
List<MapRunner> mapRunners = new ArrayList<MapRunner>();
for (int i = 0; i < numThreads; i++) {
MapRunner mapRunner = new MapRunner(context, i);
mapRunners.add(mapRunner);
mapRunner.start();
}
// Wait for all the threads to complete
for (MapRunner mapRunner : mapRunners) {
mapRunner.join();
}
cleanup(context);
}
private class MapRunner extends Thread {
final Context context;
private MapRunner(Context context) {
this.context = context;
}
@Override
public void run() {
boolean gotValue = true;
do {
try {
Text key = null;
Text value = null;
synchronized(contextMutex) {
gotValue = context.nextKeyValue();
if (gotValue) {
key = context.getCurrentKey();
value = context.getCurrentValue();
}
}
if (gotValue) {
map(key, value, context);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
} while (gotValue);
}
}
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.