PHOENIX-4335 System catalog snapshot created each time a new connection is created
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ef39feeb Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ef39feeb Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ef39feeb Branch: refs/heads/4.x-HBase-1.1 Commit: ef39feebe0cf3b59537c0d0261657c090abe039c Parents: e811218 Author: James Taylor <jtay...@salesforce.com> Authored: Tue Oct 31 15:55:03 2017 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Wed Nov 15 10:46:40 2017 -0800 ---------------------------------------------------------------------- .../phoenix/end2end/SystemCatalogUpgradeIT.java | 121 +++++++++++++++++++ .../phoenix/coprocessor/MetaDataProtocol.java | 12 +- .../query/ConnectionQueryServicesImpl.java | 39 ++++-- .../java/org/apache/phoenix/query/BaseTest.java | 35 ++++-- 4 files changed, 190 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef39feeb/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogUpgradeIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogUpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogUpgradeIT.java new file mode 100644 index 0000000..e5b1d6e --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogUpgradeIT.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Map; +import java.util.Properties; + +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.phoenix.jdbc.PhoenixTestDriver; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.ConnectionQueryServicesImpl; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesTestImpl; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class SystemCatalogUpgradeIT extends BaseTest { + private static boolean reinitialize; + private static int countUpgradeAttempts; + private static long systemTableVersion = MetaDataProtocol.getPriorVersion(); + + private static class PhoenixUpgradeCountingServices extends ConnectionQueryServicesImpl { + public PhoenixUpgradeCountingServices(QueryServices services, ConnectionInfo connectionInfo, Properties info) { + super(services, connectionInfo, info); + } + + @Override + protected void setUpgradeRequired() { + super.setUpgradeRequired(); + countUpgradeAttempts++; + } + + @Override + protected long getSystemTableVersion() { + return systemTableVersion; + } + + @Override + protected boolean isInitialized() { + return !reinitialize && super.isInitialized(); + } + } + + public static class PhoenixUpgradeCountingDriver extends PhoenixTestDriver { + private ConnectionQueryServices cqs; + private final ReadOnlyProps overrideProps; + + public PhoenixUpgradeCountingDriver(ReadOnlyProps props) { + overrideProps = props; + } + + @Override + public boolean acceptsURL(String url) throws SQLException { + return true; + } + + @Override // public for testing + public synchronized ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException { + if (cqs == null) { + cqs = new PhoenixUpgradeCountingServices(new QueryServicesTestImpl(getDefaultProps(), overrideProps), ConnectionInfo.create(url), info); + cqs.init(url, info); + } else if (reinitialize) { + cqs.init(url, info); + reinitialize = false; + } + return cqs; + } + } + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> props = Maps.newConcurrentMap(); + props.put(BaseTest.DRIVER_CLASS_NAME_ATTRIB, PhoenixUpgradeCountingDriver.class.getName()); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testUpgradeOnlyHappensOnce() throws Exception { + ConnectionQueryServices services = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class).getQueryServices(); + assertTrue(services instanceof PhoenixUpgradeCountingServices); + // Check if the timestamp version is changing between the current version and prior version + boolean wasTimestampChanged = systemTableVersion != MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP; + reinitialize = true; + systemTableVersion = MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP; + DriverManager.getConnection(getUrl()); + // Confirm that if the timestamp changed, that an upgrade was performed (and that if it + // didn't, that an upgrade wasn't attempted). + assertEquals(wasTimestampChanged ? 1 : 0, countUpgradeAttempts); + // Confirm that another connection does not increase the number of times upgrade was attempted + DriverManager.getConnection(getUrl()); + assertEquals(wasTimestampChanged ? 1 : 0, countUpgradeAttempts); + } + + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef39feeb/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index 655068d..c4ecc3f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -19,6 +19,7 @@ package org.apache.phoenix.coprocessor; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.NavigableMap; import java.util.TreeMap; @@ -89,7 +90,8 @@ public abstract class MetaDataProtocol extends MetaDataService { public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0 = MIN_TABLE_TIMESTAMP + 20; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 = MIN_TABLE_TIMESTAMP + 25; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0 = MIN_TABLE_TIMESTAMP + 27; - public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0 = MIN_TABLE_TIMESTAMP + 28; + // Since there's no upgrade code, keep the version the same as the previous version + public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0; // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0; @@ -431,6 +433,14 @@ public abstract class MetaDataProtocol extends MetaDataService { } } + public static long getPriorVersion() { + Iterator<Long> iterator = TIMESTAMP_VERSION_MAP.descendingKeySet().iterator(); + if (!iterator.hasNext()) { + return -1; + } + return iterator.next(); + } + public static String getVersion(long serverTimestamp) { /* * It is possible that when clients are trying to run upgrades concurrently, we could be at an intermediate http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef39feeb/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 4868551..c65fa7a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -2353,13 +2353,38 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return addColumn(oldMetaConnection, tableName, timestamp, columns, true); } + // Available for testing + protected long getSystemTableVersion() { + return MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP; + } + + // Available for testing + protected void setUpgradeRequired() { + this.upgradeRequired.set(true); + } + + // Available for testing + protected boolean isInitialized() { + return initialized; + } + + // Available for testing + protected void setInitialized(boolean isInitialized) { + initialized = isInitialized; + } + + // Available for testing + protected String getSystemCatalogDML() { + return QueryConstants.CREATE_TABLE_METADATA; + } + @Override public void init(final String url, final Properties props) throws SQLException { try { PhoenixContextExecutor.call(new Callable<Void>() { @Override public Void call() throws Exception { - if (initialized) { + if (isInitialized()) { if (initializationException != null) { // Throw previous initialization exception, as we won't resuse this instance throw initializationException; @@ -2367,7 +2392,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return null; } synchronized (ConnectionQueryServicesImpl.this) { - if (initialized) { + if (isInitialized()) { if (initializationException != null) { // Throw previous initialization exception, as we won't resuse this instance throw initializationException; @@ -2409,7 +2434,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } Properties scnProps = PropertiesUtil.deepCopy(props); scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, - Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP)); + Long.toString(getSystemTableVersion())); scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB); String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB); try (HBaseAdmin hBaseAdmin = getAdmin(); @@ -2417,7 +2442,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement scnProps, newEmptyMetaData())) { try { metaConnection.setRunningUpgrade(true); - metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA); + metaConnection.createStatement().executeUpdate(getSystemCatalogDML()); } catch (NewerTableAlreadyExistsException ignore) { // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed // timestamp. A TableAlreadyExistsException is not thrown, since the table only exists @@ -2425,7 +2450,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } catch (TableAlreadyExistsException e) { long currentServerSideTableTimeStamp = e.getTable().getTimeStamp(); if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP) { - ConnectionQueryServicesImpl.this.upgradeRequired.set(true); + setUpgradeRequired(); } } catch (PhoenixIOException e) { if (!Iterables.isEmpty(Iterables.filter(Throwables.getCausalChain(e), AccessDeniedException.class))) { @@ -2484,7 +2509,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement throw initializationException; } } finally { - initialized = true; + setInitialized(true); } } } @@ -2567,7 +2592,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE); boolean snapshotCreated = false; try { - if (!ConnectionQueryServicesImpl.this.upgradeRequired.get()) { + if (!isUpgradeRequired()) { throw new UpgradeNotRequiredException(); } Properties scnProps = PropertiesUtil.deepCopy(props); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef39feeb/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 5b09cad..b1b4396 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -79,6 +79,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.lang.reflect.Constructor; import java.math.BigDecimal; import java.sql.Connection; import java.sql.DatabaseMetaData; @@ -163,6 +164,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; */ public abstract class BaseTest { + public static final String DRIVER_CLASS_NAME_ATTRIB = "phoenix.driver.class.name"; + private static final Map<String,String> tableDDLMap; private static final Logger logger = LoggerFactory.getLogger(BaseTest.class); protected static final int DEFAULT_TXN_TIMEOUT_SECONDS = 30; @@ -440,7 +443,7 @@ public abstract class BaseTest { * @return url to be used by clients to connect to the cluster. * @throws IOException */ - protected static String setUpTestCluster(@Nonnull Configuration conf, ReadOnlyProps overrideProps) throws IOException { + protected static String setUpTestCluster(@Nonnull Configuration conf, ReadOnlyProps overrideProps) throws Exception { boolean isDistributedCluster = isDistributedClusterModeEnabled(conf); if (!isDistributedCluster) { return initMiniCluster(conf, overrideProps); @@ -538,8 +541,9 @@ public abstract class BaseTest { * Initialize the mini cluster using phoenix-test specific configuration. * @param overrideProps TODO * @return url to be used by clients to connect to the mini cluster. + * @throws Exception */ - private static String initMiniCluster(Configuration conf, ReadOnlyProps overrideProps) { + private static String initMiniCluster(Configuration conf, ReadOnlyProps overrideProps) throws Exception { setUpConfigForMiniCluster(conf, overrideProps); utility = new HBaseTestingUtility(conf); try { @@ -559,8 +563,9 @@ public abstract class BaseTest { * Initialize the cluster in distributed mode * @param overrideProps TODO * @return url to be used by clients to connect to the mini cluster. + * @throws Exception */ - private static String initClusterDistributedMode(Configuration conf, ReadOnlyProps overrideProps) { + private static String initClusterDistributedMode(Configuration conf, ReadOnlyProps overrideProps) throws Exception { setTestConfigForDistribuedCluster(conf, overrideProps); try { IntegrationTestingUtility util = new IntegrationTestingUtility(conf); @@ -572,13 +577,13 @@ public abstract class BaseTest { return JDBC_PROTOCOL + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM; } - private static void setTestConfigForDistribuedCluster(Configuration conf, ReadOnlyProps overrideProps) { + private static void setTestConfigForDistribuedCluster(Configuration conf, ReadOnlyProps overrideProps) throws Exception { setDefaultTestConfig(conf, overrideProps); } - private static void setDefaultTestConfig(Configuration conf, ReadOnlyProps overrideProps) { + private static void setDefaultTestConfig(Configuration conf, ReadOnlyProps overrideProps) throws Exception { ConfigUtil.setReplicationConfigIfAbsent(conf); - QueryServices services = new PhoenixTestDriver().getQueryServices(); + QueryServices services = newTestDriver(overrideProps).getQueryServices(); for (Entry<String,String> entry : services.getProps()) { conf.set(entry.getKey(), entry.getValue()); } @@ -595,11 +600,11 @@ public abstract class BaseTest { } } - public static Configuration setUpConfigForMiniCluster(Configuration conf) { + public static Configuration setUpConfigForMiniCluster(Configuration conf) throws Exception { return setUpConfigForMiniCluster(conf, ReadOnlyProps.EMPTY_PROPS); } - public static Configuration setUpConfigForMiniCluster(Configuration conf, ReadOnlyProps overrideProps) { + public static Configuration setUpConfigForMiniCluster(Configuration conf, ReadOnlyProps overrideProps) throws Exception { assertNotNull(conf); setDefaultTestConfig(conf, overrideProps); /* @@ -626,12 +631,24 @@ public abstract class BaseTest { return conf; } + private static PhoenixTestDriver newTestDriver(ReadOnlyProps props) throws Exception { + PhoenixTestDriver newDriver; + String driverClassName = props.get(DRIVER_CLASS_NAME_ATTRIB); + if (driverClassName == null) { + newDriver = new PhoenixTestDriver(props); + } else { + Class<?> clazz = Class.forName(driverClassName); + Constructor constr = clazz.getConstructor(ReadOnlyProps.class); + newDriver = (PhoenixTestDriver)constr.newInstance(props); + } + return newDriver; + } /** * Create a {@link PhoenixTestDriver} and register it. * @return an initialized and registered {@link PhoenixTestDriver} */ public static PhoenixTestDriver initAndRegisterTestDriver(String url, ReadOnlyProps props) throws Exception { - PhoenixTestDriver newDriver = new PhoenixTestDriver(props); + PhoenixTestDriver newDriver = newTestDriver(props); DriverManager.registerDriver(newDriver); Driver oldDriver = DriverManager.getDriver(url); if (oldDriver != newDriver) {