Hello Bryan & Oliver,
I am using suggestions from both of you to do the bulk upload. The problem
I am running into is that the job that uses
HFileOutputFormat.configureIncrementalLoad is taking a very long time to
complete. One thing I noticed is that it's using only 1 reducer.
When I looked at the source code for HFileOutputFormat, I noticed that the
number of reducers is determined by this:
List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
"to match current region count");
job.setNumReduceTasks(startKeys.size());
When I look at the log I see this:
12/05/16 03:11:02 INFO mapreduce.HFileOutputFormat: Configuring 301 reduce partitions to match current region count
which implies that the regions were created successfully. But shouldn't
this set the number of Reducers to 301? What am I missing?
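As far as we understand it, configureIncrementalLoad intends one reduce partition per region: the region start keys, minus the empty start key of the first region, are written out as split points for a total-order partitioner, so each row key is routed to the reducer that owns its region. Below is a minimal standalone sketch of that routing rule, not HBase's actual code. The class and method names are hypothetical, and it assumes keys are compared as unsigned bytes, which is how HBase orders row keys.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of total-order routing by region start keys (not HBase code).
// A row key belongs to the last region whose start key is <= the row key.
final class RegionPartitionSketch {
    private RegionPartitionSketch() {}

    // Unsigned lexicographic byte comparison, the order HBase uses for row keys.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return a.length - b.length;
    }

    // startKeys must be non-empty, sorted, and begin with the first region's empty key.
    // Returns the index of the region (and thus the reduce partition) for rowKey.
    static int partitionFor(List<byte[]> startKeys, byte[] rowKey) {
        int lo = 0;
        int hi = startKeys.size() - 1;
        while (lo < hi) {
            int mid = (lo + hi + 1) >>> 1; // round up so the loop always makes progress
            if (compareBytes(startKeys.get(mid), rowKey) <= 0) {
                lo = mid;      // region mid starts at or before rowKey
            } else {
                hi = mid - 1;  // region mid starts after rowKey
            }
        }
        return lo;
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static List<byte[]> keys(String... ks) {
        List<byte[]> out = new ArrayList<>();
        for (String k : ks) {
            out.add(utf8(k));
        }
        return out;
    }
}
```

With 301 start keys, partitionFor can return any index from 0 to 300, which is the 301-way split the log line reports.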
Thanks for your help.
On Thu, May 10, 2012 at 9:04 AM, Bryan Beaudreault <[email protected]> wrote:
> I don't think there is. You need to have a table seeded with the right
> regions in order to run the bulk loader jobs.
>
> My machines are sufficiently fast that it did not take that long to sort.
> One thing I did to speed this up was to add a mapper to the job that
> generates the splits, which calculates the size of each KeyValue. So
> instead of passing around the KeyValues, I would pass around just their
> sizes. You could do a similar thing with the Puts. Here are my
> keys/values for the job in full:
>
> Mapper:
>
> KeyIn: ImmutableBytesWritable
> ValueIn: KeyValue
>
> KeyOut: ImmutableBytesWritable
> ValueOut: IntWritable
>
> Reducer:
>
> KeyIn: ImmutableBytesWritable
> ValueIn: IntWritable
>
> At this point I would just add up the ints from the IntWritables. This
> drastically cuts down the amount of data passed around in the sort.
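A mapper in this style needs the size of each KeyValue. In a real HBase job one would most likely call KeyValue.getLength() on each cell; the sketch below instead recomputes that figure from the serialized KeyValue layout (key length, value length, row, family, qualifier, timestamp and type fields) so that it runs without HBase on the classpath. The class name is hypothetical, and the result is the size of one serialized cell, not the eventual on-disk HFile size.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: byte size of one cell in HBase's serialized KeyValue layout.
// A real mapper would more likely call KeyValue.getLength() on each cell.
final class KeyValueSizeSketch {
    private KeyValueSizeSketch() {}

    // <key length: int><value length: int> prefixes in front of every KeyValue
    private static final int LENGTH_PREFIXES = 4 + 4;
    // Fixed key fields: row length (short) + family length (byte) + timestamp (long) + type (byte)
    private static final int FIXED_KEY_FIELDS = 2 + 1 + 8 + 1;

    static int serializedSize(byte[] row, byte[] family, byte[] qualifier, byte[] value) {
        return LENGTH_PREFIXES + FIXED_KEY_FIELDS
                + row.length + family.length + qualifier.length + value.length;
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

For example, a hypothetical row key "r1" with family "info", qualifier "frequency" and value "7" comes to 36 bytes. The mapper would emit that int (as an IntWritable) under the row key, so the shuffle moves small integers instead of whole cells.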
>
> Hope this helps. If it is still too slow you might have to experiment with
> using many reducers and making sure you don't have holes or regions that
> are too big due to the way the keys are partitioned. I was lucky enough to
> not have to go that far.
>
>
> On Thu, May 10, 2012 at 11:55 AM, Something Something <[email protected]> wrote:
>
> > I am beginning to get a sinking feeling about this :( But I won't give
> > up!
> >
> > The problem is that when I use one reducer the job runs for a long time;
> > I killed it after about an hour. Keep in mind, we do have a decent-sized
> > cluster. The map stage completes in a minute, and when I set the number
> > of reducers to 0 (which is not what we want) the job completes in 12
> > minutes. In other words, sorting is taking a very, very long time! What
> > could be the problem?
> >
> > Is there no other way to do the bulk upload without first *learning* the
> > data?
> >
> > On Thu, May 10, 2012 at 7:15 AM, Bryan Beaudreault <[email protected]> wrote:
> >
> > > Since our key was ImmutableBytesWritable (representing a row key) and
> > > the value was KeyValue, there could be many KeyValues per row key (and
> > > thus many values per Hadoop key in the reducer). So yes, what we did is
> > > very much the same as what you described. Hadoop sorts the
> > > ImmutableBytesWritable keys before sending them to the reducer; this is
> > > the primary sort. We then loop over the values for each key, adding up
> > > the size of each KeyValue until we reach the region size. Each time that
> > > happens, we record the row key from the Hadoop key and use it as the
> > > start key for a new region.
> > >
> > > A secondary sort is not necessary unless the order of the values matters
> > > to you. In this case (with the row key as the reducer key), I don't
> > > think it does.
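The reducer loop described here can be sketched as follows. This is a hedged illustration, not Bryan's code: a TreeMap stands in for the sorted keys a single reducer receives after the shuffle, Strings stand in for the ImmutableBytesWritable row keys (the two orders agree only for ASCII keys), and regionSize is a byte threshold you would choose yourself. The threshold is checked each time a new row key arrives, so one row's cells never straddle two regions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;

// Hypothetical sketch of the single-reducer split-point pass.
final class SplitPointSketch {
    private SplitPointSketch() {}

    // sizesByRow: row keys in sorted order, each with the per-cell sizes the mappers emitted.
    // Returns the start keys of regions 2..N (the first region starts at the empty key).
    static List<String> splitPoints(SortedMap<String, List<Integer>> sizesByRow, long regionSize) {
        List<String> starts = new ArrayList<>();
        long accumulated = 0;
        for (Map.Entry<String, List<Integer>> row : sizesByRow.entrySet()) {
            if (accumulated >= regionSize) {
                starts.add(row.getKey()); // threshold reached: this row opens a new region
                accumulated = 0;
            }
            for (int size : row.getValue()) {
                accumulated += size;      // sum the IntWritable sizes for this row
            }
        }
        return starts;
    }
}
```

Converted to byte[][], the returned keys are the kind of split-key array that can be supplied when creating a pre-split table (HBaseAdmin has a createTable variant that accepts split keys).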
> > >
> > > On Thu, May 10, 2012 at 3:22 AM, Something Something <[email protected]> wrote:
> > >
> > > > Thank you Tim & Bryan for the responses, and sorry for the delayed
> > > > response. I got busy with other things.
> > > >
> > > > Bryan - I decided to focus on the region split problem first. The
> > > > challenge here is to find the correct start key for each region,
> > > > right? Here are the steps I could think of:
> > > >
> > > > 1) Sort the keys.
> > > > 2) Count the keys and divide by the number of regions we want to
> > > >    create (e.g. 300). This gives us the number of keys per region
> > > >    (the region size).
> > > > 3) Loop through the sorted keys and, every time the region size is
> > > >    reached, write down the region number and its starting key. This
> > > >    info can later be used to create the table.
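Steps 1 to 3 can be sketched in a few lines. This is a hypothetical helper, not code from the thread: it holds every key in memory (workable only for a sample or modest data), uses Strings in place of byte-array row keys, drops duplicate keys because a row key can start only one region, and rounds the keys-per-region figure up so that at most the requested number of regions is produced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper illustrating the count-based split steps.
final class CountBasedSplits {
    private CountBasedSplits() {}

    // Returns the start keys of regions 2..N; region 1 implicitly starts at the empty key.
    static List<String> regionStartKeys(List<String> keys, int numRegions) {
        if (numRegions < 1) {
            throw new IllegalArgumentException("numRegions must be at least 1");
        }
        // Step 1: sort the keys (the TreeSet also removes duplicates).
        List<String> sorted = new ArrayList<>(new TreeSet<>(keys));
        // Step 2: keys per region, rounded up.
        int keysPerRegion = (sorted.size() + numRegions - 1) / numRegions;
        // Step 3: every keysPerRegion-th key starts a new region.
        List<String> starts = new ArrayList<>();
        for (int i = keysPerRegion; keysPerRegion > 0 && i < sorted.size(); i += keysPerRegion) {
            starts.add(sorted.get(i));
        }
        return starts;
    }
}
```

Note that this balances the number of keys per region rather than bytes, so rows with many or large cells can still yield uneven regions; the size-based loop Bryan describes avoids that.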
> > > >
> > > > Honestly, I am not sure what you mean by "Hadoop does this
> > > > automatically". If you used a single reducer, did you use a secondary
> > > > sort (setOutputValueGroupingComparator) to sort the keys? Did you loop
> > > > through the *values* to find regions? I would appreciate it if you
> > > > could describe this MR job. Thanks.
> > > >
> > > >
> > > > On Wed, May 9, 2012 at 8:25 AM, Bryan Beaudreault <[email protected]> wrote:
> > > >
> > > > > I also recently had this problem, trying to index 6+ billion records
> > > > > into HBase. The job would take about 4 hours before it brought down
> > > > > the entire cluster, at only around 60% complete.
> > > > >
> > > > > After trying a bunch of things, we went to bulk loading. This is
> > > > > actually pretty easy; the hardest part is that you need to have a
> > > > > table ready with the region splits you are going to use. Region
> > > > > splits aside, there are 2 steps:
> > > > >
> > > > > 1) Change your job so that, instead of executing your Puts, it just
> > > > >    outputs them using context.write. Put is Writable. (We used
> > > > >    ImmutableBytesWritable as the key, representing the row key.)
> > > > > 2) Add another job that reads that output and configure it using
> > > > >    HFileOutputFormat.configureIncrementalLoad(Job job, HTable table);
> > > > >    this will add the right reducer.
> > > > >
> > > > > Once those two have run, you can finalize the process using the
> > > > > completebulkload tool documented at
> > > > > http://hbase.apache.org/bulk-loads.html
> > > > >
> > > > > For the region splits problem, we created another job which sorted
> > > > > all of the Puts by key (Hadoop does this automatically) and had a
> > > > > single reducer. It stepped through all of the Puts, adding up the
> > > > > total size until it reached some threshold. When it did, it recorded
> > > > > the row key byte array and used that as the start of the next region.
> > > > > We used the result of this job to create a new table. There is
> > > > > probably a better way to do this, but it takes about 20 minutes to
> > > > > write.
> > > > >
> > > > > This whole process took less than an hour, with the bulk load part
> > > > > taking only 15 minutes. Much better!
> > > > >
> > > > > On Wed, May 9, 2012 at 11:08 AM, Something Something <[email protected]> wrote:
> > > > >
> > > > > > Hey Oliver,
> > > > > >
> > > > > > Thanks a "billion" for the response -:) I will take any code you
> > > > > > can provide, even if it's a hack! I will even send you an Amazon
> > > > > > gift card - not that you care or need it -:)
> > > > > >
> > > > > > Can you share some performance statistics? Thanks again.
> > > > > >
> > > > > >
> > > > > > On Wed, May 9, 2012 at 8:02 AM, Oliver Meyn (GBIF) <[email protected]> wrote:
> > > > > >
> > > > > > > Heya Something,
> > > > > > >
> > > > > > > I had a similar task recently, and by far the best way to go
> > > > > > > about this is with bulk loading after pre-splitting your target
> > > > > > > table. As you know, ImportTsv doesn't understand Avro files, so I
> > > > > > > hacked together my own ImportAvro class to create the HFiles that
> > > > > > > I eventually moved into HBase with completebulkload. I haven't
> > > > > > > committed my class anywhere because it's a pretty ugly hack, but
> > > > > > > I'm happy to share it with you as a starting point. Doing billions
> > > > > > > of Puts will just drive you crazy.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Oliver
> > > > > > >
> > > > > > > On 2012-05-09, at 4:51 PM, Something Something wrote:
> > > > > > >
> > > > > > > > I ran the following MR job that reads Avro files and puts the
> > > > > > > > data into HBase. The files have tons of data (billions of
> > > > > > > > records). We have a fairly decent-sized cluster. When I ran this
> > > > > > > > MR job, it brought down HBase. When I commented out the Puts to
> > > > > > > > HBase, the job completed in 45 seconds (yes, that's seconds).
> > > > > > > >
> > > > > > > > Obviously, my HBase configuration is not ideal. I am using all
> > > > > > > > the default HBase configuration settings that come with
> > > > > > > > Cloudera's distribution: 0.90.4+49.
> > > > > > > >
> > > > > > > > I am planning to read up on the following two:
> > > > > > > >
> > > > > > > > http://hbase.apache.org/book/important_configurations.html
> > > > > > > > http://www.cloudera.com/blog/2011/04/hbase-dos-and-donts/
> > > > > > > >
> > > > > > > > But can someone quickly take a look and recommend a list of
> > > > > > > > priorities, such as "try this first..."? That would be greatly
> > > > > > > > appreciated. As always, thanks for your time.
> > > > > > > >
> > > > > > > >
> > > > > > > > Here's the mapper (there's no reducer):
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > import java.io.IOException;
> > > > > > > >
> > > > > > > > import org.apache.avro.generic.GenericData;
> > > > > > > > import org.apache.avro.mapred.AvroCollector;
> > > > > > > > import org.apache.avro.mapred.AvroMapper;
> > > > > > > > import org.apache.hadoop.hbase.client.HTable;
> > > > > > > > import org.apache.hadoop.hbase.client.Put;
> > > > > > > > import org.apache.hadoop.hbase.util.Bytes;
> > > > > > > > import org.apache.hadoop.io.NullWritable;
> > > > > > > > import org.apache.hadoop.mapred.JobConf;
> > > > > > > > import org.apache.hadoop.mapred.Reporter;
> > > > > > > > import org.slf4j.Logger;
> > > > > > > > import org.slf4j.LoggerFactory;
> > > > > > > >
> > > > > > > > public class AvroProfileMapper extends AvroMapper<GenericData.Record, NullWritable> {
> > > > > > > >     private static final Logger logger =
> > > > > > > >             LoggerFactory.getLogger(AvroProfileMapper.class);
> > > > > > > >
> > > > > > > >     private static final String SEPARATOR = "*";
> > > > > > > >
> > > > > > > >     private HTable table;
> > > > > > > >
> > > > > > > >     private String datasetDate;
> > > > > > > >     private String tableName;
> > > > > > > >
> > > > > > > >     @Override
> > > > > > > >     public void configure(JobConf jobConf) {
> > > > > > > >         super.configure(jobConf);
> > > > > > > >         datasetDate = jobConf.get("datasetDate");
> > > > > > > >         tableName = jobConf.get("tableName");
> > > > > > > >
> > > > > > > >         // Open table for writing; buffer Puts client-side (12 MB) instead of sending each one
> > > > > > > >         try {
> > > > > > > >             table = new HTable(jobConf, tableName);
> > > > > > > >             table.setAutoFlush(false);
> > > > > > > >             table.setWriteBufferSize(1024 * 1024 * 12);
> > > > > > > >         } catch (IOException e) {
> > > > > > > >             throw new RuntimeException("Failed table construction", e);
> > > > > > > >         }
> > > > > > > >     }
> > > > > > > >
> > > > > > > >     @Override
> > > > > > > >     @SuppressWarnings("unchecked")
> > > > > > > >     public void map(GenericData.Record record, AvroCollector<NullWritable> collector,
> > > > > > > >             Reporter reporter) throws IOException {
> > > > > > > >         String u1 = record.get("u1").toString();
> > > > > > > >
> > > > > > > >         GenericData.Array<GenericData.Record> fields =
> > > > > > > >                 (GenericData.Array<GenericData.Record>) record.get("bag");
> > > > > > > >         for (GenericData.Record rec : fields) {
> > > > > > > >             Integer s1 = (Integer) rec.get("s1");
> > > > > > > >             Integer n1 = (Integer) rec.get("n1");
> > > > > > > >             Integer c1 = (Integer) rec.get("c1");
> > > > > > > >             Integer freq = (Integer) rec.get("freq");
> > > > > > > >             if (freq == null) {
> > > > > > > >                 freq = 0;
> > > > > > > >             }
> > > > > > > >
> > > > > > > >             // Row key: u1*n1*c1*s1
> > > > > > > >             String key = u1 + SEPARATOR + n1 + SEPARATOR + c1 + SEPARATOR + s1;
> > > > > > > >             Put put = new Put(Bytes.toBytes(key));
> > > > > > > >             put.setWriteToWAL(false); // faster, but unflushed edits are lost if a region server dies
> > > > > > > >             put.add(Bytes.toBytes("info"), Bytes.toBytes("frequency"),
> > > > > > > >                     Bytes.toBytes(freq.toString()));
> > > > > > > >             try {
> > > > > > > >                 table.put(put);
> > > > > > > >             } catch (IOException e) {
> > > > > > > >                 throw new RuntimeException("Error while writing to " + tableName
> > > > > > > >                         + " table.", e);
> > > > > > > >             }
> > > > > > > >         }
> > > > > > > >         logger.info("------------ Finished processing user: " + u1);
> > > > > > > >     }
> > > > > > > >
> > > > > > > >     @Override
> > > > > > > >     public void close() throws IOException {
> > > > > > > >         table.close(); // also flushes any Puts still in the write buffer
> > > > > > > >     }
> > > > > > > > }
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Oliver Meyn
> > > > > > > Software Developer
> > > > > > > Global Biodiversity Information Facility (GBIF)
> > > > > > > +45 35 32 15 12
> > > > > > > http://www.gbif.org
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>