Repository: hive
Updated Branches:
  refs/heads/master 12f5550ca -> bbd99ed60


http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 7927a46..2f1c3cf 100644
--- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -11144,15 +11144,18 @@ class InsertEventRequestData:
   """
   Attributes:
    - filesAdded
+   - filesAddedChecksum
   """
 
   thrift_spec = (
     None, # 0
     (1, TType.LIST, 'filesAdded', (TType.STRING,None), None, ), # 1
+    (2, TType.LIST, 'filesAddedChecksum', (TType.STRING,None), None, ), # 2
   )
 
-  def __init__(self, filesAdded=None,):
+  def __init__(self, filesAdded=None, filesAddedChecksum=None,):
     self.filesAdded = filesAdded
+    self.filesAddedChecksum = filesAddedChecksum
 
   def read(self, iprot):
    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -11173,6 +11176,16 @@ class InsertEventRequestData:
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.LIST:
+          self.filesAddedChecksum = []
+          (_etype501, _size498) = iprot.readListBegin()
+          for _i502 in xrange(_size498):
+            _elem503 = iprot.readString()
+            self.filesAddedChecksum.append(_elem503)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -11186,8 +11199,15 @@ class InsertEventRequestData:
     if self.filesAdded is not None:
       oprot.writeFieldBegin('filesAdded', TType.LIST, 1)
       oprot.writeListBegin(TType.STRING, len(self.filesAdded))
-      for iter498 in self.filesAdded:
-        oprot.writeString(iter498)
+      for iter504 in self.filesAdded:
+        oprot.writeString(iter504)
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
+    if self.filesAddedChecksum is not None:
+      oprot.writeFieldBegin('filesAddedChecksum', TType.LIST, 2)
+      oprot.writeListBegin(TType.STRING, len(self.filesAddedChecksum))
+      for iter505 in self.filesAddedChecksum:
+        oprot.writeString(iter505)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -11202,6 +11222,7 @@ class InsertEventRequestData:
   def __hash__(self):
     value = 17
     value = (value * 31) ^ hash(self.filesAdded)
+    value = (value * 31) ^ hash(self.filesAddedChecksum)
     return value
 
   def __repr__(self):
@@ -11340,10 +11361,10 @@ class FireEventRequest:
       elif fid == 5:
         if ftype == TType.LIST:
           self.partitionVals = []
-          (_etype502, _size499) = iprot.readListBegin()
-          for _i503 in xrange(_size499):
-            _elem504 = iprot.readString()
-            self.partitionVals.append(_elem504)
+          (_etype509, _size506) = iprot.readListBegin()
+          for _i510 in xrange(_size506):
+            _elem511 = iprot.readString()
+            self.partitionVals.append(_elem511)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -11376,8 +11397,8 @@ class FireEventRequest:
     if self.partitionVals is not None:
       oprot.writeFieldBegin('partitionVals', TType.LIST, 5)
       oprot.writeListBegin(TType.STRING, len(self.partitionVals))
-      for iter505 in self.partitionVals:
-        oprot.writeString(iter505)
+      for iter512 in self.partitionVals:
+        oprot.writeString(iter512)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -11564,12 +11585,12 @@ class GetFileMetadataByExprResult:
       if fid == 1:
         if ftype == TType.MAP:
           self.metadata = {}
-          (_ktype507, _vtype508, _size506 ) = iprot.readMapBegin()
-          for _i510 in xrange(_size506):
-            _key511 = iprot.readI64()
-            _val512 = MetadataPpdResult()
-            _val512.read(iprot)
-            self.metadata[_key511] = _val512
+          (_ktype514, _vtype515, _size513 ) = iprot.readMapBegin()
+          for _i517 in xrange(_size513):
+            _key518 = iprot.readI64()
+            _val519 = MetadataPpdResult()
+            _val519.read(iprot)
+            self.metadata[_key518] = _val519
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -11591,9 +11612,9 @@ class GetFileMetadataByExprResult:
     if self.metadata is not None:
       oprot.writeFieldBegin('metadata', TType.MAP, 1)
       oprot.writeMapBegin(TType.I64, TType.STRUCT, len(self.metadata))
-      for kiter513,viter514 in self.metadata.items():
-        oprot.writeI64(kiter513)
-        viter514.write(oprot)
+      for kiter520,viter521 in self.metadata.items():
+        oprot.writeI64(kiter520)
+        viter521.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.isSupported is not None:
@@ -11663,10 +11684,10 @@ class GetFileMetadataByExprRequest:
       if fid == 1:
         if ftype == TType.LIST:
           self.fileIds = []
-          (_etype518, _size515) = iprot.readListBegin()
-          for _i519 in xrange(_size515):
-            _elem520 = iprot.readI64()
-            self.fileIds.append(_elem520)
+          (_etype525, _size522) = iprot.readListBegin()
+          for _i526 in xrange(_size522):
+            _elem527 = iprot.readI64()
+            self.fileIds.append(_elem527)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -11698,8 +11719,8 @@ class GetFileMetadataByExprRequest:
     if self.fileIds is not None:
       oprot.writeFieldBegin('fileIds', TType.LIST, 1)
       oprot.writeListBegin(TType.I64, len(self.fileIds))
-      for iter521 in self.fileIds:
-        oprot.writeI64(iter521)
+      for iter528 in self.fileIds:
+        oprot.writeI64(iter528)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.expr is not None:
@@ -11773,11 +11794,11 @@ class GetFileMetadataResult:
       if fid == 1:
         if ftype == TType.MAP:
           self.metadata = {}
-          (_ktype523, _vtype524, _size522 ) = iprot.readMapBegin()
-          for _i526 in xrange(_size522):
-            _key527 = iprot.readI64()
-            _val528 = iprot.readString()
-            self.metadata[_key527] = _val528
+          (_ktype530, _vtype531, _size529 ) = iprot.readMapBegin()
+          for _i533 in xrange(_size529):
+            _key534 = iprot.readI64()
+            _val535 = iprot.readString()
+            self.metadata[_key534] = _val535
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -11799,9 +11820,9 @@ class GetFileMetadataResult:
     if self.metadata is not None:
       oprot.writeFieldBegin('metadata', TType.MAP, 1)
       oprot.writeMapBegin(TType.I64, TType.STRING, len(self.metadata))
-      for kiter529,viter530 in self.metadata.items():
-        oprot.writeI64(kiter529)
-        oprot.writeString(viter530)
+      for kiter536,viter537 in self.metadata.items():
+        oprot.writeI64(kiter536)
+        oprot.writeString(viter537)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.isSupported is not None:
@@ -11862,10 +11883,10 @@ class GetFileMetadataRequest:
       if fid == 1:
         if ftype == TType.LIST:
           self.fileIds = []
-          (_etype534, _size531) = iprot.readListBegin()
-          for _i535 in xrange(_size531):
-            _elem536 = iprot.readI64()
-            self.fileIds.append(_elem536)
+          (_etype541, _size538) = iprot.readListBegin()
+          for _i542 in xrange(_size538):
+            _elem543 = iprot.readI64()
+            self.fileIds.append(_elem543)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -11882,8 +11903,8 @@ class GetFileMetadataRequest:
     if self.fileIds is not None:
       oprot.writeFieldBegin('fileIds', TType.LIST, 1)
       oprot.writeListBegin(TType.I64, len(self.fileIds))
-      for iter537 in self.fileIds:
-        oprot.writeI64(iter537)
+      for iter544 in self.fileIds:
+        oprot.writeI64(iter544)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -11989,20 +12010,20 @@ class PutFileMetadataRequest:
       if fid == 1:
         if ftype == TType.LIST:
           self.fileIds = []
-          (_etype541, _size538) = iprot.readListBegin()
-          for _i542 in xrange(_size538):
-            _elem543 = iprot.readI64()
-            self.fileIds.append(_elem543)
+          (_etype548, _size545) = iprot.readListBegin()
+          for _i549 in xrange(_size545):
+            _elem550 = iprot.readI64()
+            self.fileIds.append(_elem550)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.LIST:
           self.metadata = []
-          (_etype547, _size544) = iprot.readListBegin()
-          for _i548 in xrange(_size544):
-            _elem549 = iprot.readString()
-            self.metadata.append(_elem549)
+          (_etype554, _size551) = iprot.readListBegin()
+          for _i555 in xrange(_size551):
+            _elem556 = iprot.readString()
+            self.metadata.append(_elem556)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -12024,15 +12045,15 @@ class PutFileMetadataRequest:
     if self.fileIds is not None:
       oprot.writeFieldBegin('fileIds', TType.LIST, 1)
       oprot.writeListBegin(TType.I64, len(self.fileIds))
-      for iter550 in self.fileIds:
-        oprot.writeI64(iter550)
+      for iter557 in self.fileIds:
+        oprot.writeI64(iter557)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.metadata is not None:
       oprot.writeFieldBegin('metadata', TType.LIST, 2)
       oprot.writeListBegin(TType.STRING, len(self.metadata))
-      for iter551 in self.metadata:
-        oprot.writeString(iter551)
+      for iter558 in self.metadata:
+        oprot.writeString(iter558)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.type is not None:
@@ -12140,10 +12161,10 @@ class ClearFileMetadataRequest:
       if fid == 1:
         if ftype == TType.LIST:
           self.fileIds = []
-          (_etype555, _size552) = iprot.readListBegin()
-          for _i556 in xrange(_size552):
-            _elem557 = iprot.readI64()
-            self.fileIds.append(_elem557)
+          (_etype562, _size559) = iprot.readListBegin()
+          for _i563 in xrange(_size559):
+            _elem564 = iprot.readI64()
+            self.fileIds.append(_elem564)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -12160,8 +12181,8 @@ class ClearFileMetadataRequest:
     if self.fileIds is not None:
       oprot.writeFieldBegin('fileIds', TType.LIST, 1)
       oprot.writeListBegin(TType.I64, len(self.fileIds))
-      for iter558 in self.fileIds:
-        oprot.writeI64(iter558)
+      for iter565 in self.fileIds:
+        oprot.writeI64(iter565)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -12390,11 +12411,11 @@ class GetAllFunctionsResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.functions = []
-          (_etype562, _size559) = iprot.readListBegin()
-          for _i563 in xrange(_size559):
-            _elem564 = Function()
-            _elem564.read(iprot)
-            self.functions.append(_elem564)
+          (_etype569, _size566) = iprot.readListBegin()
+          for _i570 in xrange(_size566):
+            _elem571 = Function()
+            _elem571.read(iprot)
+            self.functions.append(_elem571)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -12411,8 +12432,8 @@ class GetAllFunctionsResponse:
     if self.functions is not None:
       oprot.writeFieldBegin('functions', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.functions))
-      for iter565 in self.functions:
-        iter565.write(oprot)
+      for iter572 in self.functions:
+        iter572.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -12464,10 +12485,10 @@ class ClientCapabilities:
       if fid == 1:
         if ftype == TType.LIST:
           self.values = []
-          (_etype569, _size566) = iprot.readListBegin()
-          for _i570 in xrange(_size566):
-            _elem571 = iprot.readI32()
-            self.values.append(_elem571)
+          (_etype576, _size573) = iprot.readListBegin()
+          for _i577 in xrange(_size573):
+            _elem578 = iprot.readI32()
+            self.values.append(_elem578)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -12484,8 +12505,8 @@ class ClientCapabilities:
     if self.values is not None:
       oprot.writeFieldBegin('values', TType.LIST, 1)
       oprot.writeListBegin(TType.I32, len(self.values))
-      for iter572 in self.values:
-        oprot.writeI32(iter572)
+      for iter579 in self.values:
+        oprot.writeI32(iter579)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -12714,10 +12735,10 @@ class GetTablesRequest:
       elif fid == 2:
         if ftype == TType.LIST:
           self.tblNames = []
-          (_etype576, _size573) = iprot.readListBegin()
-          for _i577 in xrange(_size573):
-            _elem578 = iprot.readString()
-            self.tblNames.append(_elem578)
+          (_etype583, _size580) = iprot.readListBegin()
+          for _i584 in xrange(_size580):
+            _elem585 = iprot.readString()
+            self.tblNames.append(_elem585)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -12744,8 +12765,8 @@ class GetTablesRequest:
     if self.tblNames is not None:
       oprot.writeFieldBegin('tblNames', TType.LIST, 2)
       oprot.writeListBegin(TType.STRING, len(self.tblNames))
-      for iter579 in self.tblNames:
-        oprot.writeString(iter579)
+      for iter586 in self.tblNames:
+        oprot.writeString(iter586)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.capabilities is not None:
@@ -12805,11 +12826,11 @@ class GetTablesResult:
       if fid == 1:
         if ftype == TType.LIST:
           self.tables = []
-          (_etype583, _size580) = iprot.readListBegin()
-          for _i584 in xrange(_size580):
-            _elem585 = Table()
-            _elem585.read(iprot)
-            self.tables.append(_elem585)
+          (_etype590, _size587) = iprot.readListBegin()
+          for _i591 in xrange(_size587):
+            _elem592 = Table()
+            _elem592.read(iprot)
+            self.tables.append(_elem592)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -12826,8 +12847,8 @@ class GetTablesResult:
     if self.tables is not None:
       oprot.writeFieldBegin('tables', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.tables))
-      for iter586 in self.tables:
-        iter586.write(oprot)
+      for iter593 in self.tables:
+        iter593.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
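
The generated Python binding above (and the Ruby binding that follows) picks up
a new optional list field, filesAddedChecksum, on InsertEventRequestData,
parallel to filesAdded. A minimal Java sketch of populating the new field on
the client side; the file path and checksum bytes are hypothetical
placeholders:

  import java.nio.ByteBuffer;
  import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;

  public class InsertEventDataSketch {
    public static InsertEventRequestData build() {
      InsertEventRequestData insertData = new InsertEventRequestData();
      // One checksum entry per added file, kept in the same order as filesAdded
      insertData.addToFilesAdded("/warehouse/t/part-00000");
      insertData.addToFilesAddedChecksum(ByteBuffer.wrap(new byte[] { 0x1f, 0x2e }));
      return insertData;
    }
  }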

http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index c82edd6..ebed504 100644
--- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -2502,9 +2502,11 @@ end
 class InsertEventRequestData
   include ::Thrift::Struct, ::Thrift::Struct_Union
   FILESADDED = 1
+  FILESADDEDCHECKSUM = 2
 
   FIELDS = {
-    FILESADDED => {:type => ::Thrift::Types::LIST, :name => 'filesAdded', :element => {:type => ::Thrift::Types::STRING}}
+    FILESADDED => {:type => ::Thrift::Types::LIST, :name => 'filesAdded', :element => {:type => ::Thrift::Types::STRING}},
+    FILESADDEDCHECKSUM => {:type => ::Thrift::Types::LIST, :name => 'filesAddedChecksum', :element => {:type => ::Thrift::Types::STRING, :binary => true}, :optional => true}
   }
 
   def struct_fields; FIELDS; end

http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index d0a66b0..2892da3 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -6485,23 +6485,23 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     @Override
    public FireEventResponse fire_listener_event(FireEventRequest rqst) throws TException {
       switch (rqst.getData().getSetField()) {
-        case INSERT_DATA:
-          InsertEvent event = new InsertEvent(rqst.getDbName(), rqst.getTableName(),
-              rqst.getPartitionVals(), rqst.getData().getInsertData().getFilesAdded(),
-              rqst.isSuccessful(), this);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onInsert(event);
-          }
+      case INSERT_DATA:
+        InsertEvent event =
+            new InsertEvent(rqst.getDbName(), rqst.getTableName(), rqst.getPartitionVals(), rqst
+                .getData().getInsertData(), rqst.isSuccessful(), this);
+        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
+          transactionalListener.onInsert(event);
+        }

-          for (MetaStoreEventListener listener : listeners) {
-            listener.onInsert(event);
-          }
+        for (MetaStoreEventListener listener : listeners) {
+          listener.onInsert(event);
+        }

-          return new FireEventResponse();
+        return new FireEventResponse();

-        default:
-          throw new TException("Event type " + rqst.getData().getSetField().toString() +
-              " not currently supported.");
+      default:
+        throw new TException("Event type " + rqst.getData().getSetField().toString()
+            + " not currently supported.");
       }
 
     }
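
With this change fire_listener_event hands the full InsertEventRequestData to
InsertEvent, so listeners see checksums as well as file names. A sketch of the
client side of this RPC, assuming an already-connected IMetaStoreClient and the
insertData built in the earlier sketch; db and table names are hypothetical:

  import org.apache.hadoop.hive.metastore.IMetaStoreClient;
  import org.apache.hadoop.hive.metastore.api.FireEventRequest;
  import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
  import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;

  public class FireInsertEventSketch {
    public static void fire(IMetaStoreClient msClient, InsertEventRequestData insertData)
        throws Exception {
      FireEventRequestData data = new FireEventRequestData();
      data.setInsertData(insertData);
      FireEventRequest rqst = new FireEventRequest(true, data); // true = insert succeeded
      rqst.setDbName("default");   // hypothetical database and table
      rqst.setTableName("t");
      msClient.fireListenerEvent(rqst); // served by fire_listener_event above
    }
  }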

http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
index 102754e..d9a42a7 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
@@ -18,18 +18,16 @@
 
 package org.apache.hadoop.hive.metastore.events;
 
-import org.apache.hadoop.hive.metastore.api.ClientCapabilities;
-
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-
 import org.apache.hadoop.hive.metastore.api.GetTableRequest;
-
 import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,24 +38,26 @@ public class InsertEvent extends ListenerEvent {
   // we have just the string names, but that's fine for what we need.
   private final String db;
   private final String table;
-  private final Map<String,String> keyValues;
+  private final Map<String, String> keyValues;
   private final List<String> files;
+  private List<ByteBuffer> fileChecksums = new ArrayList<ByteBuffer>();
 
   /**
    *
    * @param db name of the database the table is in
    * @param table name of the table being inserted into
    * @param partVals list of partition values, can be null
+   * @param insertData the inserted files & their checksums
    * @param status status of insert, true = success, false = failure
    * @param handler handler that is firing the event
    */
-  public InsertEvent(String db, String table, List<String> partVals, List<String> files,
-                     boolean status, HMSHandler handler) throws MetaException, NoSuchObjectException {
+  public InsertEvent(String db, String table, List<String> partVals,
+      InsertEventRequestData insertData, boolean status, HMSHandler handler) throws MetaException,
+      NoSuchObjectException {
     super(status, handler);
     this.db = db;
     this.table = table;
-    this.files = files;
-    // TODO: why does this use the handler directly?
+    this.files = insertData.getFilesAdded();
     GetTableRequest req = new GetTableRequest(db, table);
     req.setCapabilities(HiveMetaStoreClient.TEST_VERSION);
     Table t = handler.get_table_req(req).getTable();
@@ -67,11 +67,15 @@ public class InsertEvent extends ListenerEvent {
         keyValues.put(t.getPartitionKeys().get(i).getName(), partVals.get(i));
       }
     }
+    if (insertData.isSetFilesAddedChecksum()) {
+      fileChecksums = insertData.getFilesAddedChecksum();
+    }
   }
 
   public String getDb() {
     return db;
   }
+
   /**
    * @return The table.
    */
@@ -82,15 +86,25 @@ public class InsertEvent extends ListenerEvent {
   /**
    * @return List of values for the partition keys.
    */
-  public Map<String,String> getPartitionKeyValues() {
+  public Map<String, String> getPartitionKeyValues() {
     return keyValues;
   }
 
   /**
    * Get list of files created as a result of this DML operation
+   *
    * @return list of new files
    */
   public List<String> getFiles() {
     return files;
   }
+
+  /**
+   * Get a list of file checksums corresponding to the files created (if available)
+   *
+   * @return list of file checksums, empty if none were recorded
+   */
+  public List<ByteBuffer> getFileChecksums() {
+    return fileChecksums;
+  }
 }
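
Downstream consumers read the new data through getFiles() and
getFileChecksums(). A sketch of a listener using both accessors; the class
name is hypothetical:

  import java.nio.ByteBuffer;
  import java.util.List;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
  import org.apache.hadoop.hive.metastore.api.MetaException;
  import org.apache.hadoop.hive.metastore.events.InsertEvent;

  public class ChecksumLoggingListener extends MetaStoreEventListener {
    public ChecksumLoggingListener(Configuration conf) {
      super(conf);
    }

    @Override
    public void onInsert(InsertEvent event) throws MetaException {
      List<String> files = event.getFiles();
      List<ByteBuffer> checksums = event.getFileChecksums(); // empty list if none sent
      for (int i = 0; i < files.size(); i++) {
        int bytes = i < checksums.size() ? checksums.get(i).remaining() : 0;
        System.out.println(files.get(i) + ": " + bytes + " checksum bytes");
      }
    }
  }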

http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index adf2fd8..fdb8e80 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -229,13 +230,28 @@ public abstract class MessageFactory {
 
   /**
    * Factory method for building insert message
+   *
    * @param db Name of the database the insert occurred in
    * @param table Name of the table the insert occurred in
-   * @param partVals Partition values for the partition that the insert occurred in, may be null
-   *                 if the insert was done into a non-partitioned table
+   * @param partVals Partition values for the partition that the insert occurred in, may be null if
+   *          the insert was done into a non-partitioned table
    * @param files List of files created as a result of the insert, may be null.
    * @return instance of InsertMessage
    */
   public abstract InsertMessage buildInsertMessage(String db, String table,
-                                                   Map<String,String> partVals, List<String> files);
+      Map<String, String> partVals, List<String> files);
+
+  /**
+   * Factory method for building insert message
+   *
+   * @param db Name of the database the insert occurred in
+   * @param table Name of the table the insert occurred in
+   * @param partVals Partition values for the partition that the insert occurred in, may be null if
+   *          the insert was done into a non-partitioned table
+   * @param files List of files created as a result of the insert, may be null
+   * @param fileChecksums List of checksums corresponding to the files added during insert
+   * @return instance of InsertMessage
+   */
+  public abstract InsertMessage buildInsertMessage(String db, String table,
+      Map<String, String> partVals, List<String> files, List<ByteBuffer> fileChecksums);
 }
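
The new overload lets callers thread file checksums into the notification
message. A sketch of invoking it through the configured factory; the database,
table, and collections are assumed to be built elsewhere:

  import java.nio.ByteBuffer;
  import java.util.List;
  import java.util.Map;
  import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
  import org.apache.hadoop.hive.metastore.messaging.MessageFactory;

  public class BuildInsertMessageSketch {
    public static InsertMessage build(Map<String, String> partVals, List<String> files,
        List<ByteBuffer> fileChecksums) {
      // getInstance() returns the configured factory, typically the JSON one below
      return MessageFactory.getInstance()
          .buildInsertMessage("default", "t", partVals, files, fileChecksums);
    }
  }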

http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
index ef89b17..bd9f9ec 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
@@ -22,11 +22,13 @@ package org.apache.hadoop.hive.metastore.messaging.json;
 import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.codehaus.jackson.annotate.JsonProperty;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 /**
- * JSON implementation of DropTableMessage.
+ * JSON implementation of InsertMessage
  */
 public class JSONInsertMessage extends InsertMessage {
 
@@ -40,15 +42,19 @@ public class JSONInsertMessage extends InsertMessage {
   List<String> files;
 
   @JsonProperty
-  Map<String,String> partKeyVals;
+  Map<String, String> partKeyVals;
+
+  @JsonProperty
+  List<byte[]> fileChecksums;
 
   /**
    * Default constructor, needed for Jackson.
    */
-  public JSONInsertMessage() {}
+  public JSONInsertMessage() {
+  }
 
   public JSONInsertMessage(String server, String servicePrincipal, String db, String table,
-                           Map<String,String> partKeyVals, List<String> files, Long timestamp) {
+      Map<String, String> partKeyVals, List<String> files, Long timestamp) {
     this.server = server;
     this.servicePrincipal = servicePrincipal;
     this.db = db;
@@ -59,15 +65,30 @@ public class JSONInsertMessage extends InsertMessage {
     checkValid();
   }
 
+  public JSONInsertMessage(String server, String servicePrincipal, String db, String table,
+      Map<String, String> partKeyVals, List<String> files, List<ByteBuffer> checksums,
+      Long timestamp) {
+    this(server, servicePrincipal, db, table, partKeyVals, files, timestamp);
+    fileChecksums = new ArrayList<byte[]>();
+    for (ByteBuffer checksum : checksums) {
+      byte[] checksumBytes = new byte[checksum.remaining()];
+      checksum.get(checksumBytes);
+      fileChecksums.add(checksumBytes);
+    }
+  }
 
   @Override
-  public String getTable() { return table; }
+  public String getTable() {
+    return table;
+  }
 
   @Override
-  public String getServer() { return server; }
+  public String getServer() {
+    return server;
+  }
 
   @Override
-  public Map<String,String> getPartitionKeyValues() {
+  public Map<String, String> getPartitionKeyValues() {
     return partKeyVals;
   }
 
@@ -77,20 +98,29 @@ public class JSONInsertMessage extends InsertMessage {
   }
 
   @Override
-  public String getServicePrincipal() { return servicePrincipal; }
+  public String getServicePrincipal() {
+    return servicePrincipal;
+  }
 
   @Override
-  public String getDB() { return db; }
+  public String getDB() {
+    return db;
+  }
 
   @Override
-  public Long getTimestamp() { return timestamp; }
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  public List<byte[]> getFileChecksums() {
+    return fileChecksums;
+  }
 
   @Override
   public String toString() {
     try {
       return JSONMessageDeserializer.mapper.writeValueAsString(this);
-    }
-    catch (Exception exception) {
+    } catch (Exception exception) {
       throw new IllegalArgumentException("Could not serialize: ", exception);
     }
   }
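
The checksum ByteBuffers are copied into plain byte arrays because Jackson can
serialize byte[] natively but has no natural mapping for ByteBuffer. A usage
sketch of the new constructor; the server URI, principal, and inputs are
hypothetical:

  import java.nio.ByteBuffer;
  import java.util.Collections;
  import java.util.LinkedHashMap;
  import java.util.List;

  public class InsertMessageJsonSketch {
    public static String example() {
      List<String> files = Collections.singletonList("/warehouse/t/part-00000");
      List<ByteBuffer> checksums =
          Collections.singletonList(ByteBuffer.wrap(new byte[] { 0x1f, 0x2e }));
      JSONInsertMessage msg =
          new JSONInsertMessage("thrift://metastore:9083", "hive/_HOST", "default", "t",
              new LinkedHashMap<String, String>(), files, checksums,
              System.currentTimeMillis() / 1000);
      return msg.toString(); // checksums appear as JSON arrays of byte values
    }
  }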

http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index 0407210..c57f577 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -19,7 +19,9 @@
 
 package org.apache.hadoop.hive.metastore.messaging.json;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -57,6 +59,7 @@ import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
 import org.codehaus.jackson.node.ObjectNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -168,6 +171,13 @@ public class JSONMessageFactory extends MessageFactory {
         files, now());
   }
 
+  @Override
+  public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals,
+      List<String> files, List<ByteBuffer> fileChecksums) {
+    return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals,
+        files, fileChecksums, now());
+  }
+
   private long now() {
     return System.currentTimeMillis() / 1000;
   }
@@ -255,4 +265,30 @@ public class JSONMessageFactory extends MessageFactory {
     deSerializer.deserialize(indexObj, tableJson, "UTF-8");
     return indexObj;
   }
+
+  /**
+   * Convert a json ArrayNode to an ordered list of Strings
+   *
+   * @param arrayNode the json array node
+   * @param elements the list to populate
+   * @return the populated elements list
+   */
+  public static List<String> getAsList(ArrayNode arrayNode, List<String> elements) {
+    Iterator<JsonNode> arrayNodeIterator = arrayNode.iterator();
+    while (arrayNodeIterator.hasNext()) {
+      JsonNode node = arrayNodeIterator.next();
+      elements.add(node.asText());
+    }
+    return elements;
+  }
+
+  public static LinkedHashMap<String, String> getAsMap(ObjectNode objectNode,
+      LinkedHashMap<String, String> hashMap) {
+    Iterator<Map.Entry<String, JsonNode>> objectNodeIterator = objectNode.getFields();
+    while (objectNodeIterator.hasNext()) {
+      Map.Entry<String, JsonNode> objectAsMap = objectNodeIterator.next();
+      hashMap.put(objectAsMap.getKey(), objectAsMap.getValue().asText());
+    }
+    return hashMap;
+  }
 }
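
The new helpers walk Jackson (codehaus) tree nodes. A sketch of using
getAsList() to recover the "files" array from a serialized insert message; the
field name comes from JSONInsertMessage's @JsonProperty, and the json string is
assumed to hold one such message:

  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;
  import org.codehaus.jackson.JsonNode;
  import org.codehaus.jackson.map.ObjectMapper;
  import org.codehaus.jackson.node.ArrayNode;

  public class MessageFieldsSketch {
    public static List<String> filesOf(String json) throws IOException {
      JsonNode root = new ObjectMapper().readTree(json);
      return JSONMessageFactory.getAsList((ArrayNode) root.get("files"),
          new ArrayList<String>());
    }
  }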

http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index f62d5f3..be5a6a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -26,7 +26,8 @@ import static org.apache.hadoop.hive.serde.serdeConstants.LINE_DELIM;
 import static org.apache.hadoop.hive.serde.serdeConstants.MAPKEY_DELIM;
 import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
 import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
-
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintStream;
@@ -57,6 +58,7 @@ import javax.jdo.JDODataStoreException;
 import org.apache.calcite.plan.RelOptMaterialization;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -2336,29 +2338,39 @@ private void constructOneLBLocationMap(FileStatus fSta,
         LOG.debug("Not firing dml insert event as " + tbl.getTableName() + " is temporary");
         return;
       }
-      FireEventRequestData data = new FireEventRequestData();
-      InsertEventRequestData insertData = new InsertEventRequestData();
-      data.setInsertData(insertData);
-      if (newFiles != null && newFiles.size() > 0) {
-        for (Path p : newFiles) {
-          insertData.addToFilesAdded(p.toString());
+      try {
+        FileSystem fileSystem = tbl.getDataLocation().getFileSystem(conf);
+        FireEventRequestData data = new FireEventRequestData();
+        InsertEventRequestData insertData = new InsertEventRequestData();
+        data.setInsertData(insertData);
+        if (newFiles != null && newFiles.size() > 0) {
+          for (Path p : newFiles) {
+            insertData.addToFilesAdded(p.toString());
+            FileChecksum cksum = fileSystem.getFileChecksum(p);
+            // File checksum is not implemented for local filesystem (RawLocalFileSystem)
+            if (cksum != null) {
+              ByteArrayOutputStream baos = new ByteArrayOutputStream();
+              cksum.write(new DataOutputStream(baos));
+              insertData.addToFilesAddedChecksum(ByteBuffer.wrap(baos.toByteArray()));
+            } else {
+              insertData.addToFilesAddedChecksum(ByteBuffer.allocate(0));
+            }
+          }
+        } else {
+          insertData.setFilesAdded(new ArrayList<String>());
         }
-      } else {
-        insertData.setFilesAdded(new ArrayList<String>());
-      }
-      FireEventRequest rqst = new FireEventRequest(true, data);
-      rqst.setDbName(tbl.getDbName());
-      rqst.setTableName(tbl.getTableName());
-      if (partitionSpec != null && partitionSpec.size() > 0) {
-        List<String> partVals = new ArrayList<String>(partitionSpec.size());
-        for (FieldSchema fs : tbl.getPartitionKeys()) {
-          partVals.add(partitionSpec.get(fs.getName()));
+        FireEventRequest rqst = new FireEventRequest(true, data);
+        rqst.setDbName(tbl.getDbName());
+        rqst.setTableName(tbl.getTableName());
+        if (partitionSpec != null && partitionSpec.size() > 0) {
+          List<String> partVals = new ArrayList<String>(partitionSpec.size());
+          for (FieldSchema fs : tbl.getPartitionKeys()) {
+            partVals.add(partitionSpec.get(fs.getName()));
+          }
+          rqst.setPartitionVals(partVals);
         }
-        rqst.setPartitionVals(partVals);
-      }
-      try {
         getMSC().fireListenerEvent(rqst);
-      } catch (TException e) {
+      } catch (IOException | TException e) {
         throw new HiveException(e);
       }
     }
@@ -3019,7 +3031,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
        *
        * I'll leave the below loop for now until a better approach is found.
        */
-    
+
     int counter = 1;
     if (!isRenameAllowed || isBlobStoragePath) {
       while (destFs.exists(destFilePath)) {
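
The writer side above serializes each FileChecksum with its Writable
write(DataOutput) method and ships the raw bytes (or an empty buffer when the
filesystem reports no checksum). A sketch of a matching read side, assuming the
source filesystem was HDFS and therefore produced an MD5MD5CRC32FileChecksum;
other filesystems use other concrete types, such as the PFileChecksum added
below:

  import java.io.ByteArrayInputStream;
  import java.io.DataInputStream;
  import java.io.IOException;
  import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;

  public class ChecksumDecodeSketch {
    public static MD5MD5CRC32FileChecksum fromBytes(byte[] bytes) throws IOException {
      MD5MD5CRC32FileChecksum cksum = new MD5MD5CRC32FileChecksum();
      cksum.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
      return cksum;
    }
  }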

http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java b/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
index bd97521..197b965 100644
--- a/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
+++ b/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java
@@ -18,18 +18,21 @@
 
 package org.apache.hadoop.fs;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.net.URI;
-
+import java.security.MessageDigest;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.MD5Hash;
 
 /****************************************************************
  * A Proxy for LocalFileSystem
  *
- * As an example, it serves uri's corresponding to:
- * 'pfile:///' namespace with using a LocalFileSystem
+ * As an example, it serves uri's corresponding to: 'pfile:///' namespace with using a
+ * LocalFileSystem
  *****************************************************************/
 
 public class ProxyLocalFileSystem extends FilterFileSystem {
@@ -46,7 +49,7 @@ public class ProxyLocalFileSystem extends FilterFileSystem {
   }
 
   public ProxyLocalFileSystem(FileSystem fs) {
-    throw new RuntimeException ("Unsupported Constructor");
+    throw new RuntimeException("Unsupported Constructor");
   }
 
   @Override
@@ -59,17 +62,16 @@ public class ProxyLocalFileSystem extends FilterFileSystem {
     if (Shell.WINDOWS) {
       // Replace the encoded backward slash with forward slash
       // Remove the windows drive letter
-      nameUriString = nameUriString.replaceAll("%5C", "/")
-          .replaceFirst("/[c-zC-Z]:", "/")
-          .replaceFirst("^[c-zC-Z]:", "");
+      nameUriString =
+          nameUriString.replaceAll("%5C", "/").replaceFirst("/[c-zC-Z]:", "/")
+              .replaceFirst("^[c-zC-Z]:", "");
       name = URI.create(nameUriString);
     }
 
     String authority = name.getAuthority() != null ? name.getAuthority() : "";
     String proxyUriString = scheme + "://" + authority + "/";
 
-    fs = ShimLoader.getHadoopShims().createProxyFileSystem(
-        localFs, URI.create(proxyUriString));
+    fs = ShimLoader.getHadoopShims().createProxyFileSystem(localFs, URI.create(proxyUriString));
 
     fs.initialize(name, conf);
   }
@@ -78,4 +80,88 @@ public class ProxyLocalFileSystem extends FilterFileSystem {
   public String getScheme() {
     return scheme;
   }
+
+  @Override
+  // For pfile, calculate the checksum for use in testing
+  public FileChecksum getFileChecksum(Path f) throws IOException {
+    if (scheme.equalsIgnoreCase("pfile") && fs.isFile(f)) {
+      return getPFileChecksum(f);
+    }
+    return fs.getFileChecksum(f);
+  }
+
+  private FileChecksum getPFileChecksum(Path f) throws IOException {
+    MessageDigest md5Digest;
+    try {
+      md5Digest = MessageDigest.getInstance("MD5");
+      MD5Hash md5Hash = new MD5Hash(getMD5Checksum(fs.open(f)));
+      return new PFileChecksum(md5Hash, md5Digest.getAlgorithm());
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Calculate MD5 checksum from data in FSDataInputStream
+   * @param fsInputStream stream over the file contents to checksum
+   * @return byte array with md5 checksum bytes
+   * @throws Exception
+   */
+  static byte[] getMD5Checksum(FSDataInputStream fsInputStream) throws Exception {
+    byte[] buffer = new byte[1024];
+    MessageDigest md5Digest = MessageDigest.getInstance("MD5");
+    int numRead = 0;
+    while (numRead != -1) {
+      numRead = fsInputStream.read(buffer);
+      if (numRead > 0) {
+        md5Digest.update(buffer, 0, numRead);
+      }
+    }
+    fsInputStream.close();
+    return md5Digest.digest();
+  }
+
+  /**
+   * Checksum implementation for PFile, used in testing
+   */
+  public static class PFileChecksum extends FileChecksum {
+    private MD5Hash md5;
+    private String algorithmName;
+
+    public PFileChecksum(MD5Hash md5, String algorithmName) {
+      this.md5 = md5;
+      this.algorithmName = algorithmName;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      md5.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      md5.readFields(in);
+    }
+
+    @Override
+    public String getAlgorithmName() {
+      return algorithmName;
+    }
+
+    @Override
+    public int getLength() {
+      if (md5 != null) {
+        return md5.getDigest().length;
+      }
+      return 0;
+    }
+
+    @Override
+    public byte[] getBytes() {
+      if (md5 != null) {
+        return md5.getDigest();
+      }
+      return new byte[0];
+    }
+  }
 }
