HIVE-18781: Create/Replicate Open, Commit (without writes) and Abort Txn events (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
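For orientation before the patch body: this change adds OPEN_TXN, COMMIT_TXN and ABORT_TXN notification event types so that the transaction lifecycle can be captured in the notification log and replicated via REPL DUMP/LOAD. As a minimal sketch of what a consumer of these events looks like, the listener below mirrors the onOpenTxn/onCommitTxn/onAbortTxn overrides added in this patch; the TxnAuditListener class and its logging body are illustrative only, not part of the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;

/** Illustrative listener that observes the new transaction lifecycle events. */
public class TxnAuditListener extends MetaStoreEventListener {

  public TxnAuditListener(Configuration config) {
    super(config);
  }

  @Override
  public void onOpenTxn(OpenTxnEvent openTxnEvent) throws MetaException {
    // A single OPEN_TXN event may carry a whole batch of txn ids opened together.
    System.out.println("open txns: " + openTxnEvent.getTxnIds());
  }

  @Override
  public void onCommitTxn(CommitTxnEvent commitTxnEvent) throws MetaException {
    System.out.println("commit txn: " + commitTxnEvent.getTxnId());
  }

  @Override
  public void onAbortTxn(AbortTxnEvent abortTxnEvent) throws MetaException {
    System.out.println("abort txn: " + abortTxnEvent.getTxnId());
  }
}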
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/59483bca Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/59483bca Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/59483bca Branch: refs/heads/master Commit: 59483bca262880d3e7ef1b873d3c21176e9294cb Parents: 9e98d59 Author: Sankar Hariappan <sank...@apache.org> Authored: Mon Apr 2 10:02:09 2018 +0530 Committer: Sankar Hariappan <sank...@apache.org> Committed: Mon Apr 2 10:02:09 2018 +0530 ---------------------------------------------------------------------- .../listener/DbNotificationListener.java | 152 +- .../listener/TestDbNotificationListener.java | 15 + .../TestReplicationScenariosAcidTables.java | 184 + .../hadoop/hive/ql/parse/WarehouseInstance.java | 21 +- .../compactor/TestCleanerWithReplication.java | 1 + .../upgrade/derby/054-HIVE-18781.derby.sql | 9 + .../upgrade/derby/hive-schema-3.0.0.derby.sql | 2 + .../derby/hive-txn-schema-3.0.0.derby.sql | 7 + .../derby/upgrade-2.3.0-to-3.0.0.derby.sql | 1 + ql/if/queryplan.thrift | 3 +- ql/src/gen/thrift/gen-cpp/queryplan_types.cpp | 8 +- ql/src/gen/thrift/gen-cpp/queryplan_types.h | 3 +- .../hadoop/hive/ql/plan/api/StageType.java | 5 +- ql/src/gen/thrift/gen-php/Types.php | 2 + ql/src/gen/thrift/gen-py/queryplan/ttypes.py | 3 + ql/src/gen/thrift/gen-rb/queryplan_types.rb | 5 +- .../apache/hadoop/hive/ql/exec/ReplTxnTask.java | 92 + .../apache/hadoop/hive/ql/exec/ReplTxnWork.java | 91 + .../apache/hadoop/hive/ql/exec/TaskFactory.java | 1 + .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 10 + .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 40 + .../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 15 + .../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 27 +- .../hive/ql/parse/ImportSemanticAnalyzer.java | 8 - .../ql/parse/ReplicationSemanticAnalyzer.java | 64 +- .../hadoop/hive/ql/parse/repl/DumpType.java | 21 + .../parse/repl/dump/events/AbortTxnHandler.java | 42 + .../repl/dump/events/CommitTxnHandler.java | 43 + .../repl/dump/events/EventHandlerFactory.java | 3 + .../parse/repl/dump/events/OpenTxnHandler.java | 42 + .../repl/load/message/AbortTxnHandler.java | 53 + .../repl/load/message/CommitTxnHandler.java | 53 + .../parse/repl/load/message/OpenTxnHandler.java | 52 + .../hive/metastore/txn/TestTxnHandler.java | 21 + .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp | 2258 ++++++------ .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 3222 +++++++++--------- .../gen/thrift/gen-cpp/hive_metastore_types.h | 50 +- .../hive/metastore/api/AbortTxnRequest.java | 112 +- .../hive/metastore/api/AbortTxnsRequest.java | 32 +- .../metastore/api/AddDynamicPartitions.java | 32 +- .../api/AllocateTableWriteIdsRequest.java | 32 +- .../api/AllocateTableWriteIdsResponse.java | 36 +- .../metastore/api/ClearFileMetadataRequest.java | 32 +- .../hive/metastore/api/ClientCapabilities.java | 32 +- .../hive/metastore/api/CommitTxnRequest.java | 112 +- .../hive/metastore/api/CompactionRequest.java | 44 +- .../hive/metastore/api/CreationMetadata.java | 32 +- .../metastore/api/FindSchemasByColsResp.java | 36 +- .../hive/metastore/api/FireEventRequest.java | 32 +- .../metastore/api/GetAllFunctionsResponse.java | 36 +- .../api/GetFileMetadataByExprRequest.java | 32 +- .../api/GetFileMetadataByExprResult.java | 48 +- .../metastore/api/GetFileMetadataRequest.java | 32 +- .../metastore/api/GetFileMetadataResult.java | 44 +- .../hive/metastore/api/GetTablesRequest.java | 32 +- .../hive/metastore/api/GetTablesResult.java | 36 +- 
.../metastore/api/GetValidWriteIdsRequest.java | 32 +- .../metastore/api/GetValidWriteIdsResponse.java | 36 +- .../api/HeartbeatTxnRangeResponse.java | 64 +- .../metastore/api/InsertEventRequestData.java | 64 +- .../hadoop/hive/metastore/api/LockRequest.java | 36 +- .../hive/metastore/api/Materialization.java | 32 +- .../api/NotificationEventResponse.java | 36 +- .../hive/metastore/api/OpenTxnRequest.java | 269 +- .../hive/metastore/api/OpenTxnsResponse.java | 32 +- .../metastore/api/PutFileMetadataRequest.java | 64 +- .../hive/metastore/api/SchemaVersion.java | 36 +- .../hive/metastore/api/ShowCompactResponse.java | 36 +- .../hive/metastore/api/ShowLocksResponse.java | 36 +- .../hive/metastore/api/TableValidWriteIds.java | 32 +- .../hive/metastore/api/ThriftHiveMetastore.java | 2476 +++++++------- .../hive/metastore/api/WMFullResourcePlan.java | 144 +- .../api/WMGetAllResourcePlanResponse.java | 36 +- .../WMGetTriggersForResourePlanResponse.java | 36 +- .../api/WMValidateResourcePlanResponse.java | 64 +- .../gen-php/metastore/ThriftHiveMetastore.php | 1394 ++++---- .../src/gen/thrift/gen-php/metastore/Types.php | 946 ++--- .../hive_metastore/ThriftHiveMetastore.py | 940 ++--- .../gen/thrift/gen-py/hive_metastore/ttypes.py | 604 ++-- .../gen/thrift/gen-rb/hive_metastore_types.rb | 14 +- .../hadoop/hive/metastore/HiveMetaStore.java | 26 +- .../hive/metastore/HiveMetaStoreClient.java | 46 +- .../hadoop/hive/metastore/IMetaStoreClient.java | 37 + .../hive/metastore/MetaStoreEventListener.java | 31 +- .../metastore/MetaStoreListenerNotifier.java | 23 + .../hive/metastore/events/AbortTxnEvent.java | 83 + .../hive/metastore/events/CommitTxnEvent.java | 83 + .../hive/metastore/events/ListenerEvent.java | 17 +- .../hive/metastore/events/OpenTxnEvent.java | 83 + .../metastore/messaging/AbortTxnMessage.java | 36 + .../metastore/messaging/CommitTxnMessage.java | 36 + .../hive/metastore/messaging/EventMessage.java | 5 +- .../messaging/MessageDeserializer.java | 21 + .../metastore/messaging/MessageFactory.java | 29 +- .../metastore/messaging/OpenTxnMessage.java | 38 + .../event/filters/DatabaseAndTableFilter.java | 12 +- .../messaging/json/JSONAbortTxnMessage.java | 87 + .../messaging/json/JSONCommitTxnMessage.java | 87 + .../messaging/json/JSONMessageDeserializer.java | 30 + .../messaging/json/JSONMessageFactory.java | 18 + .../messaging/json/JSONOpenTxnMessage.java | 104 + .../hadoop/hive/metastore/txn/TxnDbUtil.java | 66 + .../hadoop/hive/metastore/txn/TxnHandler.java | 157 +- .../src/main/resources/package.jdo | 2 + .../main/sql/derby/hive-schema-3.0.0.derby.sql | 8 + .../sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql | 10 + .../main/sql/mssql/hive-schema-3.0.0.mssql.sql | 21 + .../sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql | 22 + .../main/sql/mysql/hive-schema-3.0.0.mysql.sql | 9 + .../sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql | 10 + .../sql/oracle/hive-schema-3.0.0.oracle.sql | 11 + .../oracle/upgrade-2.3.0-to-3.0.0.oracle.sql | 10 + .../sql/postgres/hive-schema-3.0.0.postgres.sql | 8 + .../upgrade-2.3.0-to-3.0.0.postgres.sql | 10 + .../src/main/thrift/hive_metastore.thrift | 4 + .../HiveMetaStoreClientPreCatalog.java | 42 +- 116 files changed, 9698 insertions(+), 6536 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java ---------------------------------------------------------------------- diff --git 
a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 8523428..9480145 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -18,6 +18,10 @@ package org.apache.hive.hcatalog.listener; import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; @@ -65,16 +69,21 @@ import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; +import org.apache.hadoop.hive.metastore.events.OpenTxnEvent; +import org.apache.hadoop.hive.metastore.events.CommitTxnEvent; +import org.apache.hadoop.hive.metastore.events.AbortTxnEvent; import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage; import org.apache.hadoop.hive.metastore.messaging.PartitionFiles; +import org.apache.hadoop.hive.metastore.tools.SQLGenerator; +import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.google.common.collect.Lists; - import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; +import static org.apache.hadoop.hive.metastore.DatabaseProduct.MYSQL; /** * An implementation of {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} that @@ -438,6 +447,49 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener process(event, insertEvent); } + @Override + public void onOpenTxn(OpenTxnEvent openTxnEvent) throws MetaException { + int lastTxnIdx = openTxnEvent.getTxnIds().size() - 1; + OpenTxnMessage msg = msgFactory.buildOpenTxnMessage(openTxnEvent.getTxnIds().get(0), + openTxnEvent.getTxnIds().get(lastTxnIdx)); + NotificationEvent event = + new NotificationEvent(0, now(), EventType.OPEN_TXN.toString(), msg.toString()); + + try { + addNotificationLog(event, openTxnEvent); + } catch (SQLException e) { + throw new MetaException("Unable to execute direct SQL " + StringUtils.stringifyException(e)); + } + } + + @Override + public void onCommitTxn(CommitTxnEvent commitTxnEvent) throws MetaException { + NotificationEvent event = + new NotificationEvent(0, now(), EventType.COMMIT_TXN.toString(), msgFactory.buildCommitTxnMessage( + commitTxnEvent.getTxnId()) + .toString()); + + try { + addNotificationLog(event, commitTxnEvent); + } catch (SQLException e) { + throw new MetaException("Unable to execute direct SQL " + StringUtils.stringifyException(e)); + } + } + + @Override + public void onAbortTxn(AbortTxnEvent abortTxnEvent) throws MetaException { + NotificationEvent event = + new NotificationEvent(0, now(), EventType.ABORT_TXN.toString(), msgFactory.buildAbortTxnMessage( + abortTxnEvent.getTxnId()) + .toString()); + + try { + addNotificationLog(event, abortTxnEvent); + } catch (SQLException e) { + throw new MetaException("Unable to 
execute direct SQL " + StringUtils.stringifyException(e)); + } + } + /** * @param partSetDoneEvent * @throws MetaException @@ -549,6 +601,102 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener return (int)millis; } + static String quoteString(String input) { + return "'" + input + "'"; + } + + private void addNotificationLog(NotificationEvent event, ListenerEvent listenerEvent) + throws MetaException, SQLException { + if ((listenerEvent.getConnection() == null) || (listenerEvent.getSqlGenerator() == null)) { + LOG.info("connection or sql generator is not set so executing sql via DN"); + process(event, listenerEvent); + return; + } + Statement stmt = null; + ResultSet rs = null; + try { + stmt = listenerEvent.getConnection().createStatement(); + SQLGenerator sqlGenerator = listenerEvent.getSqlGenerator(); + event.setMessageFormat(msgFactory.getMessageFormat()); + + if (sqlGenerator.getDbProduct() == MYSQL) { + stmt.execute("SET @@session.sql_mode=ANSI_QUOTES"); + } + + String s = sqlGenerator.addForUpdateClause("select \"NEXT_EVENT_ID\" " + + " from \"NOTIFICATION_SEQUENCE\""); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction database not properly " + + "configured, can't find next event id."); + } + long nextEventId = rs.getLong(1); + long updatedEventid = nextEventId + 1; + s = "update \"NOTIFICATION_SEQUENCE\" set \"NEXT_EVENT_ID\" = " + updatedEventid; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + + s = sqlGenerator.addForUpdateClause("select \"NEXT_VAL\" from " + + "\"SEQUENCE_TABLE\" where \"SEQUENCE_NAME\" = " + + " 'org.apache.hadoop.hive.metastore.model.MNotificationLog'"); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("failed to get next NEXT_VAL from SEQUENCE_TABLE"); + } + + long nextNLId = rs.getLong(1); + long updatedNLId = nextNLId + 1; + s = "update \"SEQUENCE_TABLE\" set \"NEXT_VAL\" = " + updatedNLId + " where \"SEQUENCE_NAME\" = " + + + " 'org.apache.hadoop.hive.metastore.model.MNotificationLog'"; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + + List<String> insert = new ArrayList<>(); + + insert.add(0, nextNLId + "," + nextEventId + "," + now() + "," + + quoteString(event.getEventType()) + "," + quoteString(event.getDbName()) + "," + + quoteString(" ") + "," + quoteString(event.getMessage()) + "," + + quoteString(event.getMessageFormat())); + + List<String> sql = sqlGenerator.createInsertValuesStmt( + "\"NOTIFICATION_LOG\" (\"NL_ID\", \"EVENT_ID\", \"EVENT_TIME\", " + + " \"EVENT_TYPE\", \"DB_NAME\"," + + " \"TBL_NAME\", \"MESSAGE\", \"MESSAGE_FORMAT\")", insert); + for (String q : sql) { + LOG.info("Going to execute insert <" + q + ">"); + stmt.execute(q); + } + + // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners. 
+ if (event.isSetEventId()) { + listenerEvent.putParameter( + MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME, + Long.toString(event.getEventId())); + } + } catch (SQLException e) { + LOG.warn("Failed to add notification log " + e.getMessage()); + throw e; + } finally { + if (stmt != null && !stmt.isClosed()) { + try { + stmt.close(); + } catch (SQLException e) { + LOG.warn("Failed to close statement " + e.getMessage()); + } + } + if (rs != null && !rs.isClosed()) { + try { + rs.close(); + } catch (SQLException e) { + LOG.warn("Failed to close result set " + e.getMessage()); + } + } + } + } + /** * Process this notification by adding it to metastore DB. * http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java ---------------------------------------------------------------------- diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java index e0e2965..5459554 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java @@ -70,6 +70,9 @@ import org.apache.hadoop.hive.metastore.events.DropFunctionEvent; import org.apache.hadoop.hive.metastore.events.DropPartitionEvent; import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; +import org.apache.hadoop.hive.metastore.events.OpenTxnEvent; +import org.apache.hadoop.hive.metastore.events.CommitTxnEvent; +import org.apache.hadoop.hive.metastore.events.AbortTxnEvent; import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; @@ -218,6 +221,18 @@ public class TestDbNotificationListener { public void onInsert(InsertEvent insertEvent) throws MetaException { pushEventId(EventType.INSERT, insertEvent); } + + public void onOpenTxn(OpenTxnEvent openTxnEvent) throws MetaException { + pushEventId(EventType.OPEN_TXN, openTxnEvent); + } + + public void onCommitTxn(CommitTxnEvent commitTxnEvent) throws MetaException { + pushEventId(EventType.COMMIT_TXN, commitTxnEvent); + } + + public void onAbortTxn(AbortTxnEvent abortTxnEvent) throws MetaException { + pushEventId(EventType.ABORT_TXN, abortTxnEvent); + } } @SuppressWarnings("rawtypes") http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java new file mode 100644 index 0000000..cac9922 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.shims.Utils; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; + +/** + * TestReplicationScenariosAcidTables - test replication for ACID tables + */ +public class TestReplicationScenariosAcidTables { + @Rule + public final TestName testName = new TestName(); + + @Rule + public TestRule replV1BackwardCompat; + + protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenariosAcidTables.class); + private static WarehouseInstance primary, replica, replicaNonAcid; + private String primaryDbName, replicatedDbName; + + @BeforeClass + public static void classLevelSetup() throws Exception { + Configuration conf = new Configuration(); + conf.set("dfs.client.use.datanode.hostname", "true"); + conf.set("hadoop.proxyuser."
+ Utils.getUGI().getShortUserName() + ".hosts", "*"); + MiniDFSCluster miniDFSCluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{ + put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); + put("hive.support.concurrency", "true"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + }}; + primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf); + replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf); + HashMap<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{ + put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); + put("hive.support.concurrency", "false"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); + }}; + replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1); + } + + @AfterClass + public static void classLevelTearDown() throws IOException { + primary.close(); + replica.close(); + } + + @Before + public void setup() throws Throwable { + replV1BackwardCompat = primary.getReplivationV1CompatRule(new ArrayList<>()); + primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); + replicatedDbName = "replicated_" + primaryDbName; + primary.run("create database " + primaryDbName); + } + + @After + public void tearDown() throws Throwable { + primary.run("drop database if exists " + primaryDbName + " cascade"); + replica.run("drop database if exists " + replicatedDbName + " cascade"); + replicaNonAcid.run("drop database if exists " + replicatedDbName + " cascade"); + } + + @Test + public void testOpenTxnEvent() throws Throwable { + String tableName = testName.getMethodName(); + WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); + replica.load(replicatedDbName, bootStrapDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(bootStrapDump.lastReplicationId); + + // create table will start and commit the transaction + primary.run("CREATE TABLE " + tableName + + " (key int, value int) PARTITIONED BY (load_date date) " + + "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')") + .run("SHOW TABLES LIKE '" + tableName + "'") + .verifyResult(tableName) + .run("INSERT INTO " + tableName + " partition (load_date='2016-03-01') VALUES (1, 1)") + .run("select key from " + tableName) + .verifyResult("1"); + + WarehouseInstance.Tuple incrementalDump = + primary.dump(primaryDbName, bootStrapDump.lastReplicationId); + replica.load(replicatedDbName, incrementalDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(incrementalDump.lastReplicationId) + .run("SHOW TABLES LIKE '" + tableName + "'") + .verifyResult(tableName); + + // Test the idempotent behavior of Open and Commit Txn + replica.load(replicatedDbName, incrementalDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(incrementalDump.lastReplicationId) + .run("SHOW TABLES LIKE '" + tableName + "'") + .verifyResult(tableName); + } + + @Test + public void testAbortTxnEvent() throws Throwable { + String tableName = testName.getMethodName(); + String tableNameFail = testName.getMethodName() + "Fail"; + WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); + replica.load(replicatedDbName, bootStrapDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) +
.verifyResult(bootStrapDump.lastReplicationId); + + // this should fail + primary.runFailure("CREATE TABLE " + tableNameFail + + " (key int, value int) PARTITIONED BY (load_date date) " + + "CLUSTERED BY(key) ('transactional'='true')") + .run("SHOW TABLES LIKE '" + tableNameFail + "'") + .verifyFailure(new String[]{tableNameFail}); + + WarehouseInstance.Tuple incrementalDump = + primary.dump(primaryDbName, bootStrapDump.lastReplicationId); + replica.load(replicatedDbName, incrementalDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(incrementalDump.lastReplicationId) + .run("SHOW TABLES LIKE '" + tableNameFail + "'") + .verifyFailure(new String[]{tableNameFail}); + + // Test the idempotent behavior of Abort Txn + replica.load(replicatedDbName, incrementalDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(incrementalDump.lastReplicationId) + .run("SHOW TABLES LIKE '" + tableNameFail + "'") + .verifyFailure(new String[]{tableNameFail}); + } + + @Test + public void testTxnEventNonAcid() throws Throwable { + String tableName = testName.getMethodName(); + WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); + replicaNonAcid.load(replicatedDbName, bootStrapDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(bootStrapDump.lastReplicationId); + + primary.run("CREATE TABLE " + tableName + + " (key int, value int) PARTITIONED BY (load_date date) " + + "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')") + .run("SHOW TABLES LIKE '" + tableName + "'") + .verifyResult(tableName) + .run("INSERT INTO " + tableName + + " partition (load_date='2016-03-01') VALUES (1, 1)") + .run("select key from " + tableName) + .verifyResult("1"); + + WarehouseInstance.Tuple incrementalDump = + primary.dump(primaryDbName, bootStrapDump.lastReplicationId); + replicaNonAcid.loadFailure(replicatedDbName, incrementalDump.dumpLocation) + .run("REPL STATUS " + replicatedDbName) + .verifyResult(bootStrapDump.lastReplicationId); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 0006ea5..fe4660c 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.DriverFactory; import org.apache.hadoop.hive.ql.IDriver; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; @@ -122,7 +123,6 @@ public class WarehouseInstance implements Closeable { hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " "); 
System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " "); @@ -135,6 +135,10 @@ public class WarehouseInstance implements Closeable { driver = DriverFactory.newDriver(hiveConf); SessionState.start(new CliSessionState(hiveConf)); client = new HiveMetaStoreClient(hiveConf); + + TxnDbUtil.cleanDb(hiveConf); + TxnDbUtil.prepDb(hiveConf); + // change the value for the next instance. ++uniqueIdentifier; } @@ -176,6 +180,14 @@ public class WarehouseInstance implements Closeable { return this; } + WarehouseInstance runFailure(String command) throws Throwable { + CommandProcessorResponse ret = driver.run(command); + if (ret.getException() == null) { + throw new RuntimeException("command execution passed for an invalid command " + command); + } + return this; + } + Tuple dump(String dbName, String lastReplicationId, List<String> withClauseOptions) throws Throwable { String dumpCommand = @@ -224,6 +236,13 @@ return run(replStatusCmd); } + WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation) throws Throwable { + runFailure("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); + printOutput(); + runFailure("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'"); + return this; + } + WarehouseInstance verifyResult(String data) throws IOException { verifyResults(data == null ? new String[] {} : new String[] { data }); return this; http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java index c0751a7..597544f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java @@ -60,6 +60,7 @@ public class TestCleanerWithReplication extends CompactorTest { TxnDbUtil.cleanDb(conf); conf.set("fs.defaultFS", fs.getUri().toString()); conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true); + TxnDbUtil.prepDb(conf); ms = new HiveMetaStoreClient(conf); txnHandler = TxnUtils.getTxnStore(conf); cmRootDirectory = new Path(conf.get(HiveConf.ConfVars.REPLCMDIR.varname)); http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/metastore/scripts/upgrade/derby/054-HIVE-18781.derby.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/derby/054-HIVE-18781.derby.sql b/metastore/scripts/upgrade/derby/054-HIVE-18781.derby.sql new file mode 100644 index 0000000..8a50663 --- /dev/null +++ b/metastore/scripts/upgrade/derby/054-HIVE-18781.derby.sql @@ -0,0 +1,9 @@ + +CREATE TABLE REPL_TXN_MAP ( + RTM_REPL_POLICY varchar(256) NOT NULL, + RTM_SRC_TXN_ID bigint NOT NULL, + RTM_TARGET_TXN_ID bigint NOT NULL, + PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID) +); + +INSERT INTO "APP"."SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") SELECT * FROM (VALUES ('org.apache.hadoop.hive.metastore.model.MNotificationLog', 1)) tmp_table WHERE NOT EXISTS ( SELECT "NEXT_VAL" FROM "APP"."SEQUENCE_TABLE" WHERE "SEQUENCE_NAME" = 'org.apache.hadoop.hive.metastore.model.MNotificationLog');
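The REPL_TXN_MAP table created above is keyed by (RTM_REPL_POLICY, RTM_SRC_TXN_ID) and stores RTM_TARGET_TXN_ID: the transaction opened on the target cluster for a given source transaction, which is what lets commit/abort events replay against the right target txn. A rough, hypothetical JDBC sketch of that lookup is below; the real logic lives in TxnHandler, and only the table and column names come from the script above.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Illustrative only: resolve a source txn id to its mapped target txn id. */
final class ReplTxnMapLookup {
  /**
   * Returns the target txn id recorded for (replPolicy, srcTxnId),
   * or -1 if no mapping exists (for example, the event was already
   * replayed and the mapping cleaned up).
   */
  static long targetTxnId(Connection conn, String replPolicy, long srcTxnId)
      throws SQLException {
    String sql = "SELECT RTM_TARGET_TXN_ID FROM REPL_TXN_MAP"
        + " WHERE RTM_REPL_POLICY = ? AND RTM_SRC_TXN_ID = ?";
    try (PreparedStatement ps = conn.prepareStatement(sql)) {
      ps.setString(1, replPolicy);
      ps.setLong(2, srcTxnId);
      try (ResultSet rs = ps.executeQuery()) {
        return rs.next() ? rs.getLong(1) : -1L;
      }
    }
  }
}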
http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql b/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql index 2b1dd5b..259b105 100644 --- a/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql +++ b/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql @@ -74,6 +74,8 @@ CREATE TABLE "APP"."SDS" ("SD_ID" BIGINT NOT NULL, "INPUT_FORMAT" VARCHAR(4000), CREATE TABLE "APP"."SEQUENCE_TABLE" ("SEQUENCE_NAME" VARCHAR(256) NOT NULL, "NEXT_VAL" BIGINT NOT NULL); +INSERT INTO "APP"."SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") VALUES ('org.apache.hadoop.hive.metastore.model.MNotificationLog', 1); + RUN '022-HIVE-11107.derby.sql'; CREATE TABLE "APP"."BUCKETING_COLS" ("SD_ID" BIGINT NOT NULL, "BUCKET_COL_NAME" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL); http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql b/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql index 99838b4..27d67e6 100644 --- a/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql +++ b/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql @@ -155,3 +155,10 @@ CREATE TABLE WRITE_SET ( WS_COMMIT_ID bigint NOT NULL, WS_OPERATION_TYPE char(1) NOT NULL ); + +CREATE TABLE REPL_TXN_MAP ( + RTM_REPL_POLICY varchar(256) NOT NULL, + RTM_SRC_TXN_ID bigint NOT NULL, + RTM_TARGET_TXN_ID bigint NOT NULL, + PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID) +); http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql b/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql index 1a3c00a..ee2fb67 100644 --- a/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql +++ b/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql @@ -11,5 +11,6 @@ RUN '050-HIVE-18192.derby.sql'; RUN '051-HIVE-18675.derby.sql'; RUN '052-HIVE-18965.derby.sql'; RUN '053-HIVE-18755.derby.sql'; +RUN '054-HIVE-18781.derby.sql'; UPDATE "APP".VERSION SET SCHEMA_VERSION='3.0.0', VERSION_COMMENT='Hive release version 3.0.0' where VER_ID=1; http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/if/queryplan.thrift ---------------------------------------------------------------------- diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift index aaf644a..ad778e3 100644 --- a/ql/if/queryplan.thrift +++ b/ql/if/queryplan.thrift @@ -102,7 +102,8 @@ enum StageType { COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, - REPL_STATE_LOG + REPL_STATE_LOG, + REPL_TXN } struct Stage { http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp index 7262017..b6eb12a 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp @@ -118,7 +118,8 @@ int _kStageTypeValues[] = { StageType::COLUMNSTATS, 
StageType::REPL_DUMP, StageType::REPL_BOOTSTRAP_LOAD, - StageType::REPL_STATE_LOG + StageType::REPL_STATE_LOG, + StageType::REPL_TXN }; const char* _kStageTypeNames[] = { "CONDITIONAL", @@ -135,9 +136,10 @@ const char* _kStageTypeNames[] = { "COLUMNSTATS", "REPL_DUMP", "REPL_BOOTSTRAP_LOAD", - "REPL_STATE_LOG" + "REPL_STATE_LOG", + "REPL_TXN" }; -const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(15, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(16, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); Adjacency::~Adjacency() throw() { http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/gen/thrift/gen-cpp/queryplan_types.h ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h index 18dc867..eb02107 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h @@ -96,7 +96,8 @@ struct StageType { COLUMNSTATS = 11, REPL_DUMP = 12, REPL_BOOTSTRAP_LOAD = 13, - REPL_STATE_LOG = 14 + REPL_STATE_LOG = 14, + REPL_TXN = 15 }; }; http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java index ed408d2..08822b3 100644 --- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java +++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java @@ -26,7 +26,8 @@ public enum StageType implements org.apache.thrift.TEnum { COLUMNSTATS(11), REPL_DUMP(12), REPL_BOOTSTRAP_LOAD(13), - REPL_STATE_LOG(14); + REPL_STATE_LOG(14), + REPL_TXN(15); private final int value; @@ -77,6 +78,8 @@ public enum StageType implements org.apache.thrift.TEnum { return REPL_BOOTSTRAP_LOAD; case 14: return REPL_STATE_LOG; + case 15: + return REPL_TXN; default: return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/gen/thrift/gen-php/Types.php ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php index bca2eee..df4e41d 100644 --- a/ql/src/gen/thrift/gen-php/Types.php +++ b/ql/src/gen/thrift/gen-php/Types.php @@ -117,6 +117,7 @@ final class StageType { const REPL_DUMP = 12; const REPL_BOOTSTRAP_LOAD = 13; const REPL_STATE_LOG = 14; + const REPL_TXN = 15; static public $__names = array( 0 => 'CONDITIONAL', 1 => 'COPY', @@ -133,6 +134,7 @@ final class StageType { 12 => 'REPL_DUMP', 13 => 'REPL_BOOTSTRAP_LOAD', 14 => 'REPL_STATE_LOG', + 15 => 'REPL_TXN', ); } http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/gen/thrift/gen-py/queryplan/ttypes.py ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py index 1f0d627..85d39fd 100644 --- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py +++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py @@ -163,6 +163,7 @@ class StageType: REPL_DUMP = 12 REPL_BOOTSTRAP_LOAD = 13 REPL_STATE_LOG = 14 + 
REPL_TXN = 15 _VALUES_TO_NAMES = { 0: "CONDITIONAL", @@ -180,6 +181,7 @@ class StageType: 12: "REPL_DUMP", 13: "REPL_BOOTSTRAP_LOAD", 14: "REPL_STATE_LOG", + 15: "REPL_TXN", } _NAMES_TO_VALUES = { @@ -198,6 +200,7 @@ class StageType: "REPL_DUMP": 12, "REPL_BOOTSTRAP_LOAD": 13, "REPL_STATE_LOG": 14, + "REPL_TXN": 15, } http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/gen/thrift/gen-rb/queryplan_types.rb ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb index 88d9c17..6010f3d 100644 --- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb +++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb @@ -75,8 +75,9 @@ module StageType REPL_DUMP = 12 REPL_BOOTSTRAP_LOAD = 13 REPL_STATE_LOG = 14 - VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG"} - VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG]).freeze + REPL_TXN = 15 + VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN"} + VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN]).freeze end class Adjacency http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java new file mode 100644 index 0000000..1cdeeb6 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; +import java.util.List; + +/** + * ReplTxnTask. + * Used for replaying the transaction related events. 
+ */ +public class ReplTxnTask extends Task<ReplTxnWork> { + + private static final long serialVersionUID = 1L; + + public ReplTxnTask() { + super(); + } + + @Override + public int execute(DriverContext driverContext) { + String replPolicy = work.getReplPolicy(); + if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { + Utilities.FILE_OP_LOGGER.trace("Executing ReplTxnTask " + work.getOperationType().toString() + + " for txn ids : " + work.getTxnIds().toString() + " replPolicy : " + replPolicy); + } + try { + HiveTxnManager txnManager = driverContext.getCtx().getHiveTxnManager(); + String user = UserGroupInformation.getCurrentUser().getUserName(); + LOG.debug("Replaying " + work.getOperationType().toString() + " Event for policy " + + replPolicy + " with srcTxn " + work.getTxnIds().toString()); + switch(work.getOperationType()) { + case REPL_OPEN_TXN: + List<Long> txnIds = txnManager.replOpenTxn(replPolicy, work.getTxnIds(), user); + assert txnIds.size() == work.getTxnIds().size(); + LOG.info("Replayed OpenTxn Event for policy " + replPolicy + " with srcTxn " + + work.getTxnIds().toString() + " and target txn id " + txnIds.toString()); + return 0; + case REPL_ABORT_TXN: + for (long txnId : work.getTxnIds()) { + txnManager.replRollbackTxn(replPolicy, txnId); + LOG.info("Replayed AbortTxn Event for policy " + replPolicy + " with srcTxn " + txnId); + } + return 0; + case REPL_COMMIT_TXN: + for (long txnId : work.getTxnIds()) { + txnManager.replCommitTxn(replPolicy, txnId); + LOG.info("Replayed CommitTxn Event for policy " + replPolicy + " with srcTxn " + txnId); + } + return 0; + default: + LOG.error("Operation Type " + work.getOperationType() + " is not supported "); + return 1; + } + } catch (Exception e) { + console.printError("Failed with exception " + e.getMessage(), "\n" + + StringUtils.stringifyException(e)); + setException(e); + return 1; + } + } + + @Override + public StageType getType() { + return StageType.REPL_TXN; + } + + @Override + public String getName() { + return "REPL_TRANSACTION"; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java new file mode 100644 index 0000000..9467415 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; + +import com.google.common.collect.Lists; +import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.plan.Explain.Level; +import java.util.List; + +/** + * ReplTxnWork. + * Used for replaying the transaction related events. + */ +@Explain(displayName = "Replication Transaction", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) +public class ReplTxnWork implements Serializable { + private static final long serialVersionUID = 1L; + private String dbName; + private String tableName; + private List<Long> txnIds; + + /** + * OperationType. + * Different kinds of events supported for replaying. + */ + public enum OperationType { + REPL_OPEN_TXN, REPL_ABORT_TXN, REPL_COMMIT_TXN + } + + OperationType operation; + + public ReplTxnWork(String dbName, String tableName, List<Long> txnIds, OperationType type) { + this.txnIds = txnIds; + this.dbName = dbName; + this.tableName = tableName; + this.operation = type; + } + + public ReplTxnWork(String dbName, String tableName, Long txnId, OperationType type) { + this.txnIds = Lists.newArrayList(txnId); + this.dbName = dbName; + this.tableName = tableName; + this.operation = type; + } + + public List<Long> getTxnIds() { + return txnIds; + } + + public Long getTxnId(int idx) { + return txnIds.get(idx); + } + + public String getDbName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + + public String getReplPolicy() { + if ((dbName == null) || (dbName.isEmpty())) { + return null; + } else if ((tableName == null) || (tableName.isEmpty())) { + return dbName.toLowerCase() + ".*"; + } else { + return dbName.toLowerCase() + "." + tableName.toLowerCase(); + } + } + + public OperationType getOperationType() { + return operation; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index ccfd4cb..10a2ed2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -113,6 +113,7 @@ public final class TaskFactory { taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class)); taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class)); taskvec.add(new TaskTuple<ExportWork>(ExportWork.class, ExportTask.class)); + taskvec.add(new TaskTuple<ReplTxnWork>(ReplTxnWork.class, ReplTxnTask.class)); } private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() { http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index a9ebc90..44a7496 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -1794,4 +1794,14 @@ public class AcidUtils { } return fileList; } + + public static boolean isAcidEnabled(HiveConf hiveConf) { + String txnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER); + boolean concurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); + String dbTxnMgr =
"org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"; + if (txnMgr.equals(dbTxnMgr) && concurrency) { + return true; + } + return false; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 89ca1ff..6513e0f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -203,6 +203,15 @@ public final class DbTxnManager extends HiveTxnManagerImpl { } @Override + public List<Long> replOpenTxn(String replPolicy, List<Long> srcTxnIds, String user) throws LockException { + try { + return getMS().replOpenTxn(replPolicy, srcTxnIds, user); + } catch (TException e) { + throw new LockException(e, ErrorMsg.METASTORE_COMMUNICATION_FAILED); + } + } + + @Override public long openTxn(Context ctx, String user) throws LockException { return openTxn(ctx, user, 0); } @@ -591,6 +600,22 @@ public final class DbTxnManager extends HiveTxnManagerImpl { } @Override + public void replCommitTxn(String replPolicy, long srcTxnId) throws LockException { + try { + getMS().replCommitTxn(srcTxnId, replPolicy); + } catch (NoSuchTxnException e) { + LOG.error("Metastore could not find " + JavaUtils.txnIdToString(srcTxnId)); + throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(srcTxnId)); + } catch (TxnAbortedException e) { + LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(srcTxnId), e.getMessage()); + LOG.error(le.getMessage()); + throw le; + } catch (TException e) { + throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e); + } + } + + @Override public void commitTxn() throws LockException { if (!isTxnOpen()) { throw new RuntimeException("Attempt to commit before opening a transaction"); @@ -617,6 +642,21 @@ public final class DbTxnManager extends HiveTxnManagerImpl { tableWriteIds.clear(); } } + @Override + public void replRollbackTxn(String replPolicy, long srcTxnId) throws LockException { + try { + getMS().replRollbackTxn(srcTxnId, replPolicy); + } catch (NoSuchTxnException e) { + LOG.error("Metastore could not find " + JavaUtils.txnIdToString(srcTxnId)); + throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(srcTxnId)); + } catch (TxnAbortedException e) { + LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(srcTxnId), e.getMessage()); + LOG.error(le.getMessage()); + throw le; + } catch (TException e) { + throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e); + } + } @Override public void rollbackTxn() throws LockException { http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index 7413074..9057bb9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -56,6 +56,11 @@ class DummyTxnManager extends HiveTxnManagerImpl { return 0L; } @Override + public List<Long> replOpenTxn(String 
replPolicy, List<Long> srcTxnIds, String user) throws LockException { + return null; + } + + @Override public boolean isTxnOpen() { return false; } @@ -209,11 +214,21 @@ } @Override + public void replCommitTxn(String replPolicy, long srcTxnId) throws LockException { + // No-op + } + + @Override public void rollbackTxn() throws LockException { // No-op } @Override + public void replRollbackTxn(String replPolicy, long srcTxnId) throws LockException { + // No-op + } + + @Override public void heartbeat() throws LockException { // No-op } http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index 0db2a2c..490f3b8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc; import org.apache.hadoop.hive.ql.plan.LockTableDesc; import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc; import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; - import java.util.List; /** @@ -48,6 +47,32 @@ public interface HiveTxnManager { long openTxn(Context ctx, String user) throws LockException; /** + * Open new transactions in the target cluster. + * @param replPolicy Replication policy to uniquely identify the source cluster. + * @param srcTxnIds The ids of the transactions at the source cluster + * @param user The user who has fired the repl load command + * @return The list of new transaction ids. + * @throws LockException in case of failure to start the transactions. + */ + List<Long> replOpenTxn(String replPolicy, List<Long> srcTxnIds, String user) throws LockException; + + /** + * Commit the transaction in the target cluster. + * @param replPolicy Replication policy to uniquely identify the source cluster. + * @param srcTxnId The id of the transaction at the source cluster + * @throws LockException in case of failure to commit the transaction. + */ + void replCommitTxn(String replPolicy, long srcTxnId) throws LockException; + + /** + * Abort the transaction in the target cluster. + * @param replPolicy Replication policy to uniquely identify the source cluster. + * @param srcTxnId The id of the transaction at the source cluster + * @throws LockException in case of failure to abort the transaction. + */ + void replRollbackTxn(String replPolicy, long srcTxnId) throws LockException; + + /** * Get the lock manager. This must be used rather than instantiating an * instance of the lock manager directly as the transaction manager will * choose which lock manager to instantiate.
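Taken together, the three HiveTxnManager methods added above form the replay path that ReplTxnTask drives on the target cluster: open target transactions for a batch of source txn ids under a replication policy, then later commit or roll back by source txn id. A hedged usage sketch follows; the policy string, txn ids and user name are made up for illustration.

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;

final class ReplTxnReplayExample {
  /** Replays an open followed by one commit and one abort, as the events would arrive. */
  static void replay(HiveTxnManager txnManager) throws LockException {
    String replPolicy = "srcdb.*";              // illustrative policy: whole source database
    List<Long> srcTxnIds = Arrays.asList(101L, 102L);

    // OPEN_TXN event: opens matching txns on the target and records the
    // source-to-target mapping (REPL_TXN_MAP) keyed by the policy.
    List<Long> targetTxnIds = txnManager.replOpenTxn(replPolicy, srcTxnIds, "hive");
    System.out.println("opened target txns: " + targetTxnIds);

    // COMMIT_TXN / ABORT_TXN events arrive per source txn id; the manager
    // resolves the mapped target txn internally before acting on it.
    txnManager.replCommitTxn(replPolicy, 101L);
    txnManager.replRollbackTxn(replPolicy, 102L);
  }
}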
http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index e862589..8b639f7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -409,10 +409,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { Task<?> copyTask = null; if (replicationSpec.isInReplicationScope()) { - if (isSourceMm || isAcid(writeId)) { - // Note: this is replication gap, not MM gap... Repl V2 is not ready yet. - throw new RuntimeException("Replicating MM and ACID tables is not supported"); - } copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf()); } else { CopyWork cw = new CopyWork(dataPath, destPath, false); @@ -502,10 +498,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { Task<?> copyTask = null; if (replicationSpec.isInReplicationScope()) { - if (isSourceMm || isAcid(writeId)) { - // Note: this is replication gap, not MM gap... Repl V2 is not ready yet. - throw new RuntimeException("Replicating MM and ACID tables is not supported"); - } copyTask = ReplCopyTask.getLoadCopyTask( replicationSpec, new Path(srcLocation), destPath, x.getConf()); } else { http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 9f18efd..79b2e48 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; @@ -243,6 +244,60 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } } + private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType) throws SemanticException { + // This function filters out all the events that are already replayed. This can be done only + // for transaction related events, as for other kinds of events we cannot guarantee that the last + // repl id stored in the database/table is valid. + if ((dumpType != DumpType.EVENT_ABORT_TXN) && + (dumpType != DumpType.EVENT_OPEN_TXN) && + (dumpType != DumpType.EVENT_COMMIT_TXN)) { + return true; + } + + // If the database name itself is null, we cannot filter out anything.
+ if (dbNameOrPattern == null || dbNameOrPattern.isEmpty()) { + return true; + } else if ((tblNameOrPattern == null) || (tblNameOrPattern.isEmpty())) { + Database database; + try { + database = Hive.get().getDatabase(dbNameOrPattern); + } catch (HiveException e) { + LOG.error("failed to get the database " + dbNameOrPattern); + throw new SemanticException(e); + } + String replLastId; + Map<String, String> params = database.getParameters(); + if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) { + replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()); + if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) { + LOG.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName()) + + " is already replayed. LastReplId - " + Long.parseLong(replLastId)); + return false; + } + } + } else { + Table tbl; + try { + tbl = Hive.get().getTable(dbNameOrPattern, tblNameOrPattern); + } catch (HiveException e) { + LOG.error("failed to get the table " + dbNameOrPattern + "." + tblNameOrPattern); + throw new SemanticException(e); + } + if (tbl != null) { + Map<String, String> params = tbl.getParameters(); + if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) { + String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString()); + if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) { + LOG.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName()) + + " is already replayed. LastReplId - " + Long.parseLong(replLastId)); + return false; + } + } + } + } + return true; + } + /* * Example dump dirs we need to be able to handle : * @@ -380,6 +435,13 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { loadPath.toString(), dirsInLoadPath.length); for (FileStatus dir : dirsInLoadPath){ + String locn = dir.getPath().toUri().toString(); + DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf); + + if (!shouldReplayEvent(dir, eventDmd.getDumpType())) { + continue; + } + LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern); // event loads will behave similar to table loads, with one crucial difference @@ -401,8 +463,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the // entire chain - String locn = dir.getPath().toUri().toString(); - DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf); MessageHandler.Context context = new MessageHandler.Context(dbNameOrPattern, tblNameOrPattern, locn, taskChainTail, eventDmd, conf, db, ctx, LOG); http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java index c69ecc9..5fab0d2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java @@ -37,6 +37,9 @@ import org.apache.hadoop.hive.ql.parse.repl.load.message.RenameTableHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.TableHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.TruncatePartitionHandler; import 
org.apache.hadoop.hive.ql.parse.repl.load.message.TruncateTableHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.OpenTxnHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.CommitTxnHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.message.AbortTxnHandler; public enum DumpType { @@ -183,6 +186,24 @@ public enum DumpType { public MessageHandler handler() { return new DropDatabaseHandler(); } + }, + EVENT_OPEN_TXN("EVENT_OPEN_TXN") { + @Override + public MessageHandler handler() { + return new OpenTxnHandler(); + } + }, + EVENT_COMMIT_TXN("EVENT_COMMIT_TXN") { + @Override + public MessageHandler handler() { + return new CommitTxnHandler(); + } + }, + EVENT_ABORT_TXN("EVENT_ABORT_TXN") { + @Override + public MessageHandler handler() { + return new AbortTxnHandler(); + } }; String type = null; http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java new file mode 100644 index 0000000..b9a5d21 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; + +class AbortTxnHandler extends AbstractEventHandler { + + AbortTxnHandler(NotificationEvent event) { + super(event); + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} ABORT_TXN message : {}", fromEventId(), event.getMessage()); + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); + } + + @Override + public DumpType dumpType() { + return DumpType.EVENT_ABORT_TXN; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java new file mode 100644 index 0000000..db97d7c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java @@ -0,0 +1,43 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; + +class CommitTxnHandler extends AbstractEventHandler { + + CommitTxnHandler(NotificationEvent event) { + super(event); + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), event.getMessage()); + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); + } + + @Override + public DumpType dumpType() { + return DumpType.EVENT_COMMIT_TXN; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java index 9955246..10ff21c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java @@ -50,6 +50,9 @@ public class EventHandlerFactory { register(MessageFactory.DROP_CONSTRAINT_EVENT, DropConstraintHandler.class); register(MessageFactory.CREATE_DATABASE_EVENT, CreateDatabaseHandler.class); register(MessageFactory.DROP_DATABASE_EVENT, DropDatabaseHandler.class); + register(MessageFactory.OPEN_TXN_EVENT, OpenTxnHandler.class); + register(MessageFactory.COMMIT_TXN_EVENT, CommitTxnHandler.class); + register(MessageFactory.ABORT_TXN_EVENT, AbortTxnHandler.class); } static void register(String event, Class<? extends EventHandler> handlerClazz) { http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java new file mode 100644 index 0000000..fe81fe1 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl.dump.events; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; + +class OpenTxnHandler extends AbstractEventHandler { + + OpenTxnHandler(NotificationEvent event) { + super(event); + } + + @Override + public void handle(Context withinContext) throws Exception { + LOG.info("Processing#{} OPEN_TXN message : {}", fromEventId(), event.getMessage()); + DumpMetaData dmd = withinContext.createDmd(this); + dmd.setPayload(event.getMessage()); + dmd.write(); + } + + @Override + public DumpType dumpType() { + return DumpType.EVENT_OPEN_TXN; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java new file mode 100644 index 0000000..5b2c85b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load.message; + +import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage; +import org.apache.hadoop.hive.ql.exec.ReplTxnWork; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + +/** + * AbortTxnHandler + * Target(Load) side handler for abort transaction event. + */ +public class AbortTxnHandler extends AbstractMessageHandler { + @Override + public List<Task<? 
extends Serializable>> handle(Context context) + throws SemanticException { + if (!AcidUtils.isAcidEnabled(context.hiveConf)) { + context.log.error("Cannot load transaction events as acid is not enabled"); + throw new SemanticException("Cannot load transaction events as acid is not enabled"); + } + + AbortTxnMessage msg = deserializer.getAbortTxnMessage(context.dmd.getPayload()); + + Task<ReplTxnWork> abortTxnTask = TaskFactory.get( + new ReplTxnWork(context.dbName, context.tableName, msg.getTxnId(), ReplTxnWork.OperationType.REPL_ABORT_TXN), + context.hiveConf + ); + updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null); + context.log.debug("Added Abort txn task : {}", abortTxnTask.getId()); + return Collections.singletonList(abortTxnTask); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java new file mode 100644 index 0000000..461a0f1 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load.message; + +import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; +import org.apache.hadoop.hive.ql.exec.ReplTxnWork; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + +/** + * CommitTxnHandler + * Target(Load) side handler for commit transaction event. + */ +public class CommitTxnHandler extends AbstractMessageHandler { + @Override + public List<Task<? 
extends Serializable>> handle(Context context) + throws SemanticException { + if (!AcidUtils.isAcidEnabled(context.hiveConf)) { + context.log.error("Cannot load transaction events as acid is not enabled"); + throw new SemanticException("Cannot load transaction events as acid is not enabled"); + } + + CommitTxnMessage msg = deserializer.getCommitTxnMessage(context.dmd.getPayload()); + + Task<ReplTxnWork> commitTxnTask = TaskFactory.get( + new ReplTxnWork(context.dbName, context.tableName, msg.getTxnId(), + ReplTxnWork.OperationType.REPL_COMMIT_TXN), context.hiveConf + ); + updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null); + context.log.debug("Added Commit txn task : {}", commitTxnTask.getId()); + return Collections.singletonList(commitTxnTask); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java new file mode 100644 index 0000000..c6349ea --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse.repl.load.message; + +import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage; +import org.apache.hadoop.hive.ql.exec.ReplTxnWork; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + +/** + * OpenTxnHandler + * Target(Load) side handler for open transaction event. + */ +public class OpenTxnHandler extends AbstractMessageHandler { + @Override + public List<Task<? 
extends Serializable>> handle(Context context) + throws SemanticException { + if (!AcidUtils.isAcidEnabled(context.hiveConf)) { + context.log.error("Cannot load transaction events as acid is not enabled"); + throw new SemanticException("Cannot load transaction events as acid is not enabled"); + } + OpenTxnMessage msg = deserializer.getOpenTxnMessage(context.dmd.getPayload()); + + Task<ReplTxnWork> openTxnTask = TaskFactory.get( + new ReplTxnWork(context.dbName, context.tableName, msg.getTxnIds(), ReplTxnWork.OperationType.REPL_OPEN_TXN), + context.hiveConf + ); + updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null); + context.log.debug("Added Open txn task : {}", openTxnTask.getId()); + return Collections.singletonList(openTxnTask); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 7b510dd..f1da15b 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.txn; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AbortTxnRequest; +import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest; import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions; import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest; import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse; @@ -78,6 +79,8 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.LongStream; import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertFalse; @@ -1541,6 +1544,24 @@ public class TestTxnHandler { Assert.assertTrue("regex should be retryable", result); } + @Test + public void testReplOpenTxn() throws Exception { + int numTxn = 50000; + conf.setIntVar(HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, numTxn); + OpenTxnRequest rqst = new OpenTxnRequest(numTxn, "me", "localhost"); + rqst.setReplPolicy("default.*"); + rqst.setReplSrcTxnIds(LongStream.rangeClosed(1, numTxn) + .boxed().collect(Collectors.toList())); + OpenTxnsResponse openedTxns = txnHandler.openTxns(rqst); + List<Long> txnList = openedTxns.getTxn_ids(); + assertEquals(txnList.size(), numTxn); + for (long i = 0; i < numTxn; i++) { + long txnId = txnList.get((int) i); + assertEquals(i+1, txnId); + } + txnHandler.abortTxns(new AbortTxnsRequest(txnList)); + } + private void updateTxns(Connection conn) throws SQLException { Statement stmt = conn.createStatement(); stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1");
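testReplOpenTxn above exercises the replication path of openTxns: when the request carries replPolicy and replSrcTxnIds, the opened target transactions line up one-to-one with the supplied source txn ids (hence the i + 1 assertion), and the txn-schema changes in this patch appear to add storage for that source-to-target mapping. A hedged companion sketch, reusing only the APIs exercised by the test above (the method name and the two-txn setup are invented for illustration):

@Test
public void testReplOpenAndAbortTxn() throws Exception {
  // Open two target txns mapped to source txn ids 1 and 2 under the same sample policy.
  OpenTxnRequest rqst = new OpenTxnRequest(2, "me", "localhost");
  rqst.setReplPolicy("default.*");
  rqst.setReplSrcTxnIds(java.util.Arrays.asList(1L, 2L));
  List<Long> targetTxns = txnHandler.openTxns(rqst).getTxn_ids();
  assertEquals(2, targetTxns.size());
  // Replaying the source-side abort is then just an abort of the mapped target txns.
  txnHandler.abortTxns(new AbortTxnsRequest(targetTxns));
}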