Thanks for the prompt response John!
When I say that I'm pre-splitting my table, I mean I am using the 
tableOperations().addSplits(table,splits) command.  I have verified that this 
is correctly splitting my table into 4 tablets and it is being distributed 
across my cloud before I start my map reduce job.

Now, I only kick off the job once, but it appears that 4 separate jobs run (one 
after the other).  The first one reaches 100% in its map phase (and based on my 
output only handled ΒΌ of the data), then the next job starts at 0% and reaches 
100%, and so on.  So I think I'm "only running one mapper at a time in an MR 
job that has 4 mappers total.".  I have 2 mapper slots per node.  My hadoop is 
set up so that one machine is the namenode and the other 3 are datanodes.  This 
gives me 6 slots total.  (This is not congruent to my accumulo where the master 
is also a slave - giving 4 total slaves).

My map reduce job is not a chain job, so all 4 tablets should be able to run at 
the same time.

Here is my job class code below:

import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.client.mapreduce.AccumuloRowInputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;


public class Accumulo_FE_MR_Job extends Configured implements Tool{

       private void runOneTable() throws Exception {
        System.out.println("Running Map Reduce Feature Extraction Job");

        Job job  = new Job(getConf(), getClass().getName());

        job.setJarByClass(getClass());
        job.setJobName("MRFE");

        job.setInputFormatClass(AccumuloRowInputFormat.class);
        AccumuloRowInputFormat.setZooKeeperInstance(job.getConfiguration(),
                HMaxConstants.INSTANCE,
                HMaxConstants.ZOO_SERVERS);

        AccumuloRowInputFormat.setInputInfo(job.getConfiguration(),
                     HMaxConstants.USER,
                HMaxConstants.PASSWORD.getBytes(),
                HMaxConstants.FEATLESS_IMG_TABLE,
                new Authorizations());

        AccumuloRowInputFormat.setLogLevel(job.getConfiguration(), Level.FATAL);

        job.setMapperClass(AccumuloFEMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setNumReduceTasks(4);
        job.setReducerClass(AccumuloFEReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(AccumuloOutputFormat.class);
        AccumuloOutputFormat.setZooKeeperInstance(job.getConfiguration(),
                     HMaxConstants.INSTANCE,
                     HMaxConstants.ZOO_SERVERS);
        AccumuloOutputFormat.setOutputInfo(job.getConfiguration(),
                     HMaxConstants.USER,
                     HMaxConstants.PASSWORD.getBytes(),
                true,
                HMaxConstants.ALL_IMG_TABLE);

        AccumuloOutputFormat.setLogLevel(job.getConfiguration(), Level.FATAL);

        job.waitForCompletion(true);
        if (job.isSuccessful()) {
            System.err.println("Job Successful");
        } else {
            System.err.println("Job Unsuccessful");
        }
     }

       @Override
       public int run(String[] arg0) throws Exception {
              runOneTable();
              return 0;
       }
}

Thanks,
Duane

From: John Vines [mailto:[email protected]]
Sent: Friday, November 02, 2012 5:04 PM
To: [email protected]
Subject: Re: Accumulo Map Reduce is not distributed

This sounds like an issue with how your MR environment is configured and/or how 
you're kicking off your mapreduce.

Accumulo's input formats with automatically set the number of mappers to the 
number of tablets you have, so you should have seen your job go from 1 mapper 
to 4. What you describe is you now do 4 MR jobs instead of just one, is that 
correct? Because that doesn't make a lot of sense, unless by presplitting your 
table you meant you now have 4 different support tables. Or do you mean that 
you're only running one mapper at a time in an MR job that has 4 mappers total?

I believe it's somewhere in your kickoff that things may be a bit misconstrued. 
Just so I'm clear, how many mapper slots do you have per node, is your job a 
chain MR job, and do you mind sharing your code which sets up and kicks off 
your MR job so I have an idea of what could be kicking off 4 jobs.

John

On Fri, Nov 2, 2012 at 4:53 PM, Cornish, Duane C. 
<[email protected]<mailto:[email protected]>> wrote:
Hello,

I apologize if this discuss should be directed to a hadoop map reduce forum, 
however, I have some concern that my problem may be with my use of accumulo.

I have a map reduce job that I want to run over data in a table.  I have an 
index table and a support table which contains a subset of the data in the 
index table.  I would like to map reduce over the support table on my small 4 
node cluster.

I have written a map reduce job that uses the AccumuloRowInputFormat class and 
sets the support table as its input table.

In my mapper, I read in a row of the support table, and make a call to a static 
function which pulls information out of the index table.  Next, I use the data 
pulled back from the function call as input to a call to an external .so file 
that is stored on the name node.  I then make another static function call to 
ingest the new data back into the index table.  (I know I could emit this in 
the reduce step, but what I'm ingesting is formatted in a somewhat complex java 
object and I already had a static function that ingested it the way I needed 
it.)  My reduce step is completely empty.

I output print statements from my mapper to see my progress.  The problem that 
I'm getting is that my entire job appears to run in sequence not in parallel.  
I am running it from the accumulo master on the 4 node system.

I realized that my support table is very small and was not being split across 
any tables.  I am now presplitting this table across all 4 nodes.  Now, when I 
run the map reduce job it appears that 4 separate map reduce jobs run one after 
each other.  The first map reduce job runs, gets to 100%, then the next map 
reduce job runs, etc.  The job is only called once, why are there 4 jobs 
running?  Why won't these jobs run in parallel?

Is there any way to set the number of tasks that can run?  This is possible 
from the hadoop command line, is it possible from the java API? Also, could my 
problem stem from the fact that during my mapper I am making static function 
calls to another class in my java project, accessing my accumulo index table, 
or making a call to an exteral .so library?  I could restructure the job to 
avoid making static function calls and I could write directly to the Accumulo 
table from my map reduce job if that would fix my problem.  I can't avoid 
making the external .so library call.  Any help would be greatly appreciated.

Thanks,
Duane

Reply via email to