PHOENIX-3230 Upgrade code running concurrently on different JVMs could make clients unusable
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/df0d6117 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/df0d6117 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/df0d6117 Branch: refs/heads/calcite Commit: df0d61179a97881b8b72595af198dbb346adfb9f Parents: 36d500c Author: Samarth <samarth.j...@salesforce.com> Authored: Thu Sep 8 20:20:59 2016 -0700 Committer: Samarth <samarth.j...@salesforce.com> Committed: Thu Sep 8 20:20:59 2016 -0700 ---------------------------------------------------------------------- .../phoenix/coprocessor/MetaDataProtocol.java | 20 +- .../phoenix/exception/SQLExceptionCode.java | 1 + .../query/ConnectionQueryServicesImpl.java | 244 +++++++++++-------- .../apache/phoenix/query/QueryConstants.java | 1 + .../org/apache/phoenix/util/UpgradeUtil.java | 5 +- 5 files changed, 164 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/df0d6117/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index dce89bd..20922e5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -19,9 +19,9 @@ package org.apache.phoenix.coprocessor; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.phoenix.coprocessor.generated.MetaDataProtos; @@ -29,7 +29,6 @@ import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; import org.apache.phoenix.coprocessor.generated.PFunctionProtos; import org.apache.phoenix.hbase.index.util.VersionUtil; -import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PSchema; import org.apache.phoenix.schema.PColumn; @@ -64,7 +63,7 @@ import com.google.protobuf.ByteString; public abstract class MetaDataProtocol extends MetaDataService { public static final int PHOENIX_MAJOR_VERSION = 4; public static final int PHOENIX_MINOR_VERSION = 8; - public static final int PHOENIX_PATCH_NUMBER = 0; + public static final int PHOENIX_PATCH_NUMBER = 1; public static final int PHOENIX_VERSION = VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER); @@ -89,7 +88,7 @@ public abstract class MetaDataProtocol extends MetaDataService { // ALWAYS update this map whenever rolling out a new release (major, minor or patch release). // Key is the SYSTEM.CATALOG timestamp for the version and value is the version string. - public static final Map<Long, String> TIMESTAMP_VERSION_MAP = new HashMap<>(10); + private static final NavigableMap<Long, String> TIMESTAMP_VERSION_MAP = new TreeMap<>(); static { TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0, "4.1.x"); TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_2_0, "4.2.0"); @@ -100,6 +99,7 @@ public abstract class MetaDataProtocol extends MetaDataService { TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, "4.7.x"); TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0, "4.8.x"); } + public static final String CURRENT_CLIENT_VERSION = PHOENIX_MAJOR_VERSION + "." + PHOENIX_MINOR_VERSION + "." 
+ PHOENIX_PATCH_NUMBER; // TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need @@ -401,4 +401,14 @@ public abstract class MetaDataProtocol extends MetaDataService { return schema; } } + + public static String getVersion(long serverTimestamp) { + /* + * It is possible that when clients are trying to run upgrades concurrently, we could be at an intermediate + * server timestamp. Using floorKey provides us a range based lookup where the timestamp range for a release is + * [timeStampForRelease, timestampForNextRelease). + */ + String version = TIMESTAMP_VERSION_MAP.get(TIMESTAMP_VERSION_MAP.floorKey(serverTimestamp)); + return version; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/df0d6117/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 5a8fffa..0ccecae 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -370,6 +370,7 @@ public enum SQLExceptionCode { OUTDATED_JARS(2007, "INT09", "Outdated jars."), INDEX_METADATA_NOT_FOUND(2008, "INT10", "Unable to find cached index metadata. 
"), UNKNOWN_ERROR_CODE(2009, "INT11", "Unknown error code."), + CONCURRENT_UPGRADE_IN_PROGRESS(2010, "INT12", ""), OPERATION_TIMED_OUT(6000, "TIM01", "Operation timed out.", new Factory() { @Override public SQLException newException(SQLExceptionInfo info) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/df0d6117/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 86217e7..c5d53c3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -19,13 +19,12 @@ package org.apache.phoenix.query; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.hadoop.hbase.HColumnDescriptor.TTL; import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP; +import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER; -import static org.apache.phoenix.util.UpgradeUtil.getUpgradeSnapshotName; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME; +import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_SEQUENCE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME; import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RENEW_LEASE_ENABLED; @@ -80,6 +79,7 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; @@ -88,9 +88,6 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; -import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -210,6 +207,7 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; @@ -2331,140 +2329,141 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (upgradeSystemTables) { long currentServerSideTableTimeStamp = e.getTable().getTimeStamp(); sysCatalogTableName = e.getTable().getPhysicalName().getString(); - if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP) { + if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP && acquireUpgradeMutex(currentServerSideTableTimeStamp)) { 
snapshotName = getUpgradeSnapshotName(sysCatalogTableName, currentServerSideTableTimeStamp); createSnapshot(snapshotName, sysCatalogTableName); } - String columnsToAdd = ""; - // This will occur if we have an older SYSTEM.CATALOG and we need to update it to include - // any new columns we've added. - if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) { - // We know that we always need to add the STORE_NULLS column for 4.3 release - columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName()); - try (HBaseAdmin admin = getAdmin()) { - HTableDescriptor[] localIndexTables = admin.listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*"); - for (HTableDescriptor table : localIndexTables) { - if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null - && table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) { - table.setValue(MetaDataUtil.PARENT_TABLE_KEY, + String columnsToAdd = ""; + // This will occur if we have an older SYSTEM.CATALOG and we need to update it to include + // any new columns we've added. + if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) { + // We know that we always need to add the STORE_NULLS column for 4.3 release + columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName()); + try (HBaseAdmin admin = getAdmin()) { + HTableDescriptor[] localIndexTables = admin.listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*"); + for (HTableDescriptor table : localIndexTables) { + if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null + && table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) { + table.setValue(MetaDataUtil.PARENT_TABLE_KEY, MetaDataUtil.getUserTableName(table - .getNameAsString())); - // Explicitly disable, modify and enable the table to ensure co-location of data - // and index regions. 
If we just modify the table descriptor when online schema - // change enabled may reopen the region in same region server instead of following data region. - admin.disableTable(table.getTableName()); - admin.modifyTable(table.getTableName(), table); - admin.enableTable(table.getTableName()); - } + .getNameAsString())); + // Explicitly disable, modify and enable the table to ensure co-location of data + // and index regions. If we just modify the table descriptor when online schema + // change enabled may reopen the region in same region server instead of following data region. + admin.disableTable(table.getTableName()); + admin.modifyTable(table.getTableName(), table); + admin.enableTable(table.getTableName()); } } } + } - // If the server side schema is before MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then - // we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too. - // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed, - // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all - // the column names that have been added to SYSTEM.CATALOG since 4.0. - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) { - columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName() + // If the server side schema is before MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then + // we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too. + // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed, + // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all + // the column names that have been added to SYSTEM.CATALOG since 4.0. 
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) { + columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName() + ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName()); - } + } - // If we have some new columns from 4.1-4.3 to add, add them now. - if (!columnsToAdd.isEmpty()) { - // Ugh..need to assign to another local variable to keep eclipse happy. - PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection, + // If we have some new columns from 4.1-4.3 to add, add them now. + if (!columnsToAdd.isEmpty()) { + // Ugh..need to assign to another local variable to keep eclipse happy. + PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd); - metaConnection = newMetaConnection; - } + metaConnection = newMetaConnection; + } - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) { - columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " " - + PInteger.INSTANCE.getSqlTypeName(); - try { - metaConnection = addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) { + columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " " + + PInteger.INSTANCE.getSqlTypeName(); + try { + metaConnection = addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd, false); - upgradeTo4_5_0(metaConnection); - } catch (ColumnAlreadyExistsException ignored) { - /* - * Upgrade to 4.5 is a slightly special case. We use the fact that the column - * BASE_COLUMN_COUNT is already part of the meta-data schema as the signal that - * the server side upgrade has finished or is in progress. 
- */ - logger.debug("No need to run 4.5 upgrade"); + upgradeTo4_5_0(metaConnection); + } catch (ColumnAlreadyExistsException ignored) { + /* + * Upgrade to 4.5 is a slightly special case. We use the fact that the column + * BASE_COLUMN_COUNT is already part of the meta-data schema as the signal that + * the server side upgrade has finished or is in progress. + */ + logger.debug("No need to run 4.5 upgrade"); + } + Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo()); + props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB); + props.remove(PhoenixRuntime.TENANT_ID_ATTRIB); + PhoenixConnection conn = new PhoenixConnection(ConnectionQueryServicesImpl.this, metaConnection.getURL(), props, metaConnection.getMetaDataCache()); + try { + List<String> tablesNeedingUpgrade = UpgradeUtil.getPhysicalTablesWithDescRowKey(conn); + if (!tablesNeedingUpgrade.isEmpty()) { + logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns and ascending BINARY columns (PHOENIX-2067 and PHOENIX-2120):\n" + Joiner.on(' ').join(tablesNeedingUpgrade) + "\nTo upgrade issue the \"bin/psql.py -u\" command."); } - Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo()); - props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB); - props.remove(PhoenixRuntime.TENANT_ID_ATTRIB); - PhoenixConnection conn = new PhoenixConnection(ConnectionQueryServicesImpl.this, metaConnection.getURL(), props, metaConnection.getMetaDataCache()); - try { - List<String> tablesNeedingUpgrade = UpgradeUtil.getPhysicalTablesWithDescRowKey(conn); - if (!tablesNeedingUpgrade.isEmpty()) { - logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns and ascending BINARY columns (PHOENIX-2067 and PHOENIX-2120):\n" + Joiner.on(' ').join(tablesNeedingUpgrade) + "\nTo upgrade issue the \"bin/psql.py -u\" command."); - } - List<String> unsupportedTables = 
UpgradeUtil.getPhysicalTablesWithDescVarbinaryRowKey(conn); - if (!unsupportedTables.isEmpty()) { - logger.warn("The following tables use an unsupported VARBINARY DESC construct and need to be changed:\n" + Joiner.on(' ').join(unsupportedTables)); - } - } catch (Exception ex) { - logger.error("Unable to determine tables requiring upgrade due to PHOENIX-2067", ex); - } finally { - conn.close(); + List<String> unsupportedTables = UpgradeUtil.getPhysicalTablesWithDescVarbinaryRowKey(conn); + if (!unsupportedTables.isEmpty()) { + logger.warn("The following tables use an unsupported VARBINARY DESC construct and need to be changed:\n" + Joiner.on(' ').join(unsupportedTables)); } + } catch (Exception ex) { + logger.error("Unable to determine tables requiring upgrade due to PHOENIX-2067", ex); + } finally { + conn.close(); } - // Add these columns one at a time, each with different timestamps so that if folks have - // run the upgrade code already for a snapshot, we'll still enter this block (and do the - // parts we haven't yet done). - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) { - columnsToAdd = PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " " + PBoolean.INSTANCE.getSqlTypeName(); - metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, + } + // Add these columns one at a time, each with different timestamps so that if folks have + // run the upgrade code already for a snapshot, we'll still enter this block (and do the + // parts we haven't yet done). 
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) { + columnsToAdd = PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " " + PBoolean.INSTANCE.getSqlTypeName(); + metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd); - } - if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) { - // Drop old stats table so that new stats table is created - metaConnection = dropStatsTable(metaConnection, + } + if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) { + // Drop old stats table so that new stats table is created + metaConnection = dropStatsTable(metaConnection, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 4); - metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, + metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 3, PhoenixDatabaseMetaData.TRANSACTIONAL + " " + PBoolean.INSTANCE.getSqlTypeName()); - metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, + metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2, PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " " + PLong.INSTANCE.getSqlTypeName()); - metaConnection = setImmutableTableIndexesImmutable(metaConnection, + metaConnection = setImmutableTableIndexesImmutable(metaConnection, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1); - metaConnection = updateSystemCatalogTimestamp(metaConnection, + metaConnection = updateSystemCatalogTimestamp(metaConnection, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0); - ConnectionQueryServicesImpl.this.removeTable(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0); - clearCache(); - } + ConnectionQueryServicesImpl.this.removeTable(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0); + clearCache(); + } - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) { - metaConnection = addColumnsIfNotExists(metaConnection, + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) { + metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2, PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED + " " + PBoolean.INSTANCE.getSqlTypeName()); - metaConnection = addColumnsIfNotExists(metaConnection, + metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 1, PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ + " " + PVarchar.INSTANCE.getSqlTypeName()); - metaConnection = addColumnsIfNotExists(metaConnection, + metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0, PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " " + PBoolean.INSTANCE.getSqlTypeName()); - metaConnection = UpgradeUtil.disableViewIndexes(metaConnection); - if(getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, + metaConnection = UpgradeUtil.disableViewIndexes(metaConnection); + if(getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) { - metaConnection = UpgradeUtil.upgradeLocalIndexes(metaConnection); - } - ConnectionQueryServicesImpl.this.removeTable(null, + metaConnection = UpgradeUtil.upgradeLocalIndexes(metaConnection); + } + ConnectionQueryServicesImpl.this.removeTable(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0); - clearCache(); - } + clearCache(); + } + } } @@ -2544,6 +2543,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } success = true; scheduleRenewLeaseTasks(); + } catch (UpgradeInProgressException e) { + // don't set it as initializationException because otherwise client won't be able to retry + throw e; } catch (Exception e) { if (e instanceof SQLException) { initializationException = (SQLException)e; @@ -2719,12 +2721,56 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } } + + /** + * Acquire distributed mutex of sorts to make sure only one JVM is able to run the upgrade code by + * making use of HBase's checkAndPut api. + * <p> + * This method was added as part of 4.8.1 release. For clients upgrading to 4.8.1, the old value in the + * version cell will be null i.e. the QueryConstants#VERSION column will be non-existent. For client's + * upgrading to a release newer than 4.8.1 the existing version cell will be non-null. The client which + * wins the race will end up setting the version cell to the MetadataProtocol#MIN_SYSTEM_TABLE_TIMESTAMP + * for the release. + * </p> + * + * @return true if client won the race, false otherwise + * @throws IOException + * @throws SQLException + */ + private boolean acquireUpgradeMutex(long currentServerSideTableTimestamp) throws IOException, + SQLException { + Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP); + try (HTableInterface sysCatalogTable = getTable(SYSTEM_CATALOG_NAME_BYTES)) { + byte[] row = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); + byte[] family = PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES; + byte[] qualifier = QueryConstants.VERSION; + byte[] oldValue = currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 ? 
null + : Bytes.toBytes(currentServerSideTableTimestamp); + byte[] newValue = Bytes.toBytes(MIN_SYSTEM_TABLE_TIMESTAMP); + long ts = MIN_SYSTEM_TABLE_TIMESTAMP; + Put put = new Put(row, ts); + put.add(family, qualifier, newValue); + boolean acquired = sysCatalogTable.checkAndPut(row, family, qualifier, oldValue, put); + if (!acquired) { throw new UpgradeInProgressException( + getVersion(currentServerSideTableTimestamp), getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); } + return true; + } + } }); } catch (Exception e) { Throwables.propagateIfInstanceOf(e, SQLException.class); throw Throwables.propagate(e); } } + + private static class UpgradeInProgressException extends SQLException { + public UpgradeInProgressException(String upgradeFrom, String upgradeTo) { + super("Cluster is being concurrently upgraded from " + upgradeFrom + " to " + upgradeTo + + ". Please retry establishing connection.", SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS + .getSQLState(), SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS.getErrorCode()); + } + } private List<String> getTableNames(List<HTableDescriptor> tables) { List<String> tableNames = new ArrayList<String>(4); http://git-wip-us.apache.org/repos/asf/phoenix/blob/df0d6117/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index 9f8f58c..3077943 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -356,5 +356,6 @@ public interface QueryConstants { public static final byte[] OFFSET_FAMILY = "f_offset".getBytes(); public static final byte[] OFFSET_COLUMN = "c_offset".getBytes(); public static final String LAST_SCAN = "LAST_SCAN"; + public static final byte[] VERSION = 
"VERSION".getBytes(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/df0d6117/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java index 7c34f4a..8bc3e63 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java @@ -19,7 +19,7 @@ package org.apache.phoenix.util; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.phoenix.coprocessor.MetaDataProtocol.CURRENT_CLIENT_VERSION; -import static org.apache.phoenix.coprocessor.MetaDataProtocol.TIMESTAMP_VERSION_MAP; +import static org.apache.phoenix.coprocessor.MetaDataProtocol.getVersion; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY; @@ -31,7 +31,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE; @@ -1896,7 +1895,7 @@ public class UpgradeUtil { public static final String getUpgradeSnapshotName(String tableString, long currentSystemTableTimestamp) { Format formatter = new SimpleDateFormat("yyyyMMddHHmmssZ"); String date = 
formatter.format(new Date(System.currentTimeMillis())); - String upgradingFrom = TIMESTAMP_VERSION_MAP.get(currentSystemTableTimestamp); + String upgradingFrom = getVersion(currentSystemTableTimestamp); return "SNAPSHOT_" + tableString + "_" + upgradingFrom + "_TO_" + CURRENT_CLIENT_VERSION + "_" + date; } } \ No newline at end of file