http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 1aa9b88..dfe7ee8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -63,6 +63,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.concurrent.GuardedBy; @@ -131,6 +132,8 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateR import org.apache.phoenix.exception.PhoenixIOException; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.exception.UpgradeInProgressException; +import org.apache.phoenix.exception.UpgradeNotRequiredException; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy; import org.apache.phoenix.hbase.index.Indexer; @@ -266,6 +269,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private final List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> connectionQueues; private ScheduledExecutorService renewLeaseExecutor; private final boolean renewLeaseEnabled; + private final boolean isAutoUpgradeEnabled; + private final AtomicBoolean upgradeRequired = new AtomicBoolean(false); private static interface FeatureSupported { boolean isSupported(ConnectionQueryServices services); @@ -342,6 +347,7 @@ public class 
ConnectionQueryServicesImpl extends DelegateQueryServices implement connectionQueues = ImmutableList.copyOf(list); // A little bit of a smell to leak `this` here, but should not be a problem this.tableStatsCache = new TableStatsCache(this, config); + this.isAutoUpgradeEnabled = config.getBoolean(AUTO_UPGRADE_ENABLED, QueryServicesOptions.DEFAULT_AUTO_UPGRADE_ENABLED); } @Override @@ -2310,29 +2316,16 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return null; } checkClosed(); - PhoenixConnection metaConnection = null; - boolean success = false; - String snapshotName = null; - String sysCatalogTableName = null; try { openConnection(); - String noUpgradeProp = props.getProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB); - boolean upgradeSystemTables = !Boolean.TRUE.equals(Boolean.valueOf(noUpgradeProp)); - Properties scnProps = PropertiesUtil.deepCopy(props); - scnProps.setProperty( - PhoenixRuntime.CURRENT_SCN_ATTRIB, - Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP)); - scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB); - String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB); - metaConnection = new PhoenixConnection( - ConnectionQueryServicesImpl.this, globalUrl, scnProps, newEmptyMetaData()); + boolean isDoNotUpgradePropSet = UpgradeUtil.isNoUpgradeSet(props); try (HBaseAdmin admin = getAdmin()) { boolean mappedSystemCatalogExists = admin .tableExists(SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES, true)); if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, ConnectionQueryServicesImpl.this.getProps())) { if (admin.tableExists(SYSTEM_CATALOG_NAME_BYTES)) { - //check if the server is already updated and have namespace config properly set. + //check if the server is already updated and have namespace config properly set. 
checkClientServerCompatibility(SYSTEM_CATALOG_NAME_BYTES); } ensureSystemTablesUpgraded(ConnectionQueryServicesImpl.this.getProps()); @@ -2345,232 +2338,32 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement + IS_NAMESPACE_MAPPING_ENABLED + " enabled") .build().buildException(); } } - try { - metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA); - } catch (NewerTableAlreadyExistsException ignore) { - // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed timestamp. - // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp. - } catch (TableAlreadyExistsException e) { - if (upgradeSystemTables) { - long currentServerSideTableTimeStamp = e.getTable().getTimeStamp(); - sysCatalogTableName = e.getTable().getPhysicalName().getString(); - if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP && acquireUpgradeMutex(currentServerSideTableTimeStamp, e.getTable().getPhysicalName().getBytes())) { - snapshotName = getUpgradeSnapshotName(sysCatalogTableName, currentServerSideTableTimeStamp); - createSnapshot(snapshotName, sysCatalogTableName); - } - String columnsToAdd = ""; - // This will occur if we have an older SYSTEM.CATALOG and we need to update it to include - // any new columns we've added. 
- if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) { - // We know that we always need to add the STORE_NULLS column for 4.3 release - columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.STORE_NULLS + " " + PBoolean.INSTANCE.getSqlTypeName()); - try (HBaseAdmin admin = getAdmin()) { - HTableDescriptor[] localIndexTables = admin.listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX+".*"); - for (HTableDescriptor table : localIndexTables) { - if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null - && table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) { - table.setValue(MetaDataUtil.PARENT_TABLE_KEY, - MetaDataUtil.getUserTableName(table - .getNameAsString())); - // Explicitly disable, modify and enable the table to ensure co-location of data - // and index regions. If we just modify the table descriptor when online schema - // change enabled may reopen the region in same region server instead of following data region. - admin.disableTable(table.getTableName()); - admin.modifyTable(table.getTableName(), table); - admin.enableTable(table.getTableName()); - } - } - } - } - - // If the server side schema is before MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then - // we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too. - // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed, - // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all - // the column names that have been added to SYSTEM.CATALOG since 4.0. - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) { - columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName() - + ", " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName()); - } - - // If we have some new columns from 4.1-4.3 to add, add them now. 
- if (!columnsToAdd.isEmpty()) { - // Ugh..need to assign to another local variable to keep eclipse happy. - PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd); - metaConnection = newMetaConnection; - } - - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) { - columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " " - + PInteger.INSTANCE.getSqlTypeName(); - try { - metaConnection = addColumn(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd, false); - upgradeTo4_5_0(metaConnection); - } catch (ColumnAlreadyExistsException ignored) { - /* - * Upgrade to 4.5 is a slightly special case. We use the fact that the column - * BASE_COLUMN_COUNT is already part of the meta-data schema as the signal that - * the server side upgrade has finished or is in progress. 
- */ - logger.debug("No need to run 4.5 upgrade"); - } - Properties props = PropertiesUtil.deepCopy(metaConnection.getClientInfo()); - props.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB); - props.remove(PhoenixRuntime.TENANT_ID_ATTRIB); - PhoenixConnection conn = new PhoenixConnection(ConnectionQueryServicesImpl.this, metaConnection.getURL(), props, metaConnection.getMetaDataCache()); - try { - List<String> tablesNeedingUpgrade = UpgradeUtil.getPhysicalTablesWithDescRowKey(conn); - if (!tablesNeedingUpgrade.isEmpty()) { - logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns and ascending BINARY columns (PHOENIX-2067 and PHOENIX-2120):\n" + Joiner.on(' ').join(tablesNeedingUpgrade) + "\nTo upgrade issue the \"bin/psql.py -u\" command."); - } - List<String> unsupportedTables = UpgradeUtil.getPhysicalTablesWithDescVarbinaryRowKey(conn); - if (!unsupportedTables.isEmpty()) { - logger.warn("The following tables use an unsupported VARBINARY DESC construct and need to be changed:\n" + Joiner.on(' ').join(unsupportedTables)); - } - } catch (Exception ex) { - logger.error("Unable to determine tables requiring upgrade due to PHOENIX-2067", ex); - } finally { - conn.close(); - } - } - // Add these columns one at a time, each with different timestamps so that if folks have - // run the upgrade code already for a snapshot, we'll still enter this block (and do the - // parts we haven't yet done). 
- if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) { - columnsToAdd = PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " " + PBoolean.INSTANCE.getSqlTypeName(); - metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd); - } - if(currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) { - // Drop old stats table so that new stats table is created - metaConnection = dropStatsTable(metaConnection, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 4); - metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 3, - PhoenixDatabaseMetaData.TRANSACTIONAL + " " + PBoolean.INSTANCE.getSqlTypeName()); - metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2, - PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " " + PLong.INSTANCE.getSqlTypeName()); - metaConnection = setImmutableTableIndexesImmutable(metaConnection, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1); - metaConnection = updateSystemCatalogTimestamp(metaConnection, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0); - ConnectionQueryServicesImpl.this.removeTable(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0); - clearCache(); - } - - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) { - metaConnection = addColumnsIfNotExists(metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2, - PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED + " " - + PBoolean.INSTANCE.getSqlTypeName()); - metaConnection = addColumnsIfNotExists(metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 1, - PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ + " " - + PVarchar.INSTANCE.getSqlTypeName()); - metaConnection = addColumnsIfNotExists(metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0, - PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " " - + PBoolean.INSTANCE.getSqlTypeName()); - metaConnection = UpgradeUtil.disableViewIndexes(metaConnection); - if(getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, - QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) { - metaConnection = UpgradeUtil.upgradeLocalIndexes(metaConnection); - } - ConnectionQueryServicesImpl.this.removeTable(null, - PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0); - clearCache(); - } - - } - } - - int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, - QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); - try { - String createSequenceTable = Sequence.getCreateTableStatement(nSaltBuckets); - metaConnection.createStatement().executeUpdate(createSequenceTable); - nSequenceSaltBuckets = nSaltBuckets; - } catch (NewerTableAlreadyExistsException e) { - // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp. - // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp. - nSequenceSaltBuckets = getSaltBuckets(e); - } catch (TableAlreadyExistsException e) { - if (upgradeSystemTables) { - // This will occur if we have an older SYSTEM.SEQUENCE and we need to update it to include - // any new columns we've added. 
+ Properties scnProps = PropertiesUtil.deepCopy(props); + scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, + Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP)); + scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB); + String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB); + try (PhoenixConnection metaConnection = new PhoenixConnection(ConnectionQueryServicesImpl.this, globalUrl, + scnProps, newEmptyMetaData())) { + try { + metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA); + } catch (NewerTableAlreadyExistsException ignore) { + // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed + // timestamp. A TableAlreadyExistsException is not thrown, since the table only exists + // *after* this fixed timestamp. + } catch (TableAlreadyExistsException e) { long currentServerSideTableTimeStamp = e.getTable().getTimeStamp(); - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) { - // If the table time stamp is before 4.1.0 then we need to add below columns - // to the SYSTEM.SEQUENCE table. 
- String columnsToAdd = PhoenixDatabaseMetaData.MIN_VALUE + " " + PLong.INSTANCE.getSqlTypeName() - + ", " + PhoenixDatabaseMetaData.MAX_VALUE + " " + PLong.INSTANCE.getSqlTypeName() - + ", " + PhoenixDatabaseMetaData.CYCLE_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName() - + ", " + PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG + " " + PBoolean.INSTANCE.getSqlTypeName(); - addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd); - } - // If the table timestamp is before 4.2.1 then run the upgrade script - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_2_1) { - if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) { - metaConnection.removeTable(null, - PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA, - PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP); - clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY, - PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA_BYTES, - PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE_BYTES, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP); - clearTableRegionCache(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES); - } - nSequenceSaltBuckets = nSaltBuckets; - } else { - nSequenceSaltBuckets = getSaltBuckets(e); + if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP) { + ConnectionQueryServicesImpl.this.upgradeRequired.set(true); } } - } - try { - metaConnection.createStatement().executeUpdate( - QueryConstants.CREATE_STATS_TABLE_METADATA); - } catch (NewerTableAlreadyExistsException ignore) { - } catch(TableAlreadyExistsException e) { - if (upgradeSystemTables) { - long currentServerSideTableTimeStamp = e.getTable().getTimeStamp(); - if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) { - metaConnection = addColumnsIfNotExists( - metaConnection, - SYSTEM_STATS_NAME, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, - 
PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT + " " - + PLong.INSTANCE.getSqlTypeName()); - } + if (!ConnectionQueryServicesImpl.this.upgradeRequired.get()) { + createOtherSystemTables(metaConnection); + } else if (isAutoUpgradeEnabled && !isDoNotUpgradePropSet) { + upgradeSystemTables(url, props); } } - try { - metaConnection.createStatement().executeUpdate( - QueryConstants.CREATE_FUNCTION_METADATA); - } catch (NewerTableAlreadyExistsException e) { - } catch (TableAlreadyExistsException e) { - } - if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, - ConnectionQueryServicesImpl.this.getProps())) { - try { - metaConnection.createStatement().executeUpdate("CREATE SCHEMA IF NOT EXISTS " - + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA); - } catch (NewerSchemaAlreadyExistsException e) {} - } - success = true; scheduleRenewLeaseTasks(); - } catch (UpgradeInProgressException e) { - // don't set it as initializationException because otherwise client won't be able to retry - throw e; } catch (Exception e) { if (e instanceof SQLException) { initializationException = (SQLException)e; @@ -2579,222 +2372,517 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement initializationException = new SQLException(e); } } finally { - try { - if (metaConnection != null) metaConnection.close(); - } catch (SQLException e) { - if (initializationException != null) { - initializationException.setNextException(e); - } else { - initializationException = e; - } - } finally { - try { - restoreFromSnapshot(sysCatalogTableName, snapshotName, success); - } catch (SQLException e) { - if (initializationException != null) { - initializationException.setNextException(e); - } else { - initializationException = e; - } - } - try { - if (initializationException != null) { - throw initializationException; - } - } finally { - initialized = true; - } - } + initialized = true; } - } + } return null; } - - private void createSnapshot(String snapshotName, String tableName) 
- throws SQLException { - HBaseAdmin admin = null; - SQLException sqlE = null; - try { - admin = getAdmin(); - admin.snapshot(snapshotName, tableName); - logger.info("Successfully created snapshot " + snapshotName + " for " - + tableName); - } catch (Exception e) { - sqlE = new SQLException(e); - } finally { - try { - if (admin != null) { - admin.close(); - } - } catch (Exception e) { - SQLException adminCloseEx = new SQLException(e); - if (sqlE == null) { - sqlE = adminCloseEx; - } else { - sqlE.setNextException(adminCloseEx); - } - } finally { - if (sqlE != null) { - throw sqlE; + }); + } catch (Exception e) { + Throwables.propagateIfInstanceOf(e, SQLException.class); + Throwables.propagate(e); + } + } + + private void createOtherSystemTables(PhoenixConnection metaConnection) throws SQLException { + try { + metaConnection.createStatement().execute(QueryConstants.CREATE_SEQUENCE_METADATA); + } catch (TableAlreadyExistsException ignore) {} + try { + metaConnection.createStatement().execute(QueryConstants.CREATE_STATS_TABLE_METADATA); + } catch (TableAlreadyExistsException ignore) {} + try { + metaConnection.createStatement().execute(QueryConstants.CREATE_FUNCTION_METADATA); + } catch (TableAlreadyExistsException ignore) {} + } + + /** + * There is no other locking needed here since only one connection (on the same or different JVM) will be able to + * acquire the upgrade mutex via {@link #acquireUpgradeMutex(long, byte[])}. 
+ */ + @Override + public void upgradeSystemTables(final String url, final Properties props) throws SQLException { + PhoenixConnection metaConnection = null; + boolean success = false; + String snapshotName = null; + String sysCatalogTableName = null; + SQLException toThrow = null; + try { + if (!ConnectionQueryServicesImpl.this.upgradeRequired.get()) { + throw new UpgradeNotRequiredException(); + } + Properties scnProps = PropertiesUtil.deepCopy(props); + scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, + Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP)); + scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB); + String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB); + metaConnection = new PhoenixConnection(ConnectionQueryServicesImpl.this, globalUrl, + scnProps, newEmptyMetaData()); + metaConnection.setRunningUpgrade(true); + try { + metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA); + } catch (NewerTableAlreadyExistsException ignore) { + // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed + // timestamp. A TableAlreadyExistsException is not thrown, since the table only exists + // *after* this fixed timestamp. + } catch (TableAlreadyExistsException e) { + long currentServerSideTableTimeStamp = e.getTable().getTimeStamp(); + sysCatalogTableName = e.getTable().getPhysicalName().getString(); + if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP + && acquireUpgradeMutex(currentServerSideTableTimeStamp, e.getTable() + .getPhysicalName().getBytes())) { + snapshotName = getUpgradeSnapshotName(sysCatalogTableName, + currentServerSideTableTimeStamp); + createSnapshot(snapshotName, sysCatalogTableName); + } + String columnsToAdd = ""; + // This will occur if we have an older SYSTEM.CATALOG and we need to update it to + // include any new columns we've added. 
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) { + // We know that we always need to add the STORE_NULLS column for 4.3 release + columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.STORE_NULLS + + " " + PBoolean.INSTANCE.getSqlTypeName()); + try (HBaseAdmin admin = getAdmin()) { + HTableDescriptor[] localIndexTables = admin + .listTables(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + ".*"); + for (HTableDescriptor table : localIndexTables) { + if (table.getValue(MetaDataUtil.PARENT_TABLE_KEY) == null + && table.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_NAME) != null) { + table.setValue(MetaDataUtil.PARENT_TABLE_KEY, + MetaDataUtil.getUserTableName(table.getNameAsString())); + // Explicitly disable, modify and enable the table to ensure + // co-location of data and index regions. If we just modify the + // table descriptor when online schema change enabled may reopen + // the region in same region server instead of following data region. 
+ admin.disableTable(table.getTableName()); + admin.modifyTable(table.getTableName(), table); + admin.enableTable(table.getTableName()); } } } } - private void restoreFromSnapshot(String tableName, String snapshotName, - boolean success) throws SQLException { - boolean snapshotRestored = false; - boolean tableDisabled = false; - if (!success && snapshotName != null) { - SQLException sqlE = null; - HBaseAdmin admin = null; - try { - logger.warn("Starting restore of " + tableName + " using snapshot " - + snapshotName + " because upgrade failed"); - admin = getAdmin(); - admin.disableTable(tableName); - tableDisabled = true; - admin.restoreSnapshot(snapshotName); - snapshotRestored = true; - logger.warn("Successfully restored " + tableName + " using snapshot " - + snapshotName); - } catch (Exception e) { - sqlE = new SQLException(e); - } finally { - if (admin != null && tableDisabled) { - try { - admin.enableTable(tableName); - if (snapshotRestored) { - logger.warn("Successfully restored and enabled " + tableName + " using snapshot " - + snapshotName); - } else { - logger.warn("Successfully enabled " + tableName + " after restoring using snapshot " - + snapshotName + " failed. "); - } - } catch (Exception e1) { - SQLException enableTableEx = new SQLException(e1); - if (sqlE == null) { - sqlE = enableTableEx; - } else { - sqlE.setNextException(enableTableEx); - } - logger.error("Failure in enabling " - + tableName - + (snapshotRestored ? " after successfully restoring using snapshot" - + snapshotName - : " after restoring using snapshot " - + snapshotName + " failed. 
")); - } finally { - try { - admin.close(); - } catch (Exception e2) { - SQLException adminCloseEx = new SQLException(e2); - if (sqlE == null) { - sqlE = adminCloseEx; - } else { - sqlE.setNextException(adminCloseEx); - } - } finally { - if (sqlE != null) { - throw sqlE; - } - } - } - } - } - } + // If the server side schema is before MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0 then + // we need to add INDEX_TYPE and INDEX_DISABLE_TIMESTAMP columns too. + // TODO: Once https://issues.apache.org/jira/browse/PHOENIX-1614 is fixed, + // we should just have a ALTER TABLE ADD IF NOT EXISTS statement with all + // the column names that have been added to SYSTEM.CATALOG since 4.0. + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) { + columnsToAdd = addColumn(columnsToAdd, PhoenixDatabaseMetaData.INDEX_TYPE + " " + + PUnsignedTinyint.INSTANCE.getSqlTypeName() + ", " + + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " " + + PLong.INSTANCE.getSqlTypeName()); } - private void ensureSystemTablesUpgraded(ReadOnlyProps props) - throws SQLException, IOException, IllegalArgumentException, InterruptedException { - if (!SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, props)) { return; } - HTableInterface metatable = null; - try (HBaseAdmin admin = getAdmin()) { - ensureNamespaceCreated(QueryConstants.SYSTEM_SCHEMA_NAME); - List<HTableDescriptor> tables = Arrays - .asList(admin.listTables(QueryConstants.SYSTEM_SCHEMA_NAME + "\\..*", false)); - List<String> tableNames = getTableNames(tables); - if (tableNames.size() == 0) { return; } - if (tableNames.size() > 4) { throw new IllegalArgumentException( - "Expected 4 system table only but found " + tableNames.size() + ":" + tableNames); } - byte[] mappedSystemTable = SchemaUtil - .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props).getName(); - metatable = getTable(mappedSystemTable); - if (tableNames.contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)) { - if 
(!admin.tableExists(mappedSystemTable)) { - UpgradeUtil.mapTableToNamespace(admin, metatable, - PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, props, null, PTableType.SYSTEM, - null); - ConnectionQueryServicesImpl.this.removeTable(null, - PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0); - } - tableNames.remove(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); - } - for (String table : tableNames) { - UpgradeUtil.mapTableToNamespace(admin, metatable, table, props, null, PTableType.SYSTEM, - null); - ConnectionQueryServicesImpl.this.removeTable(null, table, null, - MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0); + // If we have some new columns from 4.1-4.3 to add, add them now. + if (!columnsToAdd.isEmpty()) { + // Ugh..need to assign to another local variable to keep eclipse happy. + PhoenixConnection newMetaConnection = addColumnsIfNotExists(metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0, columnsToAdd); + metaConnection = newMetaConnection; + } + + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0) { + columnsToAdd = PhoenixDatabaseMetaData.BASE_COLUMN_COUNT + " " + + PInteger.INSTANCE.getSqlTypeName(); + try { + metaConnection = addColumn(metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0, columnsToAdd, + false); + upgradeTo4_5_0(metaConnection); + } catch (ColumnAlreadyExistsException ignored) { + /* + * Upgrade to 4.5 is a slightly special case. We use the fact that the + * column BASE_COLUMN_COUNT is already part of the meta-data schema as the + * signal that the server side upgrade has finished or is in progress. 
+ */ + logger.debug("No need to run 4.5 upgrade"); + } + Properties p = PropertiesUtil.deepCopy(metaConnection.getClientInfo()); + p.remove(PhoenixRuntime.CURRENT_SCN_ATTRIB); + p.remove(PhoenixRuntime.TENANT_ID_ATTRIB); + PhoenixConnection conn = new PhoenixConnection( + ConnectionQueryServicesImpl.this, metaConnection.getURL(), p, + metaConnection.getMetaDataCache()); + try { + List<String> tablesNeedingUpgrade = UpgradeUtil + .getPhysicalTablesWithDescRowKey(conn); + if (!tablesNeedingUpgrade.isEmpty()) { + logger.warn("The following tables require upgrade due to a bug causing the row key to be incorrect for descending columns and ascending BINARY columns (PHOENIX-2067 and PHOENIX-2120):\n" + + Joiner.on(' ').join(tablesNeedingUpgrade) + + "\nTo upgrade issue the \"bin/psql.py -u\" command."); } - if (!tableNames.isEmpty()) { - clearCache(); + List<String> unsupportedTables = UpgradeUtil + .getPhysicalTablesWithDescVarbinaryRowKey(conn); + if (!unsupportedTables.isEmpty()) { + logger.warn("The following tables use an unsupported VARBINARY DESC construct and need to be changed:\n" + + Joiner.on(' ').join(unsupportedTables)); } + } catch (Exception ex) { + logger.error( + "Unable to determine tables requiring upgrade due to PHOENIX-2067", + ex); } finally { - if (metatable != null) { - metatable.close(); - } + conn.close(); + } + } + // Add these columns one at a time, each with different timestamps so that if folks + // have + // run the upgrade code already for a snapshot, we'll still enter this block (and do + // the + // parts we haven't yet done). 
+ if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0) { + columnsToAdd = PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP + " " + + PBoolean.INSTANCE.getSqlTypeName(); + metaConnection = addColumnsIfNotExists(metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0, columnsToAdd); + } + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) { + // Drop old stats table so that new stats table is created + metaConnection = dropStatsTable(metaConnection, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 4); + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 3, + PhoenixDatabaseMetaData.TRANSACTIONAL + " " + + PBoolean.INSTANCE.getSqlTypeName()); + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 2, + PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " " + + PLong.INSTANCE.getSqlTypeName()); + metaConnection = setImmutableTableIndexesImmutable(metaConnection, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1); + metaConnection = updateSystemCatalogTimestamp(metaConnection, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0); + ConnectionQueryServicesImpl.this.removeTable(null, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0); + clearCache(); + } + + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0) { + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 2, + PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED + " " + + PBoolean.INSTANCE.getSqlTypeName()); + metaConnection = addColumnsIfNotExists( + metaConnection, + 
PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 - 1, + PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ + " " + + PVarchar.INSTANCE.getSqlTypeName()); + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0, + PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA + " " + + PBoolean.INSTANCE.getSqlTypeName()); + metaConnection = UpgradeUtil.disableViewIndexes(metaConnection); + if (getProps().getBoolean(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, + QueryServicesOptions.DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)) { + metaConnection = UpgradeUtil.upgradeLocalIndexes(metaConnection); } + ConnectionQueryServicesImpl.this.removeTable(null, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0); + clearCache(); } + } + - /** - * Acquire distributed mutex of sorts to make sure only one JVM is able to run the upgrade code by - * making use of HBase's checkAndPut api. - * <p> - * This method was added as part of 4.8.1 release. For clients upgrading to 4.8.1, the old value in the - * version cell will be null i.e. the {@value QueryConstants#UPGRADE_MUTEX} column will be non-existent. For client's - * upgrading to a release newer than 4.8.1 the existing version cell will be non-null. The client which - * wins the race will end up setting the version cell to the {@value MetaDataProtocol#MIN_SYSTEM_TABLE_TIMESTAMP} - * for the release. 
- * </p> - * - * @return true if client won the race, false otherwise - * @throws IOException - * @throws SQLException - */ - private boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] sysCatalogTableName) throws IOException, - SQLException { - Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP); - try (HTableInterface sysCatalogTable = getTable(sysCatalogTableName)) { - byte[] row = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, - PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE); - byte[] family = PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES; - byte[] qualifier = QueryConstants.UPGRADE_MUTEX; - byte[] oldValue = currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1 ? null - : PLong.INSTANCE.toBytes(currentServerSideTableTimestamp); - byte[] newValue = PLong.INSTANCE.toBytes(MIN_SYSTEM_TABLE_TIMESTAMP); - // Note that the timestamp for this put doesn't really matter since UPGRADE_MUTEX column isn't used - // to calculate SYSTEM.CATALOG's server side timestamp. - Put put = new Put(row); - put.add(family, qualifier, newValue); - boolean acquired = sysCatalogTable.checkAndPut(row, family, qualifier, oldValue, put); - if (!acquired) { throw new UpgradeInProgressException( - getVersion(currentServerSideTableTimestamp), getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); } - return true; + int nSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt( + QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, + QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); + try { + String createSequenceTable = Sequence.getCreateTableStatement(nSaltBuckets); + metaConnection.createStatement().executeUpdate(createSequenceTable); + nSequenceSaltBuckets = nSaltBuckets; + } catch (NewerTableAlreadyExistsException e) { + // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed + // timestamp. 
+ // A TableAlreadyExistsException is not thrown, since the table only exists *after* this + // fixed timestamp. + nSequenceSaltBuckets = getSaltBuckets(e); + } catch (TableAlreadyExistsException e) { + // This will occur if we have an older SYSTEM.SEQUENCE and we need to update it to + // include + // any new columns we've added. + long currentServerSideTableTimeStamp = e.getTable().getTimeStamp(); + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0) { + // If the table time stamp is before 4.1.0 then we need to add below columns + // to the SYSTEM.SEQUENCE table. + String columnsToAdd = PhoenixDatabaseMetaData.MIN_VALUE + " " + + PLong.INSTANCE.getSqlTypeName() + ", " + + PhoenixDatabaseMetaData.MAX_VALUE + " " + + PLong.INSTANCE.getSqlTypeName() + ", " + + PhoenixDatabaseMetaData.CYCLE_FLAG + " " + + PBoolean.INSTANCE.getSqlTypeName() + ", " + + PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG + " " + + PBoolean.INSTANCE.getSqlTypeName(); + addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd); + } + // If the table timestamp is before 4.2.1 then run the upgrade script + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_2_1) { + if (UpgradeUtil.upgradeSequenceTable(metaConnection, nSaltBuckets, e.getTable())) { + metaConnection.removeTable(null, + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA, + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP); + clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY, + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_SCHEMA_BYTES, + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE_BYTES, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP); + clearTableRegionCache(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES); } + nSequenceSaltBuckets = nSaltBuckets; + } else { + nSequenceSaltBuckets = getSaltBuckets(e); } - }); + } + try { + 
metaConnection.createStatement().executeUpdate( + QueryConstants.CREATE_STATS_TABLE_METADATA); + } catch (NewerTableAlreadyExistsException ignore) {} catch (TableAlreadyExistsException e) { + long currentServerSideTableTimeStamp = e.getTable().getTimeStamp(); + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0) { + metaConnection = addColumnsIfNotExists( + metaConnection, + SYSTEM_STATS_NAME, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, + PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT + " " + + PLong.INSTANCE.getSqlTypeName()); + } + } + try { + metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_FUNCTION_METADATA); + } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {} + if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, + ConnectionQueryServicesImpl.this.getProps())) { + try { + metaConnection.createStatement().executeUpdate( + "CREATE SCHEMA IF NOT EXISTS " + + PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA); + } catch (NewerSchemaAlreadyExistsException e) {} + } + ConnectionQueryServicesImpl.this.upgradeRequired.set(false); + success = true; + } catch (UpgradeInProgressException | UpgradeNotRequiredException e) { + // don't set it as initializationException because otherwise client won't be able to retry + throw e; } catch (Exception e) { - Throwables.propagateIfInstanceOf(e, SQLException.class); - Throwables.propagate(e); + if (e instanceof SQLException) { + toThrow = (SQLException)e; + } else { + // wrap every other exception into a SQLException + toThrow = new SQLException(e); + } + } finally { + try { + if (metaConnection != null) { + metaConnection.close(); + } + } catch (SQLException e) { + if (toThrow != null) { + toThrow.setNextException(e); + } else { + toThrow = e; + } + } finally { + try { + restoreFromSnapshot(sysCatalogTableName, snapshotName, success); + } catch (SQLException e) { + if (toThrow != null) { + toThrow.setNextException(e); + } else { + 
toThrow = e; + } + } + if (toThrow != null) { throw toThrow; } + } + } + } + + private void createSnapshot(String snapshotName, String tableName) + throws SQLException { + HBaseAdmin admin = null; + SQLException sqlE = null; + try { + admin = getAdmin(); + admin.snapshot(snapshotName, tableName); + logger.info("Successfully created snapshot " + snapshotName + " for " + + tableName); + } catch (Exception e) { + sqlE = new SQLException(e); + } finally { + try { + if (admin != null) { + admin.close(); + } + } catch (Exception e) { + SQLException adminCloseEx = new SQLException(e); + if (sqlE == null) { + sqlE = adminCloseEx; + } else { + sqlE.setNextException(adminCloseEx); + } + } finally { + if (sqlE != null) { + throw sqlE; + } + } } } - private static class UpgradeInProgressException extends SQLException { - public UpgradeInProgressException(String upgradeFrom, String upgradeTo) { - super("Cluster is being concurrently upgraded from " + upgradeFrom + " to " + upgradeTo - + ". Please retry establishing connection.", SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS - .getSQLState(), SQLExceptionCode.CONCURRENT_UPGRADE_IN_PROGRESS.getErrorCode()); + private void restoreFromSnapshot(String tableName, String snapshotName, + boolean success) throws SQLException { + boolean snapshotRestored = false; + boolean tableDisabled = false; + if (!success && snapshotName != null) { + SQLException sqlE = null; + HBaseAdmin admin = null; + try { + logger.warn("Starting restore of " + tableName + " using snapshot " + + snapshotName + " because upgrade failed"); + admin = getAdmin(); + admin.disableTable(tableName); + tableDisabled = true; + admin.restoreSnapshot(snapshotName); + snapshotRestored = true; + logger.warn("Successfully restored " + tableName + " using snapshot " + + snapshotName); + } catch (Exception e) { + sqlE = new SQLException(e); + } finally { + if (admin != null && tableDisabled) { + try { + admin.enableTable(tableName); + if (snapshotRestored) { + 
logger.warn("Successfully restored and enabled " + tableName + " using snapshot " + + snapshotName); + } else { + logger.warn("Successfully enabled " + tableName + " after restoring using snapshot " + + snapshotName + " failed. "); + } + } catch (Exception e1) { + SQLException enableTableEx = new SQLException(e1); + if (sqlE == null) { + sqlE = enableTableEx; + } else { + sqlE.setNextException(enableTableEx); + } + logger.error("Failure in enabling " + + tableName + + (snapshotRestored ? " after successfully restoring using snapshot" + + snapshotName + : " after restoring using snapshot " + + snapshotName + " failed. ")); + } finally { + try { + admin.close(); + } catch (Exception e2) { + SQLException adminCloseEx = new SQLException(e2); + if (sqlE == null) { + sqlE = adminCloseEx; + } else { + sqlE.setNextException(adminCloseEx); + } + } finally { + if (sqlE != null) { + throw sqlE; + } + } + } + } + } + } + } + + private void ensureSystemTablesUpgraded(ReadOnlyProps props) + throws SQLException, IOException, IllegalArgumentException, InterruptedException { + if (!SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, props)) { return; } + HTableInterface metatable = null; + try (HBaseAdmin admin = getAdmin()) { + ensureNamespaceCreated(QueryConstants.SYSTEM_SCHEMA_NAME); + List<HTableDescriptor> tables = Arrays + .asList(admin.listTables(QueryConstants.SYSTEM_SCHEMA_NAME + "\\..*")); + List<String> tableNames = getTableNames(tables); + if (tableNames.size() == 0) { return; } + if (tableNames.size() > 4) { throw new IllegalArgumentException( + "Expected 4 system table only but found " + tableNames.size() + ":" + tableNames); } + byte[] mappedSystemTable = SchemaUtil + .getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props).getName(); + metatable = getTable(mappedSystemTable); + if (tableNames.contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)) { + if (!admin.tableExists(mappedSystemTable)) { + UpgradeUtil.mapTableToNamespace(admin, 
metatable, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, props, null, PTableType.SYSTEM, + null); + ConnectionQueryServicesImpl.this.removeTable(null, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, null, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0); + } + tableNames.remove(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); + } + for (String table : tableNames) { + UpgradeUtil.mapTableToNamespace(admin, metatable, table, props, null, PTableType.SYSTEM, + null); + ConnectionQueryServicesImpl.this.removeTable(null, table, null, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_1_0); + } + if (!tableNames.isEmpty()) { + clearCache(); + } + } finally { + if (metatable != null) { + metatable.close(); + } + } + } + + /** + * Acquire distributed mutex of sorts to make sure only one JVM is able to run the upgrade code by + * making use of HBase's checkAndPut api. + * <p> + * This method was added as part of 4.8.1 release. For clients upgrading to 4.8.1, the old value in the + * cell will be null i.e. the {@value QueryConstants#UPGRADE_MUTEX} column will be non-existent. For clients + * upgrading to a release newer than 4.8.1 the existing cell value will be non-null. The client which + * wins the race will end up setting the cell value to the {@value MetaDataProtocol#MIN_SYSTEM_TABLE_TIMESTAMP} + * for the release. 
+ * </p> + * + * @return true if client won the race, false otherwise + * @throws IOException + * @throws SQLException + */ + @VisibleForTesting + public boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] sysCatalogTableName) throws IOException, + SQLException { + Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP); + try (HTableInterface sysCatalogTable = getTable(sysCatalogTableName)) { + byte[] row = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE); + byte[] family = PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES; + byte[] qualifier = QueryConstants.UPGRADE_MUTEX; + byte[] oldValue = currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_8_1 ? null + : PLong.INSTANCE.toBytes(currentServerSideTableTimestamp); + byte[] newValue = PLong.INSTANCE.toBytes(MIN_SYSTEM_TABLE_TIMESTAMP); + // Note that the timestamp for this put doesn't really matter since UPGRADE_MUTEX column isn't used + // to calculate SYSTEM.CATALOG's server side timestamp. + Put put = new Put(row); + put.addColumn(family, qualifier, newValue); + boolean acquired = sysCatalogTable.checkAndPut(row, family, qualifier, oldValue, put); + if (!acquired) { throw new UpgradeInProgressException( + getVersion(currentServerSideTableTimestamp), getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); } + return true; } } @@ -3879,4 +3967,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public void invalidateStats(ImmutableBytesPtr tableName) { this.tableStatsCache.invalidate(Objects.requireNonNull(tableName)); } + + @Override + public boolean isUpgradeRequired() { + return upgradeRequired.get(); + } }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 560b5d9..337e43c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -650,4 +650,12 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple public void invalidateStats(ImmutableBytesPtr tableName) { this.tableStatsCache.invalidate(Objects.requireNonNull(tableName)); } + + @Override + public void upgradeSystemTables(String url, Properties props) throws SQLException {} + + @Override + public boolean isUpgradeRequired() { + return false; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index 99ad59c..81517e3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -345,4 +345,14 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple public void invalidateStats(ImmutableBytesPtr tableName) { getDelegate().invalidateStats(tableName); } + + @Override + public void upgradeSystemTables(String url, Properties props) throws SQLException { + 
getDelegate().upgradeSystemTables(url, props); + } + + @Override + public boolean isUpgradeRequired() { + return getDelegate().isUpgradeRequired(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 8cd009a..51a18d4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -224,6 +224,7 @@ public interface QueryServices extends SQLCloseable { public static final String INDEX_ASYNC_BUILD_ENABLED = "phoenix.index.async.build.enabled"; public static final String CLIENT_CACHE_ENCODING = "phoenix.table.client.cache.encoding"; + public static final String AUTO_UPGRADE_ENABLED = "phoenix.autoupgrade.enabled"; /** * Get executor service used for parallel scans */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 669bcd2..9b87361 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -20,6 +20,7 @@ package org.apache.phoenix.query; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; import static org.apache.phoenix.query.QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE; import static 
org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE; +import static org.apache.phoenix.query.QueryServices.AUTO_UPGRADE_ENABLED; import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME; import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB; import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS; @@ -257,6 +258,7 @@ public class QueryServicesOptions { public static final boolean DEFAULT_INDEX_ASYNC_BUILD_ENABLED = true; public static final String DEFAULT_CLIENT_CACHE_ENCODING = PTableRefFactory.Encoding.OBJECT.toString(); + public static final boolean DEFAULT_AUTO_UPGRADE_ENABLED = true; @SuppressWarnings("serial") public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() { @@ -334,7 +336,8 @@ public class QueryServicesOptions { .setIfUnset(RENEW_LEASE_THREAD_POOL_SIZE, DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE) .setIfUnset(IS_NAMESPACE_MAPPING_ENABLED, DEFAULT_IS_NAMESPACE_MAPPING_ENABLED) .setIfUnset(IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, DEFAULT_IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE) - .setIfUnset(LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE); + .setIfUnset(LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE) + .setIfUnset(AUTO_UPGRADE_ENABLED, DEFAULT_AUTO_UPGRADE_ENABLED); // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set // it to 1, so we'll change it. 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 28ed11d..73f1501 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -3406,7 +3406,7 @@ public class MetaDataClient { String indexTenantId = entry.getKey(); Properties props = new Properties(connection.getClientInfo()); props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, indexTenantId); - try (PhoenixConnection tenantConn = DriverManager.getConnection(connection.getURL(), props).unwrap(PhoenixConnection.class)) { + try (PhoenixConnection tenantConn = new PhoenixConnection(connection, connection.getQueryServices(), props)) { PostDDLCompiler dropCompiler = new PostDDLCompiler(tenantConn); tenantConn.getQueryServices().updateData(dropCompiler.compile(entry.getValue(), null, null, Collections.<PColumn>emptyList(), ts)); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java index a8e80ab..fea6d61 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java @@ -38,16 +38,23 @@ import org.apache.commons.configuration.SubsetConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.metrics2.AbstractMetric; import org.apache.hadoop.metrics2.MetricsRecord; import org.apache.hadoop.metrics2.MetricsSink; import org.apache.hadoop.metrics2.MetricsTag; +import org.apache.phoenix.compile.MutationPlan; +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.jdbc.PhoenixPreparedStatement; import org.apache.phoenix.metrics.MetricInfo; import org.apache.phoenix.metrics.Metrics; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.trace.util.Tracing; +import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import com.google.common.annotations.VisibleForTesting; @@ -93,7 +100,7 @@ public class PhoenixMetricsSink implements MetricsSink { private Connection conn; private String table; - + public PhoenixMetricsSink() { LOG.info("Writing tracing metrics to phoenix table"); @@ -133,14 +140,25 @@ public class PhoenixMetricsSink implements MetricsSink { } } } - + private void initializeInternal(Connection conn, String tableName) throws SQLException { this.conn = conn; - // ensure that the target table already exists - createTable(conn, tableName); + if (!traceTableExists(conn, tableName)) { + createTable(conn, tableName); + } + this.table = tableName; } - + + private boolean traceTableExists(Connection conn, String traceTableName) throws SQLException { + try { + PhoenixRuntime.getTable(conn, traceTableName); + return true; + } catch (TableNotFoundException e) { + return false; + } + } + /** * Used for <b>TESTING ONLY</b> * Initialize the connection and setup the table to use the @@ -183,10 +201,8 @@ public class PhoenixMetricsSink implements 
MetricsSink { // tables created as transactional tables, make these table non // transactional PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE; -; PreparedStatement stmt = conn.prepareStatement(ddl); stmt.execute(); - this.table = table; } @Override @@ -281,7 +297,12 @@ public class PhoenixMetricsSink implements MetricsSink { for (String tag : variableValues) { ps.setString(index++, tag); } - ps.execute(); + // Not going through the standard route of using statement.execute() as that code path + // is blocked if the metadata hasn't been upgraded to the new minor release. + MutationPlan plan = ps.unwrap(PhoenixPreparedStatement.class).compileMutation(stmt); + MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState(); + MutationState newState = plan.execute(); + state.join(newState); } catch (SQLException e) { LOG.error("Could not write metric: \n" + record + " to prepared statement:\n" + stmt, e); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index a690dd8..764d135 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -172,7 +172,6 @@ public class PhoenixRuntime { AUTO_COMMIT_ATTRIB, CONSISTENCY_ATTRIB, REQUEST_METRIC_ATTRIB, - NO_UPGRADE_ATTRIB }; /** @@ -215,6 +214,7 @@ public class PhoenixRuntime { props.setProperty(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, "false"); } conn = DriverManager.getConnection(jdbcUrl, props).unwrap(PhoenixConnection.class); + conn.setRunningUpgrade(true); if (execCmd.isMapNamespace()) { String srcTable = execCmd.getSrcTable(); System.out.println("Starting upgrading table:" + 
srcTable + "... please don't kill it in between!!"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java index 6d8e00d..bab52a4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java @@ -310,7 +310,7 @@ public final class QueryUtil { } /** - * @return {@link PhoenixConnection} with NO_UPGRADE_ATTRIB set so that we don't initiate server upgrade + * @return {@link PhoenixConnection} with {@value UpgradeUtil#RUN_UPGRADE} set so that we don't initiate server upgrade */ public static Connection getConnectionOnServer(Configuration conf) throws ClassNotFoundException, SQLException { @@ -318,12 +318,12 @@ public final class QueryUtil { } /** - * @return {@link PhoenixConnection} with NO_UPGRADE_ATTRIB set so that we don't initiate server upgrade + * @return {@link PhoenixConnection} with {@value UpgradeUtil#DO_NOT_UPGRADE} set so that we don't initiate metadata upgrade. 
*/ public static Connection getConnectionOnServer(Properties props, Configuration conf) throws ClassNotFoundException, SQLException { - props.setProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB, Boolean.TRUE.toString()); + UpgradeUtil.doNotUpgradeOnFirstConnection(props); return getConnection(props, conf); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e90feaa3/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java index 8bc3e63..cddebb7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java @@ -132,7 +132,12 @@ public class UpgradeUtil { private static final Logger logger = LoggerFactory.getLogger(UpgradeUtil.class); private static final byte[] SEQ_PREFIX_BYTES = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes("_SEQ_")); public static final byte[] UPGRADE_TO_4_7_COLUMN_NAME = Bytes.toBytes("UPGRADE_TO_4_7"); - + /** + * Attribute for Phoenix's internal purposes only. When this attribute is set on a phoenix connection, then + * the upgrade code for upgrading the cluster to the new minor release is not triggered. Note that presence + * of this attribute overrides a true value for {@value QueryServices#AUTO_UPGRADE_ENABLED}. + */ + private static final String DO_NOT_UPGRADE = "DoNotUpgrade"; public static String UPSERT_BASE_COLUMN_COUNT_IN_HEADER_ROW = "UPSERT " + "INTO SYSTEM.CATALOG " + "(TENANT_ID, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, COLUMN_FAMILY, BASE_COLUMN_COUNT) " @@ -174,7 +179,7 @@ public class UpgradeUtil { + " FROM " + SYSTEM_CATALOG_SCHEMA + "." + SYSTEM_CATALOG_TABLE + " WHERE " + COLUMN_FAMILY + " = ? 
AND " + LINK_TYPE + " = " + LinkType.PHYSICAL_TABLE.getSerializedValue() + " AND ( " + TABLE_TYPE + "=" + "'" + PTableType.VIEW.getSerializedValue() + "' OR " + TABLE_TYPE + " IS NULL) ORDER BY "+TENANT_ID; - + private UpgradeUtil() { } @@ -1498,7 +1503,7 @@ public class UpgradeUtil { } throw new SQLException(buf.toString()); } - PhoenixConnection upgradeConn = new PhoenixConnection(conn, true); + PhoenixConnection upgradeConn = new PhoenixConnection(conn, true, true); try { upgradeConn.setAutoCommit(true); for (PTable table : tablesNeedingUpgrading) { @@ -1898,4 +1903,12 @@ public class UpgradeUtil { String upgradingFrom = getVersion(currentSystemTableTimestamp); return "SNAPSHOT_" + tableString + "_" + upgradingFrom + "_TO_" + CURRENT_CLIENT_VERSION + "_" + date; } + + public static boolean isNoUpgradeSet(Properties props) { + return Boolean.compare(true, Boolean.valueOf(props.getProperty(DO_NOT_UPGRADE))) == 0; + } + + public static void doNotUpgradeOnFirstConnection(Properties props) { + props.setProperty(DO_NOT_UPGRADE, String.valueOf(true)); + } } \ No newline at end of file