Repository: hive
Updated Branches:
  refs/heads/master 0dbb896cf -> b3ef75eaa
http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java index f05c231..dac20d2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.io; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; @@ -29,12 +28,16 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TJSONProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Map; public class TableSerializer implements JsonWriter.Serializer { public static final String FIELD_NAME = "table"; + private static final Logger LOG = LoggerFactory.getLogger(TableSerializer.class); + private final org.apache.hadoop.hive.ql.metadata.Table tableHandle; private final Iterable<Partition> partitions; private final HiveConf hiveConf; @@ -53,8 +56,9 @@ public class TableSerializer implements JsonWriter.Serializer { return; } - Table tTable = tableHandle.getTTable(); - tTable = updatePropertiesInTable(tTable, additionalPropertiesProvider); + Table tTable = updatePropertiesInTable( + tableHandle.getTTable(), additionalPropertiesProvider + ); try { TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); writer.jsonGenerator @@ -83,14 +87,6 @@ public class 
TableSerializer implements JsonWriter.Serializer { ReplicationSpec.KEY.CURR_STATE_ID.toString(), additionalPropertiesProvider.getCurrentReplicationState()); } - if (isExternalTable(table)) { - // Replication destination will not be external - override if set - table.putToParameters("EXTERNAL", "FALSE"); - } - if (isExternalTableType(table)) { - // Replication dest will not be external - override if set - table.setTableType(TableType.MANAGED_TABLE.toString()); - } } else { // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE; // write(out, ",\""+ scopeKey.toString() +"\":\"" + replicationSpec.get(scopeKey) + "\""); @@ -101,17 +97,6 @@ public class TableSerializer implements JsonWriter.Serializer { return table; } - private boolean isExternalTableType(org.apache.hadoop.hive.metastore.api.Table table) { - return table.isSetTableType() - && table.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString()); - } - - private boolean isExternalTable(org.apache.hadoop.hive.metastore.api.Table table) { - Map<String, String> params = table.getParameters(); - return params.containsKey("EXTERNAL") - && params.get("EXTERNAL").equalsIgnoreCase("TRUE"); - } - private void writePartitions(JsonWriter writer, ReplicationSpec additionalPropertiesProvider) throws SemanticException, IOException { writer.jsonGenerator.writeStartArray(); http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java index 9907133..b04fdef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java @@ -36,7 +36,6 @@ import org.json.JSONArray; import org.json.JSONException; import 
org.json.JSONObject; -import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; @@ -99,14 +98,7 @@ public class MetadataJson { } private ReplicationSpec readReplicationSpec() { - com.google.common.base.Function<String, String> keyFetcher = - new com.google.common.base.Function<String, String>() { - @Override - public String apply(@Nullable String s) { - return jsonEntry(s); - } - }; - return new ReplicationSpec(keyFetcher); + return new ReplicationSpec(this::jsonEntry); } private void checkCompatibility() throws SemanticException, JSONException { http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java index d412fd7..fe89ab2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java @@ -17,17 +17,41 @@ */ package org.apache.hadoop.hive.ql.parse.repl.load.message; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.messaging.InsertMessage; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.Collections; import java.util.List; public class InsertHandler extends AbstractMessageHandler { + private static final Logger LOG = LoggerFactory.getLogger(InsertHandler.class); + @Override public List<Task<? 
extends Serializable>> handle(Context withinContext) throws SemanticException { + try { + FileSystem fs = + FileSystem.get(new Path(withinContext.location).toUri(), withinContext.hiveConf); + MetaData metaData = + EximUtil.readMetaData(fs, new Path(withinContext.location, EximUtil.METADATA_NAME)); + ReplicationSpec replicationSpec = metaData.getReplicationSpec(); + if (replicationSpec.isNoop()) { + return Collections.emptyList(); + } + } catch (Exception e) { + LOG.error("failed to load insert event", e); + throw new SemanticException(e); + } + InsertMessage insertMessage = deserializer.getInsertMessage(withinContext.dmd.getPayload()); String actualDbName = withinContext.isDbNameEmpty() ? insertMessage.getDB() : withinContext.dbName; http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java index cdf51dd..4ae4894 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java @@ -89,6 +89,10 @@ public interface MessageHandler { return StringUtils.isEmpty(dbName); } + /** + * not sure why we have this, this should always be read from the _metadata file via the + * {@link org.apache.hadoop.hive.ql.parse.repl.load.MetadataJson#readReplicationSpec} + */ ReplicationSpec eventOnlyReplicationSpec() throws SemanticException { String eventId = dmd.getEventTo().toString(); return new ReplicationSpec(eventId, eventId); http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java ---------------------------------------------------------------------- diff 
--git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java index f5f4459..56c2abe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java @@ -17,36 +17,56 @@ */ package org.apache.hadoop.hive.ql.parse.repl.load.message; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; -import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.ReplTxnWork; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; +import java.net.URI; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import static org.apache.hadoop.hive.ql.parse.repl.DumpType.EVENT_ALTER_PARTITION; -import static org.apache.hadoop.hive.ql.parse.repl.DumpType.EVENT_ALTER_TABLE; - public class TableHandler extends AbstractMessageHandler { + private static final long DEFAULT_WRITE_ID = 0L; + private static final Logger LOG = LoggerFactory.getLogger(TableHandler.class); + @Override public List<Task<? 
extends Serializable>> handle(Context context) throws SemanticException { try { List<Task<? extends Serializable>> importTasks = new ArrayList<>(); - long writeId = 0; + boolean isExternal = false, isLocationSet = false; + String parsedLocation = null; - if (context.dmd.getDumpType().equals(EVENT_ALTER_TABLE)) { - AlterTableMessage message = deserializer.getAlterTableMessage(context.dmd.getPayload()); - writeId = message.getWriteId(); - } else if (context.dmd.getDumpType().equals(EVENT_ALTER_PARTITION)) { - AlterPartitionMessage message = deserializer.getAlterPartitionMessage(context.dmd.getPayload()); - writeId = message.getWriteId(); + DumpType eventType = context.dmd.getDumpType(); + Tuple tuple = extract(context); + if (tuple.isExternalTable) { + URI fromURI = EximUtil.getValidatedURI(context.hiveConf, context.location); + Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath()); + isLocationSet = true; + isExternal = true; + FileSystem fs = FileSystem.get(fromURI, context.hiveConf); + try { + MetaData rv = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME)); + Table table = new Table(rv.getTable()); + parsedLocation = ReplExternalTables + .externalTableLocation(context.hiveConf, table.getSd().getLocation()); + } catch (IOException e) { + throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e); + } } context.nestedContext.setConf(context.hiveConf); @@ -54,13 +74,13 @@ public class TableHandler extends AbstractMessageHandler { new EximUtil.SemanticAnalyzerWrapperContext( context.hiveConf, context.db, readEntitySet, writeEntitySet, importTasks, context.log, context.nestedContext); - x.setEventType(context.dmd.getDumpType()); + x.setEventType(eventType); // REPL LOAD is not partition level. It is always DB or table level. So, passing null for partition specs. // Also, REPL LOAD doesn't support external table and hence no location set as well. 
- ImportSemanticAnalyzer.prepareImport(false, false, false, false, - (context.precursor != null), null, context.tableName, context.dbName, - null, context.location, x, updatedMetadata, context.getTxnMgr(), writeId); + ImportSemanticAnalyzer.prepareImport(false, isLocationSet, isExternal, false, + (context.precursor != null), parsedLocation, context.tableName, context.dbName, + null, context.location, x, updatedMetadata, context.getTxnMgr(), tuple.writeId); Task<? extends Serializable> openTxnTask = x.getOpenTxnTask(); if (openTxnTask != null && !importTasks.isEmpty()) { @@ -71,8 +91,57 @@ public class TableHandler extends AbstractMessageHandler { } return importTasks; + } catch (RuntimeException e){ + throw e; + } catch (Exception e) { + throw new SemanticException(e); + } + } + + private Tuple extract(Context context) throws SemanticException { + try { + String tableType = null; + long writeId = DEFAULT_WRITE_ID; + switch (context.dmd.getDumpType()) { + case EVENT_CREATE_TABLE: + case EVENT_ADD_PARTITION: + Path metadataPath = new Path(context.location, EximUtil.METADATA_NAME); + MetaData rv = EximUtil.readMetaData( + metadataPath.getFileSystem(context.hiveConf), + metadataPath + ); + tableType = rv.getTable().getTableType(); + break; + case EVENT_ALTER_TABLE: + AlterTableMessage alterTableMessage = + deserializer.getAlterTableMessage(context.dmd.getPayload()); + tableType = alterTableMessage.getTableObjAfter().getTableType(); + writeId = alterTableMessage.getWriteId(); + break; + case EVENT_ALTER_PARTITION: + AlterPartitionMessage msg = deserializer.getAlterPartitionMessage(context.dmd.getPayload()); + tableType = msg.getTableObj().getTableType(); + writeId = msg.getWriteId(); + break; + default: + break; + } + boolean isExternalTable = tableType != null + && TableType.EXTERNAL_TABLE.equals(Enum.valueOf(TableType.class, tableType)); + return new Tuple(isExternalTable, writeId); } catch (Exception e) { + LOG.error("failed to determine if the table associated with 
the event is external or not", e); throw new SemanticException(e); } } + + private static final class Tuple { + private final boolean isExternalTable; + private final long writeId; + + private Tuple(boolean isExternalTable, long writeId) { + this.isExternalTable = isExternalTable; + this.writeId = writeId; + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java index 9e5c071..39f342f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java @@ -21,6 +21,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.junit.Test; import org.junit.runner.RunWith; @@ -44,9 +46,11 @@ import static org.mockito.Mockito.mock; import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.powermock.api.mockito.PowerMockito.verifyStatic; import static org.powermock.api.mockito.PowerMockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; +import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer; @RunWith(PowerMockRunner.class) -@PrepareForTest({ Utils.class }) +@PrepareForTest({ Utils.class, ReplDumpTask.class}) @PowerMockIgnore({ "javax.management.*" }) public class TestReplDumpTask { @@ -111,12 +115,17 @@ public class TestReplDumpTask { when(hive.getAllFunctions()).thenReturn(Collections.emptyList()); 
when(queryState.getConf()).thenReturn(conf); when(conf.getLong("hive.repl.last.repl.id", -1L)).thenReturn(1L); + when(conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)).thenReturn(false); + + whenNew(Writer.class).withAnyArguments().thenReturn(mock(Writer.class)); + whenNew(HiveWrapper.class).withAnyArguments().thenReturn(mock(HiveWrapper.class)); ReplDumpTask task = new StubReplDumpTask() { private int tableDumpCount = 0; @Override - void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId, Hive hiveDb) + void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, + long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception { tableDumpCount++; if (tableDumpCount > 1) { http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/test/queries/clientpositive/repl_2_exim_basic.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/repl_2_exim_basic.q b/ql/src/test/queries/clientpositive/repl_2_exim_basic.q index 5b75ca8..b9119a9 100644 --- a/ql/src/test/queries/clientpositive/repl_2_exim_basic.q +++ b/ql/src/test/queries/clientpositive/repl_2_exim_basic.q @@ -68,7 +68,6 @@ show create table ext_t_imported; select * from ext_t_imported; -- should have repl.last.id --- also - importing an external table replication export would turn the new table into a managed table import table ext_t_r_imported from 'ql/test/data/exports/ext_t_r'; describe extended ext_t_imported; show table extended like ext_t_r_imported; http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out b/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out index 40b6ad7..950b5e4 100644 --- 
a/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out +++ b/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out @@ -411,7 +411,7 @@ PREHOOK: Input: default@ext_t_r_imported POSTHOOK: query: show create table ext_t_r_imported POSTHOOK: type: SHOW_CREATETABLE POSTHOOK: Input: default@ext_t_r_imported -CREATE TABLE `ext_t_r_imported`( +CREATE EXTERNAL TABLE `ext_t_r_imported`( `emp_id` int COMMENT 'employee id') PARTITIONED BY ( `emp_country` string, @@ -425,7 +425,6 @@ OUTPUTFORMAT LOCATION #### A masked pattern was here #### TBLPROPERTIES ( - 'EXTERNAL'='FALSE', 'bucketing_version'='2', 'discover.partitions'='true', 'repl.last.id'='0', http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/testutils/ptest2/conf/deployed/master-mr2.properties ---------------------------------------------------------------------- diff --git a/testutils/ptest2/conf/deployed/master-mr2.properties b/testutils/ptest2/conf/deployed/master-mr2.properties index 6f0056a..9166f4a 100644 --- a/testutils/ptest2/conf/deployed/master-mr2.properties +++ b/testutils/ptest2/conf/deployed/master-mr2.properties @@ -68,7 +68,7 @@ ut.service.batchSize=8 unitTests.module.itests.hive-unit=itests.hive-unit ut.itests.hive-unit.batchSize=9 -ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez TestMTQueries TestCompactor TestSchedulerQueue TestOperationLoggingAPIWithTez TestSSL TestJdbcDriver2 TestJdbcWithMiniHA TestJdbcWithMiniMr TestReplicationScenariosIncrementalLoadAcidTables TestReplIncrementalLoadAcidTablesWithJsonMessage TestReplicationScenarios TestReplWithJsonMessageFormat +ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez TestMTQueries TestCompactor TestSchedulerQueue TestOperationLoggingAPIWithTez TestSSL TestJdbcDriver2 TestJdbcWithMiniHA TestJdbcWithMiniMr TestReplicationScenariosIncrementalLoadAcidTables TestReplIncrementalLoadAcidTablesWithJsonMessage TestReplicationScenarios TestReplWithJsonMessageFormat 
TestReplWithJsonMessageFormat unitTests.module.itests.qtest=itests.qtest ut.itests.qtest.batchSize=9