This is an automated email from the ASF dual-hosted git repository. rajeshbabu pushed a commit to branch 4.x-HBase-1.3 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push: new 78eb884 PHOENIX-5308 Unable to run the some end2end tests in real cluster mainly the once using accessing hbase internals from minihbasecluster or custom coprocessors(Rajeshbabu) 78eb884 is described below commit 78eb8841b7a5d28e2591dc89adef89f692e98220 Author: Rajeshbabu Chintaguntla <Rajeshbabu Chintaguntla> AuthorDate: Mon Jun 17 16:15:07 2019 +0530 PHOENIX-5308 Unable to run the some end2end tests in real cluster mainly the once using accessing hbase internals from minihbasecluster or custom coprocessors(Rajeshbabu) --- .../end2end/ConcurrentMutationsExtendedIT.java | 404 +++++++++++++++++++++ .../phoenix/end2end/ConcurrentMutationsIT.java | 343 +---------------- .../end2end/index/MutableIndexExtendedIT.java | 184 ++++++++++ .../phoenix/end2end/index/MutableIndexIT.java | 175 --------- .../phoenix/end2end/join/HashJoinCacheIT.java | 3 + 5 files changed, 592 insertions(+), 517 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java new file mode 100644 index 0000000..571961d --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.util.*; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.IOException; +import java.sql.*; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +@RunWith(RunUntilFailure.class) @Category(NeedsOwnMiniClusterTest.class) +public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { + + private static final Random RAND = new Random(5); + private static final String MVCC_LOCK_TEST_TABLE_PREFIX = "MVCCLOCKTEST_"; + private static final String LOCK_TEST_TABLE_PREFIX = "LOCKTEST_"; + private static final int ROW_LOCK_WAIT_TIME = 10000; + + private final Object lock = new Object(); + + @Test + public void testSynchronousDeletesAndUpsertValues() throws Exception { + final String tableName = generateUniqueName(); + final String indexName = generateUniqueName(); + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE " + tableName + + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0"); + TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)"); + final CountDownLatch doneSignal = new CountDownLatch(2); + Runnable r1 = new Runnable() { + + @Override public void run() { + try { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + for (int i = 0; i < 50; i++) { + Thread.sleep(20); + synchronized (lock) { + PhoenixConnection conn = null; + try { + conn = + DriverManager.getConnection(getUrl(), props) + .unwrap(PhoenixConnection.class); + conn.setAutoCommit(true); + conn.createStatement().execute("DELETE FROM " + tableName); + } finally { + if (conn != null) conn.close(); + } + } + } + } catch (SQLException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + Thread.interrupted(); + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Runnable r2 = new Runnable() { + + @Override public void run() { + try { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + int nRowsToUpsert = 1000; + for (int i = 0; i < nRowsToUpsert; i++) { + synchronized (lock) { + PhoenixConnection conn = null; + try { + conn = + DriverManager.getConnection(getUrl(), props) + .unwrap(PhoenixConnection.class); + conn.createStatement().execute( + "UPSERT INTO " + tableName + " VALUES (" + (i % 10) + + ", 0, 1)"); + if ((i % 20) == 0 || i == nRowsToUpsert - 1) { + conn.commit(); + } + } finally { + if (conn != null) conn.close(); + } + } + } + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Thread t1 = new Thread(r1); + t1.start(); + Thread t2 = new Thread(r2); + t2.start(); + + doneSignal.await(60, TimeUnit.SECONDS); + IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); + } + + @Test + public void testConcurrentDeletesAndUpsertValues() throws Exception { + final String tableName = generateUniqueName(); + final String indexName = generateUniqueName(); + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE " + tableName + + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2))"); + TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)"); + final CountDownLatch doneSignal = new CountDownLatch(2); + Runnable r1 = new Runnable() { + + @Override public void run() { + try { + Connection conn = DriverManager.getConnection(getUrl()); + conn.setAutoCommit(true); + for (int i = 0; i < 50; i++) { + Thread.sleep(20); + conn.createStatement().execute("DELETE FROM " + tableName); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + Thread.interrupted(); + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Runnable r2 = new Runnable() { + + @Override public void run() { + try { + Connection conn = DriverManager.getConnection(getUrl()); + for (int i = 0; i < 1000; i++) { + conn.createStatement().execute( + "UPSERT INTO " + tableName + " VALUES (" + (i % 10) + ", 0, 1)"); + if ((i % 20) == 0) { + conn.commit(); + } + } + conn.commit(); + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Thread t1 = new Thread(r1); + t1.start(); + Thread t2 = new Thread(r2); + t2.start(); + + doneSignal.await(60, TimeUnit.SECONDS); + IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); + } + + @Test @Repeat(5) + public void testConcurrentUpserts() throws Exception { + int nThreads = 4; + final int batchSize = 200; + final int nRows = 51; + final int nIndexValues = 23; + final String tableName = generateUniqueName(); + final String indexName = generateUniqueName(); + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE " + tableName + + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, VERSIONS=1"); + TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)"); + final CountDownLatch doneSignal = new CountDownLatch(nThreads); + Runnable[] runnables = new Runnable[nThreads]; + for (int i = 0; i < nThreads; i++) { + runnables[i] = new Runnable() { + + @Override public void run() { + try { + Connection conn = DriverManager.getConnection(getUrl()); + for (int i = 0; i < 10000; i++) { + boolean isNull = RAND.nextBoolean(); + int randInt = RAND.nextInt() % nIndexValues; + conn.createStatement().execute( + "UPSERT INTO " + tableName + " VALUES (" + (i % nRows) + ", 0, " + + (isNull ? null : randInt) + ")"); + if ((i % batchSize) == 0) { + conn.commit(); + } + } + conn.commit(); + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + } + for (int i = 0; i < nThreads; i++) { + Thread t = new Thread(runnables[i]); + t.start(); + } + + assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS)); + long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); + assertEquals(nRows, actualRowCount); + } + + @Test + public void testRowLockDuringPreBatchMutateWhenIndexed() throws Exception { + final String tableName = LOCK_TEST_TABLE_PREFIX + generateUniqueName(); + final String indexName = generateUniqueName(); + Connection conn = DriverManager.getConnection(getUrl()); + + conn.createStatement().execute("CREATE TABLE " + tableName + + "(k VARCHAR PRIMARY KEY, v INTEGER) COLUMN_ENCODED_BYTES = 0"); + TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v)"); + final CountDownLatch doneSignal = new CountDownLatch(2); + final String[] failedMsg = new String[1]; + Runnable r1 = new Runnable() { + + @Override public void run() { + try { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('foo',0)"); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('foo',1)"); + conn.commit(); + } catch (Exception e) { + failedMsg[0] = e.getMessage(); + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Runnable r2 = new Runnable() { + + @Override public void run() { + try { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('foo',2)"); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('foo',3)"); + conn.commit(); + } catch (Exception e) { + failedMsg[0] = e.getMessage(); + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Thread t1 = new Thread(r1); + t1.start(); + Thread t2 = new Thread(r2); + t2.start(); + + doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.SECONDS); + assertNull(failedMsg[0], failedMsg[0]); + long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); + assertEquals(1, actualRowCount); + } + + @Test + public void testLockUntilMVCCAdvanced() throws Exception { + final String tableName = MVCC_LOCK_TEST_TABLE_PREFIX + generateUniqueName(); + final String indexName = generateUniqueName(); + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE " + tableName + + "(k VARCHAR PRIMARY KEY, v INTEGER) COLUMN_ENCODED_BYTES = 0"); + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v,k)"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',0)"); + conn.commit(); + TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class); + final CountDownLatch doneSignal = new CountDownLatch(2); + final String[] failedMsg = new String[1]; + Runnable r1 = new Runnable() { + + @Override public void run() { + try { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('foo',1)"); + conn.commit(); + } catch (Exception e) { + failedMsg[0] = e.getMessage(); + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Runnable r2 = new Runnable() { + + @Override public void run() { + try { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('foo',2)"); + conn.commit(); + } catch (Exception e) { + failedMsg[0] = e.getMessage(); + throw new RuntimeException(e); + } finally { + doneSignal.countDown(); + } + } + + }; + Thread t1 = new Thread(r1); + t1.start(); + Thread t2 = new Thread(r2); + t2.start(); + + doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.SECONDS); + long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); + assertEquals(1, actualRowCount); + } + + public static class DelayingRegionObserver extends SimpleRegionObserver { + private volatile boolean lockedTableRow; + + @Override public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + try { + String tableName = c.getEnvironment().getRegionInfo().getTable().getNameAsString(); + if (tableName.startsWith(MVCC_LOCK_TEST_TABLE_PREFIX)) { + Thread.sleep(ROW_LOCK_WAIT_TIME + / 2); // Wait long enough that they'll both have the same mvcc + } + } catch (InterruptedException e) { + } + } + + @Override public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException { + try { + String tableName = c.getEnvironment().getRegionInfo().getTable().getNameAsString(); + if (tableName.startsWith(LOCK_TEST_TABLE_PREFIX)) { + if (lockedTableRow) { + throw new DoNotRetryIOException( + "Expected lock in preBatchMutate to be exclusive, but it wasn't for row " + + Bytes + .toStringBinary(miniBatchOp.getOperation(0).getRow())); + } + lockedTableRow = true; + Thread.sleep(ROW_LOCK_WAIT_TIME + 2000); + } + Thread.sleep(Math.abs(RAND.nextInt()) % 10); + } catch (InterruptedException e) { + } finally { + lockedTableRow = false; + } + + } + } + +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java index d1f30c6..f312df0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java @@ -56,12 +56,7 @@ import org.junit.runner.RunWith; @RunWith(RunUntilFailure.class) public class ConcurrentMutationsIT extends ParallelStatsDisabledIT { - private static final Random RAND = new Random(5); - private static final String MVCC_LOCK_TEST_TABLE_PREFIX = "MVCCLOCKTEST_"; - private static final String LOCK_TEST_TABLE_PREFIX = "LOCKTEST_"; - private static final int ROW_LOCK_WAIT_TIME = 10000; - - private final Object lock = new Object(); + private static class MyClock extends EnvironmentEdge { public volatile long time; @@ -77,342 +72,6 @@ public class ConcurrentMutationsIT extends ParallelStatsDisabledIT { } @Test - public void testSynchronousDeletesAndUpsertValues() throws Exception { - final String tableName = generateUniqueName(); - final String indexName = generateUniqueName(); - Connection conn = DriverManager.getConnection(getUrl()); - conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0"); - TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class); - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)"); - final CountDownLatch doneSignal = new CountDownLatch(2); - Runnable r1 = new Runnable() { - - @Override - public void run() { - try { - Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); - for (int i = 0; i < 50; i++) { - Thread.sleep(20); - synchronized (lock) { - PhoenixConnection conn = null; - try { - conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class); - conn.setAutoCommit(true); - conn.createStatement().execute("DELETE FROM " + tableName); - } finally { - if (conn != null) conn.close(); - } - } - } - } catch (SQLException e) { - throw new RuntimeException(e); - } catch (InterruptedException e) { - Thread.interrupted(); - throw new RuntimeException(e); - } finally { - doneSignal.countDown(); - } - } - - }; - Runnable r2 = new Runnable() { - - @Override - public void run() { - try { - Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); - int nRowsToUpsert = 1000; - for (int i = 0; i < nRowsToUpsert; i++) { - synchronized(lock) { - PhoenixConnection conn = null; - try { - conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class); - conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (" + (i % 10) + ", 0, 1)"); - if ((i % 20) == 0 || i == nRowsToUpsert-1 ) { - conn.commit(); - } - } finally { - if (conn != null) conn.close(); - } - } - } - } catch (SQLException e) { - throw new RuntimeException(e); - } finally { - doneSignal.countDown(); - } - } - - }; - Thread t1 = new Thread(r1); - t1.start(); - Thread t2 = new Thread(r2); - t2.start(); - - doneSignal.await(60, TimeUnit.SECONDS); - IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); - } - - @Test - public void testConcurrentDeletesAndUpsertValues() throws Exception { - final String tableName = generateUniqueName(); - final String indexName = generateUniqueName(); - Connection conn = DriverManager.getConnection(getUrl()); - conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2))"); - TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class); - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)"); - final CountDownLatch doneSignal = new CountDownLatch(2); - Runnable r1 = new Runnable() { - - @Override - public void run() { - try { - Connection conn = DriverManager.getConnection(getUrl()); - conn.setAutoCommit(true); - for (int i = 0; i < 50; i++) { - Thread.sleep(20); - conn.createStatement().execute("DELETE FROM " + tableName); - } - } catch (SQLException e) { - throw new RuntimeException(e); - } catch (InterruptedException e) { - Thread.interrupted(); - throw new RuntimeException(e); - } finally { - doneSignal.countDown(); - } - } - - }; - Runnable r2 = new Runnable() { - - @Override - public void run() { - try { - Connection conn = DriverManager.getConnection(getUrl()); - for (int i = 0; i < 1000; i++) { - conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (" + (i % 10) + ", 0, 1)"); - if ((i % 20) == 0) { - conn.commit(); - } - } - conn.commit(); - } catch (SQLException e) { - throw new RuntimeException(e); - } finally { - doneSignal.countDown(); - } - } - - }; - Thread t1 = new Thread(r1); - t1.start(); - Thread t2 = new Thread(r2); - t2.start(); - - doneSignal.await(60, TimeUnit.SECONDS); - IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); - } - - @Test - @Repeat(5) - public void testConcurrentUpserts() throws Exception { - int nThreads = 4; - final int batchSize = 200; - final int nRows = 51; - final int nIndexValues = 23; - final String tableName = generateUniqueName(); - final String indexName = generateUniqueName(); - Connection conn = DriverManager.getConnection(getUrl()); - conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, VERSIONS=1"); - TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class); - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)"); - final CountDownLatch doneSignal = new CountDownLatch(nThreads); - Runnable[] runnables = new Runnable[nThreads]; - for (int i = 0; i < nThreads; i++) { - runnables[i] = new Runnable() { - - @Override - public void run() { - try { - Connection conn = DriverManager.getConnection(getUrl()); - for (int i = 0; i < 10000; i++) { - boolean isNull = RAND.nextBoolean(); - int randInt = RAND.nextInt() % nIndexValues; - conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (" + (i % nRows) + ", 0, " + (isNull ? null : randInt) + ")"); - if ((i % batchSize) == 0) { - conn.commit(); - } - } - conn.commit(); - } catch (SQLException e) { - throw new RuntimeException(e); - } finally { - doneSignal.countDown(); - } - } - - }; - } - for (int i = 0; i < nThreads; i++) { - Thread t = new Thread(runnables[i]); - t.start(); - } - - assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS)); - long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); - assertEquals(nRows, actualRowCount); - } - - @Test - public void testRowLockDuringPreBatchMutateWhenIndexed() throws Exception { - final String tableName = LOCK_TEST_TABLE_PREFIX + generateUniqueName(); - final String indexName = generateUniqueName(); - Connection conn = DriverManager.getConnection(getUrl()); - - conn.createStatement().execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v INTEGER) COLUMN_ENCODED_BYTES = 0"); - TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class); - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v)"); - final CountDownLatch doneSignal = new CountDownLatch(2); - final String[] failedMsg = new String[1]; - Runnable r1 = new Runnable() { - - @Override - public void run() { - try { - Connection conn = DriverManager.getConnection(getUrl()); - conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',0)"); - conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',1)"); - conn.commit(); - } catch (Exception e) { - failedMsg[0] = e.getMessage(); - throw new RuntimeException(e); - } finally { - doneSignal.countDown(); - } - } - - }; - Runnable r2 = new Runnable() { - - @Override - public void run() { - try { - Connection conn = DriverManager.getConnection(getUrl()); - conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',2)"); - conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',3)"); - conn.commit(); - } catch (Exception e) { - failedMsg[0] = e.getMessage(); - throw new RuntimeException(e); - } finally { - doneSignal.countDown(); - } - } - - }; - Thread t1 = new Thread(r1); - t1.start(); - Thread t2 = new Thread(r2); - t2.start(); - - doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.SECONDS); - assertNull(failedMsg[0], failedMsg[0]); - long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); - assertEquals(1, actualRowCount); - } - - @Test - public void testLockUntilMVCCAdvanced() throws Exception { - final String tableName = MVCC_LOCK_TEST_TABLE_PREFIX + generateUniqueName(); - final String indexName = generateUniqueName(); - Connection conn = DriverManager.getConnection(getUrl()); - conn.createStatement().execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v INTEGER) COLUMN_ENCODED_BYTES = 0"); - conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v,k)"); - conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',0)"); - conn.commit(); - TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class); - final CountDownLatch doneSignal = new CountDownLatch(2); - final String[] failedMsg = new String[1]; - Runnable r1 = new Runnable() { - - @Override - public void run() { - try { - Connection conn = DriverManager.getConnection(getUrl()); - conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',1)"); - conn.commit(); - } catch (Exception e) { - failedMsg[0] = e.getMessage(); - throw new RuntimeException(e); - } finally { - doneSignal.countDown(); - } - } - - }; - Runnable r2 = new Runnable() { - - @Override - public void run() { - try { - Connection conn = DriverManager.getConnection(getUrl()); - conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',2)"); - conn.commit(); - } catch (Exception e) { - failedMsg[0] = e.getMessage(); - throw new RuntimeException(e); - } finally { - doneSignal.countDown(); - } - } - - }; - Thread t1 = new Thread(r1); - t1.start(); - Thread t2 = new Thread(r2); - t2.start(); - - doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.SECONDS); - long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); - assertEquals(1, actualRowCount); - } - - public static class DelayingRegionObserver extends SimpleRegionObserver { - private volatile boolean lockedTableRow; - - @Override - public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { - try { - String tableName = c.getEnvironment().getRegionInfo().getTable().getNameAsString(); - if (tableName.startsWith(MVCC_LOCK_TEST_TABLE_PREFIX)) { - Thread.sleep(ROW_LOCK_WAIT_TIME/2); // Wait long enough that they'll both have the same mvcc - } - } catch (InterruptedException e) { - } - } - - @Override - public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException { - try { - String tableName = c.getEnvironment().getRegionInfo().getTable().getNameAsString(); - if (tableName.startsWith(LOCK_TEST_TABLE_PREFIX)) { - if (lockedTableRow) { - throw new DoNotRetryIOException("Expected lock in preBatchMutate to be exclusive, but it wasn't for row " + Bytes.toStringBinary(miniBatchOp.getOperation(0).getRow())); - } - lockedTableRow = true; - Thread.sleep(ROW_LOCK_WAIT_TIME + 2000); - } - Thread.sleep(Math.abs(RAND.nextInt()) % 10); - } catch (InterruptedException e) { - } finally { - lockedTableRow = false; - } - - } - } - - @Test @Ignore("PHOENIX-4058 Generate correct index updates when DeleteColumn processed before Put with same timestamp") public void testSetIndexedColumnToNullAndValueAtSameTS() throws Exception { try { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexExtendedIT.java new file mode 100644 index 0000000..8676da0 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexExtendedIT.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import jline.internal.Log; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.end2end.PartialScannerResultsDisabledIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.util.*; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.sql.Connection; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.*; + +@RunWith(Parameterized.class) +@Category(NeedsOwnMiniClusterTest.class) +public class MutableIndexExtendedIT extends ParallelStatsDisabledIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(MutableIndexIT.class); + + protected final boolean localIndex; + protected final String tableDDLOptions; + + public MutableIndexExtendedIT(Boolean localIndex, String txProvider, Boolean columnEncoded) { + this.localIndex = localIndex; + StringBuilder optionBuilder = new StringBuilder(); + if (txProvider != null) { + optionBuilder + .append("TRANSACTIONAL=true," + PhoenixDatabaseMetaData.TRANSACTION_PROVIDER + + "='" + txProvider + "'"); + } + if (!columnEncoded) { + if (optionBuilder.length() != 0) optionBuilder.append(","); + optionBuilder.append("COLUMN_ENCODED_BYTES=0"); + } + this.tableDDLOptions = optionBuilder.toString(); + } + + private static Connection getConnection(Properties props) throws SQLException { + props.setProperty(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, + Integer.toString(1)); + Connection conn = DriverManager.getConnection(getUrl(), props); + return conn; + } + + protected static Connection getConnection() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + return getConnection(props); + } + + @Parameterized.Parameters(name = "MutableIndexExtendedIT_localIndex={0},transactionProvider={1},columnEncoded={2}") + // name is used by failsafe as file name in reports + public static Collection<Object[]> data() { + return TestUtil.filterTxParamData(Arrays.asList( + new Object[][] { { false, null, false }, { false, null, true }, + { false, "TEPHRA", false }, { false, "TEPHRA", true }, + { false, "OMID", false }, { true, null, false }, { true, null, true }, + { true, "TEPHRA", false }, { true, "TEPHRA", true }, }), 1); + } + + @Test + public void testIndexHalfStoreFileReader() throws Exception { + if (!localIndex) + return; + + Connection conn1 = getConnection(); + ConnectionQueryServices connectionQueryServices = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES); + HBaseAdmin admin = connectionQueryServices.getAdmin(); + String tableName = "TBL_" + generateUniqueName(); + String indexName = "IDX_" + generateUniqueName(); + createBaseTable(conn1, tableName, "('e')"); + conn1.createStatement().execute("CREATE "+(localIndex?"LOCAL":"")+" INDEX " + indexName + + " ON " + tableName + "(v1)" + (localIndex?"":" SPLIT ON ('e')")); + conn1.createStatement().execute("UPSERT INTO "+tableName+" values('b',1,2,4,'z')"); + conn1.createStatement().execute("UPSERT INTO "+tableName+" values('f',1,2,3,'z')"); + conn1.createStatement().execute("UPSERT INTO "+tableName+" values('j',2,4,2,'a')"); + conn1.createStatement().execute("UPSERT INTO "+tableName+" values('q',3,1,1,'c')"); + conn1.commit(); + + + String query = "SELECT count(*) FROM " + tableName +" where v1<='z'"; + ResultSet rs = conn1.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals(4, rs.getInt(1)); + + TableName indexTable = TableName.valueOf(localIndex?tableName: indexName); + admin.flush(indexTable); + boolean merged = false; + HTableInterface table = connectionQueryServices.getTable(indexTable.getName()); + // merge regions until 1 left + long numRegions = 0; + while (true) { + rs = conn1.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals(4, rs.getInt(1)); //TODO this returns 5 sometimes instead of 4, duplicate results? + try { + List<HRegionInfo> indexRegions = admin.getTableRegions(indexTable); + numRegions = indexRegions.size(); + if (numRegions==1) { + break; + } + if(!merged) { + List<HRegionInfo> regions = + admin.getTableRegions(indexTable); + LOGGER.info("Merging: " + regions.size()); + admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(), + regions.get(1).getEncodedNameAsBytes(), false); + merged = true; + Threads.sleep(10000); + } + } catch (Exception ex) { + LOGGER.info(ex.getMessage()); + } + long waitStartTime = System.currentTimeMillis(); + // wait until merge happened + while (System.currentTimeMillis() - waitStartTime < 10000) { + List<HRegionInfo> regions = admin.getTableRegions(indexTable); + LOGGER.info("Waiting:" + regions.size()); + if (regions.size() < numRegions) { + break; + } + Threads.sleep(1000); + } + SnapshotTestingUtils.waitForTableToBeOnline(BaseTest.getUtility(), indexTable); + assertTrue("Index table should be online ", admin.isTableAvailable(indexTable)); + } + } + + protected void createBaseTable(Connection conn, String tableName, String splits) + throws SQLException { + String ddl = + "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" + + "k1 INTEGER NOT NULL,\n" + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n" + + "v1 VARCHAR,\n" + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n" + ( + tableDDLOptions != null ? + tableDDLOptions : + "") + (splits != null ? (" split on " + splits) : ""); + conn.createStatement().execute(ddl); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java index 2f4accd..d8726c7 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java @@ -39,29 +39,15 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; -import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; -import org.apache.phoenix.end2end.PartialScannerResultsDisabledIT; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; -import org.apache.phoenix.query.BaseTest; -import org.apache.phoenix.query.ConnectionQueryServices; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.util.ByteUtil; -import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexScrutiny; -import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; @@ -651,74 +637,6 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { "CREATE " + (localIndex ? "LOCAL" : "")+" INDEX " + indexName + " ON " + tableName + "(v1"+(isReverse?" DESC":"")+") include (k3)"); } - @Test - public void testIndexHalfStoreFileReader() throws Exception { - if (!localIndex) - return; - - Connection conn1 = getConnection(); - ConnectionQueryServices connectionQueryServices = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES); - HBaseAdmin admin = connectionQueryServices.getAdmin(); - String tableName = "TBL_" + generateUniqueName(); - String indexName = "IDX_" + generateUniqueName(); - createBaseTable(conn1, tableName, "('e')"); - conn1.createStatement().execute("CREATE "+(localIndex?"LOCAL":"")+" INDEX " + indexName + " ON " + tableName + "(v1)" + (localIndex?"":" SPLIT ON ('e')")); - conn1.createStatement().execute("UPSERT INTO "+tableName+" values('b',1,2,4,'z')"); - conn1.createStatement().execute("UPSERT INTO "+tableName+" values('f',1,2,3,'z')"); - conn1.createStatement().execute("UPSERT INTO "+tableName+" values('j',2,4,2,'a')"); - conn1.createStatement().execute("UPSERT INTO "+tableName+" values('q',3,1,1,'c')"); - conn1.commit(); - - - String query = "SELECT count(*) FROM " + tableName +" where v1<='z'"; - ResultSet rs = conn1.createStatement().executeQuery(query); - assertTrue(rs.next()); - assertEquals(4, rs.getInt(1)); - - TableName indexTable = TableName.valueOf(localIndex?tableName: indexName); - admin.flush(indexTable); - boolean merged = false; - HTableInterface table = connectionQueryServices.getTable(indexTable.getName()); - // merge regions until 1 left - long numRegions = 0; - while (true) { - rs = conn1.createStatement().executeQuery(query); - assertTrue(rs.next()); - assertEquals(4, rs.getInt(1)); //TODO this returns 5 sometimes instead of 4, duplicate results? - try { - List<HRegionInfo> indexRegions = admin.getTableRegions(indexTable); - numRegions = indexRegions.size(); - if (numRegions==1) { - break; - } - if(!merged) { - List<HRegionInfo> regions = - admin.getTableRegions(indexTable); - Log.info("Merging: " + regions.size()); - admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(), - regions.get(1).getEncodedNameAsBytes(), false); - merged = true; - Threads.sleep(10000); - } - } catch (Exception ex) { - Log.info(ex); - } - long waitStartTime = System.currentTimeMillis(); - // wait until merge happened - while (System.currentTimeMillis() - waitStartTime < 10000) { - List<HRegionInfo> regions = admin.getTableRegions(indexTable); - Log.info("Waiting:" + regions.size()); - if (regions.size() < numRegions) { - break; - } - Threads.sleep(1000); - } - SnapshotTestingUtils.waitForTableToBeOnline(BaseTest.getUtility(), indexTable); - assertTrue("Index table should be online ", admin.isTableAvailable(indexTable)); - } - } - - private List<HRegionInfo> splitDuringScan(Connection conn1, String tableName, String indexName, String[] strings, HBaseAdmin admin, boolean isReverse) throws SQLException, IOException, InterruptedException { ResultSet rs; @@ -831,99 +749,6 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { } } - // Tests that if major compaction is run on a table with a disabled index, - // deleted cells are kept - // TODO: Move to a different test class? - @Test - public void testCompactDisabledIndex() throws Exception { - if (localIndex || tableDDLOptions.contains("TRANSACTIONAL=true")) - return; - - try (Connection conn = getConnection()) { - String schemaName = generateUniqueName(); - String dataTableName = generateUniqueName() + "_DATA"; - String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); - String indexTableName = generateUniqueName() + "_IDX"; - String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); - conn.createStatement().execute( - String.format(PartialScannerResultsDisabledIT.TEST_TABLE_DDL, dataTableFullName)); - conn.createStatement().execute(String.format(PartialScannerResultsDisabledIT.INDEX_1_DDL, - indexTableName, dataTableFullName)); - - //insert a row, and delete it - PartialScannerResultsDisabledIT.writeSingleBatch(conn, 1, 1, dataTableFullName); - conn.createStatement().execute("DELETE FROM " + dataTableFullName); - conn.commit(); - - // disable the index, simulating an index write failure - PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); - IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.DISABLE, - EnvironmentEdgeManager.currentTimeMillis()); - - // major compaction should not remove the deleted row - List<HRegion> regions = getUtility().getHBaseCluster().getRegions(TableName.valueOf(dataTableFullName)); - HRegion hRegion = regions.get(0); - hRegion.flush(true); - HStore store = (HStore) hRegion.getStore(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES); - store.triggerMajorCompaction(); - store.compactRecentForTestingAssumingDefaultPolicy(1); - HTableInterface dataHTI = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName)); - assertEquals(1, TestUtil.getRawRowCount(dataHTI)); - - // reenable the index - IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.INACTIVE, - EnvironmentEdgeManager.currentTimeMillis()); - IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.ACTIVE, 0L); - - // now major compaction should remove the deleted row - store.triggerMajorCompaction(); - store.compactRecentForTestingAssumingDefaultPolicy(1); - dataHTI = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName)); - assertEquals(0, TestUtil.getRawRowCount(dataHTI)); - } - } - - // some tables (e.g. indexes on views) have UngroupedAgg coproc loaded, but don't have a - // corresponding row in syscat. This tests that compaction isn't blocked - // TODO: Move to a different test class? - @Test(timeout=120000) - public void testCompactNonPhoenixTable() throws Exception { - if (localIndex || tableDDLOptions.contains("TRANSACTIONAL=true")) - return; - - try (Connection conn = getConnection()) { - // create a vanilla HBase table (non-Phoenix) - String randomTable = generateUniqueName(); - TableName hbaseTN = TableName.valueOf(randomTable); - byte[] famBytes = Bytes.toBytes("fam"); - HTable hTable = getUtility().createTable(hbaseTN, famBytes); - TestUtil.addCoprocessor(conn, randomTable, UngroupedAggregateRegionObserver.class); - Put put = new Put(Bytes.toBytes("row")); - byte[] value = new byte[1]; - Bytes.random(value); - put.add(famBytes, Bytes.toBytes("colQ"), value); - hTable.put(put); - hTable.flushCommits(); - - // major compaction shouldn't cause a timeout or RS abort - List<HRegion> regions = getUtility().getHBaseCluster().getRegions(hbaseTN); - HRegion hRegion = regions.get(0); - hRegion.flush(true); - HStore store = (HStore) hRegion.getStore(famBytes); - store.triggerMajorCompaction(); - store.compactRecentForTestingAssumingDefaultPolicy(1); - - // we should be able to compact syscat itself as well - regions = getUtility().getHBaseCluster().getRegions(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)); - hRegion = regions.get(0); - hRegion.flush(true); - store = (HStore) hRegion.getStore(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES); - store.triggerMajorCompaction(); - store.compactRecentForTestingAssumingDefaultPolicy(1); - } - } - - @Test public void testUpsertingDeletedRowShouldGiveProperDataWithIndexes() throws Exception { testUpsertingDeletedRowShouldGiveProperDataWithIndexes(false); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java index c49c61f..d2414c6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.TenantCache; import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.query.QueryServices; @@ -46,7 +47,9 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; import org.junit.Test; +import org.junit.experimental.categories.Category; +@Category(NeedsOwnMiniClusterTest.class) public class HashJoinCacheIT extends BaseJoinIT { @Override