Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/883
Change subject: Txn Log Replication Optimizations
......................................................................
Txn Log Replication Optimizations
- Add configurations for txn log replication.
- Establish handshake for continuous log replication.
- Replicate logs in batches instead of one by one.
- Eliminate HashMap iterator object creation per log batch.
- Fix synchronization for jobs' ACKs from remote replicas.
- Fix bugs in the replication of big objects.
Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
R
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
16 files changed, 437 insertions(+), 338 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/83/883/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 0a6d62d..c71d77e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -150,7 +150,7 @@
}
}
- private void startReplicationService() {
+ private void startReplicationService() throws InterruptedException {
//Open replication channel
runtimeContext.getReplicationChannel().start();
diff --git
a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
index b1d0649..1d693f9 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -98,10 +98,4 @@
<description>Enabling plot of Algebricks plan to tmp folder. (Default =
false)
</description>
</property>
- <property>
- <name>log.level</name>
- <value>WARNING</value>
- <description>The minimum log level to be displayed. (Default = INFO)
- </description>
- </property>
</asterixConfiguration>
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index 5d31d9a..90ec60b 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -26,6 +26,8 @@
import org.apache.asterix.common.replication.Replica;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
public class AsterixReplicationProperties extends AbstractAsterixProperties {
@@ -38,6 +40,16 @@
private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
private final String NODE_NAME_PREFIX;
private final Cluster cluster;
+
+ private static final String REPLICATION_LOG_BATCH_SIZE_KEY =
"replication.log.batchsize";
+ private static final int REPLICATION_LOG_BATCH_SIZE_DEFAULT =
StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
+
+ private static final String REPLICATION_LOG_BUFFER_NUM_PAGES_KEY =
"replication.log.buffer.numpages";
+ private static int REPLICATION_LOG_BUFFER_NUM_PAGES_DEFAULT = 8;
+
+ private static final String REPLICATION_LOG_BUFFER_PAGE_SIZE_KEY =
"replication.log.buffer.pagesize";
+ private static final int REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT =
StorageUtil.getSizeInBytes(128,
+ StorageUnit.KILOBYTE);
public AsterixReplicationProperties(AsterixPropertiesAccessor accessor,
Cluster cluster) {
super(accessor);
@@ -245,4 +257,19 @@
public int getMaxRemoteRecoveryAttempts() {
return MAX_REMOTE_RECOVERY_ATTEMPTS;
}
+
+ public int getLogBufferPageSize() {
+ return accessor.getProperty(REPLICATION_LOG_BUFFER_PAGE_SIZE_KEY,
REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
+
+ public int getLogBufferNumOfPages() {
+ return accessor.getProperty(REPLICATION_LOG_BUFFER_NUM_PAGES_KEY,
REPLICATION_LOG_BUFFER_NUM_PAGES_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
+
+ public int getLogBatchSize() {
+ return accessor.getProperty(REPLICATION_LOG_BATCH_SIZE_KEY,
REPLICATION_LOG_BATCH_SIZE_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index a2738e8..9f9d74b 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -43,6 +43,7 @@
* Requests the remaining LSM disk components files from active remote
replicas.
*
* @throws IOException
+ * @throws InterruptedException
*/
- public void completeFailbackProcess() throws IOException;
+ public void completeFailbackProcess() throws IOException,
InterruptedException;
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
index 755fbbd..6bd1505 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.replication;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.asterix.common.transactions.ILogRecord;
@@ -31,8 +32,9 @@
*
* @param logRecord
* The log record to be replicated,
+ * @throws InterruptedException
*/
- public void replicateLog(ILogRecord logRecord);
+ public void replicateLog(ILogRecord logRecord) throws InterruptedException;
/**
* Checks whether a log record has been replicated
@@ -79,8 +81,10 @@
/**
* Starts processing of ASYNC replication jobs as well as Txn logs.
+ *
+ * @throws InterruptedException
*/
- public void startReplicationThreads();
+ public void startReplicationThreads() throws InterruptedException;
/**
* Checks and sets each remote replica state.
@@ -114,4 +118,13 @@
* @throws IOException
*/
public void requestFlushLaggingReplicaIndexes(long
nonSharpCheckpointTargetLSN) throws IOException;
+
+ /**
+ * Transfers the contents of the {@code buffer} to active remote replicas.
+ * The transfer starts from the {@code buffer} current position to its
limit.
+ * After the transfer, the {@code buffer} position will be its limit.
+ *
+ * @param buffer
+ */
+ public void replicateTxnLogBatch(ByteBuffer buffer);
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
index 4c3f728..b8fe4b2 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
@@ -62,8 +62,7 @@
public InetSocketAddress getAddress(AsterixReplicationProperties
asterixReplicationProperties) {
String replicaIPAddress = node.getClusterIp();
int replicationPort =
asterixReplicationProperties.getDataReplicationPort(node.getId());
- InetSocketAddress replicaAddress =
InetSocketAddress.createUnresolved(replicaIPAddress, replicationPort);
- return replicaAddress;
+ return InetSocketAddress.createUnresolved(replicaIPAddress,
replicationPort);
}
public static Replica create(DataInput input) throws IOException {
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 3738cd1..1992a00 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -110,9 +110,7 @@
public String getNodeId();
- public int writeRemoteRecoveryLog(ByteBuffer buffer);
-
- public RecordReadStatus readRemoteLog(ByteBuffer buffer, boolean
remoteRecoveryLog);
+ public void readRemoteLog(ByteBuffer buffer);
public void setReplicationThread(IReplicationThread replicationThread);
@@ -120,11 +118,7 @@
public byte getLogSource();
- public int getSerializedLogSize();
-
- public void writeLogRecord(ByteBuffer buffer, long appendLSN);
-
- public ByteBuffer getSerializedLog();
+ public int getRemoteLogSize();
public void setNodeId(String nodeId);
@@ -138,4 +132,6 @@
* @return a flag indicating whether the log record should be sent to
remote replicas
*/
public boolean isReplicated();
+
+ public void writeRemoteLogRecord(ByteBuffer buffer);
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index fd56913..88698e3 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -44,8 +44,7 @@
* PKValueSize(4)
* PKValue(PKValueSize)
* ---------------------------
- * [Header3] (20 bytes) : only for update log type
- * PrevLSN(8)
+ * [Header3] (12 bytes) : only for update log type
* ResourceId(8) //stored in .metadata of the corresponding index in NC node
* LogRecordSize(4)
* ---------------------------
@@ -71,6 +70,24 @@
*/
public class LogRecord implements ILogRecord {
+
+ private static final int LOG_SOURCE_LEN = Byte.BYTES;
+ private static final int TYPE_LEN = Byte.BYTES;
+ public static final int PKHASH_LEN = Integer.BYTES;
+ public static final int PKSZ_LEN = Integer.BYTES;
+ private static final int RS_PARTITION_LEN = Integer.BYTES;
+ private static final int RSID_LEN = Long.BYTES;
+ private static final int LOGRCD_SZ_LEN = Integer.BYTES;
+ private static final int FLDCNT_LEN = Integer.BYTES;
+ private static final int NEWOP_LEN = Byte.BYTES;
+ private static final int NEWVALSZ_LEN = Integer.BYTES;
+ private static final int CHKSUM_LEN = Long.BYTES;
+
+ private static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN
+ JobId.BYTES;
+ private static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN
+ DatasetId.BYTES + PKHASH_LEN
+ + PKSZ_LEN;
+ private static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
+ private static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN +
NEWVALSZ_LEN;
// ------------- fields in a log record (begin) ------------//
private byte logSource;
@@ -101,9 +118,8 @@
private int[] PKFields;
private PrimaryIndexOperationTracker opTracker;
private IReplicationThread replicationThread;
- private ByteBuffer serializedLog;
/**
- * The fields (numOfFlushedIndexes and nodeId) are used for serialized
flush logs only
+ * The fields (numOfFlushedIndexes and nodeId) are used for remote flush
logs only
* to indicate the source of the log and how many indexes were flushed
using its LSN.
*/
private int numOfFlushedIndexes;
@@ -118,25 +134,6 @@
checksumGen = new CRC32();
logSource = LogSource.LOCAL;
}
-
- private final static int LOG_SOURCE_LEN = Byte.BYTES;
- private final static int TYPE_LEN = Byte.BYTES;
- public final static int PKHASH_LEN = Integer.BYTES;
- public final static int PKSZ_LEN = Integer.BYTES;
- private final static int RS_PARTITION_LEN = Integer.BYTES;
- private final static int RSID_LEN = Long.BYTES;
- private final static int LOGRCD_SZ_LEN = Integer.BYTES;
- private final static int FLDCNT_LEN = Integer.BYTES;
- private final static int NEWOP_LEN = Byte.BYTES;
- private final static int NEWVALSZ_LEN = Integer.BYTES;
- private final static int CHKSUM_LEN = Long.BYTES;
-
- private final static int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN
+ JobId.BYTES;
- private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN
+ DatasetId.BYTES + PKHASH_LEN
- + PKSZ_LEN;
- private final static int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
- private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN +
NEWVALSZ_LEN;
- private final static int REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN = Long.BYTES +
Integer.BYTES + Integer.BYTES;
private void writeLogRecordCommonFields(ByteBuffer buffer) {
buffer.put(logSource);
@@ -173,39 +170,15 @@
buffer.putLong(checksum);
}
- // this method is used when replication is enabled to include the log
record LSN in the serialized version
@Override
- public void writeLogRecord(ByteBuffer buffer, long appendLSN) {
- int beginOffset = buffer.position();
+ public void writeRemoteLogRecord(ByteBuffer buffer) {
writeLogRecordCommonFields(buffer);
-
- if (replicated) {
- //copy the serialized log to send it to replicas
- int serializedLogSize = getSerializedLogSize();
- if (serializedLog == null || serializedLog.capacity() <
serializedLogSize) {
- serializedLog = ByteBuffer.allocate(serializedLogSize);
- } else {
- serializedLog.clear();
- }
-
- int currentPosition = buffer.position();
- int currentLogSize = (currentPosition - beginOffset);
-
- buffer.position(beginOffset);
- buffer.get(serializedLog.array(), 0, currentLogSize);
- serializedLog.position(currentLogSize);
- if (logType == LogType.FLUSH) {
- serializedLog.putLong(appendLSN);
- serializedLog.putInt(numOfFlushedIndexes);
- serializedLog.putInt(nodeId.length());
- serializedLog.put(nodeId.getBytes());
- }
- serializedLog.flip();
- buffer.position(currentPosition);
+ if (logType == LogType.FLUSH) {
+ buffer.putLong(LSN);
+ buffer.putInt(numOfFlushedIndexes);
+ buffer.putInt(nodeId.length());
+ buffer.put(nodeId.getBytes());
}
-
- checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
- buffer.putLong(checksum);
}
private void writePKValue(ByteBuffer buffer) {
@@ -221,8 +194,12 @@
}
private void writeTuple(ByteBuffer buffer, ITupleReference tuple, int
size) {
- tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
- // writeTuple() doesn't change the position of the buffer.
+ if (logSource == LogSource.LOCAL) {
+ tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
+ } else {
+ //since the tuple is already serialized in remote logs, just copy
it from beginning to end.
+ System.arraycopy(tuple.getFieldData(0), 0, buffer.array(),
buffer.position(), size);
+ }
buffer.position(buffer.position() + size);
}
@@ -323,47 +300,19 @@
}
@Override
- public RecordReadStatus readRemoteLog(ByteBuffer buffer, boolean
remoteRecoveryLog) {
- int beginOffset = buffer.position();
-
+ public void readRemoteLog(ByteBuffer buffer) {
//read common fields
- RecordReadStatus status = readLogCommonFields(buffer);
- if (status != RecordReadStatus.OK) {
- buffer.position(beginOffset);
- return status;
- }
+ readLogCommonFields(buffer);
if (logType == LogType.FLUSH) {
- if (buffer.remaining() >= REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN) {
- LSN = buffer.getLong();
- numOfFlushedIndexes = buffer.getInt();
- //read serialized node id
- int nodeIdLength = buffer.getInt();
- if (buffer.remaining() >= nodeIdLength) {
- byte[] nodeIdBytes = new byte[nodeIdLength];
- buffer.get(nodeIdBytes);
- nodeId = new String(nodeIdBytes);
- } else {
- buffer.position(beginOffset);
- return RecordReadStatus.TRUNCATED;
- }
- } else {
- buffer.position(beginOffset);
- return RecordReadStatus.TRUNCATED;
- }
+ LSN = buffer.getLong();
+ numOfFlushedIndexes = buffer.getInt();
+ //read serialized node id
+ int nodeIdLength = buffer.getInt();
+ byte[] nodeIdBytes = new byte[nodeIdLength];
+ buffer.get(nodeIdBytes);
+ nodeId = new String(nodeIdBytes);
}
-
- //remote recovery logs need to have the LSN to check which should be
replayed
- if (remoteRecoveryLog) {
- if (buffer.remaining() >= Long.BYTES) {
- LSN = buffer.getLong();
- } else {
- buffer.position(beginOffset);
- return RecordReadStatus.TRUNCATED;
- }
- }
-
- return RecordReadStatus.OK;
}
private ITupleReference readPKValue(ByteBuffer buffer) {
@@ -443,16 +392,6 @@
builder.append(" ResourceId : ").append(resourceId);
}
return builder.toString();
- }
-
- @Override
- public int writeRemoteRecoveryLog(ByteBuffer buffer) {
- int bufferBegin = buffer.position();
- writeLogRecordCommonFields(buffer);
- //FLUSH logs should not included in remote recovery
- //LSN must be included in all remote recovery logs
- buffer.putLong(LSN);
- return buffer.position() - bufferBegin;
}
////////////////////////////////////////////
@@ -535,18 +474,18 @@
}
@Override
- public int getSerializedLogSize() {
- int serilizedSize = logSize;
+ public int getRemoteLogSize() {
+ int remoteLogSize = logSize;
if (logType == LogType.FLUSH) {
//LSN
- serilizedSize += Long.BYTES;
+ remoteLogSize += Long.BYTES;
//num of indexes
- serilizedSize += Integer.BYTES;
+ remoteLogSize += Integer.BYTES;
//serialized node id String
- serilizedSize += Integer.BYTES + nodeId.length();
+ remoteLogSize += Integer.BYTES + nodeId.length();
}
- serilizedSize -= CHKSUM_LEN;
- return serilizedSize;
+ remoteLogSize -= CHKSUM_LEN;
+ return remoteLogSize;
}
@Override
@@ -628,15 +567,6 @@
public PrimaryIndexOperationTracker getOpTracker() {
return opTracker;
- }
-
- @Override
- public ByteBuffer getSerializedLog() {
- return serializedLog;
- }
-
- public void setSerializedLog(ByteBuffer serializedLog) {
- this.serializedLog = serializedLog;
}
@Override
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
index 588968c..6afba79 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
@@ -18,53 +18,42 @@
*/
package org.apache.asterix.replication.logging;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-import org.apache.asterix.replication.management.NetworkingUtil;
import org.apache.asterix.replication.management.ReplicationManager;
public class ReplicationLogBuffer {
private final int logBufferSize;
private final AtomicBoolean full;
private int appendOffset;
- private int flushOffset;
+ private int replicationOffset;
private final ByteBuffer appendBuffer;
- private final ByteBuffer flushBuffer;
+ private final ByteBuffer replicationBuffer;
private boolean stop;
- private Map<String, SocketChannel> replicaSockets;
private ReplicationManager replicationManager;
+ private final int batchSize;
- public ReplicationLogBuffer(ReplicationManager replicationManager, int
logBufferSize) {
+ public ReplicationLogBuffer(ReplicationManager replicationManager, int
logBufferSize, int batchSize) {
this.replicationManager = replicationManager;
this.logBufferSize = logBufferSize;
+ this.batchSize = batchSize;
appendBuffer = ByteBuffer.allocate(logBufferSize);
- flushBuffer = appendBuffer.duplicate();
+ replicationBuffer = appendBuffer.duplicate();
full = new AtomicBoolean(false);
appendOffset = 0;
- flushOffset = 0;
+ replicationOffset = 0;
}
public void append(ILogRecord logRecord) {
-
appendBuffer.putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
- appendBuffer.putInt(logRecord.getSerializedLogSize());
- appendBuffer.put(logRecord.getSerializedLog());
+ appendBuffer.putInt(logRecord.getRemoteLogSize());
+ logRecord.writeRemoteLogRecord(appendBuffer);
synchronized (this) {
appendOffset += getLogReplicationSize(logRecord);
this.notify();
}
- }
-
- public void setReplicationSockets(Map<String, SocketChannel>
replicaSockets) {
- this.replicaSockets = replicaSockets;
}
public synchronized void isFull(boolean full) {
@@ -77,18 +66,18 @@
}
private static int getLogReplicationSize(ILogRecord logRecord) {
- //request type + request length + serialized log length
- return Integer.BYTES + Integer.BYTES +
logRecord.getSerializedLogSize();
+ //log length (4 bytes) + remote log size
+ return Integer.BYTES + logRecord.getRemoteLogSize();
}
public void reset() {
appendBuffer.position(0);
appendBuffer.limit(logBufferSize);
- flushBuffer.position(0);
- flushBuffer.limit(logBufferSize);
+ replicationBuffer.position(0);
+ replicationBuffer.limit(logBufferSize);
full.set(false);
appendOffset = 0;
- flushOffset = 0;
+ replicationOffset = 0;
stop = false;
}
@@ -96,7 +85,7 @@
int endOffset;
while (!full.get()) {
synchronized (this) {
- if (appendOffset - flushOffset == 0 && !full.get()) {
+ if (appendOffset - replicationOffset == 0 && !full.get()) {
try {
if (stop) {
break;
@@ -108,45 +97,51 @@
}
endOffset = appendOffset;
}
- internalFlush(flushOffset, endOffset);
+ internalFlush(replicationOffset, endOffset);
}
-
- internalFlush(flushOffset, appendOffset);
+ internalFlush(replicationOffset, appendOffset);
}
private void internalFlush(int beginOffset, int endOffset) {
if (endOffset > beginOffset) {
- int begingPos = flushBuffer.position();
- flushBuffer.limit(endOffset);
- sendRequest(replicaSockets, flushBuffer);
- flushBuffer.position(begingPos + (endOffset - beginOffset));
- flushOffset = endOffset;
+ int begingPos = replicationBuffer.position();
+ replicationBuffer.limit(endOffset);
+ transferBuffer(replicationBuffer);
+ replicationBuffer.position(begingPos + (endOffset - beginOffset));
+ replicationOffset = endOffset;
}
}
- private void sendRequest(Map<String, SocketChannel> replicaSockets,
ByteBuffer requestBuffer) {
- Iterator<Map.Entry<String, SocketChannel>> iterator =
replicaSockets.entrySet().iterator();
- int begin = requestBuffer.position();
- while (iterator.hasNext()) {
- Entry<String, SocketChannel> replicaSocket = iterator.next();
- SocketChannel clientSocket = replicaSocket.getValue();
- try {
- NetworkingUtil.transferBufferToChannel(clientSocket,
requestBuffer);
- } catch (IOException e) {
- if (clientSocket.isOpen()) {
- try {
- clientSocket.close();
- } catch (IOException e2) {
- e2.printStackTrace();
+ private void transferBuffer(ByteBuffer buffer) {
+ if (buffer.remaining() <= batchSize) {
+ replicationManager.replicateTxnLogBatch(buffer);
+ } else {
+ int totalTransferLimit = buffer.limit();
+ while (buffer.hasRemaining()) {
+ if (buffer.remaining() > batchSize) {
+ /**
+ * break into smaller batches
+ */
+ //mark the beginning of this batch
+ buffer.mark();
+ int currentBatchSize = 0;
+ while (currentBatchSize < batchSize) {
+ int logSize = replicationBuffer.getInt();
+ //add the size of the log record itself + 4 bytes for
its size
+ currentBatchSize += logSize + Integer.BYTES;
+ //go to the beginning of the next log
+ buffer.position(buffer.position() + logSize);
}
+ //set the limit to the end of this batch
+ buffer.limit(buffer.position());
+ //return to the beginning of the batch position
+ buffer.reset();
}
- replicationManager.reportFailedReplica(replicaSocket.getKey());
- iterator.remove();
- } finally {
- requestBuffer.position(begin);
+ replicationManager.replicateTxnLogBatch(buffer);
+ //return the original limit to check the new remaining size
+ buffer.limit(totalTransferLimit);
}
}
-
}
public boolean isStop() {
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
similarity index 83%
rename from
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java
rename to
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
index 3312cb1..d9d60d6 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
@@ -21,24 +21,22 @@
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
import java.util.logging.Logger;
-
-import org.apache.asterix.common.transactions.LogRecord;
/**
* This class is responsible for sending transactions logs to remote replicas.
*/
-public class ReplicationLogFlusher implements Callable<Boolean> {
- private final static Logger LOGGER =
Logger.getLogger(ReplicationLogFlusher.class.getName());
- private final static ReplicationLogBuffer POISON_PILL = new
ReplicationLogBuffer(null,
- LogRecord.JOB_TERMINATE_LOG_SIZE);
+public class TxnLogReplicator implements Callable<Boolean> {
+ private static final Logger LOGGER =
Logger.getLogger(TxnLogReplicator.class.getName());
+ private static final ReplicationLogBuffer POISON_PILL = new
ReplicationLogBuffer(null, 0, 0);
private final LinkedBlockingQueue<ReplicationLogBuffer> emptyQ;
private final LinkedBlockingQueue<ReplicationLogBuffer> flushQ;
private ReplicationLogBuffer flushPage;
private final AtomicBoolean isStarted;
private final AtomicBoolean terminateFlag;
- public ReplicationLogFlusher(LinkedBlockingQueue<ReplicationLogBuffer>
emptyQ,
+ public TxnLogReplicator(LinkedBlockingQueue<ReplicationLogBuffer> emptyQ,
LinkedBlockingQueue<ReplicationLogBuffer> flushQ) {
this.emptyQ = emptyQ;
this.flushQ = flushQ;
@@ -74,7 +72,7 @@
@Override
public Boolean call() {
- Thread.currentThread().setName("Replication Log Flusher");
+ Thread.currentThread().setName("TxnLog Replicator");
synchronized (isStarted) {
isStarted.set(true);
isStarted.notify();
@@ -91,6 +89,7 @@
if (flushPage == null) {
continue;
}
+ Thread.currentThread().interrupt();
}
flushPage.flush();
// TODO: pool large pages
@@ -99,8 +98,9 @@
}
}
} catch (Exception e) {
- e.printStackTrace();
- LOGGER.severe("ReplicationLogFlusher is terminating abnormally.
Logs Replication Stopped.");
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.log(Level.SEVERE, "TxnLogReplicator is terminating
abnormally. Logs Replication Stopped.", e);
+ }
throw e;
}
}
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
index 6023cb1..bc7451f 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
@@ -21,6 +21,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.nio.ByteBuffer;
@@ -88,7 +89,8 @@
return hostName;
}
- public static void transferBufferToChannel(SocketChannel socketChannel,
ByteBuffer requestBuffer) throws IOException {
+ public static void transferBufferToChannel(SocketChannel socketChannel,
ByteBuffer requestBuffer)
+ throws IOException {
while (requestBuffer.hasRemaining()) {
socketChannel.write(requestBuffer);
}
@@ -107,4 +109,10 @@
long fileSize = fileChannel.size();
fileChannel.transferFrom(socketChannel, pos, fileSize);
}
+
+ public static InetSocketAddress getSocketAddress(SocketChannel
socketChannel) {
+ String hostAddress =
socketChannel.socket().getInetAddress().getHostAddress();
+ int port = socketChannel.socket().getPort();
+ return InetSocketAddress.createUnresolved(hostAddress, port);
+ }
}
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index a152f6c..4eeaede 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -414,46 +414,62 @@
}
private void handleLogReplication() throws IOException, ACIDException {
- inBuffer = ReplicationProtocol.readRequest(socketChannel,
inBuffer);
+ //set initial buffer size to a log buffer page size
+ inBuffer = ByteBuffer.allocate(logManager.getLogPageSize());
+ while (true) {
+ //read a batch of logs
+ inBuffer = ReplicationProtocol.readRequest(socketChannel,
inBuffer);
+ //check if it is end of handshake (a single byte log)
+ if (inBuffer.remaining() == 1) {
+ break;
+ }
- //Deserialize log
- remoteLog.readRemoteLog(inBuffer, false);
- remoteLog.setLogSource(LogSource.REMOTE);
+ /**
+ * Process logs batch
+ */
+ while (inBuffer.hasRemaining()) {
+ //get rid of log size
+ inBuffer.getInt();
+ //Deserialize log
+ remoteLog.readRemoteLog(inBuffer);
+ remoteLog.setLogSource(LogSource.REMOTE);
- switch (remoteLog.getLogType()) {
- case LogType.UPDATE:
- case LogType.ENTITY_COMMIT:
- case LogType.UPSERT_ENTITY_COMMIT:
- //if the log partition belongs to a partitions hosted on
this node, replicate it
- if
(nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
- logManager.log(remoteLog);
+ switch (remoteLog.getLogType()) {
+ case LogType.UPDATE:
+ case LogType.ENTITY_COMMIT:
+ case LogType.UPSERT_ENTITY_COMMIT:
+ //if the log partition belongs to a partition
hosted on this node, replicate it
+ if
(nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
+ logManager.log(remoteLog);
+ }
+ break;
+ case LogType.JOB_COMMIT:
+ case LogType.ABORT:
+ LogRecord jobTerminationLog = new LogRecord();
+
TransactionUtil.formJobTerminateLogRecord(jobTerminationLog,
remoteLog.getJobId(),
+ remoteLog.getLogType() ==
LogType.JOB_COMMIT);
+ jobTerminationLog.setReplicationThread(this);
+ jobTerminationLog.setLogSource(LogSource.REMOTE);
+ logManager.log(jobTerminationLog);
+ break;
+ case LogType.FLUSH:
+ //store mapping information for flush logs to use
them in incoming LSM components.
+ RemoteLogMapping flushLogMap = new
RemoteLogMapping();
+ flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
+ flushLogMap.setRemoteLSN(remoteLog.getLSN());
+ logManager.log(remoteLog);
+ //the log LSN value is updated by
logManager.log(.) to a local value
+ flushLogMap.setLocalLSN(remoteLog.getLSN());
+
flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
+
replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
+ synchronized (flushLogslock) {
+ flushLogslock.notify();
+ }
+ break;
+ default:
+ LOGGER.severe("Unsupported LogType: " +
remoteLog.getLogType());
}
- break;
- case LogType.JOB_COMMIT:
- case LogType.ABORT:
- LogRecord jobTerminationLog = new LogRecord();
-
TransactionUtil.formJobTerminateLogRecord(jobTerminationLog,
remoteLog.getJobId(),
- remoteLog.getLogType() == LogType.JOB_COMMIT);
- jobTerminationLog.setReplicationThread(this);
- jobTerminationLog.setLogSource(LogSource.REMOTE);
- logManager.log(jobTerminationLog);
- break;
- case LogType.FLUSH:
- //store mapping information for flush logs to use them in
incoming LSM components.
- RemoteLogMapping flushLogMap = new RemoteLogMapping();
- flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
- flushLogMap.setRemoteLSN(remoteLog.getLSN());
- logManager.log(remoteLog);
- //the log LSN value is updated by logManager.log(.) to a
local value
- flushLogMap.setLocalLSN(remoteLog.getLSN());
-
flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
-
replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
- synchronized (flushLogslock) {
- flushLogslock.notify();
- }
- break;
- default:
- LOGGER.severe("Unsupported LogType: " +
remoteLog.getLogType());
+ }
}
}
@@ -594,4 +610,4 @@
}
}
}
-}
\ No newline at end of file
+}
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index ee872a5..fc1956d 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -71,7 +71,7 @@
import org.apache.asterix.replication.functions.ReplicationProtocol;
import
org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
import org.apache.asterix.replication.logging.ReplicationLogBuffer;
-import org.apache.asterix.replication.logging.ReplicationLogFlusher;
+import org.apache.asterix.replication.logging.TxnLogReplicator;
import org.apache.asterix.replication.storage.LSMComponentProperties;
import org.apache.asterix.replication.storage.LSMIndexFileProperties;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
@@ -86,6 +86,8 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
/**
* This class is used to process replication jobs and maintain remote replicas
states
@@ -93,7 +95,7 @@
public class ReplicationManager implements IReplicationManager {
private static final Logger LOGGER =
Logger.getLogger(ReplicationManager.class.getName());
- private final int INITIAL_REPLICATION_FACTOR = 1;
+ private static final int INITIAL_REPLICATION_FACTOR = 1;
private final String nodeId;
private ExecutorService replicationListenerThreads;
private final Map<Integer, Set<String>> jobCommitAcks;
@@ -114,7 +116,7 @@
private final AtomicBoolean replicationSuspended;
private AtomicBoolean terminateJobsReplication;
private AtomicBoolean jobsReplicationSuspended;
- private final static int INITIAL_BUFFER_SIZE = 4000; //4KB
+ private static final int INITIAL_BUFFER_SIZE =
StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
private final Set<String> shuttingDownReplicaIds;
//replication threads
private ReplicationJobsProccessor replicationJobsProcessor;
@@ -128,9 +130,10 @@
private LinkedBlockingQueue<ReplicationLogBuffer> emptyLogBuffersQ;
private LinkedBlockingQueue<ReplicationLogBuffer> pendingFlushLogBuffersQ;
protected ReplicationLogBuffer currentTxnLogBuffer;
- private ReplicationLogFlusher txnlogsReplicator;
+ private TxnLogReplicator txnlogReplicator;
private Future<? extends Object> txnLogReplicatorTask;
- private Map<String, SocketChannel> logsReplicaSockets = null;
+ private SocketChannel[] logsRepSockets;
+ private final ByteBuffer txnLogsBatchSizeBuffer =
ByteBuffer.allocate(Integer.BYTES);
//TODO this class needs to be refactored by moving its private classes to
separate files
//and possibly using MessageBroker to send/receive remote replicas events.
@@ -179,13 +182,14 @@
clientPartitonsSet.addAll(clientPartitions);
replica2PartitionsMap.put(replica.getId(), clientPartitonsSet);
}
- int numLogBuffers = logManager.getNumLogPages();
+ int numLogBuffers = replicationProperties.getLogBufferNumOfPages();
emptyLogBuffersQ = new
LinkedBlockingQueue<ReplicationLogBuffer>(numLogBuffers);
pendingFlushLogBuffersQ = new
LinkedBlockingQueue<ReplicationLogBuffer>(numLogBuffers);
- int logBufferSize = logManager.getLogPageSize();
+ int logBufferSize = replicationProperties.getLogBufferPageSize();
for (int i = 0; i < numLogBuffers; i++) {
- emptyLogBuffersQ.offer(new ReplicationLogBuffer(this,
logBufferSize));
+ emptyLogBuffersQ
+ .offer(new ReplicationLogBuffer(this, logBufferSize,
replicationProperties.getLogBatchSize()));
}
}
@@ -209,16 +213,12 @@
}
@Override
- public void replicateLog(ILogRecord logRecord) {
+ public void replicateLog(ILogRecord logRecord) throws InterruptedException
{
if (logRecord.getLogType() == LogType.JOB_COMMIT ||
logRecord.getLogType() == LogType.ABORT) {
//if replication is suspended, wait until it is resumed.
while (replicationSuspended.get()) {
synchronized (replicationSuspended) {
- try {
- replicationSuspended.wait();
- } catch (InterruptedException e) {
- //ignore
- }
+ replicationSuspended.wait();
}
}
Set<String> replicaIds = Collections.synchronizedSet(new
HashSet<String>());
@@ -232,29 +232,23 @@
protected void getAndInitNewLargePage(int pageSize) {
// for now, alloc a new buffer for each large page
// TODO: consider pooling large pages
- currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize);
- currentTxnLogBuffer.setReplicationSockets(logsReplicaSockets);
+ currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize,
replicationProperties.getLogBufferPageSize());
pendingFlushLogBuffersQ.offer(currentTxnLogBuffer);
}
- protected void getAndInitNewPage() {
+ protected void getAndInitNewPage() throws InterruptedException {
currentTxnLogBuffer = null;
while (currentTxnLogBuffer == null) {
- try {
- currentTxnLogBuffer = emptyLogBuffersQ.take();
- } catch (InterruptedException e) {
- //ignore
- }
+ currentTxnLogBuffer = emptyLogBuffersQ.take();
}
currentTxnLogBuffer.reset();
- currentTxnLogBuffer.setReplicationSockets(logsReplicaSockets);
pendingFlushLogBuffersQ.offer(currentTxnLogBuffer);
}
- private synchronized void appendToLogBuffer(ILogRecord logRecord) {
+ private synchronized void appendToLogBuffer(ILogRecord logRecord) throws
InterruptedException {
if (!currentTxnLogBuffer.hasSpace(logRecord)) {
currentTxnLogBuffer.isFull(true);
- if (logRecord.getLogSize() > logManager.getLogPageSize()) {
+ if (logRecord.getLogSize() > getLogPageSize()) {
getAndInitNewLargePage(logRecord.getLogSize());
} else {
getAndInitNewPage();
@@ -319,6 +313,7 @@
}
LOGGER.log(Level.INFO, "Replicating file: " +
filePath);
+
//open file for reading
try (RandomAccessFile fromFile = new
RandomAccessFile(filePath, "r");
FileChannel fileChannel =
fromFile.getChannel();) {
@@ -331,6 +326,7 @@
.getComponentsToBeReplicated().get(0);
long LSNByteOffset =
AsterixLSMIndexUtil.getComponentFileLSNOffset(
(AbstractLSMIndex)
LSMComponentJob.getLSMIndex(), diskComponent, filePath);
+ //
System.out.println("LSN BYTE OFFSET: " + LSNByteOffset);
asterixFileProperties.initialize(filePath,
fileSize, nodeId, isLSMComponentFile,
LSNByteOffset, remainingFiles == 0);
} else {
@@ -458,7 +454,7 @@
}
/**
- * Suspends proccessing replication jobs.
+ * Suspends processing replication jobs/logs.
*
* @param force
* a flag indicates if replication should be suspended right
away or when the pending jobs are completed.
@@ -484,7 +480,7 @@
}
//suspend logs replication
- if (txnlogsReplicator != null) {
+ if (txnlogReplicator != null) {
terminateTxnLogsReplicator();
}
}
@@ -492,45 +488,118 @@
/**
* Opens a new connection with Active remote replicas and starts a listen
thread per connection.
*/
- private void establishTxnLogsReplicationConnection() {
- logsReplicaSockets = getActiveRemoteReplicasSockets();
+ private void establishTxnLogsRepHandshake() {
+ Map<String, SocketChannel> activeRemoteReplicasSockets =
getActiveRemoteReplicasSockets();
+ logsRepSockets = new SocketChannel[activeRemoteReplicasSockets.size()];
+ int i = 0;
//start a listener thread per connection
- for (Entry<String, SocketChannel> entry :
logsReplicaSockets.entrySet()) {
+ for (Entry<String, SocketChannel> entry :
activeRemoteReplicasSockets.entrySet()) {
+ logsRepSockets[i] = entry.getValue();
replicationListenerThreads
.execute(new
TxnLogsReplicationResponseListener(entry.getKey(), entry.getValue()));
+ i++;
+ }
+
+ /**
+ * establish log replication handshake
+ */
+ ByteBuffer handshakeBuffer =
ByteBuffer.allocate(ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE)
+
.putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
+ handshakeBuffer.flip();
+ //send handshake request
+ for (SocketChannel replicaSocket : logsRepSockets) {
+ try {
+ NetworkingUtil.transferBufferToChannel(replicaSocket,
handshakeBuffer);
+ } catch (IOException e) {
+ handleReplicationFailure(replicaSocket, e);
+ } finally {
+ handshakeBuffer.position(0);
+ }
}
}
+ private void handleReplicationFailure(SocketChannel socketChannel,
Throwable t) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Could not complete replication
request.", t);
+ }
+ if (socketChannel.isOpen()) {
+ try {
+ socketChannel.close();
+ } catch (IOException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Could not close socket.", e);
+ }
+ }
+ }
+ reportFailedReplica(getReplicaIdBySocket(socketChannel));
+ }
+
/**
- * Stops ReplicationFlusherThread and closes the sockets used to replicate
logs.
+ * Stops TxnLogReplicator and closes the sockets used to replicate logs.
*/
private void terminateTxnLogsReplicator() {
- LOGGER.log(Level.INFO, "Terminating ReplicationLogFlusher thread ...");
- txnlogsReplicator.terminate();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Terminating TxnLogReplicator thread ...");
+ }
+ txnlogReplicator.terminate();
try {
txnLogReplicatorTask.get();
} catch (ExecutionException | InterruptedException e) {
- LOGGER.log(Level.WARNING, "RepicationLogFlusher thread terminated
abnormally");
- e.printStackTrace();
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.log(Level.SEVERE, "TxnLogReplicator thread terminated
abnormally", e);
+ }
}
- LOGGER.log(Level.INFO, "LogFlusher thread is terminated.");
- if (logsReplicaSockets != null) {
- //wait for any ACK to arrive before closing sockets.
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("TxnLogReplicator thread was terminated.");
+ }
+
+ /**
+ * End log replication handshake (by sending a dummy log with a single
byte)
+ */
+ ByteBuffer endLogRepHandshake = ByteBuffer.allocate(Integer.SIZE +
1).putInt(1).put((byte) 0);
+ endLogRepHandshake.flip();
+ for (SocketChannel replicaSocket : logsRepSockets) {
+ try {
+ NetworkingUtil.transferBufferToChannel(replicaSocket,
endLogRepHandshake);
+ } catch (IOException e) {
+ handleReplicationFailure(replicaSocket, e);
+ } finally {
+ endLogRepHandshake.position(0);
+ }
+ }
+
+ //wait for any ACK to arrive before closing sockets.
+ if (logsRepSockets != null) {
synchronized (jobCommitAcks) {
- while (jobCommitAcks.size() != 0) {
- try {
+ try {
+ while (jobCommitAcks.size() != 0) {
jobCommitAcks.wait();
- } catch (InterruptedException e) {
- //ignore
}
+ } catch (InterruptedException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.log(Level.SEVERE, "Interrupted while waiting
for jobs ACK", e);
+ }
+ Thread.currentThread().interrupt();
}
}
-
- //close log replication sockets
- closeReplicaSockets(logsReplicaSockets);
- logsReplicaSockets = null;
}
+
+ /**
+ * Close log replication sockets
+ */
+ ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
+ for (SocketChannel replicaSocket : logsRepSockets) {
+ try {
+ //send goodbye request
+ NetworkingUtil.transferBufferToChannel(replicaSocket,
goodbyeBuffer);
+ } catch (IOException e) {
+ handleReplicationFailure(replicaSocket, e);
+ } finally {
+ goodbyeBuffer.position(0);
+ }
+ }
+ logsRepSockets = null;
}
/**
@@ -651,8 +720,10 @@
* The new state of the replica.
* @param suspendReplication
* a flag indicating whether to suspend replication on state
change or not.
+ * @throws InterruptedException
*/
- public synchronized void updateReplicaState(String replicaId, ReplicaState
newState, boolean suspendReplication) {
+ public synchronized void updateReplicaState(String replicaId, ReplicaState
newState, boolean suspendReplication)
+ throws InterruptedException {
Replica replica = replicas.get(replicaId);
if (replica.getState() == newState) {
@@ -702,22 +773,24 @@
* The remote replica id the ACK received from.
*/
private void addAckToJob(int jobId, String replicaId) {
- //add ACK to the job
- if (jobCommitAcks.containsKey(jobId)) {
- Set<String> replicaIds = jobCommitAcks.get(jobId);
- replicaIds.add(replicaId);
- } else {
- throw new IllegalStateException("Job ID not found in pending job
commits " + jobId);
- }
+ synchronized (jobCommitAcks) {
+ //add ACK to the job
+ if (jobCommitAcks.containsKey(jobId)) {
+ Set<String> replicaIds = jobCommitAcks.get(jobId);
+ replicaIds.add(replicaId);
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Invalid job replication ACK received for
jobId(" + jobId + ")");
+ }
+ return;
+ }
- //if got ACKs from all remote replicas, notify pending jobs if any
- if (jobCommitAcks.get(jobId).size() == replicationFactor) {
- synchronized (replicationJobsPendingAcks) {
- if (replicationJobsPendingAcks.containsKey(jobId)) {
- ILogRecord pendingLog =
replicationJobsPendingAcks.get(jobId);
- synchronized (pendingLog) {
- pendingLog.notify();
- }
+ //if got ACKs from all remote replicas, notify pending jobs if any
+
+ if (jobCommitAcks.get(jobId).size() == replicationFactor &&
replicationJobsPendingAcks.containsKey(jobId)) {
+ ILogRecord pendingLog = replicationJobsPendingAcks.get(jobId);
+ synchronized (pendingLog) {
+ pendingLog.notify();
}
}
}
@@ -725,26 +798,25 @@
@Override
public boolean hasBeenReplicated(ILogRecord logRecord) {
- if (jobCommitAcks.containsKey(logRecord.getJobId())) {
- //check if all ACKs have been received
- if (jobCommitAcks.get(logRecord.getJobId()).size() ==
replicationFactor) {
- jobCommitAcks.remove(logRecord.getJobId());
+ int jobId = logRecord.getJobId();
+ if (jobCommitAcks.containsKey(jobId)) {
+ synchronized (jobCommitAcks) {
+ //check if all ACKs have been received
+ if (jobCommitAcks.get(jobId).size() == replicationFactor) {
+ jobCommitAcks.remove(jobId);
- if
(replicationJobsPendingAcks.containsKey(logRecord.getJobId())) {
- replicationJobsPendingAcks.remove(logRecord);
- }
+ //remove from pending jobs if exists
+ replicationJobsPendingAcks.remove(jobId);
- //notify any threads waiting for all jobs to finish
- if (jobCommitAcks.size() == 0) {
- synchronized (jobCommitAcks) {
+ //notify any threads waiting for all jobs to finish
+ if (jobCommitAcks.size() == 0) {
jobCommitAcks.notifyAll();
}
+ return true;
+ } else {
+ replicationJobsPendingAcks.putIfAbsent(jobId, logRecord);
+ return false;
}
-
- return true;
- } else {
- replicationJobsPendingAcks.putIfAbsent(logRecord.getJobId(),
logRecord);
- return false;
}
}
@@ -776,7 +848,7 @@
* @throws IOException
*/
private SocketChannel getReplicaSocket(String replicaId) throws
IOException {
- Replica replica = replicas.get(replicaId);
+ Replica replica = replicationProperties.getReplicaById(replicaId);
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(true);
InetSocketAddress address = replica.getAddress(replicationProperties);
@@ -854,9 +926,7 @@
@Override
public void reportReplicaEvent(ReplicaEvent event) {
- synchronized (replicaEventsQ) {
- replicaEventsQ.offer(event);
- }
+ replicaEventsQ.offer(event);
}
/**
@@ -867,6 +937,9 @@
*/
public void reportFailedReplica(String replicaId) {
Replica replica = replicas.get(replicaId);
+ if (replica == null) {
+ return;
+ }
if (replica.getState() == ReplicaState.DEAD) {
return;
}
@@ -878,16 +951,28 @@
reportReplicaEvent(event);
}
+ private String getReplicaIdBySocket(SocketChannel socketChannel) {
+ InetSocketAddress socketAddress =
NetworkingUtil.getSocketAddress(socketChannel);
+ for (Replica replica : replicas.values()) {
+ InetSocketAddress replicaAddress =
replica.getAddress(replicationProperties);
+ if
(replicaAddress.getHostName().equals(socketAddress.getHostName())
+ && replicaAddress.getPort() == socketAddress.getPort()) {
+ return replica.getId();
+ }
+ }
+ return null;
+ }
+
@Override
- public void startReplicationThreads() {
+ public void startReplicationThreads() throws InterruptedException {
replicationJobsProcessor = new ReplicationJobsProccessor();
//start/continue processing jobs/logs
- if (logsReplicaSockets == null) {
- establishTxnLogsReplicationConnection();
+ if (logsRepSockets == null) {
+ establishTxnLogsRepHandshake();
getAndInitNewPage();
- txnlogsReplicator = new ReplicationLogFlusher(emptyLogBuffersQ,
pendingFlushLogBuffersQ);
- txnLogReplicatorTask =
asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogsReplicator);
+ txnlogReplicator = new TxnLogReplicator(emptyLogBuffersQ,
pendingFlushLogBuffersQ);
+ txnLogReplicatorTask =
asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogReplicator);
}
replicationJobsProcessor.start();
@@ -1037,7 +1122,42 @@
}
public int getLogPageSize() {
- return logManager.getLogPageSize();
+ return replicationProperties.getLogBufferPageSize();
+ }
+
+ @Override
+ public void replicateTxnLogBatch(final ByteBuffer buffer) {
+ //if replication is suspended, wait until it is resumed
+ try {
+ while (replicationSuspended.get()) {
+ synchronized (replicationSuspended) {
+ replicationSuspended.wait();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ //prepare the batch size buffer
+ txnLogsBatchSizeBuffer.clear();
+ txnLogsBatchSizeBuffer.putInt(buffer.remaining());
+ txnLogsBatchSizeBuffer.flip();
+
+ buffer.mark();
+ for (SocketChannel replicaSocket : logsRepSockets) {
+ try {
+ //send batch size
+ NetworkingUtil.transferBufferToChannel(replicaSocket,
txnLogsBatchSizeBuffer);
+ //send log
+ NetworkingUtil.transferBufferToChannel(replicaSocket, buffer);
+ } catch (IOException e) {
+ handleReplicationFailure(replicaSocket, e);
+ } finally {
+ txnLogsBatchSizeBuffer.position(0);
+ buffer.reset();
+ }
+ }
+ //move the buffer position to the sent limit
+ buffer.position(buffer.limit());
}
//supporting classes
@@ -1068,12 +1188,12 @@
break;
}
} catch (InterruptedException e) {
- //ignore
+ Thread.currentThread().interrupt();
}
}
}
- public void handleReplicaFailure(String replicaId) {
+ public void handleReplicaFailure(String replicaId) throws
InterruptedException {
Replica replica = replicas.get(replicaId);
if (replica.getState() == ReplicaState.DEAD) {
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 47e60b2..d14a0f7 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -204,7 +204,7 @@
}
@Override
- public void completeFailbackProcess() throws IOException {
+ public void completeFailbackProcess() throws IOException,
InterruptedException {
ILogManager logManager =
runtimeContext.getTransactionSubsystem().getLogManager();
ReplicaResourcesManager replicaResourcesManager =
(ReplicaResourcesManager) runtimeContext
.getReplicaResourcesManager();
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 872adcd..e9d8a8c 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -113,7 +113,7 @@
@Override
public void appendWithReplication(ILogRecord logRecord, long appendLSN) {
- logRecord.writeLogRecord(appendBuffer, appendLSN);
+ logRecord.writeLogRecord(appendBuffer);
if (logRecord.getLogSource() == LogSource.LOCAL &&
logRecord.getLogType() != LogType.FLUSH
&& logRecord.getLogType() != LogType.WAIT) {
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index cacd036..8e615af 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -37,10 +37,6 @@
@Override
public void log(ILogRecord logRecord) throws ACIDException {
- if (logRecord.getLogSize() > logPageSize) {
- throw new IllegalStateException();
- }
-
//only locally generated logs should be replicated
logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL &&
logRecord.getLogType() != LogType.WAIT);
@@ -58,7 +54,11 @@
syncAppendToLogTail(logRecord);
if (logRecord.isReplicated()) {
- replicationManager.replicateLog(logRecord);
+ try {
+ replicationManager.replicateLog(logRecord);
+ } catch (InterruptedException e) {
+ throw new ACIDException(e);
+ }
}
if (logRecord.getLogSource() == LogSource.LOCAL) {
@@ -69,7 +69,7 @@
try {
logRecord.wait();
} catch (InterruptedException e) {
- //ignore
+ Thread.currentThread().interrupt();
}
}
@@ -79,7 +79,7 @@
try {
logRecord.wait();
} catch (InterruptedException e) {
- //ignore
+ Thread.currentThread().interrupt();
}
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/883
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>