HBASE-15177 Reduce garbage created under high load
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a69272ef Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a69272ef Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a69272ef Branch: refs/heads/hbase-12439 Commit: a69272efe12f7b780fbf2fa14c42d0c0b155205f Parents: d5d26f0 Author: Enis Soztutar <e...@apache.org> Authored: Thu Feb 4 11:07:36 2016 -0800 Committer: Enis Soztutar <e...@apache.org> Committed: Thu Feb 4 13:27:00 2016 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/ScannerCallable.java | 14 ++- .../hadoop/hbase/ipc/AsyncRpcChannel.java | 7 +- .../org/apache/hadoop/hbase/ipc/IPCUtil.java | 20 ++-- .../hbase/ipc/PayloadCarryingRpcController.java | 7 +- .../apache/hadoop/hbase/ipc/RpcClientImpl.java | 6 +- .../hadoop/hbase/protobuf/ProtobufUtil.java | 19 +++- .../hadoop/hbase/client/TestClientScanner.java | 2 +- .../apache/hadoop/hbase/ipc/TestIPCUtil.java | 4 +- .../hadoop/hbase/io/ByteBufferInputStream.java | 107 +++++++++++++++++++ .../org/apache/hadoop/hbase/util/Threads.java | 2 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 54 ++++++---- .../AnnotationReadingPriorityFunction.java | 9 +- .../hadoop/hbase/regionserver/HRegion.java | 6 +- .../hbase/regionserver/RSRpcServices.java | 15 ++- 14 files changed, 210 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/a69272ef/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index f6445a6..72d69ec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -191,6 +191,13 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { if (Thread.interrupted()) { throw new InterruptedIOException(); } + + if (controller == null) { + controller = controllerFactory.newController(); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); + } + if (closed) { if (scannerId != -1) { close(); @@ -209,9 +216,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, this.scanMetrics != null, renew); ScanResponse response = null; - controller = controllerFactory.newController(); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); try { response = getStub().scan(controller, request); // Client and RS maintain a nextCallSeq number during the scan. Every next() call @@ -371,7 +375,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { ScanRequest request = RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null); try { - getStub().scan(null, request); + getStub().scan(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -388,7 +392,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { getLocation().getRegionInfo().getRegionName(), this.scan, 0, false); try { - ScanResponse response = getStub().scan(null, request); + ScanResponse response = getStub().scan(controller, request); long id = response.getScannerId(); if (logScannerActivity) { LOG.info("Open scanner=" + id + " for scan=" + scan.toString() http://git-wip-us.apache.org/repos/asf/hbase/blob/a69272ef/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java index 69978fc..787aa47 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java @@ -412,7 +412,7 @@ public class AsyncRpcChannel { requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build()); } // Only pass priority if there one. Let zero be same as no priority. - if (call.controller.getPriority() != 0) { + if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) { requestHeaderBuilder.setPriority(call.controller.getPriority()); } @@ -660,6 +660,7 @@ public class AsyncRpcChannel { private void handleSaslConnectionFailure(final int currRetries, final Throwable ex, final UserGroupInformation user) throws IOException, InterruptedException { user.doAs(new PrivilegedExceptionAction<Void>() { + @Override public Void run() throws IOException, InterruptedException { if (shouldAuthenticateOverKrb()) { if (currRetries < MAX_SASL_RETRIES) { @@ -702,12 +703,12 @@ public class AsyncRpcChannel { public int getConnectionHashCode() { return ConnectionId.hashCode(ticket, serviceName, address); } - + @Override public int hashCode() { return getConnectionHashCode(); } - + @Override public boolean equals(Object obj) { if (obj instanceof AsyncRpcChannel) { http://git-wip-us.apache.org/repos/asf/hbase/blob/a69272ef/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index 734227c..22c5cc1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.ipc; -import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.IOException; import java.io.InputStream; @@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.BoundedByteBufferPool; +import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.Bytes; @@ -180,19 +180,18 @@ public class IPCUtil { public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, final byte [] cellBlock) throws IOException { - return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length); + return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock)); } /** * @param codec - * @param cellBlock - * @param offset - * @param length + * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be + * position()'ed at the start of the cell block and limit()'ed at the end. * @return CellScanner to work against the content of <code>cellBlock</code> * @throws IOException */ public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, - final byte [] cellBlock, final int offset, final int length) + final ByteBuffer cellBlock) throws IOException { // If compressed, decompress it first before passing it on else we will leak compression // resources if the stream is not closed properly after we let it out. @@ -202,18 +201,17 @@ public class IPCUtil { if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf); Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); CompressionInputStream cis = - compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length), - poolDecompressor); + compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor); ByteBufferOutputStream bbos = null; try { // TODO: This is ugly. The buffer will be resized on us if we guess wrong. // TODO: Reuse buffers. - bbos = new ByteBufferOutputStream((length - offset) * + bbos = new ByteBufferOutputStream(cellBlock.remaining() * this.cellBlockDecompressionMultiplier); IOUtils.copy(cis, bbos); bbos.close(); ByteBuffer bb = bbos.getByteBuffer(); - is = new ByteArrayInputStream(bb.array(), 0, bb.limit()); + is = new ByteBufferInputStream(bb); } finally { if (is != null) is.close(); if (bbos != null) bbos.close(); @@ -221,7 +219,7 @@ public class IPCUtil { CodecPool.returnDecompressor(poolDecompressor); } } else { - is = new ByteArrayInputStream(cellBlock, offset, length); + is = new ByteBufferInputStream(cellBlock); } return codec.getDecoder(is); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a69272ef/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java index 09f4323..f4f18b3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java @@ -35,14 +35,14 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.Private public class PayloadCarryingRpcController extends TimeLimitedRpcController implements CellScannable { + + public static final int PRIORITY_UNSET = -1; /** * Priority to set on this request. Set it here in controller so available composing the * request. This is the ordained way of setting priorities going forward. We will be * undoing the old annotation-based mechanism. */ - // Currently only multi call makes use of this. Eventually this should be only way to set - // priority. - private int priority = HConstants.NORMAL_QOS; + private int priority = PRIORITY_UNSET; /** * They are optionally set on construction, cleared after we make the call, and then optionally @@ -67,6 +67,7 @@ public class PayloadCarryingRpcController /** * @return One-shot cell scanner (you cannot back it up and restart) */ + @Override public CellScanner cellScanner() { return cellScanner; } http://git-wip-us.apache.org/repos/asf/hbase/blob/a69272ef/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java index 940fcd1..83d4adf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java @@ -899,8 +899,10 @@ public class RpcClientImpl extends AbstractRpcClient { cellBlockBuilder.setLength(cellBlock.limit()); builder.setCellBlockMeta(cellBlockBuilder.build()); } - // Only pass priority if there one. Let zero be same as no priority. - if (priority != 0) builder.setPriority(priority); + // Only pass priority if there is one set. + if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) { + builder.setPriority(priority); + } RequestHeader header = builder.build(); setupIOstreams(); http://git-wip-us.apache.org/repos/asf/hbase/blob/a69272ef/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 7cd0d91..fe76780 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -2430,13 +2430,13 @@ public final class ProtobufUtil { */ public static String getRegionEncodedName( final RegionSpecifier regionSpecifier) throws DoNotRetryIOException { - byte[] value = regionSpecifier.getValue().toByteArray(); + ByteString value = regionSpecifier.getValue(); RegionSpecifierType type = regionSpecifier.getType(); switch (type) { case REGION_NAME: - return HRegionInfo.encodeRegionName(value); + return HRegionInfo.encodeRegionName(value.toByteArray()); case ENCODED_REGION_NAME: - return Bytes.toString(value); + return value.toStringUtf8(); default: throw new DoNotRetryIOException( "Unsupported region specifier type: " + type); @@ -3135,6 +3135,19 @@ public final class ProtobufUtil { codedInput.checkLastTagWas(0); } + public static void mergeFrom(Message.Builder builder, CodedInputStream codedInput, int length) + throws IOException { + codedInput.resetSizeCounter(); + int prevLimit = codedInput.setSizeLimit(length); + + int limit = codedInput.pushLimit(length); + builder.mergeFrom(codedInput); + codedInput.popLimit(limit); + + codedInput.checkLastTagWas(0); + codedInput.setSizeLimit(prevLimit); + } + public static ReplicationLoadSink toReplicationLoadSink( ClusterStatusProtos.ReplicationLoadSink cls) { return new ReplicationLoadSink(cls.getAgeOfLastAppliedOp(), cls.getTimeStampsOfLastAppliedOp()); http://git-wip-us.apache.org/repos/asf/hbase/blob/a69272ef/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index 6d7cc7f..f083001 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -522,7 +522,7 @@ public class TestClientScanner { anyBoolean(), anyInt())).thenReturn(new RegionLocations(null, null, null)); try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"), - clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) { + clusterConn, rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) { Iterator<Result> iter = scanner.iterator(); while (iter.hasNext()) { iter.next(); http://git-wip-us.apache.org/repos/asf/hbase/blob/a69272ef/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java index 163be70..0038aec 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java @@ -58,7 +58,7 @@ public class TestIPCUtil { public void before() { this.util = new IPCUtil(new Configuration()); } - + @Test public void testBuildCellBlock() throws IOException { doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), null); @@ -79,7 +79,7 @@ public class TestIPCUtil { CellScanner cellScanner = sized? getSizedCellScanner(cells): CellUtil.createCellScanner(Arrays.asList(cells).iterator()); ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner); - cellScanner = util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit()); + cellScanner = util.createCellScanner(codec, compressor, bb); int i = 0; while (cellScanner.advance()) { i++; http://git-wip-us.apache.org/repos/asf/hbase/blob/a69272ef/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java new file mode 100644 index 0000000..8aee07b --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.InputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; + +/** + * Not thread safe! + * <p> + * Please note that the reads will cause position movement on wrapped ByteBuffer. + */ +@InterfaceAudience.Private +public class ByteBufferInputStream extends InputStream { + + private ByteBuffer buf; + + public ByteBufferInputStream(ByteBuffer buf) { + this.buf = buf; + } + + /** + * Reads the next byte of data from this input stream. The value byte is returned as an + * <code>int</code> in the range <code>0</code> to <code>255</code>. If no byte is available + * because the end of the stream has been reached, the value <code>-1</code> is returned. + * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached. + */ + @Override + public int read() { + if (this.buf.hasRemaining()) { + return (this.buf.get() & 0xff); + } + return -1; + } + + /** + * Reads up to next <code>len</code> bytes of data from buffer into passed array(starting from + * given offset). + * @param b the array into which the data is read. + * @param off the start offset in the destination array <code>b</code> + * @param len the maximum number of bytes to read. + * @return the total number of bytes actually read into the buffer, or <code>-1</code> if not even + * 1 byte can be read because the end of the stream has been reached. + */ + @Override + public int read(byte[] b, int off, int len) { + int avail = available(); + if (avail <= 0) { + return -1; + } + + if (len > avail) { + len = avail; + } + if (len <= 0) { + return 0; + } + + ByteBufferUtils.copyFromBufferToArray(b, this.buf, this.buf.position(), off, len); + this.buf.position(this.buf.position() + len); // we should advance the buffer position + return len; + } + + /** + * Skips <code>n</code> bytes of input from this input stream. Fewer bytes might be skipped if the + * end of the input stream is reached. The actual number <code>k</code> of bytes to be skipped is + * equal to the smaller of <code>n</code> and remaining bytes in the stream. + * @param n the number of bytes to be skipped. + * @return the actual number of bytes skipped. + */ + @Override + public long skip(long n) { + long k = Math.min(n, available()); + if (k < 0) { + k = 0; + } + this.buf.position((int) (this.buf.position() + k)); + return k; + } + + /** + * @return the number of remaining bytes that can be read (or skipped + * over) from this input stream. + */ + @Override + public int available() { + return this.buf.remaining(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a69272ef/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java index c366762..d10e0f2 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -45,7 +45,7 @@ public class Threads { private static final Log LOG = LogFactory.getLog(Threads.class); private static final AtomicInteger poolNumber = new AtomicInteger(1); - private static UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER = + public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER = new UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { http://git-wip-us.apache.org/repos/asf/hbase/blob/a69272ef/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 98669e9..58fc598 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.io.BoundedByteBufferPool; +import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; @@ -110,6 +111,7 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; @@ -529,10 +531,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return this.size; } + @Override public long getResponseCellSize() { return responseCellSize; } + @Override public void incrementResponseCellSize(long cellSize) { responseCellSize += cellSize; } @@ -621,7 +625,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { readPool = Executors.newFixedThreadPool(readThreads, new ThreadFactoryBuilder().setNameFormat( "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() + - ",port=" + port).setDaemon(true).build()); + ",port=" + port).setDaemon(true) + .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); for (int i = 0; i < readThreads; ++i) { Reader reader = new Reader(); readers[i] = reader; @@ -898,7 +903,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { throw ieo; } catch (Exception e) { if (LOG.isDebugEnabled()) { - LOG.debug(getName() + ": Caught exception while reading:" + e.getMessage()); + LOG.debug(getName() + ": Caught exception while reading:", e); } count = -1; //so that the (count < 0) block is executed } @@ -944,6 +949,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { Responder() throws IOException { this.setName("RpcServer.responder"); this.setDaemon(true); + this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER); writeSelector = Selector.open(); // create a selector } @@ -1361,17 +1367,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { return authorizedUgi; } - private void saslReadAndProcess(byte[] saslToken) throws IOException, + private void saslReadAndProcess(ByteBuffer saslToken) throws IOException, InterruptedException { if (saslContextEstablished) { if (LOG.isTraceEnabled()) - LOG.trace("Have read input token of size " + saslToken.length + LOG.trace("Have read input token of size " + saslToken.limit() + " for processing by saslServer.unwrap()"); if (!useWrap) { processOneRpc(saslToken); } else { - byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length); + byte[] b = saslToken.array(); + byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit()); processUnwrappedData(plaintextData); } } else { @@ -1420,10 +1427,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } } if (LOG.isDebugEnabled()) { - LOG.debug("Have read input token of size " + saslToken.length + LOG.debug("Have read input token of size " + saslToken.limit() + " for processing by saslServer.evaluateResponse()"); } - replyToken = saslServer.evaluateResponse(saslToken); + replyToken = saslServer.evaluateResponse(saslToken.array()); } catch (IOException e) { IOException sendToClient = e; Throwable cause = e; @@ -1619,6 +1626,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { throw new IllegalArgumentException("Unexpected data length " + dataLength + "!! from " + getHostAddress()); } + + // TODO: check dataLength against some limit so that the client cannot OOM the server data = ByteBuffer.allocate(dataLength); // Increment the rpc count. This counter will be decreased when we write @@ -1648,9 +1657,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } if (useSasl) { - saslReadAndProcess(data.array()); + saslReadAndProcess(data); } else { - processOneRpc(data.array()); + processOneRpc(data); } } finally { @@ -1679,8 +1688,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { } // Reads the connection header following version - private void processConnectionHeader(byte[] buf) throws IOException { - this.connectionHeader = ConnectionHeader.parseFrom(buf); + private void processConnectionHeader(ByteBuffer buf) throws IOException { + this.connectionHeader = ConnectionHeader.parseFrom( + new ByteBufferInputStream(buf)); String serviceName = connectionHeader.getServiceName(); if (serviceName == null) throw new EmptyServiceNameException(); this.service = getService(services, serviceName); @@ -1794,13 +1804,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { if (unwrappedData.remaining() == 0) { unwrappedDataLengthBuffer.clear(); unwrappedData.flip(); - processOneRpc(unwrappedData.array()); + processOneRpc(unwrappedData); unwrappedData = null; } } } - private void processOneRpc(byte[] buf) throws IOException, InterruptedException { + + private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException { if (connectionHeaderRead) { processRequest(buf); } else { @@ -1822,16 +1833,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { * @throws IOException * @throws InterruptedException */ - protected void processRequest(byte[] buf) throws IOException, InterruptedException { - long totalRequestSize = buf.length; + protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException { + long totalRequestSize = buf.limit(); int offset = 0; // Here we read in the header. We avoid having pb // do its default 4k allocation for CodedInputStream. We force it to use backing array. - CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length); + CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit()); int headerSize = cis.readRawVarint32(); offset = cis.getTotalBytesRead(); Message.Builder builder = RequestHeader.newBuilder(); - ProtobufUtil.mergeFrom(builder, buf, offset, headerSize); + ProtobufUtil.mergeFrom(builder, cis, headerSize); RequestHeader header = (RequestHeader) builder.build(); offset += headerSize; int id = header.getCallId(); @@ -1862,19 +1873,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver { md = this.service.getDescriptorForType().findMethodByName(header.getMethodName()); if (md == null) throw new UnsupportedOperationException(header.getMethodName()); builder = this.service.getRequestPrototype(md).newBuilderForType(); - // To read the varint, I need an inputstream; might as well be a CIS. - cis = CodedInputStream.newInstance(buf, offset, buf.length); + cis.resetSizeCounter(); int paramSize = cis.readRawVarint32(); offset += cis.getTotalBytesRead(); if (builder != null) { - ProtobufUtil.mergeFrom(builder, buf, offset, paramSize); + ProtobufUtil.mergeFrom(builder, cis, paramSize); param = builder.build(); } offset += paramSize; } if (header.hasCellBlockMeta()) { - cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, - buf, offset, buf.length); + buf.position(offset); + cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, buf); } } catch (Throwable t) { InetSocketAddress address = getListenerAddress(); http://git-wip-us.apache.org/repos/asf/hbase/blob/a69272ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java index cfdbce0..8438378 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoReque import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; @@ -217,10 +216,10 @@ public class AnnotationReadingPriorityFunction implements PriorityFunction { if (param == null) { return HConstants.NORMAL_QOS; } - if (param instanceof MultiRequest) { - // The multi call has its priority set in the header. All calls should work this way but - // only this one has been converted so far. No priority == NORMAL_QOS. - return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS; + + // Trust the client-set priorities if set + if (header.hasPriority()) { + return header.getPriority(); } String cls = param.getClass().getName(); http://git-wip-us.apache.org/repos/asf/hbase/blob/a69272ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 3cf4122..99b571f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5177,6 +5177,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @param readLock is the lock reader or writer. True indicates that a non-exlcusive * lock is requested */ + @Override public RowLock getRowLock(byte[] row, boolean readLock) throws IOException { // Make sure the row is inside of this region before getting the lock for it. checkRow(row, "row lock"); @@ -5592,8 +5593,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Here we separate all scanners into two lists - scanner that provide data required // by the filter to operate (scanners list) and all others (joinedScanners list). - List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); - List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>(); + List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size()); + List<KeyValueScanner> joinedScanners + = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size()); if (additionalScanners != null) { scanners.addAll(additionalScanners); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a69272ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 7eaadc2..3e133c4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1147,8 +1147,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ Region getRegion( final RegionSpecifier regionSpecifier) throws IOException { - return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(), - ProtobufUtil.getRegionEncodedName(regionSpecifier)); + ByteString value = regionSpecifier.getValue(); + RegionSpecifierType type = regionSpecifier.getType(); + switch (type) { + case REGION_NAME: + byte[] regionName = value.toByteArray(); + String encodedRegionName = HRegionInfo.encodeRegionName(regionName); + return regionServer.getRegionByEncodedName(regionName, encodedRegionName); + case ENCODED_REGION_NAME: + return regionServer.getRegionByEncodedName(value.toStringUtf8()); + default: + throw new DoNotRetryIOException( + "Unsupported region specifier type: " + type); + } } @VisibleForTesting