HBASE-18553 Expose scan cursor for asynchronous scanner
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4c74a73d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4c74a73d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4c74a73d Branch: refs/heads/HBASE-14070.HLC Commit: 4c74a73d57e09fd2c0ecde862a196c28dc6cd219 Parents: 2a9cdd5 Author: zhangduo <zhang...@apache.org> Authored: Tue Aug 15 17:15:06 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Wed Aug 16 21:04:57 2017 +0800 ---------------------------------------------------------------------- .../AsyncScanSingleRegionRpcRetryingCaller.java | 35 ++++- .../hbase/client/AsyncTableResultScanner.java | 20 ++- .../hbase/client/RawScanResultConsumer.java | 11 +- .../client/AbstractTestResultScannerCursor.java | 89 +++++++++++ .../client/TestAsyncResultScannerCursor.java | 49 ++++++ .../hbase/client/TestRawAsyncScanCursor.java | 157 +++++++++++++------ .../hbase/client/TestResultScannerCursor.java | 34 ++++ .../hadoop/hbase/client/TestScanCursor.java | 90 ----------- 8 files changed, 330 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/4c74a73d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 02a4357..d16cb8b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.shaded.io.netty.util.Timeout; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -157,10 +158,9 @@ class AsyncScanSingleRegionRpcRetryingCaller { private ScanResumerImpl resumer; - public ScanControllerImpl(ScanResponse resp) { - callerThread = Thread.currentThread(); - cursor = resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor())) - : Optional.empty(); + public ScanControllerImpl(Optional<Cursor> cursor) { + this.callerThread = Thread.currentThread(); + this.cursor = cursor; } private void preCheck() { @@ -476,10 +476,11 @@ class AsyncScanSingleRegionRpcRetryingCaller { } updateServerSideMetrics(scanMetrics, resp); boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage(); + Result[] rawResults; Result[] results; int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows(); try { - Result[] rawResults = ResponseConverter.getResults(controller.cellScanner(), resp); + rawResults = ResponseConverter.getResults(controller.cellScanner(), resp); updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage); results = resultCache.addAndGet( Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY), @@ -493,12 +494,30 @@ class AsyncScanSingleRegionRpcRetryingCaller { return; } - ScanControllerImpl scanController = new ScanControllerImpl(resp); + ScanControllerImpl scanController; if (results.length > 0) { + scanController = new ScanControllerImpl( + resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor())) + : Optional.empty()); updateNextStartRowWhenError(results[results.length - 1]); consumer.onNext(results, scanController); - } else if (resp.hasHeartbeatMessage() && resp.getHeartbeatMessage()) { - consumer.onHeartbeat(scanController); + } else { + Optional<Cursor> cursor = Optional.empty(); + if (resp.hasCursor()) { + cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor())); + } else if (scan.isNeedCursorResult() && rawResults.length > 0) { + // It is size limit exceed and we need to return the last Result's row. + // When user setBatch and the scanner is reopened, the server may return Results that + // user has seen and the last Result can not be seen because the number is not enough. + // So the row keys of results may not be same, we must use the last one. + cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow())); + } + scanController = new ScanControllerImpl(cursor); + if (isHeartbeatMessage || cursor.isPresent()) { + // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we + // want to pass a cursor to upper layer. + consumer.onHeartbeat(scanController); + } } ScanControllerState state = scanController.destroy(); if (state == ScanControllerState.TERMINATED) { http://git-wip-us.apache.org/repos/asf/hbase/blob/4c74a73d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java index 28a5568..3050084 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java @@ -19,9 +19,6 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; - import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayDeque; @@ -31,6 +28,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; /** * The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically @@ -46,6 +45,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { private final long maxCacheSize; + private final Scan scan; + private final Queue<Result> queue = new ArrayDeque<>(); private ScanMetrics scanMetrics; @@ -61,6 +62,7 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) { this.rawTable = table; this.maxCacheSize = maxCacheSize; + this.scan = scan; table.scan(scan, this); } @@ -98,6 +100,10 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { public synchronized void onHeartbeat(ScanController controller) { if (closed) { controller.terminate(); + return; + } + if (scan.isNeedCursorResult()) { + controller.cursor().ifPresent(c -> queue.add(Result.createCursorResult(c))); } } @@ -143,9 +149,11 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { } } Result result = queue.poll(); - cacheSize -= calcEstimatedSize(result); - if (resumer != null && cacheSize <= maxCacheSize / 2) { - resumePrefetch(); + if (!result.isCursor()) { + cacheSize -= calcEstimatedSize(result); + if (resumer != null && cacheSize <= maxCacheSize / 2) { + resumePrefetch(); + } } return result; } http://git-wip-us.apache.org/repos/asf/hbase/blob/4c74a73d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java index 54d4887..4fedb0f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawScanResultConsumer.java @@ -95,8 +95,15 @@ public interface RawScanResultConsumer { void onNext(Result[] results, ScanController controller); /** - * Indicate that there is an heartbeat message but we have not cumulated enough cells to call - * onNext. + * Indicate that there is a heartbeat message but we have not cumulated enough cells to call + * {@link #onNext(Result[], ScanController)}. + * <p> + * Note that this method will always be called when RS returns something to us but we do not have + * enough cells to call {@link #onNext(Result[], ScanController)}. Sometimes it may not be a + * 'heartbeat' message for RS, for example, we have a large row with many cells and size limit is + * exceeded before sending all the cells for this row. For RS it does send some data to us and the + * time limit has not been reached, but we can not return the data to client so here we call this + * method to tell client we have already received something. * <p> * This method give you a chance to terminate a slow scan operation. * @param controller used to suspend or terminate the scan. Notice that the {@code controller} http://git-wip-us.apache.org/repos/asf/hbase/blob/4c74a73d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestResultScannerCursor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestResultScannerCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestResultScannerCursor.java new file mode 100644 index 0000000..3df7a7b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestResultScannerCursor.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.junit.Test; + +public abstract class AbstractTestResultScannerCursor extends AbstractTestScanCursor { + + protected abstract ResultScanner getScanner(Scan scan) throws Exception; + + @Test + public void testHeartbeatWithSparseFilter() throws Exception { + try (ResultScanner scanner = getScanner(createScanWithSparseFilter())) { + int num = 0; + Result r; + while ((r = scanner.next()) != null) { + if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) { + assertTrue(r.isCursor()); + assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], + r.getCursor().getRow()); + } else { + assertFalse(r.isCursor()); + assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow()); + } + num++; + } + } + } + + @Test + public void testHeartbeatWithSparseFilterReversed() throws Exception { + try (ResultScanner scanner = getScanner(createReversedScanWithSparseFilter())) { + int num = 0; + Result r; + while ((r = scanner.next()) != null) { + if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) { + assertTrue(r.isCursor()); + assertArrayEquals(ROWS[NUM_ROWS - 1 - num / NUM_FAMILIES / NUM_QUALIFIERS], + r.getCursor().getRow()); + } else { + assertFalse(r.isCursor()); + assertArrayEquals(ROWS[0], r.getRow()); + } + num++; + } + } + } + + @Test + public void testSizeLimit() throws IOException { + try (ResultScanner scanner = + TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSizeLimit())) { + int num = 0; + Result r; + while ((r = scanner.next()) != null) { + if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS) - 1) { + assertTrue(r.isCursor()); + assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], + r.getCursor().getRow()); + } else { + assertFalse(r.isCursor()); + assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow()); + } + num++; + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/4c74a73d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncResultScannerCursor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncResultScannerCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncResultScannerCursor.java new file mode 100644 index 0000000..5aebb4a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncResultScannerCursor.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.ForkJoinPool; + +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncResultScannerCursor extends AbstractTestResultScannerCursor { + + private static AsyncConnection CONN; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + AbstractTestScanCursor.setUpBeforeClass(); + CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + } + + public static void tearDownAfterClass() throws Exception { + if (CONN != null) { + CONN.close(); + } + AbstractTestScanCursor.tearDownAfterClass(); + } + + @Override + protected ResultScanner getScanner(Scan scan) throws Exception { + return CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).getScanner(scan); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/4c74a73d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java index 9caf942..4bca451 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java @@ -27,70 +27,83 @@ import java.util.concurrent.ExecutionException; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @Category({ MediumTests.class, ClientTests.class }) public class TestRawAsyncScanCursor extends AbstractTestScanCursor { + private static AsyncConnection CONN; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + AbstractTestScanCursor.setUpBeforeClass(); + CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + } + + public static void tearDownAfterClass() throws Exception { + if (CONN != null) { + CONN.close(); + } + AbstractTestScanCursor.tearDownAfterClass(); + } + private void doTest(boolean reversed) throws InterruptedException, ExecutionException, IOException { CompletableFuture<Void> future = new CompletableFuture<>(); - try (AsyncConnection conn = - ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) { - RawAsyncTable table = conn.getRawTable(TABLE_NAME); - table.scan(reversed ? createReversedScanWithSparseFilter() : createScanWithSparseFilter(), - new RawScanResultConsumer() { - - private int count; - - @Override - public void onHeartbeat(ScanController controller) { - int row = count / NUM_FAMILIES / NUM_QUALIFIERS; - if (reversed) { - row = NUM_ROWS - 1 - row; - } - try { - assertArrayEquals(ROWS[row], controller.cursor().get().getRow()); - count++; - } catch (Throwable e) { - future.completeExceptionally(e); - throw e; - } + RawAsyncTable table = CONN.getRawTable(TABLE_NAME); + table.scan(reversed ? createReversedScanWithSparseFilter() : createScanWithSparseFilter(), + new RawScanResultConsumer() { + + private int count; + + @Override + public void onHeartbeat(ScanController controller) { + int row = count / NUM_FAMILIES / NUM_QUALIFIERS; + if (reversed) { + row = NUM_ROWS - 1 - row; + } + try { + assertArrayEquals(ROWS[row], controller.cursor().get().getRow()); + count++; + } catch (Throwable e) { + future.completeExceptionally(e); + throw e; } + } - @Override - public void onNext(Result[] results, ScanController controller) { - try { - assertEquals(1, results.length); - assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS); - // we will always provide a scan cursor if time limit is reached. - if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) { - assertFalse(controller.cursor().isPresent()); - } else { - assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], - controller.cursor().get().getRow()); - } - assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow()); - count++; - } catch (Throwable e) { - future.completeExceptionally(e); - throw e; + @Override + public void onNext(Result[] results, ScanController controller) { + try { + assertEquals(1, results.length); + assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS); + // we will always provide a scan cursor if time limit is reached. + if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) { + assertFalse(controller.cursor().isPresent()); + } else { + assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], + controller.cursor().get().getRow()); } + assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow()); + count++; + } catch (Throwable e) { + future.completeExceptionally(e); + throw e; } + } - @Override - public void onError(Throwable error) { - future.completeExceptionally(error); - } + @Override + public void onError(Throwable error) { + future.completeExceptionally(error); + } - @Override - public void onComplete() { - future.complete(null); - } - }); - future.get(); - } + @Override + public void onComplete() { + future.complete(null); + } + }); + future.get(); } @Test @@ -104,4 +117,50 @@ public class TestRawAsyncScanCursor extends AbstractTestScanCursor { throws IOException, InterruptedException, ExecutionException { doTest(true); } + + @Test + public void testSizeLimit() throws InterruptedException, ExecutionException { + CompletableFuture<Void> future = new CompletableFuture<>(); + RawAsyncTable table = CONN.getRawTable(TABLE_NAME); + table.scan(createScanWithSizeLimit(), new RawScanResultConsumer() { + + private int count; + + @Override + public void onHeartbeat(ScanController controller) { + try { + assertArrayEquals(ROWS[count / NUM_FAMILIES / NUM_QUALIFIERS], + controller.cursor().get().getRow()); + count++; + } catch (Throwable e) { + future.completeExceptionally(e); + throw e; + } + } + + @Override + public void onNext(Result[] results, ScanController controller) { + try { + assertFalse(controller.cursor().isPresent()); + assertEquals(1, results.length); + assertArrayEquals(ROWS[count / NUM_FAMILIES / NUM_QUALIFIERS], results[0].getRow()); + count++; + } catch (Throwable e) { + future.completeExceptionally(e); + throw e; + } + } + + @Override + public void onError(Throwable error) { + future.completeExceptionally(error); + } + + @Override + public void onComplete() { + future.complete(null); + } + }); + future.get(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4c74a73d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestResultScannerCursor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestResultScannerCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestResultScannerCursor.java new file mode 100644 index 0000000..3b2ef2c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestResultScannerCursor.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestResultScannerCursor extends AbstractTestResultScannerCursor { + + @Override + protected ResultScanner getScanner(Scan scan) throws IOException { + return TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(scan); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/4c74a73d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java deleted file mode 100644 index f7798f0..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanCursor.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.io.IOException; - -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ MediumTests.class, ClientTests.class }) -public class TestScanCursor extends AbstractTestScanCursor { - - @Test - public void testHeartbeatWithSparseFilter() throws Exception { - try (ResultScanner scanner = - TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSparseFilter())) { - int num = 0; - Result r; - while ((r = scanner.next()) != null) { - if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) { - Assert.assertTrue(r.isCursor()); - Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], - r.getCursor().getRow()); - } else { - Assert.assertFalse(r.isCursor()); - Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow()); - } - num++; - } - } - } - - @Test - public void testHeartbeatWithSparseFilterReversed() throws Exception { - try (ResultScanner scanner = TEST_UTIL.getConnection().getTable(TABLE_NAME) - .getScanner(createReversedScanWithSparseFilter())) { - int num = 0; - Result r; - while ((r = scanner.next()) != null) { - if (num < (NUM_ROWS - 1) * NUM_FAMILIES * NUM_QUALIFIERS) { - Assert.assertTrue(r.isCursor()); - Assert.assertArrayEquals(ROWS[NUM_ROWS - 1 - num / NUM_FAMILIES / NUM_QUALIFIERS], - r.getCursor().getRow()); - } else { - Assert.assertFalse(r.isCursor()); - Assert.assertArrayEquals(ROWS[0], r.getRow()); - } - num++; - } - } - } - - @Test - public void testSizeLimit() throws IOException { - try (ResultScanner scanner = - TEST_UTIL.getConnection().getTable(TABLE_NAME).getScanner(createScanWithSizeLimit())) { - int num = 0; - Result r; - while ((r = scanner.next()) != null) { - if (num % (NUM_FAMILIES * NUM_QUALIFIERS) != (NUM_FAMILIES * NUM_QUALIFIERS) - 1) { - Assert.assertTrue(r.isCursor()); - Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], - r.getCursor().getRow()); - } else { - Assert.assertFalse(r.isCursor()); - Assert.assertArrayEquals(ROWS[num / NUM_FAMILIES / NUM_QUALIFIERS], r.getRow()); - } - num++; - } - } - } -}