Modified: hive/trunk/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb?rev=1657387&r1=1657386&r2=1657387&view=diff ============================================================================== --- hive/trunk/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb (original) +++ hive/trunk/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb Wed Feb 4 19:24:12 2015 @@ -2077,18 +2077,58 @@ class CurrentNotificationEventId ::Thrift::Struct.generate_accessors self end +class InsertEventRequestData + include ::Thrift::Struct, ::Thrift::Struct_Union + FILESADDED = 1 + + FIELDS = { + FILESADDED => {:type => ::Thrift::Types::LIST, :name => 'filesAdded', :element => {:type => ::Thrift::Types::STRING}} + } + + def struct_fields; FIELDS; end + + def validate + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field filesAdded is unset!') unless @filesAdded + end + + ::Thrift::Struct.generate_accessors self +end + +class FireEventRequestData < ::Thrift::Union + include ::Thrift::Struct_Union + class << self + def insertData(val) + FireEventRequestData.new(:insertData, val) + end + end + + INSERTDATA = 1 + + FIELDS = { + INSERTDATA => {:type => ::Thrift::Types::STRUCT, :name => 'insertData', :class => ::InsertEventRequestData} + } + + def struct_fields; FIELDS; end + + def validate + raise(StandardError, 'Union fields are not set.') if get_set_field.nil? || get_value.nil? + end + + ::Thrift::Union.generate_accessors self +end + class FireEventRequest include ::Thrift::Struct, ::Thrift::Struct_Union - EVENTTYPE = 1 - DBNAME = 2 - SUCCESSFUL = 3 + SUCCESSFUL = 1 + DATA = 2 + DBNAME = 3 TABLENAME = 4 PARTITIONVALS = 5 FIELDS = { - EVENTTYPE => {:type => ::Thrift::Types::I32, :name => 'eventType', :enum_class => ::EventRequestType}, - DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'}, SUCCESSFUL => {:type => ::Thrift::Types::BOOL, :name => 'successful'}, + DATA => {:type => ::Thrift::Types::STRUCT, :name => 'data', :class => ::FireEventRequestData}, + DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName', :optional => true}, TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName', :optional => true}, PARTITIONVALS => {:type => ::Thrift::Types::LIST, :name => 'partitionVals', :element => {:type => ::Thrift::Types::STRING}, :optional => true} } @@ -2096,12 +2136,23 @@ class FireEventRequest def struct_fields; FIELDS; end def validate - raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field eventType is unset!') unless @eventType - raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field dbName is unset!') unless @dbName raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field successful is unset!') if @successful.nil? - unless @eventType.nil? || ::EventRequestType::VALID_VALUES.include?(@eventType) - raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Invalid value of field eventType!') - end + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field data is unset!') unless @data + end + + ::Thrift::Struct.generate_accessors self +end + +class FireEventResponse + include ::Thrift::Struct, ::Thrift::Struct_Union + + FIELDS = { + + } + + def struct_fields; FIELDS; end + + def validate end ::Thrift::Struct.generate_accessors self
Modified: hive/trunk/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb?rev=1657387&r1=1657386&r2=1657387&view=diff ============================================================================== --- hive/trunk/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb (original) +++ hive/trunk/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb Wed Feb 4 19:24:12 2015 @@ -2007,18 +2007,19 @@ module ThriftHiveMetastore raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_current_notificationEventId failed: unknown result') end - def fire_notification_event(rqst) - send_fire_notification_event(rqst) - recv_fire_notification_event() + def fire_listener_event(rqst) + send_fire_listener_event(rqst) + return recv_fire_listener_event() end - def send_fire_notification_event(rqst) - send_message('fire_notification_event', Fire_notification_event_args, :rqst => rqst) + def send_fire_listener_event(rqst) + send_message('fire_listener_event', Fire_listener_event_args, :rqst => rqst) end - def recv_fire_notification_event() - result = receive_message(Fire_notification_event_result) - return + def recv_fire_listener_event() + result = receive_message(Fire_listener_event_result) + return result.success unless result.success.nil? + raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'fire_listener_event failed: unknown result') end end @@ -3551,11 +3552,11 @@ module ThriftHiveMetastore write_result(result, oprot, 'get_current_notificationEventId', seqid) end - def process_fire_notification_event(seqid, iprot, oprot) - args = read_args(iprot, Fire_notification_event_args) - result = Fire_notification_event_result.new() - @handler.fire_notification_event(args.rqst) - write_result(result, oprot, 'fire_notification_event', seqid) + def process_fire_listener_event(seqid, iprot, oprot) + args = read_args(iprot, Fire_listener_event_args) + result = Fire_listener_event_result.new() + result.success = @handler.fire_listener_event(args.rqst) + write_result(result, oprot, 'fire_listener_event', seqid) end end @@ -8109,7 +8110,7 @@ module ThriftHiveMetastore ::Thrift::Struct.generate_accessors self end - class Fire_notification_event_args + class Fire_listener_event_args include ::Thrift::Struct, ::Thrift::Struct_Union RQST = 1 @@ -8125,11 +8126,12 @@ module ThriftHiveMetastore ::Thrift::Struct.generate_accessors self end - class Fire_notification_event_result + class Fire_listener_event_result include ::Thrift::Struct, ::Thrift::Struct_Union + SUCCESS = 0 FIELDS = { - + SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::FireEventResponse} } def struct_fields; FIELDS; end Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1657387&r1=1657386&r2=1657387&view=diff ============================================================================== --- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original) +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Wed Feb 4 19:24:12 2015 @@ -85,6 +85,7 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FireEventRequest; +import org.apache.hadoop.hive.metastore.api.FireEventResponse; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse; @@ -5559,19 +5560,20 @@ public class HiveMetaStore extends Thrif } @Override - public void fire_notification_event(FireEventRequest rqst) throws TException { - switch (rqst.getEventType()) { - case INSERT: + public FireEventResponse fire_listener_event(FireEventRequest rqst) throws TException { + switch (rqst.getData().getSetField()) { + case INSERT_DATA: InsertEvent event = new InsertEvent(rqst.getDbName(), rqst.getTableName(), - rqst.getPartitionVals(), rqst.isSuccessful(), this); + rqst.getPartitionVals(), rqst.getData().getInsertData().getFilesAdded(), + rqst.isSuccessful(), this); for (MetaStoreEventListener listener : listeners) { listener.onInsert(event); } - break; + return new FireEventResponse(); default: - throw new TException("Event type " + rqst.getEventType().toString() + " not currently " + - "supported."); + throw new TException("Event type " + rqst.getData().getSetField().toString() + + " not currently supported."); } } Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java?rev=1657387&r1=1657386&r2=1657387&view=diff ============================================================================== --- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (original) +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java Wed Feb 4 19:24:12 2015 @@ -71,6 +71,7 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FireEventRequest; +import org.apache.hadoop.hive.metastore.api.FireEventResponse; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest; @@ -1874,8 +1875,8 @@ public class HiveMetaStoreClient impleme } @Override - public void fireNotificationEvent(FireEventRequest rqst) throws TException { - client.fire_notification_event(rqst); + public FireEventResponse fireListenerEvent(FireEventRequest rqst) throws TException { + return client.fire_listener_event(rqst); } /** Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java?rev=1657387&r1=1657386&r2=1657387&view=diff ============================================================================== --- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java (original) +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java Wed Feb 4 19:24:12 2015 @@ -18,31 +18,47 @@ package org.apache.hadoop.hive.metastore; + +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; +import org.apache.hadoop.hive.metastore.api.FireEventRequest; +import org.apache.hadoop.hive.metastore.api.FireEventResponse; +import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; +import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.NoSuchLockException; +import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; +import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; +import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; +import org.apache.hadoop.hive.metastore.api.TxnAbortedException; +import org.apache.hadoop.hive.metastore.api.TxnOpenException; +import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.thrift.TException; + import java.util.List; import java.util.Map; import org.apache.hadoop.hive.common.ObjectPair; -import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.classification.InterfaceAudience.Public; import org.apache.hadoop.hive.common.classification.InterfaceStability.Evolving; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; -import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException; -import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; -import org.apache.hadoop.hive.metastore.api.FireEventRequest; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Function; -import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest; import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse; import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest; import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse; -import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse; import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; import org.apache.hadoop.hive.metastore.api.HiveObjectRef; import org.apache.hadoop.hive.metastore.api.Index; @@ -50,15 +66,8 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; -import org.apache.hadoop.hive.metastore.api.LockRequest; -import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchLockException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; -import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; @@ -66,16 +75,10 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.metastore.api.PrivilegeBag; import org.apache.hadoop.hive.metastore.api.Role; import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; -import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.TxnAbortedException; -import org.apache.hadoop.hive.metastore.api.TxnOpenException; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; -import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; -import org.apache.thrift.TException; /** * Wrapper around hive metastore thrift api @@ -1350,9 +1353,10 @@ public interface IMetaStoreClient { * Request that the metastore fire an event. Currently this is only supported for DML * operations, since the metastore knows when DDL operations happen. * @param request + * @return response, type depends on type of request * @throws TException */ - void fireNotificationEvent(FireEventRequest request) throws TException; + FireEventResponse fireListenerEvent(FireEventRequest request) throws TException; class IncompatibleMetastoreException extends MetaException { IncompatibleMetastoreException(String message) { Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java?rev=1657387&r1=1657386&r2=1657387&view=diff ============================================================================== --- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java (original) +++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java Wed Feb 4 19:24:12 2015 @@ -34,6 +34,7 @@ public class InsertEvent extends Listene private final String db; private final String table; private final List<String> partVals; + private final List<String> files; /** * @@ -43,12 +44,13 @@ public class InsertEvent extends Listene * @param status status of insert, true = success, false = failure * @param handler handler that is firing the event */ - public InsertEvent(String db, String table, List<String> partitions, boolean status, - HMSHandler handler) { + public InsertEvent(String db, String table, List<String> partitions, List<String> files, + boolean status, HMSHandler handler) { super(status, handler); this.db = db; this.table = table; this.partVals = partitions; + this.files = files; } public String getDb() { @@ -67,4 +69,12 @@ public class InsertEvent extends Listene public List<String> getPartitions() { return partVals; } + + /** + * Get list of files created as a result of this DML operation + * @return list of new files + */ + public List<String> getFiles() { + return files; + } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1657387&r1=1657386&r2=1657387&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Wed Feb 4 19:24:12 2015 @@ -73,7 +73,10 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.EventRequestType; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.FireEventRequest; +import org.apache.hadoop.hive.metastore.api.FireEventRequestData; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest; @@ -82,6 +85,7 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.metastore.api.HiveObjectRef; import org.apache.hadoop.hive.metastore.api.HiveObjectType; import org.apache.hadoop.hive.metastore.api.Index; +import org.apache.hadoop.hive.metastore.api.InsertEventRequestData; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -1307,6 +1311,7 @@ public class Hive { * location/inputformat/outputformat/serde details from table spec * @param isSrcLocal * If the source directory is LOCAL + * @param isAcid true if this is an ACID operation */ public Partition loadPartition(Path loadPath, Table tbl, Map<String, String> partSpec, boolean replace, boolean holdDDLTime, @@ -1354,16 +1359,19 @@ public class Hive { newPartPath = oldPartPath; } + List<Path> newFiles = null; if (replace) { Hive.replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(), isSrcLocal); } else { + newFiles = new ArrayList<Path>(); FileSystem fs = tbl.getDataLocation().getFileSystem(conf); - Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid); + Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles); } boolean forceCreate = (!holdDDLTime) ? true : false; - newTPart = getPartition(tbl, partSpec, forceCreate, newPartPath.toString(), inheritTableSpecs); + newTPart = getPartition(tbl, partSpec, forceCreate, newPartPath.toString(), + inheritTableSpecs, newFiles); // recreate the partition if it existed before if (!holdDDLTime) { if (isSkewedStoreAsSubdir) { @@ -1376,7 +1384,8 @@ public class Hive { skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps); newCreatedTpart.getSd().setSkewedInfo(skewedInfo); alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newCreatedTpart)); - newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs); + newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs, + newFiles); return new Partition(tbl, newCreatedTpart); } } @@ -1486,6 +1495,8 @@ private void constructOneLBLocationMap(F * @param replace * @param numDP number of dynamic partitions * @param holdDDLTime + * @param listBucketingEnabled + * @param isAcid true if this is an ACID operation * @return partition map details (PartitionSpec and Partition) * @throws HiveException */ @@ -1577,11 +1588,12 @@ private void constructOneLBLocationMap(F public void loadTable(Path loadPath, String tableName, boolean replace, boolean holdDDLTime, boolean isSrcLocal, boolean isSkewedStoreAsSubdir, boolean isAcid) throws HiveException { + List<Path> newFiles = new ArrayList<Path>(); Table tbl = getTable(tableName); if (replace) { tbl.replaceFiles(loadPath, isSrcLocal); } else { - tbl.copyFiles(loadPath, isSrcLocal, isAcid); + tbl.copyFiles(loadPath, isSrcLocal, isAcid, newFiles); tbl.getParameters().put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, "true"); } @@ -1606,6 +1618,7 @@ private void constructOneLBLocationMap(F throw new HiveException(e); } } + fireInsertEvent(tbl, null, newFiles); } /** @@ -1693,7 +1706,7 @@ private void constructOneLBLocationMap(F public Partition getPartition(Table tbl, Map<String, String> partSpec, boolean forceCreate) throws HiveException { - return getPartition(tbl, partSpec, forceCreate, null, true); + return getPartition(tbl, partSpec, forceCreate, null, true, null); } /** @@ -1711,8 +1724,32 @@ private void constructOneLBLocationMap(F * @return result partition object or null if there is no partition * @throws HiveException */ + public Partition getPartition(Table tbl, Map<String, String> partSpec, boolean forceCreate, + String partPath, boolean inheritTableSpecs) + throws HiveException { + return getPartition(tbl, partSpec, forceCreate, partPath, inheritTableSpecs, null); + } + + /** + * Returns partition metadata + * + * @param tbl + * the partition's table + * @param partSpec + * partition keys and values + * @param forceCreate + * if this is true and partition doesn't exist then a partition is + * created + * @param partPath the path where the partition data is located + * @param inheritTableSpecs whether to copy over the table specs for if/of/serde + * @param newFiles An optional list of new files that were moved into this partition. If + * non-null these will be included in the DML event sent to the metastore. + * @return result partition object or null if there is no partition + * @throws HiveException + */ public Partition getPartition(Table tbl, Map<String, String> partSpec, - boolean forceCreate, String partPath, boolean inheritTableSpecs) throws HiveException { + boolean forceCreate, String partPath, boolean inheritTableSpecs, List<Path> newFiles) + throws HiveException { tbl.validatePartColumnNames(partSpec, true); List<String> pvals = new ArrayList<String>(); for (FieldSchema field : tbl.getPartCols()) { @@ -1772,6 +1809,7 @@ private void constructOneLBLocationMap(F } else { alterPartitionSpec(tbl, partSpec, tpart, inheritTableSpecs, partPath); + fireInsertEvent(tbl, partSpec, newFiles); } } if (tpart == null) { @@ -1813,6 +1851,36 @@ private void constructOneLBLocationMap(F alterPartition(fullName, new Partition(tbl, tpart)); } + private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, List<Path> newFiles) + throws HiveException { + if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) { + LOG.debug("Firing dml insert event"); + FireEventRequestData data = new FireEventRequestData(); + InsertEventRequestData insertData = new InsertEventRequestData(); + data.setInsertData(insertData); + if (newFiles != null && newFiles.size() > 0) { + for (Path p : newFiles) { + insertData.addToFilesAdded(p.toString()); + } + } + FireEventRequest rqst = new FireEventRequest(true, data); + rqst.setDbName(tbl.getDbName()); + rqst.setTableName(tbl.getTableName()); + if (partitionSpec != null && partitionSpec.size() > 0) { + List<String> partVals = new ArrayList<String>(partitionSpec.size()); + for (FieldSchema fs : tbl.getPartitionKeys()) { + partVals.add(partitionSpec.get(fs.getName())); + } + rqst.setPartitionVals(partVals); + } + try { + getMSC().fireListenerEvent(rqst); + } catch (TException e) { + throw new HiveException(e); + } + } + } + public boolean dropPartition(String tblName, List<String> part_vals, boolean deleteData) throws HiveException { String[] names = Utilities.getDbTableName(tblName); @@ -2502,10 +2570,12 @@ private void constructOneLBLocationMap(F * @param fs Filesystem * @param isSrcLocal true if source is on local file system * @param isAcid true if this is an ACID based write + * @param newFiles if this is non-null, a list of files that were created as a result of this + * move will be returned. * @throws HiveException */ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, - FileSystem fs, boolean isSrcLocal, boolean isAcid) throws HiveException { + FileSystem fs, boolean isSrcLocal, boolean isAcid, List<Path> newFiles) throws HiveException { boolean inheritPerms = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); try { @@ -2537,7 +2607,7 @@ private void constructOneLBLocationMap(F // If we're moving files around for an ACID write then the rules and paths are all different. // You can blame this on Owen. if (isAcid) { - moveAcidFiles(srcFs, srcs, destf); + moveAcidFiles(srcFs, srcs, destf, newFiles); } else { // check that source and target paths exist List<List<Path[]>> result = checkPaths(conf, fs, srcs, srcFs, destf, false); @@ -2549,6 +2619,7 @@ private void constructOneLBLocationMap(F throw new IOException("Cannot move " + sdpair[0] + " to " + sdpair[1]); } + if (newFiles != null) newFiles.add(sdpair[1]); } } } catch (IOException e) { @@ -2557,8 +2628,8 @@ private void constructOneLBLocationMap(F } } - private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst) - throws HiveException { + private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, + List<Path> newFiles) throws HiveException { // The layout for ACID files is table|partname/base|delta/bucket // We will always only be writing delta files. In the buckets created by FileSinkOperator // it will look like bucket/delta/bucket. So we need to move that into the above structure. @@ -2622,6 +2693,7 @@ private void constructOneLBLocationMap(F LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " + bucketDest.toUri().toString()); fs.rename(bucketSrc, bucketDest); + if (newFiles != null) newFiles.add(bucketDest); } } catch (IOException e) { throw new HiveException("Error moving acid files " + e.getMessage(), e); Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1657387&r1=1657386&r2=1657387&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Wed Feb 4 19:24:12 2015 @@ -650,12 +650,15 @@ public class Table implements Serializab * If the source directory is LOCAL * @param isAcid * True if this is an ACID based insert, update, or delete + * @param newFiles optional list of paths. If non-null, then all files copyied to the table + * will be added to this list. */ - protected void copyFiles(Path srcf, boolean isSrcLocal, boolean isAcid) throws HiveException { + protected void copyFiles(Path srcf, boolean isSrcLocal, boolean isAcid, List<Path> newFiles) + throws HiveException { FileSystem fs; try { fs = getDataLocation().getFileSystem(Hive.get().getConf()); - Hive.copyFiles(Hive.get().getConf(), srcf, getPath(), fs, isSrcLocal, isAcid); + Hive.copyFiles(Hive.get().getConf(), srcf, getPath(), fs, isSrcLocal, isAcid, newFiles); } catch (IOException e) { throw new HiveException("addFiles: filesystem error in check phase", e); }