Multiple threads per JVM
------------------------

                 Key: MAPREDUCE-2123
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-2123
             Project: Hadoop Map/Reduce
          Issue Type: Improvement
            Reporter: Randy Wilson


I have a process that standardizes name and place strings and requires access 
to Java objects that consume a lot of RAM (800MB).  Hadoop (via Amazon Elastic 
MapReduce) was running out of memory because it fires up one JVM per 
concurrent task on each slave.  Each JVM needed 1.5GB, and six of those blew 
out the node's memory.
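
For concreteness, the arithmetic looks roughly like the sketch below.  The 
property names are the standard Hadoop 0.20 ones; the exact slot count on our 
EMR instances is an assumption for illustration.

    // Sketch of the configuration that blew out memory (slot count assumed):
    // 6 map slots per slave x 1.5GB heap per task JVM = 9GB of heap demand.
    Configuration conf = new Configuration();
    conf.setInt("mapred.tasktracker.map.tasks.maximum", 6); // concurrent tasks
    conf.set("mapred.child.java.opts", "-Xmx1500m");        // heap per task JVM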

In this case, we don't need six different JVMs running--we only need one, with 
multiple threads.  I tried using MultithreadedMapper, but its run() method is 
not thread-safe: reading one "line" takes three separate calls to the input 
source (nextKeyValue(), getCurrentKey(), getCurrentValue()), which breaks when 
multiple threads interleave them.  So I had to override the run() method, and 
that turned out to be so much work that it was simpler to skip 
MultithreadedMapper altogether.  Instead, I took my original mapper and 
overrode its run() method directly: it fires up n threads, each of which wraps 
the three input-source calls in a synchronized(mutex) block to fetch the next 
line and then, outside the synchronized block, calls map(), which uses the 
large, shared (singleton) standardization object.
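
For reference, the stock way to wire up MultithreadedMapper looks something 
like this (a sketch against the org.apache.hadoop.mapreduce.lib.map API; 
MyMapper is a stand-in for our real mapper class):

    // Sketch: the built-in way to run N map threads in one task JVM.
    Job job = new Job(getConf(), "Standardize stuff");
    job.setMapperClass(MultithreadedMapper.class);
    MultithreadedMapper.setMapperClass(job, MyMapper.class);  // the real work
    MultithreadedMapper.setNumberOfThreads(job, numThreads);  // N threads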

All of this made me wonder why Hadoop fires up multiple JVMs per slave in the 
first place--that is a lot of overhead per thread.  I've also been warned that 
continually reusing JVMs instead of restarting one per task will use up more 
memory.  That should only be true if Hadoop (or our mapper) is leaking memory, 
and if that's the case, let's get it fixed.
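
For what it's worth, the JVM reuse I was warned about is controlled by a 
standard job property; a sketch of turning it on fully:

    // Sketch: keep reusing one JVM for this job's tasks instead of forking
    // a fresh JVM per task.  -1 = unlimited reuse within the job.
    Configuration config = getConf();
    config.setInt("mapred.job.reuse.jvm.num.tasks", -1);

Note that this reuses a JVM across successive tasks of one job; it does not 
reduce the number of concurrent JVMs on a node.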

My guess is that since Hadoop can run tasks in languages other than Java--and 
since other languages may have less overhead per process--firing up a JVM per 
task simplifies Hadoop.  But the multithreaded solution we built is very 
general-purpose.  It seems like it ought to be pretty much the default 
solution in Java, and a "...map.threads" property should be all that is 
required to fire up that many threads for each task, rather than having to 
jump through the hoops we did.

Below is the implementation that seems to be working:

In the main class:
    Configuration config = getConf();
    config.set("num_threads_per_jvm", Integer.toString(numThreads));
    Job job = new Job(config, "Standardize stuff");

In the Mapper class:
  // Mutex guarding the non-thread-safe calls on the shared Context.
  private final Object contextMutex = new Object();

  @Override
  public void run(final Context context) throws IOException, 
      InterruptedException {
    int numThreads = 
        Integer.parseInt(context.getConfiguration().get("num_threads_per_jvm"));
    setup(context); // setup and cleanup just once, rather than once per thread
    List<MapRunner> mapRunners = new ArrayList<MapRunner>();
    for (int i = 0; i < numThreads; i++) {
      MapRunner mapRunner = new MapRunner(context);
      mapRunners.add(mapRunner);
      mapRunner.start();
    }
    // Wait for all the threads to complete
    for (MapRunner mapRunner : mapRunners) {
      mapRunner.join();
    }
    cleanup(context);
  }

  private class MapRunner extends Thread {
    final Context context;

    private MapRunner(Context context) {
      this.context = context;
    }

    @Override
    public void run() {
      boolean gotValue = true;
      do {
        try {
          Text key = null;
          Text value = null;
          // Only one thread at a time may advance the input and read the
          // current key/value: the Context is not thread-safe.
          synchronized (contextMutex) {
            gotValue = context.nextKeyValue();
            if (gotValue) {
              // Copy the key and value: the RecordReader may reuse the same
              // objects on the next call to nextKeyValue(), which another
              // thread can make while this thread is still in map().
              key = new Text(context.getCurrentKey());
              value = new Text(context.getCurrentValue());
            }
          }
          if (gotValue) {
            // map() runs outside the mutex, sharing the big singleton
            // standardization object with the other threads.
            map(key, value, context);
          }
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      } while (gotValue);
    }
  }
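
The shared standardization object itself is just a lazily initialized 
singleton.  A sketch of how map() might use it, assuming a 
Mapper<Text, Text, Text, Text>; Standardizer, its load() factory, and 
standardize() are hypothetical stand-ins for our real classes:

  // Sketch: one ~800MB standardizer shared by every map thread in the JVM.
  // Standardizer, load(), and standardize() are hypothetical stand-ins.
  private static class StandardizerHolder {
    // The class loader initializes this once, on first use (thread-safe).
    static final Standardizer INSTANCE = Standardizer.load();
  }

  @Override
  public void map(Text key, Text value, Context context)
      throws IOException, InterruptedException {
    String standardized =
        StandardizerHolder.INSTANCE.standardize(value.toString());
    // The Context isn't documented as thread-safe, so writes go through the
    // same mutex as the reads, to be safe.
    synchronized (contextMutex) {
      context.write(key, new Text(standardized));
    }
  }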

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
