Repository: hive Updated Branches: refs/heads/master d61c60ba7 -> 0ed01fdf8
http://git-wip-us.apache.org/repos/asf/hive/blob/0ed01fdf/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java index d503cff..e687a69 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java @@ -23,6 +23,8 @@ import com.google.common.collect.Iterators; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -143,6 +145,8 @@ public class HBaseReadWrite implements MetadataStore { private final static byte[] REF_COUNT_COL = "ref".getBytes(HBaseUtils.ENCODING); private final static byte[] DELEGATION_TOKEN_COL = "dt".getBytes(HBaseUtils.ENCODING); private final static byte[] MASTER_KEY_COL = "mk".getBytes(HBaseUtils.ENCODING); + private final static byte[] PRIMARY_KEY_COL = "pk".getBytes(HBaseUtils.ENCODING); + private final static byte[] FOREIGN_KEY_COL = "fk".getBytes(HBaseUtils.ENCODING); private final static byte[] GLOBAL_PRIVS_KEY = "gp".getBytes(HBaseUtils.ENCODING); private final static byte[] SEQUENCES_KEY = "seq".getBytes(HBaseUtils.ENCODING); private final static int TABLES_TO_CACHE = 10; @@ -1716,6 +1720,22 @@ public class HBaseReadWrite implements MetadataStore { ColumnStatisticsObj cso = HBaseUtils.deserializeStatsForOneColumn(pcs, statsCol.getValue()); builder.append(dumpThriftObject(cso)); } + // Add the primary key + List<SQLPrimaryKey> pk = getPrimaryKey(sdParts.containingTable.getDbName(), + sdParts.containingTable.getTableName()); + if (pk != null && pk.size() > 0) { + builder.append(" primary key: "); + for (SQLPrimaryKey pkcol : pk) builder.append(dumpThriftObject(pkcol)); + } + + // Add any foreign keys + List<SQLForeignKey> fks = getForeignKeys(sdParts.containingTable.getDbName(), + sdParts.containingTable.getTableName()); + if (fks != null && fks.size() > 0) { + builder.append(" foreign keys: "); + for (SQLForeignKey fkcol : fks) builder.append(dumpThriftObject(fkcol)); + + } return builder.toString(); } @@ -2530,6 +2550,86 @@ public class HBaseReadWrite implements MetadataStore { } /********************************************************************************************** + * Constraints (pk/fk) related methods + *********************************************************************************************/ + + /** + * Fetch a primary key + * @param dbName database the table is in + * @param tableName table name + * @return List of primary key objects, which together make up one key + * @throws IOException if there's a read error + */ + List<SQLPrimaryKey> getPrimaryKey(String dbName, String tableName) throws IOException { + byte[] key = HBaseUtils.buildKey(dbName, tableName); + byte[] serialized = read(TABLE_TABLE, key, CATALOG_CF, PRIMARY_KEY_COL); + if (serialized == null) return null; + return HBaseUtils.deserializePrimaryKey(dbName, tableName, serialized); + } + + /** + * Fetch a the foreign keys for a table + * @param dbName database the table is in + * @param tableName table name + * @return All of the foreign key columns thrown together in one list. Have fun sorting them out. + * @throws IOException if there's a read error + */ + List<SQLForeignKey> getForeignKeys(String dbName, String tableName) throws IOException { + byte[] key = HBaseUtils.buildKey(dbName, tableName); + byte[] serialized = read(TABLE_TABLE, key, CATALOG_CF, FOREIGN_KEY_COL); + if (serialized == null) return null; + return HBaseUtils.deserializeForeignKeys(dbName, tableName, serialized); + } + + /** + * Create a primary key on a table. + * @param pk Primary key for this table + * @throws IOException if unable to write the data to the store. + */ + void putPrimaryKey(List<SQLPrimaryKey> pk) throws IOException { + byte[][] serialized = HBaseUtils.serializePrimaryKey(pk); + store(TABLE_TABLE, serialized[0], CATALOG_CF, PRIMARY_KEY_COL, serialized[1]); + } + + /** + * Create one or more foreign keys on a table. Note that this will not add a foreign key, it + * will overwrite whatever is there. So if you wish to add a key to a table that may already + * foreign keys you need to first use {@link #getForeignKeys(String, String)} to fetch the + * existing keys, add to the list, and then call this. + * @param fks Foreign key(s) for this table + * @throws IOException if unable to write the data to the store. + */ + void putForeignKeys(List<SQLForeignKey> fks) throws IOException { + byte[][] serialized = HBaseUtils.serializeForeignKeys(fks); + store(TABLE_TABLE, serialized[0], CATALOG_CF, FOREIGN_KEY_COL, serialized[1]); + } + + /** + * Drop the primary key from a table. + * @param dbName database the table is in + * @param tableName table name + * @throws IOException if unable to delete from the store + */ + void deletePrimaryKey(String dbName, String tableName) throws IOException { + byte[] key = HBaseUtils.buildKey(dbName, tableName); + delete(TABLE_TABLE, key, CATALOG_CF, PRIMARY_KEY_COL); + } + + /** + * Drop all foreign keys from a table. Note that this will drop all keys blindly. You should + * only call this if you're sure you want to drop them all. If you just want to drop one you + * should instead all {@link #getForeignKeys(String, String)}, modify the list it returns, and + * then call {@link #putForeignKeys(List)}. + * @param dbName database the table is in + * @param tableName table name + * @throws IOException if unable to delete from the store + */ + void deleteForeignKeys(String dbName, String tableName) throws IOException { + byte[] key = HBaseUtils.buildKey(dbName, tableName); + delete(TABLE_TABLE, key, CATALOG_CF, FOREIGN_KEY_COL); + } + + /********************************************************************************************** * Cache methods *********************************************************************************************/ http://git-wip-us.apache.org/repos/asf/hive/blob/0ed01fdf/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index f9fad4c..07cc0da 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheLoader; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.common.ObjectPair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -2690,42 +2691,143 @@ public class HBaseStore implements RawStore { } @Override - public List<SQLPrimaryKey> getPrimaryKeys(String db_name, String tbl_name) - throws MetaException { - // TODO: WTF? - return null; + public List<SQLPrimaryKey> getPrimaryKeys(String db_name, String tbl_name) throws MetaException { + boolean commit = false; + openTransaction(); + try { + List<SQLPrimaryKey> pk = getHBase().getPrimaryKey(db_name, tbl_name); + commit = true; + return pk; + } catch (IOException e) { + LOG.error("Unable to get primary key", e); + throw new MetaException("Error reading db " + e.getMessage()); + } finally { + commitOrRoleBack(commit); + } } @Override - public List<SQLForeignKey> getForeignKeys(String parent_db_name, - String parent_tbl_name, String foreign_db_name, String foreign_tbl_name) - throws MetaException { - // TODO: WTF? - return null; + public List<SQLForeignKey> getForeignKeys(String parent_db_name, String parent_tbl_name, + String foreign_db_name, String foreign_tbl_name) + throws MetaException { + boolean commit = false; + openTransaction(); + try { + List<SQLForeignKey> fks = getHBase().getForeignKeys(parent_db_name, parent_tbl_name); + if (fks == null || fks.size() == 0) return null; + List<SQLForeignKey> result = new ArrayList<>(fks.size()); + for (SQLForeignKey fkcol : fks) { + if (fkcol.getFktable_db().equals(parent_db_name) && + fkcol.getFktable_name().equals(parent_tbl_name)) { + result.add(fkcol); + } + } + commit = true; + return result; + } catch (IOException e) { + LOG.error("Unable to get foreign key", e); + throw new MetaException("Error reading db " + e.getMessage()); + } finally { + commitOrRoleBack(commit); + } } @Override - public void createTableWithConstraints(Table tbl, - List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys) - throws InvalidObjectException, MetaException { - // TODO: WTF? + public void createTableWithConstraints(Table tbl, List<SQLPrimaryKey> primaryKeys, + List<SQLForeignKey> foreignKeys) + throws InvalidObjectException, MetaException { + boolean commit = false; + openTransaction(); + try { + createTable(tbl); + if (primaryKeys != null) addPrimaryKeys(primaryKeys); + if (foreignKeys != null) addForeignKeys(foreignKeys); + commit = true; + } finally { + commitOrRoleBack(commit); + } } @Override - public void dropConstraint(String dbName, String tableName, - String constraintName) throws NoSuchObjectException { - // TODO: WTF? + public void dropConstraint(String dbName, String tableName, String constraintName) + throws NoSuchObjectException { + // This is something of pain, since we have to search both primary key and foreign key to see + // which they want to drop. + boolean commit = false; + openTransaction(); + try { + List<SQLPrimaryKey> pk = getHBase().getPrimaryKey(dbName, tableName); + if (pk != null && pk.size() > 0 && pk.get(0).getPk_name().equals(constraintName)) { + getHBase().deletePrimaryKey(dbName, tableName); + commit = true; + return; + } + + List<SQLForeignKey> fks = getHBase().getForeignKeys(dbName, tableName); + if (fks != null && fks.size() > 0) { + List<SQLForeignKey> newKeyList = new ArrayList<>(fks.size()); + // Make a new list of keys that excludes all columns from the constraint we're dropping. + for (SQLForeignKey fkcol : fks) { + if (!fkcol.getFk_name().equals(constraintName)) newKeyList.add(fkcol); + } + // If we've dropped only one foreign key out of many keys, than update so that we still + // have the existing keys. Otherwise drop the foreign keys all together. + if (newKeyList.size() > 0) getHBase().putForeignKeys(newKeyList); + else getHBase().deleteForeignKeys(dbName, tableName); + commit = true; + return; + } + + commit = true; + throw new NoSuchObjectException("Unable to find constraint named " + constraintName + + " on table " + tableNameForErrorMsg(dbName, tableName)); + } catch (IOException e) { + LOG.error("Error fetching primary key for table " + tableNameForErrorMsg(dbName, tableName), e); + throw new NoSuchObjectException("Error fetching primary key for table " + + tableNameForErrorMsg(dbName, tableName) + " : " + e.getMessage()); + } finally { + commitOrRoleBack(commit); + } } @Override - public void addPrimaryKeys(List<SQLPrimaryKey> pks) - throws InvalidObjectException, MetaException { - // TODO: WTF? + public void addPrimaryKeys(List<SQLPrimaryKey> pks) throws InvalidObjectException, MetaException { + boolean commit = false; + openTransaction(); + try { + List<SQLPrimaryKey> currentPk = + getHBase().getPrimaryKey(pks.get(0).getTable_db(), pks.get(0).getTable_name()); + if (currentPk != null) { + throw new MetaException(" Primary key already exists for: " + + tableNameForErrorMsg(pks.get(0).getTable_db(), pks.get(0).getTable_name())); + } + getHBase().putPrimaryKey(pks); + commit = true; + } catch (IOException e) { + LOG.error("Error writing primary key", e); + throw new MetaException("Error writing primary key: " + e.getMessage()); + } finally { + commitOrRoleBack(commit); + } } @Override - public void addForeignKeys(List<SQLForeignKey> fks) - throws InvalidObjectException, MetaException { - // TODO: WTF? + public void addForeignKeys(List<SQLForeignKey> fks) throws InvalidObjectException, MetaException { + boolean commit = false; + openTransaction(); + try { + // Fetch the existing keys (if any) and add in these new ones + List<SQLForeignKey> existing = + getHBase().getForeignKeys(fks.get(0).getFktable_db(), fks.get(0).getFktable_name()); + if (existing == null) existing = new ArrayList<>(fks.size()); + existing.addAll(fks); + getHBase().putForeignKeys(existing); + commit = true; + } catch (IOException e) { + LOG.error("Error writing foreign keys", e); + throw new MetaException("Error writing foreign keys: " + e.getMessage()); + } finally { + commitOrRoleBack(commit); + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ed01fdf/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java index 54daa4a..4546d43 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java @@ -37,6 +37,7 @@ import java.util.TreeSet; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; @@ -61,6 +62,8 @@ import org.apache.hadoop.hive.metastore.api.PrivilegeGrantInfo; import org.apache.hadoop.hive.metastore.api.ResourceType; import org.apache.hadoop.hive.metastore.api.ResourceUri; import org.apache.hadoop.hive.metastore.api.Role; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; @@ -360,6 +363,7 @@ public class HBaseUtils { return result; } + /** * Deserialize a database. This method should be used when the db anme is already known as it * doesn't have to re-deserialize it. @@ -1495,6 +1499,123 @@ public class HBaseUtils { } /** + * Serialize the primary key for a table. + * @param pk Primary key columns. It is expected that all of these match to one pk, since + * anything else is meaningless. + * @return two byte arrays, first containts the hbase key, the second the serialized value. + */ + static byte[][] serializePrimaryKey(List<SQLPrimaryKey> pk) { + // First, figure out the dbName and tableName. We expect this to match for all list entries. + byte[][] result = new byte[2][]; + String dbName = pk.get(0).getTable_db(); + String tableName = pk.get(0).getTable_name(); + result[0] = buildKey(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName)); + + HbaseMetastoreProto.PrimaryKey.Builder builder = HbaseMetastoreProto.PrimaryKey.newBuilder(); + // Encode the primary key, if present + builder.setPkName(pk.get(0).getPk_name()); + builder.setEnableConstraint(pk.get(0).isEnable_cstr()); + builder.setValidateConstraint(pk.get(0).isValidate_cstr()); + builder.setRelyConstraint(pk.get(0).isRely_cstr()); + + for (SQLPrimaryKey pkcol : pk) { + HbaseMetastoreProto.PrimaryKey.PrimaryKeyColumn.Builder pkColBuilder = + HbaseMetastoreProto.PrimaryKey.PrimaryKeyColumn.newBuilder(); + pkColBuilder.setColumnName(pkcol.getColumn_name()); + pkColBuilder.setKeySeq(pkcol.getKey_seq()); + builder.addCols(pkColBuilder); + } + + result[1] = builder.build().toByteArray(); + return result; + } + + /** + * Serialize the foreign key(s) for a table. + * @param fks Foreign key columns. These may belong to multiple foreign keys. + * @return two byte arrays, first containts the key, the second the serialized value. + */ + static byte[][] serializeForeignKeys(List<SQLForeignKey> fks) { + // First, figure out the dbName and tableName. We expect this to match for all list entries. + byte[][] result = new byte[2][]; + String dbName = fks.get(0).getFktable_db(); + String tableName = fks.get(0).getFktable_name(); + result[0] = buildKey(HiveStringUtils.normalizeIdentifier(dbName), + HiveStringUtils.normalizeIdentifier(tableName)); + + HbaseMetastoreProto.ForeignKeys.Builder builder = HbaseMetastoreProto.ForeignKeys.newBuilder(); + + // Encode any foreign keys we find. This can be complex because there may be more than + // one foreign key in here, so we need to detect that. + Map<String, HbaseMetastoreProto.ForeignKeys.ForeignKey.Builder> fkBuilders = new HashMap<>(); + + for (SQLForeignKey fkcol : fks) { + HbaseMetastoreProto.ForeignKeys.ForeignKey.Builder fkBuilder = + fkBuilders.get(fkcol.getFk_name()); + if (fkBuilder == null) { + // We haven't seen this key before, so add it + fkBuilder = HbaseMetastoreProto.ForeignKeys.ForeignKey.newBuilder(); + fkBuilder.setFkName(fkcol.getFk_name()); + fkBuilder.setReferencedDbName(fkcol.getPktable_db()); + assert dbName.equals(fkcol.getFktable_db()) : "You switched databases on me!"; + fkBuilder.setReferencedTableName(fkcol.getPktable_name()); + assert tableName.equals(fkcol.getFktable_name()) : "You switched tables on me!"; + fkBuilder.setReferencedPkName(fkcol.getPk_name()); + fkBuilder.setUpdateRule(fkcol.getUpdate_rule()); + fkBuilder.setDeleteRule(fkcol.getDelete_rule()); + fkBuilder.setEnableConstraint(fkcol.isEnable_cstr()); + fkBuilder.setValidateConstraint(fkcol.isValidate_cstr()); + fkBuilder.setRelyConstraint(fkcol.isRely_cstr()); + fkBuilders.put(fkcol.getFk_name(), fkBuilder); + } + HbaseMetastoreProto.ForeignKeys.ForeignKey.ForeignKeyColumn.Builder fkColBuilder = + HbaseMetastoreProto.ForeignKeys.ForeignKey.ForeignKeyColumn.newBuilder(); + fkColBuilder.setColumnName(fkcol.getFkcolumn_name()); + fkColBuilder.setReferencedColumnName(fkcol.getPkcolumn_name()); + fkColBuilder.setKeySeq(fkcol.getKey_seq()); + fkBuilder.addCols(fkColBuilder); + } + for (HbaseMetastoreProto.ForeignKeys.ForeignKey.Builder fkBuilder : fkBuilders.values()) { + builder.addFks(fkBuilder); + } + result[1] = builder.build().toByteArray(); + return result; + } + + static List<SQLPrimaryKey> deserializePrimaryKey(String dbName, String tableName, byte[] value) + throws InvalidProtocolBufferException { + HbaseMetastoreProto.PrimaryKey proto = HbaseMetastoreProto.PrimaryKey.parseFrom(value); + List<SQLPrimaryKey> result = new ArrayList<>(); + for (HbaseMetastoreProto.PrimaryKey.PrimaryKeyColumn protoPkCol : proto.getColsList()) { + result.add(new SQLPrimaryKey(dbName, tableName, protoPkCol.getColumnName(), + protoPkCol.getKeySeq(), proto.getPkName(), proto.getEnableConstraint(), + proto.getValidateConstraint(), proto.getRelyConstraint())); + } + + return result; + } + + static List<SQLForeignKey> deserializeForeignKeys(String dbName, String tableName, byte[] value) + throws InvalidProtocolBufferException { + List<SQLForeignKey> result = new ArrayList<>(); + HbaseMetastoreProto.ForeignKeys protoConstraints = + HbaseMetastoreProto.ForeignKeys.parseFrom(value); + + for (HbaseMetastoreProto.ForeignKeys.ForeignKey protoFk : protoConstraints.getFksList()) { + for (HbaseMetastoreProto.ForeignKeys.ForeignKey.ForeignKeyColumn protoFkCol : + protoFk.getColsList()) { + result.add(new SQLForeignKey(protoFk.getReferencedDbName(), protoFk.getReferencedTableName(), + protoFkCol.getReferencedColumnName(), dbName, tableName, protoFkCol.getColumnName(), + protoFkCol.getKeySeq(), protoFk.getUpdateRule(), protoFk.getDeleteRule(), + protoFk.getFkName(), protoFk.getReferencedPkName(), protoFk.getEnableConstraint(), + protoFk.getValidateConstraint(), protoFk.getRelyConstraint())); + } + } + return result; + } + + /** * @param keyStart byte array representing the start prefix * @return byte array corresponding to the next possible prefix */ http://git-wip-us.apache.org/repos/asf/hive/blob/0ed01fdf/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto ---------------------------------------------------------------------- diff --git a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto index 6fbe36c..3f9e4c5 100644 --- a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto +++ b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto @@ -295,3 +295,39 @@ message PartitionKeyComparator { repeated Operator op = 3; repeated Range range = 4; } + +message PrimaryKey { + message PrimaryKeyColumn { + required string column_name = 1; + required sint32 key_seq = 2; + } + + required string pk_name = 1; + repeated PrimaryKeyColumn cols = 2; + optional bool enable_constraint = 3; + optional bool validate_constraint = 4; + optional bool rely_constraint = 5; +} + +message ForeignKeys { + message ForeignKey { + message ForeignKeyColumn { + required string column_name = 1; + required string referenced_column_name = 2; + required sint32 key_seq = 3; + } + + required string fk_name = 1; + required string referenced_db_name = 2; + required string referenced_table_name = 3; + optional string referenced_pk_name = 4; + optional int32 update_rule = 5; + optional int32 delete_rule = 6; + repeated ForeignKeyColumn cols = 7; + optional bool enable_constraint = 8; + optional bool validate_constraint = 9; + optional bool rely_constraint = 10; + } + + repeated ForeignKey fks = 1; +} http://git-wip-us.apache.org/repos/asf/hive/blob/0ed01fdf/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java index 4894ed3..fb0a8e7 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java @@ -22,10 +22,12 @@ import java.io.IOException; import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.SortedMap; +import java.util.SortedSet; import java.util.TreeMap; import org.apache.hadoop.hbase.Cell; @@ -46,6 +48,7 @@ import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.FunctionType; import org.apache.hadoop.hive.metastore.api.Index; import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; @@ -53,6 +56,8 @@ import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.ResourceType; import org.apache.hadoop.hive.metastore.api.ResourceUri; import org.apache.hadoop.hive.metastore.api.Role; +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; +import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; @@ -62,6 +67,7 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -1391,6 +1397,366 @@ public class TestHBaseStore { Assert.assertEquals(decimalData.getNumDVs(), decimalDataFromDB.getNumDVs()); } + @Test + public void createTableWithPrimaryKey() throws Exception { + String tableName = "pktable"; + String pkName = "test_pk"; + String pkColNames[] = { "col0" }; + Table table = createMultiColumnTable(tableName, "int"); + + List<SQLPrimaryKey> pk = Arrays.asList( + new SQLPrimaryKey(DB, tableName, pkColNames[0], 0, pkName, true, false, true)); + + store.createTableWithConstraints(table, pk, null); + + pk = store.getPrimaryKeys(DB, tableName); + + Assert.assertNotNull(pk); + Assert.assertEquals(1, pk.size()); + Assert.assertEquals(DB, pk.get(0).getTable_db()); + Assert.assertEquals(tableName, pk.get(0).getTable_name()); + Assert.assertEquals(pkColNames[0], pk.get(0).getColumn_name()); + Assert.assertEquals(0, pk.get(0).getKey_seq()); + Assert.assertEquals(pkName, pk.get(0).getPk_name()); + Assert.assertTrue(pk.get(0).isEnable_cstr()); + Assert.assertFalse(pk.get(0).isValidate_cstr()); + Assert.assertTrue(pk.get(0).isRely_cstr()); + + // Drop the primary key + store.dropConstraint(DB, tableName, pkName); + + pk = store.getPrimaryKeys(DB, tableName); + Assert.assertNull(pk); + } + + @Test + public void createTableWithForeignKey() throws Exception { + String tableName = "fktable"; + String pkTable = "pktable"; + String pkName = "test_pk"; + String fkName = "test_fk"; + String fkColNames[] = { "col0" }; + String pkColNames[] = { "pcol0" }; + Table table = createMultiColumnTable(tableName, "int"); + + List<SQLForeignKey> fk = Arrays.asList( + new SQLForeignKey(DB, pkTable, pkColNames[0], DB, tableName, fkColNames[0], 0, 1, 2, + fkName, pkName, true, false, false)); + + store.createTableWithConstraints(table, null, fk); + + fk = store.getForeignKeys(DB, tableName, DB, pkTable); + + Assert.assertNotNull(fk); + Assert.assertEquals(1, fk.size()); + Assert.assertEquals(DB, fk.get(0).getPktable_db()); + Assert.assertEquals(pkTable, fk.get(0).getPktable_name()); + Assert.assertEquals(pkColNames[0], fk.get(0).getPkcolumn_name()); + Assert.assertEquals(DB, fk.get(0).getFktable_db()); + Assert.assertEquals(tableName, fk.get(0).getFktable_name()); + Assert.assertEquals(fkColNames[0], fk.get(0).getFkcolumn_name()); + Assert.assertEquals(0, fk.get(0).getKey_seq()); + Assert.assertEquals(1, fk.get(0).getUpdate_rule()); + Assert.assertEquals(2, fk.get(0).getDelete_rule()); + Assert.assertEquals(fkName, fk.get(0).getFk_name()); + Assert.assertEquals(pkName, fk.get(0).getPk_name()); + Assert.assertTrue(fk.get(0).isEnable_cstr()); + Assert.assertFalse(fk.get(0).isValidate_cstr()); + Assert.assertFalse(fk.get(0).isRely_cstr()); + } + + // Test that we can add a primary key with multiple columns + @Test + public void addMultiColPrimaryKey() throws Exception { + String tableName = "mcpktable"; + String pkName = "test_pk"; + String pkColNames[] = { "col0", "col1", "col2" }; + Table table = createMultiColumnTable(tableName, "int", "varchar(32)", "decimal(10,2)"); + + List<SQLPrimaryKey> pk = Arrays.asList( + new SQLPrimaryKey(DB, tableName, pkColNames[1], 0, pkName, false, true, true), + new SQLPrimaryKey(DB, tableName, pkColNames[2], 1, pkName, false, true, true) + ); + + store.createTable(table); + store.addPrimaryKeys(pk); + + Assert.assertNotNull(pk); + Assert.assertEquals(2, pk.size()); + SQLPrimaryKey[] sorted = pk.toArray(new SQLPrimaryKey[2]); + Arrays.sort(sorted, new Comparator<SQLPrimaryKey>() { + @Override + public int compare(SQLPrimaryKey o1, SQLPrimaryKey o2) { + return o1.getColumn_name().compareTo(o2.getColumn_name()); + } + }); + for (int i = 0; i < 2; i++) { + Assert.assertEquals(DB, sorted[i].getTable_db()); + Assert.assertEquals(tableName, sorted[i].getTable_name()); + Assert.assertEquals(pkColNames[i+1], sorted[i].getColumn_name()); + Assert.assertEquals(i, sorted[i].getKey_seq()); + Assert.assertEquals(pkName, sorted[i].getPk_name()); + Assert.assertFalse(sorted[i].isEnable_cstr()); + Assert.assertTrue(sorted[i].isValidate_cstr()); + Assert.assertTrue(sorted[i].isRely_cstr()); + } + + } + + // Test that we can create a foreign key with multiple columns + @Test + public void addMultiColForeignKey() throws Exception { + String tableName = "mcfktable"; + String pkTable = "pktable"; + String pkName = "test_pk"; + String fkName = "test_fk"; + String fkColNames[] = { "col0", "col1", "col2" }; + String pkColNames[] = { "pcol0", "pcol1" }; + Table table = createMultiColumnTable(tableName, "int", "double", "timestamp"); + + List<SQLForeignKey> fk = Arrays.asList( + new SQLForeignKey(DB, pkTable, pkColNames[0], DB, tableName, fkColNames[1], 0, 1, 2, + fkName, pkName, true, false, false), + new SQLForeignKey(DB, pkTable, pkColNames[1], DB, tableName, fkColNames[2], 1, 1, 2, + fkName, pkName, true, false, false) + ); + + store.createTable(table); + store.addForeignKeys(fk); + + fk = store.getForeignKeys(DB, tableName, DB, pkTable); + + Assert.assertNotNull(fk); + Assert.assertEquals(2, fk.size()); + SQLForeignKey[] sorted = fk.toArray(new SQLForeignKey[2]); + Arrays.sort(sorted, new Comparator<SQLForeignKey>() { + @Override + public int compare(SQLForeignKey o1, SQLForeignKey o2) { + if (o1.getFk_name().equals(o2.getFk_name())) { + return o1.getFkcolumn_name().compareTo(o2.getFkcolumn_name()); + } else { + return o1.getFk_name().compareTo(o2.getFk_name()); + } + } + }); + + for (int i = 0; i < 2; i++) { + Assert.assertEquals(DB, sorted[i].getPktable_db()); + Assert.assertEquals(pkTable, sorted[i].getPktable_name()); + Assert.assertEquals(pkColNames[i], sorted[i].getPkcolumn_name()); + Assert.assertEquals(DB, sorted[i].getFktable_db()); + Assert.assertEquals(tableName, sorted[i].getFktable_name()); + Assert.assertEquals(fkColNames[i+1], sorted[i].getFkcolumn_name()); + Assert.assertEquals(i, sorted[i].getKey_seq()); + Assert.assertEquals(1, sorted[i].getUpdate_rule()); + Assert.assertEquals(2, sorted[i].getDelete_rule()); + Assert.assertEquals(fkName, sorted[i].getFk_name()); + Assert.assertEquals(pkName, sorted[i].getPk_name()); + Assert.assertTrue(sorted[i].isEnable_cstr()); + Assert.assertFalse(sorted[i].isValidate_cstr()); + Assert.assertFalse(sorted[i].isRely_cstr()); + } + + } + + // Test that we can add 2 foreign keys at once + @Test + public void addMultiForeignKeys() throws Exception { + String tableName = "mcfktable"; + String pkTable = "pktable"; + String pkTable2 = "pktable2"; + String pkName = "test_pk"; + String pkName2 = "test_pk2"; + String fkName = "test_fk"; + String fkName2 = "test_fk2"; + String fkColNames[] = { "col0", "col1", "col2" }; + String pkColNames[] = { "pcol0", "pcol1" }; + String pkColNames2[] = { "p2col0" }; + Table table = createMultiColumnTable(tableName, "int", "double", "timestamp"); + + List<SQLForeignKey> fk = Arrays.asList( + new SQLForeignKey(DB, pkTable, pkColNames[0], DB, tableName, fkColNames[1], 0, 1, 2, + fkName, pkName, true, false, true), + new SQLForeignKey(DB, pkTable, pkColNames[1], DB, tableName, fkColNames[2], 1, 1, 2, + fkName, pkName, true, false, true), + new SQLForeignKey(DB, pkTable2, pkColNames2[0], DB, tableName, fkColNames[0], 0, 1, 2, + fkName2, pkName2, true, false, true) + ); + + store.createTable(table); + store.addForeignKeys(fk); + + fk = store.getForeignKeys(DB, tableName, DB, pkTable); + + Assert.assertNotNull(fk); + Assert.assertEquals(3, fk.size()); + SQLForeignKey[] sorted = fk.toArray(new SQLForeignKey[2]); + Arrays.sort(sorted, new Comparator<SQLForeignKey>() { + @Override + public int compare(SQLForeignKey o1, SQLForeignKey o2) { + if (o1.getFk_name().equals(o2.getFk_name())) { + return o1.getFkcolumn_name().compareTo(o2.getFkcolumn_name()); + } else { + return o1.getFk_name().compareTo(o2.getFk_name()); + } + } + }); + + for (int i = 0; i < 2; i++) { + Assert.assertEquals(DB, sorted[i].getPktable_db()); + Assert.assertEquals(pkTable, sorted[i].getPktable_name()); + Assert.assertEquals(pkColNames[i], sorted[i].getPkcolumn_name()); + Assert.assertEquals(DB, sorted[i].getFktable_db()); + Assert.assertEquals(tableName, sorted[i].getFktable_name()); + Assert.assertEquals(fkColNames[i+1], sorted[i].getFkcolumn_name()); + Assert.assertEquals(i, sorted[i].getKey_seq()); + Assert.assertEquals(1, sorted[i].getUpdate_rule()); + Assert.assertEquals(2, sorted[i].getDelete_rule()); + Assert.assertEquals(fkName, sorted[i].getFk_name()); + Assert.assertEquals(pkName, sorted[i].getPk_name()); + Assert.assertTrue(sorted[i].isEnable_cstr()); + Assert.assertFalse(sorted[i].isValidate_cstr()); + Assert.assertTrue(sorted[i].isRely_cstr()); + } + Assert.assertEquals(DB, sorted[2].getPktable_db()); + Assert.assertEquals(pkTable2, sorted[2].getPktable_name()); + Assert.assertEquals(pkColNames2[0], sorted[2].getPkcolumn_name()); + Assert.assertEquals(DB, sorted[2].getFktable_db()); + Assert.assertEquals(tableName, sorted[2].getFktable_name()); + Assert.assertEquals(fkColNames[0], sorted[2].getFkcolumn_name()); + Assert.assertEquals(0, sorted[2].getKey_seq()); + Assert.assertEquals(1, sorted[2].getUpdate_rule()); + Assert.assertEquals(2, sorted[2].getDelete_rule()); + Assert.assertEquals(fkName2, sorted[2].getFk_name()); + Assert.assertEquals(pkName2, sorted[2].getPk_name()); + Assert.assertTrue(sorted[2].isEnable_cstr()); + Assert.assertFalse(sorted[2].isValidate_cstr()); + Assert.assertTrue(sorted[2].isRely_cstr()); + + } + + // Test that we can add a foreign key when one already exists + @Test + public void addSecondForeignKeys() throws Exception { + String tableName = "mcfktable"; + String pkTable = "pktable"; + String pkTable2 = "pktable2"; + String pkName = "test_pk"; + String pkName2 = "test_pk2"; + String fkName = "test_fk"; + String fkName2 = "test_fk2"; + String fkColNames[] = { "col0", "col1", "col2" }; + String pkColNames[] = { "pcol0", "pcol1" }; + String pkColNames2[] = { "p2col0" }; + Table table = createMultiColumnTable(tableName, "int", "double", "timestamp"); + + List<SQLForeignKey> fk = Arrays.asList( + new SQLForeignKey(DB, pkTable, pkColNames[0], DB, tableName, fkColNames[1], 0, 1, 2, + fkName, pkName, true, false, true), + new SQLForeignKey(DB, pkTable, pkColNames[1], DB, tableName, fkColNames[2], 1, 1, 2, + fkName, pkName, true, false, true) + ); + + store.createTable(table); + store.addForeignKeys(fk); + + fk = Arrays.asList( + new SQLForeignKey(DB, pkTable2, pkColNames2[0], DB, tableName, fkColNames[0], 0, 1, 2, + fkName2, pkName2, true, false, true) + ); + store.addForeignKeys(fk); + + fk = store.getForeignKeys(DB, tableName, DB, pkTable); + + Assert.assertNotNull(fk); + Assert.assertEquals(3, fk.size()); + SQLForeignKey[] sorted = fk.toArray(new SQLForeignKey[2]); + Arrays.sort(sorted, new Comparator<SQLForeignKey>() { + @Override + public int compare(SQLForeignKey o1, SQLForeignKey o2) { + if (o1.getFk_name().equals(o2.getFk_name())) { + return o1.getFkcolumn_name().compareTo(o2.getFkcolumn_name()); + } else { + return o1.getFk_name().compareTo(o2.getFk_name()); + } + } + }); + + for (int i = 0; i < 2; i++) { + Assert.assertEquals(DB, sorted[i].getPktable_db()); + Assert.assertEquals(pkTable, sorted[i].getPktable_name()); + Assert.assertEquals(pkColNames[i], sorted[i].getPkcolumn_name()); + Assert.assertEquals(DB, sorted[i].getFktable_db()); + Assert.assertEquals(tableName, sorted[i].getFktable_name()); + Assert.assertEquals(fkColNames[i+1], sorted[i].getFkcolumn_name()); + Assert.assertEquals(i, sorted[i].getKey_seq()); + Assert.assertEquals(1, sorted[i].getUpdate_rule()); + Assert.assertEquals(2, sorted[i].getDelete_rule()); + Assert.assertEquals(fkName, sorted[i].getFk_name()); + Assert.assertEquals(pkName, sorted[i].getPk_name()); + Assert.assertTrue(sorted[i].isEnable_cstr()); + Assert.assertFalse(sorted[i].isValidate_cstr()); + Assert.assertTrue(sorted[i].isRely_cstr()); + } + Assert.assertEquals(DB, sorted[2].getPktable_db()); + Assert.assertEquals(pkTable2, sorted[2].getPktable_name()); + Assert.assertEquals(pkColNames2[0], sorted[2].getPkcolumn_name()); + Assert.assertEquals(DB, sorted[2].getFktable_db()); + Assert.assertEquals(tableName, sorted[2].getFktable_name()); + Assert.assertEquals(fkColNames[0], sorted[2].getFkcolumn_name()); + Assert.assertEquals(0, sorted[2].getKey_seq()); + Assert.assertEquals(1, sorted[2].getUpdate_rule()); + Assert.assertEquals(2, sorted[2].getDelete_rule()); + Assert.assertEquals(fkName2, sorted[2].getFk_name()); + Assert.assertEquals(pkName2, sorted[2].getPk_name()); + Assert.assertTrue(sorted[2].isEnable_cstr()); + Assert.assertFalse(sorted[2].isValidate_cstr()); + Assert.assertTrue(sorted[2].isRely_cstr()); + + store.dropConstraint(DB, tableName, fkName); + + fk = store.getForeignKeys(DB, tableName, DB, pkTable); + Assert.assertNotNull(fk); + Assert.assertEquals(1, fk.size()); + Assert.assertEquals(DB, fk.get(0).getPktable_db()); + Assert.assertEquals(pkTable2, fk.get(0).getPktable_name()); + Assert.assertEquals(pkColNames2[0], fk.get(0).getPkcolumn_name()); + Assert.assertEquals(DB, fk.get(0).getFktable_db()); + Assert.assertEquals(tableName, fk.get(0).getFktable_name()); + Assert.assertEquals(fkColNames[0], fk.get(0).getFkcolumn_name()); + Assert.assertEquals(0, fk.get(0).getKey_seq()); + Assert.assertEquals(1, fk.get(0).getUpdate_rule()); + Assert.assertEquals(2, fk.get(0).getDelete_rule()); + Assert.assertEquals(fkName2, fk.get(0).getFk_name()); + Assert.assertEquals(pkName2, fk.get(0).getPk_name()); + Assert.assertTrue(fk.get(0).isEnable_cstr()); + Assert.assertFalse(fk.get(0).isValidate_cstr()); + Assert.assertTrue(fk.get(0).isRely_cstr()); + + store.dropConstraint(DB, tableName, fkName2); + + fk = store.getForeignKeys(DB, tableName, DB, pkTable); + Assert.assertNull(fk); + } + + // Try adding a primary key when one already exists + @Test(expected= MetaException.class) + public void doublePrimaryKey() throws Exception { + String tableName = "pktable"; + String pkName = "test_pk"; + String pkColNames[] = { "col0" }; + Table table = createMultiColumnTable(tableName, "int"); + + List<SQLPrimaryKey> pk = Arrays.asList( + new SQLPrimaryKey(DB, tableName, pkColNames[0], 0, pkName, true, false, true)); + + store.createTableWithConstraints(table, pk, null); + + store.addPrimaryKeys(pk); + } + + + private Table createMockTableAndPartition(String partType, String partVal) throws Exception { List<FieldSchema> cols = new ArrayList<FieldSchema>(); cols.add(new FieldSchema("col1", partType, "")); @@ -1425,6 +1791,22 @@ public class TestHBaseStore { store.createTable(table); return table; } + + private Table createMultiColumnTable(String tblName, String... types) throws Exception { + List<FieldSchema> cols = new ArrayList<FieldSchema>(); + for (int i = 0; i < types.length; i++) cols.add(new FieldSchema("col" + i, types[i], "")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + Map<String, String> params = new HashMap<String, String>(); + params.put("key", "value"); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17, + serde, Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params); + int currentTime = (int)(System.currentTimeMillis() / 1000); + Table table = new Table(tblName, DB, "me", currentTime, currentTime, 0, sd, cols, + emptyParameters, null, null, null); + store.createTable(table); + return table; + } + /** * Returns a dummy table level ColumnStatisticsDesc with default values */