Repository: hive Updated Branches: refs/heads/master 01e691c5c -> 2f501a8a0
HIVE-15366: REPL LOAD & DUMP support for incremental INSERT events (Vaibhav Gumashta reviewed by Sushanth Sowmyan) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f501a8a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f501a8a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f501a8a Branch: refs/heads/master Commit: 2f501a8a024bf25701f97f4621ceda9b080be95d Parents: 01e691c Author: Vaibhav Gumashta <vgumas...@hortonworks.com> Authored: Mon Jan 9 13:05:47 2017 -0800 Committer: Vaibhav Gumashta <vgumas...@hortonworks.com> Committed: Mon Jan 9 13:05:47 2017 -0800 ---------------------------------------------------------------------- .../listener/TestDbNotificationListener.java | 27 ++---- .../hive/ql/TestReplicationScenarios.java | 92 ++++++++++++++++++++ metastore/if/hive_metastore.thrift | 4 +- .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 4 +- .../metastore/api/InsertEventRequestData.java | 40 ++++----- .../gen/thrift/gen-rb/hive_metastore_types.rb | 2 +- .../hive/metastore/events/InsertEvent.java | 5 +- .../hive/metastore/messaging/InsertMessage.java | 4 +- .../metastore/messaging/MessageFactory.java | 3 +- .../messaging/json/JSONInsertMessage.java | 27 +++--- .../messaging/json/JSONMessageFactory.java | 2 +- .../hadoop/hive/ql/exec/ReplCopyTask.java | 14 +-- .../apache/hadoop/hive/ql/metadata/Hive.java | 9 +- .../apache/hadoop/hive/ql/parse/EximUtil.java | 30 +++++-- .../hive/ql/parse/ExportSemanticAnalyzer.java | 4 +- .../hive/ql/parse/ImportSemanticAnalyzer.java | 4 +- .../ql/parse/ReplicationSemanticAnalyzer.java | 47 +++++++++- 17 files changed, 230 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java ---------------------------------------------------------------------- diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index 39356ae..4eabb24 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -913,7 +913,7 @@ public class TestDbNotificationListener { assertEquals(defaultDbName, event.getDbName()); assertEquals(tblName, event.getTableName()); // Parse the message field - verifyInsertJSON(event, defaultDbName, tblName, false); + verifyInsertJSON(event, defaultDbName, tblName); } @Test @@ -967,7 +967,7 @@ public class TestDbNotificationListener { assertEquals(defaultDbName, event.getDbName()); assertEquals(tblName, event.getTableName()); // Parse the message field - verifyInsertJSON(event, defaultDbName, tblName, false); + verifyInsertJSON(event, defaultDbName, tblName); ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event); LinkedHashMap<String, String> partKeyValsFromNotif = JSONMessageFactory.getAsMap((ObjectNode) jsonTree.get("partKeyVals"), @@ -1057,7 +1057,7 @@ public class TestDbNotificationListener { assertEquals(firstEventId + 3, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // Parse the message field - verifyInsertJSON(event, defaultDbName, tblName, true); + verifyInsertJSON(event, defaultDbName, tblName); event = rsp.getEvents().get(4); assertEquals(firstEventId + 5, event.getEventId()); @@ -1090,7 +1090,7 @@ public class TestDbNotificationListener { assertEquals(firstEventId + 3, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // Parse the message field - verifyInsertJSON(event, null, sourceTblName, true); + verifyInsertJSON(event, null, sourceTblName); event = rsp.getEvents().get(4); assertEquals(firstEventId + 5, event.getEventId()); @@ -1165,13 +1165,13 @@ public class TestDbNotificationListener { assertEquals(firstEventId + 4, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // Parse the message field - verifyInsertJSON(event, null, tblName, true); + verifyInsertJSON(event, null, tblName); event = rsp.getEvents().get(6); assertEquals(firstEventId + 7, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // Parse the message field - verifyInsertJSON(event, null, tblName, true); + verifyInsertJSON(event, null, tblName); event = rsp.getEvents().get(9); assertEquals(firstEventId + 10, event.getEventId()); @@ -1181,13 +1181,13 @@ public class TestDbNotificationListener { assertEquals(firstEventId + 11, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // Parse the message field - verifyInsertJSON(event, null, tblName, true); + verifyInsertJSON(event, null, tblName); event = rsp.getEvents().get(13); assertEquals(firstEventId + 14, event.getEventId()); assertEquals(EventType.INSERT.toString(), event.getEventType()); // Parse the message field - verifyInsertJSON(event, null, tblName, true); + verifyInsertJSON(event, null, tblName); event = rsp.getEvents().get(16); assertEquals(firstEventId + 17, event.getEventId()); @@ -1223,8 +1223,7 @@ public class TestDbNotificationListener { assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*")); } - private void verifyInsertJSON(NotificationEvent event, String dbName, String tblName, - boolean verifyChecksums) throws Exception { + private void verifyInsertJSON(NotificationEvent event, String dbName, String tblName) throws Exception { // Parse the message field ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event); System.out.println("JSONInsertMessage: " + jsonTree.toString()); @@ -1239,14 +1238,6 @@ public class TestDbNotificationListener { List<String> files = JSONMessageFactory.getAsList((ArrayNode) jsonTree.get("files"), new ArrayList<String>()); assertTrue(files.size() > 0); - if (verifyChecksums) { - // Should have list of file checksums - List<String> fileChecksums = - JSONMessageFactory.getAsList((ArrayNode) jsonTree.get("fileChecksums"), - new ArrayList<String>()); - assertTrue(fileChecksums.size() > 0); - - } } @Test http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java index e29aa22..6b86080 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java @@ -592,6 +592,98 @@ public class TestReplicationScenarios { verifyResults(ptn_data_2); } + @Test + public void testIncrementalInserts() throws IOException { + String testName = "incrementalInserts"; + LOG.info("Testing " + testName); + String dbName = testName + "_" + tid; + + run("CREATE DATABASE " + dbName); + + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE"); + + advanceDumpDir(); + run("REPL DUMP " + dbName); + String replDumpLocn = getResult(0, 0); + String replDumpId = getResult(0, 1, true); + LOG.info("Dumped to {} with id {}", replDumpLocn, replDumpId); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + + String[] unptn_data = new String[] { "eleven", "twelve" }; + String[] ptn_data_1 = new String[] { "thirteen", "fourteen", "fifteen" }; + String[] ptn_data_2 = new String[] { "fifteen", "sixteen", "seventeen" }; + String[] empty = new String[] {}; + + String unptn_locn = new Path(TEST_PATH, testName + "_unptn").toUri().getPath(); + String ptn_locn_1 = new Path(TEST_PATH, testName + "_ptn1").toUri().getPath(); + String ptn_locn_2 = new Path(TEST_PATH, testName + "_ptn2").toUri().getPath(); + + createTestDataFile(unptn_locn, unptn_data); + createTestDataFile(ptn_locn_1, ptn_data_1); + createTestDataFile(ptn_locn_2, ptn_data_2); + + run("SELECT a from " + dbName + ".ptned_empty"); + verifyResults(empty); + run("SELECT * from " + dbName + ".unptned_empty"); + verifyResults(empty); + + run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned"); + run("SELECT * from " + dbName + ".unptned"); + verifyResults(unptn_data); + run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned"); + run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned"); + run("SELECT * from " + dbName + ".unptned_late"); + verifyResults(unptn_data); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + String incrementalDumpLocn = getResult(0, 0); + String incrementalDumpId = getResult(0, 1, true); + LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + printOutput(); + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + run("SELECT * from " + dbName + "_dupe.unptned_late"); + verifyResults(unptn_data); + + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=1)"); + run("SELECT a from " + dbName + ".ptned WHERE b=1"); + verifyResults(ptn_data_1); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=2)"); + run("SELECT a from " + dbName + ".ptned WHERE b=2"); + verifyResults(ptn_data_2); + + run("CREATE TABLE " + dbName + + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE"); + run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName + + ".ptned WHERE b=1"); + run("SELECT a from " + dbName + ".ptned_late WHERE b=1"); + verifyResults(ptn_data_1); + + run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName + + ".ptned WHERE b=2"); + run("SELECT a from " + dbName + ".ptned_late WHERE b=2"); + verifyResults(ptn_data_2); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + incrementalDumpLocn = getResult(0, 0); + incrementalDumpId = getResult(0, 1, true); + LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + printOutput(); + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1"); + verifyResults(ptn_data_1); + run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2"); + verifyResults(ptn_data_2); + } private String getResult(int rowNum, int colNum) throws IOException { return getResult(rowNum,colNum,false); http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/if/hive_metastore.thrift ---------------------------------------------------------------------- diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift index 79592ea..bf80455 100755 --- a/metastore/if/hive_metastore.thrift +++ b/metastore/if/hive_metastore.thrift @@ -812,8 +812,8 @@ struct CurrentNotificationEventId { struct InsertEventRequestData { 1: required list<string> filesAdded, - // Checksum of files (UTF8 encoded string) added during this insert event (at the time they were added) - 2: optional list<binary> filesAddedChecksum, + // Checksum of files (hex string of checksum byte payload) + 2: optional list<string> filesAddedChecksum, } union FireEventRequestData { http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp index 1311b20..d605049 100644 --- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp +++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp @@ -16166,7 +16166,7 @@ uint32_t InsertEventRequestData::read(::apache::thrift::protocol::TProtocol* ipr uint32_t _i660; for (_i660 = 0; _i660 < _size656; ++_i660) { - xfer += iprot->readBinary(this->filesAddedChecksum[_i660]); + xfer += iprot->readString(this->filesAddedChecksum[_i660]); } xfer += iprot->readListEnd(); } @@ -16213,7 +16213,7 @@ uint32_t InsertEventRequestData::write(::apache::thrift::protocol::TProtocol* op std::vector<std::string> ::const_iterator _iter662; for (_iter662 = this->filesAddedChecksum.begin(); _iter662 != this->filesAddedChecksum.end(); ++_iter662) { - xfer += oprot->writeBinary((*_iter662)); + xfer += oprot->writeString((*_iter662)); } xfer += oprot->writeListEnd(); } http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java index 39a607d..fd1dc06 100644 --- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java +++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java @@ -48,7 +48,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve } private List<String> filesAdded; // required - private List<ByteBuffer> filesAddedChecksum; // optional + private List<String> filesAddedChecksum; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -121,7 +121,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))); tmpMap.put(_Fields.FILES_ADDED_CHECKSUM, new org.apache.thrift.meta_data.FieldMetaData("filesAddedChecksum", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true)))); + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(InsertEventRequestData.class, metaDataMap); } @@ -145,7 +145,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve this.filesAdded = __this__filesAdded; } if (other.isSetFilesAddedChecksum()) { - List<ByteBuffer> __this__filesAddedChecksum = new ArrayList<ByteBuffer>(other.filesAddedChecksum); + List<String> __this__filesAddedChecksum = new ArrayList<String>(other.filesAddedChecksum); this.filesAddedChecksum = __this__filesAddedChecksum; } } @@ -202,22 +202,22 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve return (this.filesAddedChecksum == null) ? 0 : this.filesAddedChecksum.size(); } - public java.util.Iterator<ByteBuffer> getFilesAddedChecksumIterator() { + public java.util.Iterator<String> getFilesAddedChecksumIterator() { return (this.filesAddedChecksum == null) ? null : this.filesAddedChecksum.iterator(); } - public void addToFilesAddedChecksum(ByteBuffer elem) { + public void addToFilesAddedChecksum(String elem) { if (this.filesAddedChecksum == null) { - this.filesAddedChecksum = new ArrayList<ByteBuffer>(); + this.filesAddedChecksum = new ArrayList<String>(); } this.filesAddedChecksum.add(elem); } - public List<ByteBuffer> getFilesAddedChecksum() { + public List<String> getFilesAddedChecksum() { return this.filesAddedChecksum; } - public void setFilesAddedChecksum(List<ByteBuffer> filesAddedChecksum) { + public void setFilesAddedChecksum(List<String> filesAddedChecksum) { this.filesAddedChecksum = filesAddedChecksum; } @@ -250,7 +250,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve if (value == null) { unsetFilesAddedChecksum(); } else { - setFilesAddedChecksum((List<ByteBuffer>)value); + setFilesAddedChecksum((List<String>)value); } break; @@ -396,7 +396,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve if (this.filesAddedChecksum == null) { sb.append("null"); } else { - org.apache.thrift.TBaseHelper.toString(this.filesAddedChecksum, sb); + sb.append(this.filesAddedChecksum); } first = false; } @@ -469,11 +469,11 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { { org.apache.thrift.protocol.TList _list561 = iprot.readListBegin(); - struct.filesAddedChecksum = new ArrayList<ByteBuffer>(_list561.size); - ByteBuffer _elem562; + struct.filesAddedChecksum = new ArrayList<String>(_list561.size); + String _elem562; for (int _i563 = 0; _i563 < _list561.size; ++_i563) { - _elem562 = iprot.readBinary(); + _elem562 = iprot.readString(); struct.filesAddedChecksum.add(_elem562); } iprot.readListEnd(); @@ -513,9 +513,9 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve oprot.writeFieldBegin(FILES_ADDED_CHECKSUM_FIELD_DESC); { oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.filesAddedChecksum.size())); - for (ByteBuffer _iter565 : struct.filesAddedChecksum) + for (String _iter565 : struct.filesAddedChecksum) { - oprot.writeBinary(_iter565); + oprot.writeString(_iter565); } oprot.writeListEnd(); } @@ -554,9 +554,9 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve if (struct.isSetFilesAddedChecksum()) { { oprot.writeI32(struct.filesAddedChecksum.size()); - for (ByteBuffer _iter567 : struct.filesAddedChecksum) + for (String _iter567 : struct.filesAddedChecksum) { - oprot.writeBinary(_iter567); + oprot.writeString(_iter567); } } } @@ -580,11 +580,11 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve if (incoming.get(0)) { { org.apache.thrift.protocol.TList _list571 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32()); - struct.filesAddedChecksum = new ArrayList<ByteBuffer>(_list571.size); - ByteBuffer _elem572; + struct.filesAddedChecksum = new ArrayList<String>(_list571.size); + String _elem572; for (int _i573 = 0; _i573 < _list571.size; ++_i573) { - _elem572 = iprot.readBinary(); + _elem572 = iprot.readString(); struct.filesAddedChecksum.add(_elem572); } } http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb ---------------------------------------------------------------------- diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index ebed504..b6050c6 100644 --- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -2506,7 +2506,7 @@ class InsertEventRequestData FIELDS = { FILESADDED => {:type => ::Thrift::Types::LIST, :name => 'filesAdded', :element => {:type => ::Thrift::Types::STRING}}, - FILESADDEDCHECKSUM => {:type => ::Thrift::Types::LIST, :name => 'filesAddedChecksum', :element => {:type => ::Thrift::Types::STRING, :binary => true}, :optional => true} + FILESADDEDCHECKSUM => {:type => ::Thrift::Types::LIST, :name => 'filesAddedChecksum', :element => {:type => ::Thrift::Types::STRING}, :optional => true} } def struct_fields; FIELDS; end http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java index d9a42a7..7bc0e04 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -40,7 +39,7 @@ public class InsertEvent extends ListenerEvent { private final String table; private final Map<String, String> keyValues; private final List<String> files; - private List<ByteBuffer> fileChecksums = new ArrayList<ByteBuffer>(); + private List<String> fileChecksums = new ArrayList<String>(); /** * @@ -104,7 +103,7 @@ public class InsertEvent extends ListenerEvent { * * @return */ - public List<ByteBuffer> getFileChecksums() { + public List<String> getFileChecksums() { return fileChecksums; } } http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java index fe747df..7e6e34e 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java @@ -45,7 +45,9 @@ public abstract class InsertMessage extends EventMessage { public abstract Map<String,String> getPartitionKeyValues(); /** - * Get the list of files created as a result of this DML operation. May be null. + * Get the list of files created as a result of this DML operation. May be null. The file uri may + * be an encoded uri, which represents both a uri and the file checksum + * * @return List of new files, or null. */ public abstract List<String> getFiles(); http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java index fdb8e80..df25f43 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.util.ReflectionUtils; -import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -253,5 +252,5 @@ public abstract class MessageFactory { * @return instance of InsertMessage */ public abstract InsertMessage buildInsertMessage(String db, String table, - Map<String, String> partVals, List<String> files, List<ByteBuffer> fileChecksums); + Map<String, String> partVals, List<String> files, List<String> fileChecksums); } http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java index bd9f9ec..820cc9c 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java @@ -22,8 +22,6 @@ package org.apache.hadoop.hive.metastore.messaging.json; import org.apache.hadoop.hive.metastore.messaging.InsertMessage; import org.codehaus.jackson.annotate.JsonProperty; -import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -44,9 +42,6 @@ public class JSONInsertMessage extends InsertMessage { @JsonProperty Map<String, String> partKeyVals; - @JsonProperty - List<byte[]> fileChecksums; - /** * Default constructor, needed for Jackson. */ @@ -66,17 +61,21 @@ public class JSONInsertMessage extends InsertMessage { } public JSONInsertMessage(String server, String servicePrincipal, String db, String table, - Map<String, String> partKeyVals, List<String> files, List<ByteBuffer> checksums, - Long timestamp) { + Map<String, String> partKeyVals, List<String> files, List<String> checksums, Long timestamp) { this(server, servicePrincipal, db, table, partKeyVals, files, timestamp); - fileChecksums = new ArrayList<byte[]>(); - for (ByteBuffer checksum : checksums) { - byte[] checksumBytes = new byte[checksum.remaining()]; - checksum.get(checksumBytes); - fileChecksums.add(checksumBytes); + for (int i = 0; i < files.size(); i++) { + if ((!checksums.isEmpty()) && (checksums.get(i) != null) && !checksums.get(i).isEmpty()) { + files.set(i, encodeFileUri(files.get(i), checksums.get(i))); + } } } + // TODO: this needs to be enhanced once change management based filesystem is implemented + // Currently using fileuri#checksum as the format + private String encodeFileUri(String fileUriStr, String fileChecksum) { + return fileUriStr + "#" + fileChecksum; + } + @Override public String getTable() { return table; @@ -112,10 +111,6 @@ public class JSONInsertMessage extends InsertMessage { return timestamp; } - public List<byte[]> getFileChecksums() { - return fileChecksums; - } - @Override public String toString() { try { http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java index 9954902..2749371 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java @@ -176,7 +176,7 @@ public class JSONMessageFactory extends MessageFactory { @Override public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals, - List<String> files, List<ByteBuffer> fileChecksums) { + List<String> files, List<String> fileChecksums) { return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals, files, fileChecksums, now()); } http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 4c0f817..e6b943b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hive.ql.exec; -import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.plan.CopyWork; import org.apache.hadoop.hive.ql.plan.ReplCopyWork; @@ -27,7 +27,6 @@ import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; import java.io.InputStreamReader; -import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Serializable; import java.util.ArrayList; @@ -113,7 +112,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { BufferedWriter listBW = null; if (rwork.getListFilesOnOutputBehaviour()){ - Path listPath = new Path(toPath,"_files"); + Path listPath = new Path(toPath,EximUtil.FILES_NAME); LOG.debug("ReplCopyTask : generating _files at :" + listPath.toUri().toString()); if (dstFs.exists(listPath)){ console.printError("Cannot make target _files file:" + listPath.toString()); @@ -169,7 +168,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { private List<FileStatus> filesInFileListing(FileSystem fs, Path path) throws IOException { - Path fileListing = new Path(path, "_files"); + Path fileListing = new Path(path, EximUtil.FILES_NAME); LOG.debug("ReplCopyTask filesInFileListing() reading " + fileListing.toUri()); if (! fs.exists(fileListing)){ LOG.debug("ReplCopyTask : _files does not exist"); @@ -184,8 +183,11 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { String line = null; while ( (line = br.readLine()) != null){ LOG.debug("ReplCopyTask :_filesReadLine:" + line); - Path p = new Path(line); - FileSystem srcFs = p.getFileSystem(conf); // TODO : again, fs cache should make this okay, but if not, revisit + String fileUriStr = EximUtil.getCMDecodedFileName(line); + // TODO HIVE-15490: Add checksum validation here + Path p = new Path(fileUriStr); + // TODO: again, fs cache should make this okay, but if not, revisit + FileSystem srcFs = p.getFileSystem(conf); ret.add(srcFs.getFileStatus(p)); // Note - we need srcFs rather than fs, because it is possible that the _files lists files // which are from a different filesystem than the fs where the _files file itself was loaded http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index be5a6a9..c5b3517 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2349,11 +2349,12 @@ private void constructOneLBLocationMap(FileStatus fSta, FileChecksum cksum = fileSystem.getFileChecksum(p); // File checksum is not implemented for local filesystem (RawLocalFileSystem) if (cksum != null) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - cksum.write(new DataOutputStream(baos)); - insertData.addToFilesAddedChecksum(ByteBuffer.wrap(baos.toByteArray())); + String checksumString = + StringUtils.byteToHexString(cksum.getBytes(), 0, cksum.getLength()); + insertData.addToFilesAddedChecksum(checksumString); } else { - insertData.addToFilesAddedChecksum(ByteBuffer.allocate(0)); + // Add an empty checksum string for filesystems that don't generate one + insertData.addToFilesAddedChecksum(""); } } } else { http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 6e9602f..34e53d2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -19,9 +19,9 @@ package org.apache.hadoop.hive.ql.parse; import com.google.common.base.Function; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.Task; @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -50,6 +51,7 @@ import org.json.JSONException; import org.json.JSONObject; import javax.annotation.Nullable; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -73,7 +75,10 @@ import java.util.TreeMap; */ public class EximUtil { - public static final String METADATA_NAME="_metadata"; + public static final String METADATA_NAME = "_metadata"; + public static final String FILES_NAME = "_files"; + public static final String DATA_PATH_NAME = "data"; + public static final String URI_FRAGMENT_SEPARATOR = "#"; private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class); @@ -278,6 +283,7 @@ public class EximUtil { if (replicationSpec == null){ replicationSpec = new ReplicationSpec(); // instantiate default values if not specified } + if (tableHandle == null){ replicationSpec.setNoop(true); } @@ -351,10 +357,6 @@ public class EximUtil { jgen.close(); // JsonGenerator owns the OutputStream, so it closes it when we call close. } - private static void write(OutputStream out, String s) throws IOException { - out.write(s.getBytes("UTF-8")); - } - /** * Utility class to help return complex value from readMetaData function */ @@ -571,4 +573,20 @@ public class EximUtil { } }; } + + public static String getCMEncodedFileName(String fileURIStr, String fileChecksum) { + // The checksum is set as the fragment portion of the file uri + return fileURIStr + URI_FRAGMENT_SEPARATOR + fileChecksum; + } + + public static String getCMDecodedFileName(String encodedFileURIStr) { + String[] uriAndFragment = encodedFileURIStr.split(URI_FRAGMENT_SEPARATOR); + return uriAndFragment[0]; + } + + public static FileChecksum getCMDecodedChecksum(String encodedFileURIStr) { + // TODO: Implement this as part of HIVE-15490 + return null; + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java index f61274b..08bad63 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java @@ -170,7 +170,7 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer { partitions = null; } - Path path = new Path(ctx.getLocalTmpPath(), "_metadata"); + Path path = new Path(ctx.getLocalTmpPath(), EximUtil.METADATA_NAME); EximUtil.createExportDump( FileSystem.getLocal(conf), path, @@ -202,7 +202,7 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer { } } else { Path fromPath = ts.tableHandle.getDataLocation(); - Path toDataPath = new Path(parentPath, "data"); + Path toDataPath = new Path(parentPath, EximUtil.DATA_PATH_NAME); Task<? extends Serializable> rTask = ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toDataPath, conf); rootTasks.add(rTask); http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 5561e06..8c5cac2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -344,7 +344,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x) { - Path dataPath = new Path(fromURI.toString(), "data"); + Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME); Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath); Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, x.getConf()); LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath, @@ -777,7 +777,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); if (tblDesc.isExternal() && (tblDesc.getLocation() == null)) { x.getLOG().debug("Importing in place, no emptiness check, no copying/loading"); - Path dataPath = new Path(fromURI.toString(), "data"); + Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME); tblDesc.setLocation(dataPath.toString()); } else { Path tablePath = null; http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 9b83407..85f8c64 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse; import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.primitives.Ints; + import org.antlr.runtime.tree.Tree; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -36,6 +37,7 @@ import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; import org.apache.hadoop.hive.metastore.messaging.EventUtils; +import org.apache.hadoop.hive.metastore.messaging.InsertMessage; import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -61,13 +63,15 @@ import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.IOUtils; - import javax.annotation.Nullable; + import java.io.BufferedReader; +import java.io.BufferedWriter; import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.io.Serializable; import java.net.URI; import java.util.ArrayList; @@ -108,6 +112,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { EVENT_RENAME_TABLE("EVENT_RENAME_TABLE"), EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"), EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"), + EVENT_INSERT("EVENT_INSERT"), EVENT_UNKNOWN("EVENT_UNKNOWN"); String type = null; @@ -559,7 +564,39 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { dmd.write(); break; } - + } + case MessageFactory.INSERT_EVENT: { + InsertMessage insertMsg = md.getInsertMessage(ev.getMessage()); + String tblName = insertMsg.getTable(); + Table qlMdTable = db.getTable(tblName); + Map<String, String> partSpec = insertMsg.getPartitionKeyValues(); + List<Partition> qlPtns = null; + if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) { + qlPtns = Arrays.asList(db.getPartition(qlMdTable, partSpec, false)); + } + Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME); + EximUtil.createExportDump(metaDataPath.getFileSystem(conf), metaDataPath, qlMdTable, qlPtns, + replicationSpec); + Path dataPath = new Path(evRoot, EximUtil.DATA_PATH_NAME); + Path filesPath = new Path(dataPath, EximUtil.FILES_NAME); + FileSystem fs = dataPath.getFileSystem(conf); + BufferedWriter fileListWriter = + new BufferedWriter(new OutputStreamWriter(fs.create(filesPath))); + try { + // TODO: HIVE-15205: move this metadata generation to a task + // Get the encoded filename of files that are being inserted + List<String> files = insertMsg.getFiles(); + for (String fileUriStr : files) { + fileListWriter.write(fileUriStr + "\n"); + } + } finally { + fileListWriter.close(); + } + LOG.info("Processing#{} INSERT message : {}", ev.getEventId(), ev.getMessage()); + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_INSERT, evid, evid); + dmd.setPayload(ev.getMessage()); + dmd.write(); + break; } // TODO : handle other event types default: @@ -957,6 +994,12 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { LOG.debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec, newPartSpec); return tasks; } + case EVENT_INSERT: { + md = MessageFactory.getInstance().getDeserializer(); + InsertMessage insertMessage = md.getInsertMessage(dmd.getPayload()); + // Piggybacking in Import logic for now + return analyzeTableLoad(insertMessage.getDB(), insertMessage.getTable(), locn, precursor); + } case EVENT_UNKNOWN: { break; }