Repository: hive Updated Branches: refs/heads/master 12f5550ca -> bbd99ed60
http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 7927a46..2f1c3cf 100644 --- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -11144,15 +11144,18 @@ class InsertEventRequestData: """ Attributes: - filesAdded + - filesAddedChecksum """ thrift_spec = ( None, # 0 (1, TType.LIST, 'filesAdded', (TType.STRING,None), None, ), # 1 + (2, TType.LIST, 'filesAddedChecksum', (TType.STRING,None), None, ), # 2 ) - def __init__(self, filesAdded=None,): + def __init__(self, filesAdded=None, filesAddedChecksum=None,): self.filesAdded = filesAdded + self.filesAddedChecksum = filesAddedChecksum def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -11173,6 +11176,16 @@ class InsertEventRequestData: iprot.readListEnd() else: iprot.skip(ftype) + elif fid == 2: + if ftype == TType.LIST: + self.filesAddedChecksum = [] + (_etype501, _size498) = iprot.readListBegin() + for _i502 in xrange(_size498): + _elem503 = iprot.readString() + self.filesAddedChecksum.append(_elem503) + iprot.readListEnd() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -11186,8 +11199,15 @@ class InsertEventRequestData: if self.filesAdded is not None: oprot.writeFieldBegin('filesAdded', TType.LIST, 1) oprot.writeListBegin(TType.STRING, len(self.filesAdded)) - for iter498 in self.filesAdded: - oprot.writeString(iter498) + for iter504 in self.filesAdded: + oprot.writeString(iter504) + oprot.writeListEnd() + oprot.writeFieldEnd() + if self.filesAddedChecksum is not None: + oprot.writeFieldBegin('filesAddedChecksum', TType.LIST, 2) + oprot.writeListBegin(TType.STRING, len(self.filesAddedChecksum)) + for iter505 in self.filesAddedChecksum: + oprot.writeString(iter505) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -11202,6 +11222,7 @@ class InsertEventRequestData: def __hash__(self): value = 17 value = (value * 31) ^ hash(self.filesAdded) + value = (value * 31) ^ hash(self.filesAddedChecksum) return value def __repr__(self): @@ -11340,10 +11361,10 @@ class FireEventRequest: elif fid == 5: if ftype == TType.LIST: self.partitionVals = [] - (_etype502, _size499) = iprot.readListBegin() - for _i503 in xrange(_size499): - _elem504 = iprot.readString() - self.partitionVals.append(_elem504) + (_etype509, _size506) = iprot.readListBegin() + for _i510 in xrange(_size506): + _elem511 = iprot.readString() + self.partitionVals.append(_elem511) iprot.readListEnd() else: iprot.skip(ftype) @@ -11376,8 +11397,8 @@ class FireEventRequest: if self.partitionVals is not None: oprot.writeFieldBegin('partitionVals', TType.LIST, 5) oprot.writeListBegin(TType.STRING, len(self.partitionVals)) - for iter505 in self.partitionVals: - oprot.writeString(iter505) + for iter512 in self.partitionVals: + oprot.writeString(iter512) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -11564,12 +11585,12 @@ class GetFileMetadataByExprResult: if fid == 1: if ftype == TType.MAP: self.metadata = {} - (_ktype507, _vtype508, _size506 ) = iprot.readMapBegin() - for _i510 in xrange(_size506): - _key511 = iprot.readI64() - _val512 = MetadataPpdResult() - _val512.read(iprot) - self.metadata[_key511] = _val512 + (_ktype514, _vtype515, _size513 ) = iprot.readMapBegin() + for _i517 in xrange(_size513): + _key518 = iprot.readI64() + _val519 = MetadataPpdResult() + _val519.read(iprot) + self.metadata[_key518] = _val519 iprot.readMapEnd() else: iprot.skip(ftype) @@ -11591,9 +11612,9 @@ class GetFileMetadataByExprResult: if self.metadata is not None: oprot.writeFieldBegin('metadata', TType.MAP, 1) oprot.writeMapBegin(TType.I64, TType.STRUCT, len(self.metadata)) - for kiter513,viter514 in self.metadata.items(): - oprot.writeI64(kiter513) - viter514.write(oprot) + for kiter520,viter521 in self.metadata.items(): + oprot.writeI64(kiter520) + viter521.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() if self.isSupported is not None: @@ -11663,10 +11684,10 @@ class GetFileMetadataByExprRequest: if fid == 1: if ftype == TType.LIST: self.fileIds = [] - (_etype518, _size515) = iprot.readListBegin() - for _i519 in xrange(_size515): - _elem520 = iprot.readI64() - self.fileIds.append(_elem520) + (_etype525, _size522) = iprot.readListBegin() + for _i526 in xrange(_size522): + _elem527 = iprot.readI64() + self.fileIds.append(_elem527) iprot.readListEnd() else: iprot.skip(ftype) @@ -11698,8 +11719,8 @@ class GetFileMetadataByExprRequest: if self.fileIds is not None: oprot.writeFieldBegin('fileIds', TType.LIST, 1) oprot.writeListBegin(TType.I64, len(self.fileIds)) - for iter521 in self.fileIds: - oprot.writeI64(iter521) + for iter528 in self.fileIds: + oprot.writeI64(iter528) oprot.writeListEnd() oprot.writeFieldEnd() if self.expr is not None: @@ -11773,11 +11794,11 @@ class GetFileMetadataResult: if fid == 1: if ftype == TType.MAP: self.metadata = {} - (_ktype523, _vtype524, _size522 ) = iprot.readMapBegin() - for _i526 in xrange(_size522): - _key527 = iprot.readI64() - _val528 = iprot.readString() - self.metadata[_key527] = _val528 + (_ktype530, _vtype531, _size529 ) = iprot.readMapBegin() + for _i533 in xrange(_size529): + _key534 = iprot.readI64() + _val535 = iprot.readString() + self.metadata[_key534] = _val535 iprot.readMapEnd() else: iprot.skip(ftype) @@ -11799,9 +11820,9 @@ class GetFileMetadataResult: if self.metadata is not None: oprot.writeFieldBegin('metadata', TType.MAP, 1) oprot.writeMapBegin(TType.I64, TType.STRING, len(self.metadata)) - for kiter529,viter530 in self.metadata.items(): - oprot.writeI64(kiter529) - oprot.writeString(viter530) + for kiter536,viter537 in self.metadata.items(): + oprot.writeI64(kiter536) + oprot.writeString(viter537) oprot.writeMapEnd() oprot.writeFieldEnd() if self.isSupported is not None: @@ -11862,10 +11883,10 @@ class GetFileMetadataRequest: if fid == 1: if ftype == TType.LIST: self.fileIds = [] - (_etype534, _size531) = iprot.readListBegin() - for _i535 in xrange(_size531): - _elem536 = iprot.readI64() - self.fileIds.append(_elem536) + (_etype541, _size538) = iprot.readListBegin() + for _i542 in xrange(_size538): + _elem543 = iprot.readI64() + self.fileIds.append(_elem543) iprot.readListEnd() else: iprot.skip(ftype) @@ -11882,8 +11903,8 @@ class GetFileMetadataRequest: if self.fileIds is not None: oprot.writeFieldBegin('fileIds', TType.LIST, 1) oprot.writeListBegin(TType.I64, len(self.fileIds)) - for iter537 in self.fileIds: - oprot.writeI64(iter537) + for iter544 in self.fileIds: + oprot.writeI64(iter544) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -11989,20 +12010,20 @@ class PutFileMetadataRequest: if fid == 1: if ftype == TType.LIST: self.fileIds = [] - (_etype541, _size538) = iprot.readListBegin() - for _i542 in xrange(_size538): - _elem543 = iprot.readI64() - self.fileIds.append(_elem543) + (_etype548, _size545) = iprot.readListBegin() + for _i549 in xrange(_size545): + _elem550 = iprot.readI64() + self.fileIds.append(_elem550) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 2: if ftype == TType.LIST: self.metadata = [] - (_etype547, _size544) = iprot.readListBegin() - for _i548 in xrange(_size544): - _elem549 = iprot.readString() - self.metadata.append(_elem549) + (_etype554, _size551) = iprot.readListBegin() + for _i555 in xrange(_size551): + _elem556 = iprot.readString() + self.metadata.append(_elem556) iprot.readListEnd() else: iprot.skip(ftype) @@ -12024,15 +12045,15 @@ class PutFileMetadataRequest: if self.fileIds is not None: oprot.writeFieldBegin('fileIds', TType.LIST, 1) oprot.writeListBegin(TType.I64, len(self.fileIds)) - for iter550 in self.fileIds: - oprot.writeI64(iter550) + for iter557 in self.fileIds: + oprot.writeI64(iter557) oprot.writeListEnd() oprot.writeFieldEnd() if self.metadata is not None: oprot.writeFieldBegin('metadata', TType.LIST, 2) oprot.writeListBegin(TType.STRING, len(self.metadata)) - for iter551 in self.metadata: - oprot.writeString(iter551) + for iter558 in self.metadata: + oprot.writeString(iter558) oprot.writeListEnd() oprot.writeFieldEnd() if self.type is not None: @@ -12140,10 +12161,10 @@ class ClearFileMetadataRequest: if fid == 1: if ftype == TType.LIST: self.fileIds = [] - (_etype555, _size552) = iprot.readListBegin() - for _i556 in xrange(_size552): - _elem557 = iprot.readI64() - self.fileIds.append(_elem557) + (_etype562, _size559) = iprot.readListBegin() + for _i563 in xrange(_size559): + _elem564 = iprot.readI64() + self.fileIds.append(_elem564) iprot.readListEnd() else: iprot.skip(ftype) @@ -12160,8 +12181,8 @@ class ClearFileMetadataRequest: if self.fileIds is not None: oprot.writeFieldBegin('fileIds', TType.LIST, 1) oprot.writeListBegin(TType.I64, len(self.fileIds)) - for iter558 in self.fileIds: - oprot.writeI64(iter558) + for iter565 in self.fileIds: + oprot.writeI64(iter565) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -12390,11 +12411,11 @@ class GetAllFunctionsResponse: if fid == 1: if ftype == TType.LIST: self.functions = [] - (_etype562, _size559) = iprot.readListBegin() - for _i563 in xrange(_size559): - _elem564 = Function() - _elem564.read(iprot) - self.functions.append(_elem564) + (_etype569, _size566) = iprot.readListBegin() + for _i570 in xrange(_size566): + _elem571 = Function() + _elem571.read(iprot) + self.functions.append(_elem571) iprot.readListEnd() else: iprot.skip(ftype) @@ -12411,8 +12432,8 @@ class GetAllFunctionsResponse: if self.functions is not None: oprot.writeFieldBegin('functions', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.functions)) - for iter565 in self.functions: - iter565.write(oprot) + for iter572 in self.functions: + iter572.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -12464,10 +12485,10 @@ class ClientCapabilities: if fid == 1: if ftype == TType.LIST: self.values = [] - (_etype569, _size566) = iprot.readListBegin() - for _i570 in xrange(_size566): - _elem571 = iprot.readI32() - self.values.append(_elem571) + (_etype576, _size573) = iprot.readListBegin() + for _i577 in xrange(_size573): + _elem578 = iprot.readI32() + self.values.append(_elem578) iprot.readListEnd() else: iprot.skip(ftype) @@ -12484,8 +12505,8 @@ class ClientCapabilities: if self.values is not None: oprot.writeFieldBegin('values', TType.LIST, 1) oprot.writeListBegin(TType.I32, len(self.values)) - for iter572 in self.values: - oprot.writeI32(iter572) + for iter579 in self.values: + oprot.writeI32(iter579) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -12714,10 +12735,10 @@ class GetTablesRequest: elif fid == 2: if ftype == TType.LIST: self.tblNames = [] - (_etype576, _size573) = iprot.readListBegin() - for _i577 in xrange(_size573): - _elem578 = iprot.readString() - self.tblNames.append(_elem578) + (_etype583, _size580) = iprot.readListBegin() + for _i584 in xrange(_size580): + _elem585 = iprot.readString() + self.tblNames.append(_elem585) iprot.readListEnd() else: iprot.skip(ftype) @@ -12744,8 +12765,8 @@ class GetTablesRequest: if self.tblNames is not None: oprot.writeFieldBegin('tblNames', TType.LIST, 2) oprot.writeListBegin(TType.STRING, len(self.tblNames)) - for iter579 in self.tblNames: - oprot.writeString(iter579) + for iter586 in self.tblNames: + oprot.writeString(iter586) oprot.writeListEnd() oprot.writeFieldEnd() if self.capabilities is not None: @@ -12805,11 +12826,11 @@ class GetTablesResult: if fid == 1: if ftype == TType.LIST: self.tables = [] - (_etype583, _size580) = iprot.readListBegin() - for _i584 in xrange(_size580): - _elem585 = Table() - _elem585.read(iprot) - self.tables.append(_elem585) + (_etype590, _size587) = iprot.readListBegin() + for _i591 in xrange(_size587): + _elem592 = Table() + _elem592.read(iprot) + self.tables.append(_elem592) iprot.readListEnd() else: iprot.skip(ftype) @@ -12826,8 +12847,8 @@ class GetTablesResult: if self.tables is not None: oprot.writeFieldBegin('tables', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.tables)) - for iter586 in self.tables: - iter586.write(oprot) + for iter593 in self.tables: + iter593.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index c82edd6..ebed504 100644 --- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -2502,9 +2502,11 @@ end class InsertEventRequestData include ::Thrift::Struct, ::Thrift::Struct_Union FILESADDED = 1 + FILESADDEDCHECKSUM = 2 FIELDS = { - FILESADDED => {:type => ::Thrift::Types::LIST, :name => 'filesAdded', :element => {:type => ::Thrift::Types::STRING}} + FILESADDED => {:type => ::Thrift::Types::LIST, :name => 'filesAdded', :element => {:type => ::Thrift::Types::STRING}}, + FILESADDEDCHECKSUM => {:type => ::Thrift::Types::LIST, :name => 'filesAddedChecksum', :element => {:type => ::Thrift::Types::STRING, :binary => true}, :optional => true} } def struct_fields; FIELDS; end http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index d0a66b0..2892da3 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -6485,23 +6485,23 @@ public class HiveMetaStore extends ThriftHiveMetastore { @Override public FireEventResponse fire_listener_event(FireEventRequest rqst) throws TException { switch (rqst.getData().getSetField()) { - case INSERT_DATA: - InsertEvent event = new InsertEvent(rqst.getDbName(), rqst.getTableName(), - rqst.getPartitionVals(), rqst.getData().getInsertData().getFilesAdded(), - rqst.isSuccessful(), this); - for (MetaStoreEventListener transactionalListener : transactionalListeners) { - transactionalListener.onInsert(event); - } + case INSERT_DATA: + InsertEvent event = + new InsertEvent(rqst.getDbName(), rqst.getTableName(), rqst.getPartitionVals(), rqst + .getData().getInsertData(), rqst.isSuccessful(), this); + for (MetaStoreEventListener transactionalListener : transactionalListeners) { + transactionalListener.onInsert(event); + } - for (MetaStoreEventListener listener : listeners) { - listener.onInsert(event); - } + for (MetaStoreEventListener listener : listeners) { + listener.onInsert(event); + } - return new FireEventResponse(); + return new FireEventResponse(); - default: - throw new TException("Event type " + rqst.getData().getSetField().toString() + - " not currently supported."); + default: + throw new TException("Event type " + rqst.getData().getSetField().toString() + + " not currently supported."); } } http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java index 102754e..d9a42a7 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java @@ -18,18 +18,16 @@ package org.apache.hadoop.hive.metastore.events; -import org.apache.hadoop.hive.metastore.api.ClientCapabilities; - import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; - import org.apache.hadoop.hive.metastore.api.GetTableRequest; - import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; -import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.InsertEventRequestData; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -40,24 +38,26 @@ public class InsertEvent extends ListenerEvent { // we have just the string names, but that's fine for what we need. private final String db; private final String table; - private final Map<String,String> keyValues; + private final Map<String, String> keyValues; private final List<String> files; + private List<ByteBuffer> fileChecksums = new ArrayList<ByteBuffer>(); /** * * @param db name of the database the table is in * @param table name of the table being inserted into * @param partVals list of partition values, can be null + * @param insertData the inserted files & their checksums * @param status status of insert, true = success, false = failure * @param handler handler that is firing the event */ - public InsertEvent(String db, String table, List<String> partVals, List<String> files, - boolean status, HMSHandler handler) throws MetaException, NoSuchObjectException { + public InsertEvent(String db, String table, List<String> partVals, + InsertEventRequestData insertData, boolean status, HMSHandler handler) throws MetaException, + NoSuchObjectException { super(status, handler); this.db = db; this.table = table; - this.files = files; - // TODO: why does this use the handler directly? + this.files = insertData.getFilesAdded(); GetTableRequest req = new GetTableRequest(db, table); req.setCapabilities(HiveMetaStoreClient.TEST_VERSION); Table t = handler.get_table_req(req).getTable(); @@ -67,11 +67,15 @@ public class InsertEvent extends ListenerEvent { keyValues.put(t.getPartitionKeys().get(i).getName(), partVals.get(i)); } } + if (insertData.isSetFilesAddedChecksum()) { + fileChecksums = insertData.getFilesAddedChecksum(); + } } public String getDb() { return db; } + /** * @return The table. */ @@ -82,15 +86,25 @@ public class InsertEvent extends ListenerEvent { /** * @return List of values for the partition keys. */ - public Map<String,String> getPartitionKeyValues() { + public Map<String, String> getPartitionKeyValues() { return keyValues; } /** * Get list of files created as a result of this DML operation + * * @return list of new files */ public List<String> getFiles() { return files; } + + /** + * Get a list of file checksums corresponding to the files created (if available) + * + * @return + */ + public List<ByteBuffer> getFileChecksums() { + return fileChecksums; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java index adf2fd8..fdb8e80 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.util.ReflectionUtils; +import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -229,13 +230,28 @@ public abstract class MessageFactory { /** * Factory method for building insert message + * * @param db Name of the database the insert occurred in * @param table Name of the table the insert occurred in - * @param partVals Partition values for the partition that the insert occurred in, may be null - * if the insert was done into a non-partitioned table + * @param partVals Partition values for the partition that the insert occurred in, may be null if + * the insert was done into a non-partitioned table * @param files List of files created as a result of the insert, may be null. * @return instance of InsertMessage */ public abstract InsertMessage buildInsertMessage(String db, String table, - Map<String,String> partVals, List<String> files); + Map<String, String> partVals, List<String> files); + + /** + * Factory method for building insert message + * + * @param db Name of the database the insert occurred in + * @param table Name of the table the insert occurred in + * @param partVals Partition values for the partition that the insert occurred in, may be null if + * the insert was done into a non-partitioned table + * @param files List of files created as a result of the insert, may be null + * @param fileChecksums List of checksums corresponding to the files added during insert + * @return instance of InsertMessage + */ + public abstract InsertMessage buildInsertMessage(String db, String table, + Map<String, String> partVals, List<String> files, List<ByteBuffer> fileChecksums); } http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java index ef89b17..bd9f9ec 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java @@ -22,11 +22,13 @@ package org.apache.hadoop.hive.metastore.messaging.json; import org.apache.hadoop.hive.metastore.messaging.InsertMessage; import org.codehaus.jackson.annotate.JsonProperty; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.Map; /** - * JSON implementation of DropTableMessage. + * JSON implementation of InsertMessage */ public class JSONInsertMessage extends InsertMessage { @@ -40,15 +42,19 @@ public class JSONInsertMessage extends InsertMessage { List<String> files; @JsonProperty - Map<String,String> partKeyVals; + Map<String, String> partKeyVals; + + @JsonProperty + List<byte[]> fileChecksums; /** * Default constructor, needed for Jackson. */ - public JSONInsertMessage() {} + public JSONInsertMessage() { + } public JSONInsertMessage(String server, String servicePrincipal, String db, String table, - Map<String,String> partKeyVals, List<String> files, Long timestamp) { + Map<String, String> partKeyVals, List<String> files, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; @@ -59,15 +65,30 @@ public class JSONInsertMessage extends InsertMessage { checkValid(); } + public JSONInsertMessage(String server, String servicePrincipal, String db, String table, + Map<String, String> partKeyVals, List<String> files, List<ByteBuffer> checksums, + Long timestamp) { + this(server, servicePrincipal, db, table, partKeyVals, files, timestamp); + fileChecksums = new ArrayList<byte[]>(); + for (ByteBuffer checksum : checksums) { + byte[] checksumBytes = new byte[checksum.remaining()]; + checksum.get(checksumBytes); + fileChecksums.add(checksumBytes); + } + } @Override - public String getTable() { return table; } + public String getTable() { + return table; + } @Override - public String getServer() { return server; } + public String getServer() { + return server; + } @Override - public Map<String,String> getPartitionKeyValues() { + public Map<String, String> getPartitionKeyValues() { return partKeyVals; } @@ -77,20 +98,29 @@ public class JSONInsertMessage extends InsertMessage { } @Override - public String getServicePrincipal() { return servicePrincipal; } + public String getServicePrincipal() { + return servicePrincipal; + } @Override - public String getDB() { return db; } + public String getDB() { + return db; + } @Override - public Long getTimestamp() { return timestamp; } + public Long getTimestamp() { + return timestamp; + } + + public List<byte[]> getFileChecksums() { + return fileChecksums; + } @Override public String toString() { try { return JSONMessageDeserializer.mapper.writeValueAsString(this); - } - catch (Exception exception) { + } catch (Exception exception) { throw new IllegalArgumentException("Could not serialize: ", exception); } } http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java index 0407210..c57f577 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hive.metastore.messaging.json; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -57,6 +59,7 @@ import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.ArrayNode; import org.codehaus.jackson.node.ObjectNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -168,6 +171,13 @@ public class JSONMessageFactory extends MessageFactory { files, now()); } + @Override + public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals, + List<String> files, List<ByteBuffer> fileChecksums) { + return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals, + files, fileChecksums, now()); + } + private long now() { return System.currentTimeMillis() / 1000; } @@ -255,4 +265,30 @@ public class JSONMessageFactory extends MessageFactory { deSerializer.deserialize(indexObj, tableJson, "UTF-8"); return indexObj; } + + /** + * Convert a json ArrayNode to an ordered list of Strings + * + * @param arrayNode: the json array node + * @param elements: the list to populate + * @return + */ + public static List<String> getAsList(ArrayNode arrayNode, List<String> elements) { + Iterator<JsonNode> arrayNodeIterator = arrayNode.iterator(); + while (arrayNodeIterator.hasNext()) { + JsonNode node = arrayNodeIterator.next(); + elements.add(node.asText()); + } + return elements; + } + + public static LinkedHashMap<String, String> getAsMap(ObjectNode objectNode, + LinkedHashMap<String, String> hashMap) { + Iterator<Map.Entry<String, JsonNode>> objectNodeIterator = objectNode.getFields(); + while (objectNodeIterator.hasNext()) { + Map.Entry<String, JsonNode> objectAsMap = objectNodeIterator.next(); + hashMap.put(objectAsMap.getKey(), objectAsMap.getValue().asText()); + } + return hashMap; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index f62d5f3..be5a6a9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -26,7 +26,8 @@ import static org.apache.hadoop.hive.serde.serdeConstants.LINE_DELIM; import static org.apache.hadoop.hive.serde.serdeConstants.MAPKEY_DELIM; import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT; import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME; - +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintStream; @@ -57,6 +58,7 @@ import javax.jdo.JDODataStoreException; import org.apache.calcite.plan.RelOptMaterialization; import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -2336,29 +2338,39 @@ private void constructOneLBLocationMap(FileStatus fSta, LOG.debug("Not firing dml insert event as " + tbl.getTableName() + " is temporary"); return; } - FireEventRequestData data = new FireEventRequestData(); - InsertEventRequestData insertData = new InsertEventRequestData(); - data.setInsertData(insertData); - if (newFiles != null && newFiles.size() > 0) { - for (Path p : newFiles) { - insertData.addToFilesAdded(p.toString()); + try { + FileSystem fileSystem = tbl.getDataLocation().getFileSystem(conf); + FireEventRequestData data = new FireEventRequestData(); + InsertEventRequestData insertData = new InsertEventRequestData(); + data.setInsertData(insertData); + if (newFiles != null && newFiles.size() > 0) { + for (Path p : newFiles) { + insertData.addToFilesAdded(p.toString()); + FileChecksum cksum = fileSystem.getFileChecksum(p); + // File checksum is not implemented for local filesystem (RawLocalFileSystem) + if (cksum != null) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + cksum.write(new DataOutputStream(baos)); + insertData.addToFilesAddedChecksum(ByteBuffer.wrap(baos.toByteArray())); + } else { + insertData.addToFilesAddedChecksum(ByteBuffer.allocate(0)); + } + } + } else { + insertData.setFilesAdded(new ArrayList<String>()); } - } else { - insertData.setFilesAdded(new ArrayList<String>()); - } - FireEventRequest rqst = new FireEventRequest(true, data); - rqst.setDbName(tbl.getDbName()); - rqst.setTableName(tbl.getTableName()); - if (partitionSpec != null && partitionSpec.size() > 0) { - List<String> partVals = new ArrayList<String>(partitionSpec.size()); - for (FieldSchema fs : tbl.getPartitionKeys()) { - partVals.add(partitionSpec.get(fs.getName())); + FireEventRequest rqst = new FireEventRequest(true, data); + rqst.setDbName(tbl.getDbName()); + rqst.setTableName(tbl.getTableName()); + if (partitionSpec != null && partitionSpec.size() > 0) { + List<String> partVals = new ArrayList<String>(partitionSpec.size()); + for (FieldSchema fs : tbl.getPartitionKeys()) { + partVals.add(partitionSpec.get(fs.getName())); + } + rqst.setPartitionVals(partVals); } - rqst.setPartitionVals(partVals); - } - try { getMSC().fireListenerEvent(rqst); - } catch (TException e) { + } catch (IOException | TException e) { throw new HiveException(e); } } @@ -3019,7 +3031,7 @@ private void constructOneLBLocationMap(FileStatus fSta, * * I'll leave the below loop for now until a better approach is found. */ - + int counter = 1; if (!isRenameAllowed || isBlobStoragePath) { while (destFs.exists(destFilePath)) { http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java ---------------------------------------------------------------------- diff --git a/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java b/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java index bd97521..197b965 100644 --- a/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java +++ b/shims/common/src/main/java/org/apache/hadoop/fs/ProxyLocalFileSystem.java @@ -18,18 +18,21 @@ package org.apache.hadoop.fs; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import java.net.URI; - +import java.security.MessageDigest; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.Shell; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.MD5Hash; /**************************************************************** * A Proxy for LocalFileSystem * - * As an example, it serves uri's corresponding to: - * 'pfile:///' namespace with using a LocalFileSystem + * As an example, it serves uri's corresponding to: 'pfile:///' namespace with using a + * LocalFileSystem *****************************************************************/ public class ProxyLocalFileSystem extends FilterFileSystem { @@ -46,7 +49,7 @@ public class ProxyLocalFileSystem extends FilterFileSystem { } public ProxyLocalFileSystem(FileSystem fs) { - throw new RuntimeException ("Unsupported Constructor"); + throw new RuntimeException("Unsupported Constructor"); } @Override @@ -59,17 +62,16 @@ public class ProxyLocalFileSystem extends FilterFileSystem { if (Shell.WINDOWS) { // Replace the encoded backward slash with forward slash // Remove the windows drive letter - nameUriString = nameUriString.replaceAll("%5C", "/") - .replaceFirst("/[c-zC-Z]:", "/") - .replaceFirst("^[c-zC-Z]:", ""); + nameUriString = + nameUriString.replaceAll("%5C", "/").replaceFirst("/[c-zC-Z]:", "/") + .replaceFirst("^[c-zC-Z]:", ""); name = URI.create(nameUriString); } String authority = name.getAuthority() != null ? name.getAuthority() : ""; String proxyUriString = scheme + "://" + authority + "/"; - fs = ShimLoader.getHadoopShims().createProxyFileSystem( - localFs, URI.create(proxyUriString)); + fs = ShimLoader.getHadoopShims().createProxyFileSystem(localFs, URI.create(proxyUriString)); fs.initialize(name, conf); } @@ -78,4 +80,88 @@ public class ProxyLocalFileSystem extends FilterFileSystem { public String getScheme() { return scheme; } + + @Override + // For pfile, calculate the checksum for use in testing + public FileChecksum getFileChecksum(Path f) throws IOException { + if (scheme.equalsIgnoreCase("pfile") && fs.isFile(f)) { + return getPFileChecksum(f); + } + return fs.getFileChecksum(f); + } + + private FileChecksum getPFileChecksum(Path f) throws IOException { + MessageDigest md5Digest; + try { + md5Digest = MessageDigest.getInstance("MD5"); + MD5Hash md5Hash = new MD5Hash(getMD5Checksum(fs.open(f))); + return new PFileChecksum(md5Hash, md5Digest.getAlgorithm()); + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * Calculate MD5 checksum from data in FSDataInputStream + * @param fsInputStream + * @return byte array with md5 checksum bytes + * @throws Exception + */ + static byte[] getMD5Checksum(FSDataInputStream fsInputStream) throws Exception { + byte[] buffer = new byte[1024]; + MessageDigest md5Digest = MessageDigest.getInstance("MD5"); + int numRead = 0; + while (numRead != -1) { + numRead = fsInputStream.read(buffer); + if (numRead > 0) { + md5Digest.update(buffer, 0, numRead); + } + } + fsInputStream.close(); + return md5Digest.digest(); + } + + /** + * Checksum implementation for PFile uses in testing + */ + public static class PFileChecksum extends FileChecksum { + private MD5Hash md5; + private String algorithmName; + + public PFileChecksum(MD5Hash md5, String algorithmName) { + this.md5 = md5; + this.algorithmName = algorithmName; + } + + @Override + public void write(DataOutput out) throws IOException { + md5.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + md5.readFields(in); + } + + @Override + public String getAlgorithmName() { + return algorithmName; + } + + @Override + public int getLength() { + if (md5 != null) { + return md5.getDigest().length; + } + return 0; + } + + @Override + public byte[] getBytes() { + if (md5 != null) { + return md5.getDigest(); + } + return new byte[0]; + } + } }