This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new f86ca3e  HIVE-22865: Include data in replication staging directory (Pravin Kumar Sinha, reviewed by Aasha Medhi, Anishek Agarwal)
f86ca3e is described below

commit f86ca3e446ba2670173b1b50211d6cce976baf2a
Author: Anishek Agarwal <anis...@gmail.com>
AuthorDate: Wed Mar 11 10:21:10 2020 +0530

    HIVE-22865: Include data in replication staging directory (Pravin Kumar Sinha, reviewed by Aasha Medhi, Anishek Agarwal)
---
 data/files/repl_dump/{ => hive}/_dumpmetadata | 0
 .../repl_dump/{ => hive}/test_hcube_2/_metadata | 0
 .../{ => hive}/test_hcube_2/tbl/_metadata | 0
 .../delta_0000001_0000001_0000/_orc_acid_version | 1 +
 .../data/delta_0000001_0000001_0000/bucket_00000 | Bin 0 -> 611 bytes
 .../{ => hive}/test_hcube_2/tbl1/_metadata | 0
 .../delta_0000001_0000001_0000/_orc_acid_version | 1 +
 .../data/delta_0000001_0000001_0000/bucket_00000 | Bin 0 -> 611 bytes
 .../delta_0000002_0000002_0000/_orc_acid_version | 1 +
 .../data/delta_0000002_0000002_0000/bucket_00000 | Bin 0 -> 620 bytes
 .../delta_0000003_0000003_0000/_orc_acid_version | 1 +
 .../data/delta_0000003_0000003_0000/bucket_00000 | Bin 0 -> 621 bytes
 .../{ => hive}/test_hcube_2/tbl2/_metadata | 0
 .../delta_0000001_0000001_0000/_orc_acid_version | 1 +
 .../data/delta_0000001_0000001_0000/bucket_00000 | Bin 0 -> 611 bytes
 .../delta_0000002_0000002_0000/_orc_acid_version | 1 +
 .../data/delta_0000002_0000002_0000/bucket_00000 | Bin 0 -> 620 bytes
 .../{ => hive}/test_hcube_2/tbl3/_metadata | 0
 .../delta_0000001_0000001_0000/_orc_acid_version | 1 +
 .../data/delta_0000001_0000001_0000/bucket_00000 | Bin 0 -> 611 bytes
 .../{ => hive}/test_hcube_2/tbl4/_metadata | 0
 .../delta_0000001_0000001_0000/_orc_acid_version | 1 +
 .../data/delta_0000001_0000001_0000/bucket_00000 | Bin 0 -> 611 bytes
 .../{ => hive}/test_hcube_2/tbl5/_metadata | 0
 .../delta_0000001_0000001_0000/_orc_acid_version | 1 +
 .../data/delta_0000001_0000001_0000/bucket_00000 | Bin 0 -> 611 bytes
 .../{ => hive}/test_hcube_2/tbl6/_metadata | 0
 .../delta_0000001_0000001_0000/_orc_acid_version | 1 +
 .../data/delta_0000001_0000001_0000/bucket_00000 | Bin 0 -> 611 bytes
 .../delta_0000001_0000001_0000/_orc_acid_version | 1 +
 .../data/delta_0000001_0000001_0000/bucket_00000 | Bin 0 -> 611 bytes
 data/files/repl_dump/test_hcube_2/tbl/data/_files | 1 -
 data/files/repl_dump/test_hcube_2/tbl1/data/_files | 3 -
 data/files/repl_dump/test_hcube_2/tbl2/data/_files | 2 -
 data/files/repl_dump/test_hcube_2/tbl3/data/_files | 1 -
 data/files/repl_dump/test_hcube_2/tbl4/data/_files | 1 -
 data/files/repl_dump/test_hcube_2/tbl5/data/_files | 1 -
 .../repl_dump/test_hcube_2/tbl6/fld1=1/_files | 1 -
 .../repl_dump/test_hcube_2/tbl6/fld1=2/_files | 1 -
 .../hive/ql/parse/TestReplicationScenarios.java | 143 +++++++++++++++------
 ...estReplicationScenariosAcidTablesBootstrap.java | 4 +-
 .../TestReplicationScenariosAcrossInstances.java | 12 +-
 .../TestReplicationScenariosExternalTables.java | 57 ++++----
 ...icationScenariosExternalTablesMetaDataOnly.java | 19 ++-
 .../parse/TestReplicationWithTableMigration.java | 4 +-
 .../parse/TestTableLevelReplicationScenarios.java | 22 ++--
 .../org/apache/hadoop/hive/ql/exec/ExportTask.java | 7 +-
 .../apache/hadoop/hive/ql/exec/ReplCopyTask.java | 26 ++--
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 41 ++++--
 .../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 5 +-
 .../repl/bootstrap/load/table/LoadPartitions.java | 5 +-
 .../exec/repl/bootstrap/load/table/LoadTable.java | 2 +-
