PHOENIX-4764 Cleanup metadata of child views for a base table that has been dropped
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3903ad76 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3903ad76 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3903ad76 Branch: refs/heads/omid2 Commit: 3903ad768496d0a6517298a8f196863b87290e72 Parents: a48e42e Author: Kadir <kozde...@salesforce.com> Authored: Wed Sep 26 23:32:31 2018 -0700 Committer: Karan Mehta <karanmeht...@gmail.com> Committed: Fri Oct 26 14:40:48 2018 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/BasePermissionsIT.java | 4 +- .../phoenix/end2end/DropTableWithViewsIT.java | 151 ++++++++++ .../end2end/QueryDatabaseMetaDataIT.java | 4 + .../end2end/TenantSpecificTablesDDLIT.java | 4 +- .../coprocessor/MetaDataEndpointImpl.java | 46 ++- .../phoenix/coprocessor/TaskRegionObserver.java | 292 +++++++++++++++++++ .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 9 +- .../query/ConnectionQueryServicesImpl.java | 20 +- .../query/ConnectionlessQueryServicesImpl.java | 9 + .../apache/phoenix/query/QueryConstants.java | 17 +- .../org/apache/phoenix/query/QueryServices.java | 6 + .../phoenix/query/QueryServicesOptions.java | 4 + .../java/org/apache/phoenix/schema/PTable.java | 31 +- .../phoenix/schema/stats/StatisticsUtil.java | 2 + .../org/apache/phoenix/util/SchemaUtil.java | 10 + .../java/org/apache/phoenix/query/BaseTest.java | 1 + 16 files changed, 589 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/3903ad76/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java index 88a942e..932ce9f 100644 --- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java @@ -428,7 +428,7 @@ public class BasePermissionsIT extends BaseTest { @Override public Object run() throws Exception { try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) { - assertFalse(stmt.execute("DROP TABLE IF EXISTS " + tableName)); + assertFalse(stmt.execute(String.format("DROP TABLE IF EXISTS %s CASCADE", tableName))); } return null; } @@ -653,7 +653,7 @@ public class BasePermissionsIT extends BaseTest { @Override public Object run() throws Exception { try (Connection conn = getConnection(); Statement stmt = conn.createStatement();) { - assertFalse(stmt.execute("DROP VIEW " + viewName)); + assertFalse(stmt.execute(String.format("DROP VIEW %s CASCADE", viewName))); } return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3903ad76/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java new file mode 100644 index 0000000..9502218 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.phoenix.coprocessor.TableViewFinderResult; +import org.apache.phoenix.coprocessor.ViewFinder; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; + +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.SchemaUtil; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class DropTableWithViewsIT extends SplitSystemCatalogIT { + + private final boolean isMultiTenant; + private final boolean columnEncoded; + private final String TENANT_SPECIFIC_URL1 = getUrl() + ';' + TENANT_ID_ATTRIB + "=" + TENANT1; + + public DropTableWithViewsIT(boolean isMultiTenant, boolean columnEncoded) { + this.isMultiTenant = isMultiTenant; + this.columnEncoded = columnEncoded; + } + + @Parameters(name="DropTableWithViewsIT_multiTenant={0}, columnEncoded={1}") // name is used by failsafe as file name in reports + public static 
Collection<Boolean[]> data() { + return Arrays.asList(new Boolean[][] { + { false, false }, { false, true }, + { true, false }, { true, true } }); + } + + private String generateDDL(String format) { + return generateDDL("", format); + } + + private String generateDDL(String options, String format) { + StringBuilder optionsBuilder = new StringBuilder(options); + if (!columnEncoded) { + if (optionsBuilder.length() != 0) + optionsBuilder.append(","); + optionsBuilder.append("COLUMN_ENCODED_BYTES=0"); + } + if (isMultiTenant) { + if (optionsBuilder.length() !=0 ) + optionsBuilder.append(","); + optionsBuilder.append("MULTI_TENANT=true"); + } + return String.format(format, isMultiTenant ? "TENANT_ID VARCHAR NOT NULL, " : "", + isMultiTenant ? "TENANT_ID, " : "", optionsBuilder.toString()); + } + + @Test + public void testDropTableWithChildViews() throws Exception { + String baseTable = SchemaUtil.getTableName(SCHEMA1, generateUniqueName()); + try (Connection conn = DriverManager.getConnection(getUrl()); + Connection viewConn = + isMultiTenant ? 
DriverManager.getConnection(TENANT_SPECIFIC_URL1) : conn) { + String ddlFormat = + "CREATE TABLE IF NOT EXISTS " + baseTable + " (" + + " %s PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR " + + " CONSTRAINT NAME_PK PRIMARY KEY (%s PK2)" + " ) %s"; + conn.createStatement().execute(generateDDL(ddlFormat)); + conn.commit(); + // Create a view tree (i.e., tree of views) with depth of 2 and fanout factor of 4 + for (int i = 0; i < 4; i++) { + String childView = SchemaUtil.getTableName(SCHEMA2, generateUniqueName()); + String childViewDDL = "CREATE VIEW " + childView + " AS SELECT * FROM " + baseTable; + viewConn.createStatement().execute(childViewDDL); + for (int j = 0; j < 4; j++) { + String grandChildView = SchemaUtil.getTableName(SCHEMA2, generateUniqueName()); + String grandChildViewDDL = "CREATE VIEW " + grandChildView + " AS SELECT * FROM " + childView; + viewConn.createStatement().execute(grandChildViewDDL); + } + } + // Drop the base table + String dropTable = String.format("DROP TABLE IF EXISTS %s CASCADE", baseTable); + conn.createStatement().execute(dropTable); + + // Wait for the tasks for dropping child views to complete. The depth of the view tree is 2, so we expect that + // this will be done in two task handling runs, i.e., in three task handling intervals at most in general + // by assuming that each non-root level will be processed in one interval. To be on the safe side, we will + // wait at most 10 intervals. 
+ long halfTimeInterval = config.getLong(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, + QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL_MS)/2; + ResultSet rs = null; + boolean timedOut = true; + Thread.sleep(3 * halfTimeInterval); + for (int i = 3; i < 20; i++) { + rs = conn.createStatement().executeQuery("SELECT * " + + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + + " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = " + + PTable.TaskType.DROP_CHILD_VIEWS.getSerializedValue()); + Thread.sleep(halfTimeInterval); + if (!rs.next()) { + timedOut = false; + break; + } + } + if (timedOut) { + fail("Drop child view task execution timed out!"); + } + // Views should be dropped by now + TableName linkTable = TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES); + TableViewFinderResult childViewsResult = new TableViewFinderResult(); + ViewFinder.findAllRelatives(getUtility().getConnection().getTable(linkTable), + HConstants.EMPTY_BYTE_ARRAY, + SchemaUtil.getSchemaNameFromFullName(baseTable).getBytes(), + SchemaUtil.getTableNameFromFullName(baseTable).getBytes(), + PTable.LinkType.CHILD_TABLE, + childViewsResult); + assertTrue(childViewsResult.getLinks().size() == 0); + // There should not be any orphan views + rs = conn.createStatement().executeQuery("SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + + " WHERE " + PhoenixDatabaseMetaData.TABLE_SCHEM + " = '" + SCHEMA2 +"'"); + assertFalse(rs.next()); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/3903ad76/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java index ab22b6d..226a2cc 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java +++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java @@ -186,6 +186,10 @@ public class QueryDatabaseMetaDataIT extends ParallelStatsDisabledIT { assertEquals(PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE, rs.getString("TABLE_NAME")); assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE")); assertTrue(rs.next()); + assertEquals(SYSTEM_CATALOG_SCHEMA, rs.getString("TABLE_SCHEM")); + assertEquals(PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE, rs.getString("TABLE_NAME")); + assertEquals(PTableType.SYSTEM.toString(), rs.getString("TABLE_TYPE")); + assertTrue(rs.next()); assertEquals(null, rs.getString("TABLE_SCHEM")); assertEquals(tableAName, rs.getString("TABLE_NAME")); assertEquals(PTableType.TABLE.toString(), rs.getString("TABLE_TYPE")); http://git-wip-us.apache.org/repos/asf/phoenix/blob/3903ad76/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java index 956b43c..85c9128 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java @@ -503,8 +503,10 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT { assertTableMetaData(rs, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.TYPE_SEQUENCE, PTableType.SYSTEM); assertTrue(rs.next()); assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE, PTableType.SYSTEM); + assertTrue(rs.next()); + assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE, PTableType.SYSTEM); assertFalse(rs.next()); - + rs = meta.getTables(null, "", StringUtil.escapeLike(tenantTable2), new String[] 
{TABLE.getValue().getString()}); assertFalse(rs.next()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/3903ad76/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index aa78b1b..424b6d6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -86,6 +86,7 @@ import java.security.PrivilegedExceptionAction; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -1936,7 +1937,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso table = loadTable(env, tableKey, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP, clientVersion); } catch (ParentTableNotFoundException e) { - dropChildMetadata(e.getParentSchemaName(), e.getParentTableName(), e.getParentTenantId()); + dropChildViews(env, e.getParentTenantId(), e.getParentSchemaName(), e.getParentTableName()); } if (table != null) { if (table.getTimeStamp() < clientTimeStamp) { @@ -1961,7 +1962,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // check if the table was dropped, but had child views that were have not yet // been cleaned up by compaction if (!Bytes.toString(schemaName).equals(QueryConstants.SYSTEM_SCHEMA_NAME)) { - dropChildMetadata(schemaName, tableName, tenantIdBytes); + dropChildViews(env, tenantIdBytes, schemaName, tableName); } byte[] parentTableKey = null; @@ -2328,19 +2329,31 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } - 
private void dropChildMetadata(byte[] schemaName, byte[] tableName, byte[] tenantIdBytes) + public static void dropChildViews(RegionCoprocessorEnvironment env, byte[] tenantIdBytes, byte[] schemaName, byte[] tableName) throws IOException, SQLException, ClassNotFoundException { - TableViewFinderResult childViewsResult = new TableViewFinderResult(); - findAllChildViews(tenantIdBytes, schemaName, tableName, childViewsResult); + Table hTable = + ServerUtil.getHTableForCoprocessorScan(env, + SchemaUtil.getPhysicalTableName( + PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES, + env.getConfiguration()).getName()); + TableViewFinderResult childViewsResult = ViewFinder.findRelatedViews(hTable, tenantIdBytes, schemaName, tableName, + PTable.LinkType.CHILD_TABLE, HConstants.LATEST_TIMESTAMP); + if (childViewsResult.hasLinks()) { + for (TableInfo viewInfo : childViewsResult.getLinks()) { byte[] viewTenantId = viewInfo.getTenantId(); byte[] viewSchemaName = viewInfo.getSchemaName(); byte[] viewName = viewInfo.getTableName(); + if (logger.isDebugEnabled()) { + logger.debug("dropChildViews :" + Bytes.toString(schemaName) + "." + Bytes.toString(tableName) + + " -> " + Bytes.toString(viewSchemaName) + "." 
+ Bytes.toString(viewName) + + "with tenant id :" + Bytes.toString(viewTenantId)); + } Properties props = new Properties(); if (viewTenantId != null && viewTenantId.length != 0) props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, Bytes.toString(viewTenantId)); - try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()) + try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(props, env.getConfiguration()) .unwrap(PhoenixConnection.class)) { MetaDataClient client = new MetaDataClient(connection); org.apache.phoenix.parse.TableName viewTableName = org.apache.phoenix.parse.TableName @@ -2351,7 +2364,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } } - private boolean execeededIndexQuota(PTableType tableType, PTable parentTable) { return PTableType.INDEX == tableType && parentTable.getIndexes().size() >= maxIndexesPerTable; } @@ -2514,6 +2526,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } throw new IllegalStateException(msg); } + // drop rows from catalog on this region mutateRowsWithLocks(region, localMutations, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, HConstants.NO_NONCE); @@ -2631,7 +2644,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso EnvironmentEdgeManager.currentTimeMillis(), null); } - if (tableType == PTableType.TABLE || tableType == PTableType.SYSTEM) { + if (tableType == PTableType.TABLE || tableType == PTableType.VIEW || tableType == PTableType.SYSTEM) { // check to see if the table has any child views try (Table hTable = env.getTable(SchemaUtil.getPhysicalTableName( @@ -2640,10 +2653,19 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso boolean hasChildViews = ViewFinder.hasChildViews(hTable, tenantId, schemaName, tableName, clientTimeStamp); - if (hasChildViews && !isCascade) { - // DROP without CASCADE on tables with child views is not permitted - 
return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, - EnvironmentEdgeManager.currentTimeMillis(), null); + if (hasChildViews) { + if (!isCascade) { + // DROP without CASCADE on tables with child views is not permitted + return new MetaDataMutationResult(MutationCode.UNALLOWED_TABLE_MUTATION, + EnvironmentEdgeManager.currentTimeMillis(), null); + } + try { + PhoenixConnection conn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class); + TaskRegionObserver.addTask(conn, PTable.TaskType.DROP_CHILD_VIEWS, Bytes.toString(tenantId), + Bytes.toString(schemaName), Bytes.toString(tableName), this.accessCheckEnabled); + } catch (Throwable t) { + logger.error("Adding a task to drop child views failed!", t); + } } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3903ad76/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java new file mode 100644 index 0000000..ca71961 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; + +import java.util.Properties; +import java.util.TimerTask; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import javax.annotation.concurrent.GuardedBy; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.ipc.RpcServer.Call; +import org.apache.hadoop.hbase.ipc.RpcUtil; +import org.apache.hadoop.hbase.security.User; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.MetaDataClient; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.TaskType; + +import org.apache.phoenix.util.PhoenixRuntime; +import 
org.apache.phoenix.util.QueryUtil; + + +/** + * Coprocessor for task related operations. This coprocessor would only be registered + * to SYSTEM.TASK table + */ + +public class TaskRegionObserver extends BaseRegionObserver { + public static final Log LOG = LogFactory.getLog(TaskRegionObserver.class); + protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(TaskType.values().length); + private long timeInterval = QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL_MS; + private long timeMaxInterval = QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS; + @GuardedBy("TaskRegionObserver.class") + // initial delay before the first task is handled + private static final long initialDelay = 10000; // 10 secs + + @Override + public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c, + boolean abortRequested) { + executor.shutdownNow(); + } + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + Configuration config = env.getConfiguration(); + timeInterval = + config.getLong( + QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, + QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL_MS); + timeMaxInterval = + config.getLong( + QueryServices.TASK_HANDLING_MAX_INTERVAL_MS_ATTRIB, + QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS); + } + + @Override + public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) { + final RegionCoprocessorEnvironment env = e.getEnvironment(); + + // turn off verbose deprecation logging + Logger deprecationLogger = Logger.getLogger("org.apache.hadoop.conf.Configuration.deprecation"); + if (deprecationLogger != null) { + deprecationLogger.setLevel(Level.WARN); + } + + DropChildViewsTask task = new DropChildViewsTask(e.getEnvironment(), timeMaxInterval); + executor.scheduleWithFixedDelay(task, initialDelay, timeInterval, TimeUnit.MILLISECONDS); + } + + private static void mutateSystemTaskTable(final PhoenixConnection conn, final PreparedStatement stmt, 
boolean accessCheckEnabled) + throws IOException { + // we need to mutate SYSTEM.TASK with HBase/login user if access is enabled. + if (accessCheckEnabled) { + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + final Call rpcContext = RpcUtil.getRpcContext(); + // setting RPC context as null so that user can be reset + try { + RpcUtil.setRpcContext(null); + stmt.execute(); + conn.commit(); + } catch (SQLException e) { + throw new IOException(e); + } finally { + // setting RPC context back to original context of the RPC + RpcUtil.setRpcContext(rpcContext); + } + return null; + } + }); + } + else { + try { + stmt.execute(); + conn.commit(); + } catch (SQLException e) { + throw new IOException(e); + } + } + } + + public static void addTask(PhoenixConnection conn, TaskType taskType, String tenantId, String schemaName, + String tableName, boolean accessCheckEnabled) + throws IOException { + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement("UPSERT INTO " + + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " + + PhoenixDatabaseMetaData.TASK_TYPE + ", " + + PhoenixDatabaseMetaData.TENANT_ID + ", " + + PhoenixDatabaseMetaData.TABLE_SCHEM + ", " + + PhoenixDatabaseMetaData.TABLE_NAME + " ) VALUES(?,?,?,?)"); + stmt.setByte(1, taskType.getSerializedValue()); + if (tenantId != null) { + stmt.setString(2, tenantId); + } else { + stmt.setNull(2, Types.VARCHAR); + } + if (schemaName != null) { + stmt.setString(3, schemaName); + } else { + stmt.setNull(3, Types.VARCHAR); + } + stmt.setString(4, tableName); + } catch (SQLException e) { + throw new IOException(e); + } + mutateSystemTaskTable(conn, stmt, accessCheckEnabled); + } + + public static void deleteTask(PhoenixConnection conn, TaskType taskType, Timestamp ts, String tenantId, + String schemaName, String tableName, boolean accessCheckEnabled) throws IOException { + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement("DELETE 
FROM " + + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + + " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = ? AND " + + PhoenixDatabaseMetaData.TASK_TS + " = ? AND " + + PhoenixDatabaseMetaData.TENANT_ID + (tenantId == null ? " IS NULL " : " = '" + tenantId + "'") + " AND " + + PhoenixDatabaseMetaData.TABLE_SCHEM + (schemaName == null ? " IS NULL " : " = '" + schemaName + "'") + " AND " + + PhoenixDatabaseMetaData.TABLE_NAME + " = ?"); + stmt.setByte(1, taskType.getSerializedValue()); + stmt.setTimestamp(2, ts); + stmt.setString(3, tableName); + } catch (SQLException e) { + throw new IOException(e); + } + mutateSystemTaskTable(conn, stmt, accessCheckEnabled); + } + + /** + * Task runs periodically to clean up task of child views whose parent is dropped + * + */ + public static class DropChildViewsTask extends TimerTask { + private RegionCoprocessorEnvironment env; + private long timeMaxInterval; + private boolean accessCheckEnabled; + + public DropChildViewsTask(RegionCoprocessorEnvironment env, long timeMaxInterval) { + this.env = env; + this.accessCheckEnabled = env.getConfiguration().getBoolean(QueryServices.PHOENIX_ACLS_ENABLED, + QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED); + this.timeMaxInterval = timeMaxInterval; + } + + @Override + public void run() { + PhoenixConnection connForTask = null; + Timestamp timestamp = null; + String tenantId = null; + byte[] tenantIdBytes; + String schemaName= null; + byte[] schemaNameBytes; + String tableName = null; + byte[] tableNameBytes; + PhoenixConnection pconn; + try { + String taskQuery = "SELECT " + + PhoenixDatabaseMetaData.TASK_TS + ", " + + PhoenixDatabaseMetaData.TENANT_ID + ", " + + PhoenixDatabaseMetaData.TABLE_SCHEM + ", " + + PhoenixDatabaseMetaData.TABLE_NAME + + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + + " WHERE "+ PhoenixDatabaseMetaData.TASK_TYPE + " = " + PTable.TaskType.DROP_CHILD_VIEWS.getSerializedValue(); + + connForTask = 
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class); + PreparedStatement taskStatement = connForTask.prepareStatement(taskQuery); + ResultSet rs = taskStatement.executeQuery(); + while (rs.next()) { + try { + // delete child views only if the parent table is deleted from the system catalog + timestamp = rs.getTimestamp(1); + tenantId = rs.getString(2); + tenantIdBytes= rs.getBytes(2); + schemaName= rs.getString(3); + schemaNameBytes = rs.getBytes(3); + tableName= rs.getString(4); + tableNameBytes = rs.getBytes(4); + + if (tenantId != null) { + Properties tenantProps = new Properties(); + tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + pconn = QueryUtil.getConnectionOnServer(tenantProps, env.getConfiguration()).unwrap(PhoenixConnection.class); + + } + else { + pconn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class); + } + + MetaDataProtocol.MetaDataMutationResult result = new MetaDataClient(pconn).updateCache(pconn.getTenantId(), + schemaName, tableName, true); + if (result.getMutationCode() != MetaDataProtocol.MutationCode.TABLE_ALREADY_EXISTS) { + MetaDataEndpointImpl.dropChildViews(env, tenantIdBytes, schemaNameBytes, tableNameBytes); + } else if (System.currentTimeMillis() < timeMaxInterval + timestamp.getTime()) { + // skip this task as it has not been expired and its parent table has not been dropped yet + LOG.info("Skipping a child view drop task. The parent table has not been dropped yet : " + + schemaName + "." + tableName + + " with tenant id " + (tenantId == null ? " IS NULL" : tenantId) + + " and timestamp " + timestamp.toString()); + continue; + } + else { + LOG.warn(" A drop child view task has expired and will be removed from the system task table : " + + schemaName + "." + tableName + + " with tenant id " + (tenantId == null ? 
" IS NULL" : tenantId) + + " and timestamp " + timestamp.toString()); + } + + deleteTask(connForTask, PTable.TaskType.DROP_CHILD_VIEWS, timestamp, tenantId, schemaName, + tableName, this.accessCheckEnabled); + } + catch (Throwable t) { + LOG.warn("Exception while dropping a child view task. " + + "It will be retried in the next system task table scan : " + + schemaName + "." + tableName + + " with tenant id " + (tenantId == null ? " IS NULL" : tenantId) + + " and timestamp " + timestamp.toString(), t); + } + } + } catch (Throwable t) { + LOG.error("DropChildViewsTask failed!", t); + } finally { + if (connForTask != null) { + try { + connForTask.close(); + } catch (SQLException ignored) { + LOG.debug("DropChildViewsTask can't close connection", ignored); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/3903ad76/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 52dfe99..3ff62e2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -213,6 +213,10 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final byte[] INDEX_TYPE_BYTES = Bytes.toBytes(INDEX_TYPE); public static final String LINK_TYPE = "LINK_TYPE"; public static final byte[] LINK_TYPE_BYTES = Bytes.toBytes(LINK_TYPE); + public static final String TASK_TYPE = "TASK_TYPE"; + public static final byte[] TASK_TYPE_BYTES = Bytes.toBytes(TASK_TYPE); + public static final String TASK_TS = "TASK_TS"; + public static final byte[] TASK_TS_BYTES = Bytes.toBytes(TASK_TS); public static final String ARRAY_SIZE = "ARRAY_SIZE"; public static final byte[] 
ARRAY_SIZE_BYTES = Bytes.toBytes(ARRAY_SIZE); public static final String VIEW_CONSTANT = "VIEW_CONSTANT"; @@ -361,7 +365,10 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final byte[] SYSTEM_CHILD_LINK_NAME_BYTES = Bytes.toBytes(SYSTEM_CHILD_LINK_NAME); public static final TableName SYSTEM_LINK_HBASE_TABLE_NAME = TableName.valueOf(SYSTEM_CHILD_LINK_NAME); - + public static final String SYSTEM_TASK_TABLE = "TASK"; + public static final String SYSTEM_TASK_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_TASK_TABLE); + public static final byte[] SYSTEM_TASK_NAME_BYTES = Bytes.toBytes(SYSTEM_TASK_NAME); + public static final TableName SYSTEM_TASK_HBASE_TABLE_NAME = TableName.valueOf(SYSTEM_TASK_NAME); //SYSTEM:LOG public static final String SYSTEM_LOG_TABLE = "LOG"; public static final String QUERY_ID = "QUERY_ID"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/3903ad76/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index b68637a..ebcb7b9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -152,6 +152,7 @@ import org.apache.phoenix.coprocessor.MetaDataRegionObserver; import org.apache.phoenix.coprocessor.ScanRegionObserver; import org.apache.phoenix.coprocessor.SequenceRegionObserver; import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl; +import org.apache.phoenix.coprocessor.TaskRegionObserver; import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; import org.apache.phoenix.coprocessor.generated.MetaDataProtos; import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest; @@ -958,7 +959,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (!descriptor.hasCoprocessor(SequenceRegionObserver.class.getName())) { descriptor.addCoprocessor(SequenceRegionObserver.class.getName(), null, priority, null); } + } else if (SchemaUtil.isTaskTable(tableName)) { + if(!descriptor.hasCoprocessor(TaskRegionObserver.class.getName())) { + descriptor.addCoprocessor(TaskRegionObserver.class.getName(), null, priority, null); } + } if (isTransactional) { Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor(); @@ -2812,6 +2817,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement return setSystemDDLProperties(QueryConstants.CREATE_MUTEX_METADTA); } + protected String getTaskDDL() { + return setSystemDDLProperties(QueryConstants.CREATE_TASK_METADATA); + } + private String setSystemDDLProperties(String ddl) { return String.format(ddl, props.getInt(DEFAULT_SYSTEM_MAX_VERSIONS_ATTRIB, QueryServicesOptions.DEFAULT_SYSTEM_MAX_VERSIONS), @@ -3038,6 +3047,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try { metaConnection.createStatement().executeUpdate(getMutexDDL()); } catch (TableAlreadyExistsException e) {} + try { + metaConnection.createStatement().executeUpdate(getTaskDDL()); + } catch (TableAlreadyExistsException e) {} + // Catch the IOException to log the error message and then bubble it up for the client to retry. 
} @@ -3496,6 +3509,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try { metaConnection.createStatement().executeUpdate(getMutexDDL()); } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {} + try { + metaConnection.createStatement().executeUpdate(getTaskDDL()); + } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {} // In case namespace mapping is enabled and system table to system namespace mapping is also enabled, // create an entry for the SYSTEM namespace in the SYSCAT table, so that GRANT/REVOKE commands can work @@ -3733,8 +3749,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // No tables exist matching "SYSTEM\..*", they are all already in "SYSTEM:.*" if (tableNames.size() == 0) { return; } // Try to move any remaining tables matching "SYSTEM\..*" into "SYSTEM:" - if (tableNames.size() > 7) { - logger.warn("Expected 7 system tables but found " + tableNames.size() + ":" + tableNames); + if (tableNames.size() > 8) { + logger.warn("Expected 8 system tables but found " + tableNames.size() + ":" + tableNames); } byte[] mappedSystemTable = SchemaUtil http://git-wip-us.apache.org/repos/asf/phoenix/blob/3903ad76/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index bbfacc0..655de0d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -185,6 +185,10 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple return 
setSystemDDLProperties(QueryConstants.CREATE_MUTEX_METADTA); } + protected String getTaskDDL() { + return setSystemDDLProperties(QueryConstants.CREATE_TASK_METADATA); + } + private String setSystemDDLProperties(String ddl) { return String.format(ddl, props.getInt(DEFAULT_SYSTEM_MAX_VERSIONS_ATTRIB, QueryServicesOptions.DEFAULT_SYSTEM_MAX_VERSIONS), @@ -388,6 +392,11 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple .executeUpdate(getMutexDDL()); } catch (NewerTableAlreadyExistsException ignore) { } + try { + metaConnection.createStatement() + .executeUpdate(getTaskDDL()); + } catch (NewerTableAlreadyExistsException ignore) { + } } catch (SQLException e) { sqlE = e; } finally { http://git-wip-us.apache.org/repos/asf/phoenix/blob/3903ad76/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index bbff343..c45fec9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -347,8 +347,8 @@ public interface QueryConstants { // Links from parent to child views are stored in a separate table for // scalability - public static final String CREATE_CHILD_LINK_METADATA = "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" - + SYSTEM_CHILD_LINK_TABLE + "\"(\n" + + public static final String CREATE_CHILD_LINK_METADATA = "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + + SYSTEM_CHILD_LINK_TABLE + "\"(\n" + // PK columns TENANT_ID + " VARCHAR NULL," + TABLE_SCHEM + " VARCHAR NULL," + TABLE_NAME + " VARCHAR NOT NULL," + COLUMN_NAME + " VARCHAR NULL," + COLUMN_FAMILY + " VARCHAR NULL," + LINK_TYPE + " UNSIGNED_TINYINT,\n" @@ -371,4 +371,17 @@ public interface QueryConstants { 
HColumnDescriptor.KEEP_DELETED_CELLS + "=%s,\n" + PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE; + public static final String CREATE_TASK_METADATA = + "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_TASK_TABLE + "\"(\n" + + // PK columns + TASK_TYPE + " UNSIGNED_TINYINT NOT NULL," + + TASK_TS + " TIMESTAMP NOT NULL," + + TENANT_ID + " VARCHAR NULL," + + TABLE_SCHEM + " VARCHAR NULL," + + TABLE_NAME + " VARCHAR NOT NULL,\n" + + "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TASK_TYPE + "," + TASK_TS + " ROW_TIMESTAMP," + TENANT_ID + "," + TABLE_SCHEM + "," + + TABLE_NAME + "))\n" + + HConstants.VERSIONS + "=%s,\n" + + HColumnDescriptor.KEEP_DELETED_CELLS + "=%s,\n" + + PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3903ad76/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 337bb05..d06f07c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -320,6 +320,12 @@ public interface QueryServices extends SQLCloseable { public static final String SYSTEM_CATALOG_SPLITTABLE = "phoenix.system.catalog.splittable"; + // The parameters defined for handling task stored in table SYSTEM.TASK + // The time interval between periodic scans of table SYSTEM.TASK + public static final String TASK_HANDLING_INTERVAL_MS_ATTRIB = "phoenix.task.handling.interval.ms"; + // The maximum time for a task to stay in table SYSTEM.TASK + public static final String TASK_HANDLING_MAX_INTERVAL_MS_ATTRIB = "phoenix.task.handling.maxInterval.ms"; + /** * Get executor service used for parallel scans */ 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3903ad76/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 35dbe3a..a6a73f7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -340,6 +340,10 @@ public class QueryServicesOptions { public static final int DEFAULT_UPDATE_CACHE_FREQUENCY = 0; public static final int DEFAULT_SMALL_SCAN_THRESHOLD = 100; + // default system task handling interval in milliseconds + public static final long DEFAULT_TASK_HANDLING_INTERVAL_MS = 60*1000; // 1 min + public static final long DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS = 30*60*1000; // 30 minutes + @SuppressWarnings("serial") public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() { { http://git-wip-us.apache.org/repos/asf/phoenix/blob/3903ad76/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java index 8cbf757..6dfe411 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java @@ -192,7 +192,36 @@ public interface PTable extends PMetaDataEntity { return LinkType.values()[serializedValue-1]; } } - + + public enum TaskType { + DROP_CHILD_VIEWS((byte)1); + + private final byte[] byteValue; + private final byte serializedValue; + + TaskType(byte serializedValue) { + this.serializedValue = serializedValue; + this.byteValue = 
Bytes.toBytes(this.name()); + } + + public byte[] getBytes() { + return byteValue; + } + + public byte getSerializedValue() { + return this.serializedValue; + } + public static TaskType getDefault() { + return DROP_CHILD_VIEWS; + } + public static TaskType fromSerializedValue(byte serializedValue) { + if (serializedValue < 1 || serializedValue > TaskType.values().length) { + throw new IllegalArgumentException("Invalid TaskType " + serializedValue); + } + return TaskType.values()[serializedValue-1]; + } + } + public enum ImmutableStorageScheme implements ColumnValueEncoderDecoderSupplier { ONE_CELL_PER_COLUMN((byte)1) { @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/3903ad76/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java index 4a758b7..23b1fcc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java @@ -62,10 +62,12 @@ public class StatisticsUtil { DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME)); DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME)); DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)); + DISABLE_STATS.add(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME)); DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,true)); DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES,true)); DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES,true)); 
DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES,true)); + DISABLE_STATS.add(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME_BYTES,true)); } private StatisticsUtil() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/3903ad76/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java index ec2eb14..24a0e12 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java @@ -540,6 +540,12 @@ public class SchemaUtil { || Bytes.compareTo(tableName, SchemaUtil .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME_BYTES, true).getName()) == 0; } + + public static boolean isTaskTable(byte[] tableName) { + return Bytes.compareTo(tableName, PhoenixDatabaseMetaData.SYSTEM_TASK_NAME_BYTES) == 0 + || Bytes.compareTo(tableName, SchemaUtil + .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_TASK_NAME_BYTES, true).getName()) == 0; + } public static boolean isChildLinkTable(byte[] tableName) { return Bytes.compareTo(tableName, SYSTEM_CHILD_LINK_NAME_BYTES) == 0 || Bytes.compareTo(tableName, @@ -550,6 +556,10 @@ public class SchemaUtil { return PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_NAME.equals(table.getName().getString()); } + public static boolean isTaskTable(PTable table) { + return PhoenixDatabaseMetaData.SYSTEM_TASK_NAME.equals(table.getName().getString()); + } + public static boolean isMetaTable(PTable table) { return PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA.equals(table.getSchemaName().getString()) && PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE.equals(table.getTableName().getString()); } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3903ad76/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 5ca247b..e6e3936 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -620,6 +620,7 @@ public abstract class BaseTest { conf.setInt("hbase.assignment.zkevent.workers", 5); conf.setInt("hbase.assignment.threads.max", 5); conf.setInt("hbase.catalogjanitor.interval", 5000); + conf.setInt(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 1000); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1); return conf;