Modified: hive/trunk/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb?rev=1657387&r1=1657386&r2=1657387&view=diff
==============================================================================
--- hive/trunk/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb (original)
+++ hive/trunk/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb Wed Feb  4 19:24:12 2015
@@ -2077,18 +2077,58 @@ class CurrentNotificationEventId
   ::Thrift::Struct.generate_accessors self
 end
 
+class InsertEventRequestData
+  include ::Thrift::Struct, ::Thrift::Struct_Union
+  FILESADDED = 1
+
+  FIELDS = {
+    FILESADDED => {:type => ::Thrift::Types::LIST, :name => 'filesAdded', :element => {:type => ::Thrift::Types::STRING}}
+  }
+
+  def struct_fields; FIELDS; end
+
+  def validate
+    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field filesAdded is unset!') unless @filesAdded
+  end
+
+  ::Thrift::Struct.generate_accessors self
+end
+
+class FireEventRequestData < ::Thrift::Union
+  include ::Thrift::Struct_Union
+  class << self
+    def insertData(val)
+      FireEventRequestData.new(:insertData, val)
+    end
+  end
+
+  INSERTDATA = 1
+
+  FIELDS = {
+    INSERTDATA => {:type => ::Thrift::Types::STRUCT, :name => 'insertData', :class => ::InsertEventRequestData}
+  }
+
+  def struct_fields; FIELDS; end
+
+  def validate
+    raise(StandardError, 'Union fields are not set.') if get_set_field.nil? || get_value.nil?
+  end
+
+  ::Thrift::Union.generate_accessors self
+end
+
 class FireEventRequest
   include ::Thrift::Struct, ::Thrift::Struct_Union
-  EVENTTYPE = 1
-  DBNAME = 2
-  SUCCESSFUL = 3
+  SUCCESSFUL = 1
+  DATA = 2
+  DBNAME = 3
   TABLENAME = 4
   PARTITIONVALS = 5
 
   FIELDS = {
-    EVENTTYPE => {:type => ::Thrift::Types::I32, :name => 'eventType', :enum_class => ::EventRequestType},
-    DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
     SUCCESSFUL => {:type => ::Thrift::Types::BOOL, :name => 'successful'},
+    DATA => {:type => ::Thrift::Types::STRUCT, :name => 'data', :class => ::FireEventRequestData},
+    DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName', :optional => true},
     TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName', :optional => true},
     PARTITIONVALS => {:type => ::Thrift::Types::LIST, :name => 'partitionVals', :element => {:type => ::Thrift::Types::STRING}, :optional => true}
   }
@@ -2096,12 +2136,23 @@ class FireEventRequest
   def struct_fields; FIELDS; end
 
   def validate
-    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field eventType is unset!') unless @eventType
-    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field dbName is unset!') unless @dbName
     raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field successful is unset!') if @successful.nil?
-    unless @eventType.nil? || ::EventRequestType::VALID_VALUES.include?(@eventType)
-      raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Invalid value of field eventType!')
-    end
+    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field data is unset!') unless @data
+  end
+
+  ::Thrift::Struct.generate_accessors self
+end
+
+class FireEventResponse
+  include ::Thrift::Struct, ::Thrift::Struct_Union
+
+  FIELDS = {
+
+  }
+
+  def struct_fields; FIELDS; end
+
+  def validate
   end
 
   ::Thrift::Struct.generate_accessors self

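For reference, a minimal Java sketch of building the new request types (the generated Java API mirrors this generated Ruby); it uses the same calls as the fireInsertEvent() helper added to Hive.java below, and the HDFS path is a made-up example:

    import org.apache.hadoop.hive.metastore.api.FireEventRequest;
    import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
    import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;

    public class BuildFireEventRequestSketch {
      public static FireEventRequest buildInsertRequest() {
        InsertEventRequestData insertData = new InsertEventRequestData();
        insertData.addToFilesAdded("hdfs://nn:8020/warehouse/t/000000_0"); // hypothetical file
        FireEventRequestData data = new FireEventRequestData();
        data.setInsertData(insertData);  // selects the union's only field (id 1)
        FireEventRequest rqst = new FireEventRequest(true, data); // required: successful, data
        rqst.setDbName("default");   // optional (field id 3)
        rqst.setTableName("t");      // optional (field id 4)
        return rqst;
      }
    }
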
Modified: hive/trunk/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb?rev=1657387&r1=1657386&r2=1657387&view=diff
==============================================================================
--- hive/trunk/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb (original)
+++ hive/trunk/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb Wed Feb  4 19:24:12 2015
@@ -2007,18 +2007,19 @@ module ThriftHiveMetastore
      raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_current_notificationEventId failed: unknown result')
     end
 
-    def fire_notification_event(rqst)
-      send_fire_notification_event(rqst)
-      recv_fire_notification_event()
+    def fire_listener_event(rqst)
+      send_fire_listener_event(rqst)
+      return recv_fire_listener_event()
     end
 
-    def send_fire_notification_event(rqst)
-      send_message('fire_notification_event', Fire_notification_event_args, :rqst => rqst)
+    def send_fire_listener_event(rqst)
+      send_message('fire_listener_event', Fire_listener_event_args, :rqst => rqst)
     end
 
-    def recv_fire_notification_event()
-      result = receive_message(Fire_notification_event_result)
-      return
+    def recv_fire_listener_event()
+      result = receive_message(Fire_listener_event_result)
+      return result.success unless result.success.nil?
+      raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'fire_listener_event failed: unknown result')
     end
 
   end
@@ -3551,11 +3552,11 @@ module ThriftHiveMetastore
       write_result(result, oprot, 'get_current_notificationEventId', seqid)
     end
 
-    def process_fire_notification_event(seqid, iprot, oprot)
-      args = read_args(iprot, Fire_notification_event_args)
-      result = Fire_notification_event_result.new()
-      @handler.fire_notification_event(args.rqst)
-      write_result(result, oprot, 'fire_notification_event', seqid)
+    def process_fire_listener_event(seqid, iprot, oprot)
+      args = read_args(iprot, Fire_listener_event_args)
+      result = Fire_listener_event_result.new()
+      result.success = @handler.fire_listener_event(args.rqst)
+      write_result(result, oprot, 'fire_listener_event', seqid)
     end
 
   end
@@ -8109,7 +8110,7 @@ module ThriftHiveMetastore
     ::Thrift::Struct.generate_accessors self
   end
 
-  class Fire_notification_event_args
+  class Fire_listener_event_args
     include ::Thrift::Struct, ::Thrift::Struct_Union
     RQST = 1
 
@@ -8125,11 +8126,12 @@ module ThriftHiveMetastore
     ::Thrift::Struct.generate_accessors self
   end
 
-  class Fire_notification_event_result
+  class Fire_listener_event_result
     include ::Thrift::Struct, ::Thrift::Struct_Union
+    SUCCESS = 0
 
     FIELDS = {
-
+      SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::FireEventResponse}
     }
 
     def struct_fields; FIELDS; end

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1657387&r1=1657386&r2=1657387&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Wed Feb  4 19:24:12 2015
@@ -85,6 +85,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventResponse;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
@@ -5559,19 +5560,20 @@ public class HiveMetaStore extends Thrif
     }
 
     @Override
-    public void fire_notification_event(FireEventRequest rqst) throws TException {
-      switch (rqst.getEventType()) {
-        case INSERT:
+    public FireEventResponse fire_listener_event(FireEventRequest rqst) throws TException {
+      switch (rqst.getData().getSetField()) {
+        case INSERT_DATA:
           InsertEvent event = new InsertEvent(rqst.getDbName(), rqst.getTableName(),
-              rqst.getPartitionVals(), rqst.isSuccessful(), this);
+              rqst.getPartitionVals(), rqst.getData().getInsertData().getFilesAdded(),
+              rqst.isSuccessful(), this);
           for (MetaStoreEventListener listener : listeners) {
             listener.onInsert(event);
           }
-          break;
+          return new FireEventResponse();
 
         default:
-          throw new TException("Event type " + rqst.getEventType().toString() + " not currently " +
-              "supported.");
+          throw new TException("Event type " + rqst.getData().getSetField().toString() +
+              " not currently supported.");
       }
 
     }

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java?rev=1657387&r1=1657386&r2=1657387&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java Wed Feb  4 19:24:12 2015
@@ -71,6 +71,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventResponse;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest;
@@ -1874,8 +1875,8 @@ public class HiveMetaStoreClient impleme
   }
 
   @Override
-  public void fireNotificationEvent(FireEventRequest rqst) throws TException {
-    client.fire_notification_event(rqst);
+  public FireEventResponse fireListenerEvent(FireEventRequest rqst) throws TException {
+    return client.fire_listener_event(rqst);
   }
 
   /**

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java?rev=1657387&r1=1657386&r2=1657387&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java Wed Feb  4 19:24:12 2015
@@ -18,31 +18,47 @@
 
 package org.apache.hadoop.hive.metastore;
 
+
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+import org.apache.thrift.TException;
+
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hive.common.ObjectPair;
-import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience.Public;
 import org.apache.hadoop.hive.common.classification.InterfaceStability.Evolving;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
-import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
-import org.apache.hadoop.hive.metastore.api.FireEventRequest;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest;
 import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse;
 import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest;
 import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
 import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
 import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
 import org.apache.hadoop.hive.metastore.api.Index;
@@ -50,15 +66,8 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
-import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PartitionEventType;
 import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
@@ -66,16 +75,10 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
 import org.apache.hadoop.hive.metastore.api.Role;
 import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
-import org.apache.hadoop.hive.metastore.api.TxnOpenException;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
-import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
-import org.apache.thrift.TException;
 
 /**
  * Wrapper around hive metastore thrift api
@@ -1350,9 +1353,10 @@ public interface IMetaStoreClient {
   * Request that the metastore fire an event.  Currently this is only supported for DML
    * operations, since the metastore knows when DDL operations happen.
    * @param request
+   * @return response, type depends on type of request
    * @throws TException
    */
-  void fireNotificationEvent(FireEventRequest request) throws TException;
+  FireEventResponse fireListenerEvent(FireEventRequest request) throws TException;
 
   class IncompatibleMetastoreException extends MetaException {
     IncompatibleMetastoreException(String message) {

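A short sketch of a caller on the renamed interface method; unlike the old void fireNotificationEvent(), it hands back a response (msClient is assumed to be an already-connected client):

    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.FireEventRequest;
    import org.apache.hadoop.hive.metastore.api.FireEventResponse;
    import org.apache.thrift.TException;

    public class FireListenerEventSketch {
      public static FireEventResponse fire(IMetaStoreClient msClient, FireEventRequest rqst)
          throws TException {
        // FireEventResponse is empty today; returning it leaves room for event
        // types that need to return data to the caller.
        return msClient.fireListenerEvent(rqst);
      }
    }
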
Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java?rev=1657387&r1=1657386&r2=1657387&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java Wed Feb  4 19:24:12 2015
@@ -34,6 +34,7 @@ public class InsertEvent extends Listene
   private final String db;
   private final String table;
   private final List<String> partVals;
+  private final List<String> files;
 
   /**
    *
@@ -43,12 +44,13 @@ public class InsertEvent extends Listene
    * @param status status of insert, true = success, false = failure
    * @param handler handler that is firing the event
    */
-  public InsertEvent(String db, String table, List<String> partitions, boolean status,
-                     HMSHandler handler) {
+  public InsertEvent(String db, String table, List<String> partitions, List<String> files,
+                     boolean status, HMSHandler handler) {
     super(status, handler);
     this.db = db;
     this.table = table;
     this.partVals = partitions;
+    this.files = files;
   }
 
   public String getDb() {
@@ -67,4 +69,12 @@ public class InsertEvent extends Listene
   public List<String> getPartitions() {
     return partVals;
   }
+
+  /**
+   * Get list of files created as a result of this DML operation
+   * @return list of new files
+   */
+  public List<String> getFiles() {
+    return files;
+  }
 }

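For context, a sketch of a listener that consumes the new files list. HiveMetaStore.java above dispatches the event via listener.onInsert(event); the constructor and throws clause here follow the usual MetaStoreEventListener pattern and are assumptions, since that class is not part of this hunk:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.events.InsertEvent;

    public class LoggingInsertListener extends MetaStoreEventListener {
      public LoggingInsertListener(Configuration config) {
        super(config);  // assumed constructor, matching other listener implementations
      }

      @Override
      public void onInsert(InsertEvent event) throws MetaException {
        // getFiles() is the accessor added in this commit.
        System.out.println("Insert into " + event.getDb() + "." + event.getTable()
            + " added files " + event.getFiles());
      }
    }
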
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1657387&r1=1657386&r2=1657387&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Wed Feb  4 19:24:12 2015
@@ -73,7 +73,10 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.EventRequestType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest;
@@ -82,6 +85,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
 import org.apache.hadoop.hive.metastore.api.HiveObjectType;
 import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -1307,6 +1311,7 @@ public class Hive {
    *          location/inputformat/outputformat/serde details from table spec
    * @param isSrcLocal
    *          If the source directory is LOCAL
+   * @param isAcid true if this is an ACID operation
    */
   public Partition loadPartition(Path loadPath, Table tbl,
       Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
@@ -1354,16 +1359,19 @@ public class Hive {
         newPartPath = oldPartPath;
       }
 
+      List<Path> newFiles = null;
       if (replace) {
         Hive.replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
             isSrcLocal);
       } else {
+        newFiles = new ArrayList<Path>();
         FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
-        Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid);
+        Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles);
       }
 
       boolean forceCreate = (!holdDDLTime) ? true : false;
-      newTPart = getPartition(tbl, partSpec, forceCreate, newPartPath.toString(), inheritTableSpecs);
+      newTPart = getPartition(tbl, partSpec, forceCreate, newPartPath.toString(),
+          inheritTableSpecs, newFiles);
       // recreate the partition if it existed before
       if (!holdDDLTime) {
         if (isSkewedStoreAsSubdir) {
@@ -1376,7 +1384,8 @@ public class Hive {
           skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
           newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
           alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newCreatedTpart));
-          newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs);
+          newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs,
+              newFiles);
           return new Partition(tbl, newCreatedTpart);
         }
       }
@@ -1486,6 +1495,8 @@ private void constructOneLBLocationMap(F
    * @param replace
    * @param numDP number of dynamic partitions
    * @param holdDDLTime
+   * @param listBucketingEnabled
+   * @param isAcid true if this is an ACID operation
    * @return partition map details (PartitionSpec and Partition)
    * @throws HiveException
    */
@@ -1577,11 +1588,12 @@ private void constructOneLBLocationMap(F
   public void loadTable(Path loadPath, String tableName, boolean replace,
       boolean holdDDLTime, boolean isSrcLocal, boolean isSkewedStoreAsSubdir, boolean isAcid)
       throws HiveException {
+    List<Path> newFiles = new ArrayList<Path>();
     Table tbl = getTable(tableName);
     if (replace) {
       tbl.replaceFiles(loadPath, isSrcLocal);
     } else {
-      tbl.copyFiles(loadPath, isSrcLocal, isAcid);
+      tbl.copyFiles(loadPath, isSrcLocal, isAcid, newFiles);
       tbl.getParameters().put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, "true");
     }
 
@@ -1606,6 +1618,7 @@ private void constructOneLBLocationMap(F
         throw new HiveException(e);
       }
     }
+    fireInsertEvent(tbl, null, newFiles);
   }
 
   /**
@@ -1693,7 +1706,7 @@ private void constructOneLBLocationMap(F
 
   public Partition getPartition(Table tbl, Map<String, String> partSpec,
       boolean forceCreate) throws HiveException {
-    return getPartition(tbl, partSpec, forceCreate, null, true);
+    return getPartition(tbl, partSpec, forceCreate, null, true, null);
   }
 
   /**
@@ -1711,8 +1724,32 @@ private void constructOneLBLocationMap(F
    * @return result partition object or null if there is no partition
    * @throws HiveException
    */
+  public Partition getPartition(Table tbl, Map<String, String> partSpec, boolean forceCreate,
+                                String partPath, boolean inheritTableSpecs)
+      throws HiveException {
+    return getPartition(tbl, partSpec, forceCreate, partPath, inheritTableSpecs, null);
+  }
+
+  /**
+   * Returns partition metadata
+   *
+   * @param tbl
+   *          the partition's table
+   * @param partSpec
+   *          partition keys and values
+   * @param forceCreate
+   *          if this is true and partition doesn't exist then a partition is
+   *          created
+   * @param partPath the path where the partition data is located
+   * @param inheritTableSpecs whether to copy over the table specs for if/of/serde
+   * @param newFiles An optional list of new files that were moved into this partition.  If
+   *                 non-null these will be included in the DML event sent to the metastore.
+   * @return result partition object or null if there is no partition
+   * @throws HiveException
+   */
   public Partition getPartition(Table tbl, Map<String, String> partSpec,
-      boolean forceCreate, String partPath, boolean inheritTableSpecs) throws HiveException {
+      boolean forceCreate, String partPath, boolean inheritTableSpecs, List<Path> newFiles)
+      throws HiveException {
     tbl.validatePartColumnNames(partSpec, true);
     List<String> pvals = new ArrayList<String>();
     for (FieldSchema field : tbl.getPartCols()) {
@@ -1772,6 +1809,7 @@ private void constructOneLBLocationMap(F
         }
         else {
           alterPartitionSpec(tbl, partSpec, tpart, inheritTableSpecs, partPath);
+          fireInsertEvent(tbl, partSpec, newFiles);
         }
       }
       if (tpart == null) {
@@ -1813,6 +1851,36 @@ private void constructOneLBLocationMap(F
     alterPartition(fullName, new Partition(tbl, tpart));
   }
 
+  private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, List<Path> newFiles)
+      throws HiveException {
+    if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
+      LOG.debug("Firing dml insert event");
+      FireEventRequestData data = new FireEventRequestData();
+      InsertEventRequestData insertData = new InsertEventRequestData();
+      data.setInsertData(insertData);
+      if (newFiles != null && newFiles.size() > 0) {
+        for (Path p : newFiles) {
+          insertData.addToFilesAdded(p.toString());
+        }
+      }
+      FireEventRequest rqst = new FireEventRequest(true, data);
+      rqst.setDbName(tbl.getDbName());
+      rqst.setTableName(tbl.getTableName());
+      if (partitionSpec != null && partitionSpec.size() > 0) {
+        List<String> partVals = new ArrayList<String>(partitionSpec.size());
+        for (FieldSchema fs : tbl.getPartitionKeys()) {
+          partVals.add(partitionSpec.get(fs.getName()));
+        }
+        rqst.setPartitionVals(partVals);
+      }
+      try {
+        getMSC().fireListenerEvent(rqst);
+      } catch (TException e) {
+        throw new HiveException(e);
+      }
+    }
+  }
+
   public boolean dropPartition(String tblName, List<String> part_vals, boolean deleteData)
       throws HiveException {
     String[] names = Utilities.getDbTableName(tblName);
@@ -2502,10 +2570,12 @@ private void constructOneLBLocationMap(F
    * @param fs Filesystem
    * @param isSrcLocal true if source is on local file system
    * @param isAcid true if this is an ACID based write
+   * @param newFiles if this is non-null, a list of files that were created as a result of this
+   *                 move will be returned.
    * @throws HiveException
    */
   static protected void copyFiles(HiveConf conf, Path srcf, Path destf,
-      FileSystem fs, boolean isSrcLocal, boolean isAcid) throws HiveException {
+      FileSystem fs, boolean isSrcLocal, boolean isAcid, List<Path> newFiles) throws HiveException {
     boolean inheritPerms = HiveConf.getBoolVar(conf,
         HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     try {
@@ -2537,7 +2607,7 @@ private void constructOneLBLocationMap(F
     // If we're moving files around for an ACID write then the rules and paths are all different.
     // You can blame this on Owen.
     if (isAcid) {
-      moveAcidFiles(srcFs, srcs, destf);
+      moveAcidFiles(srcFs, srcs, destf, newFiles);
     } else {
     // check that source and target paths exist
       List<List<Path[]>> result = checkPaths(conf, fs, srcs, srcFs, destf, false);
@@ -2549,6 +2619,7 @@ private void constructOneLBLocationMap(F
               throw new IOException("Cannot move " + sdpair[0] + " to "
                   + sdpair[1]);
             }
+            if (newFiles != null) newFiles.add(sdpair[1]);
           }
         }
       } catch (IOException e) {
@@ -2557,8 +2628,8 @@ private void constructOneLBLocationMap(F
     }
   }
 
-  private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst)
-      throws HiveException {
+  private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst,
+                                    List<Path> newFiles) throws HiveException {
     // The layout for ACID files is table|partname/base|delta/bucket
     // We will always only be writing delta files.  In the buckets created by FileSinkOperator
     // it will look like bucket/delta/bucket.  So we need to move that into the above structure.
@@ -2622,6 +2693,7 @@ private void constructOneLBLocationMap(F
               LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
                   bucketDest.toUri().toString());
               fs.rename(bucketSrc, bucketDest);
+              if (newFiles != null) newFiles.add(bucketDest);
             }
           } catch (IOException e) {
             throw new HiveException("Error moving acid files " + e.getMessage(), e);

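Note that fireInsertEvent() above is gated on ConfVars.FIRE_EVENTS_FOR_DML, so no event fires unless that flag is turned on. A minimal sketch (the flag's string property name is not shown in this hunk):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class EnableDmlEventsSketch {
      public static HiveConf withDmlEvents() {
        HiveConf conf = new HiveConf();
        // fireInsertEvent() checks this boolean before building the request.
        conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
        return conf;
      }
    }
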
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1657387&r1=1657386&r2=1657387&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Wed Feb  4 19:24:12 2015
@@ -650,12 +650,15 @@ public class Table implements Serializab
    *          If the source directory is LOCAL
    * @param isAcid
    *          True if this is an ACID based insert, update, or delete
+   * @param newFiles optional list of paths.  If non-null, then all files copied to the table
+   *                 will be added to this list.
    */
-  protected void copyFiles(Path srcf, boolean isSrcLocal, boolean isAcid) throws HiveException {
+  protected void copyFiles(Path srcf, boolean isSrcLocal, boolean isAcid, List<Path> newFiles)
+      throws HiveException {
     FileSystem fs;
     try {
       fs = getDataLocation().getFileSystem(Hive.get().getConf());
-      Hive.copyFiles(Hive.get().getConf(), srcf, getPath(), fs, isSrcLocal, isAcid);
+      Hive.copyFiles(Hive.get().getConf(), srcf, getPath(), fs, isSrcLocal, isAcid, newFiles);
     } catch (IOException e) {
       throw new HiveException("addFiles: filesystem error in check phase", e);
     }


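The newFiles argument threaded through Table.copyFiles() and Hive.copyFiles() is an out-parameter: the caller supplies a list and the move routines append each destination path, exactly as loadTable() does above. A sketch of that caller-side pattern (placed in the same package because copyFiles() is protected; the class is illustrative only):

    package org.apache.hadoop.hive.ql.metadata;

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.fs.Path;

    class CopyFilesCallerSketch {
      static List<Path> copyAndCollect(Table tbl, Path loadPath, boolean isSrcLocal,
                                       boolean isAcid) throws HiveException {
        List<Path> newFiles = new ArrayList<Path>();
        tbl.copyFiles(loadPath, isSrcLocal, isAcid, newFiles); // appends each moved file
        return newFiles; // these paths feed InsertEventRequestData.filesAdded
      }
    }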