http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e432be7/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateSchemaCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateSchemaCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateSchemaCompiler.java new file mode 100644 index 0000000..40d1fee --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateSchemaCompiler.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.compile; + +import java.sql.SQLException; +import java.util.Collections; + +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.CreateSchemaStatement; +import org.apache.phoenix.schema.MetaDataClient; + +public class CreateSchemaCompiler { + private final PhoenixStatement statement; + + public CreateSchemaCompiler(PhoenixStatement statement) { + this.statement = statement; + } + + public MutationPlan compile(final CreateSchemaStatement create) throws SQLException { + final PhoenixConnection connection = statement.getConnection(); + final StatementContext context = new StatementContext(statement); + final MetaDataClient client = new MetaDataClient(connection); + + return new BaseMutationPlan(context, create.getOperation()) { + + @Override + public MutationState execute() throws SQLException { + try { + return client.createSchema(create); + } finally { + if (client.getConnection() != connection) { + client.getConnection().close(); + } + } + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return new ExplainPlan(Collections.singletonList("CREATE SCHEMA")); + } + + @Override + public StatementContext getContext() { + return context; + } + }; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e432be7/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java index ffe9621..a2fc371 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java @@ -42,6 +42,7 @@ import org.apache.phoenix.parse.FamilyWildcardParseNode; import org.apache.phoenix.parse.JoinTableNode; import org.apache.phoenix.parse.NamedTableNode; import org.apache.phoenix.parse.PFunction; +import org.apache.phoenix.parse.PSchema; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.SelectStatement; @@ -51,6 +52,7 @@ import org.apache.phoenix.parse.TableNode; import org.apache.phoenix.parse.TableNodeVisitor; import org.apache.phoenix.parse.TableWildcardParseNode; import org.apache.phoenix.parse.UDFParseNode; +import org.apache.phoenix.parse.UseSchemaStatement; import org.apache.phoenix.parse.WildcardParseNode; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryConstants; @@ -73,6 +75,7 @@ import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.ProjectedColumn; +import org.apache.phoenix.schema.SchemaNotFoundException; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; @@ -124,17 +127,36 @@ public class FromCompiler { public PFunction resolveFunction(String functionName) throws SQLException { throw new FunctionNotFoundException(functionName); - }; + } public boolean hasUDFs() { return false; - }; + } + + @Override + public PSchema resolveSchema(String schemaName) throws SQLException { + throw new SchemaNotFoundException(schemaName); + } + + @Override + public List<PSchema> getSchemas() { + return Collections.emptyList(); + } + }; public static ColumnResolver getResolverForCreation(final CreateTableStatement statement, final PhoenixConnection connection) throws SQLException { + TableName baseTable = statement.getBaseTableName(); + String schemaName; if (baseTable == null) { + if (SchemaUtil.isSchemaCheckRequired(statement.getTableType(), connection.getQueryServices().getProps())) { + schemaName = statement.getTableName().getSchemaName(); + if (schemaName != null) { + new SchemaResolver(connection, statement.getTableName().getSchemaName(), true); + } + } return EMPTY_TABLE_RESOLVER; } NamedTableNode tableNode = NamedTableNode.create(null, baseTable, Collections.<ColumnDef>emptyList()); @@ -148,7 +170,9 @@ public class FromCompiler { // A tenant-specific connection may not create a mapped VIEW. if (connection.getTenantId() == null && statement.getTableType() == PTableType.VIEW) { ConnectionQueryServices services = connection.getQueryServices(); - byte[] fullTableName = SchemaUtil.getTableNameAsBytes(baseTable.getSchemaName(), baseTable.getTableName()); + byte[] fullTableName = SchemaUtil.getPhysicalName( + SchemaUtil.getTableNameAsBytes(baseTable.getSchemaName(), baseTable.getTableName()), + connection.getQueryServices().getProps()).getName(); HTableInterface htable = null; try { htable = services.getTable(fullTableName); @@ -190,6 +214,11 @@ public class FromCompiler { return visitor; } + public static ColumnResolver getResolverForSchema(UseSchemaStatement statement, PhoenixConnection connection) + throws SQLException { + return new SchemaResolver(connection, SchemaUtil.normalizeIdentifier(statement.getSchemaName()), true); + } + public static ColumnResolver getResolver(NamedTableNode tableNode, PhoenixConnection connection) throws SQLException { SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, tableNode, true); return visitor; @@ -254,9 +283,47 @@ public class FromCompiler { return new ProjectedTableColumnResolver(projectedTable, connection, udfParseNodes); } + private static class SchemaResolver extends BaseColumnResolver { + private final List<PSchema> schemas; + + public SchemaResolver(PhoenixConnection conn, String schemaName, boolean updateCacheImmediately) + throws SQLException { + super(conn, 0); + schemaName = connection.getSchema() != null && schemaName == null ? connection.getSchema() : schemaName; + schemas = ImmutableList.of(createSchemaRef(schemaName, updateCacheImmediately)); + } + + @Override + public List<TableRef> getTables() { + throw new UnsupportedOperationException(); + } + + @Override + public TableRef resolveTable(String schemaName, String tableName) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnRef resolveColumn(String schemaName, String tableName, String colName) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public PSchema resolveSchema(String schemaName) throws SQLException { + return schemas.get(0); + } + + @Override + public List<PSchema> getSchemas() { + return schemas; + } + + } + private static class SingleTableColumnResolver extends BaseColumnResolver { private final List<TableRef> tableRefs; private final String alias; + private final List<PSchema> schemas; public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp, Map<String, UDFParseNode> udfParseNodes) throws SQLException { super(connection, 0, false, udfParseNodes); @@ -265,12 +332,18 @@ public class FromCompiler { if (def.getColumnDefName().getFamilyName() != null) { families.add(new PColumnFamilyImpl(PNameFactory.newName(def.getColumnDefName().getFamilyName()),Collections.<PColumn>emptyList())); } - } - Long scn = connection.getSCN(); - PTable theTable = new PTableImpl(connection.getTenantId(), table.getName().getSchemaName(), table.getName().getTableName(), scn == null ? HConstants.LATEST_TIMESTAMP : scn, families); + } + Long scn = connection.getSCN(); + String schema = table.getName().getSchemaName(); + if (connection.getSchema() != null) { + schema = schema != null ? schema : connection.getSchema(); + } + PTable theTable = new PTableImpl(connection.getTenantId(), schema, table.getName().getTableName(), + scn == null ? HConstants.LATEST_TIMESTAMP : scn, families); theTable = this.addDynamicColumns(table.getDynamicColumns(), theTable); alias = null; tableRefs = ImmutableList.of(new TableRef(alias, theTable, timeStamp, !table.getDynamicColumns().isEmpty())); + schemas = ImmutableList.of(new PSchema(theTable.getSchemaName().toString(), timeStamp)); } public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException { @@ -282,29 +355,36 @@ public class FromCompiler { Map<String, UDFParseNode> udfParseNodes) throws SQLException { super(connection, tsAddition, updateCacheImmediately, udfParseNodes); alias = tableNode.getAlias(); - TableRef tableRef = createTableRef(tableNode, updateCacheImmediately); + TableRef tableRef = createTableRef(tableNode.getName().getSchemaName(), tableNode, updateCacheImmediately); + PSchema schema = new PSchema(tableRef.getTable().getSchemaName().toString()); tableRefs = ImmutableList.of(tableRef); + schemas = ImmutableList.of(schema); } public SingleTableColumnResolver(PhoenixConnection connection, TableRef tableRef) { super(connection, 0); alias = tableRef.getTableAlias(); tableRefs = ImmutableList.of(tableRef); + schemas = ImmutableList.of(new PSchema(tableRef.getTable().getSchemaName().toString())); } public SingleTableColumnResolver(PhoenixConnection connection, TableRef tableRef, Map<String, UDFParseNode> udfParseNodes) throws SQLException { super(connection, 0, false, udfParseNodes); alias = tableRef.getTableAlias(); tableRefs = ImmutableList.of(tableRef); + schemas = ImmutableList.of(new PSchema(tableRef.getTable().getSchemaName().toString())); } public SingleTableColumnResolver(TableRef tableRef) throws SQLException { super(null, 0); alias = tableRef.getTableAlias(); tableRefs = ImmutableList.of(tableRef); + schemas = ImmutableList.of(new PSchema(tableRef.getTable().getSchemaName().toString())); } - @Override + + + @Override public List<TableRef> getTables() { return tableRefs; } @@ -364,6 +444,16 @@ public class FromCompiler { : tableRef.getTable().getColumn(colName); return new ColumnRef(tableRef, column.getPosition()); } + + @Override + public PSchema resolveSchema(String schemaName) throws SQLException { + return schemas.get(0); + } + + @Override + public List<PSchema> getSchemas() { + return schemas; + } } private static abstract class BaseColumnResolver implements ColumnResolver { @@ -399,9 +489,37 @@ public class FromCompiler { } } - protected TableRef createTableRef(NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException { + protected PSchema createSchemaRef(String schemaName, boolean updateCacheImmediately) throws SQLException { + long timeStamp = QueryConstants.UNSET_TIMESTAMP; + PSchema theSchema = null; + MetaDataClient client = new MetaDataClient(connection); + if (updateCacheImmediately) { + MetaDataMutationResult result = client.updateCache(schemaName, true); + timeStamp = TransactionUtil.getResolvedTimestamp(connection, result); + theSchema = result.getSchema(); + if (theSchema == null) { throw new SchemaNotFoundException(schemaName, timeStamp); } + } else { + try { + theSchema = connection.getSchema(new PTableKey(null, schemaName)); + } catch (SchemaNotFoundException e1) {} + // We always attempt to update the cache in the event of a + // SchemaNotFoundException + if (theSchema == null) { + MetaDataMutationResult result = client.updateCache(schemaName, true); + if (result.wasUpdated()) { + timeStamp = TransactionUtil.getResolvedTimestamp(connection, result); + theSchema = result.getSchema(); + } + } + if (theSchema == null) { throw new SchemaNotFoundException(schemaName, timeStamp); } + } + return theSchema; + } + + protected TableRef createTableRef(String connectionSchemaName, NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException { String tableName = tableNode.getName().getTableName(); String schemaName = tableNode.getName().getSchemaName(); + schemaName = connection.getSchema() != null && schemaName == null ? connection.getSchema() : schemaName; long timeStamp = QueryConstants.UNSET_TIMESTAMP; String fullTableName = SchemaUtil.getTableName(schemaName, tableName); PName tenantId = connection.getTenantId(); @@ -573,11 +691,17 @@ public class FromCompiler { private static class MultiTableColumnResolver extends BaseColumnResolver implements TableNodeVisitor<Void> { protected final ListMultimap<String, TableRef> tableMap; protected final List<TableRef> tables; + private String connectionSchemaName; private MultiTableColumnResolver(PhoenixConnection connection, int tsAddition) { super(connection, tsAddition); tableMap = ArrayListMultimap.<String, TableRef> create(); tables = Lists.newArrayList(); + try { + connectionSchemaName = connection.getSchema(); + } catch (SQLException e) { + // ignore + } } private MultiTableColumnResolver(PhoenixConnection connection, int tsAddition, Map<String, UDFParseNode> udfParseNodes) throws SQLException { @@ -606,7 +730,7 @@ public class FromCompiler { @Override public Void visit(NamedTableNode tableNode) throws SQLException { String alias = tableNode.getAlias(); - TableRef tableRef = createTableRef(tableNode, true); + TableRef tableRef = createTableRef(connectionSchemaName, tableNode, true); PTable theTable = tableRef.getTable(); if (alias != null) { @@ -648,11 +772,11 @@ public class FromCompiler { null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null, false, false); columns.add(column); } - PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, - PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, - null, null, columns, null, null, Collections.<PTable>emptyList(), - false, Collections.<PName>emptyList(), null, null, false, false, false, null, - null, null, false, false, 0, 0L); + PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, PTableType.SUBQUERY, null, + MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, null, null, columns, null, null, + Collections.<PTable> emptyList(), false, Collections.<PName> emptyList(), null, null, false, false, + false, null, null, null, false, false, 0, 0L, SchemaUtil + .isNamespaceMappingEnabled(PTableType.SUBQUERY, connection.getQueryServices().getProps())); String alias = subselectNode.getAlias(); TableRef tableRef = new TableRef(alias, t, MetaDataProtocol.MIN_TABLE_TIMESTAMP, false); @@ -750,6 +874,18 @@ public class FromCompiler { } } } + + @Override + public PSchema resolveSchema(String schemaName) throws SQLException { + // TODO Auto-generated method stub + return null; + } + + @Override + public List<PSchema> getSchemas() { + // TODO Auto-generated method stub + return null; + } } private static class ProjectedTableColumnResolver extends MultiTableColumnResolver { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e432be7/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index e6c5970..93b32de 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -1302,11 +1302,12 @@ public class JoinCompiler { return PTableImpl.makePTable(left.getTenantId(), left.getSchemaName(), PNameFactory.newName(SchemaUtil.getTableName(left.getName().getString(), right.getName().getString())), - left.getType(), left.getIndexState(), left.getTimeStamp(), left.getSequenceNumber(), left.getPKName(), - left.getBucketNum(), merged,left.getParentSchemaName(), left.getParentTableName(), left.getIndexes(), - left.isImmutableRows(), Collections.<PName>emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL, - left.isMultiTenant(), left.getStoreNulls(), left.getViewType(), left.getViewIndexId(), left.getIndexType(), - left.rowKeyOrderOptimizable(), left.isTransactional(), left.getUpdateCacheFrequency(), left.getIndexDisableTimestamp()); + left.getType(), left.getIndexState(), left.getTimeStamp(), left.getSequenceNumber(), left.getPKName(), + left.getBucketNum(), merged, left.getParentSchemaName(), left.getParentTableName(), left.getIndexes(), + left.isImmutableRows(), Collections.<PName> emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL, + left.isMultiTenant(), left.getStoreNulls(), left.getViewType(), left.getViewIndexId(), + left.getIndexType(), left.rowKeyOrderOptimizable(), left.isTransactional(), + left.getUpdateCacheFrequency(), left.getIndexDisableTimestamp(), left.isNamespaceMapped()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e432be7/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java index 752e1a5..a786438 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java @@ -35,6 +35,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.parse.PFunction; +import org.apache.phoenix.parse.PSchema; import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.AmbiguousColumnException; @@ -44,6 +45,7 @@ import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.FunctionNotFoundException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; +import org.apache.phoenix.schema.SchemaNotFoundException; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; @@ -118,6 +120,16 @@ public class PostDDLCompiler { public boolean hasUDFs() { return false; } + + @Override + public PSchema resolveSchema(String schemaName) throws SQLException { + throw new SchemaNotFoundException(schemaName); + } + + @Override + public List<PSchema> getSchemas() { + throw new UnsupportedOperationException(); + } }, scan, @@ -177,7 +189,17 @@ public class PostDDLCompiler { @Override public boolean hasUDFs() { return false; - }; + } + + @Override + public List<PSchema> getSchemas() { + throw new UnsupportedOperationException(); + } + + @Override + public PSchema resolveSchema(String schemaName) throws SQLException { + throw new SchemaNotFoundException(schemaName); + } }; PhoenixStatement statement = new PhoenixStatement(connection); StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e432be7/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java index 4be78a9..a9199c6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java @@ -147,12 +147,13 @@ public class TupleProjectionCompiler { projectedColumns.add(column); } - return PTableImpl.makePTable(table.getTenantId(), table.getSchemaName(), table.getTableName(), PTableType.PROJECTED, - table.getIndexState(), table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(), - table.getBucketNum(), projectedColumns, table.getParentSchemaName(), - table.getParentName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, - table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), - table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp()); + return PTableImpl.makePTable(table.getTenantId(), table.getSchemaName(), table.getTableName(), + PTableType.PROJECTED, table.getIndexState(), table.getTimeStamp(), table.getSequenceNumber(), + table.getPKName(), table.getBucketNum(), projectedColumns, table.getParentSchemaName(), + table.getParentName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName> emptyList(), + null, null, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), + table.getViewIndexId(), + table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped()); } public static PTable createProjectedTable(TableRef tableRef, List<ColumnRef> sourceColumnRefs, boolean retainPKColumns) throws SQLException { @@ -175,11 +176,12 @@ public class TupleProjectionCompiler { projectedColumns.add(column); } return PTableImpl.makePTable(table.getTenantId(), PROJECTED_TABLE_SCHEMA, table.getName(), PTableType.PROJECTED, - null, table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(), - retainPKColumns ? table.getBucketNum() : null, projectedColumns, null, - null, Collections.<PTable>emptyList(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, - table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), - null, table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp()); + null, table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(), + retainPKColumns ? table.getBucketNum() : null, projectedColumns, null, null, + Collections.<PTable> emptyList(), table.isImmutableRows(), Collections.<PName> emptyList(), null, null, + table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), + table.getViewIndexId(), null, table.rowKeyOrderOptimizable(), table.isTransactional(), + table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped()); } // For extracting column references from single select statement http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e432be7/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java index b25baf7..32fa8fe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java @@ -36,6 +36,7 @@ import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.SchemaUtil; public class UnionCompiler { private static final PName UNION_FAMILY_NAME = PNameFactory.newName("unionFamilyName"); @@ -79,10 +80,12 @@ public class UnionCompiler { projectedColumns.add(projectedColumn); } Long scn = statement.getConnection().getSCN(); - PTable tempTable = PTableImpl.makePTable(statement.getConnection().getTenantId(), UNION_SCHEMA_NAME, UNION_TABLE_NAME, - PTableType.SUBQUERY, null, HConstants.LATEST_TIMESTAMP, scn == null ? HConstants.LATEST_TIMESTAMP : scn, null, null, - projectedColumns, null, null, null, - true, null, null, null, true, true, true, null, null, null, false, false, 0, 0L); + PTable tempTable = PTableImpl.makePTable(statement.getConnection().getTenantId(), UNION_SCHEMA_NAME, + UNION_TABLE_NAME, PTableType.SUBQUERY, null, HConstants.LATEST_TIMESTAMP, + scn == null ? HConstants.LATEST_TIMESTAMP : scn, null, null, projectedColumns, null, null, null, true, + null, null, null, true, true, true, null, null, null, false, false, 0, 0L, + SchemaUtil.isNamespaceMappingEnabled(PTableType.SUBQUERY, + statement.getConnection().getQueryServices().getProps())); TableRef tableRef = new TableRef(null, tempTable, 0, false); return tableRef; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e432be7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 770917f..1fd4f44 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -39,6 +39,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH_BYTES; @@ -127,11 +128,14 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRespons import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearTableFromCacheResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateFunctionRequest; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateSchemaRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.CreateTableRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropColumnRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropFunctionRequest; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropSchemaRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.DropTableRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetFunctionsRequest; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetSchemaRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse; @@ -156,6 +160,7 @@ import org.apache.phoenix.metrics.Metrics; import org.apache.phoenix.parse.LiteralParseNode; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.PFunction.FunctionArgument; +import org.apache.phoenix.parse.PSchema; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.SQLParser; import org.apache.phoenix.protobuf.ProtobufUtil; @@ -259,6 +264,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final KeyValue ROW_KEY_ORDER_OPTIMIZABLE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES); private static final KeyValue TRANSACTIONAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTIONAL_BYTES); private static final KeyValue UPDATE_CACHE_FREQUENCY_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, UPDATE_CACHE_FREQUENCY_BYTES); + private static final KeyValue IS_NAMESPACE_MAPPED_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, + TABLE_FAMILY_BYTES, IS_NAMESPACE_MAPPED_BYTES); private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList( EMPTY_KEYVALUE_KV, @@ -282,7 +289,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso BASE_COLUMN_COUNT_KV, ROW_KEY_ORDER_OPTIMIZABLE_KV, TRANSACTIONAL_KV, - UPDATE_CACHE_FREQUENCY_KV + UPDATE_CACHE_FREQUENCY_KV, + IS_NAMESPACE_MAPPED_KV ); static { Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR); @@ -309,6 +317,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final int TRANSACTIONAL_INDEX = TABLE_KV_COLUMNS.indexOf(TRANSACTIONAL_KV); private static final int UPDATE_CACHE_FREQUENCY_INDEX = TABLE_KV_COLUMNS.indexOf(UPDATE_CACHE_FREQUENCY_KV); private static final int INDEX_DISABLE_TIMESTAMP = TABLE_KV_COLUMNS.indexOf(INDEX_DISABLE_TIMESTAMP_KV); + private static final int IS_NAMESPACE_MAPPED_INDEX = TABLE_KV_COLUMNS.indexOf(IS_NAMESPACE_MAPPED_KV); // KeyValues for Column private static final KeyValue DECIMAL_DIGITS_KV = KeyValue.createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES); @@ -573,6 +582,39 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } + private List<PSchema> buildSchemas(List<byte[]> keys, HRegion region, long clientTimeStamp, + ImmutableBytesPtr cacheKey) throws IOException, SQLException { + List<KeyRange> keyRanges = Lists.newArrayListWithExpectedSize(keys.size()); + for (byte[] key : keys) { + byte[] stopKey = ByteUtil.concat(key, QueryConstants.SEPARATOR_BYTE_ARRAY); + ByteUtil.nextKey(stopKey, stopKey.length); + keyRanges.add(PVarbinary.INSTANCE.getKeyRange(key, true, stopKey, false)); + } + Scan scan = new Scan(); + scan.setTimeRange(MIN_TABLE_TIMESTAMP, clientTimeStamp); + ScanRanges scanRanges = ScanRanges.createPointLookup(keyRanges); + scanRanges.initializeScan(scan); + scan.setFilter(scanRanges.getSkipScanFilter()); + + RegionScanner scanner = region.getScanner(scan); + + Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); + List<PSchema> schemas = new ArrayList<PSchema>(); + PSchema schema = null; + try { + for (int i = 0; i < keys.size(); i++) { + schema = null; + schema = getSchema(scanner, clientTimeStamp); + if (schema == null) { return null; } + metaDataCache.put(cacheKey, schema); + schemas.add(schema); + } + return schemas; + } finally { + scanner.close(); + } + } + private void addIndexToTable(PName tenantId, PName schemaName, PName indexName, PName tableName, long clientTimeStamp, List<PTable> indexes) throws IOException, SQLException { byte[] key = SchemaUtil.getTableKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(), schemaName.getBytes(), indexName.getBytes()); PTable indexTable = doGetTable(key, clientTimeStamp); @@ -853,7 +895,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso Cell indexDisableTimestampKv = tableKeyValues[INDEX_DISABLE_TIMESTAMP]; long indexDisableTimestamp = indexDisableTimestampKv == null ? 0L : PLong.INSTANCE.getCodec().decodeLong(indexDisableTimestampKv.getValueArray(), indexDisableTimestampKv.getValueOffset(), SortOrder.getDefault()); - + Cell isNamespaceMappedKv = tableKeyValues[IS_NAMESPACE_MAPPED_INDEX]; + boolean isNamespaceMapped = isNamespaceMappedKv == null ? false + : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isNamespaceMappedKv.getValueArray(), + isNamespaceMappedKv.getValueOffset(), isNamespaceMappedKv.getValueLength())); + List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount); List<PTable> indexes = new ArrayList<PTable>(); List<PName> physicalTables = new ArrayList<PName>(); @@ -879,26 +925,50 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null); } } - PName physicalTableName = physicalTables.isEmpty() ? PNameFactory.newName(SchemaUtil.getTableName( - schemaName.getString(), tableName.getString())) : physicalTables.get(0); + PName physicalTableName = physicalTables.isEmpty() ? PNameFactory.newName(SchemaUtil.getPhysicalTableName( + Bytes.toBytes(SchemaUtil.getTableName(schemaName.getBytes(), tableName.getBytes())), isNamespaceMapped) + .getNameAsString()) : physicalTables.get(0); PTableStats stats = PTableStats.EMPTY_STATS; if (tenantId == null) { HTableInterface statsHTable = null; try { - statsHTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES); + statsHTable = ServerUtil.getHTableForCoprocessorScan(env, + SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, env.getConfiguration()) + .getName()); stats = StatisticsUtil.readStatistics(statsHTable, physicalTableName.getBytes(), clientTimeStamp); timeStamp = Math.max(timeStamp, stats.getTimestamp()); } catch (org.apache.hadoop.hbase.TableNotFoundException e) { - logger.warn(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME + " not online yet?"); + logger.warn(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, + env.getConfiguration()) + " not online yet?"); } finally { if (statsHTable != null) statsHTable.close(); } } - return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, - tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? schemaName : null, - tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, - disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, - stats, baseColumnCount, indexDisableTimestamp); + return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum, + pkName, saltBucketNum, columns, tableType == INDEX ? schemaName : null, + tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, + viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, + rowKeyOrderOptimizable, transactional, updateCacheFrequency, stats, baseColumnCount, + indexDisableTimestamp, isNamespaceMapped); + } + + private PSchema getSchema(RegionScanner scanner, long clientTimeStamp) throws IOException, SQLException { + List<Cell> results = Lists.newArrayList(); + scanner.next(results); + if (results.isEmpty()) { return null; } + + Cell keyValue = results.get(0); + byte[] keyBuffer = keyValue.getRowArray(); + int keyLength = keyValue.getRowLength(); + int keyOffset = keyValue.getRowOffset(); + PName tenantId = newPName(keyBuffer, keyOffset, keyLength); + int tenantIdLength = (tenantId == null) ? 0 : tenantId.getBytes().length; + if (tenantIdLength == 0) { + tenantId = null; + } + PName schemaName = newPName(keyBuffer, keyOffset + tenantIdLength + 1, keyLength - tenantIdLength - 1); + long timeStamp = keyValue.getTimestamp(); + return new PSchema(schemaName.getString(), timeStamp); } private PFunction getFunction(RegionScanner scanner, final boolean isReplace, long clientTimeStamp, List<Mutation> deleteMutationsForReplace) @@ -1070,6 +1140,30 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return null; } + private PSchema buildDeletedSchema(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, long clientTimeStamp) + throws IOException { + if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) { return null; } + + Scan scan = MetaDataUtil.newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP); + scan.setFilter(new FirstKeyOnlyFilter()); + scan.setRaw(true); + List<Cell> results = Lists.<Cell> newArrayList(); + try (RegionScanner scanner = region.getScanner(scan);) { + scanner.next(results); + } + // HBase ignores the time range on a raw scan (HBASE-7362) + if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) { + Cell kv = results.get(0); + if (kv.getTypeByte() == Type.Delete.getCode()) { + Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env) + .getMetaDataCache(); + PSchema schema = newDeletedSchemaMarker(kv.getTimestamp()); + metaDataCache.put(cacheKey, schema); + return schema; + } + } + return null; + } private static PTable newDeletedTableMarker(long timestamp) { return new PTableImpl(timestamp); @@ -1079,6 +1173,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return new PFunction(timestamp); } + private static PSchema newDeletedSchemaMarker(long timestamp) { + return new PSchema(timestamp); + } + private static boolean isTableDeleted(PTable table) { return table.getName() == null; } @@ -1343,6 +1441,24 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso private static final byte[] PHYSICAL_TABLE_BYTES = new byte[] {PTable.LinkType.PHYSICAL_TABLE.getSerializedValue()}; + private PSchema loadSchema(RegionCoprocessorEnvironment env, byte[] key, ImmutableBytesPtr cacheKey, + long clientTimeStamp, long asOfTimeStamp) throws IOException, SQLException { + HRegion region = env.getRegion(); + Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); + PSchema schema = (PSchema)metaDataCache.getIfPresent(cacheKey); + // We always cache the latest version - fault in if not in cache + if (schema != null) { return schema; } + ArrayList<byte[]> arrayList = new ArrayList<byte[]>(1); + arrayList.add(key); + List<PSchema> schemas = buildSchemas(arrayList, region, asOfTimeStamp, cacheKey); + if (schemas != null) return schemas.get(0); + // if not found then check if newer schema already exists and add delete marker for timestamp + // found + if (schema == null + && (schema = buildDeletedSchema(key, cacheKey, region, clientTimeStamp)) != null) { return schema; } + return null; + } + /** * @param tableName parent table's name * Looks for whether child views exist for the table specified by table. @@ -1365,7 +1481,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } SingleColumnValueFilter linkFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, LINK_TYPE_BYTES, CompareOp.EQUAL, linkTypeBytes); linkFilter.setFilterIfMissing(true); - byte[] suffix = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, SchemaUtil.getTableNameAsBytes(schemaName, tableName)); + byte[] suffix = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, SchemaUtil + .getPhysicalTableName(SchemaUtil.getTableNameAsBytes(schemaName, tableName), table.isNamespaceMapped()) + .getName()); SuffixFilter rowFilter = new SuffixFilter(suffix); Filter filter = new FilterList(linkFilter, rowFilter); scan.setFilter(filter); @@ -1378,7 +1496,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // TableName systemCatalogTableName = region.getTableDesc().getTableName(); // HTableInterface hTable = env.getTable(systemCatalogTableName); // These deprecated calls work around the issue - HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + HTableInterface hTable = ServerUtil.getHTableForCoprocessorScan(env, + region.getTableDesc().getTableName().getName()); try { boolean allViewsInCurrentRegion = true; int numOfChildViews = 0; @@ -1578,7 +1697,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // Add to list of HTables to delete, unless it's a view or its a shared index if (tableType != PTableType.VIEW && table.getViewIndexId()==null) { - tableNamesToDelete.add(table.getName().getBytes()); + tableNamesToDelete.add(table.getPhysicalName().getBytes()); } else { sharedTablesToDelete.add(new SharedTableState(table)); @@ -3125,7 +3244,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } - private static MetaDataMutationResult checkTableKeyInRegion(byte[] key, HRegion region) { + private static MetaDataMutationResult checkKeyInRegion(byte[] key, HRegion region, MutationCode code) { byte[] startKey = region.getStartKey(); byte[] endKey = region.getEndKey(); if (Bytes.compareTo(startKey, key) <= 0 @@ -3133,20 +3252,21 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso endKey) < 0)) { return null; // normal case; } - return new MetaDataMutationResult(MutationCode.TABLE_NOT_IN_REGION, - EnvironmentEdgeManager.currentTimeMillis(), null); + return new MetaDataMutationResult(code, EnvironmentEdgeManager.currentTimeMillis(), null); + } + + private static MetaDataMutationResult checkTableKeyInRegion(byte[] key, HRegion region) { + return checkKeyInRegion(key, region, MutationCode.TABLE_NOT_IN_REGION); + } private static MetaDataMutationResult checkFunctionKeyInRegion(byte[] key, HRegion region) { - byte[] startKey = region.getStartKey(); - byte[] endKey = region.getEndKey(); - if (Bytes.compareTo(startKey, key) <= 0 - && (Bytes.compareTo(HConstants.LAST_ROW, endKey) == 0 || Bytes.compareTo(key, - endKey) < 0)) { - return null; // normal case; - } - return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_IN_REGION, - EnvironmentEdgeManager.currentTimeMillis(), null); + return checkKeyInRegion(key, region, MutationCode.FUNCTION_NOT_IN_REGION); + } + + private static MetaDataMutationResult checkSchemaKeyInRegion(byte[] key, HRegion region) { + return checkKeyInRegion(key, region, MutationCode.SCHEMA_NOT_IN_REGION); + } /** @@ -3210,6 +3330,46 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } @Override + public void getSchema(RpcController controller, GetSchemaRequest request, RpcCallback<MetaDataResponse> done) { + MetaDataResponse.Builder builder = MetaDataResponse.newBuilder(); + HRegion region = env.getRegion(); + String schemaName = request.getSchemaName(); + byte[] lockKey = SchemaUtil.getSchemaKey(schemaName); + MetaDataMutationResult result = checkSchemaKeyInRegion(lockKey, region); + if (result != null) { + done.run(MetaDataMutationResult.toProto(result)); + return; + } + long clientTimeStamp = request.getClientTimestamp(); + List<RowLock> locks = Lists.newArrayList(); + try { + acquireLock(region, lockKey, locks); + // Get as of latest timestamp so we can detect if we have a + // newer schema that already + // exists without making an additional query + ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(lockKey); + PSchema schema = loadSchema(env, lockKey, cacheKey, clientTimeStamp, clientTimeStamp); + if (schema != null) { + if (schema.getTimeStamp() < clientTimeStamp) { + builder.setReturnCode(MetaDataProtos.MutationCode.SCHEMA_ALREADY_EXISTS); + builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); + builder.setSchema(PSchema.toProto(schema)); + done.run(builder.build()); + return; + } + } + } catch (Exception e) { + long currentTime = EnvironmentEdgeManager.currentTimeMillis(); + builder.setReturnCode(MetaDataProtos.MutationCode.SCHEMA_NOT_FOUND); + builder.setMutationTime(currentTime); + done.run(builder.build()); + return; + } finally { + region.releaseRowLocks(locks); + } + } + + @Override public void getFunctions(RpcController controller, GetFunctionsRequest request, RpcCallback<MetaDataResponse> done) { MetaDataResponse.Builder builder = MetaDataResponse.newBuilder(); @@ -3428,5 +3588,154 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null); } + + @Override + public void createSchema(RpcController controller, CreateSchemaRequest request, + RpcCallback<MetaDataResponse> done) { + MetaDataResponse.Builder builder = MetaDataResponse.newBuilder(); + String schemaName = null; + try { + List<Mutation> schemaMutations = ProtobufUtil.getMutations(request); + schemaName = request.getSchemaName(); + Mutation m = MetaDataUtil.getPutOnlyTableHeaderRow(schemaMutations); + + byte[] lockKey = m.getRow(); + HRegion region = env.getRegion(); + MetaDataMutationResult result = checkSchemaKeyInRegion(lockKey, region); + if (result != null) { + done.run(MetaDataMutationResult.toProto(result)); + return; + } + List<RowLock> locks = Lists.newArrayList(); + long clientTimeStamp = MetaDataUtil.getClientTimeStamp(schemaMutations); + try { + acquireLock(region, lockKey, locks); + // Get as of latest timestamp so we can detect if we have a newer schema that already exists without + // making an additional query + ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(lockKey); + PSchema schema = loadSchema(env, lockKey, cacheKey, clientTimeStamp, clientTimeStamp); + if (schema != null) { + if (schema.getTimeStamp() < clientTimeStamp) { + builder.setReturnCode(MetaDataProtos.MutationCode.SCHEMA_ALREADY_EXISTS); + builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); + builder.setSchema(PSchema.toProto(schema)); + done.run(builder.build()); + return; + } else { + builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_SCHEMA_FOUND); + builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis()); + builder.setSchema(PSchema.toProto(schema)); + done.run(builder.build()); + return; + } + } + region.mutateRowsWithLocks(schemaMutations, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, + HConstants.NO_NONCE); + + // Invalidate the cache - the next getSchema call will add it + Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env) + .getMetaDataCache(); + if (cacheKey != null) { + metaDataCache.invalidate(cacheKey); + } + + // Get timeStamp from mutations - the above method sets it if + // it's unset + long currentTimeStamp = MetaDataUtil.getClientTimeStamp(schemaMutations); + builder.setReturnCode(MetaDataProtos.MutationCode.SCHEMA_NOT_FOUND); + builder.setMutationTime(currentTimeStamp); + done.run(builder.build()); + return; + } finally { + region.releaseRowLocks(locks); + } + } catch (Throwable t) { + logger.error("Creating the schema" + schemaName + "failed", t); + ProtobufUtil.setControllerException(controller, ServerUtil.createIOException(schemaName, t)); + } + } + + @Override + public void dropSchema(RpcController controller, DropSchemaRequest request, RpcCallback<MetaDataResponse> done) { + String schemaName = null; + try { + List<Mutation> schemaMetaData = ProtobufUtil.getMutations(request); + schemaName = request.getSchemaName(); + byte[] lockKey = SchemaUtil.getSchemaKey(schemaName); + HRegion region = env.getRegion(); + MetaDataMutationResult result = checkSchemaKeyInRegion(lockKey, region); + if (result != null) { + done.run(MetaDataMutationResult.toProto(result)); + return; + } + List<RowLock> locks = Lists.newArrayList(); + long clientTimeStamp = MetaDataUtil.getClientTimeStamp(schemaMetaData); + try { + acquireLock(region, lockKey, locks); + List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>(1); + result = doDropSchema(clientTimeStamp, schemaName, lockKey, schemaMetaData, invalidateList); + if (result.getMutationCode() != MutationCode.SCHEMA_ALREADY_EXISTS) { + done.run(MetaDataMutationResult.toProto(result)); + return; + } + region.mutateRowsWithLocks(schemaMetaData, Collections.<byte[]> emptySet(), HConstants.NO_NONCE, + HConstants.NO_NONCE); + Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env) + .getMetaDataCache(); + long currentTime = MetaDataUtil.getClientTimeStamp(schemaMetaData); + for (ImmutableBytesPtr ptr : invalidateList) { + metaDataCache.invalidate(ptr); + metaDataCache.put(ptr, newDeletedSchemaMarker(currentTime)); + } + done.run(MetaDataMutationResult.toProto(result)); + return; + } finally { + region.releaseRowLocks(locks); + } + } catch (Throwable t) { + logger.error("drop schema failed:", t); + ProtobufUtil.setControllerException(controller, ServerUtil.createIOException(schemaName, t)); + } + } + + private MetaDataMutationResult doDropSchema(long clientTimeStamp, String schemaName, byte[] key, + List<Mutation> schemaMutations, List<ImmutableBytesPtr> invalidateList) throws IOException, SQLException { + PSchema schema = loadSchema(env, key, new ImmutableBytesPtr(key), clientTimeStamp, clientTimeStamp); + boolean areTablesExists = false; + if (schema == null) { return new MetaDataMutationResult(MutationCode.SCHEMA_NOT_FOUND, + EnvironmentEdgeManager.currentTimeMillis(), null); } + if (schema.getTimeStamp() < clientTimeStamp) { + HRegion region = env.getRegion(); + Scan scan = MetaDataUtil.newTableRowsScan(SchemaUtil.getKeyForSchema(null, schemaName), MIN_TABLE_TIMESTAMP, + clientTimeStamp); + List<Cell> results = Lists.newArrayList(); + try (RegionScanner scanner = region.getScanner(scan);) { + scanner.next(results); + if (results.isEmpty()) { // Should not be possible + return new MetaDataMutationResult(MutationCode.SCHEMA_NOT_FOUND, + EnvironmentEdgeManager.currentTimeMillis(), null); + } + do { + Cell kv = results.get(0); + if (Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), key, 0, + key.length) != 0) { + areTablesExists = true; + break; + } + results.clear(); + scanner.next(results); + } while (!results.isEmpty()); + } + if (areTablesExists) { return new MetaDataMutationResult(MutationCode.TABLES_EXIST_ON_SCHEMA, schema, + EnvironmentEdgeManager.currentTimeMillis()); } + + return new MetaDataMutationResult(MutationCode.SCHEMA_ALREADY_EXISTS, schema, + EnvironmentEdgeManager.currentTimeMillis()); + } + return new MetaDataMutationResult(MutationCode.SCHEMA_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), + null); + + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7e432be7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index f8b4c79..0ebcd64 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -28,6 +28,7 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; import org.apache.phoenix.coprocessor.generated.PFunctionProtos; import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.parse.PFunction; +import org.apache.phoenix.parse.PSchema; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; import org.apache.phoenix.schema.PName; @@ -78,8 +79,9 @@ public abstract class MetaDataProtocol extends MetaDataService { public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0 = MIN_TABLE_TIMESTAMP + 8; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0 = MIN_TABLE_TIMESTAMP + 9; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 = MIN_TABLE_TIMESTAMP + 15; + public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0 = MIN_TABLE_TIMESTAMP + 16; // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants - public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0; + public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_8_0; // TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need // a different code for every type of error. // ENTITY_ALREADY_EXISTS, ENTITY_NOT_FOUND, NEWER_ENTITY_FOUND, ENTITY_NOT_IN_REGION, CONCURRENT_MODIFICATION @@ -99,6 +101,12 @@ public abstract class MetaDataProtocol extends MetaDataService { FUNCTION_NOT_FOUND, NEWER_FUNCTION_FOUND, FUNCTION_NOT_IN_REGION, + SCHEMA_ALREADY_EXISTS, + NEWER_SCHEMA_FOUND, + SCHEMA_NOT_FOUND, + SCHEMA_NOT_IN_REGION, + TABLES_EXIST_ON_SCHEMA, + UNALLOWED_SCHEMA_MUTATION, NO_OP }; @@ -176,6 +184,8 @@ public abstract class MetaDataProtocol extends MetaDataService { private byte[] columnName; private byte[] familyName; private boolean wasUpdated; + private PSchema schema; + private List<PFunction> functions = new ArrayList<PFunction>(1); public MetaDataMutationResult() { @@ -200,6 +210,12 @@ public abstract class MetaDataProtocol extends MetaDataService { this.wasUpdated = wasUpdated; } + public MetaDataMutationResult(MutationCode returnCode, PSchema schema, long currentTime) { + this.returnCode = returnCode; + this.mutationTime = currentTime; + this.schema = schema; + } + // For testing, so that connectionless can set wasUpdated so ColumnResolver doesn't complain public MetaDataMutationResult(MutationCode returnCode, long currentTime, PTable table, boolean wasUpdated) { this(returnCode, currentTime, table, Collections.<byte[]> emptyList()); @@ -297,6 +313,9 @@ public abstract class MetaDataProtocol extends MetaDataService { result.sharedTablesToDelete.add(new SharedTableState(sharedTable)); } } + if (proto.hasSchema()) { + result.schema = PSchema.createFromProto(proto.getSchema()); + } return result; } @@ -340,8 +359,15 @@ public abstract class MetaDataProtocol extends MetaDataService { builder.addSharedTablesToDelete(sharedTableStateBuilder.build()); } } + if (result.getSchema() != null) { + builder.setSchema(PSchema.toProto(result.schema)); + } } return builder.build(); } + + public PSchema getSchema() { + return schema; + } } }