.../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 10 +- .../org/apache/hadoop/hive/ql/parse/EximUtil.java | 34 +++++ .../hive/ql/parse/ImportSemanticAnalyzer.java | 6 +- .../hive/ql/parse/ReplicationSemanticAnalyzer.java | 11 +- .../hadoop/hive/ql/parse/ReplicationSpec.java | 24 +--- .../hadoop/hive/ql/parse/repl/CopyUtils.java | 52 ++++---- .../dump/BootStrapReplicationSpecFunction.java | 1 - .../hive/ql/parse/repl/dump/PartitionExport.java | 16 ++- .../hive/ql/parse/repl/dump/TableExport.java | 23 ++-- .../repl/dump/events/AbstractEventHandler.java | 56 ++++++++ .../repl/dump/events/AddPartitionHandler.java | 3 +- .../parse/repl/dump/events/CommitTxnHandler.java | 20 ++- .../parse/repl/dump/events/CreateTableHandler.java | 2 +- .../ql/parse/repl/dump/events/EventHandler.java | 8 +- .../ql/parse/repl/dump/events/InsertHandler.java | 2 +- .../hive/ql/parse/repl/dump/io/FileOperations.java | 127 ++++-------------- .../hadoop/hive/ql/exec/repl/TestReplDumpTask.java | 4 +- 69 files changed, 460 insertions(+), 308 deletions(-) diff --git a/data/files/repl_dump/_dumpmetadata b/data/files/repl_dump/hive/_dumpmetadata similarity index 100% rename from data/files/repl_dump/_dumpmetadata rename to data/files/repl_dump/hive/_dumpmetadata diff --git a/data/files/repl_dump/test_hcube_2/_metadata b/data/files/repl_dump/hive/test_hcube_2/_metadata similarity index 100% rename from data/files/repl_dump/test_hcube_2/_metadata rename to data/files/repl_dump/hive/test_hcube_2/_metadata diff --git a/data/files/repl_dump/test_hcube_2/tbl/_metadata b/data/files/repl_dump/hive/test_hcube_2/tbl/_metadata similarity index 100% rename from data/files/repl_dump/test_hcube_2/tbl/_metadata rename to data/files/repl_dump/hive/test_hcube_2/tbl/_metadata diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl/data/delta_0000001_0000001_0000/_orc_acid_version b/data/files/repl_dump/hive/test_hcube_2/tbl/data/delta_0000001_0000001_0000/_orc_acid_version new file mode 100644 index 0000000..d8263ee --- /dev/null +++ b/data/files/repl_dump/hive/test_hcube_2/tbl/data/delta_0000001_0000001_0000/_orc_acid_version @@ -0,0 +1 @@ +2 \ No newline at end of file diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl/data/delta_0000001_0000001_0000/bucket_00000 b/data/files/repl_dump/hive/test_hcube_2/tbl/data/delta_0000001_0000001_0000/bucket_00000 new file mode 100644 index 0000000..826b8c7 Binary files /dev/null and b/data/files/repl_dump/hive/test_hcube_2/tbl/data/delta_0000001_0000001_0000/bucket_00000 differ diff --git a/data/files/repl_dump/test_hcube_2/tbl1/_metadata b/data/files/repl_dump/hive/test_hcube_2/tbl1/_metadata similarity index 100% rename from data/files/repl_dump/test_hcube_2/tbl1/_metadata rename to data/files/repl_dump/hive/test_hcube_2/tbl1/_metadata diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000001_0000001_0000/_orc_acid_version b/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000001_0000001_0000/_orc_acid_version new file mode 100644 index 0000000..d8263ee --- /dev/null +++ b/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000001_0000001_0000/_orc_acid_version @@ -0,0 +1 @@ +2 \ No newline at end of file diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000001_0000001_0000/bucket_00000 b/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000001_0000001_0000/bucket_00000 new file mode 100644 index 0000000..826b8c7 Binary files /dev/null and b/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000001_0000001_0000/bucket_00000 differ diff 
--git a/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000002_0000002_0000/_orc_acid_version b/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000002_0000002_0000/_orc_acid_version new file mode 100644 index 0000000..d8263ee --- /dev/null +++ b/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000002_0000002_0000/_orc_acid_version @@ -0,0 +1 @@ +2 \ No newline at end of file diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000002_0000002_0000/bucket_00000 b/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000002_0000002_0000/bucket_00000 new file mode 100644 index 0000000..f1dbd1a Binary files /dev/null and b/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000002_0000002_0000/bucket_00000 differ diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000003_0000003_0000/_orc_acid_version b/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000003_0000003_0000/_orc_acid_version new file mode 100644 index 0000000..d8263ee --- /dev/null +++ b/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000003_0000003_0000/_orc_acid_version @@ -0,0 +1 @@ +2 \ No newline at end of file diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000003_0000003_0000/bucket_00000 b/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000003_0000003_0000/bucket_00000 new file mode 100644 index 0000000..87f5ed0 Binary files /dev/null and b/data/files/repl_dump/hive/test_hcube_2/tbl1/data/delta_0000003_0000003_0000/bucket_00000 differ diff --git a/data/files/repl_dump/test_hcube_2/tbl2/_metadata b/data/files/repl_dump/hive/test_hcube_2/tbl2/_metadata similarity index 100% rename from data/files/repl_dump/test_hcube_2/tbl2/_metadata rename to data/files/repl_dump/hive/test_hcube_2/tbl2/_metadata diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl2/data/delta_0000001_0000001_0000/_orc_acid_version b/data/files/repl_dump/hive/test_hcube_2/tbl2/data/delta_0000001_0000001_0000/_orc_acid_version new file mode 100644 index 0000000..d8263ee --- /dev/null +++ b/data/files/repl_dump/hive/test_hcube_2/tbl2/data/delta_0000001_0000001_0000/_orc_acid_version @@ -0,0 +1 @@ +2 \ No newline at end of file diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl2/data/delta_0000001_0000001_0000/bucket_00000 b/data/files/repl_dump/hive/test_hcube_2/tbl2/data/delta_0000001_0000001_0000/bucket_00000 new file mode 100644 index 0000000..826b8c7 Binary files /dev/null and b/data/files/repl_dump/hive/test_hcube_2/tbl2/data/delta_0000001_0000001_0000/bucket_00000 differ diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl2/data/delta_0000002_0000002_0000/_orc_acid_version b/data/files/repl_dump/hive/test_hcube_2/tbl2/data/delta_0000002_0000002_0000/_orc_acid_version new file mode 100644 index 0000000..d8263ee --- /dev/null +++ b/data/files/repl_dump/hive/test_hcube_2/tbl2/data/delta_0000002_0000002_0000/_orc_acid_version @@ -0,0 +1 @@ +2 \ No newline at end of file diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl2/data/delta_0000002_0000002_0000/bucket_00000 b/data/files/repl_dump/hive/test_hcube_2/tbl2/data/delta_0000002_0000002_0000/bucket_00000 new file mode 100644 index 0000000..f1dbd1a Binary files /dev/null and b/data/files/repl_dump/hive/test_hcube_2/tbl2/data/delta_0000002_0000002_0000/bucket_00000 differ diff --git a/data/files/repl_dump/test_hcube_2/tbl3/_metadata b/data/files/repl_dump/hive/test_hcube_2/tbl3/_metadata similarity index 100% rename from data/files/repl_dump/test_hcube_2/tbl3/_metadata rename to 
data/files/repl_dump/hive/test_hcube_2/tbl3/_metadata diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl3/data/delta_0000001_0000001_0000/_orc_acid_version b/data/files/repl_dump/hive/test_hcube_2/tbl3/data/delta_0000001_0000001_0000/_orc_acid_version new file mode 100644 index 0000000..d8263ee --- /dev/null +++ b/data/files/repl_dump/hive/test_hcube_2/tbl3/data/delta_0000001_0000001_0000/_orc_acid_version @@ -0,0 +1 @@ +2 \ No newline at end of file diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl3/data/delta_0000001_0000001_0000/bucket_00000 b/data/files/repl_dump/hive/test_hcube_2/tbl3/data/delta_0000001_0000001_0000/bucket_00000 new file mode 100644 index 0000000..826b8c7 Binary files /dev/null and b/data/files/repl_dump/hive/test_hcube_2/tbl3/data/delta_0000001_0000001_0000/bucket_00000 differ diff --git a/data/files/repl_dump/test_hcube_2/tbl4/_metadata b/data/files/repl_dump/hive/test_hcube_2/tbl4/_metadata similarity index 100% rename from data/files/repl_dump/test_hcube_2/tbl4/_metadata rename to data/files/repl_dump/hive/test_hcube_2/tbl4/_metadata diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl4/data/delta_0000001_0000001_0000/_orc_acid_version b/data/files/repl_dump/hive/test_hcube_2/tbl4/data/delta_0000001_0000001_0000/_orc_acid_version new file mode 100644 index 0000000..d8263ee --- /dev/null +++ b/data/files/repl_dump/hive/test_hcube_2/tbl4/data/delta_0000001_0000001_0000/_orc_acid_version @@ -0,0 +1 @@ +2 \ No newline at end of file diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl4/data/delta_0000001_0000001_0000/bucket_00000 b/data/files/repl_dump/hive/test_hcube_2/tbl4/data/delta_0000001_0000001_0000/bucket_00000 new file mode 100644 index 0000000..826b8c7 Binary files /dev/null and b/data/files/repl_dump/hive/test_hcube_2/tbl4/data/delta_0000001_0000001_0000/bucket_00000 differ diff --git a/data/files/repl_dump/test_hcube_2/tbl5/_metadata b/data/files/repl_dump/hive/test_hcube_2/tbl5/_metadata similarity index 100% rename from data/files/repl_dump/test_hcube_2/tbl5/_metadata rename to data/files/repl_dump/hive/test_hcube_2/tbl5/_metadata diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl5/data/delta_0000001_0000001_0000/_orc_acid_version b/data/files/repl_dump/hive/test_hcube_2/tbl5/data/delta_0000001_0000001_0000/_orc_acid_version new file mode 100644 index 0000000..d8263ee --- /dev/null +++ b/data/files/repl_dump/hive/test_hcube_2/tbl5/data/delta_0000001_0000001_0000/_orc_acid_version @@ -0,0 +1 @@ +2 \ No newline at end of file diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl5/data/delta_0000001_0000001_0000/bucket_00000 b/data/files/repl_dump/hive/test_hcube_2/tbl5/data/delta_0000001_0000001_0000/bucket_00000 new file mode 100644 index 0000000..826b8c7 Binary files /dev/null and b/data/files/repl_dump/hive/test_hcube_2/tbl5/data/delta_0000001_0000001_0000/bucket_00000 differ diff --git a/data/files/repl_dump/test_hcube_2/tbl6/_metadata b/data/files/repl_dump/hive/test_hcube_2/tbl6/_metadata similarity index 100% rename from data/files/repl_dump/test_hcube_2/tbl6/_metadata rename to data/files/repl_dump/hive/test_hcube_2/tbl6/_metadata diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl6/fld1=1/data/delta_0000001_0000001_0000/_orc_acid_version b/data/files/repl_dump/hive/test_hcube_2/tbl6/fld1=1/data/delta_0000001_0000001_0000/_orc_acid_version new file mode 100644 index 0000000..d8263ee --- /dev/null +++ b/data/files/repl_dump/hive/test_hcube_2/tbl6/fld1=1/data/delta_0000001_0000001_0000/_orc_acid_version @@ -0,0 +1 @@ +2 \ 
No newline at end of file diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl6/fld1=1/data/delta_0000001_0000001_0000/bucket_00000 b/data/files/repl_dump/hive/test_hcube_2/tbl6/fld1=1/data/delta_0000001_0000001_0000/bucket_00000 new file mode 100644 index 0000000..826b8c7 Binary files /dev/null and b/data/files/repl_dump/hive/test_hcube_2/tbl6/fld1=1/data/delta_0000001_0000001_0000/bucket_00000 differ diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl6/fld1=2/data/delta_0000001_0000001_0000/_orc_acid_version b/data/files/repl_dump/hive/test_hcube_2/tbl6/fld1=2/data/delta_0000001_0000001_0000/_orc_acid_version new file mode 100644 index 0000000..d8263ee --- /dev/null +++ b/data/files/repl_dump/hive/test_hcube_2/tbl6/fld1=2/data/delta_0000001_0000001_0000/_orc_acid_version @@ -0,0 +1 @@ +2 \ No newline at end of file diff --git a/data/files/repl_dump/hive/test_hcube_2/tbl6/fld1=2/data/delta_0000001_0000001_0000/bucket_00000 b/data/files/repl_dump/hive/test_hcube_2/tbl6/fld1=2/data/delta_0000001_0000001_0000/bucket_00000 new file mode 100644 index 0000000..826b8c7 Binary files /dev/null and b/data/files/repl_dump/hive/test_hcube_2/tbl6/fld1=2/data/delta_0000001_0000001_0000/bucket_00000 differ diff --git a/data/files/repl_dump/test_hcube_2/tbl/data/_files b/data/files/repl_dump/test_hcube_2/tbl/data/_files deleted file mode 100644 index 2c94c7f..0000000 --- a/data/files/repl_dump/test_hcube_2/tbl/data/_files +++ /dev/null @@ -1 +0,0 @@ -../../data/files/test_hcube_2.db/tbl/delta_0000001_0000001_0000/bucket_00000###delta_0000001_0000001_0000 diff --git a/data/files/repl_dump/test_hcube_2/tbl1/data/_files b/data/files/repl_dump/test_hcube_2/tbl1/data/_files deleted file mode 100644 index 1d2d92d..0000000 --- a/data/files/repl_dump/test_hcube_2/tbl1/data/_files +++ /dev/null @@ -1,3 +0,0 @@ -../../data/files/test_hcube_2.db/tbl1/delta_0000001_0000001_0000/bucket_00000###delta_0000001_0000001_0000 -../../data/files/test_hcube_2.db/tbl1/delta_0000002_0000002_0000/bucket_00000###delta_0000002_0000002_0000 -../../data/files/test_hcube_2.db/tbl1/delta_0000003_0000003_0000/bucket_00000###delta_0000003_0000003_0000 diff --git a/data/files/repl_dump/test_hcube_2/tbl2/data/_files b/data/files/repl_dump/test_hcube_2/tbl2/data/_files deleted file mode 100644 index 954b8fe..0000000 --- a/data/files/repl_dump/test_hcube_2/tbl2/data/_files +++ /dev/null @@ -1,2 +0,0 @@ -../../data/files/test_hcube_2.db/tbl2/delta_0000001_0000001_0000/bucket_00000###delta_0000001_0000001_0000 -../../data/files/test_hcube_2.db/tbl2/delta_0000002_0000002_0000/bucket_00000###delta_0000002_0000002_0000 diff --git a/data/files/repl_dump/test_hcube_2/tbl3/data/_files b/data/files/repl_dump/test_hcube_2/tbl3/data/_files deleted file mode 100644 index 662fcd8..0000000 --- a/data/files/repl_dump/test_hcube_2/tbl3/data/_files +++ /dev/null @@ -1 +0,0 @@ -../../data/files/test_hcube_2.db/tbl3/delta_0000001_0000001_0000/bucket_00000###delta_0000001_0000001_0000 diff --git a/data/files/repl_dump/test_hcube_2/tbl4/data/_files b/data/files/repl_dump/test_hcube_2/tbl4/data/_files deleted file mode 100644 index 19e578e..0000000 --- a/data/files/repl_dump/test_hcube_2/tbl4/data/_files +++ /dev/null @@ -1 +0,0 @@ -../../data/files/test_hcube_2.db/tbl4/delta_0000001_0000001_0000/bucket_00000###delta_0000001_0000001_0000 diff --git a/data/files/repl_dump/test_hcube_2/tbl5/data/_files b/data/files/repl_dump/test_hcube_2/tbl5/data/_files deleted file mode 100644 index 7b3b655..0000000 --- a/data/files/repl_dump/test_hcube_2/tbl5/data/_files 
+++ /dev/null @@ -1 +0,0 @@ -../../data/files/test_hcube_2.db/tbl5/delta_0000001_0000001_0000/bucket_00000###delta_0000001_0000001_0000 diff --git a/data/files/repl_dump/test_hcube_2/tbl6/fld1=1/_files b/data/files/repl_dump/test_hcube_2/tbl6/fld1=1/_files deleted file mode 100644 index 7101dcf..0000000 --- a/data/files/repl_dump/test_hcube_2/tbl6/fld1=1/_files +++ /dev/null @@ -1 +0,0 @@ -../../data/files/test_hcube_2.db/tbl6/delta_0000001_0000001_0000/bucket_00000###delta_0000001_0000001_0000 diff --git a/data/files/repl_dump/test_hcube_2/tbl6/fld1=2/_files b/data/files/repl_dump/test_hcube_2/tbl6/fld1=2/_files deleted file mode 100644 index 7101dcf..0000000 --- a/data/files/repl_dump/test_hcube_2/tbl6/fld1=2/_files +++ /dev/null @@ -1 +0,0 @@ -../../data/files/test_hcube_2.db/tbl6/delta_0000001_0000001_0000/bucket_00000###delta_0000001_0000001_0000 diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index b709ce7..c46103a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; import org.apache.hadoop.hive.ql.processors.CommandProcessorException; @@ -368,7 +369,8 @@ public class TestReplicationScenarios { private Task getReplLoadRootTask(String replicadb, boolean isIncrementalDump, Tuple tuple) throws Throwable { HiveConf confTemp = new HiveConf(); confTemp.set("hive.repl.enable.move.optimization", "true"); - ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, tuple.dumpLocation, replicadb, + Path loadPath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR); + ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), replicadb, null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId), Collections.emptyList()); Task replLoadTask = TaskFactory.get(replLoadWork, confTemp); @@ -1335,6 +1337,70 @@ public class TestReplicationScenarios { } @Test + public void testBootstrapWithDataInDumpDir() throws IOException { + String nameOfTest = "testBootstrapWithDataInDumpDir"; + String dbName = createDB(nameOfTest, driver); + String replDbName = dbName + "_dupe"; + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver); + String[] unptnData1 = new String[] {"eleven", "twelve"}; + String[] unptnData2 = new String[] {"thirteen", "fourteen", "fifteen"}; + String[] unptnAllData = new String[] {"eleven", "twelve", "thirteen", "fourteen", "fifteen"}; + String[] ptnData1 = new String[] {"one", "two", "three"}; + String[] ptnData2 = new String[] {"four", "five"}; + String[] empty = new String[] {}; + String unptnedFileName1 = nameOfTest + "_unptn_1"; + String unptnedFileName2 = nameOfTest + "_unptn_2"; + String ptnedFileName1 = nameOfTest + "_ptn_1"; + String ptnedFileName2 = nameOfTest + "_ptn_2"; + + String unptnLocn1= new Path(TEST_PATH, 
unptnedFileName1).toUri().getPath(); + String unptnLocn2 = new Path(TEST_PATH, unptnedFileName2).toUri().getPath(); + String ptnLocn1 = new Path(TEST_PATH, ptnedFileName1).toUri().getPath(); + String ptnLocn2 = new Path(TEST_PATH, ptnedFileName2).toUri().getPath(); + createTestDataFile(unptnLocn1, unptnData1); + createTestDataFile(unptnLocn2, unptnData2); + createTestDataFile(ptnLocn1, ptnData1); + createTestDataFile(ptnLocn2, ptnData2); + verifySetup("SELECT * from " + dbName + ".unptned", empty, driverMirror); + verifySetup("SELECT * from " + dbName + ".ptned", empty, driverMirror); + run("LOAD DATA LOCAL INPATH '" + unptnLocn1 + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); + run("LOAD DATA LOCAL INPATH '" + unptnLocn2 + "' INTO TABLE " + dbName + ".unptned", driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO TABLE " + dbName + + ".ptned PARTITION(b=1)", driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' INTO TABLE " + dbName + + ".ptned PARTITION(b=2)", driver); + Tuple dump = replDumpDb(dbName); + Path path = new Path(System.getProperty("test.warehouse.dir", "")); + String tableRelativeSrcPath = dbName.toLowerCase()+".db" + File.separator + "unptned"; + Path srcFileLocation = new Path(path, tableRelativeSrcPath + File.separator + unptnedFileName1); + String tgtFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + dbName.toLowerCase() + File.separator + + "unptned" + File.separator + EximUtil.DATA_PATH_NAME +File.separator + unptnedFileName1; + Path tgtFileLocation = new Path(dump.dumpLocation, tgtFileRelativePath); + //A file in table at src location should be copied to $dumplocation/hive/<db>/<table>/data/<unptned_fileName> + verifyChecksum(srcFileLocation, tgtFileLocation, true); + srcFileLocation = new Path(path, tableRelativeSrcPath + File.separator + unptnedFileName2); + verifyChecksum(srcFileLocation, tgtFileLocation, false); + + String partitionRelativeSrcPath = dbName.toLowerCase()+".db" + File.separator + "ptned" + File.separator + "b=1"; + srcFileLocation = new Path(path, partitionRelativeSrcPath + File.separator + ptnedFileName1); + tgtFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + dbName.toLowerCase() + + File.separator + "ptned" + File.separator + "b=1" + File.separator + + EximUtil.DATA_PATH_NAME +File.separator + ptnedFileName1; + tgtFileLocation = new Path(dump.dumpLocation, tgtFileRelativePath); + //A partitioned file in table at src location should be copied to + // $dumplocation/hive/<db>/<table>/<partition>/data/<unptned_fileName> + verifyChecksum(srcFileLocation, tgtFileLocation, true); + partitionRelativeSrcPath = dbName.toLowerCase()+".db" + File.separator + "ptned" + File.separator + "b=2"; + srcFileLocation = new Path(path, partitionRelativeSrcPath + File.separator + ptnedFileName2); + loadAndVerify(replDbName, dbName, dump.lastReplId); + verifyChecksum(srcFileLocation, tgtFileLocation, false); + verifySetup("SELECT * from " + replDbName + ".unptned", unptnAllData, driver); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptnData1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror); + } + + @Test public void testIncrementalLoad() throws IOException { String testName = "incrementalLoad"; String dbName = createDB(testName, driver); @@ -1348,55 +1414,55 @@ public class TestReplicationScenarios { Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName); - String[] unptn_data = new String[] { "eleven", "twelve" }; - 
String[] ptn_data_1 = new String[] { "thirteen", "fourteen", "fifteen" }; - String[] ptn_data_2 = new String[] { "fifteen", "sixteen", "seventeen" }; + String[] unptnData = new String[] {"eleven", "twelve"}; + String[] ptnData1 = new String[] {"thirteen", "fourteen", "fifteen"}; + String[] ptnData2 = new String[] {"fifteen", "sixteen", "seventeen"}; String[] empty = new String[] {}; - String unptn_locn = new Path(TEST_PATH, testName + "_unptn").toUri().getPath(); - String ptn_locn_1 = new Path(TEST_PATH, testName + "_ptn1").toUri().getPath(); - String ptn_locn_2 = new Path(TEST_PATH, testName + "_ptn2").toUri().getPath(); + String unptnLocn = new Path(TEST_PATH, testName + "_unptn").toUri().getPath(); + String ptnLocn1 = new Path(TEST_PATH, testName + "_ptn1").toUri().getPath(); + String ptnLocn2 = new Path(TEST_PATH, testName + "_ptn2").toUri().getPath(); - createTestDataFile(unptn_locn, unptn_data); - createTestDataFile(ptn_locn_1, ptn_data_1); - createTestDataFile(ptn_locn_2, ptn_data_2); + createTestDataFile(unptnLocn, unptnData); + createTestDataFile(ptnLocn1, ptnData1); + createTestDataFile(ptnLocn2, ptnData2); verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driverMirror); verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driverMirror); - run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); - verifySetup("SELECT * from " + dbName + ".unptned", unptn_data, driver); + run("LOAD DATA LOCAL INPATH '" + unptnLocn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver); + verifySetup("SELECT * from " + dbName + ".unptned", unptnData, driver); run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned", driver); run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver); - verifySetup("SELECT * from " + dbName + ".unptned_late", unptn_data, driver); + verifySetup("SELECT * from " + dbName + ".unptned_late", unptnData, driver); Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName); - verifyRun("SELECT * from " + replDbName + ".unptned_late", unptn_data, driverMirror); + verifyRun("SELECT * from " + replDbName + ".unptned_late", unptnData, driverMirror); run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver); - run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)", driver); - verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1, driver); - run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptnData1, driver); + run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)", driver); - verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2, driver); + verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptnData2, driver); run("CREATE TABLE " + dbName + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver); run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName + ".ptned WHERE b=1", driver); - verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptn_data_1, driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptnData1, driver); run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName + ".ptned WHERE b=2", 
driver); - verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptn_data_2, driver); + verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2, driver); incrementalLoadAndVerify(dbName, replDbName); - verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", ptn_data_2, driverMirror); - verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror); - verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptn_data_2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptnData1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", ptnData2, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptnData1, driverMirror); + verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror); } @Test @@ -2652,16 +2718,13 @@ public class TestReplicationScenarios { FileSystem dataFs = ptnLoc.getFileSystem(hconf); assert(dataFs.rename(ptnLoc, tmpLoc)); - // Replicate all the events happened so far. It should fail as the data files missing in + // Replicate all the events happened so far. It should fail during dump as the data files missing in // original path and not available in CM as well. - Tuple incrDump = replDumpDb(dbName); - verifyFail("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror); - - verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty, driverMirror); - verifyFail("SELECT a from " + replDbName + ".ptned_tmp where (b=1) ORDER BY a", driverMirror); + verifyFail("REPL DUMP " + dbName, driverMirror); // Move the files back to original data location assert(dataFs.rename(tmpLoc, ptnLoc)); + Tuple incrDump = replDumpDb(dbName); loadAndVerify(replDbName, dbName, incrDump.lastReplId); verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror); @@ -3121,7 +3184,7 @@ public class TestReplicationScenarios { } @Test - public void testLoadCmPathMissing() throws IOException { + public void testLoadCmPathMissing() throws Exception { String dbName = createDB(testName.getMethodName(), driver); run("CREATE TABLE " + dbName + ".normal(a int)", driver); run("INSERT INTO " + dbName + ".normal values (1)", driver); @@ -3139,15 +3202,8 @@ public class TestReplicationScenarios { long fileCount = cs.getFileCount(); assertTrue(fileCount != 0); fs.delete(path); - - try { - driverMirror.run("REPL LOAD " + dbName + " INTO " + dbName); - assert false; - } catch (CommandProcessorException e) { - assertTrue(e.getResponseCode() == ErrorMsg.REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH.getErrorCode()); - } + driverMirror.run("REPL LOAD " + dbName + " INTO " + dbName); run("drop database " + dbName, true, driver); - fs.create(path, false); } @Test @@ -3434,6 +3490,15 @@ public class TestReplicationScenarios { } } + private void verifyChecksum(Path sourceFilePath, Path targetFilePath, boolean shouldMatch) throws IOException { + FileSystem srcFS = sourceFilePath.getFileSystem(hconf); + FileSystem tgtFS = targetFilePath.getFileSystem(hconf); + if (shouldMatch) { + assertTrue(srcFS.getFileChecksum(sourceFilePath).equals(tgtFS.getFileChecksum(targetFilePath))); + } else { + assertFalse(srcFS.getFileChecksum(sourceFilePath).equals(tgtFS.getFileChecksum(targetFilePath))); + } + } private List<String> getOutput(IDriver myDriver) throws IOException { List<String> results = new 
ArrayList<>(); myDriver.getResults(results); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java index 36841ba..a683fd4 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java @@ -34,6 +34,7 @@ import org.junit.Test; import org.junit.BeforeClass; import javax.annotation.Nullable; +import java.io.File; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; @@ -80,7 +81,8 @@ public class TestReplicationScenariosAcidTablesBootstrap replica.load(replicatedDbName, primaryDbName); verifyIncLoad(replicatedDbName, incrementalDump.lastReplicationId); // Ckpt should be set on bootstrapped tables. - replica.verifyIfCkptSetForTables(replicatedDbName, acidTableNames, incrementalDump.dumpLocation); + String hiveDumpLocation = incrementalDump.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; + replica.verifyIfCkptSetForTables(replicatedDbName, acidTableNames, hiveDumpLocation); // Take a second normal incremental dump after Acid table boostrap prepareInc2AcidData(primaryDbName, primary.hiveConf); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index ff1de9e..c5532cf 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -43,6 +43,7 @@ import org.junit.BeforeClass; import org.junit.Test; import javax.annotation.Nullable; +import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -834,7 +835,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro private void verifyIfCkptSet(Map<String, String> props, String dumpDir) { assertTrue(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY)); - assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(dumpDir)); + String hiveDumpDir = dumpDir + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; + assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(hiveDumpDir)); } private void verifyIfCkptPropMissing(Map<String, String> props) { @@ -928,10 +930,11 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro .run("insert into table4 values (1,2)") .dump(primaryDbName, Collections.emptyList()); - Path path = new Path(incremental.dumpLocation); + String hiveDumpDir = incremental.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; + Path path = new Path(hiveDumpDir); FileSystem fs = path.getFileSystem(conf); FileStatus[] fileStatus = fs.listStatus(path); - int numEvents = fileStatus.length - 1; //one is metadata file + int numEvents = fileStatus.length - 2; //one is metadata file and one data dir replica.load(replicatedDbName, primaryDbName, Collections.singletonList("'hive.repl.approx.max.load.tasks'='1'")) @@ -1095,7 +1098,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro .verifyResults(Collections.singletonList("10")) .run("select country from t2 order by 
country") .verifyResults(Arrays.asList("india", "uk", "us")); - replica.verifyIfCkptSet(replicatedDbName, tuple.dumpLocation); + String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; + replica.verifyIfCkptSet(replicatedDbName, hiveDumpLocation); // Retry with same dump with which it was already loaded also fails. replica.loadFailure(replicatedDbName, primaryDbName); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java index df304c2..81feaf5 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -39,6 +40,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -106,7 +108,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros // the _external_tables_file info only should be created if external tables are to be replicated not otherwise assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); + .exists(new Path(tuple.dumpLocation, relativeExtInfoPath(primaryDbName)))); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("repl status " + replicatedDbName) @@ -126,7 +128,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros // the _external_tables_file info only should be created if external tables are to be replicated not otherwise assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(tuple.dumpLocation, FILE_NAME))); + .exists(new Path(tuple.dumpLocation, relativeExtInfoPath(null)))); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("use " + replicatedDbName) @@ -150,8 +152,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .dumpWithCommand("repl dump " + primaryDbName); // verify that the external table info is written correctly for bootstrap - assertExternalFileInfo(Arrays.asList("t1", "t2"), - new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); + assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName); List<String> withClauseOptions = externalTableBasePathWithClause(); @@ -168,8 +169,9 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .run("select country from t2 where country = 'france'") .verifyResult("france"); + String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; // Ckpt should be set on bootstrapped db. 
- replica.verifyIfCkptSet(replicatedDbName, tuple.dumpLocation); + replica.verifyIfCkptSet(replicatedDbName, hiveDumpLocation); assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + ".t1"); assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2"); @@ -181,8 +183,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .dumpWithCommand("repl dump " + primaryDbName); // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), - new Path(tuple.dumpLocation, FILE_NAME)); + assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation); replica.load(replicatedDbName, primaryDbName, withClauseOptions) .run("use " + replicatedDbName) @@ -200,8 +201,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .dumpWithCommand("repl dump " + primaryDbName); // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), - new Path(tuple.dumpLocation, FILE_NAME)); + assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation); } /** @@ -310,8 +310,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .run("insert into t2 partition(country='india') values ('bangalore')") .dumpWithCommand("repl dump " + primaryDbName); - assertExternalFileInfo(Collections.singletonList("t2"), - new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); + assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, primaryDbName); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("use " + replicatedDbName) @@ -334,8 +333,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .run("insert into t2 partition(country='australia') values ('sydney')") .dump(primaryDbName); - assertExternalFileInfo(Collections.singletonList("t2"), - new Path(tuple.dumpLocation, FILE_NAME)); + assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("use " + replicatedDbName) @@ -418,7 +416,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .run("alter table t1 add partition(country='us')") .dump(primaryDbName); - assertExternalFileInfo(Collections.singletonList("t1"), new Path(tuple.dumpLocation, FILE_NAME)); + assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation); // Add new data externally, to a partition, but under the partition level top directory // Also, it is added after dumping the events but data should be seen at target after REPL LOAD. @@ -452,7 +450,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros // Repl load with zero events but external tables location info should present. 
tuple = primary.dump(primaryDbName); - assertExternalFileInfo(Collections.singletonList("t1"), new Path(tuple.dumpLocation, FILE_NAME)); + assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation); replica.load(replicatedDbName, primaryDbName, loadWithClause) .run("use " + replicatedDbName) @@ -505,7 +503,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros // the _external_tables_file info only should be created if external tables are to be replicated not otherwise assertFalse(primary.miniDFSCluster.getFileSystem() - .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); + .exists(new Path(tuple.dumpLocation, relativeExtInfoPath(primaryDbName)))); replica.load(replicatedDbName, primaryDbName, loadWithClause) .status(replicatedDbName) @@ -529,14 +527,14 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros // the _external_tables_file info should be created as external tables are to be replicated. assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(tuple.dumpLocation, FILE_NAME))); + .exists(new Path(tuple.dumpLocation, relativeExtInfoPath(null)))); // verify that the external table info is written correctly for incremental - assertExternalFileInfo(Arrays.asList("t2", "t3"), - new Path(tuple.dumpLocation, FILE_NAME)); + assertExternalFileInfo(Arrays.asList("t2", "t3"), tuple.dumpLocation); // _bootstrap directory should be created as bootstrap enabled on external tables. - Path dumpPath = new Path(tuple.dumpLocation, INC_BOOTSTRAP_ROOT_DIR_NAME); + String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; + Path dumpPath = new Path(hiveDumpLocation, INC_BOOTSTRAP_ROOT_DIR_NAME); assertTrue(primary.miniDFSCluster.getFileSystem().exists(dumpPath)); // _bootstrap/<db_name>/t2 @@ -562,7 +560,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros .verifyReplTargetProperty(replicatedDbName); // Ckpt should be set on bootstrapped tables. - replica.verifyIfCkptSetForTables(replicatedDbName, Arrays.asList("t2", "t3"), tuple.dumpLocation); + replica.verifyIfCkptSetForTables(replicatedDbName, Arrays.asList("t2", "t3"), hiveDumpLocation); // Drop source tables to see if target points to correct data or not after bootstrap load. primary.run("use " + primaryDbName) @@ -753,8 +751,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros } // Only table t2 should exist in the data location list file. - assertExternalFileInfo(Collections.singletonList("t2"), - new Path(tupleInc.dumpLocation, FILE_NAME)); + assertExternalFileInfo(Collections.singletonList("t2"), tupleInc.dumpLocation); // The newly inserted data "2" should be missing in table "t1". But, table t2 should exist and have // inserted data. 
@@ -904,8 +901,20 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); } - private void assertExternalFileInfo(List<String> expected, Path externalTableInfoFile) + private void assertExternalFileInfo(List<String> expected, String dumplocation) throws IOException { + assertExternalFileInfo(expected, dumplocation, null); + } + private void assertExternalFileInfo(List<String> expected, String dumplocation, String dbName) throws IOException { + Path externalTableInfoFile = new Path(dumplocation, relativeExtInfoPath(dbName)); ReplicationTestUtils.assertExternalFileInfo(primary, expected, externalTableInfoFile); } + private String relativeExtInfoPath(String dbName) { + + if (dbName == null) { + return ReplUtils.REPL_HIVE_BASE_DIR + File.separator + FILE_NAME; + } else { + return ReplUtils.REPL_HIVE_BASE_DIR + File.separator + dbName.toLowerCase() + File.separator + FILE_NAME; + } + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java index bf691f3..624f29b 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java @@ -37,6 +37,7 @@ import org.junit.BeforeClass; import org.junit.Test; import javax.annotation.Nullable; +import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collections; @@ -49,6 +50,7 @@ import java.util.stream.Collectors; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME; +import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_HIVE_BASE_DIR; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -167,7 +169,8 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl .verifyResult(null); // Ckpt should be set on bootstrapped db. - replica.verifyIfCkptSet(replicatedDbName, tuple.dumpLocation); + String hiveDumpLocation = tuple.dumpLocation + File.separator + REPL_HIVE_BASE_DIR; + replica.verifyIfCkptSet(replicatedDbName, hiveDumpLocation); tuple = primary.run("use " + primaryDbName) .run("create external table t3 (id int)") @@ -476,16 +479,18 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl .run("create table t4 as select * from t3") .dump(primaryDbName, dumpWithClause); + String hiveDumpDir = tuple.dumpLocation + File.separator + REPL_HIVE_BASE_DIR; // the _external_tables_file info should be created as external tables are to be replicated. assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(tuple.dumpLocation, FILE_NAME))); + .exists(new Path(hiveDumpDir, FILE_NAME))); // verify that the external table info is written correctly for incremental assertExternalFileInfo(Arrays.asList("t2", "t3"), - new Path(tuple.dumpLocation, FILE_NAME)); + new Path(hiveDumpDir, FILE_NAME)); + // _bootstrap directory should be created as bootstrap enabled on external tables. 
- Path dumpPath = new Path(tuple.dumpLocation, INC_BOOTSTRAP_ROOT_DIR_NAME); + Path dumpPath = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME); assertTrue(primary.miniDFSCluster.getFileSystem().exists(dumpPath)); // _bootstrap/<db_name>/t2 @@ -511,7 +516,8 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl .verifyReplTargetProperty(replicatedDbName); // Ckpt should be set on bootstrapped tables. - replica.verifyIfCkptSetForTables(replicatedDbName, Arrays.asList("t2", "t3"), tuple.dumpLocation); + hiveDumpDir = tuple.dumpLocation + File.separator + REPL_HIVE_BASE_DIR; + replica.verifyIfCkptSetForTables(replicatedDbName, Arrays.asList("t2", "t3"), hiveDumpDir); // Drop source tables to see if target points to correct data or not after bootstrap load. primary.run("use " + primaryDbName) @@ -576,7 +582,8 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl } // Only table t2 should exist in the data location list file. - assertFalseExternalFileInfo(new Path(tupleInc.dumpLocation, FILE_NAME)); + String hiveDumpDir = tupleInc.dumpLocation + File.separator + REPL_HIVE_BASE_DIR; + assertFalseExternalFileInfo(new Path(hiveDumpDir, FILE_NAME)); // The newly inserted data "2" should be missing in table "t1". But, table t2 should exist and have // inserted data. diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java index 3eab045..642a8dc 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java @@ -203,7 +203,7 @@ public class TestReplicationWithTableMigration { .run("insert into tflattextpart partition(country='india') values(1111), (2222)") .run("insert into tflattextpart partition(country='us') values(3333)") .run( - "create table tacidloc (id int) clustered by(id) into 3 buckets stored as orc LOCATION '/tmp' ") + "create table tacidloc (id int) clustered by(id) into 3 buckets stored as orc LOCATION '/tmp/fol' ") .run("insert into tacidloc values(1)") .run("insert into tacidloc values(2)") .run("insert into tacidloc values(3)") @@ -211,7 +211,7 @@ public class TestReplicationWithTableMigration { "create table tacidpartloc (place string) partitioned by (country string) clustered by(place) " + "into 3 buckets stored as orc ") - .run("alter table tacidpartloc add partition(country='france') LOCATION '/tmp/part'") + .run("alter table tacidpartloc add partition(country='france') LOCATION '/tmp/fol/part'") .run("insert into tacidpartloc partition(country='india') values('mumbai')") .run("insert into tacidpartloc partition(country='us') values('sf')") .run("insert into tacidpartloc partition(country='france') values('paris')") diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java index 0c44100..ad6c002 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java @@ -31,6 +31,7 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import java.io.File; import 
java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -227,8 +228,9 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios private void verifyBootstrapDirInIncrementalDump(String dumpLocation, String[] bootstrappedTables) throws Throwable { + String hiveDumpDir = dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; // _bootstrap directory should be created as bootstrap enabled on external tables. - Path dumpPath = new Path(dumpLocation, INC_BOOTSTRAP_ROOT_DIR_NAME); + Path dumpPath = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME); // If nothing to be bootstrapped. if (bootstrappedTables.length == 0) { @@ -252,7 +254,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios private void verifyTableListForPolicy(String dumpLocation, String[] tableList) throws Throwable { FileSystem fileSystem = primary.miniDFSCluster.getFileSystem(); - Path tableListFile = new Path(dumpLocation, ReplUtils.REPL_TABLE_LIST_DIR_NAME); + String hiveDumpLocation = dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; + Path tableListFile = new Path(hiveDumpLocation, ReplUtils.REPL_TABLE_LIST_DIR_NAME); tableListFile = new Path(tableListFile, primaryDbName.toLowerCase()); if (tableList == null) { @@ -496,13 +499,14 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .dump(replPolicy, dumpWithClause); + String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; // the _external_tables_file info should be created as external tables are to be replicated. Assert.assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); + .exists(new Path(new Path(hiveDumpDir, primaryDbName.toLowerCase()), FILE_NAME))); // Verify that the external table info contains only table "a2". ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2"), - new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); + new Path(new Path(hiveDumpDir, primaryDbName.toLowerCase()), FILE_NAME)); replica.load(replicatedDbName, replPolicy, loadWithClause) .run("use " + replicatedDbName) @@ -535,13 +539,14 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .dump(replPolicy, dumpWithClause); + String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; // the _external_tables_file info should be created as external tables are to be replicated. Assert.assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(tuple.dumpLocation, FILE_NAME))); + .exists(new Path(hiveDumpDir, FILE_NAME))); // Verify that the external table info contains only table "a2". 
ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2"), - new Path(tuple.dumpLocation, FILE_NAME)); + new Path(hiveDumpDir, FILE_NAME)); replica.load(replicatedDbName, replPolicy, loadWithClause) .run("use " + replicatedDbName) @@ -684,13 +689,14 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) .dump(replPolicy, oldReplPolicy, dumpWithClause); + String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; // the _external_tables_file info should be created as external tables are to be replicated. Assert.assertTrue(primary.miniDFSCluster.getFileSystem() - .exists(new Path(tuple.dumpLocation, FILE_NAME))); + .exists(new Path(hiveDumpDir, FILE_NAME))); // Verify that the external table info contains table "a2" and "c2". ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2", "c2"), - new Path(tuple.dumpLocation, FILE_NAME)); + new Path(hiveDumpDir, FILE_NAME)); // Verify if the expected tables are bootstrapped. verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, bootstrappedTables); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index ebc590d..56f0c93 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -17,9 +17,7 @@ */ package org.apache.hadoop.hive.ql.exec; -import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; import org.apache.hadoop.hive.ql.plan.ExportWork; import org.apache.hadoop.hive.ql.plan.api.StageType; @@ -27,7 +25,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; - public class ExportTask extends Task<ExportWork> implements Serializable { private static final long serialVersionUID = 1L; @@ -53,9 +50,7 @@ public class ExportTask extends Task<ExportWork> implements Serializable { work.acidPostProcess(db); TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf, work.getMmContext()); - if (!tableExport.write()) { - throw new SemanticException(ErrorMsg.INCOMPATIBLE_SCHEMA.getMsg()); - } + tableExport.write(true); } catch (Exception e) { LOG.error("failed", e); setException(e); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 470357a..3c7274c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -255,7 +255,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { return 2; } // Copy the files from different source file systems to one destination directory - new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs).copyAndVerify(toPath, srcFiles); + new CopyUtils(rwork.distCpDoAsUser(), conf, dstFs).copyAndVerify(toPath, srcFiles, fromPath); // If a file is copied from CM path, then need to rename them using original source file name // This is needed to avoid having duplicate files in target if same event is applied twice @@ -324,10 +324,18 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, 
Path srcPath, Path dstPath, HiveConf conf, boolean isAutoPurge, boolean needRecycle, boolean copyToMigratedTxnTable) { + return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, isAutoPurge, needRecycle, + copyToMigratedTxnTable, true); + } + + public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, + HiveConf conf, boolean isAutoPurge, boolean needRecycle, + boolean copyToMigratedTxnTable, boolean readSourceAsFileList) { Task<?> copyTask = null; LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath); if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false); + rcwork.setReadSrcAsFilesList(readSourceAsFileList); if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION) || copyToMigratedTxnTable)) { rcwork.setDeleteDestIfExist(true); rcwork.setAutoPurge(isAutoPurge); @@ -339,15 +347,8 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { // replace events getting replayed in the first incremental load. rcwork.setCheckDuplicateCopy(replicationSpec.needDupCopyCheck() && !replicationSpec.isReplace()); LOG.debug("ReplCopyTask:\trcwork"); - if (replicationSpec.isLazy()) { - LOG.debug("ReplCopyTask:\tlazy"); - rcwork.setReadSrcAsFilesList(true); - - // It is assumed isLazy flag is set only for REPL LOAD flow. - // IMPORT always do deep copy. So, distCpDoAsUser will be null by default in ReplCopyWork. - String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); - rcwork.setDistCpDoAsUser(distCpDoAsUser); - } + String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); + rcwork.setDistCpDoAsUser(distCpDoAsUser); copyTask = TaskFactory.get(rcwork, conf); } else { LOG.debug("ReplCopyTask:\tcwork"); @@ -360,4 +361,9 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { HiveConf conf) { return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, false); } + + public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, + HiveConf conf, boolean readSourceAsFileList) { + return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false, false, readSourceAsFileList); + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index fd06968..92e45b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter; import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -51,6 +52,7 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.events.EventUtils; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.EximUtil.ReplPathMapping; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import 
org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.DumpType; @@ -125,17 +127,18 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase() .getBytes(StandardCharsets.UTF_8.name()))); Path currentDumpPath = new Path(dumpRoot, getNextDumpDir()); - DumpMetaData dmd = new DumpMetaData(currentDumpPath, conf); + Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); + DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf); // Initialize ReplChangeManager instance since we will require it to encode file URI. ReplChangeManager.getInstance(conf); Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); Long lastReplId; if (!dumpRoot.getFileSystem(conf).exists(dumpRoot) || dumpRoot.getFileSystem(conf).listStatus(dumpRoot).length == 0) { - lastReplId = bootStrapDump(currentDumpPath, dmd, cmRoot, hiveDb); + lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb); } else { work.setEventFrom(getEventFromPreviousDumpMetadata(dumpRoot)); - lastReplId = incrementalDump(currentDumpPath, dmd, cmRoot, hiveDb); + lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb); } prepareReturnValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId))); } catch (Exception e) { @@ -155,7 +158,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { latestUpdatedStatus = status; } } - DumpMetaData dmd = new DumpMetaData(latestUpdatedStatus.getPath(), conf); + DumpMetaData dmd = new DumpMetaData(new Path(latestUpdatedStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR), conf); if (dmd.isIncrementalDump()) { return dmd.getEventTo(); } @@ -331,7 +334,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { NotificationEvent ev = evIter.next(); lastReplId = ev.getEventId(); Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId)); - dumpEvent(ev, evRoot, cmRoot, hiveDb); + dumpEvent(ev, evRoot, dumpRoot, cmRoot, hiveDb); } replLogger.endLog(lastReplId.toString()); @@ -370,8 +373,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { // Dump the table to be bootstrapped if required. 
if (shouldBootstrapDumpTable(table)) { HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table); - dumpTable(dbName, tableName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb, - tableTuple); + dumpTable(dbName, tableName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, hiveDb, tableTuple); } if (tableList != null && isTableSatifiesConfig(table)) { tableList.add(tableName); @@ -411,9 +413,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { return new Path(dumpRoot, dbName); } - private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive db) throws Exception { + private void dumpEvent(NotificationEvent ev, Path evRoot, Path dumpRoot, Path cmRoot, Hive db) throws Exception { EventHandler.Context context = new EventHandler.Context( evRoot, + dumpRoot, cmRoot, db, conf, @@ -531,7 +534,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { LOG.debug("Adding table {} to external tables list", tblName); writer.dataLocationDump(tableTuple.object); } - dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb, + dumpTable(dbName, tblName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, hiveDb, tableTuple); } catch (InvalidTableException te) { // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it. @@ -589,7 +592,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { return dbRoot; } - void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId, + void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path dumproot, long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception { LOG.info("Bootstrap Dump for table " + tblName); TableSpec tableSpec = new TableSpec(tuple.object); @@ -608,10 +611,20 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { } MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle); tuple.replicationSpec.setRepl(true); - new TableExport( - exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write(); - + List<ReplPathMapping> replPathMappings = new TableExport( + exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write(false); replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); + if (tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE) + || Utils.shouldDumpMetaDataOnly(tuple.object, conf)) { + return; + } + for (ReplPathMapping replPathMapping: replPathMappings) { + Task<?> copyTask = ReplCopyTask.getLoadCopyTask( + tuple.replicationSpec, replPathMapping.getSrcPath(), replPathMapping.getTargetPath(), conf, false); + this.addDependentTask(copyTask); + LOG.info("Scheduled a repl copy task from [{}] to [{}]", + replPathMapping.getSrcPath(), replPathMapping.getTargetPath()); + } } private String getValidWriteIdList(String dbName, String tblName, String validTxnString) throws LockException { @@ -680,7 +693,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { private ReplicationSpec getNewReplicationSpec(String evState, String objState, boolean isMetadataOnly) { - return new ReplicationSpec(true, isMetadataOnly, evState, objState, false, true, true); + return new ReplicationSpec(true, isMetadataOnly, evState, objState, false, true); } private String getNextDumpDir() { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java index 272373e..f25c714 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.plan.Explain; import org.apache.hadoop.hive.ql.session.LineageState; import org.apache.hadoop.hive.ql.exec.Task; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.Iterator; @@ -76,7 +77,9 @@ public class ReplLoadWork implements Serializable { if ((currentReplScope != null) && StringUtils.isNotBlank(dbNameToLoadIn)) { currentReplScope.setDbName(dbNameToLoadIn); } - this.bootstrapDumpToCleanTables = hiveConf.get(ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG); + String bootstrapDumpToCleanTablesLoc = hiveConf.get(ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG); + this.bootstrapDumpToCleanTables = bootstrapDumpToCleanTablesLoc == null ? null : bootstrapDumpToCleanTablesLoc + + File.separator + ReplUtils.REPL_HIVE_BASE_DIR; this.needCleanTablesFromBootstrap = StringUtils.isNotBlank(this.bootstrapDumpToCleanTables); rootTask = null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 1159774..05a590a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.HiveTableName; import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -242,9 +243,9 @@ public class LoadPartitions { Task<?> copyTask = ReplCopyTask.getLoadCopyTask( event.replicationSpec(), - sourceWarehousePartitionLocation, + new Path(sourceWarehousePartitionLocation, EximUtil.DATA_PATH_NAME), stagingDir, - context.hiveConf + context.hiveConf, false ); Task<?> movePartitionTask = null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 65588fd..82a3031 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -296,7 +296,7 @@ public class LoadTable { + table.getCompleteName() + " with source location: " + dataPath.toString() + " and target location " + tgtPath.toString()); - Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf); + Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf, false); MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false); if (AcidUtils.isTransactionalTable(table)) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index 504c9d4..64ecf42 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork; @@ -79,6 +80,9 @@ public class ReplUtils { // Root directory for dumping bootstrapped tables along with incremental events dump. public static final String INC_BOOTSTRAP_ROOT_DIR_NAME = "_bootstrap"; + // Root base directory name for hive. + public static final String REPL_HIVE_BASE_DIR = "hive"; + // Name of the directory which stores the list of tables included in the policy in case of table level replication. // One file per database, named after the db name. The directory is not created for db level replication. public static final String REPL_TABLE_LIST_DIR_NAME = "_tables"; @@ -236,7 +240,8 @@ public class ReplUtils { return p -> { try { return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME) - && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME); + && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME) + && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME); } catch (IOException e) { throw new RuntimeException(e); } @@ -246,7 +251,8 @@ public class ReplUtils { public static PathFilter getBootstrapDirectoryFilter(final FileSystem fs) { return p -> { try { - return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME); + return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME) + && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index f588b0d..e65cbf5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -157,6 +157,40 @@ public class EximUtil { } } + /** + * Wrapper class for mapping replication source and target path for copying data. 
+ */ + public static class ReplPathMapping { + private Path srcPath; + private Path tgtPath; + + public ReplPathMapping(Path srcPath, Path tgtPath) { + if (srcPath == null) { + throw new IllegalArgumentException("Source Path can not be null."); + } + this.srcPath = srcPath; + if (tgtPath == null) { + throw new IllegalArgumentException("Target Path can not be null."); + } + this.tgtPath = tgtPath; + } + + public Path getSrcPath() { + return srcPath; + } + + public void setSrcPath(Path srcPath) { + this.srcPath = srcPath; + } + + public Path getTargetPath() { + return tgtPath; + } + + public void setTargetPath(Path targetPath) { + this.tgtPath = targetPath; + } + } private EximUtil() { } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index dd97f3d..7354a3e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -505,8 +505,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { Task<?> copyTask = null; if (replicationSpec.isInReplicationScope()) { + boolean isImport = ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType()); copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(), - isAutoPurge, needRecycle, copyToMigratedTxnTable); + isAutoPurge, needRecycle, copyToMigratedTxnTable, !isImport); } else { copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false)); } @@ -640,8 +641,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { Task<?> copyTask = null; if (replicationSpec.isInReplicationScope()) { + boolean isImport = ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType()); copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath, - x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable); + x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, !isImport); } else { copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false)); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 703eb11..c2e9f88 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables; import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -438,17 +439,19 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { String currentReplStatusOfTarget = getReplStatus(replScope.getDbName()); if (currentReplStatusOfTarget == null) { //bootstrap - return statuses[0].getPath(); + return new Path(statuses[0].getPath(), ReplUtils.REPL_HIVE_BASE_DIR); } else { - DumpMetaData latestDump = new DumpMetaData(statuses[statuses.length - 1].getPath(), conf); + DumpMetaData latestDump = new DumpMetaData( + new Path(statuses[statuses.length - 1].getPath(), ReplUtils.REPL_HIVE_BASE_DIR), conf); if 
(Long.parseLong(currentReplStatusOfTarget.trim()) >= latestDump.getEventTo()) { isTargetAlreadyLoaded = true; } else { for (FileStatus status : statuses) { - DumpMetaData dmd = new DumpMetaData(status.getPath(), conf); + Path hiveLoadPath = new Path(status.getPath(), ReplUtils.REPL_HIVE_BASE_DIR); + DumpMetaData dmd = new DumpMetaData(hiveLoadPath, conf); if (dmd.isIncrementalDump() && Long.parseLong(currentReplStatusOfTarget.trim()) < dmd.getEventTo()) { - return status.getPath(); + return hiveLoadPath; } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 99b09e5..13e4a8c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -42,7 +42,6 @@ public class ReplicationSpec { private String eventId = null; private String currStateId = null; private boolean isNoop = false; - private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in. private boolean isReplace = true; // default is that the import mode is insert overwrite private String validWriteIdList = null; // WriteIds snapshot for replicating ACID/MM tables. //TxnIds snapshot @@ -60,7 +59,6 @@ public class ReplicationSpec { EVENT_ID("repl.event.id"), CURR_STATE_ID(ReplConst.REPL_TARGET_TABLE_PROPERTY), NOOP("repl.noop"), - LAZY("repl.lazy"), IS_REPLACE("repl.is.replace"), VALID_WRITEID_LIST("repl.valid.writeid.list"), VALID_TXN_LIST("repl.valid.txnid.list") @@ -117,18 +115,17 @@ public class ReplicationSpec { } public ReplicationSpec(String fromId, String toId) { - this(true, false, fromId, toId, false, true, false); + this(true, false, fromId, toId, false, false); } public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly, String eventReplicationState, String currentReplicationState, - boolean isNoop, boolean isLazy, boolean isReplace) { + boolean isNoop, boolean isReplace) { this.isInReplicationScope = isInReplicationScope; this.isMetadataOnly = isMetadataOnly; this.eventId = eventReplicationState; this.currStateId = currentReplicationState; this.isNoop = isNoop; - this.isLazy = isLazy; this.isReplace = isReplace; this.specType = Type.DEFAULT; } @@ -149,7 +146,6 @@ public class ReplicationSpec { this.eventId = keyFetcher.apply(ReplicationSpec.KEY.EVENT_ID.toString()); this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID.toString()); this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString())); - this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString())); this.isReplace = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_REPLACE.toString())); this.validWriteIdList = keyFetcher.apply(ReplicationSpec.KEY.VALID_WRITEID_LIST.toString()); this.validTxnList = keyFetcher.apply(KEY.VALID_TXN_LIST.toString()); @@ -325,20 +321,6 @@ public class ReplicationSpec { } /** - * @return whether or not the current replication action is set to be lazy - */ - public boolean isLazy() { - return isLazy; - } - - /** - * @param isLazy whether or not the current replication action should be lazy - */ - public void setLazy(boolean isLazy){ - this.isLazy = isLazy; - } - - /** * @return the WriteIds snapshot for the current ACID/MM table being replicated */ public String getValidWriteIdList() { @@ -385,8 +367,6 @@ public class ReplicationSpec { return getCurrentReplicationState(); case NOOP: return 
String.valueOf(isNoop()); - case LAZY: - return String.valueOf(isLazy()); case IS_REPLACE: return String.valueOf(isReplace()); case VALID_WRITEID_LIST: diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java index 73c863e..fd70260 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java @@ -71,29 +71,35 @@ public class CopyUtils { // Used by replication, copy files from source to destination. It is possible source file is // changed/removed during copy, so double check the checksum after copy, // if not match, copy again from cm - public void copyAndVerify(Path destRoot, - List<ReplChangeManager.FileInfo> srcFiles) throws IOException, LoginException, HiveFatalException { - Map<FileSystem, Map< Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot); + public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFiles, Path origSrcPtah) + throws IOException, LoginException, HiveFatalException { UserGroupInformation proxyUser = getProxyUser(); + FileSystem sourceFs = origSrcPtah.getFileSystem(hiveConf); + boolean useRegularCopy = regularCopy(sourceFs, srcFiles); try { - for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) { - Map<Path, List<ReplChangeManager.FileInfo>> destMap = entry.getValue(); - for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) { - Path destination = destMapEntry.getKey(); - List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue(); - // Get the file system again from cache. There is a chance that the file system stored in the map is closed. - // For instance, doCopyRetry closes the file system in case of i/o exceptions. - FileSystem sourceFs = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf); - boolean useRegularCopy = regularCopy(sourceFs, fileInfoList); - - if (!destinationFs.exists(destination) - && !FileUtils.mkdir(destinationFs, destination, hiveConf)) { - LOG.error("Failed to create destination directory: " + destination); - throw new IOException("Destination directory creation failed"); + if (!useRegularCopy) { + srcFiles.clear(); + srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPtah, null)); + doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy); + } else { + Map<FileSystem, Map< Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot); + for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) { + Map<Path, List<ReplChangeManager.FileInfo>> destMap = entry.getValue(); + for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) { + Path destination = destMapEntry.getKey(); + List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue(); + // Get the file system again from cache. There is a chance that the file system stored in the map is closed. + // For instance, doCopyRetry closes the file system in case of i/o exceptions. 
+ sourceFs = fileInfoList.get(0).getSourcePath().getFileSystem(hiveConf); + if (!destinationFs.exists(destination) + && !FileUtils.mkdir(destinationFs, destination, hiveConf)) { + LOG.error("Failed to create destination directory: " + destination); + throw new IOException("Destination directory creation failed"); + } + + // Copy files with retry logic on failure or source file is dropped or changed. + doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, true); } - - // Copy files with retry logic on failure or source file is dropped or changed. - doCopyRetry(sourceFs, fileInfoList, destination, proxyUser, useRegularCopy); } } } finally { @@ -181,12 +187,12 @@ public class CopyUtils { continue; } Path srcPath = srcFile.getEffectivePath(); - Path destPath = new Path(destination, srcPath.getName()); - if (destinationFs.exists(destPath)) { + //Path destPath = new Path(destination, srcPath.getName()); + if (destinationFs.exists(destination)) { // If destination file is present and checksum of source mismatch, then retry copy. if (isSourceFileMismatch(sourceFs, srcFile)) { // Delete the incorrectly copied file and retry with CM path - destinationFs.delete(destPath, true); + destinationFs.delete(destination, true); srcFile.setIsUseSourcePath(false); } else { // If the retry logic is reached after copy error, then include the copied file as well. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java index 4b2812e..7d64196 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java @@ -43,7 +43,6 @@ class BootStrapReplicationSpecFunction implements HiveWrapper.Tuple.Function<Rep "replv2", "will-be-set", false, - true, false ); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java index 9e24799..454998f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java @@ -23,6 +23,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PartitionIterable; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.EximUtil.ReplPathMapping; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; @@ -73,8 +75,10 @@ class PartitionExport { this.callersSession = SessionState.get(); } - void write(final ReplicationSpec forReplicationSpec) throws InterruptedException, HiveException { + List<ReplPathMapping> write(final ReplicationSpec forReplicationSpec, boolean isExportTask) + throws InterruptedException, HiveException { List<Future<?>> futures = new LinkedList<>(); + List<ReplPathMapping> replCopyPathMappings = new LinkedList<>(); //Collections.synchronizedList(new LinkedList<>()); ExecutorService producer = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("partition-submitter-thread-%d").build()); futures.add(producer.submit(() -> { @@ -116,8 +120,9 @@ class PartitionExport { 
forReplicationSpec, hiveConf); Path rootDataDumpDir = paths.partitionExportDir(partitionName); new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx) - .export(forReplicationSpec); + .export(isExportTask); LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName); + return new ReplPathMapping(partition.getDataLocation(), new Path(rootDataDumpDir, EximUtil.DATA_PATH_NAME)); } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } @@ -126,7 +131,11 @@ class PartitionExport { consumer.shutdown(); for (Future<?> future : futures) { try { - future.get(); + Object retVal = future.get(); + if (retVal != null) { + ReplPathMapping replPathMapping = (ReplPathMapping)retVal; + replCopyPathMappings.add(replPathMapping); + } } catch (Exception e) { LOG.error("failed", e.getCause()); throw new HiveException(e.getCause().getMessage(), e.getCause()); @@ -134,5 +143,6 @@ class PartitionExport { } // may be drive this via configuration as well. consumer.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + return replCopyPathMappings; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 97a1dd3..a384c7e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PartitionIterable; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.EximUtil.ReplPathMapping; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; @@ -46,6 +47,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -86,20 +88,21 @@ public class TableExport { this.mmCtx = mmCtx; } - public boolean write() throws SemanticException { + public List<ReplPathMapping> write(boolean isExportTask) throws SemanticException { + List<ReplPathMapping> replPathMappings = Collections.emptyList(); if (tableSpec == null) { writeMetaData(null); - return true; } else if (shouldExport()) { PartitionIterable withPartitions = getPartitions(); writeMetaData(withPartitions); if (!replicationSpec.isMetadataOnly() && !(replicationSpec.isRepl() && tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE))) { - writeData(withPartitions); + replPathMappings = writeData(withPartitions, isExportTask); } - return true; + } else if (isExportTask) { + throw new SemanticException(ErrorMsg.INCOMPATIBLE_SCHEMA.getMsg()); } - return false; + return replPathMappings; } private PartitionIterable getPartitions() throws SemanticException { @@ -149,20 +152,24 @@ public class TableExport { } } - private void writeData(PartitionIterable partitions) throws SemanticException { + private List<ReplPathMapping> writeData(PartitionIterable partitions, boolean isExportTask) throws SemanticException { + List<ReplPathMapping> replCopyPathMappings = new LinkedList<>(); try { if (tableSpec.tableHandle.isPartitioned()) { if (partitions == null) { throw new IllegalStateException("partitions cannot 
be null for partitionTable :" + tableSpec.getTableName().getTable()); } - new PartitionExport(paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec); + replCopyPathMappings = new PartitionExport( + paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec, isExportTask); } else { List<Path> dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(), replicationSpec, conf); + replCopyPathMappings.add(new ReplPathMapping(tableSpec.tableHandle.getDataLocation(), paths.dataExportDir())); new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx) - .export(replicationSpec); + .export(isExportTask); } + return replCopyPathMappings; } catch (Exception e) { throw new SemanticException(e.getMessage(), e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java index b996703..8046077 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java @@ -17,15 +17,32 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.events; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.messaging.EventMessage; import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; import org.apache.hadoop.hive.metastore.messaging.MessageEncoder; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder; +import org.apache.hadoop.hive.ql.metadata.HiveFatalException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.security.auth.login.LoginException; +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + abstract class AbstractEventHandler<T extends EventMessage> implements EventHandler { static final Logger LOG = LoggerFactory.getLogger(AbstractEventHandler.class); static final MessageEncoder jsonMessageEncoder = JSONMessageEncoder.getInstance(); @@ -71,4 +88,43 @@ abstract class AbstractEventHandler<T extends EventMessage> implements EventHand public long toEventId() { return event.getEventId(); } + + protected void writeFileEntry(String dbName, Table table, String file, BufferedWriter fileListWriter, + Context withinContext) + throws IOException, LoginException, MetaException, HiveFatalException { + HiveConf hiveConf = withinContext.hiveConf; + String distCpDoAsUser = hiveConf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); + if (!Utils.shouldDumpMetaDataOnly(table, withinContext.hiveConf)) { + Path dataPath = new Path(withinContext.dumpRoot.toString(), EximUtil.DATA_PATH_NAME); + List<ReplChangeManager.FileInfo> filePaths = new ArrayList<>(); + String[] decodedURISplits = ReplChangeManager.decodeFileUri(file); + String srcDataFile = decodedURISplits[0]; + Path srcDataPath = new Path(srcDataFile); + 
if (dataPath.toUri().getScheme() == null) { + dataPath = new Path(srcDataPath.toUri().getScheme(), srcDataPath.toUri().getAuthority(), dataPath.toString()); + } + String eventTblPath = event.getEventId() + File.separator + dbName + File.separator + table.getTableName(); + String srcDataFileRelativePath = null; + if (srcDataFile.contains(table.getPath().toString())) { + srcDataFileRelativePath = srcDataFile.substring(table.getPath().toString().length() + 1); + } else if (decodedURISplits[3] == null) { + srcDataFileRelativePath = srcDataPath.getName(); + } else { + srcDataFileRelativePath = srcDataFileRelativePath + File.separator + srcDataPath.getName(); + } + Path targetPath = new Path(dataPath, eventTblPath + File.separator + srcDataFileRelativePath); + String encodedTargetPath = ReplChangeManager.encodeFileUri( + targetPath.toString(), decodedURISplits[1], decodedURISplits[3]); + ReplChangeManager.FileInfo f = ReplChangeManager.getFileInfo(new Path(decodedURISplits[0]), + decodedURISplits[1], decodedURISplits[2], decodedURISplits[3], hiveConf); + filePaths.add(f); + FileSystem dstFs = targetPath.getFileSystem(hiveConf); + Path finalTargetPath = targetPath.getParent(); + if (decodedURISplits[3] != null) { + finalTargetPath = finalTargetPath.getParent(); + } + new CopyUtils(distCpDoAsUser, hiveConf, dstFs).copyAndVerify(finalTargetPath, filePaths, srcDataPath); + fileListWriter.write(encodedTargetPath + "\n"); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java index 42e74b3..a06b90d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java @@ -110,8 +110,7 @@ class AddPartitionHandler extends AbstractEventHandler { // encoded filename/checksum of files, write into _files try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) { for (String file : files) { - fileListWriter.write(file); - fileListWriter.newLine(); + writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java index 7d7dc26..dc87506 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java @@ -25,18 +25,23 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStore; import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.metadata.HiveFatalException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; import 
org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.fs.FileSystem; + +import javax.security.auth.login.LoginException; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; @@ -60,17 +65,19 @@ class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> { return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath))); } - private void writeDumpFiles(Context withinContext, Iterable<String> files, Path dataPath) throws IOException { + private void writeDumpFiles(Table qlMdTable, Context withinContext, Iterable<String> files, Path dataPath) + throws IOException, LoginException, MetaException, HiveFatalException { // encoded filename/checksum of files, write into _files try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { for (String file : files) { - fileListWriter.write(file + "\n"); + writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext); } } } private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable, - List<Partition> qlPtns, List<List<String>> fileListArray) throws IOException, SemanticException { + List<Partition> qlPtns, List<List<String>> fileListArray) + throws IOException, SemanticException, LoginException, MetaException, HiveFatalException { if (fileListArray == null || fileListArray.isEmpty()) { return; } @@ -86,17 +93,18 @@ class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> { if ((null == qlPtns) || qlPtns.isEmpty()) { Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME); - writeDumpFiles(withinContext, fileListArray.get(0), dataPath); + writeDumpFiles(qlMdTable, withinContext, fileListArray.get(0), dataPath); } else { for (int idx = 0; idx < qlPtns.size(); idx++) { Path dataPath = new Path(withinContext.eventRoot, qlPtns.get(idx).getName()); - writeDumpFiles(withinContext, fileListArray.get(idx), dataPath); + writeDumpFiles(qlMdTable, withinContext, fileListArray.get(idx), dataPath); } } } private void createDumpFileForTable(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable, - List<Partition> qlPtns, List<List<String>> fileListArray) throws IOException, SemanticException { + List<Partition> qlPtns, List<List<String>> fileListArray) + throws IOException, SemanticException, LoginException, MetaException, HiveFatalException { Path newPath = HiveUtils.getDumpPath(withinContext.eventRoot, qlMdTable.getDbName(), qlMdTable.getTableName()); Context context = new Context(withinContext); context.setEventRoot(newPath); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java index 355374a..c853223 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java @@ -84,7 +84,7 @@ class CreateTableHandler extends AbstractEventHandler<CreateTableMessage> { // encoded filename/checksum of files, write into _files try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { for (String file : files) { - fileListWriter.write(file + "\n"); + writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java index 7d00f89..ae70298 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java @@ -36,6 +36,7 @@ public interface EventHandler { DumpType dumpType(); class Context { + Path dumpRoot; Path eventRoot; final Path cmRoot; final Hive db; @@ -45,8 +46,10 @@ public interface EventHandler { final ReplScope oldReplScope; private Set<String> tablesForBootstrap; - public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf, ReplicationSpec replicationSpec, - ReplScope replScope, ReplScope oldReplScope, Set<String> tablesForBootstrap) { + public Context(Path eventRoot, Path dumpRoot, Path cmRoot, Hive db, HiveConf hiveConf, + ReplicationSpec replicationSpec, ReplScope replScope, ReplScope oldReplScope, + Set<String> tablesForBootstrap) { + this.dumpRoot = dumpRoot; this.eventRoot = eventRoot; this.cmRoot = cmRoot; this.db = db; @@ -58,6 +61,7 @@ public interface EventHandler { } public Context(Context other) { + this.dumpRoot = other.dumpRoot; this.eventRoot = other.eventRoot; this.cmRoot = other.cmRoot; this.db = other.db; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java index 5a18d57..4e02620 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java @@ -97,7 +97,7 @@ class InsertHandler extends AbstractEventHandler<InsertMessage> { // encoded filename/checksum of files, write into _files try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) { for (String file : files) { - fileListWriter.write(file + "\n"); + writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index fc5419c..5754bf2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -17,10 +17,9 @@ */ package org.apache.hadoop.hive.ql.parse.repl.dump.io; -import java.io.BufferedWriter; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.OutputStreamWriter; +import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -29,26 +28,20 @@ import javax.security.auth.login.LoginException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.ReplChangeManager; -import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; -import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; -import org.apache.hadoop.hive.ql.parse.ReplicationSpec; -import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; import 
org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; -import org.apache.hadoop.hive.shims.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.hadoop.hive.ql.ErrorMsg.FILE_NOT_FOUND; + //TODO: this object is created once to call one method and then immediately destroyed. //So it's basically just a roundabout way to pass arguments to a static method. Simplify? public class FileOperations { @@ -75,11 +68,11 @@ public class FileOperations { exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); } - public void export(ReplicationSpec forReplicationSpec) throws Exception { - if (forReplicationSpec.isLazy()) { - exportFilesAsList(); - } else { + public void export(boolean isExportTask) throws Exception { + if (isExportTask) { copyFiles(); + } else { + validateSrcPathListExists(); } } @@ -112,10 +105,10 @@ public class FileOperations { fromPath = dataFileSystem.makeQualified(fromPath); List<Path> validPaths = new ArrayList<>(), dirsWithOriginals = new ArrayList<>(); HiveInputFormat.processPathsForMmRead(dataPathList, - hiveConf, ids, validPaths, dirsWithOriginals); + hiveConf, ids, validPaths, dirsWithOriginals); String fromPathStr = fromPath.toString(); if (!fromPathStr.endsWith(Path.SEPARATOR)) { - fromPathStr += Path.SEPARATOR; + fromPathStr += Path.SEPARATOR; } for (Path validPath : validPaths) { // Export valid directories with a modified name so they don't look like bases/deltas. @@ -134,75 +127,19 @@ public class FileOperations { srcPaths.add(fileStatus.getPath()); } Utilities.FILE_OP_LOGGER.debug("Exporting originals from {} to {}", - dirWithOriginals, exportRootDataDir); + dirWithOriginals, exportRootDataDir); new CopyUtils(distCpDoAsUser, hiveConf, exportRootDataDir.getFileSystem(hiveConf)). doCopy(exportRootDataDir, srcPaths); } } } - /** - * This needs the root data directory to which the data needs to be exported to. - * The data export here is a list of files either in table/partition that are written to the _files - * in the exportRootDataDir provided. - */ - private void exportFilesAsList() throws SemanticException, IOException, LoginException { - if (dataPathList.isEmpty()) { - return; - } - boolean done = false; - int repeat = 0; - while (!done) { - // This is only called for replication that handles MM tables; no need for mmCtx. 
- try (BufferedWriter writer = writer()) { - for (Path dataPath : dataPathList) { - writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); - } - done = true; - } catch (IOException e) { - if (e instanceof FileNotFoundException) { - logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed"); - throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage())); - } - repeat++; - logger.info("writeFilesList failed", e); - if (repeat >= FileUtils.MAX_IO_ERROR_RETRY) { - logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed"); - throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg()); - } - - int sleepTime = FileUtils.getSleepTime(repeat - 1); - logger.info(" sleep for {} milliseconds for retry num {} ", sleepTime , repeat); - try { - Thread.sleep(sleepTime); - } catch (InterruptedException timerEx) { - logger.info("thread sleep interrupted", timerEx.getMessage()); - } - - // in case of io error, reset the file system object - FileSystem.closeAllForUGI(Utils.getUGI()); - dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf); - exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); - Path exportPath = new Path(exportRootDataDir, EximUtil.FILES_NAME); - if (exportFileSystem.exists(exportPath)) { - exportFileSystem.delete(exportPath, true); - } - } - } - } - - private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs) - throws IOException { - for (FileStatus fileStatus : fileStatuses) { - if (fileStatus.isDirectory()) { - // Write files inside the sub-directory. - Path subDir = fileStatus.getPath(); - writeFilesList(listFilesInDir(subDir), writer, encodedSubDir(encodedSubDirs, subDir)); - } else { - writer.write(encodedUri(fileStatus, encodedSubDirs)); - writer.newLine(); - } + public Path getPathWithSchemeAndAuthority(Path targetFilePath, Path currentFilePath) { + if (targetFilePath.toUri().getScheme() == null) { + URI currentURI = currentFilePath.toUri(); + targetFilePath = new Path(currentURI.getScheme(), currentURI.getAuthority(), targetFilePath.toUri().getPath()); } + return targetFilePath; } private FileStatus[] listFilesInDir(Path path) throws IOException { @@ -212,30 +149,20 @@ public class FileOperations { }); } - private BufferedWriter writer() throws IOException { - Path exportToFile = new Path(exportRootDataDir, EximUtil.FILES_NAME); - if (exportFileSystem.exists(exportToFile)) { - throw new IllegalArgumentException( - exportToFile.toString() + " already exists and cant export data from path(dir) " - + dataPathList); + /** + * Since the bootstrap will do table directory level copy, need to check for existence of src path. 
+ */ + private void validateSrcPathListExists() throws IOException, LoginException { + if (dataPathList.isEmpty()) { + return; } - logger.debug("exporting data files in dir : " + dataPathList + " to " + exportToFile); - return new BufferedWriter( - new OutputStreamWriter(exportFileSystem.create(exportToFile)) - ); - } - - private String encodedSubDir(String encodedParentDirs, Path subDir) { - if (null == encodedParentDirs) { - return subDir.getName(); - } else { - return encodedParentDirs + Path.SEPARATOR + subDir.getName(); + try { + for (Path dataPath : dataPathList) { + listFilesInDir(dataPath); + } + } catch (FileNotFoundException e) { + logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed"); + throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage())); } } - - private String encodedUri(FileStatus fileStatus, String encodedSubDir) throws IOException { - Path currentDataFilePath = fileStatus.getPath(); - String checkSum = ReplChangeManager.checksumFor(currentDataFilePath, dataFileSystem); - return ReplChangeManager.encodeFileUri(currentDataFilePath.toString(), checkSum, encodedSubDir); - } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java index 9f90a36..2651ea4 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java @@ -127,8 +127,8 @@ public class TestReplDumpTask { private int tableDumpCount = 0; @Override - void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, - long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple) + void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path replDataDir, + long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception { tableDumpCount++; if (tableDumpCount > 1) {