Modified: hive/branches/hbase-metastore/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote?rev=1657394&r1=1657393&r2=1657394&view=diff
==============================================================================
--- hive/branches/hbase-metastore/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote (original)
+++ hive/branches/hbase-metastore/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote Wed Feb 4 20:00:49 2015
@@ -142,6 +142,7 @@ if len(sys.argv) <= 1 or sys.argv[1] ==
   print '  ShowCompactResponse show_compact(ShowCompactRequest rqst)'
   print '  NotificationEventResponse get_next_notification(NotificationEventRequest rqst)'
   print '  CurrentNotificationEventId get_current_notificationEventId()'
+  print '  void flushCache()'
   print ''
   sys.exit(0)

@@ -907,6 +908,12 @@ elif cmd == 'get_current_notificationEve
     sys.exit(1)
   pp.pprint(client.get_current_notificationEventId())

+elif cmd == 'flushCache':
+  if len(args) != 0:
+    print 'flushCache requires 0 args'
+    sys.exit(1)
+  pp.pprint(client.flushCache())
+
 else:
   print 'Unrecognized method %s' % cmd
   sys.exit(1)
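The same no-argument flushCache call is threaded through each of the generated bindings below (Python, Ruby) and through the Java client, server, and RawStore layers. For orientation, here is a minimal sketch of how a caller might exercise the new API from Java once this commit is in place; the configuration defaults and the database/table names are placeholders for illustration, not part of the commit:

    // Sketch only: exercises the flushCache() method this commit adds to
    // IMetaStoreClient. The "default"/"example_table" names are hypothetical.
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.Table;

    public class FlushCacheSketch {
      public static void main(String[] args) throws Exception {
        IMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
        try {
          // Per the new javadoc, call this at the beginning of each query so the
          // RawStore implementation drops any catalog objects it has cached.
          client.flushCache();
          // Subsequent reads go back to the underlying store rather than a stale cache.
          Table t = client.getTable("default", "example_table");
          System.out.println(t.getTableName());
        } finally {
          client.close();
        }
      }
    }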
Modified: hive/branches/hbase-metastore/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py?rev=1657394&r1=1657393&r2=1657394&view=diff ============================================================================== --- hive/branches/hbase-metastore/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py (original) +++ hive/branches/hbase-metastore/metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py Wed Feb 4 20:00:49 2015 @@ -986,6 +986,9 @@ class Iface(fb303.FacebookService.Iface) def get_current_notificationEventId(self, ): pass + def flushCache(self, ): + pass + class Client(fb303.FacebookService.Client, Iface): """ @@ -5261,6 +5264,29 @@ class Client(fb303.FacebookService.Clien return result.success raise TApplicationException(TApplicationException.MISSING_RESULT, "get_current_notificationEventId failed: unknown result"); + def flushCache(self, ): + self.send_flushCache() + self.recv_flushCache() + + def send_flushCache(self, ): + self._oprot.writeMessageBegin('flushCache', TMessageType.CALL, self._seqid) + args = flushCache_args() + args.write(self._oprot) + self._oprot.writeMessageEnd() + self._oprot.trans.flush() + + def recv_flushCache(self, ): + (fname, mtype, rseqid) = self._iprot.readMessageBegin() + if mtype == TMessageType.EXCEPTION: + x = TApplicationException() + x.read(self._iprot) + self._iprot.readMessageEnd() + raise x + result = flushCache_result() + result.read(self._iprot) + self._iprot.readMessageEnd() + return + class Processor(fb303.FacebookService.Processor, Iface, TProcessor): def __init__(self, handler): @@ -5384,6 +5410,7 @@ class Processor(fb303.FacebookService.Pr self._processMap["show_compact"] = Processor.process_show_compact self._processMap["get_next_notification"] = Processor.process_get_next_notification self._processMap["get_current_notificationEventId"] = Processor.process_get_current_notificationEventId + self._processMap["flushCache"] = Processor.process_flushCache def process(self, iprot, oprot): (name, type, seqid) = iprot.readMessageBegin() @@ -7291,6 +7318,17 @@ class Processor(fb303.FacebookService.Pr oprot.writeMessageEnd() oprot.trans.flush() + def process_flushCache(self, seqid, iprot, oprot): + args = flushCache_args() + args.read(iprot) + iprot.readMessageEnd() + result = flushCache_result() + self._handler.flushCache() + oprot.writeMessageBegin("flushCache", TMessageType.REPLY, seqid) + result.write(oprot) + oprot.writeMessageEnd() + oprot.trans.flush() + # HELPER FUNCTIONS AND STRUCTURES @@ -26478,6 +26516,90 @@ class get_current_notificationEventId_re oprot.writeFieldStop() oprot.writeStructEnd() + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class flushCache_args: + + thrift_spec = ( + ) + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, 
fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('flushCache_args') + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + +class flushCache_result: + + thrift_spec = ( + ) + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('flushCache_result') + oprot.writeFieldStop() + oprot.writeStructEnd() + def validate(self): return Modified: hive/branches/hbase-metastore/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb?rev=1657394&r1=1657393&r2=1657394&view=diff ============================================================================== --- hive/branches/hbase-metastore/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb (original) +++ hive/branches/hbase-metastore/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb Wed Feb 4 20:00:49 2015 @@ -2007,6 +2007,20 @@ module ThriftHiveMetastore raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_current_notificationEventId failed: unknown result') end + def flushCache() + send_flushCache() + recv_flushCache() + end + + def send_flushCache() + send_message('flushCache', FlushCache_args) + end + + def recv_flushCache() + result = receive_message(FlushCache_result) + return + end + end class Processor < ::FacebookService::Processor @@ -3537,6 +3551,13 @@ module ThriftHiveMetastore write_result(result, oprot, 'get_current_notificationEventId', seqid) end + def process_flushCache(seqid, iprot, oprot) + args = read_args(iprot, FlushCache_args) + result = FlushCache_result.new() + @handler.flushCache() + write_result(result, oprot, 'flushCache', seqid) + end + end # HELPER FUNCTIONS AND STRUCTURES @@ -8081,6 +8102,36 @@ module ThriftHiveMetastore } def struct_fields; FIELDS; end + + def validate + end + + ::Thrift::Struct.generate_accessors self + end + + class FlushCache_args + include ::Thrift::Struct, ::Thrift::Struct_Union + + FIELDS = { + + } + + def struct_fields; FIELDS; end + + def validate + end + + ::Thrift::Struct.generate_accessors self + end 
+ + class FlushCache_result + include ::Thrift::Struct, ::Thrift::Struct_Union + + FIELDS = { + + } + + def struct_fields; FIELDS; end def validate end Modified: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1657394&r1=1657393&r2=1657394&view=diff ============================================================================== --- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original) +++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Wed Feb 4 20:00:49 2015 @@ -574,6 +574,19 @@ public class HiveMetaStore extends Thrif + rawStoreClassName)); Configuration conf = getConf(); + if (hiveConf.getBoolVar(ConfVars.METASTORE_FASTPATH)) { + LOG.info("Fastpath, skipping raw store proxy"); + try { + RawStore rs = ((Class<? extends RawStore>) MetaStoreUtils.getClass( + rawStoreClassName)).newInstance(); + rs.setConf(conf); + return rs; + } catch (Exception e) { + LOG.fatal("Unable to instantiate raw store directly in fastpath mode"); + throw new RuntimeException(e); + } + } + return RawStoreProxy.getProxy(hiveConf, conf, rawStoreClassName, threadLocalId.get()); } @@ -5448,6 +5461,11 @@ public class HiveMetaStore extends Thrif } @Override + public void flushCache() throws TException { + getMS().flushCache(); + } + + @Override public GetPrincipalsInRoleResponse get_principals_in_role(GetPrincipalsInRoleRequest request) throws MetaException, TException { Modified: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java?rev=1657394&r1=1657393&r2=1657394&view=diff ============================================================================== --- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (original) +++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java Wed Feb 4 20:00:49 2015 @@ -153,6 +153,7 @@ public class HiveMetaStoreClient impleme private URI metastoreUris[]; private final HiveMetaHookLoader hookLoader; protected final HiveConf conf; + protected boolean fastpath = false; private String tokenStrForm; private final boolean localMetaStore; private final MetaStoreFilterHook filterHook; @@ -185,10 +186,20 @@ public class HiveMetaStoreClient impleme if (localMetaStore) { // instantiate the metastore server handler directly instead of connecting // through the network - client = HiveMetaStore.newRetryingHMSHandler("hive client", conf, true); + if (conf.getBoolVar(ConfVars.METASTORE_FASTPATH)) { + client = new HiveMetaStore.HMSHandler("hive client", conf, true); + fastpath = true; + } else { + client = HiveMetaStore.newRetryingHMSHandler("hive client", conf, true); + } isConnected = true; snapshotActiveConf(); return; + } else { + if (conf.getBoolVar(ConfVars.METASTORE_FASTPATH)) { + throw new RuntimeException("You can't set hive.metastore.fastpath to true when you're " + + "talking to the thrift metastore service. 
You must run the metastore locally."); + } } // get the number retries @@ -514,7 +525,8 @@ public class HiveMetaStoreClient impleme public Partition add_partition(Partition new_part, EnvironmentContext envContext) throws InvalidObjectException, AlreadyExistsException, MetaException, TException { - return deepCopy(client.add_partition_with_environment_context(new_part, envContext)); + Partition p = client.add_partition_with_environment_context(new_part, envContext); + return fastpath ? p : deepCopy(p); } /** @@ -574,8 +586,9 @@ public class HiveMetaStoreClient impleme public Partition appendPartition(String db_name, String table_name, List<String> part_vals, EnvironmentContext envContext) throws InvalidObjectException, AlreadyExistsException, MetaException, TException { - return deepCopy(client.append_partition_with_environment_context(db_name, table_name, - part_vals, envContext)); + Partition p = client.append_partition_with_environment_context(db_name, table_name, + part_vals, envContext); + return fastpath ? p : deepCopy(p); } @Override @@ -587,8 +600,9 @@ public class HiveMetaStoreClient impleme public Partition appendPartition(String dbName, String tableName, String partName, EnvironmentContext envContext) throws InvalidObjectException, AlreadyExistsException, MetaException, TException { - return deepCopy(client.append_partition_by_name_with_environment_context(dbName, tableName, - partName, envContext)); + Partition p = client.append_partition_by_name_with_environment_context(dbName, tableName, + partName, envContext); + return fastpath ? p : deepCopy(p); } /** @@ -980,8 +994,8 @@ public class HiveMetaStoreClient impleme @Override public List<Partition> listPartitions(String db_name, String tbl_name, short max_parts) throws NoSuchObjectException, MetaException, TException { - return deepCopyPartitions(filterHook.filterPartitions( - client.get_partitions(db_name, tbl_name, max_parts))); + List<Partition> parts = client.get_partitions(db_name, tbl_name, max_parts); + return fastpath ? parts : deepCopyPartitions(filterHook.filterPartitions(parts)); } @Override @@ -994,16 +1008,17 @@ public class HiveMetaStoreClient impleme public List<Partition> listPartitions(String db_name, String tbl_name, List<String> part_vals, short max_parts) throws NoSuchObjectException, MetaException, TException { - return deepCopyPartitions(filterHook.filterPartitions( - client.get_partitions_ps(db_name, tbl_name, part_vals, max_parts))); + List<Partition> parts = client.get_partitions_ps(db_name, tbl_name, part_vals, max_parts); + return fastpath ? parts : deepCopyPartitions(filterHook.filterPartitions(parts)); } @Override public List<Partition> listPartitionsWithAuthInfo(String db_name, String tbl_name, short max_parts, String user_name, List<String> group_names) throws NoSuchObjectException, MetaException, TException { - return deepCopyPartitions(filterHook.filterPartitions( - client.get_partitions_with_auth(db_name, tbl_name, max_parts, user_name, group_names))); + List<Partition> parts = client.get_partitions_with_auth(db_name, tbl_name, max_parts, + user_name, group_names); + return fastpath ? 
parts :deepCopyPartitions(filterHook.filterPartitions(parts)); } @Override @@ -1011,8 +1026,9 @@ public class HiveMetaStoreClient impleme String tbl_name, List<String> part_vals, short max_parts, String user_name, List<String> group_names) throws NoSuchObjectException, MetaException, TException { - return deepCopyPartitions(filterHook.filterPartitions(client.get_partitions_ps_with_auth(db_name, - tbl_name, part_vals, max_parts, user_name, group_names))); + List<Partition> parts = client.get_partitions_ps_with_auth(db_name, + tbl_name, part_vals, max_parts, user_name, group_names); + return fastpath ? parts : deepCopyPartitions(filterHook.filterPartitions(parts)); } /** @@ -1033,8 +1049,8 @@ public class HiveMetaStoreClient impleme public List<Partition> listPartitionsByFilter(String db_name, String tbl_name, String filter, short max_parts) throws MetaException, NoSuchObjectException, TException { - return deepCopyPartitions(filterHook.filterPartitions( - client.get_partitions_by_filter(db_name, tbl_name, filter, max_parts))); + List<Partition> parts = client.get_partitions_by_filter(db_name, tbl_name, filter, max_parts); + return fastpath ? parts :deepCopyPartitions(filterHook.filterPartitions(parts)); } @Override @@ -1070,9 +1086,13 @@ public class HiveMetaStoreClient impleme throw new IncompatibleMetastoreException( "Metastore doesn't support listPartitionsByExpr: " + te.getMessage()); } - r.setPartitions(filterHook.filterPartitions(r.getPartitions())); - // TODO: in these methods, do we really need to deepcopy? - deepCopyPartitions(r.getPartitions(), result); + if (fastpath) { + result.addAll(r.getPartitions()); + } else { + r.setPartitions(filterHook.filterPartitions(r.getPartitions())); + // TODO: in these methods, do we really need to deepcopy? + deepCopyPartitions(r.getPartitions(), result); + } return !r.isSetHasUnknownPartitions() || r.isHasUnknownPartitions(); // Assume the worst. } @@ -1088,7 +1108,8 @@ public class HiveMetaStoreClient impleme @Override public Database getDatabase(String name) throws NoSuchObjectException, MetaException, TException { - return deepCopy(filterHook.filterDatabase(client.get_database(name))); + Database d = client.get_database(name); + return fastpath ? d :deepCopy(filterHook.filterDatabase(d)); } /** @@ -1104,15 +1125,15 @@ public class HiveMetaStoreClient impleme @Override public Partition getPartition(String db_name, String tbl_name, List<String> part_vals) throws NoSuchObjectException, MetaException, TException { - return deepCopy(filterHook.filterPartition( - client.get_partition(db_name, tbl_name, part_vals))); + Partition p = client.get_partition(db_name, tbl_name, part_vals); + return fastpath ? p : deepCopy(filterHook.filterPartition(p)); } @Override public List<Partition> getPartitionsByNames(String db_name, String tbl_name, List<String> part_names) throws NoSuchObjectException, MetaException, TException { - return deepCopyPartitions(filterHook.filterPartitions( - client.get_partitions_by_names(db_name, tbl_name, part_names))); + List<Partition> parts = client.get_partitions_by_names(db_name, tbl_name, part_names); + return fastpath ? 
parts : deepCopyPartitions(filterHook.filterPartitions(parts)); } @Override @@ -1120,8 +1141,9 @@ public class HiveMetaStoreClient impleme List<String> part_vals, String user_name, List<String> group_names) throws MetaException, UnknownTableException, NoSuchObjectException, TException { - return deepCopy(filterHook.filterPartition(client.get_partition_with_auth(db_name, - tbl_name, part_vals, user_name, group_names))); + Partition p = client.get_partition_with_auth(db_name, tbl_name, part_vals, user_name, + group_names); + return fastpath ? p : deepCopy(filterHook.filterPartition(p)); } /** @@ -1138,7 +1160,8 @@ public class HiveMetaStoreClient impleme @Override public Table getTable(String dbname, String name) throws MetaException, TException, NoSuchObjectException { - return deepCopy(filterHook.filterTable(client.get_table(dbname, name))); + Table t = client.get_table(dbname, name); + return fastpath ? t : deepCopy(filterHook.filterTable(t)); } /** {@inheritDoc} */ @@ -1146,15 +1169,16 @@ public class HiveMetaStoreClient impleme @Deprecated public Table getTable(String tableName) throws MetaException, TException, NoSuchObjectException { - return filterHook.filterTable(getTable(DEFAULT_DATABASE_NAME, tableName)); + Table t = getTable(DEFAULT_DATABASE_NAME, tableName); + return fastpath ? t : filterHook.filterTable(t); } /** {@inheritDoc} */ @Override public List<Table> getTableObjectsByName(String dbName, List<String> tableNames) throws MetaException, InvalidOperationException, UnknownDBException, TException { - return deepCopyTables(filterHook.filterTables( - client.get_table_objects_by_name(dbName, tableNames))); + List<Table> tabs = client.get_table_objects_by_name(dbName, tableNames); + return fastpath ? tabs : deepCopyTables(filterHook.filterTables(tabs)); } /** {@inheritDoc} */ @@ -1263,7 +1287,8 @@ public class HiveMetaStoreClient impleme public List<FieldSchema> getFields(String db, String tableName) throws MetaException, TException, UnknownTableException, UnknownDBException { - return deepCopyFieldSchemas(client.get_fields(db, tableName)); + List<FieldSchema> fields = client.get_fields(db, tableName); + return fastpath ? fields : deepCopyFieldSchemas(fields); } /** @@ -1371,6 +1396,16 @@ public class HiveMetaStoreClient impleme return client.set_aggr_stats_for(request); } + @Override + public void flushCache() { + try { + client.flushCache(); + } catch (TException e) { + // Not much we can do about it honestly + LOG.warn("Got error flushing the cache", e); + } + } + /** {@inheritDoc} */ @Override public List<ColumnStatisticsObj> getTableColumnStatistics(String dbName, String tableName, @@ -1421,7 +1456,8 @@ public class HiveMetaStoreClient impleme public List<FieldSchema> getSchema(String db, String tableName) throws MetaException, TException, UnknownTableException, UnknownDBException { - return deepCopyFieldSchemas(client.get_schema(db, tableName)); + List<FieldSchema> fields = client.get_schema(db, tableName); + return fastpath ? fields : deepCopyFieldSchemas(fields); } @Override @@ -1433,8 +1469,8 @@ public class HiveMetaStoreClient impleme @Override public Partition getPartition(String db, String tableName, String partName) throws MetaException, TException, UnknownTableException, NoSuchObjectException { - return deepCopy( - filterHook.filterPartition(client.get_partition_by_name(db, tableName, partName))); + Partition p = client.get_partition_by_name(db, tableName, partName); + return fastpath ? 
p : deepCopy(filterHook.filterPartition(p)); } public Partition appendPartitionByName(String dbName, String tableName, String partName) @@ -1445,8 +1481,9 @@ public class HiveMetaStoreClient impleme public Partition appendPartitionByName(String dbName, String tableName, String partName, EnvironmentContext envContext) throws InvalidObjectException, AlreadyExistsException, MetaException, TException { - return deepCopy(client.append_partition_by_name_with_environment_context(dbName, tableName, - partName, envContext)); + Partition p = client.append_partition_by_name_with_environment_context(dbName, tableName, + partName, envContext); + return fastpath ? p : deepCopy(p); } public boolean dropPartitionByName(String dbName, String tableName, String partName, @@ -1948,7 +1985,8 @@ public class HiveMetaStoreClient impleme @Override public Function getFunction(String dbName, String funcName) throws MetaException, TException { - return deepCopy(client.get_function(dbName, funcName)); + Function f = client.get_function(dbName, funcName); + return fastpath ? f : deepCopy(f); } @Override Modified: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java?rev=1657394&r1=1657393&r2=1657394&view=diff ============================================================================== --- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java (original) +++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java Wed Feb 4 20:00:49 2015 @@ -1379,4 +1379,10 @@ public interface IMetaStoreClient { List<String> colNames, List<String> partName) throws NoSuchObjectException, MetaException, TException; boolean setPartitionColumnStatistics(SetPartitionsStatsRequest request) throws NoSuchObjectException, InvalidObjectException, MetaException, TException, InvalidInputException; + + /** + * Flush any catalog objects held by the metastore implementation. Note that this does not + * flush statistics objects. This should be called at the beginning of each query. 
+ */ + void flushCache(); } Modified: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java?rev=1657394&r1=1657393&r2=1657394&view=diff ============================================================================== --- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (original) +++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java Wed Feb 4 20:00:49 2015 @@ -6115,6 +6115,11 @@ public class ObjectStore implements RawS }.run(true); } + @Override + public void flushCache() { + // NOP as there's no caching + } + private List<MPartitionColumnStatistics> getMPartitionColumnStatistics( Table table, List<String> partNames, List<String> colNames) throws NoSuchObjectException, MetaException { Modified: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java?rev=1657394&r1=1657393&r2=1657394&view=diff ============================================================================== --- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java (original) +++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java Wed Feb 4 20:00:49 2015 @@ -589,5 +589,10 @@ public interface RawStore extends Config * @return */ public CurrentNotificationEventId getCurrentNotificationEventId(); - + + /* + * Flush any catalog objects held by the metastore implementation. Note that this does not + * flush statistics objects. This should be called at the beginning of each query. 
+ */ + public void flushCache(); } Modified: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java?rev=1657394&r1=1657393&r2=1657394&view=diff ============================================================================== --- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java (original) +++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java Wed Feb 4 20:00:49 2015 @@ -26,6 +26,8 @@ import java.lang.reflect.UndeclaredThrow import java.util.List; import org.apache.commons.lang.ClassUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; @@ -37,6 +39,8 @@ import org.apache.hadoop.util.Reflection @InterfaceStability.Evolving public class RawStoreProxy implements InvocationHandler { + static final private Log LOG = LogFactory.getLog(RawStoreProxy.class.getName()); + private final RawStore base; private final MetaStoreInit.MetaStoreInitData metaStoreInitData = new MetaStoreInit.MetaStoreInitData(); @@ -95,6 +99,7 @@ public class RawStoreProxy implements In Object ret = null; try { + LOG.info("Invoking " + method.toGenericString()); ret = method.invoke(base, args); } catch (UndeclaredThrowableException e) { throw e.getCause(); Added: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java?rev=1657394&view=auto ============================================================================== --- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java (added) +++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java Wed Feb 4 20:00:49 2015 @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.hbase; + +/** + * A simple metric to count how many times something occurs. 
+ */ +class Counter { + private final String name; + private long cnt; + + Counter(String name) { + this.name = name; + cnt = 0; + } + + void incr() { + cnt++; + } + + void clear() { + cnt = 0; + } + + String dump() { + StringBuilder bldr = new StringBuilder("Dumping metric: "); + bldr.append(name).append(' ').append(cnt); + return bldr.toString(); + } + +} Added: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/DatabaseWritable.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/DatabaseWritable.java?rev=1657394&view=auto ============================================================================== --- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/DatabaseWritable.java (added) +++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/DatabaseWritable.java Wed Feb 4 20:00:49 2015 @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hive.metastore.hbase; + +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Wrapper for {@link org.apache.hadoop.hive.metastore.api.Database} that makes it writable + */ +class DatabaseWritable implements Writable { + final Database db; + + DatabaseWritable() { + this.db = new Database(); + } + + DatabaseWritable(Database db) { + this.db = db; + } + + @Override + public void write(DataOutput out) throws IOException { + HBaseUtils.writeStr(out, db.getName()); + HBaseUtils.writeStr(out, db.getDescription()); + HBaseUtils.writeStr(out, db.getLocationUri()); + HBaseUtils.writeStrStrMap(out, db.getParameters()); + HBaseUtils.writePrivileges(out, db.getPrivileges()); + HBaseUtils.writeStr(out, db.getOwnerName()); + HBaseUtils.writePrincipalType(out, db.getOwnerType()); + } + + @Override + public void readFields(DataInput in) throws IOException { + db.setName(HBaseUtils.readStr(in)); + db.setDescription(HBaseUtils.readStr(in)); + db.setLocationUri(HBaseUtils.readStr(in)); + db.setParameters(HBaseUtils.readStrStrMap(in)); + db.setPrivileges(HBaseUtils.readPrivileges(in)); + db.setOwnerName(HBaseUtils.readStr(in)); + db.setOwnerType(HBaseUtils.readPrincipalType(in)); + } +} Added: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java?rev=1657394&view=auto ============================================================================== --- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java (added) +++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java Wed Feb 4 20:00:49 2015 @@ -0,0 +1,993 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hive.common.ObjectPair;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Role;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Class to manage storing objects in and reading them from HBase.
+ */
+class HBaseReadWrite {
+
+  @VisibleForTesting final static String DB_TABLE = "DBS";
+  @VisibleForTesting final static String PART_TABLE = "PARTITIONS";
+  @VisibleForTesting final static String ROLE_TABLE = "ROLES";
+  @VisibleForTesting final static String SD_TABLE = "SDS";
+  @VisibleForTesting final static String TABLE_TABLE = "TBLS";
+  @VisibleForTesting final static byte[] CATALOG_CF = "c".getBytes(HBaseUtils.ENCODING);
+  @VisibleForTesting final static byte[] STATS_CF = "s".getBytes(HBaseUtils.ENCODING);
+  @VisibleForTesting final static String NO_CACHE_CONF = "no.use.cache";
+  private final static byte[] CATALOG_COL = "cat".getBytes(HBaseUtils.ENCODING);
+  private final static byte[] REF_COUNT_COL = "ref".getBytes(HBaseUtils.ENCODING);
+  private final static int tablesToCache = 10;
+
+  // TODO Add privileges as a second column in the CATALOG_CF
+
+  private final static String[] tableNames = { DB_TABLE, PART_TABLE, ROLE_TABLE, SD_TABLE,
+                                               TABLE_TABLE };
+  static final private Log LOG = LogFactory.getLog(HBaseReadWrite.class.getName());
+
+  private static ThreadLocal<HBaseReadWrite> self = new ThreadLocal<HBaseReadWrite>() {
+    @Override
+    protected HBaseReadWrite initialValue() {
+      if (staticConf == null) {
+        throw new RuntimeException("Attempt to create HBaseReadWrite with no configuration set");
+      }
+      return new HBaseReadWrite(staticConf);
+    }
+  };
+
+  private static boolean tablesCreated = false;
+  private static Configuration staticConf = null;
+
+  private Configuration conf;
+  private HConnection conn;
+  private Map<String, HTableInterface> tables;
+  private MessageDigest md;
+  private ObjectCache<ObjectPair<String, String>, Table> tableCache;
+  private ObjectCache<ByteArrayWrapper, StorageDescriptor> sdCache;
+  private PartitionCache partCache;
+  private StatsCache statsCache;
+  private Counter tableHits;
+  private Counter tableMisses;
+  private Counter tableOverflows;
+  private Counter partHits;
+  private Counter partMisses;
+  private Counter partOverflows;
+  private Counter sdHits;
+  private Counter sdMisses;
+  private Counter sdOverflows;
+  private List<Counter> counters;
+
+  /**
+   * Get the instance of HBaseReadWrite for the current thread.  This is intended to be used by
+   * {@link org.apache.hadoop.hive.metastore.hbase.HBaseStore} since it creates the thread local
+   * version of this class.
+   * @param configuration Configuration object
+   * @return thread's instance of HBaseReadWrite
+   */
+  static HBaseReadWrite getInstance(Configuration configuration) {
+    staticConf = configuration;
+    return self.get();
+  }
+
+  /**
+   * Get the instance of HBaseReadWrite for the current thread.  This is intended to be used after
+   * the thread has been initialized.  Woe betide you if that's not the case.
+   * @return thread's instance of HBaseReadWrite
+   */
+  static HBaseReadWrite getInstance() {
+    return self.get();
+  }
+
+  private HBaseReadWrite(Configuration configuration) {
+    conf = configuration;
+    try {
+      conn = HConnectionManager.createConnection(conf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    tables = new HashMap<String, HTableInterface>();
+
+    try {
+      md = MessageDigest.getInstance("MD5");
+    } catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException(e);
+    }
+    int totalObjectsToCache =
+        HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORE_HBASE_CACHE_SIZE);
+
+    tableHits = new Counter("table cache hits");
+    tableMisses = new Counter("table cache misses");
+    tableOverflows = new Counter("table cache overflows");
+    partHits = new Counter("partition cache hits");
+    partMisses = new Counter("partition cache misses");
+    partOverflows = new Counter("partition cache overflows");
+    sdHits = new Counter("storage descriptor cache hits");
+    sdMisses = new Counter("storage descriptor cache misses");
+    sdOverflows = new Counter("storage descriptor cache overflows");
+    counters = new ArrayList<Counter>();
+    counters.add(tableHits);
+    counters.add(tableMisses);
+    counters.add(tableOverflows);
+    counters.add(partHits);
+    counters.add(partMisses);
+    counters.add(partOverflows);
+    counters.add(sdHits);
+    counters.add(sdMisses);
+    counters.add(sdOverflows);
+
+    // Divide 50/50 between catalog and stats, then give 1% of catalog space to storage
+    // descriptors (storage descriptors are shared, so 99% should be the same for a
+    // given table).
+    int sdsCacheSize = totalObjectsToCache / 100;
+    if (conf.getBoolean(NO_CACHE_CONF, false)) {
+      tableCache = new BogusObjectCache<ObjectPair<String, String>, Table>();
+      sdCache = new BogusObjectCache<ByteArrayWrapper, StorageDescriptor>();
+      partCache = new BogusPartitionCache();
+      statsCache = StatsCache.getBogusStatsCache();
+    } else {
+      tableCache = new ObjectCache<ObjectPair<String, String>, Table>(tablesToCache, tableHits,
+          tableMisses, tableOverflows);
+      sdCache = new ObjectCache<ByteArrayWrapper, StorageDescriptor>(sdsCacheSize, sdHits,
+          sdMisses, sdOverflows);
+      partCache = new PartitionCache(totalObjectsToCache / 2, partHits, partMisses, partOverflows);
+      statsCache = StatsCache.getInstance(conf);
+    }
+  }
+
+  // Synchronize this so not everyone's doing it at once.
+  static synchronized void createTablesIfNotExist() throws IOException {
+    if (!tablesCreated) {
+      LOG.debug("Determining which tables need to be created");
+      HBaseAdmin admin = new HBaseAdmin(self.get().conn);
+      LOG.debug("Got hbase admin");
+      for (String name : tableNames) {
+        LOG.debug("Checking for table " + name);
+        if (self.get().getHTable(name) == null) {
+          LOG.debug("Creating table " + name);
+          HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(name));
+          tableDesc.addFamily(new HColumnDescriptor(CATALOG_CF));
+          // Only table and partitions need stats
+          if (TABLE_TABLE.equals(name) || PART_TABLE.equals(name)) {
+            tableDesc.addFamily(new HColumnDescriptor(STATS_CF));
+          }
+          admin.createTable(tableDesc);
+        }
+      }
+      admin.close();
+      tablesCreated = true;
+    }
+  }
+
+  /**
+   * Begin a transaction
+   */
+  void begin() {
+    // NOP for now
+  }
+
+  /**
+   * Commit a transaction
+   */
+  void commit() {
+    // NOP for now
+  }
+
+  void rollback() {
+    // NOP for now
+  }
+
+  void close() throws IOException {
+    for (HTableInterface htab : tables.values()) htab.close();
+    conn.close();
+  }
+
+  /**
+   * Fetch a database object
+   * @param name name of the database to fetch
+   * @return the database object, or null if there is no such database
+   * @throws IOException
+   */
+  Database getDb(String name) throws IOException {
+    byte[] key = HBaseUtils.buildKey(name);
+    byte[] serialized = read(DB_TABLE, key, CATALOG_CF, CATALOG_COL);
+    if (serialized == null) return null;
+    DatabaseWritable db = new DatabaseWritable();
+    HBaseUtils.deserialize(db, serialized);
+    return db.db;
+  }
+
+  /**
+   * Store a database object
+   * @param database database object to store
+   * @throws IOException
+   */
+  void putDb(Database database) throws IOException {
+    DatabaseWritable db = new DatabaseWritable(database);
+    byte[] key = HBaseUtils.buildKey(db.db.getName());
+    byte[] serialized = HBaseUtils.serialize(db);
+    store(DB_TABLE, key, CATALOG_CF, CATALOG_COL, serialized);
+    flush();
+  }
+
+  /**
+   * Drop a database
+   * @param name name of db to drop
+   * @throws IOException
+   */
+  void deleteDb(String name) throws IOException {
+    byte[] key = HBaseUtils.buildKey(name);
+    delete(DB_TABLE, key, null, null);
+    flush();
+  }
+
+  /**
+   * Fetch one partition
+   * @param dbName database table is in
+   * @param tableName table partition is in
+   * @param partVals list of values that specify the partition, given in the same order as the
+   *                 columns they belong to
+   * @return The partition object, or null if there is no such partition
+   * @throws IOException
+   */
+  Partition getPartition(String dbName, String tableName, List<String> partVals)
+      throws IOException {
+    return getPartition(dbName, tableName, partVals, true);
+  }
+
+  /**
+   * Add a partition
+   * @param partition partition object to add
+   * @throws IOException
+   */
+  void putPartition(Partition partition) throws IOException {
+    PartitionWritable part = new PartitionWritable(partition);
+    byte[] key = buildPartitionKey(part);
+    byte[] serialized = HBaseUtils.serialize(part);
+    store(PART_TABLE, key, CATALOG_CF, CATALOG_COL, serialized);
+    flush();
+    partCache.put(partition.getDbName(), partition.getTableName(), partition);
+  }
+
+  /**
+   * Find all the partitions in a table.
+   * @param dbName name of the database the table is in
+   * @param tableName table name
+   * @param maxPartitions max partitions to fetch.  If negative all partitions will be returned.
+   * @return List of partitions that match the criteria.
+ * @throws IOException + */ + List<Partition> scanPartitionsInTable(String dbName, String tableName, int maxPartitions) + throws IOException { + if (maxPartitions < 0) maxPartitions = Integer.MAX_VALUE; + Collection<Partition> cached = partCache.getAllForTable(dbName, tableName); + if (cached != null) { + return maxPartitions < cached.size() + ? new ArrayList<Partition>(cached).subList(0, maxPartitions) + : new ArrayList<Partition>(cached); + } + byte[] keyPrefix = HBaseUtils.buildKeyWithTrailingSeparator(dbName, tableName); + List<Partition> parts = scanOnPrefix(PART_TABLE, keyPrefix, CATALOG_CF, CATALOG_COL, -1); + partCache.put(dbName, tableName, parts, true); + return maxPartitions < parts.size() ? parts.subList(0, maxPartitions) : parts; + } + + /** + * Scan partitions based on partial key information. + * @param dbName name of database, required + * @param tableName name of table, required + * @param partVals partial specification of values. Any values that are unknown can be left + * null in the list. For example, if a table had two partition columns date + * and region (in that order), and partitions ('today', 'na'), ('today', 'eu'), + * ('tomorrow', 'na'), ('tomorrow', 'eu') then passing ['today'] would return + * ('today', 'na') and ('today', 'eu') while passing [null, 'eu'] would return + * ('today', 'eu') and ('tomorrow', 'eu') + * @param maxPartitions Maximum number of entries to return. + * @return list of partitions that match the specified information + * @throws IOException + */ + List<Partition> scanPartitions(String dbName, String tableName, List<String> partVals, + int maxPartitions) throws IOException { + byte[] keyPrefix; + if (partVals == null || partVals.size() == 0) { + keyPrefix = HBaseUtils.buildKeyWithTrailingSeparator(dbName, tableName); + return scanOnPrefix(PART_TABLE, keyPrefix, CATALOG_CF, CATALOG_COL, maxPartitions); + } + int firstNull = 0; + for (; firstNull < partVals.size(); firstNull++) { + if (partVals.get(firstNull) == null) break; + } + if (firstNull == partVals.size()) { + keyPrefix = buildPartitionKey(dbName, tableName, partVals); + return scanOnPrefix(PART_TABLE, keyPrefix, CATALOG_CF, CATALOG_COL, maxPartitions); + } + keyPrefix = buildPartitionKey(dbName, tableName, partVals.subList(0, firstNull)); + StringBuilder regex = new StringBuilder(); + regex.append(dbName); + regex.append(':'); + regex.append(tableName); + for (String val : partVals) { + regex.append(HBaseUtils.KEY_SEPARATOR); + if (val == null) regex.append("[^" + HBaseUtils.KEY_SEPARATOR + "]+"); // Will this do + // what I want? 
+ else regex.append(val); + } + + Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, + new RegexStringComparator(regex.toString())); + + List<Partition> parts = scanOnPrefixWithFilter(PART_TABLE, keyPrefix, CATALOG_CF, CATALOG_COL, + maxPartitions, filter); + partCache.put(dbName, tableName, parts, false); + return parts; + } + + /** + * Delete a partition + * @param dbName database name that table is in + * @param tableName table partition is in + * @param partVals partition values that define this partition, in the same order as the + * partition columns they are values for + * @throws IOException + */ + void deletePartition(String dbName, String tableName, List<String> partVals) throws IOException { + // Find the partition so I can get the storage descriptor and drop it + partCache.remove(dbName, tableName, partVals); + Partition p = getPartition(dbName, tableName, partVals, false); + decrementStorageDescriptorRefCount(p.getSd()); + byte[] key = buildPartitionKey(dbName, tableName, partVals); + delete(PART_TABLE, key, null, null); + flush(); + } + + /** + * Fetch a role + * @param roleName name of the role + * @return role object, or null if no such role + * @throws IOException + */ + Role getRole(String roleName) throws IOException { + byte[] key = HBaseUtils.buildKey(roleName); + byte[] serialized = read(ROLE_TABLE, key, CATALOG_CF, CATALOG_COL); + if (serialized == null) return null; + RoleWritable role = new RoleWritable(); + HBaseUtils.deserialize(role, serialized); + return role.role; + } + + /** + * Add a new role + * @param role role object + * @throws IOException + */ + void putRole(Role role) throws IOException { + byte[] key = HBaseUtils.buildKey(role.getRoleName()); + byte[] serialized = HBaseUtils.serialize(new RoleWritable(role)); + store(ROLE_TABLE, key, CATALOG_CF, CATALOG_COL, serialized); + flush(); + } + + /** + * Drop a role + * @param roleName name of role to drop + * @throws IOException + */ + void deleteRole(String roleName) throws IOException { + byte[] key = HBaseUtils.buildKey(roleName); + delete(ROLE_TABLE, key, null, null); + flush(); + } + + /** + * Fetch a table object + * @param dbName database the table is in + * @param tableName table name + * @return Table object, or null if no such table + * @throws IOException + */ + Table getTable(String dbName, String tableName) throws IOException { + return getTable(dbName, tableName, true); + } + + /** + * Put a table object + * @param table table object + * @throws IOException + */ + void putTable(Table table) throws IOException { + byte[] key = HBaseUtils.buildKey(table.getDbName(), table.getTableName()); + byte[] serialized = HBaseUtils.serialize(new TableWritable(table)); + store(TABLE_TABLE, key, CATALOG_CF, CATALOG_COL, serialized); + flush(); + tableCache.put(new ObjectPair<String, String>(table.getDbName(), table.getTableName()), table); + } + + /** + * Delete a table + * @param dbName name of database table is in + * @param tableName table to drop + * @throws IOException + */ + void deleteTable(String dbName, String tableName) throws IOException { + tableCache.remove(new ObjectPair<String, String>(dbName, tableName)); + // Find the table so I can get the storage descriptor and drop it + Table t = getTable(dbName, tableName, false); + decrementStorageDescriptorRefCount(t.getSd()); + byte[] key = HBaseUtils.buildKey(dbName, tableName); + delete(TABLE_TABLE, key, null, null); + flush(); + } + + /** + * If this serde has already been read, then return it from the cache. 
If not, read it, then
+   * return it.
+   * @param hash hash of the storage descriptor to fetch
+   * @return the storage descriptor
+   * @throws IOException
+   */
+  StorageDescriptor getStorageDescriptor(byte[] hash) throws IOException {
+    ByteArrayWrapper hashKey = new ByteArrayWrapper(hash);
+    StorageDescriptor cached = sdCache.get(hashKey);
+    if (cached != null) return cached;
+    byte[] serialized = read(SD_TABLE, hash, CATALOG_CF, CATALOG_COL);
+    if (serialized == null) {
+      throw new RuntimeException("Woh, bad!  Trying to fetch a non-existent storage descriptor " +
+          "from hash " + hash);
+    }
+    StorageDescriptor sd = new StorageDescriptor();
+    HBaseUtils.deserializeStorageDescriptor(sd, serialized);
+    sdCache.put(hashKey, sd);
+    return sd;
+  }
+
+  /**
+   * Lower the reference count on the storage descriptor by one.  If it goes to zero, then it
+   * will be deleted.
+   * @param sd Storage descriptor
+   * @throws IOException
+   */
+  void decrementStorageDescriptorRefCount(StorageDescriptor sd) throws IOException {
+    byte[] serialized = HBaseUtils.serializeStorageDescriptor(sd);
+    byte[] key = hash(serialized);
+    for (int i = 0; i < 10; i++) {
+      byte[] serializedRefCnt = read(SD_TABLE, key, CATALOG_CF, REF_COUNT_COL);
+      if (serializedRefCnt == null) {
+        // Someone deleted it before we got to it, no worries
+        return;
+      }
+      int refCnt = Integer.valueOf(new String(serializedRefCnt, HBaseUtils.ENCODING));
+      HTableInterface htab = getHTable(SD_TABLE);
+      if (refCnt-- < 1) {
+        Delete d = new Delete(key);
+        if (htab.checkAndDelete(key, CATALOG_CF, REF_COUNT_COL, serializedRefCnt, d)) {
+          sdCache.remove(new ByteArrayWrapper(key));
+          return;
+        }
+      } else {
+        Put p = new Put(key);
+        p.add(CATALOG_CF, REF_COUNT_COL, Integer.toString(refCnt).getBytes(HBaseUtils.ENCODING));
+        if (htab.checkAndPut(key, CATALOG_CF, REF_COUNT_COL, serializedRefCnt, p)) {
+          return;
+        }
+      }
+    }
+    throw new IOException("Too many unsuccessful attempts to decrement storage counter");
+  }
+
+  /**
+   * Place the common parts of a storage descriptor into the cache.
+   * @param storageDescriptor storage descriptor to store.
+   * @return id of the entry in the cache, to be written in for the storage descriptor
+   */
+  byte[] putStorageDescriptor(StorageDescriptor storageDescriptor) throws IOException {
+    byte[] sd = HBaseUtils.serializeStorageDescriptor(storageDescriptor);
+    byte[] key = hash(sd);
+    for (int i = 0; i < 10; i++) {
+      byte[] serializedRefCnt = read(SD_TABLE, key, CATALOG_CF, REF_COUNT_COL);
+      HTableInterface htab = getHTable(SD_TABLE);
+      if (serializedRefCnt == null) {
+        // We are the first to put it in the DB
+        Put p = new Put(key);
+        p.add(CATALOG_CF, CATALOG_COL, sd);
+        p.add(CATALOG_CF, REF_COUNT_COL, "0".getBytes(HBaseUtils.ENCODING));
+        if (htab.checkAndPut(key, CATALOG_CF, REF_COUNT_COL, null, p)) {
+          sdCache.put(new ByteArrayWrapper(key), storageDescriptor);
+          return key;
+        }
+      } else {
+        // Just increment the reference count
+        int refCnt = Integer.valueOf(new String(serializedRefCnt, HBaseUtils.ENCODING)) + 1;
+        Put p = new Put(key);
+        p.add(CATALOG_CF, REF_COUNT_COL, Integer.toString(refCnt).getBytes(HBaseUtils.ENCODING));
+        if (htab.checkAndPut(key, CATALOG_CF, REF_COUNT_COL, serializedRefCnt, p)) {
+          return key;
+        }
+      }
+    }
+    throw new IOException("Too many unsuccessful attempts to increment storage counter");
+  }
+
+  /**
+   * Update statistics for one or more columns for a table or a partition.
+   * @param dbName database the table is in
+   * @param tableName table to update statistics for
+   * @param partName name of the partition, can be null if these are table level statistics.
+ * @param partVals partition values that define partition to update statistics for. If this is + * null, then these will be assumed to be table level statistics. + * @param stats Stats object with stats for one or more columns. + * @throws IOException + */ + void updateStatistics(String dbName, String tableName, String partName, List<String> partVals, + ColumnStatistics stats) throws IOException { + byte[] key = getStatisticsKey(dbName, tableName, partVals); + String hbaseTable = getStatisticsTable(partVals); + + byte[][] colnames = new byte[stats.getStatsObjSize()][]; + byte[][] serializeds = new byte[stats.getStatsObjSize()][]; + for (int i = 0; i < stats.getStatsObjSize(); i++) { + ColumnStatisticsObj obj = stats.getStatsObj().get(i); + serializeds[i] = HBaseUtils.serializeStatsForOneColumn(stats, obj); + String colname = obj.getColName(); + colnames[i] = HBaseUtils.buildKey(colname); + statsCache.put(dbName, tableName, partName, colname, obj, + stats.getStatsDesc().getLastAnalyzed()); + } + store(hbaseTable, key, STATS_CF, colnames, serializeds); + flush(); + } + + /** + * Get Statistics for a table + * @param dbName name of database table is in + * @param tableName name of table + * @param colNames list of column names to get statistics for + * @return column statistics for indicated table + * @throws IOException + */ + ColumnStatistics getTableStatistics(String dbName, String tableName, List<String> colNames) + throws IOException { + byte[] key = HBaseUtils.buildKey(dbName, tableName); + ColumnStatistics stats = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(); + desc.setIsTblLevel(true); + desc.setDbName(dbName); + desc.setTableName(tableName); + stats.setStatsDesc(desc); + + // First we have to go through and see what's in the cache and fetch what we can from there. + // Then we'll fetch the rest from HBase + List<String> stillLookingFor = new ArrayList<String>(); + for (int i = 0; i < colNames.size(); i++) { + StatsCache.StatsInfo info = + statsCache.getTableStatistics(dbName, tableName, colNames.get(i)); + if (info == null) { + stillLookingFor.add(colNames.get(i)); + } else { + info.stats.setColName(colNames.get(i)); + stats.addToStatsObj(info.stats); + stats.getStatsDesc().setLastAnalyzed(Math.max(stats.getStatsDesc().getLastAnalyzed(), + info.lastAnalyzed)); + } + } + if (stillLookingFor.size() == 0) return stats; + + byte[][] colKeys = new byte[stillLookingFor.size()][]; + for (int i = 0; i < colKeys.length; i++) { + colKeys[i] = HBaseUtils.buildKey(stillLookingFor.get(i)); + } + Result res = read(TABLE_TABLE, key, STATS_CF, colKeys); + for (int i = 0; i < colKeys.length; i++) { + byte[] serialized = res.getValue(STATS_CF, colKeys[i]); + if (serialized == null) { + // There were no stats for this column, so skip it + continue; + } + ColumnStatisticsObj obj = HBaseUtils.deserializeStatsForOneColumn(stats, serialized); + statsCache.put(dbName, tableName, null, stillLookingFor.get(i), obj, + stats.getStatsDesc().getLastAnalyzed()); + obj.setColName(stillLookingFor.get(i)); + stats.addToStatsObj(obj); + } + return stats; + } + + /** + * Get statistics for a set of partitions + * @param dbName name of database table is in + * @param tableName table partitions are in + * @param partNames names of the partitions, used only to set values inside the return stats + * objects. 
+   * @param partVals partition values for each partition, needed because this class doesn't know
+   *                 how to translate from partName to partVals
+   * @param colNames column names to fetch stats for.  These columns will be fetched for all
+   *                 requested partitions.
+   * @return list of ColumnStats, one for each partition.  The values will be in the same order
+   *         as the partNames list that was passed in.
+   * @throws IOException
+   */
+  List<ColumnStatistics> getPartitionStatistics(String dbName, String tableName,
+                                                List<String> partNames,
+                                                List<List<String>> partVals,
+                                                List<String> colNames) throws IOException {
+    // Go through the cache first, see what we can fetch from there.  This is complicated because
+    // we may have different columns for different partitions.
+    List<ColumnStatistics> statsList = new ArrayList<ColumnStatistics>(partNames.size());
+    List<PartStatsInfo> stillLookingFor = new ArrayList<PartStatsInfo>();
+    for (int pOff = 0; pOff < partVals.size(); pOff++) {
+      // Add an entry for this partition in the list
+      ColumnStatistics stats = new ColumnStatistics();
+      ColumnStatisticsDesc desc = new ColumnStatisticsDesc();
+      desc.setIsTblLevel(false);
+      desc.setDbName(dbName);
+      desc.setTableName(tableName);
+      desc.setPartName(partNames.get(pOff));
+      stats.setStatsDesc(desc);
+      statsList.add(stats);
+      PartStatsInfo missing = null;
+
+      for (int cOff = 0; cOff < colNames.size(); cOff++) {
+        StatsCache.StatsInfo info = statsCache.getPartitionStatistics(dbName, tableName,
+            partNames.get(pOff), colNames.get(cOff));
+        if (info == null) {
+          if (missing == null) {
+            // We haven't started an entry for this one yet
+            missing = new PartStatsInfo(stats, partVals.get(pOff), partNames.get(pOff));
+            stillLookingFor.add(missing);
+          }
+          missing.colNames.add(colNames.get(cOff));
+        } else {
+          info.stats.setColName(colNames.get(cOff));
+          stats.addToStatsObj(info.stats);
+          stats.getStatsDesc().setLastAnalyzed(Math.max(stats.getStatsDesc().getLastAnalyzed(),
+              info.lastAnalyzed));
+        }
+      }
+    }
+    if (stillLookingFor.size() == 0) return statsList;
+
+    // Build the list of gets.  It may be different for each partition now depending on what we
+    // found in the cache.
+    List<Get> gets = new ArrayList<Get>();
+    for (PartStatsInfo pi : stillLookingFor) {
+      byte[][] colKeys = new byte[pi.colNames.size()][];
+      for (int i = 0; i < colKeys.length; i++) {
+        colKeys[i] = HBaseUtils.buildKey(pi.colNames.get(i));
+      }
+      pi.colKeys = colKeys;
+
+      byte[] key = buildPartitionKey(dbName, tableName, pi.partVals);
+      Get g = new Get(key);
+      for (byte[] colName : colKeys) g.addColumn(STATS_CF, colName);
+      gets.add(g);
+    }
+    HTableInterface htab = getHTable(PART_TABLE);
+    Result[] results = htab.get(gets);
+
+    for (int pOff = 0; pOff < results.length; pOff++) {
+      PartStatsInfo pi = stillLookingFor.get(pOff);
+      for (int cOff = 0; cOff < pi.colNames.size(); cOff++) {
+        byte[] serialized = results[pOff].getValue(STATS_CF, pi.colKeys[cOff]);
+        if (serialized == null) {
+          // There were no stats for this column, so skip it
+          continue;
+        }
+        ColumnStatisticsObj obj = HBaseUtils.deserializeStatsForOneColumn(pi.stats, serialized);
+        statsCache.put(dbName, tableName, pi.partName, pi.colNames.get(cOff), obj,
+            pi.stats.getStatsDesc().getLastAnalyzed());
+        obj.setColName(pi.colNames.get(cOff));
+        pi.stats.addToStatsObj(obj);
+      }
+    }
+    return statsList;
+  }
+
+  /**
+   * This should be called whenever a new query is started.
+   */
+  void flushCatalogCache() {
+    for (Counter counter : counters) {
+      LOG.debug(counter.dump());
+      counter.clear();
+    }
+    tableCache.flush();
+    sdCache.flush();
+    partCache.flush();
+  }
+
+  @VisibleForTesting
+  int countStorageDescriptor() throws IOException {
+    ResultScanner scanner = getHTable(SD_TABLE).getScanner(new Scan());
+    int cnt = 0;
+    while (scanner.next() != null) cnt++;
+    return cnt;
+  }
+
+  private Table getTable(String dbName, String tableName, boolean populateCache)
+      throws IOException {
+    ObjectPair<String, String> hashKey = new ObjectPair<String, String>(dbName, tableName);
+    Table cached = tableCache.get(hashKey);
+    if (cached != null) return cached;
+    byte[] key = HBaseUtils.buildKey(dbName, tableName);
+    byte[] serialized = read(TABLE_TABLE, key, CATALOG_CF, CATALOG_COL);
+    if (serialized == null) return null;
+    TableWritable table = new TableWritable();
+    HBaseUtils.deserialize(table, serialized);
+    if (populateCache) tableCache.put(hashKey, table.table);
+    return table.table;
+  }
+
+  private Partition getPartition(String dbName, String tableName, List<String> partVals,
+                                 boolean populateCache) throws IOException {
+    Partition cached = partCache.get(dbName, tableName, partVals);
+    if (cached != null) return cached;
+    byte[] key = buildPartitionKey(dbName, tableName, partVals);
+    byte[] serialized = read(PART_TABLE, key, CATALOG_CF, CATALOG_COL);
+    if (serialized == null) return null;
+    PartitionWritable part = new PartitionWritable();
+    HBaseUtils.deserialize(part, serialized);
+    if (populateCache) partCache.put(dbName, tableName, part.part);
+    return part.part;
+  }
+
+  private void store(String table, byte[] key, byte[] colFam, byte[] colName, byte[] obj)
+      throws IOException {
+    HTableInterface htab = getHTable(table);
+    Put p = new Put(key);
+    p.add(colFam, colName, obj);
+    htab.put(p);
+  }
+
+  private void store(String table, byte[] key, byte[] colFam, byte[][] colName, byte[][] obj)
+      throws IOException {
+    HTableInterface htab = getHTable(table);
+    Put p = new Put(key);
+    for (int i = 0; i < colName.length; i++) {
+      p.add(colFam, colName[i], obj[i]);
+    }
+    htab.put(p);
+  }
+
+  private byte[] read(String table, byte[] key, byte[] colFam, byte[] colName) throws IOException {
+    HTableInterface htab = getHTable(table);
+    Get g = new Get(key);
+    g.addColumn(colFam, colName);
+    Result res = htab.get(g);
+    return res.getValue(colFam, colName);
+  }
+
+  private Result read(String table, byte[] key, byte[] colFam, byte[][] colNames)
+      throws IOException {
+    HTableInterface htab = getHTable(table);
+    Get g = new Get(key);
+    for (byte[] colName : colNames) g.addColumn(colFam, colName);
+    return htab.get(g);
+  }
+
+  // Delete a row.  If colFam and colName are not null, then only the named column will be
+  // deleted.  If colName is null and colFam is not, only the named family will be deleted.  If
+  // both are null the entire row will be deleted.
+  private void delete(String table, byte[] key, byte[] colFam, byte[] colName) throws IOException {
+    HTableInterface htab = getHTable(table);
+    Delete d = new Delete(key);
+    if (colName != null) d.deleteColumn(colFam, colName);
+    else if (colFam != null) d.deleteFamily(colFam);
+    htab.delete(d);
+  }
+
+  private List<Partition> scanOnPrefix(String table, byte[] keyPrefix, byte[] colFam,
+                                       byte[] colName, int maxResults) throws IOException {
+    return scanOnPrefixWithFilter(table, keyPrefix, colFam, colName, maxResults, null);
+  }
+
+  private List<Partition> scanOnPrefixWithFilter(String table, byte[] keyPrefix, byte[] colFam,
+                                                 byte[] colName, int maxResults, Filter filter)
+      throws IOException {
+    HTableInterface htab = getHTable(table);
+    // Derive an exclusive stop row by bumping the last byte of the prefix.  Note that this
+    // assumes the prefix does not end in 0xff; a carry-aware version is sketched after this
+    // listing.
+    byte[] stop = Arrays.copyOf(keyPrefix, keyPrefix.length);
+    stop[stop.length - 1]++;
+    Scan s = new Scan(keyPrefix, stop);
+    s.addColumn(colFam, colName);
+    if (filter != null) s.setFilter(filter);
+    ResultScanner scanner = htab.getScanner(s);
+    List<Partition> parts = new ArrayList<Partition>();
+    int numToFetch = maxResults < 0 ? Integer.MAX_VALUE : maxResults;
+    Iterator<Result> iter = scanner.iterator();
+    for (int i = 0; i < numToFetch && iter.hasNext(); i++) {
+      PartitionWritable p = new PartitionWritable();
+      HBaseUtils.deserialize(p, iter.next().getValue(colFam, colName));
+      parts.add(p.part);
+    }
+    return parts;
+  }
+
+  private HTableInterface getHTable(String table) throws IOException {
+    HTableInterface htab = tables.get(table);
+    if (htab == null) {
+      LOG.debug("Trying to connect to table " + table);
+      try {
+        htab = conn.getTable(table);
+        // Calling getTable doesn't actually connect to the region server; it's very
+        // lightweight.  So do a get to actually reach out and touch the region server
+        // and see if the table is there.
+        htab.get(new Get("nosuchkey".getBytes(HBaseUtils.ENCODING)));
+      } catch (IOException e) {
+        LOG.info("Caught exception when table was missing", e);
+        return null;
+      }
+      htab.setAutoFlushTo(false);
+      tables.put(table, htab);
+    }
+    return htab;
+  }
+
+  private void flush() throws IOException {
+    for (HTableInterface htab : tables.values()) htab.flushCommits();
+  }
+
+  private byte[] buildPartitionKey(String dbName, String tableName, List<String> partVals) {
+    Deque<String> keyParts = new ArrayDeque<String>(partVals);
+    keyParts.addFirst(tableName);
+    keyParts.addFirst(dbName);
+    return HBaseUtils.buildKey(keyParts.toArray(new String[keyParts.size()]));
+  }
+
+  private byte[] buildPartitionKey(PartitionWritable part) throws IOException {
+    Deque<String> keyParts = new ArrayDeque<String>(part.part.getValues());
+    keyParts.addFirst(part.part.getTableName());
+    keyParts.addFirst(part.part.getDbName());
+    return HBaseUtils.buildKey(keyParts.toArray(new String[keyParts.size()]));
+  }
+
+  private byte[] hash(byte[] serialized) throws IOException {
+    md.update(serialized);
+    return md.digest();
+  }
+
+  private byte[] getStatisticsKey(String dbName, String tableName, List<String> partVals) {
+    return partVals == null ?
+        HBaseUtils.buildKey(dbName, tableName) :
+        buildPartitionKey(dbName, tableName, partVals);
+  }
+
+  private String getStatisticsTable(List<String> partVals) {
+    return partVals == null ? TABLE_TABLE : PART_TABLE;
+  }
+
+  /**
+   * Use this for unit testing only, so that a mock connection object can be passed in.
+   * @param connection Mock connection object
+   */
+  @VisibleForTesting
+  void setConnection(HConnection connection) {
+    conn = connection;
+  }
+
+  private static class ByteArrayWrapper {
+    byte[] wrapped;
+
+    ByteArrayWrapper(byte[] b) {
+      wrapped = b;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof ByteArrayWrapper) {
+        return Arrays.equals(((ByteArrayWrapper)other).wrapped, wrapped);
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(wrapped);
+    }
+  }
+
+  private static class PartStatsInfo {
+    ColumnStatistics stats;
+    String partName;
+    List<String> colNames;
+    List<String> partVals;
+    byte[][] colKeys;
+
+    PartStatsInfo(ColumnStatistics s, List<String> pv, String pn) {
+      stats = s; partVals = pv; partName = pn;
+      colNames = new ArrayList<String>();
+      colKeys = null;
+    }
+  }
+
+  // For testing without the cache
+  private static class BogusObjectCache<K, V> extends ObjectCache<K, V> {
+    static Counter bogus = new Counter("bogus");
+
+    BogusObjectCache() {
+      super(1, bogus, bogus, bogus);
+    }
+
+    @Override
+    V get(K key) {
+      return null;
+    }
+  }
+
+  private static class BogusPartitionCache extends PartitionCache {
+    static Counter bogus = new Counter("bogus");
+
+    BogusPartitionCache() {
+      super(1, bogus, bogus, bogus);
+    }
+
+    @Override
+    Collection<Partition> getAllForTable(String dbName, String tableName) {
+      return null;
+    }
+
+    @Override
+    Partition get(String dbName, String tableName, List<String> partVals) {
+      return null;
+    }
+  }
+}

Added: hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseSchemaTool.java
URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseSchemaTool.java?rev=1657394&view=auto
==============================================================================
--- hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseSchemaTool.java (added)
+++ hive/branches/hbase-metastore/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseSchemaTool.java Wed Feb 4 20:00:49 2015
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Role;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TSimpleJSONProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A tool to dump contents from the HBase store in a human-readable form
+ */
+public class HBaseSchemaTool {
+
+  private static String[] commands = {"db", "part", "parts", "role", "table"};
+
+  public static void main(String[] args) throws Exception {
+    Options options = new Options();
+
+    options.addOption(OptionBuilder
+        .withLongOpt("column")
+        .withDescription("Comma separated list of column names")
+        .hasArg()
+        .create('c'));
+
+    options.addOption(OptionBuilder
+        .withLongOpt("db")
+        .withDescription("Database name")
+        .hasArg()
+        .create('d'));
+
+    options.addOption(OptionBuilder
+        .withLongOpt("help")
+        .withDescription("You're looking at it")
+        .create('h'));
+
+    options.addOption(OptionBuilder
+        .withLongOpt("role")
+        .withDescription("Role name")
+        .hasArg()
+        .create('r'));
+
+    options.addOption(OptionBuilder
+        .withLongOpt("partvals")
+        .withDescription("Comma separated list of partition values, in order of partition columns")
+        .hasArg()
+        .create('p'));
+
+    options.addOption(OptionBuilder
+        .withLongOpt("stats")
+        .withDescription("Get statistics rather than catalog object")
+        .create('s'));
+
+    options.addOption(OptionBuilder
+        .withLongOpt("table")
+        .withDescription("Table name")
+        .hasArg()
+        .create('t'));
+
+    CommandLine cli = new GnuParser().parse(options, args);
+
+    if (cli.hasOption('h')) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("hbaseschematool", options);
+      return;
+    }
+
+    String[] cmds = cli.getArgs();
+    // Reject both a missing command and an unknown one, so the reflective dispatch
+    // below can never throw NoSuchMethodException at the user.
+    if (cmds.length != 1 || !Arrays.asList(commands).contains(cmds[0])) {
+      System.err.print("Must include a command, valid commands are: ");
+      for (int i = 0; i < commands.length; i++) {
+        if (i != 0) System.err.print(", ");
+        System.err.print(commands[i]);
+      }
+      System.err.println();
+      System.exit(1);
+    }
+    String cmd = cmds[0];
+
+    List<String> parts = null;
+    if (cli.hasOption('p')) {
+      parts = Arrays.asList(cli.getOptionValue('p').split(","));
+    }
+
+    List<String> cols = null;
+    if (cli.hasOption('c')) {
+      cols = Arrays.asList(cli.getOptionValue('c').split(","));
+    }
+
+    HBaseSchemaTool tool = new HBaseSchemaTool(cli.getOptionValue('d'), cli.getOptionValue('t'),
+        parts, cli.getOptionValue('r'), cols, cli.hasOption('s'));
+    // Each command name matches a public no-argument method on this class.
+    Method method = tool.getClass().getMethod(cmd);
+    method.invoke(tool);
+  }
+
+  private HBaseReadWrite hrw;
+  private String dbName;
+  private String tableName;
+  private List<String> partVals;
+  private String roleName;
+  private List<String> colNames;
+  private boolean hasStats;
+
+  private HBaseSchemaTool(String dbname, String tn, List<String> pv, String rn, List<String> cn,
+                          boolean s) {
+    dbName = dbname;
+    tableName = tn;
+    partVals = pv;
+    roleName = rn;
+    colNames = cn;
+    hasStats = s;
+    hrw = HBaseReadWrite.getInstance(new Configuration());
+  }
+
+  public void db() throws IOException, TException {
+    Database db = hrw.getDb(dbName);
+    if (db == null) System.err.println("No such database: " + dbName);
+    else dump(db);
+  }
+
+  public void part() throws IOException, TException {
+    if (hasStats) {
+      Table table = hrw.getTable(dbName, tableName);
+      if (table == null) {
+        System.err.println("No such table: " + dbName + "." + tableName);
+        return;
+      }
+      String partName = HBaseStore.partName(table, partVals);
+      List<ColumnStatistics> stats = hrw.getPartitionStatistics(dbName, tableName,
+          Arrays.asList(partName), Arrays.asList(partVals), colNames);
+      if (stats == null) {
+        System.err.println("No stats for " + dbName + "." + tableName + "." +
+            StringUtils.join(partVals, ':'));
+      } else {
+        for (ColumnStatistics stat : stats) dump(stat);
+      }
+    } else {
+      Partition part = hrw.getPartition(dbName, tableName, partVals);
+      if (part == null) {
+        System.err.println("No such partition: " + dbName + "." + tableName + "." +
+            StringUtils.join(partVals, ':'));
+      } else {
+        dump(part);
+      }
+    }
+  }
+
+  public void parts() throws IOException, TException {
+    List<Partition> parts = hrw.scanPartitionsInTable(dbName, tableName, -1);
+    if (parts == null) {
+      System.err.println("No such table: " + dbName + "." + tableName);
+    } else {
+      for (Partition p : parts) dump(p);
+    }
+  }
+
+  public void role() throws IOException, TException {
+    Role role = hrw.getRole(roleName);
+    if (role == null) System.err.println("No such role: " + roleName);
+    else dump(role);
+  }
+
+  public void table() throws IOException, TException {
+    if (hasStats) {
+      ColumnStatistics stats = hrw.getTableStatistics(dbName, tableName, colNames);
+      if (stats == null) System.err.println("No stats for " + dbName + "." + tableName);
+      else dump(stats);
+    } else {
+      Table table = hrw.getTable(dbName, tableName);
+      if (table == null) System.err.println("No such table: " + dbName + "." + tableName);
+      else dump(table);
+    }
+  }
+
+  private void dump(TBase thriftObj) throws TException {
+    TMemoryBuffer buf = new TMemoryBuffer(1000);
+    TProtocol protocol = new TSimpleJSONProtocol(buf);
+    thriftObj.write(protocol);
+    // Only the bytes actually written are valid; the buffer's backing array may be larger.
+    System.out.println(new String(buf.getArray(), 0, buf.length()));
+  }
+}
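
A note on the storage-descriptor reference counting above: putStorageDescriptor and decrementStorageDescriptorRefCount never take a lock. They re-read the counter and let HBase's checkAndPut/checkAndDelete reject the write if another client changed the row in the meantime, retrying up to ten times. The sketch below simulates that optimistic loop with an AtomicReference standing in for the HBase row; the class and method names are illustrative only, not part of the patch. It keeps the same convention that the stored value is (references - 1), so null models an absent row and 0 means one live reference.

    import java.util.concurrent.atomic.AtomicReference;

    public class RefCountSketch {
      // null models a missing row; the Integer models the REF_COUNT_COL cell.
      private final AtomicReference<Integer> stored = new AtomicReference<Integer>(null);

      // Analogous to putStorageDescriptor: create with count 0, or bump by one.
      boolean reference() {
        for (int i = 0; i < 10; i++) {
          Integer cur = stored.get();
          Integer next = (cur == null) ? Integer.valueOf(0) : Integer.valueOf(cur + 1);
          // compareAndSet plays the role of checkAndPut: it only succeeds if the
          // stored value is still exactly what we read.
          if (stored.compareAndSet(cur, next)) return true;
        }
        return false; // too many unsuccessful attempts
      }

      // Analogous to decrementStorageDescriptorRefCount: delete at zero, else decrement.
      boolean dereference() {
        for (int i = 0; i < 10; i++) {
          Integer cur = stored.get();
          if (cur == null) return true; // someone else already deleted it
          Integer next = (cur < 1) ? null : Integer.valueOf(cur - 1); // null = checkAndDelete
          if (stored.compareAndSet(cur, next)) return true;
        }
        return false;
      }
    }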
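getTableStatistics and getPartitionStatistics above share the same read-through shape: serve what the stats cache can, collect the misses into stillLookingFor, issue one batched read for just those, and backfill the cache as results arrive. Stripped of the HBase and Thrift details, the pattern reduces to roughly the following sketch, where a plain Map stands in for the batched Get; the names are illustrative:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ReadThroughSketch {
      private final Map<String, String> cache = new HashMap<String, String>();
      private final Map<String, String> backing = new HashMap<String, String>(); // stands in for HBase

      List<String> fetch(List<String> keys) {
        List<String> results = new ArrayList<String>();
        List<String> stillLookingFor = new ArrayList<String>();
        for (String k : keys) {
          String hit = cache.get(k);
          if (hit != null) results.add(hit);   // served from memory
          else stillLookingFor.add(k);         // defer to one batched read
        }
        if (stillLookingFor.isEmpty()) return results;
        for (String k : stillLookingFor) {     // a single multi-column Get in the real code
          String fetched = backing.get(k);
          if (fetched == null) continue;       // no stats for this column, so skip it
          cache.put(k, fetched);               // backfill so the next call is all hits
          results.add(fetched);
        }
        return results;
      }
    }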
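The prefix scan in scanOnPrefixWithFilter derives its exclusive stop row by incrementing the final byte of the start prefix, which is correct as long as the prefix never ends in 0xff. The carry-aware variant referenced in the comment there might look like this (an illustrative sketch, not part of the patch):

    import java.util.Arrays;

    public class PrefixStopKey {
      static byte[] stopRowForPrefix(byte[] prefix) {
        byte[] stop = Arrays.copyOf(prefix, prefix.length);
        for (int i = stop.length - 1; i >= 0; i--) {
          if (stop[i] != (byte) 0xff) {
            stop[i]++;                          // no overflow here, we're done
            return Arrays.copyOf(stop, i + 1);  // bytes past the bump are unnecessary
          }
        }
        return new byte[0]; // all 0xff: an empty stop row scans to the end of the table
      }

      public static void main(String[] args) {
        System.out.println(Arrays.toString(stopRowForPrefix(new byte[]{1, 2, 3})));        // [1, 2, 4]
        System.out.println(Arrays.toString(stopRowForPrefix(new byte[]{1, (byte) 0xff}))); // [2]
      }
    }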
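ByteArrayWrapper exists because Java arrays inherit identity-based equals and hashCode from Object, so a raw byte[] key can never be looked up again with a different array holding the same bytes. This self-contained snippet demonstrates the miss and the fix the wrapper provides:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class ByteArrayKeyDemo {
      static class Wrapper {
        final byte[] wrapped;
        Wrapper(byte[] b) { wrapped = b; }
        @Override public boolean equals(Object o) {
          return o instanceof Wrapper && Arrays.equals(((Wrapper) o).wrapped, wrapped);
        }
        @Override public int hashCode() { return Arrays.hashCode(wrapped); }
      }

      public static void main(String[] args) {
        byte[] k1 = {1, 2, 3};
        byte[] k2 = {1, 2, 3};           // same content, different object

        Map<byte[], String> raw = new HashMap<byte[], String>();
        raw.put(k1, "value");
        System.out.println(raw.get(k2)); // null: identity hashCode misses

        Map<Wrapper, String> wrapped = new HashMap<Wrapper, String>();
        wrapped.put(new Wrapper(k1), "value");
        System.out.println(wrapped.get(new Wrapper(k2))); // value: content equality hits
      }
    }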
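Assuming HBaseSchemaTool is eventually wired up to a launcher script named hbaseschematool (the name it advertises to HelpFormatter above; no such wrapper is added by this patch, and the database and table names here are made up), invocations would look something like:

    hbaseschematool -d default db
    hbaseschematool -d default -t web_logs -p 2015,02 -c ip_address -s part

The single positional argument selects one of db, part, parts, role, or table, and main dispatches to the matching public no-argument method by reflection, which is why each command is spelled as a method name on the class.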