This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch 4.16 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.16 by this push: new 2605cc2 PHOENIX-6506 : Tenant Connection is not able to access/validate Global Sequences (#1270) (#1261) 2605cc2 is described below commit 2605cc233e206678a72e02322a76b4b0bbf180f7 Author: Lokesh Khurana <khuranalokes...@gmail.com> AuthorDate: Thu Jul 29 21:20:33 2021 +0530 PHOENIX-6506 : Tenant Connection is not able to access/validate Global Sequences (#1270) (#1261) Signed-off-by: Viraj Jasani <vjas...@apache.org> --- .../org/apache/phoenix/end2end/UpsertValuesIT.java | 148 ++++++++++++++++++++- .../phoenix/query/ConnectionQueryServicesImpl.java | 40 +++++- 2 files changed, 186 insertions(+), 2 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java index e89cc58..32662ec 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java @@ -44,11 +44,15 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.schema.SequenceNotFoundException; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.junit.Assert; import org.junit.Test; @@ -670,5 +674,147 @@ public class UpsertValuesIT extends ParallelStatsDisabledIT { assertTrue(next.containsColumn(Bytes.toBytes("CF2"), PInteger.INSTANCE.toBytes(3))); } } - + + + @Test + public void testUpsertValueWithDiffSequenceAndConnections() throws Exception { + String tableName = generateUniqueName(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.setAutoCommit(true); + PreparedStatement createTableStatement = conn.prepareStatement(String.format("CREATE TABLE IF NOT EXISTS " + + "%s (SERVICE VARCHAR NOT NULL, SEQUENCE_NUMBER BIGINT NOT NULL , " + + "CONSTRAINT PK PRIMARY KEY (SERVICE, SEQUENCE_NUMBER)) MULTI_TENANT = TRUE", tableName)); + createTableStatement.execute(); + } + + testGlobalSequenceUpsertWithTenantConnection(tableName); + testGlobalSequenceUpsertWithGlobalConnection(tableName); + testTenantSequenceUpsertWithSameTenantConnection(tableName); + testTenantSequenceUpsertWithDifferentTenantConnection(tableName); + testTenantSequenceUpsertWithGlobalConnection(tableName); + + } + + private void testTenantSequenceUpsertWithGlobalConnection(String tableName) throws Exception { + String sequenceName = generateUniqueSequenceName(); + + try (Connection conn = getTenantConnection("PHOENIX")) { + conn.setAutoCommit(true); + PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " + + "IF NOT EXISTS %s", sequenceName)); + createSequenceStatement.execute(); + } + + try (Connection tenantConn = DriverManager.getConnection(getUrl())) { + tenantConn.setAutoCommit(true); + Statement executeUpdateStatement = tenantConn.createStatement(); + try { + executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SERVICE, SEQUENCE_NUMBER) VALUES " + + "( 'PHOENIX', NEXT VALUE FOR %s)", tableName, sequenceName)); + Assert.fail(); + } catch (SequenceNotFoundException e) { + assertTrue(true); + } catch (Exception e) { + Assert.fail(); + } + } + } + + private void testTenantSequenceUpsertWithDifferentTenantConnection(String tableName) throws Exception { + String sequenceName = generateUniqueSequenceName(); + + try (Connection conn = getTenantConnection("PHOENIX")) { + conn.setAutoCommit(true); + PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " + + "IF NOT EXISTS %s", sequenceName)); + createSequenceStatement.execute(); + } + + try (Connection tenantConn = getTenantConnection("HBASE")) { + tenantConn.setAutoCommit(true); + + Statement executeUpdateStatement = tenantConn.createStatement(); + try { + executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SEQUENCE_NUMBER) VALUES " + + "( NEXT VALUE FOR %s)", tableName, sequenceName)); + Assert.fail(); + } catch (SequenceNotFoundException e) { + assertTrue(true); + } catch (Exception e) { + Assert.fail(); + } + } + } + + private void testTenantSequenceUpsertWithSameTenantConnection(String tableName) throws Exception { + String sequenceName = generateUniqueSequenceName(); + + try (Connection conn = getTenantConnection("ZOOKEEPER")) { + conn.setAutoCommit(true); + PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " + + "IF NOT EXISTS %s", sequenceName)); + createSequenceStatement.execute(); + Statement executeUpdateStatement = conn.createStatement(); + executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SEQUENCE_NUMBER) VALUES " + + "( NEXT VALUE FOR %s)", tableName, sequenceName)); + ResultSet rs = executeUpdateStatement.executeQuery("select * from " + tableName); + assertTrue(rs.next()); + assertEquals("1", rs.getString(1)); + assertFalse(rs.next()); + } + + } + + private void testGlobalSequenceUpsertWithGlobalConnection(String tableName) throws Exception { + String sequenceName = generateUniqueSequenceName(); + + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.setAutoCommit(true); + PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " + + "IF NOT EXISTS %s", sequenceName)); + createSequenceStatement.execute(); + Statement executeUpdateStatement = conn.createStatement(); + executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SERVICE, SEQUENCE_NUMBER) VALUES " + + "( 'PHOENIX', NEXT VALUE FOR %s)", tableName, sequenceName)); + ResultSet rs = executeUpdateStatement.executeQuery("select * from " + tableName); + assertTrue(rs.next()); + assertEquals("HBASE", rs.getString(1)); + assertEquals("1", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("PHOENIX", rs.getString(1)); + assertEquals("1", rs.getString(2)); + assertFalse(rs.next()); + } + } + + private void testGlobalSequenceUpsertWithTenantConnection(String tableName) throws Exception { + String sequenceName = generateUniqueSequenceName(); + + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.setAutoCommit(true); + PreparedStatement createSequenceStatement = conn.prepareStatement(String.format("CREATE SEQUENCE " + + "IF NOT EXISTS %s", sequenceName)); + createSequenceStatement.execute(); + } + + try (Connection tenantConn = getTenantConnection("HBASE")) { + tenantConn.setAutoCommit(true); + + Statement executeUpdateStatement = tenantConn.createStatement(); + executeUpdateStatement.execute(String.format("UPSERT INTO %s ( SEQUENCE_NUMBER) VALUES " + + "( NEXT VALUE FOR %s)", tableName, sequenceName)); + + ResultSet rs = executeUpdateStatement.executeQuery("select * from " + tableName); + assertTrue(rs.next()); + assertEquals("1", rs.getString(1)); + assertFalse(rs.next()); + + } + } + + private static Connection getTenantConnection(String tenantId) throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + return DriverManager.getConnection(getUrl(), props); + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 7fdffdd..64ae6a6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -4949,7 +4949,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement for (SequenceAllocation sequenceAllocation : sequenceAllocations) { SequenceKey key = sequenceAllocation.getSequenceKey(); Sequence newSequences = new Sequence(key); - Sequence sequence = sequenceMap.putIfAbsent(key, newSequences); + Sequence sequence = getSequence(sequenceAllocation); if (sequence == null) { sequence = newSequences; } @@ -5022,6 +5022,44 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } + /** + * checks if sequenceAllocation's sequence there in sequenceMap, also returns Global Sequences + * from Tenant sequenceAllocations + * @param sequenceAllocation + * @return + */ + + private Sequence getSequence(SequenceAllocation sequenceAllocation) { + SequenceKey key = sequenceAllocation.getSequenceKey(); + if (key.getTenantId() == null) { + return sequenceMap.putIfAbsent(key, new Sequence(key)); + } else { + Sequence sequence = sequenceMap.get(key); + if (sequence == null) { + for (Map.Entry<SequenceKey,Sequence> entry : sequenceMap.entrySet()) { + if (compareSequenceKeysWithoutTenant(key, entry.getKey())) { + return entry.getValue(); + } + } + } else { + return sequence; + } + } + return null; + } + + private boolean compareSequenceKeysWithoutTenant(SequenceKey keyToCompare, SequenceKey availableKey) { + if (availableKey.getTenantId() != null) { + return false; + } + boolean sameSchema = keyToCompare.getSchemaName() == null ? availableKey.getSchemaName() == null : + keyToCompare.getSchemaName().equals(availableKey.getSchemaName()); + if (!sameSchema) { + return false; + } + return keyToCompare.getSequenceName().equals(availableKey.getSequenceName()); + } + @Override public void clearTableFromCache(final byte[] tenantId, final byte[] schemaName, final byte[] tableName, final long clientTS) throws SQLException {