I attach the code that I'm executing. I don't have access to the data
generator for HBase.
In the last benchmark, the simple scan took about 4 times less than this
version.

This version can only do complete scans.
I have been trying a complete scan of an HTable with 100.000 rows and it
takes less than one second. Isn't that too fast?
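
One way to sanity-check a timing like this is to record the row count next to the elapsed time, so that a fast run which returned too few rows gets caught. A minimal sketch, assuming a hypothetical ScanTimingCheck helper (not part of the attached code) and an in-memory loop standing in for the real scan:

```java
import java.util.function.IntSupplier;

/** Hypothetical helper, not part of the attached classes. */
public class ScanTimingCheck {

    /** Row count and elapsed wall-clock time of one measured run. */
    public static final class Timing {
        public final int rows;
        public final long elapsedMs;

        Timing(int rows, long elapsedMs) {
            this.rows = rows;
            this.elapsedMs = elapsedMs;
        }
    }

    /** Runs the supplied scan once and records how many rows it reported and how long it took. */
    public static Timing measure(IntSupplier scan) {
        long start = System.nanoTime();
        int rows = scan.getAsInt();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
        return new Timing(rows, elapsedMs);
    }

    /** Fails loudly when the scan returned a different number of rows than expected. */
    public static void requireRows(Timing timing, int expectedRows) {
        if (timing.rows != expectedRows) {
            throw new IllegalStateException(
                    "Expected " + expectedRows + " rows but got " + timing.rows);
        }
    }

    public static void main(String[] args) {
        final int expectedRows = 100000;
        // Stand-in for the real scan: just counts to the expected number of rows.
        Timing timing = measure(() -> {
            int count = 0;
            for (int i = 0; i < expectedRows; i++) {
                count++;
            }
            return count;
        });
        requireRows(timing, expectedRows);
        System.out.println(timing.rows + " rows in " + timing.elapsedMs + " ms");
    }
}
```

In the real benchmark the lambda would iterate the ResultScanner and return the number of results it saw; the stand-in loop here only exercises the bookkeeping.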




2014-09-14 20:21 GMT+02:00 Guillermo Ortiz <[email protected]>:

> I don't have the code here. But I created a class RegionScanner, this
> class does a complete scan of a region. So I have to set the start and stop
> keys. the start and stop key are the limits of that region.
>
> El domingo, 14 de septiembre de 2014, Anoop John <[email protected]>
> escribió:
>
>> Again, a full code snippet would speak better.
>>
>> But I'm not getting what you are doing with the code below:
>>
>> private List<RegionScanner> generatePartitions() {
>>         List<RegionScanner> regionScanners = new
>> ArrayList<RegionScanner>();
>>         byte[] startKey;
>>         byte[] stopKey;
>>         HConnection connection = null;
>>         HBaseAdmin hbaseAdmin = null;
>>         try {
>>             connection = HConnectionManager.
>> createConnection(HBaseConfiguration.create());
>>             hbaseAdmin = new HBaseAdmin(connection);
>>             List<HRegionInfo> regions =
>> hbaseAdmin.getTableRegions(scanConfiguration.getTable());
>>             RegionScanner regionScanner = null;
>>             for (HRegionInfo region : regions) {
>>
>>                 startKey = region.getStartKey();
>>                 stopKey = region.getEndKey();
>>
>>                 regionScanner = new RegionScanner(startKey, stopKey,
>> scanConfiguration);
>>                 // regionScanner = createRegionScanner(startKey, stopKey);
>>                 if (regionScanner != null) {
>>                     regionScanners.add(regionScanner);
>>                 }
>>             }
>>
>> And I execute the RegionScanner with this:
>> public List<Result> call() throws Exception {
>>         HConnection connection =
>> HConnectionManager.
>> createConnection(HBaseConfiguration.create());
>>         HTableInterface table =
>> connection.getTable(configuration.getTable());
>>
>>     Scan scan = new Scan(startKey, stopKey);
>>         scan.setBatch(configuration.getBatch());
>>         scan.setCaching(configuration.getCaching());
>>         ResultScanner resultScanner = table.getScanner(scan);
>>
>>
>> What is this part?
>> new RegionScanner(startKey, stopKey,
>> scanConfiguration);
>>
>>
>> >>Scan scan = new Scan(startKey, stopKey);
>>         scan.setBatch(configuration.
>> getBatch());
>>         scan.setCaching(configuration.getCaching());
>>         ResultScanner resultScanner = table.getScanner(scan);
>>
>>
>> And you are not setting start and stop rows on this Scan object?!
>>
>>
>> Sorry if I missed some parts of your code.
>>
>> -Anoop-
>>
>>
>> On Sun, Sep 14, 2014 at 2:54 PM, Guillermo Ortiz <[email protected]>
>> wrote:
>>
>> > I don't have the code here, but I'll post it in a couple of days. I
>> > have to check the ExecutorService again; I don't remember exactly how I
>> > did it.
>> >
>> > I'm using HBase 0.98.
>> >
>> > El domingo, 14 de septiembre de 2014, lars hofhansl <[email protected]>
>> > escribió:
>> >
>> > > What specific version of 0.94 are you using?
>> > >
>> > > In general, if you have multiple spindles (disks) and/or multiple CPU
>> > > cores at the region server, you should benefit from keeping multiple
>> > > region server handler threads busy. I have experimented with this before
>> > > and saw a close to linear speedup (up to the point where all disks/cores
>> > > were busy). Obviously this also assumes this is the only load you throw
>> > > at the servers at this point.
>> > >
>> > > Can you post your complete code to pastebin? Maybe even with some code to
>> > > seed the data?
>> > > How do you run your callables? Did you configure the ExecutorService
>> > > correctly (assuming you use one to run your callables)?
>> > >
>> > > Then we can run it and have a look.
>> > >
>> > > Thanks.
>> > >
>> > > -- Lars
>> > >
>> > >
>> > > ----- Original Message -----
>> > > From: Guillermo Ortiz <[email protected]>
>> > > To: "[email protected]" <[email protected]>
>> > > Cc:
>> > > Sent: Saturday, September 13, 2014 4:49 PM
>> > > Subject: Re: Scan vs Parallel scan.
>> > >
>> > > What am I missing??
>> > >
>> > >
>> > >
>> > >
>> > > 2014-09-12 16:05 GMT+02:00 Guillermo Ortiz <[email protected]>:
>> > >
>> > > > For a partial scan, I guess that I call the RS to get data, and it
>> > > > starts looking in the store files and collecting the data. (It doesn't
>> > > > write to the blockcache in either case.) It has the data ready and
>> > > > gives it to the client step by step, depending on the caching and
>> > > > batching parameters.
>> > > >
>> > > > The big differences that I see:
>> > > > I'm opening more connections to the table, one per region.
>> > > >
>> > > > I should check the single table scan; it looks like it does partial
>> > > > scans sequentially, since you can see on the HBase Master how the
>> > > > requests increase one after another, not all at the same time.
>> > > >
>> > > > 2014-09-12 15:23 GMT+02:00 Michael Segel <[email protected]>:
>> > > >
>> > > >> It doesn’t matter which RS, but that you have 1 thread for each
>> > region.
>> > > >>
>> > > >> So for each thread, what’s happening.
>> > > >> Step by step, what is the code doing.
>> > > >>
>> > > >> Now you’re comparing this against a single table scan, right?
>> > > >> What’s happening in the table scan…?
>> > > >>
>> > > >>
>> > > >> On Sep 12, 2014, at 2:04 PM, Guillermo Ortiz <[email protected]>
>> > > >> wrote:
>> > > >>
>> > > >> > Right. My table, for example, has keys between 0-9 in three regions:
>> > > >> > 0-2,3-7,7-9
>> > > >> > I launch three partial scans in parallel. The scans that I'm executing
>> > > >> > are: scan(0,2), scan(3,7), scan(7,9).
>> > > >> > Each region is on a different RS, so each thread goes to a different
>> > > >> > RS. It's not exactly like that, but in the benchmark case that's how
>> > > >> > it's working.
>> > > >> >
>> > > >> > Really the code will execute a thread for each region, not for each
>> > > >> > RegionServer. But in the test I only have two regions per RegionServer.
>> > > >> > I don't think that's an important point; there are two threads per RS.
>> > > >> >
>> > > >> > 2014-09-12 14:48 GMT+02:00 Michael Segel <[email protected]>:
>> > > >> >
>> > > >> >> Ok, lets again take a step back…
>> > > >> >>
>> > > >> >> So you are comparing your partial scan(s) against a full table
>> > scan?
>> > > >> >>
>> > > >> >> If I understood your question, you launch 3 partial scans where
>> you
>> > > set
>> > > >> >> the start row and then end row of each scan, right?
>> > > >> >>
>> > > >> >> On Sep 12, 2014, at 9:16 AM, Guillermo Ortiz <[email protected]>
>> > > >> >> wrote:
>> > > >> >>
>> > > >> >>> Okay, then the partial scan doesn't work as I think.
>> > > >> >>> How could it exceed the limit of a single region if I calculate the
>> > > >> >>> limits?
>> > > >> >>>
>> > > >> >>>
>> > > >> >>> The only bad point that I see is that if a region server has three
>> > > >> >>> regions of the same table, I'm executing three partial scans against
>> > > >> >>> this RS and they could compete for resources (network, etc.) on this
>> > > >> >>> node. It'd be better to have one thread per RS. But that doesn't
>> > > >> >>> answer your questions.
>> > > >> >>>
>> > > >> >>> I keep thinking...
>> > > >> >>>
>> > > >> >>> 2014-09-12 9:40 GMT+02:00 Michael Segel <[email protected]>:
>> > > >> >>>
>> > > >> >>>> Hi,
>> > > >> >>>>
>> > > >> >>>> I wanted to take a step back from the actual code and to stop and
>> > > >> >>>> think about what you are doing and what HBase is doing under the
>> > > >> >>>> covers.
>> > > >> >>>>
>> > > >> >>>> So in your code, you are asking HBase to do 3 separate scans and then
>> > > >> >>>> you take the result set back and join it.
>> > > >> >>>>
>> > > >> >>>> What does HBase do when it does a range scan?
>> > > >> >>>> What happens when that range scan exceeds a single region?
>> > > >> >>>>
>> > > >> >>>> If you answer those questions… you’ll have your answer.
>> > > >> >>>>
>> > > >> >>>> HTH
>> > > >> >>>>
>> > > >> >>>> -Mike
>> > > >> >>>>
>> > > >> >>>> On Sep 12, 2014, at 8:34 AM, Guillermo Ortiz <[email protected]>
>> > > >> >>>> wrote:
>> > > >> >>>>
>> > > >> >>>>> It's not all the code, I set things like these as well:
>> > > >> >>>>> scan.setMaxVersions();
>> > > >> >>>>> scan.setCacheBlocks(false);
>> > > >> >>>>> ...
>> > > >> >>>>>
>> > > >> >>>>> 2014-09-12 9:33 GMT+02:00 Guillermo Ortiz <[email protected]>:
>> > > >> >>>>>
>> > > >> >>>>>> yes, that is. I have changed the HBase version to 0.98
>> > > >> >>>>>>
>> > > >> >>>>>> I got the start and stop keys with this method:
>> > > >> >>>>>> private List<RegionScanner> generatePartitions() {
>> > > >> >>>>>>      List<RegionScanner> regionScanners = new
>> > > >> >>>>>> ArrayList<RegionScanner>();
>> > > >> >>>>>>      byte[] startKey;
>> > > >> >>>>>>      byte[] stopKey;
>> > > >> >>>>>>      HConnection connection = null;
>> > > >> >>>>>>      HBaseAdmin hbaseAdmin = null;
>> > > >> >>>>>>      try {
>> > > >> >>>>>>          connection = HConnectionManager.
>> > > >> >>>>>> createConnection(HBaseConfiguration.create());
>> > > >> >>>>>>          hbaseAdmin = new HBaseAdmin(connection);
>> > > >> >>>>>>          List<HRegionInfo> regions =
>> > > >> >>>>>> hbaseAdmin.getTableRegions(scanConfiguration.getTable());
>> > > >> >>>>>>          RegionScanner regionScanner = null;
>> > > >> >>>>>>          for (HRegionInfo region : regions) {
>> > > >> >>>>>>
>> > > >> >>>>>>              startKey = region.getStartKey();
>> > > >> >>>>>>              stopKey = region.getEndKey();
>> > > >> >>>>>>
>> > > >> >>>>>>              regionScanner = new RegionScanner(startKey,
>> > stopKey,
>> > > >> >>>>>> scanConfiguration);
>> > > >> >>>>>>              // regionScanner =
>> createRegionScanner(startKey,
>> > > >> >>>> stopKey);
>> > > >> >>>>>>              if (regionScanner != null) {
>> > > >> >>>>>>                  regionScanners.add(regionScanner);
>> > > >> >>>>>>              }
>> > > >> >>>>>>          }
>> > > >> >>>>>>
>> > > >> >>>>>> And I execute the RegionScanner with this:
>> > > >> >>>>>> public List<Result> call() throws Exception {
>> > > >> >>>>>>      HConnection connection =
>> > > >> >>>>>>
>> > HConnectionManager.createConnection(HBaseConfiguration.create());
>> > > >> >>>>>>      HTableInterface table =
>> > > >> >>>>>> connection.getTable(configuration.getTable());
>> > > >> >>>>>>
>> > > >> >>>>>>  Scan scan = new Scan(startKey, stopKey);
>> > > >> >>>>>>      scan.setBatch(configuration.getBatch());
>> > > >> >>>>>>      scan.setCaching(configuration.getCaching());
>> > > >> >>>>>>      ResultScanner resultScanner = table.getScanner(scan);
>> > > >> >>>>>>
>> > > >> >>>>>>      List<Result> results = new ArrayList<Result>();
>> > > >> >>>>>>      for (Result result : resultScanner) {
>> > > >> >>>>>>          results.add(result);
>> > > >> >>>>>>      }
>> > > >> >>>>>>
>> > > >> >>>>>>      connection.close();
>> > > >> >>>>>>      table.close();
>> > > >> >>>>>>
>> > > >> >>>>>>      return results;
>> > > >> >>>>>>  }
>> > > >> >>>>>>
>> > > >> >>>>>> They implement Callable.
>> > > >> >>>>>>
>> > > >> >>>>>>
>> > > >> >>>>>> 2014-09-12 9:26 GMT+02:00 Michael Segel <[email protected]>:
>> > > >> >>>>>>
>> > > >> >>>>>>> Lets take a step back….
>> > > >> >>>>>>>
>> > > >> >>>>>>> Your parallel scan is having the client create N threads where, in
>> > > >> >>>>>>> each thread, you’re doing a partial scan of the table where each
>> > > >> >>>>>>> partial scan takes the first and last row of each region?
>> > > >> >>>>>>>
>> > > >> >>>>>>> Is that correct?
>> > > >> >>>>>>>
>> > > >> >>>>>>> On Sep 12, 2014, at 7:36 AM, Guillermo Ortiz <[email protected]>
>> > > >> >>>>>>> wrote:
>> > > >> >>>>>>>
>> > > >> >>>>>>>> I was checking a little bit more. I checked the cluster, and the
>> > > >> >>>>>>>> data is stored in three different region servers, each one on a
>> > > >> >>>>>>>> different node. So, I guess the threads go to different hard disks.
>> > > >> >>>>>>>>
>> > > >> >>>>>>>> Does anyone have an idea or suggestion about why a single scan is
>> > > >> >>>>>>>> faster than this implementation? I based it on this implementation:
>> > > >> >>>>>>>> https://github.com/zygm0nt/hbase-distributed-search
>> > > >> >>>>>>>>
>> > > >> >>>>>>>> 2014-09-11 12:05 GMT+02:00 Guillermo Ortiz <[email protected]>:
>> > > >> >>>>>>>>
>> > > >> >>>>>>>>> I'm working with HBase 0.94 for this case; I'll try with 0.98,
>> > > >> >>>>>>>>> although there is no difference.
>> > > >> >>>>>>>>> I disabled the table and disabled the blockcache for that family,
>> > > >> >>>>>>>>> and I set scan.setCacheBlocks(false) as well for both cases.
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> I think that it's not possible that I'm executing a complete scan
>> > > >> >>>>>>>>> in each thread, since my data look like this:
>> > > >> >>>>>>>>> 000001 f:q value=1
>> > > >> >>>>>>>>> 000002 f:q value=2
>> > > >> >>>>>>>>> 000003 f:q value=3
>> > > >> >>>>>>>>> ...
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> I add up all the values and get the same result with a single scan
>> > > >> >>>>>>>>> as with a distributed one, so I guess that DistributedScan did well.
>> > > >> >>>>>>>>> The count from the hbase shell takes about 10-15 seconds, I don't
>> > > >> >>>>>>>>> remember exactly, but something like 4x the scan time.
>> > > >> >>>>>>>>> I'm not using any filter for the scans.
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> This is the way I calculate number of regions/scans
>> > > >> >>>>>>>>> private List<RegionScanner> generatePartitions() {
>> > > >> >>>>>>>>>     List<RegionScanner> regionScanners = new
>> > > >> >>>>>>>>> ArrayList<RegionScanner>();
>> > > >> >>>>>>>>>     byte[] startKey;
>> > > >> >>>>>>>>>     byte[] stopKey;
>> > > >> >>>>>>>>>     HConnection connection = null;
>> > > >> >>>>>>>>>     HBaseAdmin hbaseAdmin = null;
>> > > >> >>>>>>>>>     try {
>> > > >> >>>>>>>>>         connection =
>> > > >> >>>>>>>>>
>> > > >> HConnectionManager.createConnection(HBaseConfiguration.create());
>> > > >> >>>>>>>>>         hbaseAdmin = new HBaseAdmin(connection);
>> > > >> >>>>>>>>>         List<HRegionInfo> regions =
>> > > >> >>>>>>>>> hbaseAdmin.getTableRegions(scanConfiguration.getTable());
>> > > >> >>>>>>>>>         RegionScanner regionScanner = null;
>> > > >> >>>>>>>>>         for (HRegionInfo region : regions) {
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>>             startKey = region.getStartKey();
>> > > >> >>>>>>>>>             stopKey = region.getEndKey();
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>>             regionScanner = new RegionScanner(startKey,
>> > > stopKey,
>> > > >> >>>>>>>>> scanConfiguration);
>> > > >> >>>>>>>>>             // regionScanner =
>> createRegionScanner(startKey,
>> > > >> >>>>>>> stopKey);
>> > > >> >>>>>>>>>             if (regionScanner != null) {
>> > > >> >>>>>>>>>                 regionScanners.add(regionScanner);
>> > > >> >>>>>>>>>             }
>> > > >> >>>>>>>>>         }
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> I did some tests on a tiny table and I think that the range for
>> > > >> >>>>>>>>> each scan works fine. Still, I thought it was interesting that the
>> > > >> >>>>>>>>> time when I execute the distributed scan is about 6x.
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> I'm going to check the hard disks, but I think that it's right.
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> 2014-09-11 7:50 GMT+02:00 lars hofhansl <[email protected]>:
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>>> Which version of HBase?
>> > > >> >>>>>>>>>> Can you show us the code?
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> Your parallel scan with caching 100 takes about 6x as long as the
>> > > >> >>>>>>>>>> single scan, which is suspicious because you say you have 6 regions.
>> > > >> >>>>>>>>>> Are you sure you're not accidentally scanning all the data in each
>> > > >> >>>>>>>>>> of your parallel scans?
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> -- Lars
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> ________________________________
>> > > >> >>>>>>>>>> From: Guillermo Ortiz <[email protected]>
>> > > >> >>>>>>>>>> To: "[email protected]" <[email protected]>
>> > > >> >>>>>>>>>> Sent: Wednesday, September 10, 2014 1:40 AM
>> > > >> >>>>>>>>>> Subject: Scan vs Parallel scan.
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> Hi,
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> I developed a distributed scan; I create a thread for each region.
>> > > >> >>>>>>>>>> After that, I tried to compare some timings, Scan vs DistributedScan.
>> > > >> >>>>>>>>>> I have disabled the blockcache in my table. My cluster has 3 region
>> > > >> >>>>>>>>>> servers with 2 regions each; in total there are 100.000 rows and I
>> > > >> >>>>>>>>>> execute a complete scan.
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> My partitions are
>> > > >> >>>>>>>>>> -016666 -> request 16665
>> > > >> >>>>>>>>>> 016666-033332 -> request 16666
>> > > >> >>>>>>>>>> 033332-049998 -> request 16666
>> > > >> >>>>>>>>>> 049998-066664 -> request 16666
>> > > >> >>>>>>>>>> 066664-083330 -> request 16666
>> > > >> >>>>>>>>>> 083330- -> request 16671
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> 14/09/10 09:15:47 INFO hbase.HbaseScanTest: NUM ROWS 100000
>> > > >> >>>>>>>>>> 14/09/10 09:15:47 INFO util.TimerUtil: SCAN PARALLEL:22089ms,Counter:2 -> Caching 10
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> 14/09/10 09:16:04 INFO hbase.HbaseScanTest: NUM ROWS 100000
>> > > >> >>>>>>>>>> 14/09/10 09:16:04 INFO util.TimerUtil: SCAN PARALLEL:16598ms,Counter:2 -> Caching 100
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> 14/09/10 09:16:22 INFO hbase.HbaseScanTest: NUM ROWS 100000
>> > > >> >>>>>>>>>> 14/09/10 09:16:22 INFO util.TimerUtil: SCAN PARALLEL:16497ms,Counter:2 -> Caching 1000
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> 14/09/10 09:17:41 INFO hbase.HbaseScanTest: NUM ROWS 100000
>> > > >> >>>>>>>>>> 14/09/10 09:17:41 INFO util.TimerUtil: SCAN NORMAL:68288ms,Counter:2 -> Caching 1
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> 14/09/10 09:17:48 INFO hbase.HbaseScanTest: NUM ROWS 100000
>> > > >> >>>>>>>>>> 14/09/10 09:17:48 INFO util.TimerUtil: SCAN NORMAL:2646ms,Counter:2 -> Caching 100
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> 14/09/10 09:17:58 INFO hbase.HbaseScanTest: NUM ROWS 100000
>> > > >> >>>>>>>>>> 14/09/10 09:17:58 INFO util.TimerUtil: SCAN NORMAL:3903ms,Counter:2 -> Caching 1000
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> Parallel scan works much worse than simple scan, and I don't know
>> > > >> >>>>>>>>>> why the simple scan is so fast; it's really much faster than
>> > > >> >>>>>>>>>> executing a "count" from the hbase shell, which doesn't look quite
>> > > >> >>>>>>>>>> normal. The only time that parallel works better is when I execute
>> > > >> >>>>>>>>>> a normal scan with caching 1.
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> Any clue about it?
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>
>> > > >> >>>>>>>
>> > > >> >>>>>>
>> > > >> >>>>
>> > > >> >>>>
>> > > >> >>
>> > > >> >>
>> > > >>
>> > > >>
>> > > >
>> > >
>> >
>>
>
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.filter.FilterList;

/**
 * Stores the configuration for a parallel scan. By default batch is 100 (DEFAULT_BATCHING),
 * caching is 10 (DEFAULT_CACHING) and all versions are fetched (versions == 0).
 */
public final class ScanConfiguration {
    private byte[] table;
    // TODO extra parameters if scan is updated to do scan by ranges and just one thread per
    // regionServer.
    // private byte[] startRow;
    // private byte[] stopRow;
    // private byte[] regionName;
    private FilterList filters;
    private List<byte[]> families;
    private List<Qualifier> qualifiers;
    private int batch;
    private int caching;
    private int versions;
    private Long from;
    private Long until;

    private static final int DEFAULT_CACHING = 10;
    private static final int DEFAULT_BATCHING = 100;

    /**
     * Constructor.
     */
    public ScanConfiguration() {
        super();
        filters = null;
        families = new ArrayList<byte[]>();
        qualifiers = new ArrayList<Qualifier>();
        batch = DEFAULT_BATCHING;
        caching = DEFAULT_CACHING;
        versions = 0;
    }

    /**
     * @return the table
     */
    public byte[] getTable() {
        return table;
    }

    /**
     * @param pTable
     *            the table to set
     */
    public void setTable(final byte[] pTable) {
        table = pTable;
    }

    /**
     * @return the filters
     */
    public FilterList getFilters() {
        return filters;
    }

    /**
     * @param pFilters
     *            the filters to set
     */
    public void setFilters(final FilterList pFilters) {
        filters = pFilters;
    }

    /**
     * @return the families
     */
    public List<byte[]> getFamilies() {
        return families;
    }

    /**
     * @param pFamilies
     *            the families to set
     */
    public void setFamilies(final List<byte[]> pFamilies) {
        families = pFamilies;
    }

    /**
     * @return the qualifiers
     */
    public List<Qualifier> getQualifiers() {
        return qualifiers;
    }

    /**
     * @param pQualifiers
     *            the qualifiers to set
     */
    public void setQualifiers(final List<Qualifier> pQualifiers) {
        qualifiers = pQualifiers;
    }

    /**
     * @return the batch
     */
    public int getBatch() {
        return batch;
    }

    /**
     * @param pBatch
     *            the batch to set
     */
    public void setBatch(final int pBatch) {
        batch = pBatch;
    }

    /**
     * @return the caching
     */
    public int getCaching() {
        return caching;
    }

    /**
     * @param pCaching
     *            the caching to set
     */
    public void setCaching(final int pCaching) {
        caching = pCaching;
    }

    /**
     * @return the versions
     */
    public int getVersions() {
        return versions;
    }

    /**
     * @param pVersions
     *            the versions to set
     */
    public void setVersions(final int pVersions) {
        versions = pVersions;
    }

    /**
     * @return the from
     */
    public Long getFrom() {
        return from;
    }

    /**
     * @param pFrom
     *            the from to set
     */
    public void setFrom(final Long pFrom) {
        from = pFrom;
    }

    /**
     * @return the until
     */
    public Long getUntil() {
        return until;
    }

    /**
     * @param pUntil
     *            the until to set
     */
    public void setUntil(final Long pUntil) {
        until = pUntil;
    }

}
public class Qualifier {
    private byte[] family;
    private byte[] qualifier;

    /**
     * @return the family
     */
    public byte[] getFamily() {
        return family;
    }

    /**
     * @param pFamily
     *            the family to set
     */
    public void setFamily(byte[] pFamily) {
        family = pFamily;
    }

    /**
     * @return the qualifier
     */
    public byte[] getQualifier() {
        return qualifier;
    }

    /**
     * @param pQualifier
     *            the qualifier to set
     */
    public void setQualifier(byte[] pQualifier) {
        qualifier = pQualifier;
    }

}
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;


/**
 * Callable that runs a partial scan, bounded by a start key and a stop key, in its own thread.
 */
public final class RegionScanner implements Callable<List<Result>> {
    private ScanConfiguration configuration;
    private byte[] startKey;
    private byte[] stopKey;
    private byte[] region;

    /**
     * Constructor.
     * 
     * @param pStartKey
     *            start key to the scan
     * @param pStopKey
     *            stop key to the scan
     * @param pConfiguration
     *            configuration to the scan.
     */
    public RegionScanner(final byte[] pStartKey, final byte[] pStopKey,
            final ScanConfiguration pConfiguration) {
        configuration = pConfiguration;
        startKey = pStartKey;
        stopKey = pStopKey;
    }

    @Override
    public List<Result> call() throws Exception {
        // Note: every task opens its own connection, so this setup cost is paid once per region.
        HConnection connection = HConnectionManager.createConnection(HBaseConfiguration.create());
        try {
            HTableInterface table = connection.getTable(configuration.getTable());
            try {
                ResultScanner resultScanner = table.getScanner(buildScanner());
                try {
                    List<Result> results = new ArrayList<Result>();
                    for (Result result : resultScanner) {
                        results.add(result);
                    }
                    return results;
                } finally {
                    resultScanner.close();
                }
            } finally {
                // Close the table before the connection it came from.
                table.close();
            }
        } finally {
            connection.close();
        }
    }

    private Scan buildScanner() throws IOException {
        Scan scan = new Scan(startKey, stopKey);
        scan.setBatch(configuration.getBatch());
        scan.setCaching(configuration.getCaching());

        if (configuration.getVersions() != 0) {
            scan.setMaxVersions(configuration.getVersions());
        } else {
            scan.setMaxVersions();
        }

        scan.setCacheBlocks(false);
        for (Qualifier qualifier : configuration.getQualifiers()) {
            scan.addColumn(qualifier.getFamily(), qualifier.getQualifier());
        }

        for (byte[] family : configuration.getFamilies()) {
            scan.addFamily(family);
        }

        if (configuration.getFilters() != null
                && !configuration.getFilters().getFilters().isEmpty()) {
            scan.setFilter(configuration.getFilters());
        }

        if (configuration.getFrom() != null && configuration.getUntil() != null) {
            scan.setTimeRange(configuration.getFrom(), configuration.getUntil());
        }

        return scan;
    }

}
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Result;
import org.apache.log4j.Logger;


/**
 * Utility to execute scans in parallel. It launches one thread per region. This version only
 * supports a complete scan of the table; it does not permit ranges.
 */
public final class DistributedScan {

    private ScanConfiguration scanConfiguration;
    /**
     * Logger for DistributedScan.
     */
    private static final Logger LOG = Logger.getLogger(DistributedScan.class);

    /**
     * Constructor with configuration for the Scan.
     * 
     * @param pScanConfiguration
     *            configuration.
     */
    public DistributedScan(final ScanConfiguration pScanConfiguration) {
        super();
        scanConfiguration = pScanConfiguration;
    }

    /**
     * Method to execute a distributed scan.
     * 
     * @return a list of the result from HBase.
     */
    public List<Result> executeDistributedScan() {
        List<Result> results = new ArrayList<Result>();
        List<RegionScanner> tasks = generatePartitions();

        try {
            ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
            final List<Future<List<Result>>> resultFromParts = executor.invokeAll(tasks);
            executor.shutdown();

            awaitThreadsTermination(executor);
            for (Future<List<Result>> future : resultFromParts) {
                results.addAll(future.get());
            }
        } catch (InterruptedException e) {
            LOG.error("Error executing thread", e);
        } catch (ExecutionException e) {
            LOG.error("Error executing scan in Hbase some thread", e);
        }
        return results;

    }

    private List<RegionScanner> generatePartitions() {
        List<RegionScanner> regionScanners = new ArrayList<RegionScanner>();
        byte[] startKey;
        byte[] stopKey;
        HConnection connection = null;
        HBaseAdmin hbaseAdmin = null;
        try {
            connection = HConnectionManager.createConnection(HBaseConfiguration.create());
            hbaseAdmin = new HBaseAdmin(connection);
            List<HRegionInfo> regions = hbaseAdmin.getTableRegions(scanConfiguration.getTable());

            if (LOG.isDebugEnabled()) {
                LOG.debug("There're " + regions.size()
                        + " regions for the table, we're going to execute the scan"
                        + " in that number of threads.");
            }

            RegionScanner regionScanner = null;
            for (HRegionInfo region : regions) {

                startKey = region.getStartKey();
                stopKey = region.getEndKey();

                regionScanner = new RegionScanner(startKey, stopKey, scanConfiguration);
                if (regionScanner != null) {
                    regionScanners.add(regionScanner);
                }
            }
        } catch (IOException e) {
            throw new BidoopException("Error generating the partitions for the distributed scan", e);
        } finally {
            try {
                // Close the admin before the connection it was built on.
                if (hbaseAdmin != null) {
                    hbaseAdmin.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                LOG.error("Error closing connection in hbase", e);
            }
        }

        return regionScanners;
    }

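    private void awaitThreadsTermination(final ExecutorService executor) {
        try {
            // Block instead of spinning on isTerminated(). invokeAll() has already waited for
            // every task, so this normally returns almost immediately.
            while (!executor.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
                LOG.debug("Waiting for thread termination");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Interrupted while waiting for thread termination", e);
        }
        LOG.info("All threads have finished");
    }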
}
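
The fan-out in executeDistributedScan() can be sketched without HBase as follows. This is only a sketch under stated assumptions: ParallelRangeSketch, rangeTask and runAll are hypothetical names, and each task returns the integers of a half-open range instead of scanning a region. It relies on ExecutorService.invokeAll, which blocks until every task has completed and returns the futures in task order, so no separate wait loop is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in for DistributedScan; it does not talk to HBase. */
public class ParallelRangeSketch {

    /** Stand-in for a per-region scan: returns the keys in [start, stop). */
    public static Callable<List<Integer>> rangeTask(final int start, final int stop) {
        return () -> {
            List<Integer> keys = new ArrayList<>();
            for (int key = start; key < stop; key++) {
                keys.add(key);
            }
            return keys;
        };
    }

    /** Runs every task on its own thread and merges the results in task order. */
    public static List<Integer> runAll(List<Callable<List<Integer>>> tasks) {
        List<Integer> merged = new ArrayList<>();
        if (tasks.isEmpty()) {
            return merged; // newFixedThreadPool(0) would throw IllegalArgumentException
        }
        ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
        try {
            // invokeAll blocks until all tasks are done, so each get() returns at once.
            for (Future<List<Integer>> future : executor.invokeAll(tasks)) {
                merged.addAll(future.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while running range tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A range task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Callable<List<Integer>>> tasks = new ArrayList<>();
        tasks.add(rangeTask(0, 3));
        tasks.add(rangeTask(3, 7));
        tasks.add(rangeTask(7, 10));
        System.out.println(runAll(tasks).size()); // prints 10
    }
}
```

The ranges are half-open, so adjacent tasks that share a boundary key do not return it twice.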
