Hi Ted,

Following your suggestion, we have edited AggregationClient.java with the 
following modification to the median() method:

...

    // scan the region with median and find it
    Scan scan2 = new Scan(scan);
    // inherit stop row from method parameter
    if (startRow != null)
      scan2.setStartRow(startRow);
    HTable table = new HTable(conf, tableName);
    int cacheSize = scan2.getCaching();
    if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
      scan2.setCacheBlocks(true);
      cacheSize = 5;
      scan2.setCaching(cacheSize);
    }

//    log.debug("scan:\t" + scan);
//    log.debug("scan2:\t" + scan2);

    ResultScanner scanner = table.getScanner(scan2);

...

The addition of "if (startRow != null) scan2.setStartRow(startRow);" has fixed 
our NPE/Memory issue and we now see a sensible result successfully returned. 

However, in the process of debugging this, I added some 'log.debug("scan:\t" + 
scan);' statements as you can see in the snippet above. These lines (when 
uncommented) were causing the following exception: 

  [sshexec] java.lang.NoSuchMethodError: org.codehaus.jackson.map.ObjectMapper.writeValueAsString(Ljava/lang/Object;)Ljava/lang/String;
  [sshexec]     at org.apache.hadoop.hbase.client.Operation.toJSON(Operation.java:67)
  [sshexec]     at org.apache.hadoop.hbase.client.Operation.toString(Operation.java:93)
  [sshexec]     at org.apache.hadoop.hbase.client.Operation.toString(Operation.java:107)
  [sshexec]     at java.lang.String.valueOf(String.java:2826)
  [sshexec]     at java.lang.StringBuilder.append(StringBuilder.java:115)
  [sshexec]     at org.apache.hadoop.hbase.client.coprocessor.AggregationClient.median(AggregationClient.java:473)
  [sshexec]     at uk.org.cse.aggregation.EDRPAggregator.testSumWithValidRange(EDRPAggregator.java:55)
  [sshexec]     at uk.org.cse.aggregation.EDRPAggregator.main(EDRPAggregator.java:85)
  [sshexec]     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  [sshexec]     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  [sshexec]     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  [sshexec]     at java.lang.reflect.Method.invoke(Method.java:597)
  [sshexec]     at org.apache.hadoop.util.RunJar.main(RunJar.java:156)

Do you have any idea why we got the error above? It seems the Scan 
toString() method (inherited from Operation) requires the Jackson JSON 
processor, which is failing for some reason...
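
A NoSuchMethodError at runtime usually means the code was compiled against 
one version of a library while a different version was loaded. The sketch 
below is one way to check that, assuming (not confirmed in this thread) that 
an older Jackson jar is being picked up from the job's classpath. The class 
and method names (ClassOriginSketch, describeOrigin, hasMethod) are 
hypothetical helpers, not part of HBase or Jackson.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper: reports where a class was loaded from
// and whether it exposes a given public method.
class ClassOriginSketch {

    // Returns the jar/directory a class was loaded from, or a note if the
    // class is missing or comes from the bootstrap class loader.
    static String describeOrigin(String className) {
        try {
            Class<?> c = Class.forName(className, false,
                    ClassOriginSketch.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "bootstrap or unknown location";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "not loadable: " + e;
        }
    }

    // True if the class is loadable and has a public method with this
    // name and parameter list.
    static boolean hasMethod(String className, String method, Class<?>... params) {
        try {
            Class<?> c = Class.forName(className, false,
                    ClassOriginSketch.class.getClassLoader());
            c.getMethod(method, params);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String mapper = "org.codehaus.jackson.map.ObjectMapper";
        System.out.println(mapper + " loaded from: " + describeOrigin(mapper));
        System.out.println("has writeValueAsString(Object): "
                + hasMethod(mapper, "writeValueAsString", Object.class));
    }
}
```

Running this with the same classpath as the failing job (for example via 
the same "hadoop jar" invocation) would show which jar supplies 
ObjectMapper in that environment.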

Also, please can you tell me where log.debug() output can be found? (e.g. 
$HBASE_HOME/logs/xxx-region-....log?)

Please find attached a patch with the fixed median method for the HBase 0.92 
branch. Note that I have removed the log.debug() statements.

Thanks,
Tom

-----Original Message-----
From: Ted Yu [mailto:[email protected]] 
Sent: 23 January 2012 15:24
To: [email protected]
Subject: median through AggregationProtocol

Royston:
The exception came from this line:
    ResultScanner scanner = table.getScanner(scan2);
Can you help me review the logic starting with:
    // scan the region with median and find it
    Scan scan2 = new Scan(scan);
You can log the String form of scan and scan2 before the table.getScanner() 
call.

I think the NPE below reveals that startRow is null (median is in first region).
If that is the case, the following should help:
    if (startRow != null) scan2.setStartRow(startRow);
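
To illustrate why startRow can stay null, here is a minimal stand-alone 
sketch of the region-selection loop. It is a simplification under stated 
assumptions: row keys are Strings and per-region sums are doubles, whereas 
the real code uses byte[] keys and ColumnInterpreter types. The names 
MedianRegionSketch and findStartRow are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified model of the loop in median() that walks regions in key order
// and stops at the first region whose cumulative sum passes half the total.
class MedianRegionSketch {

    // Returns the key of the last region that was fully passed before the
    // break, or null if the loop breaks on the very first region.
    static String findStartRow(TreeMap<String, Double> regionSums) {
        double total = 0;
        for (double v : regionSums.values()) {
            total += v;
        }
        double half = total / 2;
        double moving = 0;
        String startRow = null;
        for (Map.Entry<String, Double> e : regionSums.entrySet()) {
            double next = moving + e.getValue();
            if (next > half) {
                break; // this region contains the median
            }
            moving = next;
            startRow = e.getKey();
        }
        return startRow;
    }

    public static void main(String[] args) {
        TreeMap<String, Double> sums = new TreeMap<>();
        sums.put("a", 10.0);
        sums.put("m", 1.0);
        sums.put("t", 1.0);
        // The first region already exceeds half of the total, so no start
        // row is ever assigned.
        System.out.println("startRow = " + findStartRow(sums));
    }
}
```

When the first region holds more than half of the total, the loop exits 
before any assignment, so a null check is needed before using the result 
as a start row.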

Thanks

On Mon, Jan 23, 2012 at 5:50 AM, Royston Sellman < 
[email protected]> wrote:

> Hi Ted,
>
> Finally rebuilt branch/0.92 and applied your patch and rebuilt my code.
> Using AggregationClient.sum() on my test table I get the correct result.
> Just swapping to AggregationClient.median() I get the following error:
>
>  [sshexec] org.apache.hadoop.hbase.client.RetriesExhaustedException: 
> Failed after attempts=10, exceptions:
>  [sshexec] Mon Jan 23 13:44:12 GMT 2012, 
> org.apache.hadoop.hbase.client.ScannerCallable@219ba640,
> java.lang.NullPointerException
>  [sshexec] Mon Jan 23 13:44:13 GMT 2012, 
> org.apache.hadoop.hbase.client.ScannerCallable@219ba640,
> java.lang.NullPointerException
>  [sshexec] Mon Jan 23 13:44:14 GMT 2012, 
> org.apache.hadoop.hbase.client.ScannerCallable@219ba640,
> java.lang.NullPointerException
>  [sshexec] Mon Jan 23 13:44:15 GMT 2012, 
> org.apache.hadoop.hbase.client.ScannerCallable@219ba640,
> java.lang.NullPointerException
>  [sshexec] Mon Jan 23 13:44:17 GMT 2012, 
> org.apache.hadoop.hbase.client.ScannerCallable@219ba640,
> java.lang.NullPointerException
>  [sshexec] Mon Jan 23 13:44:19 GMT 2012, 
> org.apache.hadoop.hbase.client.ScannerCallable@219ba640,
> java.lang.NullPointerException
>  [sshexec] Mon Jan 23 13:44:23 GMT 2012, 
> org.apache.hadoop.hbase.client.ScannerCallable@219ba640,
> java.lang.NullPointerException
>  [sshexec] Mon Jan 23 13:44:27 GMT 2012, 
> org.apache.hadoop.hbase.client.ScannerCallable@219ba640,
> java.lang.NullPointerException
>  [sshexec] Mon Jan 23 13:44:35 GMT 2012, 
> org.apache.hadoop.hbase.client.ScannerCallable@219ba640,
> java.lang.NullPointerException
>  [sshexec] Mon Jan 23 13:44:51 GMT 2012, 
> org.apache.hadoop.hbase.client.ScannerCallable@219ba640,
> java.lang.NullPointerException
>  [sshexec]
>  [sshexec] Result = -1
>  [sshexec]     at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.getRegionServerWithRetries(HConnectionManager.java:1345)
>  [sshexec]     at org.apache.hadoop.hbase.client.HTable$ClientScanner.nextScanner(HTable.java:1203)
>  [sshexec]     at org.apache.hadoop.hbase.client.HTable$ClientScanner.initialize(HTable.java:1126)
>  [sshexec]     at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:627)
>  [sshexec]     at org.apache.hadoop.hbase.client.coprocessor.AggregationClient.median(AggregationClient.java:469)
>  [sshexec]     at uk.org.cse.aggregation.EDRPAggregator.testSumWithValidRange(EDRPAggregator.java:55)
>  [sshexec]     at uk.org.cse.aggregation.EDRPAggregator.main(EDRPAggregator.java:85)
>  [sshexec]     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  [sshexec]     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>  [sshexec]     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>  [sshexec]     at java.lang.reflect.Method.invoke(Method.java:597)
>  [sshexec]     at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
>
> Something wrong with Scan setup?
>
> Cheers,
> Royston
>
>
>
> -----Original Message-----
> From: [email protected] [mailto:[email protected]]
> Sent: 21 January 2012 17:14
> To: [email protected]
> Subject: Re: Hbase out of memory error
>
> Benoit's patches are already in 0.92
>
> Thanks
>
>
> On Jan 21, 2012, at 9:11 AM, Royston Sellman 
> <[email protected]> wrote:
>
> > So should I try applying Benoit Sigoure's patch for HBASE-5204? Will 
> > this patch be in the 0.92 branch soon?
> >
> > Cheers,
> > Royston
> >
> >
> >
> > On 21 Jan 2012, at 16:58, [email protected] wrote:
> >
> >> That is the correct branch.
> >>
> >> Thanks
> >>
> >>
> >>
> >> On Jan 21, 2012, at 8:50 AM, Royston Sellman
> >> <[email protected]> wrote:
> >>
> >>> Hi Ted,
> >>>
> >>> Yes, I am compiling with the same HBase jars. I wasn't aware of
> >>> HBASE-5204, thanks, it sounds possible this is my problem. Can you 
> >>> think of anything else I should check?
> >>>
> >>> Just to make sure: I am checking out the code from
> >>> svn.apache.org/repos/asf/hbase/branches/0.92  Is this the correct branch?
> >>>
> >>> Thanks,
> >>> Royston
> >>>
> >>>
> >>> On 20 Jan 2012, at 18:45, Ted Yu wrote:
> >>>
> >>>> Royston:
> >>>> I guess you have seen HBASE-5204. In particular:
> >>>>>> when a 0.92 server fails to deserialize a 0.90-style RPC, it 
> >>>>>> attempts to
> >>>> allocate a large buffer because it doesn't read fields of 
> >>>> 0.90-style RPCs properly.
> >>>>
> >>>> Was your client code compiled with the same version of HBase as 
> >>>> what was running on your cluster?
> >>>>
> >>>> Thanks
> >>>>
> >>>> On Fri, Jan 20, 2012 at 9:20 AM, Royston Sellman < 
> >>>> [email protected]> wrote:
> >>>>
> >>>>> Trying to run my code (a test of Aggregation Protocol and an MR 
> >>>>> HBase table
> >>>>> loader) on latest build of 0.92.0 (r1232715) I get an 'old server'
> >>>>> warning (I've seen this before and it's always been non-fatal) 
> >>>>> then an out of memory exception then job hangs:
> >>>>>
> >>>>>
> >>>>>
> >>>>> [sshexec] 12/01/20 16:56:48 WARN zookeeper.ClientCnxnSocket:
> >>>>> Connected to an old server; r-o mode will be unavailable
> >>>>>
> >>>>> [sshexec] 12/01/20 16:56:48 INFO zookeeper.ClientCnxn: Session 
> >>>>> establishment complete on server namenode/10.0.0.235:2181, 
> >>>>> sessionid = 0x34cda4e5d000e5, negotiated timeout = 40000
> >>>>>
> >>>>> [sshexec] 12/01/20 16:56:49 WARN ipc.HBaseClient: Unexpected 
> >>>>> exception receiving call responses
> >>>>>
> >>>>> [sshexec] java.lang.OutOfMemoryError: Java heap space
> >>>>>
> >>>>> [sshexec]       at java.lang.reflect.Array.newArray(Native Method)
> >>>>>
> >>>>> [sshexec]       at java.lang.reflect.Array.newInstance(Array.java:52)
> >>>>>
> >>>>> [sshexec]       at org.apache.hadoop.hbase.io.HbaseObjectWritable.readObject(HbaseObjectWritable.java:542)
> >>>>>
> >>>>> [sshexec]       at org.apache.hadoop.hbase.io.HbaseObjectWritable.readFields(HbaseObjectWritable.java:289)
> >>>>>
> >>>>> [sshexec]       at org.apache.hadoop.hbase.ipc.HBaseClient$Connection.receiveResponse(HBaseClient.java:593)
> >>>>>
> >>>>> [sshexec]       at org.apache.hadoop.hbase.ipc.HBaseClient$Connection.run(HBaseClient.java:505)
> >>>>>
> >>>>>
> >>>>>
> >>>>> Hbase shell seems to work (I can list and scan my tables).
> >>>>>
> >>>>>
> >>>>>
> >>>>> If I svn roll back to 12 Jan 0.92 and rebuild, my code works.
> >>>>>
> >>>>>
> >>>>>
> >>>>> Tried setting export HBASE_HEAPSIZE=1500 but got same error.
> >>>>>
> >>>>>
> >>>>>
> >>>>> Nothing significant in logs.
> >>>>>
> >>>>>
> >>>>>
> >>>>> [Note to Ted Yu: I need to fix this so I can carry on testing on 
> >>>>> Aggregation Protocol]
> >>>>>
> >>>>>
> >>>>>
> >>>>> Best Regards,
> >>>>>
> >>>>> Royston
> >>>>>
> >>>>>
> >>>
> >
>
>
Index: org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
===================================================================
--- org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java   (revision 1234879)
+++ org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java   (working copy)
@@ -23,13 +23,20 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.AggregateProtocol;
 import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
@@ -362,4 +369,133 @@
     return res;
   }
 
+  /**
+   * It helps locate the region with median for a given column whose weight 
+   * is specified in an optional column.
+   * From individual regions, it obtains sum of values and sum of weights.
+   * @param tableName
+   * @param ci
+   * @param scan
+   * @return pair whose first element is a map between start row of the region
+   *  and (sum of values, sum of weights) for the region, the second element is
+   *  (sum of values, sum of weights) for all the regions chosen
+   * @throws Throwable
+   */
+  private <R, S> Pair<NavigableMap<byte[], List<S>>, List<S>>
+  getMedianArgs(final byte[] tableName,
+      final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
+    validateParameters(scan);
+    final NavigableMap<byte[], List<S>> map =
+      new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
+    class StdCallback implements Batch.Callback<List<S>> {
+      S sumVal = null, sumWeights = null;
+
+      public Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
+        List<S> l = new ArrayList<S>();
+        l.add(sumVal);
+        l.add(sumWeights);
+        Pair<NavigableMap<byte[], List<S>>, List<S>> p =
+          new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
+        return p;
+      }
+
+      @Override
+      public synchronized void update(byte[] region, byte[] row, List<S> result) {
+        map.put(row, result);
+        sumVal = ci.add(sumVal, result.get(0));
+        sumWeights = ci.add(sumWeights, result.get(1));
+      }
+    }
+    StdCallback stdCallback = new StdCallback();
+    HTable table = new HTable(conf, tableName);
+    table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
+        .getStopRow(),
+        new Batch.Call<AggregateProtocol, List<S>>() {
+          @Override
+          public List<S> call(AggregateProtocol instance)
+              throws IOException {
+            return instance.getMedian(ci, scan);
+          }
+
+        }, stdCallback);
+    return stdCallback.getMedianParams();
+  }
+
+  /**
+   * This is the client side interface/handler for calling the median method for a
+   * given cf-cq combination. This method collects the necessary parameters
+   * to compute the median and returns the median.
+   * @param tableName
+   * @param ci
+   * @param scan
+   * @return R the median
+   * @throws Throwable
+   */
+  public <R, S> R median(final byte[] tableName, ColumnInterpreter<R, S> ci,
+      Scan scan) throws Throwable {
+    Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(tableName, ci, scan);
+    byte[] startRow = null;
+    byte[] colFamily = scan.getFamilies()[0];
+    NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
+    NavigableMap<byte[], List<S>> map = p.getFirst();
+    S sumVal = p.getSecond().get(0);
+    S sumWeights = p.getSecond().get(1);
+    double halfSumVal = ci.divideForAvg(sumVal, 2L);
+    double movingSumVal = 0;
+    boolean weighted = false;
+    if (quals.size() > 1) {
+      weighted = true;
+      halfSumVal = ci.divideForAvg(sumWeights, 2L);
+    }
+    
+    for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
+      S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
+      double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
+      if (newSumVal > halfSumVal) break;  // we found the region with the median
+      movingSumVal = newSumVal;
+      startRow = entry.getKey();
+    }
+    // scan the region with median and find it
+    Scan scan2 = new Scan(scan);
+    // inherit stop row from method parameter
+    if (startRow != null) scan2.setStartRow(startRow);
+    HTable table = new HTable(conf, tableName);
+    int cacheSize = scan2.getCaching();
+    if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
+      scan2.setCacheBlocks(true);
+      cacheSize = 5;
+      scan2.setCaching(cacheSize);
+    }
+    ResultScanner scanner = table.getScanner(scan2);
+    Result[] results = null;
+    byte[] qualifier = quals.pollFirst();
+    // qualifier for the weight column
+    byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
+    R value = null;
+    try {
+      do {
+        results = scanner.next(cacheSize);
+        if (results != null && results.length > 0) {
+          for (int i = 0; i < results.length; i++) {
+            Result r = results[i];
+            // retrieve weight
+            KeyValue kv = r.getColumnLatest(colFamily, weightQualifier);
+            R newValue = ci.getValue(colFamily, weightQualifier, kv);
+            S s = ci.castToReturnType(newValue);
+            double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
+            // see if we have moved past the median
+            if (newSumVal > halfSumVal) {
+              return value;
+            }
+            movingSumVal = newSumVal;
+            kv = r.getColumnLatest(colFamily, qualifier);
+            value = ci.getValue(colFamily, qualifier, kv);
+          }
+        }
+      } while (results != null && results.length > 0);
+    } finally {
+      scanner.close();
+    }
+    return null;
+  }
 }
Index: org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java
===================================================================
--- org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java  (revision 1234879)
+++ org/apache/hadoop/hbase/coprocessor/AggregateProtocol.java  (working copy)
@@ -126,4 +126,19 @@
   <T, S> Pair<List<S>, Long> getStd(ColumnInterpreter<T, S> ci, Scan scan)
       throws IOException;
 
+  /**
+   * Gives a List containing sum of values and sum of weights.
+   * It is computed for the combination of column
+   * family and column qualifier(s) in the given row range as defined in the
+   * Scan object. In its current implementation, it takes one column family and
+   * two column qualifiers. The first qualifier is for values column and 
+   * the second qualifier (optional) is for weight column.
+   * @param ci
+   * @param scan
+   * @return List containing sum of values and sum of weights
+   * @throws IOException
+   */
+  <T, S> List<S> getMedian(ColumnInterpreter<T, S> ci, Scan scan)
+      throws IOException;
+
 }
\ No newline at end of file
Index: org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
===================================================================
--- org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java    (revision 1234879)
+++ org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java    (working copy)
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.NavigableSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -232,4 +233,45 @@
     return p;
   }
 
+  @Override
+  public <T, S> List<S> getMedian(ColumnInterpreter<T, S> ci, Scan scan)
+  throws IOException {
+    S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
+
+    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
+    .getRegion().getScanner(scan);
+    byte[] colFamily = scan.getFamilies()[0];
+    NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
+    byte[] valQualifier = quals.pollFirst();
+    // if weighted median is requested, get qualifier for the weight column
+    byte[] weightQualifier = quals.size() > 1 ? quals.pollLast() : null;
+    List<KeyValue> results = new ArrayList<KeyValue>();
+
+    boolean hasMoreRows = false;
+    try {
+      do {
+        tempVal = null;
+        tempWeight = null;
+        hasMoreRows = scanner.next(results);
+        for (KeyValue kv : results) {
+          tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
+              valQualifier, kv)));
+          if (weightQualifier != null) {
+            tempWeight = ci.add(tempWeight,
+                ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
+          }
+        }
+        results.clear();
+        sumVal = ci.add(sumVal, tempVal);
+        sumWeights = ci.add(sumWeights, tempWeight);
+      } while (hasMoreRows);
+    } finally {
+      scanner.close();
+    }
+    List<S> l = new ArrayList<S>();
+    l.add(sumVal);
+    l.add(sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights);
+    return l;
+  }
+  
 }
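
For readers following the second phase of median() in the patch above, the 
row-by-row scan can be modelled without HBase. This is only a sketch under 
stated assumptions: each row is a {value, weight} pair of doubles (for the 
unweighted case the weight equals the value, since the patch reuses the 
value qualifier), and the names MedianScanSketch and scanForMedian are 
hypothetical.

```java
// Simplified model of the scan loop in median(): accumulate weights row by
// row and return the value of the last row seen before the running sum
// passes the half-way mark.
class MedianScanSketch {

    // rows[i][0] is the value, rows[i][1] is the weight.
    // movingSum is the sum carried over from the regions already skipped.
    // Returns null if the first row already passes halfSum or if the rows
    // run out first, mirroring the patch's behaviour.
    static Double scanForMedian(double[][] rows, double movingSum, double halfSum) {
        Double value = null;
        for (double[] row : rows) {
            double newSum = movingSum + row[1];
            if (newSum > halfSum) {
                return value; // moved past the median
            }
            movingSum = newSum;
            value = row[0];
        }
        return null;
    }

    public static void main(String[] args) {
        double[][] rows = { {5, 1}, {7, 1}, {9, 1}, {11, 1} };
        // Total weight is 4, so the half-way mark is 2.
        System.out.println("median candidate = " + scanForMedian(rows, 0, 2));
    }
}
```

Note that the returned value always comes from the row before the one that 
crosses the threshold, which is why the result can be null when the very 
first scanned row crosses it.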
