Repository: phoenix Updated Branches: refs/heads/encodecolumns2 3b6709d05 -> e89337f83
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e89337f8/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java index 0521159..5fa4dfc 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java @@ -26,337 +26,110 @@ import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.DatabaseMetaData; -import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.List; import java.util.Properties; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; -import org.apache.phoenix.schema.types.PInteger; -import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.TestUtil; import org.apache.tephra.TxConstants; -import org.junit.Ignore; import org.junit.Test; -import com.google.common.collect.Lists; +public class TransactionIT extends ParallelStatsDisabledIT { -public class TransactionIT extends ParallelStatsDisabledIT { - @Test - public void testReadOwnWrites() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - String selectSql = "SELECT * FROM "+ fullTableName; - try (Connection conn = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn, fullTableName); - conn.setAutoCommit(false); - ResultSet rs = conn.createStatement().executeQuery(selectSql); - assertFalse(rs.next()); - - String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn.prepareStatement(upsert); - // upsert two rows - TestUtil.setRowKeyColumns(stmt, 1); - stmt.execute(); - TestUtil.setRowKeyColumns(stmt, 2); - stmt.execute(); - - // verify rows can be read even though commit has not been called - rs = conn.createStatement().executeQuery(selectSql); - TestUtil.validateRowKeyColumns(rs, 1); - TestUtil.validateRowKeyColumns(rs, 2); - assertFalse(rs.next()); - - conn.commit(); - - // verify rows can be read after commit - rs = conn.createStatement().executeQuery(selectSql); - TestUtil.validateRowKeyColumns(rs, 1); - TestUtil.validateRowKeyColumns(rs, 2); - assertFalse(rs.next()); - } + public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws SQLException { + String tableName = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + stmt.execute("DROP TABLE " + tableName); + stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) TRANSACTIONAL=true"); + stmt.execute("CREATE INDEX " + tableName + "_IDX ON " + tableName + " (v1) INCLUDE(v2)"); + assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName)).isTransactional()); + assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName + "_IDX")).isTransactional()); } @Test - public void testTxnClosedCorrecty() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - String selectSql = "SELECT * FROM "+fullTableName; - try (Connection conn = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn, fullTableName); + public void testRowTimestampDisabled() throws SQLException { + String tableName = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(false); - ResultSet rs = conn.createStatement().executeQuery(selectSql); - assertFalse(rs.next()); - - String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn.prepareStatement(upsert); - // upsert two rows - TestUtil.setRowKeyColumns(stmt, 1); - stmt.execute(); - TestUtil.setRowKeyColumns(stmt, 2); - stmt.execute(); - - // verify rows can be read even though commit has not been called - rs = conn.createStatement().executeQuery(selectSql); - TestUtil.validateRowKeyColumns(rs, 1); - TestUtil.validateRowKeyColumns(rs, 2); - // Long currentTx = rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp(); - assertFalse(rs.next()); - - conn.close(); - // start new connection - // conn.createStatement().executeQuery(selectSql); - // assertFalse("This transaction should not be on the invalid transactions", - // txManager.getCurrentState().getInvalid().contains(currentTx)); - } - } - - @Test - public void testAutoCommitQuerySingleTable() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - try (Connection conn = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn, fullTableName); - conn.setAutoCommit(true); - // verify no rows returned - ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName); - assertFalse(rs.next()); - } - } - - @Test - public void testAutoCommitQueryMultiTables() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - try (Connection conn = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn, fullTableName); - conn.setAutoCommit(true); - // verify no rows returned - ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName + " a JOIN " + fullTableName + " b ON (a.long_pk = b.int_pk)"); - assertFalse(rs.next()); - } - } - - @Test - public void testColConflicts() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - try (Connection conn1 = DriverManager.getConnection(getUrl()); - Connection conn2 = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn1, fullTableName); - conn1.setAutoCommit(false); - conn2.setAutoCommit(false); - String selectSql = "SELECT * FROM "+fullTableName; - conn1.setAutoCommit(false); - ResultSet rs = conn1.createStatement().executeQuery(selectSql); - assertFalse(rs.next()); - // upsert row using conn1 - String upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn1.prepareStatement(upsertSql); - TestUtil.setRowKeyColumns(stmt, 1); - stmt.setInt(7, 10); - stmt.execute(); - // upsert row using conn2 - stmt = conn2.prepareStatement(upsertSql); - TestUtil.setRowKeyColumns(stmt, 1); - stmt.setInt(7, 11); - stmt.execute(); - - conn1.commit(); - //second commit should fail + Statement stmt = conn.createStatement(); try { - conn2.commit(); + stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP)) TRANSACTIONAL=true"); fail(); - } - catch (SQLException e) { - assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode()); } - } - } - - private void testRowConflicts(String fullTableName) throws Exception { - try (Connection conn1 = DriverManager.getConnection(getUrl()); - Connection conn2 = DriverManager.getConnection(getUrl())) { - conn1.setAutoCommit(false); - conn2.setAutoCommit(false); - String selectSql = "SELECT * FROM "+fullTableName; - conn1.setAutoCommit(false); - ResultSet rs = conn1.createStatement().executeQuery(selectSql); - boolean immutableRows = conn1.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, fullTableName)).isImmutableRows(); - assertFalse(rs.next()); - // upsert row using conn1 - String upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn1.prepareStatement(upsertSql); - TestUtil.setRowKeyColumns(stmt, 1); - stmt.setInt(7, 10); - stmt.execute(); - // upsert row using conn2 - upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, b.int_col2) VALUES(?, ?, ?, ?, ?, ?, ?)"; - stmt = conn2.prepareStatement(upsertSql); - TestUtil.setRowKeyColumns(stmt, 1); - stmt.setInt(7, 11); - stmt.execute(); - - conn1.commit(); - //second commit should fail + catch(SQLException e) { + assertEquals(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode()); + } + stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP))"); try { - conn2.commit(); - if (!immutableRows) fail(); - } - catch (SQLException e) { - if (immutableRows) fail(); - assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode()); + stmt.execute("ALTER TABLE " + tableName + " SET TRANSACTIONAL=true"); + fail(); + } + catch(SQLException e) { + assertEquals(SQLExceptionCode.CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode()); } } } @Test - public void testRowConflictDetected() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - Connection conn = DriverManager.getConnection(getUrl()); - TestUtil.createTransactionalTable(conn, fullTableName); - testRowConflicts(fullTableName); - } - - @Test - public void testNoConflictDetectionForImmutableRows() throws Exception { - String transTableName = generateUniqueName(); - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - Connection conn = DriverManager.getConnection(getUrl()); - TestUtil.createTransactionalTable(conn, fullTableName); - conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET IMMUTABLE_ROWS=true"); - testRowConflicts(fullTableName); - } - - @Test - public void testNonTxToTxTable() throws Exception { - String nonTxTableName = generateUniqueName(); + public void testTransactionalTableMetadata() throws SQLException { - Connection conn = DriverManager.getConnection(getUrl()); - conn.createStatement().execute("CREATE TABLE " + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)"); - conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (1)"); - conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (2, 'a')"); - conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (3, 'b')"); - conn.commit(); - - String index = generateUniqueName(); - conn.createStatement().execute("CREATE INDEX " + index + " ON " + nonTxTableName + "(v)"); - // Reset empty column value to an empty value like it is pre-transactions - HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes( nonTxTableName)); - List<Put>puts = Lists.newArrayList(new Put(PInteger.INSTANCE.toBytes(1)), new Put(PInteger.INSTANCE.toBytes(2)), new Put(PInteger.INSTANCE.toBytes(3))); - for (Put put : puts) { - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY); - } - htable.put(puts); - - conn.createStatement().execute("ALTER TABLE " + nonTxTableName + " SET TRANSACTIONAL=true"); - - htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes( nonTxTableName)); - assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); - htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(index)); - assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); + try (Connection conn = DriverManager.getConnection(getUrl())) { + String transactTableName = generateUniqueName(); + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + transactTableName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + + "TRANSACTIONAL=true"); + conn.commit(); - conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (4, 'c')"); - ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ k FROM " + nonTxTableName + " WHERE v IS NULL"); - assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, nonTxTableName)).isTransactional()); - assertTrue(rs.next()); - assertEquals(1,rs.getInt(1)); - assertFalse(rs.next()); - conn.commit(); - - conn.createStatement().execute("UPSERT INTO " + nonTxTableName + " VALUES (5, 'd')"); - rs = conn.createStatement().executeQuery("SELECT k FROM " + nonTxTableName); - assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, index)).isTransactional()); - assertTrue(rs.next()); - assertEquals(1,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(2,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(3,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(4,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(5,rs.getInt(1)); - assertFalse(rs.next()); - conn.rollback(); - - rs = conn.createStatement().executeQuery("SELECT k FROM " + nonTxTableName); - assertTrue(rs.next()); - assertEquals(1,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(2,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(3,rs.getInt(1)); - assertTrue(rs.next()); - assertEquals(4,rs.getInt(1)); - assertFalse(rs.next()); + DatabaseMetaData dbmd = conn.getMetaData(); + ResultSet rs = dbmd.getTables(null, null, StringUtil.escapeLike(transactTableName), null); + assertTrue(rs.next()); + assertEquals("Transactional table was not marked as transactional in JDBC API.", + "true", rs.getString(PhoenixDatabaseMetaData.TRANSACTIONAL)); + + String nonTransactTableName = generateUniqueName(); + Statement stmt2 = conn.createStatement(); + stmt2.execute("CREATE TABLE " + nonTransactTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "); + conn.commit(); + + ResultSet rs2 = dbmd.getTables(null, null, StringUtil.escapeLike(nonTransactTableName), null); + assertTrue(rs2.next()); + assertEquals("Non-transactional table was marked as transactional in JDBC API.", + "false", rs2.getString(PhoenixDatabaseMetaData.TRANSACTIONAL)); + } } - @Ignore @Test - public void testNonTxToTxTableFailure() throws Exception { - String nonTxTableName = generateUniqueName(); - - Connection conn = DriverManager.getConnection(getUrl()); - // Put table in SYSTEM schema to prevent attempts to update the cache after we disable SYSTEM.CATALOG - conn.createStatement().execute("CREATE TABLE SYSTEM." + nonTxTableName + "(k INTEGER PRIMARY KEY, v VARCHAR)"); - conn.createStatement().execute("UPSERT INTO SYSTEM." + nonTxTableName + " VALUES (1)"); - conn.commit(); - // Reset empty column value to an empty value like it is pre-transactions - HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName)); - Put put = new Put(PInteger.INSTANCE.toBytes(1)); - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY); - htable.put(put); - - HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); - admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); - try { - // This will succeed initially in updating the HBase metadata, but then will fail when - // the SYSTEM.CATALOG table is attempted to be updated, exercising the code to restore - // the coprocessors back to the non transactional ones. - conn.createStatement().execute("ALTER TABLE SYSTEM." + nonTxTableName + " SET TRANSACTIONAL=true"); + public void testOnDupKeyForTransactionalTable() throws Exception { + // TODO: we should support having a transactional table defined for a connectionless connection + try (Connection conn = DriverManager.getConnection(getUrl())) { + String transactTableName = generateUniqueName(); + conn.createStatement().execute("CREATE TABLE " + transactTableName + " (k integer not null primary key, v bigint) TRANSACTIONAL=true"); + conn.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1"); fail(); } catch (SQLException e) { - assertTrue(e.getMessage().contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " is disabled")); - } finally { - admin.enableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); - admin.close(); + assertEquals(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL.getErrorCode(), e.getErrorCode()); } - - ResultSet rs = conn.createStatement().executeQuery("SELECT k FROM SYSTEM." + nonTxTableName + " WHERE v IS NULL"); - assertTrue(rs.next()); - assertEquals(1,rs.getInt(1)); - assertFalse(rs.next()); - - htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM." + nonTxTableName)); - assertFalse(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); - assertEquals(1,conn.unwrap(PhoenixConnection.class).getQueryServices(). - getTableDescriptor(Bytes.toBytes("SYSTEM." + nonTxTableName)). - getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions()); } @Test @@ -439,120 +212,38 @@ public class TransactionIT extends ParallelStatsDisabledIT { } @Test - public void testCreateTableToBeTransactional() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - String t1 = generateUniqueName(); - String t2 = generateUniqueName(); - String ddl = "CREATE TABLE " + t1 + " (k varchar primary key) transactional=true"; - conn.createStatement().execute(ddl); - PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); - PTable table = pconn.getTable(new PTableKey(null, t1)); - HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1)); - assertTrue(table.isTransactional()); - assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); - - try { - ddl = "ALTER TABLE " + t1 + " SET transactional=false"; - conn.createStatement().execute(ddl); - fail(); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode()); - } - - HBaseAdmin admin = pconn.getQueryServices().getAdmin(); - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(t2)); - desc.addFamily(new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES)); - admin.createTable(desc); - ddl = "CREATE TABLE " + t2 + " (k varchar primary key) transactional=true"; - conn.createStatement().execute(ddl); - assertEquals(Boolean.TRUE.toString(), admin.getTableDescriptor(TableName.valueOf(t2)).getValue(TxConstants.READ_NON_TX_DATA)); - - // Should be ok, as HBase metadata should match existing metadata. - ddl = "CREATE TABLE IF NOT EXISTS " + t1 + " (k varchar primary key)"; - try { - conn.createStatement().execute(ddl); - fail(); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode()); - } - ddl += " transactional=true"; - conn.createStatement().execute(ddl); - table = pconn.getTable(new PTableKey(null, t1)); - htable = pconn.getQueryServices().getTable(Bytes.toBytes(t1)); - assertTrue(table.isTransactional()); - assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName())); - } - - @Test - public void testCurrentDate() throws Exception { + public void testColConflicts() throws Exception { String transTableName = generateUniqueName(); String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName; - String selectSql = "SELECT current_date() FROM "+fullTableName; - try (Connection conn = DriverManager.getConnection(getUrl())) { - TestUtil.createTransactionalTable(conn, fullTableName); - conn.setAutoCommit(false); - ResultSet rs = conn.createStatement().executeQuery(selectSql); + try (Connection conn1 = DriverManager.getConnection(getUrl()); + Connection conn2 = DriverManager.getConnection(getUrl())) { + TestUtil.createTransactionalTable(conn1, fullTableName); + conn1.setAutoCommit(false); + conn2.setAutoCommit(false); + String selectSql = "SELECT * FROM "+fullTableName; + conn1.setAutoCommit(false); + ResultSet rs = conn1.createStatement().executeQuery(selectSql); assertFalse(rs.next()); - - String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn.prepareStatement(upsert); - // upsert two rows + // upsert row using conn1 + String upsertSql = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn1.prepareStatement(upsertSql); TestUtil.setRowKeyColumns(stmt, 1); + stmt.setInt(7, 10); + stmt.execute(); + // upsert row using conn2 + stmt = conn2.prepareStatement(upsertSql); + TestUtil.setRowKeyColumns(stmt, 1); + stmt.setInt(7, 11); stmt.execute(); - conn.commit(); - - rs = conn.createStatement().executeQuery(selectSql); - assertTrue(rs.next()); - Date date1 = rs.getDate(1); - assertFalse(rs.next()); - - Thread.sleep(1000); - rs = conn.createStatement().executeQuery(selectSql); - assertTrue(rs.next()); - Date date2 = rs.getDate(1); - assertFalse(rs.next()); - assertTrue("current_date() should change while executing multiple statements", date2.getTime() > date1.getTime()); - } - } - - @Test - public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws SQLException { - String tableName = generateUniqueName(); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - conn.setAutoCommit(false); - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); - stmt.execute("DROP TABLE " + tableName); - stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) TRANSACTIONAL=true"); - stmt.execute("CREATE INDEX " + tableName + "_IDX ON " + tableName + " (v1) INCLUDE(v2)"); - assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName)).isTransactional()); - assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, tableName + "_IDX")).isTransactional()); - } - - @Test - public void testRowTimestampDisabled() throws SQLException { - String tableName = generateUniqueName(); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - conn.setAutoCommit(false); - Statement stmt = conn.createStatement(); - try { - stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP)) TRANSACTIONAL=true"); - fail(); - } - catch(SQLException e) { - assertEquals(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode()); - } - stmt.execute("CREATE TABLE " + tableName + "(k VARCHAR, v VARCHAR, d DATE NOT NULL, CONSTRAINT PK PRIMARY KEY(k,d ROW_TIMESTAMP))"); + conn1.commit(); + //second commit should fail try { - stmt.execute("ALTER TABLE " + tableName + " SET TRANSACTIONAL=true"); + conn2.commit(); fail(); - } - catch(SQLException e) { - assertEquals(SQLExceptionCode.CANNOT_ALTER_TO_BE_TXN_WITH_ROW_TIMESTAMP.getErrorCode(), e.getErrorCode()); + } + catch (SQLException e) { + assertEquals(e.getErrorCode(), SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode()); } } } @@ -600,118 +291,4 @@ public class TransactionIT extends ParallelStatsDisabledIT { conn.close(); } } - - @Test - public void testParallelUpsertSelect() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3)); - props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3)); - props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3)); - Connection conn = DriverManager.getConnection(getUrl(), props); - conn.setAutoCommit(false); - String fullTableName1 = generateUniqueName(); - String fullTableName2 = generateUniqueName(); - String sequenceName = "S_" + generateUniqueName(); - conn.createStatement().execute("CREATE SEQUENCE " + sequenceName); - conn.createStatement().execute("CREATE TABLE " + fullTableName1 + " (pk INTEGER PRIMARY KEY, val INTEGER) SALT_BUCKETS=4,TRANSACTIONAL=true"); - conn.createStatement().execute("CREATE TABLE " + fullTableName2 + " (pk INTEGER PRIMARY KEY, val INTEGER) TRANSACTIONAL=true"); - - for (int i = 0; i < 100; i++) { - conn.createStatement().execute("UPSERT INTO " + fullTableName1 + " VALUES (NEXT VALUE FOR " + sequenceName + ", " + (i%10) + ")"); - } - conn.commit(); - conn.setAutoCommit(true); - int upsertCount = conn.createStatement().executeUpdate("UPSERT INTO " + fullTableName2 + " SELECT pk, val FROM " + fullTableName1); - assertEquals(100,upsertCount); - conn.close(); - } - - @Test - public void testTransactionalTableMetadata() throws SQLException { - - try (Connection conn = DriverManager.getConnection(getUrl())) { - String transactTableName = generateUniqueName(); - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + transactTableName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + - "TRANSACTIONAL=true"); - conn.commit(); - - DatabaseMetaData dbmd = conn.getMetaData(); - ResultSet rs = dbmd.getTables(null, null, StringUtil.escapeLike(transactTableName), null); - assertTrue(rs.next()); - assertEquals("Transactional table was not marked as transactional in JDBC API.", - "true", rs.getString(PhoenixDatabaseMetaData.TRANSACTIONAL)); - - String nonTransactTableName = generateUniqueName(); - Statement stmt2 = conn.createStatement(); - stmt2.execute("CREATE TABLE " + nonTransactTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "); - conn.commit(); - - ResultSet rs2 = dbmd.getTables(null, null, StringUtil.escapeLike(nonTransactTableName), null); - assertTrue(rs2.next()); - assertEquals("Non-transactional table was marked as transactional in JDBC API.", - "false", rs2.getString(PhoenixDatabaseMetaData.TRANSACTIONAL)); - } - } - - @Test - public void testInflightPartialEval() throws SQLException { - - try (Connection conn = DriverManager.getConnection(getUrl())) { - String transactTableName = generateUniqueName(); - Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + transactTableName + " (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + - "TRANSACTIONAL=true"); - - try (Connection conn1 = DriverManager.getConnection(getUrl()); Connection conn2 = DriverManager.getConnection(getUrl())) { - conn1.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES ('a','b','x')"); - // Select to force uncommitted data to be written - ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + transactTableName); - assertTrue(rs.next()); - assertEquals("a", rs.getString(1)); - assertEquals("b", rs.getString(2)); - assertFalse(rs.next()); - - conn2.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES ('a','c','x')"); - // Select to force uncommitted data to be written - rs = conn2.createStatement().executeQuery("SELECT * FROM " + transactTableName ); - assertTrue(rs.next()); - assertEquals("a", rs.getString(1)); - assertEquals("c", rs.getString(2)); - assertFalse(rs.next()); - - // If the AndExpression were to see the uncommitted row from conn2, the filter would - // filter the row out early and no longer continue to evaluate other cells due to - // the way partial evaluation holds state. - rs = conn1.createStatement().executeQuery("SELECT * FROM " + transactTableName + " WHERE v1 != 'c' AND v2 = 'x'"); - assertTrue(rs.next()); - assertEquals("a", rs.getString(1)); - assertEquals("b", rs.getString(2)); - assertFalse(rs.next()); - - // Same as above for conn1 data - rs = conn2.createStatement().executeQuery("SELECT * FROM " + transactTableName + " WHERE v1 != 'b' AND v2 = 'x'"); - assertTrue(rs.next()); - assertEquals("a", rs.getString(1)); - assertEquals("c", rs.getString(2)); - assertFalse(rs.next()); - } - - } - } - - - @Test - public void testOnDupKeyForTransactionalTable() throws Exception { - // TODO: we should support having a transactional table defined for a connectionless connection - try (Connection conn = DriverManager.getConnection(getUrl())) { - String transactTableName = generateUniqueName(); - conn.createStatement().execute("CREATE TABLE " + transactTableName + " (k integer not null primary key, v bigint) TRANSACTIONAL=true"); - conn.createStatement().execute("UPSERT INTO " + transactTableName + " VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1"); - fail(); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.CANNOT_USE_ON_DUP_KEY_FOR_TRANSACTIONAL.getErrorCode(), e.getErrorCode()); - } - } - } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e89337f8/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java index 14bcd70..da4419d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java @@ -36,6 +36,7 @@ import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.tephra.Transaction.VisibilityLevel; @@ -48,12 +49,25 @@ import org.junit.runners.Parameterized.Parameters; public class TxCheckpointIT extends ParallelStatsDisabledIT { private final boolean localIndex; - private final boolean mutable; + private final String tableDDLOptions; - public TxCheckpointIT(boolean localIndex, boolean mutable) { + public TxCheckpointIT(boolean localIndex, boolean mutable, boolean columnEncoded) { + StringBuilder optionBuilder = new StringBuilder(); this.localIndex = localIndex; - this.mutable = mutable; - + if (!columnEncoded) { + if (optionBuilder.length()!=0) + optionBuilder.append(","); + optionBuilder.append("COLUMN_ENCODED_BYTES=0"); + } + if (!mutable) { + if (optionBuilder.length()!=0) + optionBuilder.append(","); + optionBuilder.append("IMMUTABLE_ROWS=true"); + if (!columnEncoded) { + optionBuilder.append(",IMMUTABLE_STORAGE_SCHEME="+PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN); + } + } + this.tableDDLOptions = optionBuilder.toString(); } private static Connection getConnection() throws SQLException { @@ -66,10 +80,11 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { return conn; } - @Parameters(name="TxCheckpointIT_localIndex={0},mutable={1}") // name is used by failsafe as file name in reports + @Parameters(name="TxCheckpointIT_localIndex={0},mutable={1},columnEncoded={2}") // name is used by failsafe as file name in reports public static Collection<Boolean[]> data() { return Arrays.asList(new Boolean[][] { - { false, false }, { false, true }, { true, false }, { true, true } + { false, false, false }, { false, false, true }, { false, true, false }, { false, true, true }, + { true, false, false }, { true, false, true }, { true, true, false }, { true, true, true } }); } @@ -86,7 +101,7 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { Connection conn = getConnection(props); conn.setAutoCommit(true); conn.createStatement().execute("CREATE SEQUENCE "+seqName); - conn.createStatement().execute("CREATE TABLE " + fullTableName + "(pk INTEGER PRIMARY KEY, val INTEGER)"+(!mutable? " IMMUTABLE_ROWS=true" : "")); + conn.createStatement().execute("CREATE TABLE " + fullTableName + "(pk INTEGER PRIMARY KEY, val INTEGER)" + tableDDLOptions); conn.createStatement().execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + "(val)"); conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES (NEXT VALUE FOR " + seqName + ",1)"); @@ -117,12 +132,11 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { } private void testRollbackOfUncommittedDelete(String indexDDL, String fullTableName) throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = getConnection(); conn.setAutoCommit(false); try { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions); stmt.execute(indexDDL); stmt.executeUpdate("upsert into " + fullTableName + " values('x1', 'y1', 'a1')"); @@ -206,13 +220,11 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { String tableName = "TBL_" + generateUniqueName(); String indexName = "IDX_" + generateUniqueName(); String fullTableName = SchemaUtil.getTableName(tableName, tableName); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = getConnection()) { conn.setAutoCommit(false); Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName + "(ID BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" - + (!mutable ? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE TABLE " + fullTableName + "(ID BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions); stmt.execute("CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE(v2)"); @@ -301,10 +313,8 @@ public class TxCheckpointIT extends ParallelStatsDisabledIT { try (Connection conn = getConnection()) { conn.setAutoCommit(false); Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE " + fullTableName + "1(ID1 BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)" - + (!mutable ? " IMMUTABLE_ROWS=true" : "")); - stmt.execute("CREATE TABLE " + fullTableName + "2(ID2 BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)" - + (!mutable ? " IMMUTABLE_ROWS=true" : "")); + stmt.execute("CREATE TABLE " + fullTableName + "1(ID1 BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)" + tableDDLOptions); + stmt.execute("CREATE TABLE " + fullTableName + "2(ID2 BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)" + tableDDLOptions); stmt.execute("CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + "1 (FK1B)"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e89337f8/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java index 49fa53d..32e9f68 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java @@ -16,7 +16,6 @@ * limitations under the License. */ package org.apache.phoenix.compile; - import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY; import java.sql.SQLException; @@ -172,10 +171,8 @@ public class TupleProjectionCompiler { String aliasedName = tableRef.getTableAlias() == null ? SchemaUtil.getColumnName(table.getName().getString(), colName) : SchemaUtil.getColumnName(tableRef.getTableAlias(), colName); - - PColumn column = new ProjectedColumn(PNameFactory.newName(aliasedName), - retainPKColumns && SchemaUtil.isPKColumn(sourceColumn) ? - null : PNameFactory.newName(VALUE_COLUMN_FAMILY), + PName familyName = SchemaUtil.isPKColumn(sourceColumn) ? (retainPKColumns ? null : PNameFactory.newName(VALUE_COLUMN_FAMILY)) : sourceColumn.getFamilyName(); + PColumn column = new ProjectedColumn(PNameFactory.newName(aliasedName), familyName, position++, sourceColumn.isNullable(), sourceColumnRef, sourceColumn.getColumnQualifierBytes()); projectedColumns.add(column); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e89337f8/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index a87b4f2..02ae028 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -720,26 +720,29 @@ public abstract class BaseTest { } protected static void ensureTableCreated(String url, String tableName) throws SQLException { - ensureTableCreated(url, tableName, tableName, null, null); + ensureTableCreated(url, tableName, tableName, null, null, null); } protected static void ensureTableCreated(String url, String tableName, String tableDDLType) throws SQLException { - ensureTableCreated(url, tableName, tableDDLType, null, null); + ensureTableCreated(url, tableName, tableDDLType, null, null, null); } - public static void ensureTableCreated(String url, String tableName, String tableDDLType, byte[][] splits) throws SQLException { - ensureTableCreated(url, tableName, tableDDLType, splits, null); + public static void ensureTableCreated(String url, String tableName, String tableDDLType, byte[][] splits, String tableDDLOptions) throws SQLException { + ensureTableCreated(url, tableName, tableDDLType, splits, null, tableDDLOptions); } protected static void ensureTableCreated(String url, String tableName, String tableDDLType, Long ts) throws SQLException { - ensureTableCreated(url, tableName, tableDDLType, null, ts); + ensureTableCreated(url, tableName, tableDDLType, null, ts, null); } - protected static void ensureTableCreated(String url, String tableName, String tableDDLType, byte[][] splits, Long ts) throws SQLException { + protected static void ensureTableCreated(String url, String tableName, String tableDDLType, byte[][] splits, Long ts, String tableDDLOptions) throws SQLException { String ddl = tableDDLMap.get(tableDDLType); if(!tableDDLType.equals(tableName)) { ddl = ddl.replace(tableDDLType, tableName); } + if (tableDDLOptions!=null) { + ddl += tableDDLOptions; + } createSchema(url,tableName, ts); createTestTable(url, ddl, splits, ts); } @@ -990,7 +993,7 @@ public abstract class BaseTest { } protected static void initSumDoubleValues(String tableName, byte[][] splits, String url) throws Exception { - ensureTableCreated(url, tableName, SUM_DOUBLE_NAME, splits); + ensureTableCreated(url, tableName, SUM_DOUBLE_NAME, splits, null); Properties props = new Properties(); Connection conn = DriverManager.getConnection(url, props); try { @@ -1058,18 +1061,18 @@ public abstract class BaseTest { } protected static String initATableValues(String tenantId, byte[][] splits, Date date, Long ts, String url) throws Exception { - return initATableValues(null, tenantId, splits, date, ts, url); + return initATableValues(null, tenantId, splits, date, ts, url, null); } - protected static String initATableValues(String tableName, String tenantId, byte[][] splits, Date date, Long ts, String url) throws Exception { + protected static String initATableValues(String tableName, String tenantId, byte[][] splits, Date date, Long ts, String url, String tableDDLOptions) throws Exception { if(tableName == null) { tableName = generateUniqueName(); } String tableDDLType = ATABLE_NAME; if (ts == null) { - ensureTableCreated(url, tableName, tableDDLType, splits); + ensureTableCreated(url, tableName, tableDDLType, splits, null, tableDDLOptions); } else { - ensureTableCreated(url, tableName, tableDDLType, splits, ts-5); + ensureTableCreated(url, tableName, tableDDLType, splits, ts-5, tableDDLOptions); } Properties props = new Properties(); @@ -1291,9 +1294,9 @@ public abstract class BaseTest { private static void initEntityHistoryTableValues(String tenantId, byte[][] splits, Date date, Long ts, String url) throws Exception { if (ts == null) { - ensureTableCreated(url, ENTITY_HISTORY_TABLE_NAME, ENTITY_HISTORY_TABLE_NAME, splits); + ensureTableCreated(url, ENTITY_HISTORY_TABLE_NAME, ENTITY_HISTORY_TABLE_NAME, splits, null); } else { - ensureTableCreated(url, ENTITY_HISTORY_TABLE_NAME, ENTITY_HISTORY_TABLE_NAME, splits, ts-2); + ensureTableCreated(url, ENTITY_HISTORY_TABLE_NAME, ENTITY_HISTORY_TABLE_NAME, splits, ts-2, null); } Properties props = new Properties(); @@ -1395,9 +1398,9 @@ public abstract class BaseTest { protected static void initSaltedEntityHistoryTableValues(String tenantId, byte[][] splits, Date date, Long ts, String url) throws Exception { if (ts == null) { - ensureTableCreated(url, ENTITY_HISTORY_SALTED_TABLE_NAME, ENTITY_HISTORY_SALTED_TABLE_NAME, splits); + ensureTableCreated(url, ENTITY_HISTORY_SALTED_TABLE_NAME, ENTITY_HISTORY_SALTED_TABLE_NAME, splits, null); } else { - ensureTableCreated(url, ENTITY_HISTORY_SALTED_TABLE_NAME, ENTITY_HISTORY_SALTED_TABLE_NAME, splits, ts-2); + ensureTableCreated(url, ENTITY_HISTORY_SALTED_TABLE_NAME, ENTITY_HISTORY_SALTED_TABLE_NAME, splits, ts-2, null); } Properties props = new Properties();