Thanks for your answers. I ended up by extending
org.apache.hadoop.hbase.mapreduce.TableInputFormat and overwriting the
split method and passing it to the map reduce job the following way:

public class TimeRangeTableInputFormat extends TableInputFormat {
        
        @Override
        public List<InputSplit> getSplits( JobContext context ) throws 
IOException
        {
                try {
                        List<InputSplit> splits = new ArrayList<InputSplit>();
                        Scan scan = getScan();

                        //startrow and endrow must be a string as bytes in the 
format 2013-01-28
                        byte startRow[] = scan.getStartRow();
                        byte stopRow[] = scan.getStopRow();

                        //For each date in the span, we are going to create a 
new scan object
                        SimpleDateFormat dateFormatter = new 
SimpleDateFormat("yyyy-MM-dd");
                        Date startDate = dateFormatter.parse( Bytes.toString( 
startRow ) );
                        Date endDate = dateFormatter.parse( Bytes.toString( 
stopRow ) );

                        for( Date iterDate = startDate; 
iterDate.compareTo(endDate) <= 0;
iterDate = Utils.addDays( iterDate, 1 ) ) {

                                //since the dates in the row keys are stored 
using md5
                                byte[] md5Key = Utils.md5( 
dateFormatter.format(iterDate) );
                                int md5Length = 16;
                                int longLength = 8;
                                
                                byte[] subStartRow = Bytes.padTail( md5Key, 
longLength ); //append
"0 0 0 0 0 0 0 0"
                                byte[] subEndRow   = Bytes.padTail( md5Key, 
longLength );
                                subEndRow[md5Length-1]++; //last byte gets 
counted up
                                
                                scan.setStartRow(subStartRow);
                                scan.setStopRow(subEndRow);
                                setScan(scan);
                                
                                for (InputSplit subSplit : 
super.getSplits(context))
                                        splits.add((InputSplit) 
ReflectionUtils.copy( context.getConfiguration(),
                                                                              
(TableSplit) subSplit, new TableSplit() ) );
                        }
                        
                        return splits;
                        
                } catch( Exception e ) {
                        e.printStackTrace();
                        return null;
                }
        }

}

This way I get a new scan object for every day. And although I'm using
md5 keys as a prefix in my rowkeys I can still scan ranges this way.

Some questions remain:
1. What is your opinion about this approach?
2. @Nick: I've read somewhere that a filter list would be less
efficient that overwriting the split method. What do you think?


2013/2/26 Nick Dimiduk <[email protected]>:
> Hi Paul,
>
> You want to run multiple scans so that you can filter the previous scan
> results? Am I correct in my understanding of your objective?
>
> First, I suggest you use the PrefixFilter [0] instead of constructing the
> rowkey prefix manually. This looks something like:
>
> byte[] md5Key = Utils.md5( "2013-01-07" );
> Scan scan = new Scan(md5Key);
> scan.setFilter(new PrefixFilter(md5Key));
>
> Yes, that's a bit redundant, but setting the startkey explicitly will save
> you some unnecessary processing.
>
> This map reduce job works fine but this is just one scan job for this map
>> reduce task. What do I have to do to pass multiple scans?
>
>
> Do you mean processing on multiple dates? In that case, what you really
> want is a full (unbounded) table scan. Since date is the first part of your
> compound rowkey, there's no prefix and no need for a filter, just use new
> Scan().
>
> In general, you can use multiple filters in a given Scan (or Get). See the
> FilterList [1] for details.
>
> Does this help?
> Nick
>
> [0]:
> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/PrefixFilter.html
> [1]:
> http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/FilterList.html
>
> On Tue, Feb 26, 2013 at 5:41 AM, Paul van Hoven <
> [email protected]> wrote:
>
>> My rowkeys look something like this:
>>
>> md5( date ) + md5( ip address )
>>
>> So an example would be
>> md5( "2013-02-08") + md5( "192.168.187.2")
>>
>> For one particular date I got several rows. Now I'd like to query
>> different dates, for example "2013-01-01" and "2013-02-01" and some
>> other. Additionally I'd like to perform this or these scans in a map
>> reduce job.
>>
>> Currently my map reduce job looks like this:
>>
>> Configuration config = HBaseConfiguration.create();
>> Job job = new Job(config,"ToyJob");
>> job.setJarByClass( PlayWithMapReduce.class );
>>
>> byte[] md5Key = Utils.md5( "2013-01-07" );
>> int md5Length = 16;
>> int longLength = 8;
>>
>> byte[] startRow = Bytes.padTail( md5Key, longLength ); //append "0 0 0
>> 0 0 0 0 0"
>> byte[] endRow = Bytes.padTail( md5Key, longLength );
>> endRow[md5Length-1]++; //last byte gets counted up
>>
>> Scan scan = new Scan( startRow, endRow );
>> scan.setCaching(500);
>> scan.setCacheBlocks(false);
>>
>> Filter f = new SingleColumnValueFilter( Bytes.toBytes("CF"),
>> Bytes.toBytes("creativeId"), CompareOp.EQUAL, Bytes.toBytes("100") );
>> scan.setFilter(f);
>>
>> String tableName = "ToyDataTable";
>> TableMapReduceUtil.initTableMapperJob( tableName, scan, Mapper.class,
>> null, null, job);
>>
>> This map reduce job works fine but this is just one scan job for this
>> map reduce task. What do I have to do to pass multiple scans? Or do
>> you have any other suggestions on how to achieve that goal? The
>> constraint would be that it must be possible to combine it with map
>> reduce.
>>

Reply via email to