I'm attaching the code that I'm executing. I don't have access to the
generator for HBase.
In the last benchmark, a simple scan took about 4 times less than this
version.
With this version it is only possible to do complete scans.
I have been trying a complete scan of an HTable with 100,000 rows and it
takes less than one second; isn't that too fast?
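As a rough plausibility check on that sub-second figure (the ~1 ms per round trip below is an assumed number, not a measurement from this cluster): with caching set to 1000, reading 100,000 rows costs only about 100 scanner RPCs, so a fast full scan is not absurd on its own.

```java
public final class RpcEstimate {
    public static void main(String[] args) {
        int rows = 100_000;
        int caching = 1_000;           // rows returned per scanner RPC
        double msPerRpc = 1.0;         // assumed round-trip cost, not measured
        int rpcs = (rows + caching - 1) / caching;   // ceil(rows / caching)
        System.out.println(rpcs + " RPCs, roughly " + (rpcs * msPerRpc) + " ms");
    }
}
```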
2014-09-14 20:21 GMT+02:00 Guillermo Ortiz <[email protected]>:
> I don't have the code here. But I created a class RegionScanner; this
> class does a complete scan of a region, so I have to set the start and stop
> keys. The start and stop keys are the limits of that region.
>
> On Sunday, September 14, 2014, Anoop John <[email protected]>
> wrote:
>
> Again, the full code snippet can speak better.
>>
>> But I'm not getting what you are doing with the code below
>>
>> private List<RegionScanner> generatePartitions() {
>> List<RegionScanner> regionScanners = new
>> ArrayList<RegionScanner>();
>> byte[] startKey;
>> byte[] stopKey;
>> HConnection connection = null;
>> HBaseAdmin hbaseAdmin = null;
>> try {
>> connection = HConnectionManager.
>> createConnection(HBaseConfiguration.create());
>> hbaseAdmin = new HBaseAdmin(connection);
>> List<HRegionInfo> regions =
>> hbaseAdmin.getTableRegions(scanConfiguration.getTable());
>> RegionScanner regionScanner = null;
>> for (HRegionInfo region : regions) {
>>
>> startKey = region.getStartKey();
>> stopKey = region.getEndKey();
>>
>> regionScanner = new RegionScanner(startKey, stopKey,
>> scanConfiguration);
>> // regionScanner = createRegionScanner(startKey, stopKey);
>> if (regionScanner != null) {
>> regionScanners.add(regionScanner);
>> }
>> }
>>
>> And I execute the RegionScanner with this:
>> public List<Result> call() throws Exception {
>> HConnection connection =
>> HConnectionManager.
>> createConnection(HBaseConfiguration.create());
>> HTableInterface table =
>> connection.getTable(configuration.getTable());
>>
>> Scan scan = new Scan(startKey, stopKey);
>> scan.setBatch(configuration.getBatch());
>> scan.setCaching(configuration.getCaching());
>> ResultScanner resultScanner = table.getScanner(scan);
>>
>>
>> What is this part?
>> new RegionScanner(startKey, stopKey,
>> scanConfiguration);
>>
>>
>> >>Scan scan = new Scan(startKey, stopKey);
>> scan.setBatch(configuration.
>> getBatch());
>> scan.setCaching(configuration.getCaching());
>> ResultScanner resultScanner = table.getScanner(scan);
>>
>>
>> And you're not setting start and stop rows on this Scan object?!
>>
>>
>> Sorry if I missed some parts of your code.
>>
>> -Anoop-
>>
>>
>> On Sun, Sep 14, 2014 at 2:54 PM, Guillermo Ortiz <[email protected]>
>> wrote:
>>
>> > I don't have the code here, but I'll post the code in a couple of days.
>> > I have to check the ExecutorService again! I don't remember exactly how
>> > I did it.
>> >
>> > I'm using HBase 0.98.
>> >
>> > On Sunday, September 14, 2014, lars hofhansl <[email protected]>
>> > wrote:
>> >
>> > > What specific version of 0.94 are you using?
>> > >
>> > > In general, if you have multiple spindles (disks) and/or multiple CPU
>> > > cores at the region server, you should benefit from keeping multiple
>> > > region server handler threads busy. I have experimented with this
>> > > before and saw a close to linear speedup (up to the point where all
>> > > disks/cores were busy). Obviously this also assumes this is the only
>> > > load you throw at the servers at this point.
>> > >
>> > > Can you post your complete code to pastebin? Maybe even with some code
>> > > to seed the data?
>> > > How do you run your callables? Did you configure the ExecutorService
>> > > correctly (assuming you use one to run your callables)?
>> > >
>> > > Then we can run it and have a look.
>> > >
>> > > Thanks.
>> > >
>> > > -- Lars
>> > >
>> > >
>> > > ----- Original Message -----
>> > > From: Guillermo Ortiz <[email protected]>
>> > > To: "[email protected]" <[email protected]>
>> > > Cc:
>> > > Sent: Saturday, September 13, 2014 4:49 PM
>> > > Subject: Re: Scan vs Parallel scan.
>> > >
>> > > What am I missing??
>> > >
>> > >
>> > >
>> > >
>> > > 2014-09-12 16:05 GMT+02:00 Guillermo Ortiz <[email protected]>:
>> > >
>> > > > For a partial scan, I guess the client calls the RS to get data; it
>> > > > starts looking in the store files and collecting the data. (It
>> > > > doesn't write to the block cache in either case.) Once the data is
>> > > > ready, it is handed to the client step by step, depending on the
>> > > > caching and batching parameters.
>> > > >
>> > > > Big differences that I see...
>> > > > I'm opening more connections to the table, one per region.
>> > > >
>> > > > I should check the single table scan; it looks like it does partial
>> > > > scans sequentially, since you can see on the HBase Master how the
>> > > > requests increase one after another, not all at the same time.
>> > > >
>> > > > 2014-09-12 15:23 GMT+02:00 Michael Segel <[email protected]>:
>> > > >
>> > > >> It doesn’t matter which RS, but that you have 1 thread for each
>> > > >> region.
>> > > >>
>> > > >> So for each thread, what’s happening?
>> > > >> Step by step, what is the code doing?
>> > > >>
>> > > >> Now you’re comparing this against a single table scan, right?
>> > > >> What’s happening in the table scan…?
>> > > >>
>> > > >>
>> > > >> On Sep 12, 2014, at 2:04 PM, Guillermo Ortiz <[email protected]>
>> > > >> wrote:
>> > > >>
>> > > >> > Right. My table, for example, has keys between 0-9 in three
>> > > >> > regions: 0-2, 3-7, 7-9.
>> > > >> > I launch three partial scans in parallel. The scans that I'm
>> > > >> > executing are: scan(0,2), scan(3,7), scan(7,9).
>> > > >> > Each region is in a different RS, so each thread goes to a
>> > > >> > different RS. It's not exactly like that, but in the benchmark
>> > > >> > case that's how it works.
>> > > >> >
>> > > >> > Really the code will execute a thread for each region, not for
>> > > >> > each RegionServer. But in the test I only have two regions per
>> > > >> > region server. I don't think that's an important point; there are
>> > > >> > two threads per RS.
>> > > >> >
>> > > >> > 2014-09-12 14:48 GMT+02:00 Michael Segel <[email protected]>:
>> > > >> >
>> > > >> >> Ok, let's again take a step back…
>> > > >> >>
>> > > >> >> So you are comparing your partial scan(s) against a full table
>> > > >> >> scan?
>> > > >> >>
>> > > >> >> If I understood your question, you launch 3 partial scans where
>> > > >> >> you set the start row and the end row of each scan, right?
>> > > >> >>
>> > > >> >> On Sep 12, 2014, at 9:16 AM, Guillermo Ortiz <[email protected]>
>> > > >> >> wrote:
>> > > >> >>
>> > > >> >>> Okay, then the partial scan doesn't work as I thought.
>> > > >> >>> How could it exceed the limit of a single region if I calculate
>> > > >> >>> the limits?
>> > > >> >>>
>> > > >> >>> The only bad point that I see is that if a region server has
>> > > >> >>> three regions of the same table, I'm executing three partial
>> > > >> >>> scans against this RS and they could compete for resources
>> > > >> >>> (network, etc.) on this node. It'd be better to have one thread
>> > > >> >>> per RS. But that doesn't answer your questions.
>> > > >> >>>
>> > > >> >>> I keep thinking...
>> > > >> >>>
>> > > >> >>> 2014-09-12 9:40 GMT+02:00 Michael Segel <[email protected]>:
>> > > >> >>>
>> > > >> >>>> Hi,
>> > > >> >>>>
>> > > >> >>>> I wanted to take a step back from the actual code and to stop
>> > > >> >>>> and think about what you are doing and what HBase is doing
>> > > >> >>>> under the covers.
>> > > >> >>>>
>> > > >> >>>> So in your code, you are asking HBase to do 3 separate scans
>> > > >> >>>> and then you take the result set back and join it.
>> > > >> >>>>
>> > > >> >>>> What does HBase do when it does a range scan?
>> > > >> >>>> What happens when that range scan exceeds a single region?
>> > > >> >>>>
>> > > >> >>>> If you answer those questions… you’ll have your answer.
>> > > >> >>>>
>> > > >> >>>> HTH
>> > > >> >>>>
>> > > >> >>>> -Mike
>> > > >> >>>>
>> > > >> >>>> On Sep 12, 2014, at 8:34 AM, Guillermo Ortiz <[email protected]>
>> > > >> >>>> wrote:
>> > > >> >>>>
>> > > >> >>>>> That's not all the code; I set things like these as well:
>> > > >> >>>>> scan.setMaxVersions();
>> > > >> >>>>> scan.setCacheBlocks(false);
>> > > >> >>>>> ...
>> > > >> >>>>>
>> > > >> >>>>> 2014-09-12 9:33 GMT+02:00 Guillermo Ortiz <[email protected]>:
>> > > >> >>>>>
>> > > >> >>>>>> Yes, that's it. I have changed the HBase version to 0.98.
>> > > >> >>>>>>
>> > > >> >>>>>> I got the start and stop keys with this method:
>> > > >> >>>>>> private List<RegionScanner> generatePartitions() {
>> > > >> >>>>>> List<RegionScanner> regionScanners = new
>> > > >> >>>>>> ArrayList<RegionScanner>();
>> > > >> >>>>>> byte[] startKey;
>> > > >> >>>>>> byte[] stopKey;
>> > > >> >>>>>> HConnection connection = null;
>> > > >> >>>>>> HBaseAdmin hbaseAdmin = null;
>> > > >> >>>>>> try {
>> > > >> >>>>>> connection = HConnectionManager.
>> > > >> >>>>>> createConnection(HBaseConfiguration.create());
>> > > >> >>>>>> hbaseAdmin = new HBaseAdmin(connection);
>> > > >> >>>>>> List<HRegionInfo> regions =
>> > > >> >>>>>> hbaseAdmin.getTableRegions(scanConfiguration.getTable());
>> > > >> >>>>>> RegionScanner regionScanner = null;
>> > > >> >>>>>> for (HRegionInfo region : regions) {
>> > > >> >>>>>>
>> > > >> >>>>>> startKey = region.getStartKey();
>> > > >> >>>>>> stopKey = region.getEndKey();
>> > > >> >>>>>>
>> > > >> >>>>>> regionScanner = new RegionScanner(startKey,
>> > stopKey,
>> > > >> >>>>>> scanConfiguration);
>> > > >> >>>>>> // regionScanner =
>> createRegionScanner(startKey,
>> > > >> >>>> stopKey);
>> > > >> >>>>>> if (regionScanner != null) {
>> > > >> >>>>>> regionScanners.add(regionScanner);
>> > > >> >>>>>> }
>> > > >> >>>>>> }
>> > > >> >>>>>>
>> > > >> >>>>>> And I execute the RegionScanner with this:
>> > > >> >>>>>> public List<Result> call() throws Exception {
>> > > >> >>>>>> HConnection connection =
>> > > >> >>>>>>
>> > HConnectionManager.createConnection(HBaseConfiguration.create());
>> > > >> >>>>>> HTableInterface table =
>> > > >> >>>>>> connection.getTable(configuration.getTable());
>> > > >> >>>>>>
>> > > >> >>>>>> Scan scan = new Scan(startKey, stopKey);
>> > > >> >>>>>> scan.setBatch(configuration.getBatch());
>> > > >> >>>>>> scan.setCaching(configuration.getCaching());
>> > > >> >>>>>> ResultScanner resultScanner = table.getScanner(scan);
>> > > >> >>>>>>
>> > > >> >>>>>> List<Result> results = new ArrayList<Result>();
>> > > >> >>>>>> for (Result result : resultScanner) {
>> > > >> >>>>>> results.add(result);
>> > > >> >>>>>> }
>> > > >> >>>>>>
>> > > >> >>>>>> connection.close();
>> > > >> >>>>>> table.close();
>> > > >> >>>>>>
>> > > >> >>>>>> return results;
>> > > >> >>>>>> }
>> > > >> >>>>>>
>> > > >> >>>>>> They implement Callable.
>> > > >> >>>>>>
>> > > >> >>>>>>
>> > > >> >>>>>> 2014-09-12 9:26 GMT+02:00 Michael Segel <[email protected]>:
>> > > >> >>>>>>
>> > > >> >>>>>>> Let's take a step back…
>> > > >> >>>>>>>
>> > > >> >>>>>>> Your parallel scan has the client create N threads where,
>> > > >> >>>>>>> in each thread, you’re doing a partial scan of the table,
>> > > >> >>>>>>> and each partial scan takes the first and last row of one
>> > > >> >>>>>>> region?
>> > > >> >>>>>>>
>> > > >> >>>>>>> Is that correct?
>> > > >> >>>>>>>
>> > > >> >>>>>>> On Sep 12, 2014, at 7:36 AM, Guillermo Ortiz <[email protected]>
>> > > >> >>>>>>> wrote:
>> > > >> >>>>>>>
>> > > >> >>>>>>>> I was checking a little bit more. I checked the cluster,
>> > > >> >>>>>>>> and data is stored in three different region servers, each
>> > > >> >>>>>>>> one on a different node. So, I guess the threads go to
>> > > >> >>>>>>>> different hard disks.
>> > > >> >>>>>>>>
>> > > >> >>>>>>>> If someone has an idea or suggestion... why is a single
>> > > >> >>>>>>>> scan faster than this implementation? I based mine on this
>> > > >> >>>>>>>> implementation:
>> > > >> >>>>>>>> https://github.com/zygm0nt/hbase-distributed-search
>> > > >> >>>>>>>>
>> > > >> >>>>>>>> 2014-09-11 12:05 GMT+02:00 Guillermo Ortiz <[email protected]>:
>> > > >> >>>>>>>>
>> > > >> >>>>>>>>> I'm working with HBase 0.94 for this case. I'll try with
>> > > >> >>>>>>>>> 0.98, although there is no difference.
>> > > >> >>>>>>>>> I disabled the table, disabled the block cache for that
>> > > >> >>>>>>>>> family, and set scan.setCacheBlocks(false) as well in
>> > > >> >>>>>>>>> both cases.
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> I think it's not possible that I'm executing a complete
>> > > >> >>>>>>>>> scan in each thread, since my data are of the type:
>> > > >> >>>>>>>>> 000001 f:q value=1
>> > > >> >>>>>>>>> 000002 f:q value=2
>> > > >> >>>>>>>>> 000003 f:q value=3
>> > > >> >>>>>>>>> ...
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> I add all the values and get the same result from a
>> > > >> >>>>>>>>> single scan as from a distributed one, so I guess
>> > > >> >>>>>>>>> DistributedScan did well.
>> > > >> >>>>>>>>> The count from the hbase shell takes about 10-15 seconds,
>> > > >> >>>>>>>>> I don't remember exactly, but about 4x the scan time.
>> > > >> >>>>>>>>> I'm not using any filter for the scans.
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> This is the way I calculate the number of regions/scans:
>> > > >> >>>>>>>>> private List<RegionScanner> generatePartitions() {
>> > > >> >>>>>>>>> List<RegionScanner> regionScanners = new
>> > > >> >>>>>>>>> ArrayList<RegionScanner>();
>> > > >> >>>>>>>>> byte[] startKey;
>> > > >> >>>>>>>>> byte[] stopKey;
>> > > >> >>>>>>>>> HConnection connection = null;
>> > > >> >>>>>>>>> HBaseAdmin hbaseAdmin = null;
>> > > >> >>>>>>>>> try {
>> > > >> >>>>>>>>> connection =
>> > > >> >>>>>>>>>
>> > > >> HConnectionManager.createConnection(HBaseConfiguration.create());
>> > > >> >>>>>>>>> hbaseAdmin = new HBaseAdmin(connection);
>> > > >> >>>>>>>>> List<HRegionInfo> regions =
>> > > >> >>>>>>>>> hbaseAdmin.getTableRegions(scanConfiguration.getTable());
>> > > >> >>>>>>>>> RegionScanner regionScanner = null;
>> > > >> >>>>>>>>> for (HRegionInfo region : regions) {
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> startKey = region.getStartKey();
>> > > >> >>>>>>>>> stopKey = region.getEndKey();
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> regionScanner = new RegionScanner(startKey,
>> > > stopKey,
>> > > >> >>>>>>>>> scanConfiguration);
>> > > >> >>>>>>>>> // regionScanner =
>> createRegionScanner(startKey,
>> > > >> >>>>>>> stopKey);
>> > > >> >>>>>>>>> if (regionScanner != null) {
>> > > >> >>>>>>>>> regionScanners.add(regionScanner);
>> > > >> >>>>>>>>> }
>> > > >> >>>>>>>>> }
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> I did some tests on a tiny table and I think that the
>> > > >> >>>>>>>>> range for each scan works fine. Still, I thought it was
>> > > >> >>>>>>>>> interesting that the time when I execute the distributed
>> > > >> >>>>>>>>> scan is about 6x.
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> I'm going to check the hard disks, but I think that part
>> > > >> >>>>>>>>> is right.
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>> 2014-09-11 7:50 GMT+02:00 lars hofhansl <[email protected]>:
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>>> Which version of HBase?
>> > > >> >>>>>>>>>> Can you show us the code?
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> Your parallel scan with caching 100 takes about 6x as
>> > > >> >>>>>>>>>> long as the single scan, which is suspicious because you
>> > > >> >>>>>>>>>> say you have 6 regions.
>> > > >> >>>>>>>>>> Are you sure you're not accidentally scanning all the
>> > > >> >>>>>>>>>> data in each of your parallel scans?
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> -- Lars
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> ________________________________
>> > > >> >>>>>>>>>> From: Guillermo Ortiz <[email protected]>
>> > > >> >>>>>>>>>> To: "[email protected]" <[email protected]>
>> > > >> >>>>>>>>>> Sent: Wednesday, September 10, 2014 1:40 AM
>> > > >> >>>>>>>>>> Subject: Scan vs Parallel scan.
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> Hi,
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> I developed a distributed scan; I create a thread for
>> > > >> >>>>>>>>>> each region. After that, I tried to compare times for
>> > > >> >>>>>>>>>> Scan vs. DistributedScan.
>> > > >> >>>>>>>>>> I have disabled the block cache in my table. My cluster
>> > > >> >>>>>>>>>> has 3 region servers with 2 regions each; in total there
>> > > >> >>>>>>>>>> are 100,000 rows and I execute a complete scan.
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> My partitions are:
>> > > >> >>>>>>>>>> -01666 -> request 16665
>> > > >> >>>>>>>>>> 016666-033332 -> request 16666
>> > > >> >>>>>>>>>> 033332-049998 -> request 16666
>> > > >> >>>>>>>>>> 049998-066664 -> request 16666
>> > > >> >>>>>>>>>> 066664-083330 -> request 16666
>> > > >> >>>>>>>>>> 083330- -> request 16671
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> 14/09/10 09:15:47 INFO hbase.HbaseScanTest: NUM ROWS 100000
>> > > >> >>>>>>>>>> 14/09/10 09:15:47 INFO util.TimerUtil: SCAN PARALLEL:22089ms,Counter:2 -> Caching 10
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> 14/09/10 09:16:04 INFO hbase.HbaseScanTest: NUM ROWS 100000
>> > > >> >>>>>>>>>> 14/09/10 09:16:04 INFO util.TimerUtil: SCAN PARALLEL:16598ms,Counter:2 -> Caching 100
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> 14/09/10 09:16:22 INFO hbase.HbaseScanTest: NUM ROWS 100000
>> > > >> >>>>>>>>>> 14/09/10 09:16:22 INFO util.TimerUtil: SCAN PARALLEL:16497ms,Counter:2 -> Caching 1000
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> 14/09/10 09:17:41 INFO hbase.HbaseScanTest: NUM ROWS 100000
>> > > >> >>>>>>>>>> 14/09/10 09:17:41 INFO util.TimerUtil: SCAN NORMAL:68288ms,Counter:2 -> Caching 1
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> 14/09/10 09:17:48 INFO hbase.HbaseScanTest: NUM ROWS 100000
>> > > >> >>>>>>>>>> 14/09/10 09:17:48 INFO util.TimerUtil: SCAN NORMAL:2646ms,Counter:2 -> Caching 100
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> 14/09/10 09:17:58 INFO hbase.HbaseScanTest: NUM ROWS 100000
>> > > >> >>>>>>>>>> 14/09/10 09:17:58 INFO util.TimerUtil: SCAN NORMAL:3903ms,Counter:2 -> Caching 1000
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> Parallel scan works much worse than the simple scan, and
>> > > >> >>>>>>>>>> I don't know why the simple scan is so fast; it's really
>> > > >> >>>>>>>>>> much faster than executing a "count" from the hbase
>> > > >> >>>>>>>>>> shell, which doesn't look normal. The only time parallel
>> > > >> >>>>>>>>>> works better is against a normal scan with caching 1.
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>> Any clue about it?
>> > > >> >>>>>>>>>>
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>>>
>> > > >> >>>>>>>
>> > > >> >>>>>>>
>> > > >> >>>>>>
>> > > >> >>>>
>> > > >> >>>>
>> > > >> >>
>> > > >> >>
>> > > >>
>> > > >>
>> > > >
>> > >
>> >
>>
>
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.filter.FilterList;

/**
 * Class to store the configuration for a parallel scan. By default batch is 10, caching is 100,
 * and the maximum number of versions is returned.
 */
public final class ScanConfiguration {

    private byte[] table;
    // TODO extra parameters if scan is updated to do scan by ranges and just one thread per
    // regionServer.
    // private byte[] startRow;
    // private byte[] stopRow;
    // private byte[] regionName;
    private FilterList filters;
    private List<byte[]> families;
    private List<Qualifier> qualifiers;
    private int batch;
    private int caching;
    private int versions;
    private Long from;
    private Long until;

    // Defaults follow the class javadoc: batch 10, caching 100.
    private static final int DEFAULT_BATCHING = 10;
    private static final int DEFAULT_CACHING = 100;

    /**
     * Constructor.
     */
    public ScanConfiguration() {
        super();
        filters = null;
        families = new ArrayList<byte[]>();
        qualifiers = new ArrayList<Qualifier>();
        batch = DEFAULT_BATCHING;
        caching = DEFAULT_CACHING;
        versions = 0;
    }

    /**
     * @return the table
     */
    public byte[] getTable() {
        return table;
    }

    /**
     * @param pTable
     *            the table to set
     */
    public void setTable(final byte[] pTable) {
        table = pTable;
    }

    /**
     * @return the filters
     */
    public FilterList getFilters() {
        return filters;
    }

    /**
     * @param pFilters
     *            the filters to set
     */
    public void setFilters(final FilterList pFilters) {
        filters = pFilters;
    }

    /**
     * @return the families
     */
    public List<byte[]> getFamilies() {
        return families;
    }

    /**
     * @param pFamilies
     *            the families to set
     */
    public void setFamilies(final List<byte[]> pFamilies) {
        families = pFamilies;
    }

    /**
     * @return the qualifiers
     */
    public List<Qualifier> getQualifiers() {
        return qualifiers;
    }

    /**
     * @param pQualifiers
     *            the qualifiers to set
     */
    public void setQualifiers(final List<Qualifier> pQualifiers) {
        qualifiers = pQualifiers;
    }

    /**
     * @return the batch
     */
    public int getBatch() {
        return batch;
    }

    /**
     * @param pBatch
     *            the batch to set
     */
    public void setBatch(final int pBatch) {
        batch = pBatch;
    }

    /**
     * @return the caching
     */
    public int getCaching() {
        return caching;
    }

    /**
     * @param pCaching
     *            the caching to set
     */
    public void setCaching(final int pCaching) {
        caching = pCaching;
    }

    /**
     * @return the versions
     */
    public int getVersions() {
        return versions;
    }

    /**
     * @param pVersions
     *            the versions to set
     */
    public void setVersions(final int pVersions) {
        versions = pVersions;
    }

    /**
     * @return the from
     */
    public Long getFrom() {
        return from;
    }

    /**
     * @param pFrom
     *            the from to set
     */
    public void setFrom(final Long pFrom) {
        from = pFrom;
    }

    /**
     * @return the until
     */
    public Long getUntil() {
        return until;
    }

    /**
     * @param pUntil
     *            the until to set
     */
    public void setUntil(final Long pUntil) {
        until = pUntil;
    }
}
public class Qualifier {

    private byte[] family;
    private byte[] qualifier;

    /**
     * @return the family
     */
    public byte[] getFamily() {
        return family;
    }

    /**
     * @param pFamily
     *            the family to set
     */
    public void setFamily(final byte[] pFamily) {
        family = pFamily;
    }

    /**
     * @return the qualifier
     */
    public byte[] getQualifier() {
        return qualifier;
    }

    /**
     * @param pQualifier
     *            the qualifier to set
     */
    public void setQualifier(final byte[] pQualifier) {
        qualifier = pQualifier;
    }
}
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

/**
 * Class which executes a thread to do a partial scan.
 */
public final class RegionScanner implements Callable<List<Result>> {

    private ScanConfiguration configuration;
    private byte[] startKey;
    private byte[] stopKey;

    /**
     * Constructor.
     *
     * @param pStartKey
     *            start key for the scan
     * @param pStopKey
     *            stop key for the scan
     * @param pConfiguration
     *            configuration for the scan.
     */
    public RegionScanner(final byte[] pStartKey, final byte[] pStopKey,
            final ScanConfiguration pConfiguration) {
        configuration = pConfiguration;
        startKey = pStartKey;
        stopKey = pStopKey;
    }

    @Override
    public List<Result> call() throws Exception {
        HConnection connection = HConnectionManager.createConnection(HBaseConfiguration.create());
        HTableInterface table = connection.getTable(configuration.getTable());
        ResultScanner resultScanner = table.getScanner(buildScanner());
        List<Result> results = new ArrayList<Result>();
        try {
            for (Result result : resultScanner) {
                results.add(result);
            }
        } finally {
            // Close in reverse order of acquisition: scanner, table, connection.
            resultScanner.close();
            table.close();
            connection.close();
        }
        return results;
    }

    private Scan buildScanner() throws IOException {
        Scan scan = new Scan(startKey, stopKey);
        scan.setBatch(configuration.getBatch());
        scan.setCaching(configuration.getCaching());
        if (configuration.getVersions() != 0) {
            scan.setMaxVersions(configuration.getVersions());
        } else {
            scan.setMaxVersions();
        }
        scan.setCacheBlocks(false);
        for (Qualifier qualifier : configuration.getQualifiers()) {
            scan.addColumn(qualifier.getFamily(), qualifier.getQualifier());
        }
        for (byte[] family : configuration.getFamilies()) {
            scan.addFamily(family);
        }
        if (configuration.getFilters() != null
                && !configuration.getFilters().getFilters().isEmpty()) {
            scan.setFilter(configuration.getFilters());
        }
        if (configuration.getFrom() != null && configuration.getUntil() != null) {
            scan.setTimeRange(configuration.getFrom(), configuration.getUntil());
        }
        return scan;
    }
}
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Result;
import org.apache.log4j.Logger;

/**
 * Utility to execute scans in parallel. It launches one thread per region. This version only
 * does complete scans of the table; it doesn't permit ranges.
 */
public final class DistributedScan {

    private ScanConfiguration scanConfiguration;

    /**
     * Log instance for the DistributedScan.
     */
    private static final Logger LOG = Logger.getLogger(DistributedScan.class);

    /**
     * Constructor with configuration for the scan.
     *
     * @param pScanConfiguration
     *            configuration.
     */
    public DistributedScan(final ScanConfiguration pScanConfiguration) {
        super();
        scanConfiguration = pScanConfiguration;
    }

    /**
     * Method to execute a distributed scan.
     *
     * @return a list of the results from HBase.
     */
    public List<Result> executeDistributedScan() {
        List<Result> results = new ArrayList<Result>();
        List<RegionScanner> tasks = generatePartitions();
        try {
            ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
            // invokeAll() blocks until every task has completed (or failed).
            final List<Future<List<Result>>> resultFromParts = executor.invokeAll(tasks);
            executor.shutdown();
            awaitThreadsTermination(executor);
            for (Future<List<Result>> future : resultFromParts) {
                results.addAll(future.get());
            }
        } catch (InterruptedException e) {
            LOG.error("Error executing thread", e);
        } catch (ExecutionException e) {
            LOG.error("Error executing scan in HBase in some thread", e);
        }
        return results;
    }

    private List<RegionScanner> generatePartitions() {
        List<RegionScanner> regionScanners = new ArrayList<RegionScanner>();
        HConnection connection = null;
        HBaseAdmin hbaseAdmin = null;
        try {
            connection = HConnectionManager.createConnection(HBaseConfiguration.create());
            hbaseAdmin = new HBaseAdmin(connection);
            List<HRegionInfo> regions = hbaseAdmin.getTableRegions(scanConfiguration.getTable());
            if (LOG.isDebugEnabled()) {
                LOG.debug("There are " + regions.size()
                        + " regions for the table; the scan will be executed"
                        + " in that number of threads.");
            }
            for (HRegionInfo region : regions) {
                // One partial scan per region, bounded by the region's own start and end keys.
                regionScanners.add(new RegionScanner(region.getStartKey(), region.getEndKey(),
                        scanConfiguration));
            }
        } catch (IOException e) {
            throw new BidoopException("Error generating the partitions for the distributed scan", e);
        } finally {
            try {
                if (hbaseAdmin != null) {
                    hbaseAdmin.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                LOG.error("Error closing connection to HBase", e);
            }
        }
        return regionScanners;
    }

    private void awaitThreadsTermination(final ExecutorService executor) {
        // Block on awaitTermination instead of busy-waiting on isTerminated().
        try {
            while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                LOG.debug("Waiting for thread termination");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        LOG.info("All threads have finished");
    }
}
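The fan-out/join pattern that DistributedScan relies on can be exercised without a cluster. A minimal sketch with plain Callables standing in for RegionScanner (the class and method names here are made up for illustration): invokeAll() blocks until every task finishes and returns its futures in task order, so concatenating them reassembles the partitions in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public final class InvokeAllSketch {

    /** Runs one callable per partition and concatenates the results in task order. */
    static List<Integer> fanOut(List<List<Integer>> partitions) throws Exception {
        List<Callable<List<Integer>>> tasks = new ArrayList<Callable<List<Integer>>>();
        for (final List<Integer> part : partitions) {
            tasks.add(new Callable<List<Integer>>() {
                @Override
                public List<Integer> call() {
                    return part; // a real task would scan [startKey, stopKey) here
                }
            });
        }
        ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
        try {
            List<Integer> results = new ArrayList<Integer>();
            // invokeAll blocks until all tasks are done; futures come back in task order.
            for (Future<List<Integer>> future : executor.invokeAll(tasks)) {
                results.addAll(future.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5));
        System.out.println(fanOut(partitions)); // prints [1, 2, 3, 4, 5]
    }
}
```

If the joined result comes back complete and ordered here but the HBase version is slow, the overhead is in the per-thread scan setup (one new connection per callable), not in the fan-out itself.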