HIVE-18781: Create/Replicate Open, Commit (without writes) and Abort Txn events 
(Mahesh Kumar Behera, reviewed by Sankar Hariappan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/59483bca
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/59483bca
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/59483bca

Branch: refs/heads/master
Commit: 59483bca262880d3e7ef1b873d3c21176e9294cb
Parents: 9e98d59
Author: Sankar Hariappan <sank...@apache.org>
Authored: Mon Apr 2 10:02:09 2018 +0530
Committer: Sankar Hariappan <sank...@apache.org>
Committed: Mon Apr 2 10:02:09 2018 +0530

----------------------------------------------------------------------
 .../listener/DbNotificationListener.java        |  152 +-
 .../listener/TestDbNotificationListener.java    |   15 +
 .../TestReplicationScenariosAcidTables.java     |  184 +
 .../hadoop/hive/ql/parse/WarehouseInstance.java |   21 +-
 .../compactor/TestCleanerWithReplication.java   |    1 +
 .../upgrade/derby/054-HIVE-18781.derby.sql      |    9 +
 .../upgrade/derby/hive-schema-3.0.0.derby.sql   |    2 +
 .../derby/hive-txn-schema-3.0.0.derby.sql       |    7 +
 .../derby/upgrade-2.3.0-to-3.0.0.derby.sql      |    1 +
 ql/if/queryplan.thrift                          |    3 +-
 ql/src/gen/thrift/gen-cpp/queryplan_types.cpp   |    8 +-
 ql/src/gen/thrift/gen-cpp/queryplan_types.h     |    3 +-
 .../hadoop/hive/ql/plan/api/StageType.java      |    5 +-
 ql/src/gen/thrift/gen-php/Types.php             |    2 +
 ql/src/gen/thrift/gen-py/queryplan/ttypes.py    |    3 +
 ql/src/gen/thrift/gen-rb/queryplan_types.rb     |    5 +-
 .../apache/hadoop/hive/ql/exec/ReplTxnTask.java |   92 +
 .../apache/hadoop/hive/ql/exec/ReplTxnWork.java |   91 +
 .../apache/hadoop/hive/ql/exec/TaskFactory.java |    1 +
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |   10 +
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |   40 +
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java |   15 +
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java  |   27 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |    8 -
 .../ql/parse/ReplicationSemanticAnalyzer.java   |   64 +-
 .../hadoop/hive/ql/parse/repl/DumpType.java     |   21 +
 .../parse/repl/dump/events/AbortTxnHandler.java |   42 +
 .../repl/dump/events/CommitTxnHandler.java      |   43 +
 .../repl/dump/events/EventHandlerFactory.java   |    3 +
 .../parse/repl/dump/events/OpenTxnHandler.java  |   42 +
 .../repl/load/message/AbortTxnHandler.java      |   53 +
 .../repl/load/message/CommitTxnHandler.java     |   53 +
 .../parse/repl/load/message/OpenTxnHandler.java |   52 +
 .../hive/metastore/txn/TestTxnHandler.java      |   21 +
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp  | 2258 ++++++------
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 3222 +++++++++---------
 .../gen/thrift/gen-cpp/hive_metastore_types.h   |   50 +-
 .../hive/metastore/api/AbortTxnRequest.java     |  112 +-
 .../hive/metastore/api/AbortTxnsRequest.java    |   32 +-
 .../metastore/api/AddDynamicPartitions.java     |   32 +-
 .../api/AllocateTableWriteIdsRequest.java       |   32 +-
 .../api/AllocateTableWriteIdsResponse.java      |   36 +-
 .../metastore/api/ClearFileMetadataRequest.java |   32 +-
 .../hive/metastore/api/ClientCapabilities.java  |   32 +-
 .../hive/metastore/api/CommitTxnRequest.java    |  112 +-
 .../hive/metastore/api/CompactionRequest.java   |   44 +-
 .../hive/metastore/api/CreationMetadata.java    |   32 +-
 .../metastore/api/FindSchemasByColsResp.java    |   36 +-
 .../hive/metastore/api/FireEventRequest.java    |   32 +-
 .../metastore/api/GetAllFunctionsResponse.java  |   36 +-
 .../api/GetFileMetadataByExprRequest.java       |   32 +-
 .../api/GetFileMetadataByExprResult.java        |   48 +-
 .../metastore/api/GetFileMetadataRequest.java   |   32 +-
 .../metastore/api/GetFileMetadataResult.java    |   44 +-
 .../hive/metastore/api/GetTablesRequest.java    |   32 +-
 .../hive/metastore/api/GetTablesResult.java     |   36 +-
 .../metastore/api/GetValidWriteIdsRequest.java  |   32 +-
 .../metastore/api/GetValidWriteIdsResponse.java |   36 +-
 .../api/HeartbeatTxnRangeResponse.java          |   64 +-
 .../metastore/api/InsertEventRequestData.java   |   64 +-
 .../hadoop/hive/metastore/api/LockRequest.java  |   36 +-
 .../hive/metastore/api/Materialization.java     |   32 +-
 .../api/NotificationEventResponse.java          |   36 +-
 .../hive/metastore/api/OpenTxnRequest.java      |  269 +-
 .../hive/metastore/api/OpenTxnsResponse.java    |   32 +-
 .../metastore/api/PutFileMetadataRequest.java   |   64 +-
 .../hive/metastore/api/SchemaVersion.java       |   36 +-
 .../hive/metastore/api/ShowCompactResponse.java |   36 +-
 .../hive/metastore/api/ShowLocksResponse.java   |   36 +-
 .../hive/metastore/api/TableValidWriteIds.java  |   32 +-
 .../hive/metastore/api/ThriftHiveMetastore.java | 2476 +++++++-------
 .../hive/metastore/api/WMFullResourcePlan.java  |  144 +-
 .../api/WMGetAllResourcePlanResponse.java       |   36 +-
 .../WMGetTriggersForResourePlanResponse.java    |   36 +-
 .../api/WMValidateResourcePlanResponse.java     |   64 +-
 .../gen-php/metastore/ThriftHiveMetastore.php   | 1394 ++++----
 .../src/gen/thrift/gen-php/metastore/Types.php  |  946 ++---
 .../hive_metastore/ThriftHiveMetastore.py       |  940 ++---
 .../gen/thrift/gen-py/hive_metastore/ttypes.py  |  604 ++--
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |   14 +-
 .../hadoop/hive/metastore/HiveMetaStore.java    |   26 +-
 .../hive/metastore/HiveMetaStoreClient.java     |   46 +-
 .../hadoop/hive/metastore/IMetaStoreClient.java |   37 +
 .../hive/metastore/MetaStoreEventListener.java  |   31 +-
 .../metastore/MetaStoreListenerNotifier.java    |   23 +
 .../hive/metastore/events/AbortTxnEvent.java    |   83 +
 .../hive/metastore/events/CommitTxnEvent.java   |   83 +
 .../hive/metastore/events/ListenerEvent.java    |   17 +-
 .../hive/metastore/events/OpenTxnEvent.java     |   83 +
 .../metastore/messaging/AbortTxnMessage.java    |   36 +
 .../metastore/messaging/CommitTxnMessage.java   |   36 +
 .../hive/metastore/messaging/EventMessage.java  |    5 +-
 .../messaging/MessageDeserializer.java          |   21 +
 .../metastore/messaging/MessageFactory.java     |   29 +-
 .../metastore/messaging/OpenTxnMessage.java     |   38 +
 .../event/filters/DatabaseAndTableFilter.java   |   12 +-
 .../messaging/json/JSONAbortTxnMessage.java     |   87 +
 .../messaging/json/JSONCommitTxnMessage.java    |   87 +
 .../messaging/json/JSONMessageDeserializer.java |   30 +
 .../messaging/json/JSONMessageFactory.java      |   18 +
 .../messaging/json/JSONOpenTxnMessage.java      |  104 +
 .../hadoop/hive/metastore/txn/TxnDbUtil.java    |   66 +
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  157 +-
 .../src/main/resources/package.jdo              |    2 +
 .../main/sql/derby/hive-schema-3.0.0.derby.sql  |    8 +
 .../sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql  |   10 +
 .../main/sql/mssql/hive-schema-3.0.0.mssql.sql  |   21 +
 .../sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql  |   22 +
 .../main/sql/mysql/hive-schema-3.0.0.mysql.sql  |    9 +
 .../sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql  |   10 +
 .../sql/oracle/hive-schema-3.0.0.oracle.sql     |   11 +
 .../oracle/upgrade-2.3.0-to-3.0.0.oracle.sql    |   10 +
 .../sql/postgres/hive-schema-3.0.0.postgres.sql |    8 +
 .../upgrade-2.3.0-to-3.0.0.postgres.sql         |   10 +
 .../src/main/thrift/hive_metastore.thrift       |    4 +
 .../HiveMetaStoreClientPreCatalog.java          |   42 +-
 116 files changed, 9698 insertions(+), 6536 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git 
a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
 
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 8523428..9480145 100644
--- 
a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ 
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -18,6 +18,10 @@
 package org.apache.hive.hcatalog.listener;
 
 import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -65,16 +69,21 @@ import 
org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.DropTableEvent;
 import org.apache.hadoop.hive.metastore.events.InsertEvent;
 import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
+import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
+import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
+import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
 import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
+import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
+import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import com.google.common.collect.Lists;
-
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
+import static org.apache.hadoop.hive.metastore.DatabaseProduct.MYSQL;
 
 /**
  * An implementation of {@link 
org.apache.hadoop.hive.metastore.MetaStoreEventListener} that
@@ -438,6 +447,49 @@ public class DbNotificationListener extends 
TransactionalMetaStoreEventListener
     process(event, insertEvent);
   }
 
+  @Override
+  public void onOpenTxn(OpenTxnEvent openTxnEvent) throws MetaException {
+    int lastTxnIdx = openTxnEvent.getTxnIds().size() - 1;
+    OpenTxnMessage msg = 
msgFactory.buildOpenTxnMessage(openTxnEvent.getTxnIds().get(0),
+            openTxnEvent.getTxnIds().get(lastTxnIdx));
+    NotificationEvent event =
+            new NotificationEvent(0, now(), EventType.OPEN_TXN.toString(), 
msg.toString());
+
+    try {
+      addNotificationLog(event, openTxnEvent);
+    } catch (SQLException e) {
+      throw new MetaException("Unable to execute direct SQL " + 
StringUtils.stringifyException(e));
+    }
+  }
+
+  @Override
+  public void onCommitTxn(CommitTxnEvent commitTxnEvent) throws MetaException {
+    NotificationEvent event =
+            new NotificationEvent(0, now(), EventType.COMMIT_TXN.toString(), 
msgFactory.buildCommitTxnMessage(
+                    commitTxnEvent.getTxnId())
+                    .toString());
+
+    try {
+      addNotificationLog(event, commitTxnEvent);
+    } catch (SQLException e) {
+      throw new MetaException("Unable to execute direct SQL " + 
StringUtils.stringifyException(e));
+    }
+  }
+
+  @Override
+  public void onAbortTxn(AbortTxnEvent abortTxnEvent) throws MetaException {
+    NotificationEvent event =
+        new NotificationEvent(0, now(), EventType.ABORT_TXN.toString(), 
msgFactory.buildAbortTxnMessage(
+            abortTxnEvent.getTxnId())
+            .toString());
+
+    try {
+      addNotificationLog(event, abortTxnEvent);
+    } catch (SQLException e) {
+      throw new MetaException("Unable to execute direct SQL " + 
StringUtils.stringifyException(e));
+    }
+  }
+
   /**
    * @param partSetDoneEvent
    * @throws MetaException
@@ -549,6 +601,102 @@ public class DbNotificationListener extends 
TransactionalMetaStoreEventListener
     return (int)millis;
   }
 
+  static String quoteString(String input) {
+    return "'" + input + "'";
+  }
+
+  private void addNotificationLog(NotificationEvent event, ListenerEvent 
listenerEvent)
+          throws MetaException, SQLException {
+    if ((listenerEvent.getConnection() == null) || 
(listenerEvent.getSqlGenerator() == null)) {
+      LOG.info("connection or sql generator is not set so executing sql via 
DN");
+      process(event, listenerEvent);
+      return;
+    }
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = listenerEvent.getConnection().createStatement();
+      SQLGenerator sqlGenerator = listenerEvent.getSqlGenerator();
+      event.setMessageFormat(msgFactory.getMessageFormat());
+
+      if (sqlGenerator.getDbProduct() == MYSQL) {
+        stmt.execute("SET @@session.sql_mode=ANSI_QUOTES");
+      }
+
+      String s = sqlGenerator.addForUpdateClause("select \"NEXT_EVENT_ID\" " +
+              " from \"NOTIFICATION_SEQUENCE\"");
+      LOG.debug("Going to execute query <" + s + ">");
+      rs = stmt.executeQuery(s);
+      if (!rs.next()) {
+        throw new MetaException("Transaction database not properly " +
+                "configured, can't find next event id.");
+      }
+      long nextEventId = rs.getLong(1);
+      long updatedEventid = nextEventId + 1;
+      s = "update \"NOTIFICATION_SEQUENCE\" set \"NEXT_EVENT_ID\" = " + 
updatedEventid;
+      LOG.debug("Going to execute update <" + s + ">");
+      stmt.executeUpdate(s);
+
+      s = sqlGenerator.addForUpdateClause("select \"NEXT_VAL\" from " +
+              "\"SEQUENCE_TABLE\" where \"SEQUENCE_NAME\" = " +
+              " 'org.apache.hadoop.hive.metastore.model.MNotificationLog'");
+      LOG.debug("Going to execute query <" + s + ">");
+      rs = stmt.executeQuery(s);
+      if (!rs.next()) {
+        throw new MetaException("failed to get next NEXT_VAL from 
SEQUENCE_TABLE");
+      }
+
+      long nextNLId = rs.getLong(1);
+      long updatedNLId = nextNLId + 1;
+      s = "update \"SEQUENCE_TABLE\" set \"NEXT_VAL\" = " + updatedNLId + " 
where \"SEQUENCE_NAME\" = " +
+
+              " 'org.apache.hadoop.hive.metastore.model.MNotificationLog'";
+      LOG.debug("Going to execute update <" + s + ">");
+      stmt.executeUpdate(s);
+
+      List<String> insert = new ArrayList<>();
+
+      insert.add(0, nextNLId + "," + nextEventId + "," + now() + "," +
+              quoteString(event.getEventType()) + "," + 
quoteString(event.getDbName()) + "," +
+              quoteString(" ") + "," + quoteString(event.getMessage()) + "," +
+              quoteString(event.getMessageFormat()));
+
+      List<String> sql = sqlGenerator.createInsertValuesStmt(
+              "\"NOTIFICATION_LOG\" (\"NL_ID\", \"EVENT_ID\", \"EVENT_TIME\", 
" +
+                      " \"EVENT_TYPE\", \"DB_NAME\"," +
+                      " \"TBL_NAME\", \"MESSAGE\", \"MESSAGE_FORMAT\")", 
insert);
+      for (String q : sql) {
+        LOG.info("Going to execute insert <" + q + ">");
+        stmt.execute(q);
+      }
+
+      // Set the DB_NOTIFICATION_EVENT_ID for future reference by other 
listeners.
+      if (event.isSetEventId()) {
+        listenerEvent.putParameter(
+                
MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
+                Long.toString(event.getEventId()));
+      }
+    } catch (SQLException e) {
+      LOG.warn("failed to add notification log" + e.getMessage());
+      throw e;
+    } finally {
+      if (stmt != null && !stmt.isClosed()) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          LOG.warn("Failed to close statement " + e.getMessage());
+        }
+      }
+      if (rs != null && !rs.isClosed()) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          LOG.warn("Failed to close result set " + e.getMessage());
+        }
+      }
+    }
+  }
+
   /**
    * Process this notification by adding it to metastore DB.
    *

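For context: the three hooks added above surface on MetaStoreEventListener, so
any custom listener can observe the transaction lifecycle the same way the test
listener below does. A minimal sketch (the class name and log output are
illustrative, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
    import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
    import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;

    public class TxnAuditListener extends MetaStoreEventListener {
      public TxnAuditListener(Configuration conf) {
        super(conf);
      }

      @Override
      public void onOpenTxn(OpenTxnEvent e) throws MetaException {
        // getTxnIds() carries the whole batch of ids opened in a single call.
        System.out.println("opened txns: " + e.getTxnIds());
      }

      @Override
      public void onCommitTxn(CommitTxnEvent e) throws MetaException {
        System.out.println("committed txn: " + e.getTxnId());
      }

      @Override
      public void onAbortTxn(AbortTxnEvent e) throws MetaException {
        System.out.println("aborted txn: " + e.getTxnId());
      }
    }
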
http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git 
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index e0e2965..5459554 100644
--- 
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -70,6 +70,9 @@ import 
org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
 import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.DropTableEvent;
 import org.apache.hadoop.hive.metastore.events.InsertEvent;
+import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
+import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
+import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
 import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
@@ -218,6 +221,18 @@ public class TestDbNotificationListener {
     public void onInsert(InsertEvent insertEvent) throws MetaException {
       pushEventId(EventType.INSERT, insertEvent);
     }
+
+    public void onOpenTxn(OpenTxnEvent openTxnEvent) throws MetaException {
+      pushEventId(EventType.OPEN_TXN, openTxnEvent);
+    }
+
+    public void onCommitTxn(CommitTxnEvent commitTxnEvent) throws 
MetaException {
+      pushEventId(EventType.COMMIT_TXN, commitTxnEvent);
+    }
+
+    public void onAbortTxn(AbortTxnEvent abortTxnEvent) throws MetaException {
+      pushEventId(EventType.ABORT_TXN, abortTxnEvent);
+    }
   }
 
   @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
new file mode 100644
index 0000000..cac9922
--- /dev/null
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.shims.Utils;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+/**
+ * TestReplicationScenariosAcidTables - test replication for ACID tables
+ */
+public class TestReplicationScenariosAcidTables {
+  @Rule
+  public final TestName testName = new TestName();
+
+  @Rule
+  public TestRule replV1BackwardCompat;
+
+  protected static final Logger LOG =
+          LoggerFactory.getLogger(TestReplicationScenariosAcidTables.class);
+  private static WarehouseInstance primary, replica, replicaNonAcid;
+  private String primaryDbName, replicatedDbName;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("dfs.client.use.datanode.hostname", "true");
+    conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + 
".hosts", "*");
+    MiniDFSCluster miniDFSCluster =
+           new 
MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    HashMap<String, String> overridesForHiveConf = new HashMap<String, 
String>() {{
+        put("fs.defaultFS", 
miniDFSCluster.getFileSystem().getUri().toString());
+        put("hive.support.concurrency", "true");
+        put("hive.txn.manager", 
"org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+    }};
+    primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
+    HashMap<String, String> overridesForHiveConf1 = new HashMap<String, 
String>() {{
+        put("fs.defaultFS", 
miniDFSCluster.getFileSystem().getUri().toString());
+        put("hive.support.concurrency", "false");
+        put("hive.txn.manager", 
"org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+    }};
+    replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, 
overridesForHiveConf1);
+  }
+
+  @AfterClass
+  public static void classLevelTearDown() throws IOException {
+    primary.close();
+    replica.close();
+    replicaNonAcid.close();
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    replV1BackwardCompat = primary.getReplivationV1CompatRule(new 
ArrayList<>());
+    primaryDbName = testName.getMethodName() + "_" + 
+System.currentTimeMillis();
+    replicatedDbName = "replicated_" + primaryDbName;
+    primary.run("create database " + primaryDbName);
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + primaryDbName + " cascade");
+    replica.run("drop database if exists " + replicatedDbName + " cascade");
+    replicaNonAcid.run("drop database if exists " + replicatedDbName + " 
cascade");
+  }
+
+  @Test
+  public void testOpenTxnEvent() throws Throwable {
+    String tableName = testName.getMethodName();
+    WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
+    replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+            .run("REPL STATUS " + replicatedDbName)
+            .verifyResult(bootStrapDump.lastReplicationId);
+
+    // create table will open and commit the transaction
+    primary.run("CREATE TABLE " + tableName +
+            " (key int, value int) PARTITIONED BY (load_date date) " +
+            "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES 
('transactional'='true')")
+            .run("SHOW TABLES LIKE '" + tableName + "'")
+            .verifyResult(tableName)
+            .run("INSERT INTO " + tableName + " partition 
(load_date='2016-03-01') VALUES (1, 1)")
+            .run("select key from " + tableName)
+            .verifyResult("1");
+
+    WarehouseInstance.Tuple incrementalDump =
+            primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
+    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+            .run("REPL STATUS " + replicatedDbName)
+            .verifyResult(incrementalDump.lastReplicationId)
+            .run("SHOW TABLES LIKE '" + tableName + "'")
+            .verifyResult(tableName);
+
+    // Test the idempotent behavior of Open and Commit Txn
+    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+            .run("REPL STATUS " + replicatedDbName)
+            .verifyResult(incrementalDump.lastReplicationId)
+            .run("SHOW TABLES LIKE '" + tableName + "'")
+            .verifyResult(tableName);
+  }
+
+  @Test
+  public void testAbortTxnEvent() throws Throwable {
+    String tableName = testName.getMethodName();
+    String tableNameFail = testName.getMethodName() + "Fail";
+    WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
+    replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+            .run("REPL STATUS " + replicatedDbName)
+            .verifyResult(bootStrapDump.lastReplicationId);
+
+    // this create table is deliberately malformed; its failure aborts the opened txn
+    primary.runFailure("CREATE TABLE " + tableNameFail +
+            " (key int, value int) PARTITIONED BY (load_date date) " +
+            "CLUSTERED BY(key) ('transactional'='true')")
+            .run("SHOW TABLES LIKE '" + tableNameFail + "'")
+            .verifyFailure(new String[]{tableNameFail});
+
+    WarehouseInstance.Tuple incrementalDump =
+            primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
+    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+            .run("REPL STATUS " + replicatedDbName)
+            .verifyResult(incrementalDump.lastReplicationId)
+            .run("SHOW TABLES LIKE '" + tableNameFail + "'")
+            .verifyFailure(new String[]{tableNameFail});
+
+    // Test the idempotent behavior of Abort Txn
+    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+            .run("REPL STATUS " + replicatedDbName)
+            .verifyResult(incrementalDump.lastReplicationId)
+            .run("SHOW TABLES LIKE '" + tableNameFail + "'")
+            .verifyFailure(new String[]{tableNameFail});
+  }
+
+  @Test
+  public void testTxnEventNonAcid() throws Throwable {
+    String tableName = testName.getMethodName();
+    WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
+    replicaNonAcid.load(replicatedDbName, bootStrapDump.dumpLocation)
+            .run("REPL STATUS " + replicatedDbName)
+            .verifyResult(bootStrapDump.lastReplicationId);
+
+    primary.run("CREATE TABLE " + tableName +
+            " (key int, value int) PARTITIONED BY (load_date date) " +
+            "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES 
('transactional'='true')")
+            .run("SHOW TABLES LIKE '" + tableName + "'")
+            .verifyResult(tableName)
+            .run("INSERT INTO " + tableName +
+                    " partition (load_date='2016-03-01') VALUES (1, 1)")
+            .run("select key from " + tableName)
+            .verifyResult("1");
+
+    WarehouseInstance.Tuple incrementalDump =
+            primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
+    replicaNonAcid.loadFailure(replicatedDbName, incrementalDump.dumpLocation)
+            .run("REPL STATUS " + replicatedDbName)
+            .verifyResult(bootStrapDump.lastReplicationId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 0006ea5..fe4660c 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
@@ -122,7 +123,6 @@ public class WarehouseInstance implements Closeable {
     hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
     hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
     System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
     System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
 
@@ -135,6 +135,10 @@ public class WarehouseInstance implements Closeable {
     driver = DriverFactory.newDriver(hiveConf);
     SessionState.start(new CliSessionState(hiveConf));
     client = new HiveMetaStoreClient(hiveConf);
+
+    TxnDbUtil.cleanDb(hiveConf);
+    TxnDbUtil.prepDb(hiveConf);
+
     // change the value for the next instance.
     ++uniqueIdentifier;
   }
@@ -176,6 +180,14 @@ public class WarehouseInstance implements Closeable {
     return this;
   }
 
+  WarehouseInstance runFailure(String command) throws Throwable {
+    CommandProcessorResponse ret = driver.run(command);
+    if (ret.getException() == null) {
+      throw new RuntimeException("command execution passed for a invalid 
command" + command);
+    }
+    return this;
+  }
+
   Tuple dump(String dbName, String lastReplicationId, List<String> 
withClauseOptions)
       throws Throwable {
     String dumpCommand =
@@ -224,6 +236,13 @@ public class WarehouseInstance implements Closeable {
     return run(replStatusCmd);
   }
 
+  WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation) 
throws Throwable {
+    runFailure("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + 
dumpLocation + "'");
+    printOutput();
+    runFailure("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + 
"'");
+    return this;
+  }
+
   WarehouseInstance verifyResult(String data) throws IOException {
     verifyResults(data == null ? new String[] {} : new String[] { data });
     return this;

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
index c0751a7..597544f 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
@@ -60,6 +60,7 @@ public class TestCleanerWithReplication extends CompactorTest 
{
     TxnDbUtil.cleanDb(conf);
     conf.set("fs.defaultFS", fs.getUri().toString());
     conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+    TxnDbUtil.prepDb(conf);
     ms = new HiveMetaStoreClient(conf);
     txnHandler = TxnUtils.getTxnStore(conf);
     cmRootDirectory = new Path(conf.get(HiveConf.ConfVars.REPLCMDIR.varname));

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/metastore/scripts/upgrade/derby/054-HIVE-18781.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/054-HIVE-18781.derby.sql 
b/metastore/scripts/upgrade/derby/054-HIVE-18781.derby.sql
new file mode 100644
index 0000000..8a50663
--- /dev/null
+++ b/metastore/scripts/upgrade/derby/054-HIVE-18781.derby.sql
@@ -0,0 +1,9 @@
+
+CREATE TABLE REPL_TXN_MAP (
+  RTM_REPL_POLICY varchar(256) NOT NULL,
+  RTM_SRC_TXN_ID bigint NOT NULL,
+  RTM_TARGET_TXN_ID bigint NOT NULL,
+  PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID)
+);
+
+INSERT INTO "APP"."SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") SELECT * FROM 
(VALUES ('org.apache.hadoop.hive.metastore.model.MNotificationLog', 1)) 
tmp_table WHERE NOT EXISTS ( SELECT "NEXT_VAL" FROM "APP"."SEQUENCE_TABLE" 
WHERE "SEQUENCE_NAME" = 
'org.apache.hadoop.hive.metastore.model.MNotificationLog');

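The new REPL_TXN_MAP table keys the source-to-target transaction mapping by
replication policy plus source txn id. A purely illustrative JDBC peek at the
mapping (the connection handle and the literal values are assumptions, not
part of this commit):

    try (Statement stmt = metastoreDbConnection.createStatement();
         ResultSet rs = stmt.executeQuery(
             "SELECT RTM_TARGET_TXN_ID FROM REPL_TXN_MAP " +
             "WHERE RTM_REPL_POLICY = 'salesdb.*' AND RTM_SRC_TXN_ID = 42")) {
      if (rs.next()) {
        long targetTxnId = rs.getLong(1); // target txn replaying source txn 42
      }
    }
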
http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql 
b/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql
index 2b1dd5b..259b105 100644
--- a/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/hive-schema-3.0.0.derby.sql
@@ -74,6 +74,8 @@ CREATE TABLE "APP"."SDS" ("SD_ID" BIGINT NOT NULL, 
"INPUT_FORMAT" VARCHAR(4000),
 
 CREATE TABLE "APP"."SEQUENCE_TABLE" ("SEQUENCE_NAME" VARCHAR(256) NOT NULL, 
"NEXT_VAL" BIGINT NOT NULL);
 
+INSERT INTO "APP"."SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") VALUES 
('org.apache.hadoop.hive.metastore.model.MNotificationLog', 1);
+
 RUN '022-HIVE-11107.derby.sql';
 
 CREATE TABLE "APP"."BUCKETING_COLS" ("SD_ID" BIGINT NOT NULL, 
"BUCKET_COL_NAME" VARCHAR(256), "INTEGER_IDX" INTEGER NOT NULL);

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql 
b/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql
index 99838b4..27d67e6 100644
--- a/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql
@@ -155,3 +155,10 @@ CREATE TABLE WRITE_SET (
   WS_COMMIT_ID bigint NOT NULL,
   WS_OPERATION_TYPE char(1) NOT NULL
 );
+
+CREATE TABLE REPL_TXN_MAP (
+  RTM_REPL_POLICY varchar(256) NOT NULL,
+  RTM_SRC_TXN_ID bigint NOT NULL,
+  RTM_TARGET_TXN_ID bigint NOT NULL,
+  PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID)
+);

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql 
b/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql
index 1a3c00a..ee2fb67 100644
--- a/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql
@@ -11,5 +11,6 @@ RUN '050-HIVE-18192.derby.sql';
 RUN '051-HIVE-18675.derby.sql';
 RUN '052-HIVE-18965.derby.sql';
 RUN '053-HIVE-18755.derby.sql';
+RUN '054-HIVE-18781.derby.sql';
 
 UPDATE "APP".VERSION SET SCHEMA_VERSION='3.0.0', VERSION_COMMENT='Hive release 
version 3.0.0' where VER_ID=1;

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/if/queryplan.thrift
----------------------------------------------------------------------
diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift
index aaf644a..ad778e3 100644
--- a/ql/if/queryplan.thrift
+++ b/ql/if/queryplan.thrift
@@ -102,7 +102,8 @@ enum StageType {
   COLUMNSTATS,
   REPL_DUMP,
   REPL_BOOTSTRAP_LOAD,
-  REPL_STATE_LOG
+  REPL_STATE_LOG,
+  REPL_TXN
 }
 
 struct Stage {

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp 
b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
index 7262017..b6eb12a 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
@@ -118,7 +118,8 @@ int _kStageTypeValues[] = {
   StageType::COLUMNSTATS,
   StageType::REPL_DUMP,
   StageType::REPL_BOOTSTRAP_LOAD,
-  StageType::REPL_STATE_LOG
+  StageType::REPL_STATE_LOG,
+  StageType::REPL_TXN
 };
 const char* _kStageTypeNames[] = {
   "CONDITIONAL",
@@ -135,9 +136,10 @@ const char* _kStageTypeNames[] = {
   "COLUMNSTATS",
   "REPL_DUMP",
   "REPL_BOOTSTRAP_LOAD",
-  "REPL_STATE_LOG"
+  "REPL_STATE_LOG",
+  "REPL_TXN"
 };
-const std::map<int, const char*> 
_StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(15, 
_kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, 
NULL));
+const std::map<int, const char*> 
_StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(16, 
_kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, 
NULL));
 
 
 Adjacency::~Adjacency() throw() {

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/gen/thrift/gen-cpp/queryplan_types.h
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h 
b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
index 18dc867..eb02107 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
@@ -96,7 +96,8 @@ struct StageType {
     COLUMNSTATS = 11,
     REPL_DUMP = 12,
     REPL_BOOTSTRAP_LOAD = 13,
-    REPL_STATE_LOG = 14
+    REPL_STATE_LOG = 14,
+    REPL_TXN = 15
   };
 };
 

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
----------------------------------------------------------------------
diff --git 
a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
 
b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
index ed408d2..08822b3 100644
--- 
a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
+++ 
b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
@@ -26,7 +26,8 @@ public enum StageType implements org.apache.thrift.TEnum {
   COLUMNSTATS(11),
   REPL_DUMP(12),
   REPL_BOOTSTRAP_LOAD(13),
-  REPL_STATE_LOG(14);
+  REPL_STATE_LOG(14),
+  REPL_TXN(15);
 
   private final int value;
 
@@ -77,6 +78,8 @@ public enum StageType implements org.apache.thrift.TEnum {
         return REPL_BOOTSTRAP_LOAD;
       case 14:
         return REPL_STATE_LOG;
+      case 15:
+        return REPL_TXN;
       default:
         return null;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/gen/thrift/gen-php/Types.php
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-php/Types.php 
b/ql/src/gen/thrift/gen-php/Types.php
index bca2eee..df4e41d 100644
--- a/ql/src/gen/thrift/gen-php/Types.php
+++ b/ql/src/gen/thrift/gen-php/Types.php
@@ -117,6 +117,7 @@ final class StageType {
   const REPL_DUMP = 12;
   const REPL_BOOTSTRAP_LOAD = 13;
   const REPL_STATE_LOG = 14;
+  const REPL_TXN = 15;
   static public $__names = array(
     0 => 'CONDITIONAL',
     1 => 'COPY',
@@ -133,6 +134,7 @@ final class StageType {
     12 => 'REPL_DUMP',
     13 => 'REPL_BOOTSTRAP_LOAD',
     14 => 'REPL_STATE_LOG',
+    15 => 'REPL_TXN',
   );
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py 
b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
index 1f0d627..85d39fd 100644
--- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
+++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
@@ -163,6 +163,7 @@ class StageType:
   REPL_DUMP = 12
   REPL_BOOTSTRAP_LOAD = 13
   REPL_STATE_LOG = 14
+  REPL_TXN = 15
 
   _VALUES_TO_NAMES = {
     0: "CONDITIONAL",
@@ -180,6 +181,7 @@ class StageType:
     12: "REPL_DUMP",
     13: "REPL_BOOTSTRAP_LOAD",
     14: "REPL_STATE_LOG",
+    15: "REPL_TXN",
   }
 
   _NAMES_TO_VALUES = {
@@ -198,6 +200,7 @@ class StageType:
     "REPL_DUMP": 12,
     "REPL_BOOTSTRAP_LOAD": 13,
     "REPL_STATE_LOG": 14,
+    "REPL_TXN": 15,
   }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/gen/thrift/gen-rb/queryplan_types.rb
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb 
b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
index 88d9c17..6010f3d 100644
--- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb
+++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
@@ -75,8 +75,9 @@ module StageType
   REPL_DUMP = 12
   REPL_BOOTSTRAP_LOAD = 13
   REPL_STATE_LOG = 14
-  VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 
=> "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => 
"STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 
13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG"}
-  VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, 
FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, 
REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG]).freeze
+  REPL_TXN = 15
+  VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 
=> "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => 
"STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 
13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN"}
+  VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, 
FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, 
REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN]).freeze
 end
 
 class Adjacency

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
new file mode 100644
index 0000000..1cdeeb6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import java.util.List;
+
+/**
+ * ReplTxnTask.
+ * Used for replaying transaction-related events.
+ */
+public class ReplTxnTask extends Task<ReplTxnWork> {
+
+  private static final long serialVersionUID = 1L;
+
+  public ReplTxnTask() {
+    super();
+  }
+
+  @Override
+  public int execute(DriverContext driverContext) {
+    String replPolicy = work.getReplPolicy();
+    if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
+      Utilities.FILE_OP_LOGGER.trace("Executing ReplTxnTask " + 
work.getOperationType().toString() +
+              " for txn ids : " + work.getTxnIds().toString() + " replPolicy : 
" + replPolicy);
+    }
+    try {
+      HiveTxnManager txnManager = driverContext.getCtx().getHiveTxnManager();
+      String user = UserGroupInformation.getCurrentUser().getUserName();
+      LOG.debug("Replaying " + work.getOperationType().toString() + " Event 
for policy " +
+              replPolicy + " with srcTxn " + work.getTxnIds().toString());
+      switch(work.getOperationType()) {
+      case REPL_OPEN_TXN:
+        List<Long> txnIds = txnManager.replOpenTxn(replPolicy, 
work.getTxnIds(), user);
+        assert txnIds.size() == work.getTxnIds().size();
+        LOG.info("Replayed OpenTxn Event for policy " + replPolicy + " with 
srcTxn " +
+                work.getTxnIds().toString() + " and target txn id " + 
txnIds.toString());
+        return 0;
+      case REPL_ABORT_TXN:
+        for (long txnId : work.getTxnIds()) {
+          txnManager.replRollbackTxn(replPolicy, txnId);
+          LOG.info("Replayed AbortTxn Event for policy " + replPolicy + " with 
srcTxn " + txnId);
+        }
+        return 0;
+      case REPL_COMMIT_TXN:
+        for (long txnId : work.getTxnIds()) {
+          txnManager.replCommitTxn(replPolicy, txnId);
+          LOG.info("Replayed CommitTxn Event for policy " + replPolicy + " 
with srcTxn " + txnId);
+        }
+        return 0;
+      default:
+        LOG.error("Operation Type " + work.getOperationType() + " is not 
supported ");
+        return 1;
+      }
+    } catch (Exception e) {
+      console.printError("Failed with exception " + e.getMessage(), "\n"
+          + StringUtils.stringifyException(e));
+      setException(e);
+      return 1;
+    }
+  }
+
+  @Override
+  public StageType getType() {
+    return StageType.REPL_TXN;
+  }
+
+  @Override
+  public String getName() {
+    return "REPL_TRANSACTION";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
new file mode 100644
index 0000000..9467415
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import java.util.List;
+
+/**
+ * ReplTxnWork.
+ * Work descriptor for replaying transaction-related events.
+ */
+@Explain(displayName = "Replication Transaction", explainLevels = { 
Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class ReplTxnWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private String dbName;
+  private String tableName;
+  private List<Long> txnIds;
+
+  /**
+   * OperationType.
+   * Different kind of events supported for replaying.
+   */
+  public enum OperationType {
+    REPL_OPEN_TXN, REPL_ABORT_TXN, REPL_COMMIT_TXN
+  }
+
+  OperationType operation;
+
+  public ReplTxnWork(String dbName, String tableName, List<Long> txnIds, 
OperationType type) {
+    this.txnIds = txnIds;
+    this.dbName = dbName;
+    this.tableName = tableName;
+    this.operation = type;
+  }
+
+  public ReplTxnWork(String dbName, String tableName, Long txnId, 
OperationType type) {
+    this.txnIds = Lists.newArrayList(txnId);
+    this.dbName = dbName;
+    this.tableName = tableName;
+    this.operation = type;
+  }
+
+  public List<Long> getTxnIds() {
+    return txnIds;
+  }
+
+  public Long getTxnId(int idx) {
+    return txnIds.get(idx);
+  }
+
+  public String getDbName() {
+    return dbName;
+  }
+
+  public String getTableName()  {
+    return tableName;
+  }
+
+  public String getReplPolicy() {
+    if ((dbName == null) || (dbName.isEmpty())) {
+      return null;
+    } else if ((tableName == null) || (tableName.isEmpty())) {
+      return dbName.toLowerCase() + ".*";
+    } else {
+      return dbName.toLowerCase() + "." + tableName.toLowerCase();
+    }
+  }
+
+  public OperationType getOperationType() {
+    return operation;
+  }
+}

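As a quick illustration of the getReplPolicy() derivation above (the database
and table names are made up):

    // Database-level event: the policy matches every table in the database.
    ReplTxnWork dbWork = new ReplTxnWork("SalesDB", null, 11L,
        ReplTxnWork.OperationType.REPL_OPEN_TXN);
    dbWork.getReplPolicy();  // -> "salesdb.*"

    // Table-level event: the policy names a single table.
    ReplTxnWork tblWork = new ReplTxnWork("SalesDB", "Orders", 11L,
        ReplTxnWork.OperationType.REPL_COMMIT_TXN);
    tblWork.getReplPolicy(); // -> "salesdb.orders"
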
http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index ccfd4cb..10a2ed2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -113,6 +113,7 @@ public final class TaskFactory {
     taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class));
     taskvec.add(new TaskTuple<>(ReplStateLogWork.class, 
ReplStateLogTask.class));
     taskvec.add(new TaskTuple<ExportWork>(ExportWork.class, ExportTask.class));
+    taskvec.add(new TaskTuple<ReplTxnWork>(ReplTxnWork.class, 
ReplTxnTask.class));
   }
 
   private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() {

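With this registration in place, the replay path can materialize a ReplTxnTask
straight from a ReplTxnWork instance. A sketch, assuming the usual
TaskFactory.get(work, conf) entry point and illustrative values:

    // conf is a HiveConf; txnIds come from the deserialized open-txn message.
    ReplTxnWork work = new ReplTxnWork("salesdb", null, txnIds,
        ReplTxnWork.OperationType.REPL_OPEN_TXN);
    Task<? extends Serializable> replayTask = TaskFactory.get(work, conf);
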
http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index a9ebc90..44a7496 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1794,4 +1794,14 @@ public class AcidUtils {
     }
     return fileList;
   }
+
+  public static boolean isAcidEnabled(HiveConf hiveConf) {
+    String txnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER);
+    boolean concurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+    String dbTxnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+    return txnMgr.equals(dbTxnMgr) && concurrency;
+  }
 }

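isAcidEnabled() gives callers a single check for "DbTxnManager plus concurrency
support", the precondition for replaying txn events (see testTxnEventNonAcid
above). A hypothetical guard using it (the exception type and message are
illustrative):

    if (!AcidUtils.isAcidEnabled(conf)) {
      // Target warehouse cannot replay transactional events.
      throw new SemanticException(
          "Replaying transaction events requires ACID support on the target");
    }
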
http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java 
b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 89ca1ff..6513e0f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -203,6 +203,15 @@ public final class DbTxnManager extends HiveTxnManagerImpl 
{
   }
 
   @Override
+  public List<Long> replOpenTxn(String replPolicy, List<Long> srcTxnIds, 
String user)  throws LockException {
+    try {
+      return getMS().replOpenTxn(replPolicy, srcTxnIds, user);
+    } catch (TException e) {
+      throw new LockException(e, ErrorMsg.METASTORE_COMMUNICATION_FAILED);
+    }
+  }
+
+  @Override
   public long openTxn(Context ctx, String user) throws LockException {
     return openTxn(ctx, user, 0);
   }
@@ -591,6 +600,22 @@ public final class DbTxnManager extends HiveTxnManagerImpl 
{
   }
 
   @Override
+  public void replCommitTxn(String replPolicy, long srcTxnId) throws 
LockException {
+    try {
+      getMS().replCommitTxn(srcTxnId, replPolicy);
+    } catch (NoSuchTxnException e) {
+      LOG.error("Metastore could not find " + 
JavaUtils.txnIdToString(srcTxnId));
+      throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, 
JavaUtils.txnIdToString(srcTxnId));
+    } catch (TxnAbortedException e) {
+      LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, 
JavaUtils.txnIdToString(srcTxnId), e.getMessage());
+      LOG.error(le.getMessage());
+      throw le;
+    } catch (TException e) {
+      throw new 
LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+    }
+  }
+
+  @Override
   public void commitTxn() throws LockException {
     if (!isTxnOpen()) {
       throw new RuntimeException("Attempt to commit before opening a 
transaction");
@@ -617,6 +642,21 @@ public final class DbTxnManager extends HiveTxnManagerImpl 
{
       tableWriteIds.clear();
     }
   }
+  @Override
+  public void replRollbackTxn(String replPolicy, long srcTxnId) throws 
LockException {
+    try {
+      getMS().replRollbackTxn(srcTxnId, replPolicy);
+    } catch (NoSuchTxnException e) {
+      LOG.error("Metastore could not find " + 
JavaUtils.txnIdToString(srcTxnId));
+      throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, 
JavaUtils.txnIdToString(srcTxnId));
+    } catch (TxnAbortedException e) {
+      LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, 
JavaUtils.txnIdToString(srcTxnId), e.getMessage());
+      LOG.error(le.getMessage());
+      throw le;
+    } catch (TException e) {
+      throw new 
LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+    }
+  }
 
   @Override
   public void rollbackTxn() throws LockException {

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java 
b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 7413074..9057bb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -56,6 +56,11 @@ class DummyTxnManager extends HiveTxnManagerImpl {
     return 0L;
   }
   @Override
+  public List<Long> replOpenTxn(String replPolicy, List<Long> srcTxnIds, 
String user)  throws LockException {
+    return null;
+  }
+
+  @Override
   public boolean isTxnOpen() {
     return false;
   }
@@ -209,11 +214,21 @@ class DummyTxnManager extends HiveTxnManagerImpl {
   }
 
   @Override
+  public void replCommitTxn(String replPolicy, long srcTxnId) throws 
LockException {
+    // No-op
+  }
+
+  @Override
   public void rollbackTxn() throws LockException {
     // No-op
   }
 
   @Override
+  public void replRollbackTxn(String replPolicy, long srcTxnId) throws 
LockException {
+    // No-op
+  }
+
+  @Override
   public void heartbeat() throws LockException {
     // No-op
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java 
b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 0db2a2c..490f3b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.LockTableDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
-
 import java.util.List;
 
 /**
@@ -48,6 +47,32 @@ public interface HiveTxnManager {
   long openTxn(Context ctx, String user) throws LockException;
 
   /**
+   * Open a new transaction in target cluster.
+   * @param replPolicy Replication policy to uniquely identify the source 
cluster.
+   * @param srcTxnIds The ids of the transactions at the source cluster
+   * @param user The user who fired the repl load command
+   * @return The ids of the newly opened target transactions.
+   * @throws LockException in case of failure to start the transaction.
+   */
+  List<Long> replOpenTxn(String replPolicy, List<Long> srcTxnIds, String user) 
 throws LockException;
+
+  /**
+   * Commit the transaction in target cluster.
+   * @param replPolicy Replication policy to uniquely identify the source 
cluster.
+   * @param srcTxnId The id of the transaction at the source cluster
+   * @throws LockException in case of failure to commit the transaction.
+   */
+  void replCommitTxn(String replPolicy, long srcTxnId)  throws LockException;
+
+  /**
+   * Abort the transaction in target cluster.
+   * @param replPolicy Replication policy to uniquely identify the source 
cluster.
+   * @param srcTxnId The id of the transaction at the source cluster
+   * @throws LockException in case of failure to abort the transaction.
+   */
+  void replRollbackTxn(String replPolicy, long srcTxnId)  throws LockException;
+
+  /**
    * Get the lock manager.  This must be used rather than instantiating an
    * instance of the lock manager directly as the transaction manager will
    * choose which lock manager to instantiate.

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index e862589..8b639f7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -409,10 +409,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
     Task<?> copyTask = null;
     if (replicationSpec.isInReplicationScope()) {
-      if (isSourceMm || isAcid(writeId)) {
-        // Note: this is replication gap, not MM gap... Repl V2 is not ready yet.
-        throw new RuntimeException("Replicating MM and ACID tables is not supported");
-      }
       copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf());
     } else {
       CopyWork cw = new CopyWork(dataPath, destPath, false);
@@ -502,10 +498,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
       Task<?> copyTask = null;
       if (replicationSpec.isInReplicationScope()) {
-        if (isSourceMm || isAcid(writeId)) {
-          // Note: this is replication gap, not MM gap... Repl V2 is not ready yet.
-          throw new RuntimeException("Replicating MM and ACID tables is not supported");
-        }
         copyTask = ReplCopyTask.getLoadCopyTask(
             replicationSpec, new Path(srcLocation), destPath, x.getConf());
       } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 9f18efd..79b2e48 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
@@ -243,6 +244,60 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
+  private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType) throws SemanticException {
+    // This function filters out events that have already been replayed. This can be done only
+    // for transaction-related events, as for other kinds of events we cannot guarantee that the
+    // last repl id stored in the database/table is valid.
+    if ((dumpType != DumpType.EVENT_ABORT_TXN) &&
+            (dumpType != DumpType.EVENT_OPEN_TXN) &&
+            (dumpType != DumpType.EVENT_COMMIT_TXN)) {
+      return true;
+    }
+
+    // If the database name itself is null then we cannot filter out anything.
+    if (dbNameOrPattern == null || dbNameOrPattern.isEmpty()) {
+      return true;
+    } else if ((tblNameOrPattern == null) || (tblNameOrPattern.isEmpty())) {
+      Database database;
+      try {
+        database = Hive.get().getDatabase(dbNameOrPattern);
+      } catch (HiveException e) {
+        LOG.error("failed to get the database " + dbNameOrPattern);
+        throw new SemanticException(e);
+      }
+      String replLastId;
+      Map<String, String> params = database.getParameters();
+      if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
+        replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+        if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) {
+          LOG.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName())
+                  + " is already replayed. LastReplId - " + Long.parseLong(replLastId));
+          return false;
+        }
+      }
+    } else {
+      Table tbl;
+      try {
+        tbl = Hive.get().getTable(dbNameOrPattern, tblNameOrPattern);
+      } catch (HiveException e) {
+        LOG.error("failed to get the table " + dbNameOrPattern + "." + 
tblNameOrPattern);
+        throw new SemanticException(e);
+      }
+      if (tbl != null) {
+        Map<String, String> params = tbl.getParameters();
+        if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
+          String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+          if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) {
+            LOG.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName())
+                    + " is already replayed. LastReplId - " + Long.parseLong(replLastId));
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
+
   /*
    * Example dump dirs we need to be able to handle :
    *
@@ -380,6 +435,13 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
                 loadPath.toString(), dirsInLoadPath.length);
 
         for (FileStatus dir : dirsInLoadPath){
+          String locn = dir.getPath().toUri().toString();
+          DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf);
+
+          if (!shouldReplayEvent(dir, eventDmd.getDumpType())) {
+            continue;
+          }
+
           LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), 
dbNameOrPattern, tblNameOrPattern);
 
           // event loads will behave similar to table loads, with one crucial difference
@@ -401,8 +463,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           // Once this entire chain is generated, we add evTaskRoot to rootTasks, so as to execute the
           // entire chain
 
-          String locn = dir.getPath().toUri().toString();
-          DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf);
           MessageHandler.Context context = new MessageHandler.Context(dbNameOrPattern,
                                                           tblNameOrPattern, locn, taskChainTail,
                                                           eventDmd, conf, db, ctx, LOG);
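
The core of shouldReplayEvent() above is a numeric comparison between the event id (the
dump directory name) and the last replicated state id recorded on the target database or
table. A standalone sketch of that predicate; the method and parameter names are
illustrative, not from the patch:

    // Returns true when the event is already covered by the recorded state and can be
    // skipped. lastReplIdParam is the value stored on the target db/table under
    // ReplicationSpec.KEY.CURR_STATE_ID, or null if none has been recorded yet.
    static boolean alreadyReplayed(String lastReplIdParam, String eventDirName) {
      if (lastReplIdParam == null) {
        return false;  // no recorded state, so the event must be replayed
      }
      return Long.parseLong(lastReplIdParam) >= Long.parseLong(eventDirName);
    }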

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
index c69ecc9..5fab0d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
@@ -37,6 +37,9 @@ import org.apache.hadoop.hive.ql.parse.repl.load.message.RenameTableHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.TableHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.TruncatePartitionHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.TruncateTableHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.OpenTxnHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.CommitTxnHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.AbortTxnHandler;
 
 public enum DumpType {
 
@@ -183,6 +186,24 @@ public enum DumpType {
     public MessageHandler handler() {
       return new DropDatabaseHandler();
     }
+  },
+  EVENT_OPEN_TXN("EVENT_OPEN_TXN") {
+    @Override
+    public MessageHandler handler() {
+      return new OpenTxnHandler();
+    }
+  },
+  EVENT_COMMIT_TXN("EVENT_COMMIT_TXN") {
+    @Override
+    public MessageHandler handler() {
+      return new CommitTxnHandler();
+    }
+  },
+  EVENT_ABORT_TXN("EVENT_ABORT_TXN") {
+    @Override
+    public MessageHandler handler() {
+      return new AbortTxnHandler();
+    }
   };
 
   String type = null;
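
With these constants in place, a transactional event read back from a dump dispatches
through the same enum mechanism as the existing event types, e.g. (illustrative usage):

    MessageHandler openHandler   = DumpType.EVENT_OPEN_TXN.handler();   // new OpenTxnHandler()
    MessageHandler commitHandler = DumpType.EVENT_COMMIT_TXN.handler(); // new CommitTxnHandler()
    MessageHandler abortHandler  = DumpType.EVENT_ABORT_TXN.handler();  // new AbortTxnHandler()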

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
new file mode 100644
index 0000000..b9a5d21
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class AbortTxnHandler extends AbstractEventHandler {
+
+  AbortTxnHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} ABORT_TXN message : {}", fromEventId(), 
event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_ABORT_TXN;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
new file mode 100644
index 0000000..db97d7c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -0,0 +1,43 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class CommitTxnHandler extends AbstractEventHandler {
+
+  CommitTxnHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), 
event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_COMMIT_TXN;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
index 9955246..10ff21c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
@@ -50,6 +50,9 @@ public class EventHandlerFactory {
     register(MessageFactory.DROP_CONSTRAINT_EVENT, DropConstraintHandler.class);
     register(MessageFactory.CREATE_DATABASE_EVENT, CreateDatabaseHandler.class);
     register(MessageFactory.DROP_DATABASE_EVENT, DropDatabaseHandler.class);
+    register(MessageFactory.OPEN_TXN_EVENT, OpenTxnHandler.class);
+    register(MessageFactory.COMMIT_TXN_EVENT, CommitTxnHandler.class);
+    register(MessageFactory.ABORT_TXN_EVENT, AbortTxnHandler.class);
   }
 
   static void register(String event, Class<? extends EventHandler> handlerClazz) {
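
The registrations above key dump-side handlers off metastore message-type strings. A
sketch of the lookup this enables; the handlerFor() method and its map parameter are
assumptions for illustration (only the register() signature above is from the patch),
relying on the single NotificationEvent constructor each of the three new handlers
declares:

    static EventHandler handlerFor(NotificationEvent event,
        Map<String, Class<? extends EventHandler>> registered) throws Exception {
      // Look up the handler class registered for this event's type string and
      // instantiate it reflectively via its NotificationEvent constructor.
      Class<? extends EventHandler> clazz = registered.get(event.getEventType());
      return clazz.getDeclaredConstructor(NotificationEvent.class).newInstance(event);
    }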

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java
new file mode 100644
index 0000000..fe81fe1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/OpenTxnHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class OpenTxnHandler extends AbstractEventHandler {
+
+  OpenTxnHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} OPEN_TXN message : {}", fromEventId(), 
event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_OPEN_TXN;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
new file mode 100644
index 0000000..5b2c85b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
+import org.apache.hadoop.hive.ql.exec.ReplTxnWork;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * AbortTxnHandler
+ * Target(Load) side handler for abort transaction event.
+ */
+public class AbortTxnHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context)
+      throws SemanticException {
+    if (!AcidUtils.isAcidEnabled(context.hiveConf)) {
+      context.log.error("Cannot load transaction events as acid is not 
enabled");
+      throw new SemanticException("Cannot load transaction events as acid is 
not enabled");
+    }
+
+    AbortTxnMessage msg = deserializer.getAbortTxnMessage(context.dmd.getPayload());
+
+    Task<ReplTxnWork> abortTxnTask = TaskFactory.get(
+        new ReplTxnWork(context.dbName, context.tableName, msg.getTxnId(), ReplTxnWork.OperationType.REPL_ABORT_TXN),
+        context.hiveConf
+    );
+    updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+    context.log.debug("Added Abort txn task : {}", abortTxnTask.getId());
+    return Collections.singletonList(abortTxnTask);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
new file mode 100644
index 0000000..461a0f1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.ql.exec.ReplTxnWork;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * CommitTxnHandler
+ * Target(Load) side handler for commit transaction event.
+ */
+public class CommitTxnHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context)
+      throws SemanticException {
+    if (!AcidUtils.isAcidEnabled(context.hiveConf)) {
+      context.log.error("Cannot load transaction events as acid is not 
enabled");
+      throw new SemanticException("Cannot load transaction events as acid is 
not enabled");
+    }
+
+    CommitTxnMessage msg = deserializer.getCommitTxnMessage(context.dmd.getPayload());
+
+    Task<ReplTxnWork> commitTxnTask = TaskFactory.get(
+        new ReplTxnWork(context.dbName, context.tableName, msg.getTxnId(),
+                ReplTxnWork.OperationType.REPL_COMMIT_TXN), context.hiveConf
+    );
+    updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+    context.log.debug("Added Commit txn task : {}", commitTxnTask.getId());
+    return Collections.singletonList(commitTxnTask);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
new file mode 100644
index 0000000..c6349ea
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
+import org.apache.hadoop.hive.ql.exec.ReplTxnWork;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * OpenTxnHandler
+ * Target(Load) side handler for open transaction event.
+ */
+public class OpenTxnHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context)
+      throws SemanticException {
+    if (!AcidUtils.isAcidEnabled(context.hiveConf)) {
+      context.log.error("Cannot load transaction events as acid is not 
enabled");
+      throw new SemanticException("Cannot load transaction events as acid is 
not enabled");
+    }
+    OpenTxnMessage msg = 
deserializer.getOpenTxnMessage(context.dmd.getPayload());
+
+    Task<ReplTxnWork> openTxnTask = TaskFactory.get(
+        new ReplTxnWork(context.dbName, context.tableName, msg.getTxnIds(), ReplTxnWork.OperationType.REPL_OPEN_TXN),
+        context.hiveConf
+    );
+    updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+    context.log.debug("Added Open txn task : {}", openTxnTask.getId());
+    return Collections.singletonList(openTxnTask);
+  }
+}
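
Taken together, the three load-side handlers follow one pattern and differ only in the
ReplTxnWork operation type and the shape of the txn-id payload: a list of ids for open, a
single id for commit and abort. Reusing only the constructor forms shown above, with
placeholder db/table names:

    ReplTxnWork open   = new ReplTxnWork("db", "tbl", java.util.Arrays.asList(1L, 2L),
        ReplTxnWork.OperationType.REPL_OPEN_TXN);
    ReplTxnWork commit = new ReplTxnWork("db", "tbl", 1L, ReplTxnWork.OperationType.REPL_COMMIT_TXN);
    ReplTxnWork abort  = new ReplTxnWork("db", "tbl", 2L, ReplTxnWork.OperationType.REPL_ABORT_TXN);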

http://git-wip-us.apache.org/repos/asf/hive/blob/59483bca/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 7b510dd..f1da15b 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.txn;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
 import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
@@ -78,6 +79,8 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertFalse;
@@ -1541,6 +1544,24 @@ public class TestTxnHandler {
     Assert.assertTrue("regex should be retryable", result);
   }
 
+  @Test
+  public void testReplOpenTxn() throws Exception {
+    int numTxn = 50000;
+    conf.setIntVar(HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, numTxn);
+    OpenTxnRequest rqst = new OpenTxnRequest(numTxn, "me", "localhost");
+    rqst.setReplPolicy("default.*");
+    rqst.setReplSrcTxnIds(LongStream.rangeClosed(1, numTxn)
+            .boxed().collect(Collectors.toList()));
+    OpenTxnsResponse openedTxns = txnHandler.openTxns(rqst);
+    List<Long> txnList = openedTxns.getTxn_ids();
+    assertEquals(numTxn, txnList.size());
+    for (long i = 0; i < numTxn; i++) {
+      long txnId = txnList.get((int) i);
+      assertEquals(i + 1, txnId);
+    }
+    txnHandler.abortTxns(new AbortTxnsRequest(txnList));
+  }
+
   private void updateTxns(Connection conn) throws SQLException {
     Statement stmt = conn.createStatement();
     stmt.executeUpdate("update TXNS set txn_last_heartbeat = 
txn_last_heartbeat + 1");
