This is an automated email from the ASF dual-hosted git repository. tdsilva pushed a commit to branch 4.14-HBase-1.3 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.14-HBase-1.3 by this push: new e3505d9 PHOENIX-4296: reverse scan in ChunkedResultIterator e3505d9 is described below commit e3505d91e46de1a1756a145d396f27a3c70e927f Author: chfeng <chf...@gmail.com> AuthorDate: Thu May 16 18:41:41 2019 +0800 PHOENIX-4296: reverse scan in ChunkedResultIterator --- .../phoenix/iterate/ChunkedResultIterator.java | 13 +++- .../phoenix/iterate/ChunkedResultIteratorTest.java | 73 ++++++++++++++++++++++ 2 files changed, 83 insertions(+), 3 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java index acb6c04..1aab2d5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java @@ -58,6 +58,7 @@ public class ChunkedResultIterator implements PeekingResultIterator { private final ParallelIteratorFactory delegateIteratorFactory; private ImmutableBytesWritable lastKey = new ImmutableBytesWritable(); + private ImmutableBytesWritable prevLastKey = new ImmutableBytesWritable(); private final StatementContext context; private final TableRef tableRef; private final long chunkSize; @@ -96,8 +97,9 @@ public class ChunkedResultIterator implements PeekingResultIterator { } } - private ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory, MutationState mutationState, - StatementContext context, TableRef tableRef, Scan scan, long chunkSize, ResultIterator scanner, QueryPlan plan) throws SQLException { + private ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory, + MutationState mutationState, StatementContext context, TableRef tableRef, Scan scan, + long chunkSize, ResultIterator scanner, QueryPlan plan) throws SQLException { this.delegateIteratorFactory = delegateIteratorFactory; this.context = context; this.tableRef = tableRef; @@ -138,8 +140,12 @@ public class ChunkedResultIterator implements PeekingResultIterator { if (resultIterator.peek() == null && lastKey != null) { resultIterator.close(); scan = ScanUtil.newScan(scan); - if(ScanUtil.isLocalIndex(scan)) { + if (ScanUtil.isLocalIndex(scan)) { scan.setAttribute(SCAN_START_ROW_SUFFIX, ByteUtil.copyKeyBytesIfNecessary(lastKey)); + } else if (ScanUtil.isReversed(scan)) { + // lastKey is the last row the previous iterator meet but not returned. + // for reverse scan, use prevLastKey as the new stopRow. + scan.setStopRow(ByteUtil.copyKeyBytesIfNecessary(prevLastKey)); } else { scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey)); } @@ -212,6 +218,7 @@ public class ChunkedResultIterator implements PeekingResultIterator { byte[] currentKey = lastKey.get(); int offset = lastKey.getOffset(); int length = lastKey.getLength(); + prevLastKey.set(lastKey.copyBytes()); newTuple.getKey(lastKey); return Bytes.compareTo(currentKey, offset, length, lastKey.get(), lastKey.getOffset(), lastKey.getLength()) != 0; diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/ChunkedResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ChunkedResultIteratorTest.java new file mode 100644 index 0000000..18402f0 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/ChunkedResultIteratorTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.util.TestUtil.PHOENIX_JDBC_URL; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.util.List; +import java.util.Properties; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.schema.TableRef; +import org.junit.Assert; +import org.junit.Test; + +@SuppressWarnings("deprecated") public class ChunkedResultIteratorTest + extends ParallelStatsDisabledIT { + + @Test + public void testChunked() throws Exception { + Properties props = new Properties(); + props.setProperty(QueryServices.RENEW_LEASE_ENABLED, "false"); + props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "2"); + Connection conn = DriverManager.getConnection(PHOENIX_JDBC_URL, props); + String tableName = generateUniqueName(); + + conn.createStatement().execute("CREATE TABLE " + tableName + + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, B VARCHAR(10))"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (1, 'A')"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (2, 'B')"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (3, 'C')"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (4, 'D')"); + conn.commit(); + + + String sql = "SELECT A, B FROM " + tableName + " ORDER BY A DESC"; + PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); + ResultSet rs = stmt.executeQuery(sql); + + int cnt = 0; + while ((rs.next())) { + cnt++; + assertTrue("too many results returned", cnt <= 4); + } + assertEquals(4, cnt); + } +}