Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/883

Change subject: Txn Log Replication Optimizations
......................................................................

Txn Log Replication Optimizations

- Add configurations for txn log replication.
- Establish handshake for continuous log replication.
- Replicate logs in batches instead of one by one.
- Eliminate the creation of a HashMap iterator object for every log batch.
- Fix synchronization for jobs' ACKs from remote replicas.
- Fix bugs in the replication of big objects.

Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
R asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
16 files changed, 437 insertions(+), 338 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/83/883/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 0a6d62d..c71d77e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -150,7 +150,7 @@
         }
     }
 
-    private void startReplicationService() {
+    private void startReplicationService() throws InterruptedException {
         //Open replication channel
         runtimeContext.getReplicationChannel().start();
 
diff --git 
a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml 
b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
index b1d0649..1d693f9 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -98,10 +98,4 @@
     <description>Enabling plot of Algebricks plan to tmp folder. (Default = 
false)
     </description>
   </property>
-  <property>
-    <name>log.level</name>
-    <value>WARNING</value>
-    <description>The minimum log level to be displayed. (Default = INFO)
-    </description>
-  </property>
 </asterixConfiguration>
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index 5d31d9a..90ec60b 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -26,6 +26,8 @@
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
 
 public class AsterixReplicationProperties extends AbstractAsterixProperties {
 
@@ -38,6 +40,16 @@
     private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
     private final String NODE_NAME_PREFIX;
     private final Cluster cluster;
+
+    private static final String REPLICATION_LOG_BATCH_SIZE_KEY = 
"replication.log.batchsize";
+    private static final int REPLICATION_LOG_BATCH_SIZE_DEFAULT = 
StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
+
+    private static final String REPLICATION_LOG_BUFFER_NUM_PAGES_KEY = 
"replication.log.buffer.numpages";
+    private static int REPLICATION_LOG_BUFFER_NUM_PAGES_DEFAULT = 8;
+
+    private static final String REPLICATION_LOG_BUFFER_PAGE_SIZE_KEY = 
"replication.log.buffer.pagesize";
+    private static final int REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT = 
StorageUtil.getSizeInBytes(128,
+            StorageUnit.KILOBYTE);
 
     public AsterixReplicationProperties(AsterixPropertiesAccessor accessor, 
Cluster cluster) {
         super(accessor);
@@ -245,4 +257,19 @@
     public int getMaxRemoteRecoveryAttempts() {
         return MAX_REMOTE_RECOVERY_ATTEMPTS;
     }
+
+    public int getLogBufferPageSize() {
+        return accessor.getProperty(REPLICATION_LOG_BUFFER_PAGE_SIZE_KEY, 
REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT,
+                PropertyInterpreters.getIntegerPropertyInterpreter());
+    }
+
+    public int getLogBufferNumOfPages() {
+        return accessor.getProperty(REPLICATION_LOG_BUFFER_NUM_PAGES_KEY, 
REPLICATION_LOG_BUFFER_NUM_PAGES_DEFAULT,
+                PropertyInterpreters.getIntegerPropertyInterpreter());
+    }
+
+    public int getLogBatchSize() {
+        return accessor.getProperty(REPLICATION_LOG_BATCH_SIZE_KEY, 
REPLICATION_LOG_BATCH_SIZE_DEFAULT,
+                PropertyInterpreters.getIntegerPropertyInterpreter());
+    }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index a2738e8..9f9d74b 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -43,6 +43,7 @@
      * Requests the remaining LSM disk components files from active remote 
replicas.
      *
      * @throws IOException
+     * @throws InterruptedException
      */
-    public void completeFailbackProcess() throws IOException;
+    public void completeFailbackProcess() throws IOException, 
InterruptedException;
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
index 755fbbd..6bd1505 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.replication;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Set;
 
 import org.apache.asterix.common.transactions.ILogRecord;
@@ -31,8 +32,9 @@
      *
      * @param logRecord
      *            The log record to be replicated,
+     * @throws InterruptedException
      */
-    public void replicateLog(ILogRecord logRecord);
+    public void replicateLog(ILogRecord logRecord) throws InterruptedException;
 
     /**
      * Checks whether a log record has been replicated
@@ -79,8 +81,10 @@
 
     /**
      * Starts processing of ASYNC replication jobs as well as Txn logs.
+     *
+     * @throws InterruptedException
      */
-    public void startReplicationThreads();
+    public void startReplicationThreads() throws InterruptedException;
 
     /**
      * Checks and sets each remote replica state.
@@ -114,4 +118,13 @@
      * @throws IOException
      */
     public void requestFlushLaggingReplicaIndexes(long 
nonSharpCheckpointTargetLSN) throws IOException;
+
+    /**
+     * Transfers the contents of the {@code buffer} to active remote replicas.
+     * The transfer starts from the {@code buffer} current position to its 
limit.
+     * After the transfer, the {@code buffer} position will be its limit.
+     *
+     * @param buffer
+     */
+    public void replicateTxnLogBatch(ByteBuffer buffer);
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
index 4c3f728..b8fe4b2 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
@@ -62,8 +62,7 @@
     public InetSocketAddress getAddress(AsterixReplicationProperties 
asterixReplicationProperties) {
         String replicaIPAddress = node.getClusterIp();
         int replicationPort = 
asterixReplicationProperties.getDataReplicationPort(node.getId());
-        InetSocketAddress replicaAddress = 
InetSocketAddress.createUnresolved(replicaIPAddress, replicationPort);
-        return replicaAddress;
+        return InetSocketAddress.createUnresolved(replicaIPAddress, 
replicationPort);
     }
 
     public static Replica create(DataInput input) throws IOException {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 3738cd1..1992a00 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -110,9 +110,7 @@
 
     public String getNodeId();
 
-    public int writeRemoteRecoveryLog(ByteBuffer buffer);
-
-    public RecordReadStatus readRemoteLog(ByteBuffer buffer, boolean 
remoteRecoveryLog);
+    public void readRemoteLog(ByteBuffer buffer);
 
     public void setReplicationThread(IReplicationThread replicationThread);
 
@@ -120,11 +118,7 @@
 
     public byte getLogSource();
 
-    public int getSerializedLogSize();
-
-    public void writeLogRecord(ByteBuffer buffer, long appendLSN);
-
-    public ByteBuffer getSerializedLog();
+    public int getRemoteLogSize();
 
     public void setNodeId(String nodeId);
 
@@ -138,4 +132,6 @@
      * @return a flag indicating whether the log record should be sent to 
remote replicas
      */
     public boolean isReplicated();
+
+    public void writeRemoteLogRecord(ByteBuffer buffer);
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index fd56913..88698e3 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -44,8 +44,7 @@
  * PKValueSize(4)
  * PKValue(PKValueSize)
  * ---------------------------
- * [Header3] (20 bytes) : only for update log type
- * PrevLSN(8)
+ * [Header3] (12 bytes) : only for update log type
  * ResourceId(8) //stored in .metadata of the corresponding index in NC node
  * LogRecordSize(4)
  * ---------------------------
@@ -71,6 +70,24 @@
  */
 
 public class LogRecord implements ILogRecord {
+
+    private static final int LOG_SOURCE_LEN = Byte.BYTES;
+    private static final int TYPE_LEN = Byte.BYTES;
+    public static final int PKHASH_LEN = Integer.BYTES;
+    public static final int PKSZ_LEN = Integer.BYTES;
+    private static final int RS_PARTITION_LEN = Integer.BYTES;
+    private static final int RSID_LEN = Long.BYTES;
+    private static final int LOGRCD_SZ_LEN = Integer.BYTES;
+    private static final int FLDCNT_LEN = Integer.BYTES;
+    private static final int NEWOP_LEN = Byte.BYTES;
+    private static final int NEWVALSZ_LEN = Integer.BYTES;
+    private static final int CHKSUM_LEN = Long.BYTES;
+
+    private static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN 
+ JobId.BYTES;
+    private static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN 
+ DatasetId.BYTES + PKHASH_LEN
+            + PKSZ_LEN;
+    private static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
+    private static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + 
NEWVALSZ_LEN;
 
     // ------------- fields in a log record (begin) ------------//
     private byte logSource;
@@ -101,9 +118,8 @@
     private int[] PKFields;
     private PrimaryIndexOperationTracker opTracker;
     private IReplicationThread replicationThread;
-    private ByteBuffer serializedLog;
     /**
-     * The fields (numOfFlushedIndexes and nodeId) are used for serialized 
flush logs only
+     * The fields (numOfFlushedIndexes and nodeId) are used for remote flush 
logs only
      * to indicate the source of the log and how many indexes were flushed 
using its LSN.
      */
     private int numOfFlushedIndexes;
@@ -118,25 +134,6 @@
         checksumGen = new CRC32();
         logSource = LogSource.LOCAL;
     }
-
-    private final static int LOG_SOURCE_LEN = Byte.BYTES;
-    private final static int TYPE_LEN = Byte.BYTES;
-    public final static int PKHASH_LEN = Integer.BYTES;
-    public final static int PKSZ_LEN = Integer.BYTES;
-    private final static int RS_PARTITION_LEN = Integer.BYTES;
-    private final static int RSID_LEN = Long.BYTES;
-    private final static int LOGRCD_SZ_LEN = Integer.BYTES;
-    private final static int FLDCNT_LEN = Integer.BYTES;
-    private final static int NEWOP_LEN = Byte.BYTES;
-    private final static int NEWVALSZ_LEN = Integer.BYTES;
-    private final static int CHKSUM_LEN = Long.BYTES;
-
-    private final static int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN 
+ JobId.BYTES;
-    private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN 
+ DatasetId.BYTES + PKHASH_LEN
-            + PKSZ_LEN;
-    private final static int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
-    private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + 
NEWVALSZ_LEN;
-    private final static int REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN = Long.BYTES + 
Integer.BYTES + Integer.BYTES;
 
     private void writeLogRecordCommonFields(ByteBuffer buffer) {
         buffer.put(logSource);
@@ -173,39 +170,15 @@
         buffer.putLong(checksum);
     }
 
-    // this method is used when replication is enabled to include the log 
record LSN in the serialized version
     @Override
-    public void writeLogRecord(ByteBuffer buffer, long appendLSN) {
-        int beginOffset = buffer.position();
+    public void writeRemoteLogRecord(ByteBuffer buffer) {
         writeLogRecordCommonFields(buffer);
-
-        if (replicated) {
-            //copy the serialized log to send it to replicas
-            int serializedLogSize = getSerializedLogSize();
-            if (serializedLog == null || serializedLog.capacity() < 
serializedLogSize) {
-                serializedLog = ByteBuffer.allocate(serializedLogSize);
-            } else {
-                serializedLog.clear();
-            }
-
-            int currentPosition = buffer.position();
-            int currentLogSize = (currentPosition - beginOffset);
-
-            buffer.position(beginOffset);
-            buffer.get(serializedLog.array(), 0, currentLogSize);
-            serializedLog.position(currentLogSize);
-            if (logType == LogType.FLUSH) {
-                serializedLog.putLong(appendLSN);
-                serializedLog.putInt(numOfFlushedIndexes);
-                serializedLog.putInt(nodeId.length());
-                serializedLog.put(nodeId.getBytes());
-            }
-            serializedLog.flip();
-            buffer.position(currentPosition);
+        if (logType == LogType.FLUSH) {
+            buffer.putLong(LSN);
+            buffer.putInt(numOfFlushedIndexes);
+            buffer.putInt(nodeId.length());
+            buffer.put(nodeId.getBytes());
         }
-
-        checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
-        buffer.putLong(checksum);
     }
 
     private void writePKValue(ByteBuffer buffer) {
@@ -221,8 +194,12 @@
     }
 
     private void writeTuple(ByteBuffer buffer, ITupleReference tuple, int 
size) {
-        tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
-        // writeTuple() doesn't change the position of the buffer.
+        if (logSource == LogSource.LOCAL) {
+            tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
+        } else {
+            //since the tuple is already serialized in remote logs, just copy 
it from beginning to end.
+            System.arraycopy(tuple.getFieldData(0), 0, buffer.array(), 
buffer.position(), size);
+        }
         buffer.position(buffer.position() + size);
     }
 
@@ -323,47 +300,19 @@
     }
 
     @Override
-    public RecordReadStatus readRemoteLog(ByteBuffer buffer, boolean 
remoteRecoveryLog) {
-        int beginOffset = buffer.position();
-
+    public void readRemoteLog(ByteBuffer buffer) {
         //read common fields
-        RecordReadStatus status = readLogCommonFields(buffer);
-        if (status != RecordReadStatus.OK) {
-            buffer.position(beginOffset);
-            return status;
-        }
+        readLogCommonFields(buffer);
 
         if (logType == LogType.FLUSH) {
-            if (buffer.remaining() >= REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN) {
-                LSN = buffer.getLong();
-                numOfFlushedIndexes = buffer.getInt();
-                //read serialized node id
-                int nodeIdLength = buffer.getInt();
-                if (buffer.remaining() >= nodeIdLength) {
-                    byte[] nodeIdBytes = new byte[nodeIdLength];
-                    buffer.get(nodeIdBytes);
-                    nodeId = new String(nodeIdBytes);
-                } else {
-                    buffer.position(beginOffset);
-                    return RecordReadStatus.TRUNCATED;
-                }
-            } else {
-                buffer.position(beginOffset);
-                return RecordReadStatus.TRUNCATED;
-            }
+            LSN = buffer.getLong();
+            numOfFlushedIndexes = buffer.getInt();
+            //read serialized node id
+            int nodeIdLength = buffer.getInt();
+            byte[] nodeIdBytes = new byte[nodeIdLength];
+            buffer.get(nodeIdBytes);
+            nodeId = new String(nodeIdBytes);
         }
-
-        //remote recovery logs need to have the LSN to check which should be 
replayed
-        if (remoteRecoveryLog) {
-            if (buffer.remaining() >= Long.BYTES) {
-                LSN = buffer.getLong();
-            } else {
-                buffer.position(beginOffset);
-                return RecordReadStatus.TRUNCATED;
-            }
-        }
-
-        return RecordReadStatus.OK;
     }
 
     private ITupleReference readPKValue(ByteBuffer buffer) {
@@ -443,16 +392,6 @@
             builder.append(" ResourceId : ").append(resourceId);
         }
         return builder.toString();
-    }
-
-    @Override
-    public int writeRemoteRecoveryLog(ByteBuffer buffer) {
-        int bufferBegin = buffer.position();
-        writeLogRecordCommonFields(buffer);
-        //FLUSH logs should not included in remote recovery
-        //LSN must be included in all remote recovery logs
-        buffer.putLong(LSN);
-        return buffer.position() - bufferBegin;
     }
 
     ////////////////////////////////////////////
@@ -535,18 +474,18 @@
     }
 
     @Override
-    public int getSerializedLogSize() {
-        int serilizedSize = logSize;
+    public int getRemoteLogSize() {
+        int remoteLogSize = logSize;
         if (logType == LogType.FLUSH) {
             //LSN
-            serilizedSize += Long.BYTES;
+            remoteLogSize += Long.BYTES;
             //num of indexes
-            serilizedSize += Integer.BYTES;
+            remoteLogSize += Integer.BYTES;
             //serialized node id String
-            serilizedSize += Integer.BYTES + nodeId.length();
+            remoteLogSize += Integer.BYTES + nodeId.length();
         }
-        serilizedSize -= CHKSUM_LEN;
-        return serilizedSize;
+        remoteLogSize -= CHKSUM_LEN;
+        return remoteLogSize;
     }
 
     @Override
@@ -628,15 +567,6 @@
 
     public PrimaryIndexOperationTracker getOpTracker() {
         return opTracker;
-    }
-
-    @Override
-    public ByteBuffer getSerializedLog() {
-        return serializedLog;
-    }
-
-    public void setSerializedLog(ByteBuffer serializedLog) {
-        this.serializedLog = serializedLog;
     }
 
     @Override
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
index 588968c..6afba79 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
@@ -18,53 +18,42 @@
  */
 package org.apache.asterix.replication.logging;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-import org.apache.asterix.replication.management.NetworkingUtil;
 import org.apache.asterix.replication.management.ReplicationManager;
 
 public class ReplicationLogBuffer {
     private final int logBufferSize;
     private final AtomicBoolean full;
     private int appendOffset;
-    private int flushOffset;
+    private int replicationOffset;
     private final ByteBuffer appendBuffer;
-    private final ByteBuffer flushBuffer;
+    private final ByteBuffer replicationBuffer;
     private boolean stop;
-    private Map<String, SocketChannel> replicaSockets;
     private ReplicationManager replicationManager;
+    private final int batchSize;
 
-    public ReplicationLogBuffer(ReplicationManager replicationManager, int 
logBufferSize) {
+    public ReplicationLogBuffer(ReplicationManager replicationManager, int 
logBufferSize, int batchSize) {
         this.replicationManager = replicationManager;
         this.logBufferSize = logBufferSize;
+        this.batchSize = batchSize;
         appendBuffer = ByteBuffer.allocate(logBufferSize);
-        flushBuffer = appendBuffer.duplicate();
+        replicationBuffer = appendBuffer.duplicate();
         full = new AtomicBoolean(false);
         appendOffset = 0;
-        flushOffset = 0;
+        replicationOffset = 0;
     }
 
     public void append(ILogRecord logRecord) {
-        
appendBuffer.putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
-        appendBuffer.putInt(logRecord.getSerializedLogSize());
-        appendBuffer.put(logRecord.getSerializedLog());
+        appendBuffer.putInt(logRecord.getRemoteLogSize());
+        logRecord.writeRemoteLogRecord(appendBuffer);
 
         synchronized (this) {
             appendOffset += getLogReplicationSize(logRecord);
             this.notify();
         }
-    }
-
-    public void setReplicationSockets(Map<String, SocketChannel> 
replicaSockets) {
-        this.replicaSockets = replicaSockets;
     }
 
     public synchronized void isFull(boolean full) {
@@ -77,18 +66,18 @@
     }
 
     private static int getLogReplicationSize(ILogRecord logRecord) {
-        //request type + request length + serialized log length
-        return Integer.BYTES + Integer.BYTES + 
logRecord.getSerializedLogSize();
+        //log length (4 bytes) + remote log size
+        return Integer.BYTES + logRecord.getRemoteLogSize();
     }
 
     public void reset() {
         appendBuffer.position(0);
         appendBuffer.limit(logBufferSize);
-        flushBuffer.position(0);
-        flushBuffer.limit(logBufferSize);
+        replicationBuffer.position(0);
+        replicationBuffer.limit(logBufferSize);
         full.set(false);
         appendOffset = 0;
-        flushOffset = 0;
+        replicationOffset = 0;
         stop = false;
     }
 
@@ -96,7 +85,7 @@
         int endOffset;
         while (!full.get()) {
             synchronized (this) {
-                if (appendOffset - flushOffset == 0 && !full.get()) {
+                if (appendOffset - replicationOffset == 0 && !full.get()) {
                     try {
                         if (stop) {
                             break;
@@ -108,45 +97,51 @@
                 }
                 endOffset = appendOffset;
             }
-            internalFlush(flushOffset, endOffset);
+            internalFlush(replicationOffset, endOffset);
         }
-
-        internalFlush(flushOffset, appendOffset);
+        internalFlush(replicationOffset, appendOffset);
     }
 
     private void internalFlush(int beginOffset, int endOffset) {
         if (endOffset > beginOffset) {
-            int begingPos = flushBuffer.position();
-            flushBuffer.limit(endOffset);
-            sendRequest(replicaSockets, flushBuffer);
-            flushBuffer.position(begingPos + (endOffset - beginOffset));
-            flushOffset = endOffset;
+            int begingPos = replicationBuffer.position();
+            replicationBuffer.limit(endOffset);
+            transferBuffer(replicationBuffer);
+            replicationBuffer.position(begingPos + (endOffset - beginOffset));
+            replicationOffset = endOffset;
         }
     }
 
-    private void sendRequest(Map<String, SocketChannel> replicaSockets, 
ByteBuffer requestBuffer) {
-        Iterator<Map.Entry<String, SocketChannel>> iterator = 
replicaSockets.entrySet().iterator();
-        int begin = requestBuffer.position();
-        while (iterator.hasNext()) {
-            Entry<String, SocketChannel> replicaSocket = iterator.next();
-            SocketChannel clientSocket = replicaSocket.getValue();
-            try {
-                NetworkingUtil.transferBufferToChannel(clientSocket, 
requestBuffer);
-            } catch (IOException e) {
-                if (clientSocket.isOpen()) {
-                    try {
-                        clientSocket.close();
-                    } catch (IOException e2) {
-                        e2.printStackTrace();
+    private void transferBuffer(ByteBuffer buffer) {
+        if (buffer.remaining() <= batchSize) {
+            replicationManager.replicateTxnLogBatch(buffer);
+        } else {
+            int totalTransferLimit = buffer.limit();
+            while (buffer.hasRemaining()) {
+                if (buffer.remaining() > batchSize) {
+                    /**
+                     * break into smaller batches
+                     */
+                    //mark the beginning of this batch
+                    buffer.mark();
+                    int currentBatchSize = 0;
+                    while (currentBatchSize < batchSize) {
+                        int logSize = replicationBuffer.getInt();
+                        //add the size of the log record itself + 4 bytes for 
its size 
+                        currentBatchSize += logSize + Integer.BYTES;
+                        //go to the beginning of the next log
+                        buffer.position(buffer.position() + logSize);
                     }
+                    //set the limit to the end of this batch
+                    buffer.limit(buffer.position());
+                    //return to the beginning of the batch position
+                    buffer.reset();
                 }
-                replicationManager.reportFailedReplica(replicaSocket.getKey());
-                iterator.remove();
-            } finally {
-                requestBuffer.position(begin);
+                replicationManager.replicateTxnLogBatch(buffer);
+                //return the original limit to check the new remaining size
+                buffer.limit(totalTransferLimit);
             }
         }
-
     }
 
     public boolean isStop() {
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
similarity index 83%
rename from 
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java
rename to 
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
index 3312cb1..d9d60d6 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
@@ -21,24 +21,22 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
 import java.util.logging.Logger;
-
-import org.apache.asterix.common.transactions.LogRecord;
 
 /**
  * This class is responsible for sending transactions logs to remote replicas.
  */
-public class ReplicationLogFlusher implements Callable<Boolean> {
-    private final static Logger LOGGER = Logger.getLogger(ReplicationLogFlusher.class.getName());
-    private final static ReplicationLogBuffer POISON_PILL = new ReplicationLogBuffer(null,
-            LogRecord.JOB_TERMINATE_LOG_SIZE);
+public class TxnLogReplicator implements Callable<Boolean> {
+    private static final Logger LOGGER = Logger.getLogger(TxnLogReplicator.class.getName());
+    private static final ReplicationLogBuffer POISON_PILL = new ReplicationLogBuffer(null, 0, 0);
     private final LinkedBlockingQueue<ReplicationLogBuffer> emptyQ;
     private final LinkedBlockingQueue<ReplicationLogBuffer> flushQ;
     private ReplicationLogBuffer flushPage;
     private final AtomicBoolean isStarted;
     private final AtomicBoolean terminateFlag;
 
-    public ReplicationLogFlusher(LinkedBlockingQueue<ReplicationLogBuffer> emptyQ,
+    public TxnLogReplicator(LinkedBlockingQueue<ReplicationLogBuffer> emptyQ,
             LinkedBlockingQueue<ReplicationLogBuffer> flushQ) {
         this.emptyQ = emptyQ;
         this.flushQ = flushQ;
@@ -74,7 +72,7 @@
 
     @Override
     public Boolean call() {
-        Thread.currentThread().setName("Replication Log Flusher");
+        Thread.currentThread().setName("TxnLog Replicator");
         synchronized (isStarted) {
             isStarted.set(true);
             isStarted.notify();
@@ -91,6 +89,7 @@
                     if (flushPage == null) {
                         continue;
                     }
+                    Thread.currentThread().interrupt();
                 }
                 flushPage.flush();
                 // TODO: pool large pages
@@ -99,8 +98,9 @@
                 }
             }
         } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.severe("ReplicationLogFlusher is terminating abnormally. Logs Replication Stopped.");
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.log(Level.SEVERE, "TxnLogReplicator is terminating abnormally. Logs Replication Stopped.", e);
+            }
             throw e;
         }
     }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
index 6023cb1..bc7451f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
@@ -21,6 +21,7 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
@@ -88,7 +89,8 @@
         return hostName;
     }
 
-    public static void transferBufferToChannel(SocketChannel socketChannel, ByteBuffer requestBuffer) throws IOException {
+    public static void transferBufferToChannel(SocketChannel socketChannel, ByteBuffer requestBuffer)
+            throws IOException {
         while (requestBuffer.hasRemaining()) {
             socketChannel.write(requestBuffer);
         }
@@ -107,4 +109,15 @@
         long fileSize = fileChannel.size();
         fileChannel.transferFrom(socketChannel, pos, fileSize);
     }
+
+    /**
+     * Returns the remote IP address and port of {@code socketChannel} as an unresolved address. The IP address is
+     * kept as a literal string, so no name service lookup is performed. The channel's socket must have been
+     * connected; otherwise its remote address is unavailable and this method fails with a NullPointerException.
+     */
+    public static InetSocketAddress getSocketAddress(SocketChannel socketChannel) {
+        String hostAddress = socketChannel.socket().getInetAddress().getHostAddress();
+        int port = socketChannel.socket().getPort();
+        return InetSocketAddress.createUnresolved(hostAddress, port);
+    }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index a152f6c..4eeaede 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -414,46 +414,62 @@
         }
 
         private void handleLogReplication() throws IOException, ACIDException {
-            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            //set initial buffer size to a log buffer page size
+            inBuffer = ByteBuffer.allocate(logManager.getLogPageSize());
+            while (true) {
+                //read a batch of logs
+                inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+                //check if it is end of handshake (a single byte log)
+                if (inBuffer.remaining() == 1) {
+                    break;
+                }
 
-            //Deserialize log
-            remoteLog.readRemoteLog(inBuffer, false);
-            remoteLog.setLogSource(LogSource.REMOTE);
+                /*
+                 * Process logs batch
+                 */
+                while (inBuffer.hasRemaining()) {
+                    //get rid of log size
+                    inBuffer.getInt();
+                    //Deserialize log
+                    remoteLog.readRemoteLog(inBuffer);
+                    remoteLog.setLogSource(LogSource.REMOTE);
 
-            switch (remoteLog.getLogType()) {
-                case LogType.UPDATE:
-                case LogType.ENTITY_COMMIT:
-                case LogType.UPSERT_ENTITY_COMMIT:
-                    //if the log partition belongs to a partitions hosted on this node, replicate it
-                    if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
-                        logManager.log(remoteLog);
+                    switch (remoteLog.getLogType()) {
+                        case LogType.UPDATE:
+                        case LogType.ENTITY_COMMIT:
+                        case LogType.UPSERT_ENTITY_COMMIT:
+                            //if the log partition belongs to a partitions hosted on this node, replicate it
+                            if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
+                                logManager.log(remoteLog);
+                            }
+                            break;
+                        case LogType.JOB_COMMIT:
+                        case LogType.ABORT:
+                            LogRecord jobTerminationLog = new LogRecord();
+                            TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getJobId(),
+                                    remoteLog.getLogType() == LogType.JOB_COMMIT);
+                            jobTerminationLog.setReplicationThread(this);
+                            jobTerminationLog.setLogSource(LogSource.REMOTE);
+                            logManager.log(jobTerminationLog);
+                            break;
+                        case LogType.FLUSH:
+                            //store mapping information for flush logs to use them in incoming LSM components.
+                            RemoteLogMapping flushLogMap = new RemoteLogMapping();
+                            flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
+                            flushLogMap.setRemoteLSN(remoteLog.getLSN());
+                            logManager.log(remoteLog);
+                            //the log LSN value is updated by logManager.log(.) to a local value
+                            flushLogMap.setLocalLSN(remoteLog.getLSN());
+                            flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
+                            replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
+                            synchronized (flushLogslock) {
+                                flushLogslock.notify();
+                            }
+                            break;
+                        default:
+                            LOGGER.severe("Unsupported LogType: " + remoteLog.getLogType());
                     }
-                    break;
-                case LogType.JOB_COMMIT:
-                case LogType.ABORT:
-                    LogRecord jobTerminationLog = new LogRecord();
-                    TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getJobId(),
-                            remoteLog.getLogType() == LogType.JOB_COMMIT);
-                    jobTerminationLog.setReplicationThread(this);
-                    jobTerminationLog.setLogSource(LogSource.REMOTE);
-                    logManager.log(jobTerminationLog);
-                    break;
-                case LogType.FLUSH:
-                    //store mapping information for flush logs to use them in incoming LSM components.
-                    RemoteLogMapping flushLogMap = new RemoteLogMapping();
-                    flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
-                    flushLogMap.setRemoteLSN(remoteLog.getLSN());
-                    logManager.log(remoteLog);
-                    //the log LSN value is updated by logManager.log(.) to a local value
-                    flushLogMap.setLocalLSN(remoteLog.getLSN());
-                    flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
-                    replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
-                    synchronized (flushLogslock) {
-                        flushLogslock.notify();
-                    }
-                    break;
-                default:
-                    LOGGER.severe("Unsupported LogType: " + remoteLog.getLogType());
+                }
             }
         }
 
@@ -594,4 +610,4 @@
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index ee872a5..fc1956d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -71,7 +71,7 @@
 import org.apache.asterix.replication.functions.ReplicationProtocol;
 import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
 import org.apache.asterix.replication.logging.ReplicationLogBuffer;
-import org.apache.asterix.replication.logging.ReplicationLogFlusher;
+import org.apache.asterix.replication.logging.TxnLogReplicator;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
@@ -86,6 +86,8 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
 
 /**
  * This class is used to process replication jobs and maintain remote replicas states
@@ -93,7 +95,7 @@
 public class ReplicationManager implements IReplicationManager {
 
     private static final Logger LOGGER = Logger.getLogger(ReplicationManager.class.getName());
-    private final int INITIAL_REPLICATION_FACTOR = 1;
+    private static final int INITIAL_REPLICATION_FACTOR = 1;
     private final String nodeId;
     private ExecutorService replicationListenerThreads;
     private final Map<Integer, Set<String>> jobCommitAcks;
@@ -114,7 +116,7 @@
     private final AtomicBoolean replicationSuspended;
     private AtomicBoolean terminateJobsReplication;
     private AtomicBoolean jobsReplicationSuspended;
-    private final static int INITIAL_BUFFER_SIZE = 4000; //4KB
+    private static final int INITIAL_BUFFER_SIZE = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
     private final Set<String> shuttingDownReplicaIds;
     //replication threads
     private ReplicationJobsProccessor replicationJobsProcessor;
@@ -128,9 +130,10 @@
     private LinkedBlockingQueue<ReplicationLogBuffer> emptyLogBuffersQ;
     private LinkedBlockingQueue<ReplicationLogBuffer> pendingFlushLogBuffersQ;
     protected ReplicationLogBuffer currentTxnLogBuffer;
-    private ReplicationLogFlusher txnlogsReplicator;
+    private TxnLogReplicator txnlogReplicator;
     private Future<? extends Object> txnLogReplicatorTask;
-    private Map<String, SocketChannel> logsReplicaSockets = null;
+    private SocketChannel[] logsRepSockets;
+    private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES);
 
     //TODO this class needs to be refactored by moving its private classes to separate files
     //and possibly using MessageBroker to send/receive remote replicas events.
@@ -179,13 +182,14 @@
             clientPartitonsSet.addAll(clientPartitions);
             replica2PartitionsMap.put(replica.getId(), clientPartitonsSet);
         }
-        int numLogBuffers = logManager.getNumLogPages();
+        int numLogBuffers = replicationProperties.getLogBufferNumOfPages();
         emptyLogBuffersQ = new LinkedBlockingQueue<ReplicationLogBuffer>(numLogBuffers);
         pendingFlushLogBuffersQ = new LinkedBlockingQueue<ReplicationLogBuffer>(numLogBuffers);
 
-        int logBufferSize = logManager.getLogPageSize();
+        int logBufferSize = replicationProperties.getLogBufferPageSize();
         for (int i = 0; i < numLogBuffers; i++) {
-            emptyLogBuffersQ.offer(new ReplicationLogBuffer(this, logBufferSize));
+            emptyLogBuffersQ
+                    .offer(new ReplicationLogBuffer(this, logBufferSize, replicationProperties.getLogBatchSize()));
         }
     }
 
@@ -209,16 +213,12 @@
     }
 
     @Override
-    public void replicateLog(ILogRecord logRecord) {
+    public void replicateLog(ILogRecord logRecord) throws InterruptedException {
         if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
             //if replication is suspended, wait until it is resumed.
             while (replicationSuspended.get()) {
                 synchronized (replicationSuspended) {
-                    try {
-                        replicationSuspended.wait();
-                    } catch (InterruptedException e) {
-                        //ignore
-                    }
+                    replicationSuspended.wait();
                 }
             }
             Set<String> replicaIds = Collections.synchronizedSet(new HashSet<String>());
@@ -232,29 +232,23 @@
     protected void getAndInitNewLargePage(int pageSize) {
         // for now, alloc a new buffer for each large page
         // TODO: consider pooling large pages
-        currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize);
-        currentTxnLogBuffer.setReplicationSockets(logsReplicaSockets);
+        currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize, replicationProperties.getLogBufferPageSize());
         pendingFlushLogBuffersQ.offer(currentTxnLogBuffer);
     }
 
-    protected void getAndInitNewPage() {
+    protected void getAndInitNewPage() throws InterruptedException {
         currentTxnLogBuffer = null;
         while (currentTxnLogBuffer == null) {
-            try {
-                currentTxnLogBuffer = emptyLogBuffersQ.take();
-            } catch (InterruptedException e) {
-                //ignore
-            }
+            currentTxnLogBuffer = emptyLogBuffersQ.take();
         }
         currentTxnLogBuffer.reset();
-        currentTxnLogBuffer.setReplicationSockets(logsReplicaSockets);
         pendingFlushLogBuffersQ.offer(currentTxnLogBuffer);
     }
 
-    private synchronized void appendToLogBuffer(ILogRecord logRecord) {
+    private synchronized void appendToLogBuffer(ILogRecord logRecord) throws InterruptedException {
         if (!currentTxnLogBuffer.hasSpace(logRecord)) {
             currentTxnLogBuffer.isFull(true);
-            if (logRecord.getLogSize() > logManager.getLogPageSize()) {
+            if (logRecord.getLogSize() > getLogPageSize()) {
                 getAndInitNewLargePage(logRecord.getLogSize());
             } else {
                 getAndInitNewPage();
@@ -319,6 +313,7 @@
                         }
 
                         LOGGER.log(Level.INFO, "Replicating file: " + filePath);
+
                         //open file for reading
                         try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
                                 FileChannel fileChannel = fromFile.getChannel();) {
@@ -458,7 +453,7 @@
     }
 
     /**
-     * Suspends proccessing replication jobs.
+     * Suspends processing replication jobs/logs.
      *
      * @param force
      *            a flag indicates if replication should be suspended right away or when the pending jobs are completed.
@@ -484,7 +479,7 @@
         }
 
         //suspend logs replication
-        if (txnlogsReplicator != null) {
+        if (txnlogReplicator != null) {
             terminateTxnLogsReplicator();
         }
     }
@@ -492,45 +487,124 @@
     /**
      * Opens a new connection with Active remote replicas and starts a listen thread per connection.
      */
-    private void establishTxnLogsReplicationConnection() {
-        logsReplicaSockets = getActiveRemoteReplicasSockets();
+    private void establishTxnLogsRepHandshake() {
+        Map<String, SocketChannel> activeRemoteReplicasSockets = getActiveRemoteReplicasSockets();
+        logsRepSockets = new SocketChannel[activeRemoteReplicasSockets.size()];
+        int i = 0;
         //start a listener thread per connection
-        for (Entry<String, SocketChannel> entry : logsReplicaSockets.entrySet()) {
+        for (Entry<String, SocketChannel> entry : activeRemoteReplicasSockets.entrySet()) {
+            logsRepSockets[i] = entry.getValue();
             replicationListenerThreads
                     .execute(new TxnLogsReplicationResponseListener(entry.getKey(), entry.getValue()));
+            i++;
+        }
+
+        /*
+         * establish log replication handshake
+         */
+        ByteBuffer handshakeBuffer = ByteBuffer.allocate(ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE)
+                .putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
+        handshakeBuffer.flip();
+        //send handshake request
+        for (SocketChannel replicaSocket : logsRepSockets) {
+            try {
+                NetworkingUtil.transferBufferToChannel(replicaSocket, handshakeBuffer);
+            } catch (IOException e) {
+                handleReplicationFailure(replicaSocket, e);
+            } finally {
+                handshakeBuffer.position(0);
+            }
         }
     }
 
+    private void handleReplicationFailure(SocketChannel socketChannel, Throwable t) {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.log(Level.WARNING, "Could not complete replication request.", t);
+        }
+        if (socketChannel.isOpen()) {
+            try {
+                socketChannel.close();
+            } catch (IOException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Could not close socket.", e);
+                }
+            }
+        }
+        reportFailedReplica(getReplicaIdBySocket(socketChannel));
+    }
+
     /**
-     * Stops ReplicationFlusherThread and closes the sockets used to replicate logs.
+     * Stops TxnLogReplicator and closes the sockets used to replicate logs.
      */
     private void terminateTxnLogsReplicator() {
-        LOGGER.log(Level.INFO, "Terminating ReplicationLogFlusher thread ...");
-        txnlogsReplicator.terminate();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Terminating TxnLogReplicator thread ...");
+        }
+        txnlogReplicator.terminate();
         try {
             txnLogReplicatorTask.get();
         } catch (ExecutionException | InterruptedException e) {
-            LOGGER.log(Level.WARNING, "RepicationLogFlusher thread terminated abnormally");
-            e.printStackTrace();
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.log(Level.SEVERE, "TxnLogReplicator thread terminated abnormally", e);
+            }
         }
-        LOGGER.log(Level.INFO, "LogFlusher thread is terminated.");
 
-        if (logsReplicaSockets != null) {
-            //wait for any ACK to arrive before closing sockets.
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("TxnLogReplicator thread was terminated.");
+        }
+
+        if (logsRepSockets == null) {
+            //the log replication handshake was never established or has already been ended
+            return;
+        }
+
+        /*
+         * End log replication handshake (by sending a dummy log with a single byte)
+         */
+        ByteBuffer endLogRepHandshake = ByteBuffer.allocate(Integer.BYTES + 1).putInt(1).put((byte) 0);
+        endLogRepHandshake.flip();
+        for (SocketChannel replicaSocket : logsRepSockets) {
+            try {
+                NetworkingUtil.transferBufferToChannel(replicaSocket, endLogRepHandshake);
+            } catch (IOException e) {
+                handleReplicationFailure(replicaSocket, e);
+            } finally {
+                endLogRepHandshake.position(0);
+            }
+        }
+
+        //wait for any ACK to arrive before closing sockets.
+        if (logsRepSockets != null) {
             synchronized (jobCommitAcks) {
-                while (jobCommitAcks.size() != 0) {
-                    try {
+                try {
+                    while (jobCommitAcks.size() != 0) {
                         jobCommitAcks.wait();
-                    } catch (InterruptedException e) {
-                        //ignore
                     }
+                } catch (InterruptedException e) {
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.log(Level.SEVERE, "Interrupted while waiting for jobs ACK", e);
+                    }
+                    Thread.currentThread().interrupt();
                 }
             }
-
-            //close log replication sockets
-            closeReplicaSockets(logsReplicaSockets);
-            logsReplicaSockets = null;
         }
+
+        /*
+         * Close log replication sockets
+         */
+        ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
+        for (SocketChannel replicaSocket : logsRepSockets) {
+            try {
+                //send goodbye to the remote replica, then release the socket
+                NetworkingUtil.transferBufferToChannel(replicaSocket, goodbyeBuffer);
+                replicaSocket.close();
+            } catch (IOException e) {
+                handleReplicationFailure(replicaSocket, e);
+            } finally {
+                goodbyeBuffer.position(0);
+            }
+        }
+        logsRepSockets = null;
     }
 
     /**
@@ -651,8 +725,10 @@
      *            The new state of the replica.
      * @param suspendReplication
      *            a flag indicating whether to suspend replication on state change or not.
+     * @throws InterruptedException
      */
-    public synchronized void updateReplicaState(String replicaId, ReplicaState newState, boolean suspendReplication) {
+    public synchronized void updateReplicaState(String replicaId, ReplicaState newState, boolean suspendReplication)
+            throws InterruptedException {
         Replica replica = replicas.get(replicaId);
 
         if (replica.getState() == newState) {
@@ -702,22 +778,24 @@
      *            The remote replica id the ACK received from.
      */
     private void addAckToJob(int jobId, String replicaId) {
-        //add ACK to the job
-        if (jobCommitAcks.containsKey(jobId)) {
-            Set<String> replicaIds = jobCommitAcks.get(jobId);
-            replicaIds.add(replicaId);
-        } else {
-            throw new IllegalStateException("Job ID not found in pending job commits  " + jobId);
-        }
+        synchronized (jobCommitAcks) {
+            //add ACK to the job
+            if (jobCommitAcks.containsKey(jobId)) {
+                Set<String> replicaIds = jobCommitAcks.get(jobId);
+                replicaIds.add(replicaId);
+            } else {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Invalid job replication ACK received for jobId(" + jobId + ")");
+                }
+                return;
+            }
 
-        //if got ACKs from all remote replicas, notify pending jobs if any
-        if (jobCommitAcks.get(jobId).size() == replicationFactor) {
-            synchronized (replicationJobsPendingAcks) {
-                if (replicationJobsPendingAcks.containsKey(jobId)) {
-                    ILogRecord pendingLog = replicationJobsPendingAcks.get(jobId);
-                    synchronized (pendingLog) {
-                        pendingLog.notify();
-                    }
+            //if got ACKs from all remote replicas, notify pending jobs if any
+
+            if (jobCommitAcks.get(jobId).size() == replicationFactor && replicationJobsPendingAcks.containsKey(jobId)) {
+                ILogRecord pendingLog = replicationJobsPendingAcks.get(jobId);
+                synchronized (pendingLog) {
+                    pendingLog.notify();
                 }
             }
         }
@@ -725,26 +803,25 @@
 
     @Override
     public boolean hasBeenReplicated(ILogRecord logRecord) {
-        if (jobCommitAcks.containsKey(logRecord.getJobId())) {
-            //check if all ACKs have been received
-            if (jobCommitAcks.get(logRecord.getJobId()).size() == replicationFactor) {
-                jobCommitAcks.remove(logRecord.getJobId());
+        int jobId = logRecord.getJobId();
+        if (jobCommitAcks.containsKey(jobId)) {
+            synchronized (jobCommitAcks) {
+                //check if all ACKs have been received
+                if (jobCommitAcks.get(jobId).size() == replicationFactor) {
+                    jobCommitAcks.remove(jobId);
 
-                if (replicationJobsPendingAcks.containsKey(logRecord.getJobId())) {
-                    replicationJobsPendingAcks.remove(logRecord);
-                }
+                    //remove from pending jobs if exists
+                    replicationJobsPendingAcks.remove(jobId);
 
-                //notify any threads waiting for all jobs to finish
-                if (jobCommitAcks.size() == 0) {
-                    synchronized (jobCommitAcks) {
+                    //notify any threads waiting for all jobs to finish
+                    if (jobCommitAcks.size() == 0) {
                         jobCommitAcks.notifyAll();
                     }
+                    return true;
+                } else {
+                    replicationJobsPendingAcks.putIfAbsent(jobId, logRecord);
+                    return false;
                 }
-
-                return true;
-            } else {
-                replicationJobsPendingAcks.putIfAbsent(logRecord.getJobId(), logRecord);
-                return false;
             }
         }
 
@@ -776,7 +853,7 @@
      * @throws IOException
      */
     private SocketChannel getReplicaSocket(String replicaId) throws IOException {
-        Replica replica = replicas.get(replicaId);
+        Replica replica = replicationProperties.getReplicaById(replicaId);
         SocketChannel sc = SocketChannel.open();
         sc.configureBlocking(true);
         InetSocketAddress address = replica.getAddress(replicationProperties);
@@ -854,9 +931,7 @@
 
     @Override
     public void reportReplicaEvent(ReplicaEvent event) {
-        synchronized (replicaEventsQ) {
-            replicaEventsQ.offer(event);
-        }
+        replicaEventsQ.offer(event);
     }
 
     /**
@@ -867,6 +942,9 @@
      */
     public void reportFailedReplica(String replicaId) {
         Replica replica = replicas.get(replicaId);
+        if (replica == null) {
+            return;
+        }
         if (replica.getState() == ReplicaState.DEAD) {
             return;
         }
@@ -878,16 +956,28 @@
         reportReplicaEvent(event);
     }
 
+    private String getReplicaIdBySocket(SocketChannel socketChannel) {
+        InetSocketAddress socketAddress = NetworkingUtil.getSocketAddress(socketChannel);
+        for (Replica replica : replicas.values()) {
+            InetSocketAddress replicaAddress = replica.getAddress(replicationProperties);
+            if (replicaAddress.getHostString().equals(socketAddress.getHostString())
+                    && replicaAddress.getPort() == socketAddress.getPort()) {
+                return replica.getId();
+            }
+        }
+        return null;
+    }
+
     @Override
-    public void startReplicationThreads() {
+    public void startReplicationThreads() throws InterruptedException {
         replicationJobsProcessor = new ReplicationJobsProccessor();
 
         //start/continue processing jobs/logs
-        if (logsReplicaSockets == null) {
-            establishTxnLogsReplicationConnection();
+        if (logsRepSockets == null) {
+            establishTxnLogsRepHandshake();
             getAndInitNewPage();
-            txnlogsReplicator = new ReplicationLogFlusher(emptyLogBuffersQ, pendingFlushLogBuffersQ);
-            txnLogReplicatorTask = asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogsReplicator);
+            txnlogReplicator = new TxnLogReplicator(emptyLogBuffersQ, pendingFlushLogBuffersQ);
+            txnLogReplicatorTask = asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogReplicator);
         }
 
         replicationJobsProcessor.start();
@@ -1037,7 +1127,42 @@
     }
 
     public int getLogPageSize() {
-        return logManager.getLogPageSize();
+        return replicationProperties.getLogBufferPageSize();
+    }
+
+    @Override
+    public void replicateTxnLogBatch(final ByteBuffer buffer) {
+        //if replication is suspended, wait until it is resumed
+        try {
+            while (replicationSuspended.get()) {
+                synchronized (replicationSuspended) {
+                    replicationSuspended.wait();
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        //prepare the batch size buffer
+        txnLogsBatchSizeBuffer.clear();
+        txnLogsBatchSizeBuffer.putInt(buffer.remaining());
+        txnLogsBatchSizeBuffer.flip();
+
+        buffer.mark();
+        for (SocketChannel replicaSocket : logsRepSockets) {
+            try {
+                //send batch size
+                NetworkingUtil.transferBufferToChannel(replicaSocket, txnLogsBatchSizeBuffer);
+                //send log
+                NetworkingUtil.transferBufferToChannel(replicaSocket, buffer);
+            } catch (IOException e) {
+                handleReplicationFailure(replicaSocket, e);
+            } finally {
+                txnLogsBatchSizeBuffer.position(0);
+                buffer.reset();
+            }
+        }
+        //move the buffer position to the sent limit
+        buffer.position(buffer.limit());
     }
 
     //supporting classes
@@ -1068,12 +1193,14 @@
                             break;
                     }
                 } catch (InterruptedException e) {
-                    //ignore
+                    //keep the interrupt status and exit; staying in the loop with the flag set re-throws immediately
+                    Thread.currentThread().interrupt();
+                    return;
                 }
             }
         }
 
-        public void handleReplicaFailure(String replicaId) {
+        public void handleReplicaFailure(String replicaId) throws InterruptedException {
             Replica replica = replicas.get(replicaId);
 
             if (replica.getState() == ReplicaState.DEAD) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 47e60b2..d14a0f7 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -204,7 +204,7 @@
     }
 
     @Override
-    public void completeFailbackProcess() throws IOException {
+    public void completeFailbackProcess() throws IOException, InterruptedException {
         ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
         ReplicaResourcesManager replicaResourcesManager = (ReplicaResourcesManager) runtimeContext
                 .getReplicaResourcesManager();
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 872adcd..e9d8a8c 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -113,7 +113,7 @@
 
     @Override
     public void appendWithReplication(ILogRecord logRecord, long appendLSN) {
-        logRecord.writeLogRecord(appendBuffer, appendLSN);
+        logRecord.writeLogRecord(appendBuffer);
 
         if (logRecord.getLogSource() == LogSource.LOCAL && 
logRecord.getLogType() != LogType.FLUSH
                 && logRecord.getLogType() != LogType.WAIT) {
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index cacd036..8e615af 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -37,10 +37,6 @@
 
     @Override
     public void log(ILogRecord logRecord) throws ACIDException {
-        if (logRecord.getLogSize() > logPageSize) {
-            throw new IllegalStateException();
-        }
-
         //only locally generated logs should be replicated
         logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL && 
logRecord.getLogType() != LogType.WAIT);
 
@@ -58,7 +54,11 @@
         syncAppendToLogTail(logRecord);
 
         if (logRecord.isReplicated()) {
-            replicationManager.replicateLog(logRecord);
+            try {
+                replicationManager.replicateLog(logRecord);
+            } catch (InterruptedException e) {
+                throw new ACIDException(e);
+            }
         }
 
         if (logRecord.getLogSource() == LogSource.LOCAL) {
@@ -69,7 +69,7 @@
                         try {
                             logRecord.wait();
                         } catch (InterruptedException e) {
-                            //ignore
+                            Thread.currentThread().interrupt();
                         }
                     }
 
@@ -79,7 +79,7 @@
                             try {
                                 logRecord.wait();
                             } catch (InterruptedException e) {
-                                //ignore
+                                Thread.currentThread().interrupt();
                             }
                         }
                     }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/883
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>

Reply via email to