Murtadha Hubail has submitted this change and it was merged.

Change subject: Txn Log Replication Optimizations
......................................................................
Txn Log Replication Optimizations

- Add configurations for txn log replication.
- Establish handshake for continuous log replication.
- Replicate logs in batches instead of one by one.
- Eliminate HashMap iterator object creation per logs batch.
- Fix synchronization for jobs' ACKs from remote replicas.
- Bug fixes for big objects replication.
- Some SONAR fixes for old code.

Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
Reviewed-on: https://asterix-gerrit.ics.uci.edu/883
Reviewed-by: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: abdullah alamoudi <[email protected]>
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
R asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
M 
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java 16 files changed, 653 insertions(+), 522 deletions(-) Approvals: abdullah alamoudi: Looks good to me, approved Jenkins: Looks good to me, but someone else must approve; Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java index 0a6d62d..c71d77e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java @@ -150,7 +150,7 @@ } } - private void startReplicationService() { + private void startReplicationService() throws InterruptedException { //Open replication channel runtimeContext.getReplicationChannel().start(); diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml index b1d0649..1d693f9 100644 --- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml +++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml @@ -98,10 +98,4 @@ <description>Enabling plot of Algebricks plan to tmp folder. (Default = false) </description> </property> - <property> - <name>log.level</name> - <value>WARNING</value> - <description>The minimum log level to be displayed. 
(Default = INFO) - </description> - </property> </asterixConfiguration> diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java index 5d31d9a..7f51bbd 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java @@ -26,18 +26,30 @@ import org.apache.asterix.common.replication.Replica; import org.apache.asterix.event.schema.cluster.Cluster; import org.apache.asterix.event.schema.cluster.Node; +import org.apache.hyracks.util.StorageUtil; +import org.apache.hyracks.util.StorageUtil.StorageUnit; public class AsterixReplicationProperties extends AbstractAsterixProperties { private static final Logger LOGGER = Logger.getLogger(AsterixReplicationProperties.class.getName()); - private static int REPLICATION_DATAPORT_DEFAULT = 2000; - private static int REPLICATION_FACTOR_DEFAULT = 1; - private static int REPLICATION_TIME_OUT_DEFAULT = 15; + private static final int REPLICATION_DATAPORT_DEFAULT = 2000; + private static final int REPLICATION_FACTOR_DEFAULT = 1; + private static final int REPLICATION_TIME_OUT_DEFAULT = 15; private static final int MAX_REMOTE_RECOVERY_ATTEMPTS = 5; private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1"; private final String NODE_NAME_PREFIX; private final Cluster cluster; + + private static final String REPLICATION_LOG_BATCH_SIZE_KEY = "replication.log.batchsize"; + private static final int REPLICATION_LOG_BATCH_SIZE_DEFAULT = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE); + + private static final String REPLICATION_LOG_BUFFER_NUM_PAGES_KEY = "replication.log.buffer.numpages"; + private static final int REPLICATION_LOG_BUFFER_NUM_PAGES_DEFAULT = 8; + + private static final String 
REPLICATION_LOG_BUFFER_PAGE_SIZE_KEY = "replication.log.buffer.pagesize"; + private static final int REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT = StorageUtil.getSizeInBytes(128, + StorageUnit.KILOBYTE); public AsterixReplicationProperties(AsterixPropertiesAccessor accessor, Cluster cluster) { super(accessor); @@ -90,7 +102,7 @@ } public Set<Replica> getRemoteReplicas(String nodeId) { - Set<Replica> remoteReplicas = new HashSet<Replica>();; + Set<Replica> remoteReplicas = new HashSet<>();; int numberOfRemoteReplicas = getReplicationFactor() - 1; //Using chained-declustering @@ -161,7 +173,7 @@ } public Set<String> getRemoteReplicasIds(String nodeId) { - Set<String> remoteReplicasIds = new HashSet<String>(); + Set<String> remoteReplicasIds = new HashSet<>(); Set<Replica> remoteReplicas = getRemoteReplicas(nodeId); for (Replica replica : remoteReplicas) { @@ -176,7 +188,7 @@ } public Set<String> getNodeReplicasIds(String nodeId) { - Set<String> replicaIds = new HashSet<String>(); + Set<String> replicaIds = new HashSet<>(); replicaIds.add(nodeId); replicaIds.addAll(getRemoteReplicasIds(nodeId)); return replicaIds; @@ -245,4 +257,19 @@ public int getMaxRemoteRecoveryAttempts() { return MAX_REMOTE_RECOVERY_ATTEMPTS; } + + public int getLogBufferPageSize() { + return accessor.getProperty(REPLICATION_LOG_BUFFER_PAGE_SIZE_KEY, REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT, + PropertyInterpreters.getIntegerPropertyInterpreter()); + } + + public int getLogBufferNumOfPages() { + return accessor.getProperty(REPLICATION_LOG_BUFFER_NUM_PAGES_KEY, REPLICATION_LOG_BUFFER_NUM_PAGES_DEFAULT, + PropertyInterpreters.getIntegerPropertyInterpreter()); + } + + public int getLogBatchSize() { + return accessor.getProperty(REPLICATION_LOG_BATCH_SIZE_KEY, REPLICATION_LOG_BATCH_SIZE_DEFAULT, + PropertyInterpreters.getIntegerPropertyInterpreter()); + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java index a2738e8..9f9d74b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java @@ -43,6 +43,7 @@ * Requests the remaining LSM disk components files from active remote replicas. * * @throws IOException + * @throws InterruptedException */ - public void completeFailbackProcess() throws IOException; + public void completeFailbackProcess() throws IOException, InterruptedException; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java index 755fbbd..6bd1505 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java @@ -19,6 +19,7 @@ package org.apache.asterix.common.replication; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Set; import org.apache.asterix.common.transactions.ILogRecord; @@ -31,8 +32,9 @@ * * @param logRecord * The log record to be replicated, + * @throws InterruptedException */ - public void replicateLog(ILogRecord logRecord); + public void replicateLog(ILogRecord logRecord) throws InterruptedException; /** * Checks whether a log record has been replicated @@ -79,8 +81,10 @@ /** * Starts processing of ASYNC replication jobs as well as Txn logs. + * + * @throws InterruptedException */ - public void startReplicationThreads(); + public void startReplicationThreads() throws InterruptedException; /** * Checks and sets each remote replica state. 
@@ -114,4 +118,13 @@ * @throws IOException */ public void requestFlushLaggingReplicaIndexes(long nonSharpCheckpointTargetLSN) throws IOException; + + /** + * Transfers the contents of the {@code buffer} to active remote replicas. + * The transfer starts from the {@code buffer} current position to its limit. + * After the transfer, the {@code buffer} position will be its limit. + * + * @param buffer + */ + public void replicateTxnLogBatch(ByteBuffer buffer); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java index 4c3f728..b8fe4b2 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java @@ -62,8 +62,7 @@ public InetSocketAddress getAddress(AsterixReplicationProperties asterixReplicationProperties) { String replicaIPAddress = node.getClusterIp(); int replicationPort = asterixReplicationProperties.getDataReplicationPort(node.getId()); - InetSocketAddress replicaAddress = InetSocketAddress.createUnresolved(replicaIPAddress, replicationPort); - return replicaAddress; + return InetSocketAddress.createUnresolved(replicaIPAddress, replicationPort); } public static Replica create(DataInput input) throws IOException { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java index 3738cd1..1992a00 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java @@ -110,9 +110,7 @@ public String getNodeId(); - public int writeRemoteRecoveryLog(ByteBuffer buffer); - - public RecordReadStatus 
readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog); + public void readRemoteLog(ByteBuffer buffer); public void setReplicationThread(IReplicationThread replicationThread); @@ -120,11 +118,7 @@ public byte getLogSource(); - public int getSerializedLogSize(); - - public void writeLogRecord(ByteBuffer buffer, long appendLSN); - - public ByteBuffer getSerializedLog(); + public int getRemoteLogSize(); public void setNodeId(String nodeId); @@ -138,4 +132,6 @@ * @return a flag indicating whether the log record should be sent to remote replicas */ public boolean isReplicated(); + + public void writeRemoteLogRecord(ByteBuffer buffer); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java index fd56913..4823a92 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java @@ -29,7 +29,7 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReference; import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter; -/* +/** * == LogRecordFormat == * --------------------------- * [Header1] (6 bytes) : for all log types @@ -44,8 +44,7 @@ * PKValueSize(4) * PKValue(PKValueSize) * --------------------------- - * [Header3] (20 bytes) : only for update log type - * PrevLSN(8) + * [Header3] (12 bytes) : only for update log type * ResourceId(8) //stored in .metadata of the corresponding index in NC node * LogRecordSize(4) * --------------------------- @@ -61,16 +60,34 @@ * = LogSize = * 1) JOB_COMMIT_LOG_SIZE: 14 bytes (Header1(6) + Tail(8)) * 2) ENTITY_COMMIT || UPSERT_ENTITY_COMMIT: (Header1(6) + Header2(16) + Tail(8)) + PKValueSize - * --> ENTITY_COMMIT_LOG_BASE_SIZE = 30 + * --> ENTITY_COMMIT_LOG_BASE_SIZE = 30 * 3) UPDATE: (Header1(6) + Header2(16) + + Header3(20) + 
Body(9) + Tail(8)) + PKValueSize + NewValueSize - * --> UPDATE_LOG_BASE_SIZE = 59 + * --> UPDATE_LOG_BASE_SIZE = 59 * 4) FLUSH: 18 bytes (Header1(6) + DatasetId(4) + Tail(8)) * 5) WAIT_LOG_SIZE: 14 bytes (Header1(6) + Tail(8)) - * --> WAIT_LOG only requires LogType Field, but in order to conform the log reader protocol - * it also includes LogSource and JobId fields. + * --> WAIT_LOG only requires LogType Field, but in order to conform the log reader protocol + * it also includes LogSource and JobId fields. */ public class LogRecord implements ILogRecord { + + private static final int LOG_SOURCE_LEN = Byte.BYTES; + private static final int TYPE_LEN = Byte.BYTES; + public static final int PKHASH_LEN = Integer.BYTES; + public static final int PKSZ_LEN = Integer.BYTES; + private static final int RS_PARTITION_LEN = Integer.BYTES; + private static final int RSID_LEN = Long.BYTES; + private static final int LOGRCD_SZ_LEN = Integer.BYTES; + private static final int FLDCNT_LEN = Integer.BYTES; + private static final int NEWOP_LEN = Byte.BYTES; + private static final int NEWVALSZ_LEN = Integer.BYTES; + private static final int CHKSUM_LEN = Long.BYTES; + + private static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES; + private static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN + + PKSZ_LEN; + private static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN; + private static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN; // ------------- fields in a log record (begin) ------------// private byte logSource; @@ -101,9 +118,8 @@ private int[] PKFields; private PrimaryIndexOperationTracker opTracker; private IReplicationThread replicationThread; - private ByteBuffer serializedLog; /** - * The fields (numOfFlushedIndexes and nodeId) are used for serialized flush logs only + * The fields (numOfFlushedIndexes and nodeId) are used for remote flush logs only * to indicate the source 
of the log and how many indexes were flushed using its LSN. */ private int numOfFlushedIndexes; @@ -118,25 +134,6 @@ checksumGen = new CRC32(); logSource = LogSource.LOCAL; } - - private final static int LOG_SOURCE_LEN = Byte.BYTES; - private final static int TYPE_LEN = Byte.BYTES; - public final static int PKHASH_LEN = Integer.BYTES; - public final static int PKSZ_LEN = Integer.BYTES; - private final static int RS_PARTITION_LEN = Integer.BYTES; - private final static int RSID_LEN = Long.BYTES; - private final static int LOGRCD_SZ_LEN = Integer.BYTES; - private final static int FLDCNT_LEN = Integer.BYTES; - private final static int NEWOP_LEN = Byte.BYTES; - private final static int NEWVALSZ_LEN = Integer.BYTES; - private final static int CHKSUM_LEN = Long.BYTES; - - private final static int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES; - private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN - + PKSZ_LEN; - private final static int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN; - private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN; - private final static int REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN = Long.BYTES + Integer.BYTES + Integer.BYTES; private void writeLogRecordCommonFields(ByteBuffer buffer) { buffer.put(logSource); @@ -173,39 +170,15 @@ buffer.putLong(checksum); } - // this method is used when replication is enabled to include the log record LSN in the serialized version @Override - public void writeLogRecord(ByteBuffer buffer, long appendLSN) { - int beginOffset = buffer.position(); + public void writeRemoteLogRecord(ByteBuffer buffer) { writeLogRecordCommonFields(buffer); - - if (replicated) { - //copy the serialized log to send it to replicas - int serializedLogSize = getSerializedLogSize(); - if (serializedLog == null || serializedLog.capacity() < serializedLogSize) { - serializedLog = ByteBuffer.allocate(serializedLogSize); - } else { - 
serializedLog.clear(); - } - - int currentPosition = buffer.position(); - int currentLogSize = (currentPosition - beginOffset); - - buffer.position(beginOffset); - buffer.get(serializedLog.array(), 0, currentLogSize); - serializedLog.position(currentLogSize); - if (logType == LogType.FLUSH) { - serializedLog.putLong(appendLSN); - serializedLog.putInt(numOfFlushedIndexes); - serializedLog.putInt(nodeId.length()); - serializedLog.put(nodeId.getBytes()); - } - serializedLog.flip(); - buffer.position(currentPosition); + if (logType == LogType.FLUSH) { + buffer.putLong(LSN); + buffer.putInt(numOfFlushedIndexes); + buffer.putInt(nodeId.length()); + buffer.put(nodeId.getBytes()); } - - checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN); - buffer.putLong(checksum); } private void writePKValue(ByteBuffer buffer) { @@ -221,8 +194,12 @@ } private void writeTuple(ByteBuffer buffer, ITupleReference tuple, int size) { - tupleWriter.writeTuple(tuple, buffer.array(), buffer.position()); - // writeTuple() doesn't change the position of the buffer. + if (logSource == LogSource.LOCAL) { + tupleWriter.writeTuple(tuple, buffer.array(), buffer.position()); + } else { + //since the tuple is already serialized in remote logs, just copy it from beginning to end. 
+ System.arraycopy(tuple.getFieldData(0), 0, buffer.array(), buffer.position(), size); + } buffer.position(buffer.position() + size); } @@ -323,47 +300,19 @@ } @Override - public RecordReadStatus readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog) { - int beginOffset = buffer.position(); - + public void readRemoteLog(ByteBuffer buffer) { //read common fields - RecordReadStatus status = readLogCommonFields(buffer); - if (status != RecordReadStatus.OK) { - buffer.position(beginOffset); - return status; - } + readLogCommonFields(buffer); if (logType == LogType.FLUSH) { - if (buffer.remaining() >= REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN) { - LSN = buffer.getLong(); - numOfFlushedIndexes = buffer.getInt(); - //read serialized node id - int nodeIdLength = buffer.getInt(); - if (buffer.remaining() >= nodeIdLength) { - byte[] nodeIdBytes = new byte[nodeIdLength]; - buffer.get(nodeIdBytes); - nodeId = new String(nodeIdBytes); - } else { - buffer.position(beginOffset); - return RecordReadStatus.TRUNCATED; - } - } else { - buffer.position(beginOffset); - return RecordReadStatus.TRUNCATED; - } + LSN = buffer.getLong(); + numOfFlushedIndexes = buffer.getInt(); + //read serialized node id + int nodeIdLength = buffer.getInt(); + byte[] nodeIdBytes = new byte[nodeIdLength]; + buffer.get(nodeIdBytes); + nodeId = new String(nodeIdBytes); } - - //remote recovery logs need to have the LSN to check which should be replayed - if (remoteRecoveryLog) { - if (buffer.remaining() >= Long.BYTES) { - LSN = buffer.getLong(); - } else { - buffer.position(beginOffset); - return RecordReadStatus.TRUNCATED; - } - } - - return RecordReadStatus.OK; } private ITupleReference readPKValue(ByteBuffer buffer) { @@ -443,16 +392,6 @@ builder.append(" ResourceId : ").append(resourceId); } return builder.toString(); - } - - @Override - public int writeRemoteRecoveryLog(ByteBuffer buffer) { - int bufferBegin = buffer.position(); - writeLogRecordCommonFields(buffer); - //FLUSH logs should not included in 
remote recovery - //LSN must be included in all remote recovery logs - buffer.putLong(LSN); - return buffer.position() - bufferBegin; } //////////////////////////////////////////// @@ -535,18 +474,18 @@ } @Override - public int getSerializedLogSize() { - int serilizedSize = logSize; + public int getRemoteLogSize() { + int remoteLogSize = logSize; if (logType == LogType.FLUSH) { //LSN - serilizedSize += Long.BYTES; + remoteLogSize += Long.BYTES; //num of indexes - serilizedSize += Integer.BYTES; + remoteLogSize += Integer.BYTES; //serialized node id String - serilizedSize += Integer.BYTES + nodeId.length(); + remoteLogSize += Integer.BYTES + nodeId.length(); } - serilizedSize -= CHKSUM_LEN; - return serilizedSize; + remoteLogSize -= CHKSUM_LEN; + return remoteLogSize; } @Override @@ -628,15 +567,6 @@ public PrimaryIndexOperationTracker getOpTracker() { return opTracker; - } - - @Override - public ByteBuffer getSerializedLog() { - return serializedLog; - } - - public void setSerializedLog(ByteBuffer serializedLog) { - this.serializedLog = serializedLog; } @Override diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java index 588968c..283f69f 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java @@ -18,53 +18,42 @@ */ package org.apache.asterix.replication.logging; -import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.asterix.common.transactions.ILogRecord; -import org.apache.asterix.replication.functions.ReplicationProtocol; 
-import org.apache.asterix.replication.management.NetworkingUtil; import org.apache.asterix.replication.management.ReplicationManager; public class ReplicationLogBuffer { private final int logBufferSize; private final AtomicBoolean full; private int appendOffset; - private int flushOffset; + private int replicationOffset; private final ByteBuffer appendBuffer; - private final ByteBuffer flushBuffer; + private final ByteBuffer replicationBuffer; private boolean stop; - private Map<String, SocketChannel> replicaSockets; private ReplicationManager replicationManager; + private final int batchSize; - public ReplicationLogBuffer(ReplicationManager replicationManager, int logBufferSize) { + public ReplicationLogBuffer(ReplicationManager replicationManager, int logBufferSize, int batchSize) { this.replicationManager = replicationManager; this.logBufferSize = logBufferSize; + this.batchSize = batchSize; appendBuffer = ByteBuffer.allocate(logBufferSize); - flushBuffer = appendBuffer.duplicate(); + replicationBuffer = appendBuffer.duplicate(); full = new AtomicBoolean(false); appendOffset = 0; - flushOffset = 0; + replicationOffset = 0; } public void append(ILogRecord logRecord) { - appendBuffer.putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal()); - appendBuffer.putInt(logRecord.getSerializedLogSize()); - appendBuffer.put(logRecord.getSerializedLog()); + appendBuffer.putInt(logRecord.getRemoteLogSize()); + logRecord.writeRemoteLogRecord(appendBuffer); synchronized (this) { appendOffset += getLogReplicationSize(logRecord); this.notify(); } - } - - public void setReplicationSockets(Map<String, SocketChannel> replicaSockets) { - this.replicaSockets = replicaSockets; } public synchronized void isFull(boolean full) { @@ -77,18 +66,18 @@ } private static int getLogReplicationSize(ILogRecord logRecord) { - //request type + request length + serialized log length - return Integer.BYTES + Integer.BYTES + logRecord.getSerializedLogSize(); + //log length (4 bytes) 
+ remote log size + return Integer.BYTES + logRecord.getRemoteLogSize(); } public void reset() { appendBuffer.position(0); appendBuffer.limit(logBufferSize); - flushBuffer.position(0); - flushBuffer.limit(logBufferSize); + replicationBuffer.position(0); + replicationBuffer.limit(logBufferSize); full.set(false); appendOffset = 0; - flushOffset = 0; + replicationOffset = 0; stop = false; } @@ -96,57 +85,66 @@ int endOffset; while (!full.get()) { synchronized (this) { - if (appendOffset - flushOffset == 0 && !full.get()) { + if (appendOffset - replicationOffset == 0 && !full.get()) { try { if (stop) { break; } this.wait(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); continue; } } endOffset = appendOffset; } - internalFlush(flushOffset, endOffset); + internalFlush(replicationOffset, endOffset); } - - internalFlush(flushOffset, appendOffset); + internalFlush(replicationOffset, appendOffset); } private void internalFlush(int beginOffset, int endOffset) { if (endOffset > beginOffset) { - int begingPos = flushBuffer.position(); - flushBuffer.limit(endOffset); - sendRequest(replicaSockets, flushBuffer); - flushBuffer.position(begingPos + (endOffset - beginOffset)); - flushOffset = endOffset; + int begingPos = replicationBuffer.position(); + replicationBuffer.limit(endOffset); + transferBuffer(replicationBuffer); + replicationBuffer.position(begingPos + (endOffset - beginOffset)); + replicationOffset = endOffset; } } - private void sendRequest(Map<String, SocketChannel> replicaSockets, ByteBuffer requestBuffer) { - Iterator<Map.Entry<String, SocketChannel>> iterator = replicaSockets.entrySet().iterator(); - int begin = requestBuffer.position(); - while (iterator.hasNext()) { - Entry<String, SocketChannel> replicaSocket = iterator.next(); - SocketChannel clientSocket = replicaSocket.getValue(); - try { - NetworkingUtil.transferBufferToChannel(clientSocket, requestBuffer); - } catch (IOException e) { - if (clientSocket.isOpen()) { - try { - 
clientSocket.close(); - } catch (IOException e2) { - e2.printStackTrace(); - } - } - replicationManager.reportFailedReplica(replicaSocket.getKey()); - iterator.remove(); - } finally { - requestBuffer.position(begin); - } + private void transferBuffer(ByteBuffer buffer) { + if (buffer.remaining() <= batchSize) { + //the current batch can be sent as it is + replicationManager.replicateTxnLogBatch(buffer); + return; } + /** + * break the batch into smaller batches + */ + int totalTransferLimit = buffer.limit(); + while (buffer.hasRemaining()) { + if (buffer.remaining() > batchSize) { + //mark the beginning of this batch + buffer.mark(); + int currentBatchSize = 0; + while (currentBatchSize < batchSize) { + int logSize = replicationBuffer.getInt(); + //add the size of the log record itself + 4 bytes for its size + currentBatchSize += logSize + Integer.BYTES; + //go to the beginning of the next log + buffer.position(buffer.position() + logSize); + } + //set the limit to the end of this batch + buffer.limit(buffer.position()); + //return to the beginning of the batch position + buffer.reset(); + } + replicationManager.replicateTxnLogBatch(buffer); + //return the original limit to check the new remaining size + buffer.limit(totalTransferLimit); + } } public boolean isStop() { diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java similarity index 71% rename from asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java rename to asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java index 3312cb1..118fde6 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java +++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java @@ -21,24 +21,22 @@ import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; import java.util.logging.Logger; - -import org.apache.asterix.common.transactions.LogRecord; /** * This class is responsible for sending transactions logs to remote replicas. */ -public class ReplicationLogFlusher implements Callable<Boolean> { - private final static Logger LOGGER = Logger.getLogger(ReplicationLogFlusher.class.getName()); - private final static ReplicationLogBuffer POISON_PILL = new ReplicationLogBuffer(null, - LogRecord.JOB_TERMINATE_LOG_SIZE); +public class TxnLogReplicator implements Callable<Boolean> { + private static final Logger LOGGER = Logger.getLogger(TxnLogReplicator.class.getName()); + private static final ReplicationLogBuffer POISON_PILL = new ReplicationLogBuffer(null, 0, 0); private final LinkedBlockingQueue<ReplicationLogBuffer> emptyQ; private final LinkedBlockingQueue<ReplicationLogBuffer> flushQ; private ReplicationLogBuffer flushPage; private final AtomicBoolean isStarted; private final AtomicBoolean terminateFlag; - public ReplicationLogFlusher(LinkedBlockingQueue<ReplicationLogBuffer> emptyQ, + public TxnLogReplicator(LinkedBlockingQueue<ReplicationLogBuffer> emptyQ, LinkedBlockingQueue<ReplicationLogBuffer> flushQ) { this.emptyQ = emptyQ; this.flushQ = flushQ; @@ -54,7 +52,7 @@ try { isStarted.wait(); } catch (InterruptedException e) { - //ignore + Thread.currentThread().interrupt(); } } } @@ -74,34 +72,37 @@ @Override public Boolean call() { - Thread.currentThread().setName("Replication Log Flusher"); + Thread.currentThread().setName("TxnLog Replicator"); synchronized (isStarted) { isStarted.set(true); isStarted.notify(); } - try { - while (true) { + + while (true) { + try { + if (terminateFlag.get()) { + return true; + } + flushPage 
= null; - try { - flushPage = flushQ.take(); - if (flushPage == POISON_PILL || terminateFlag.get()) { - return true; - } - } catch (InterruptedException e) { - if (flushPage == null) { - continue; - } + flushPage = flushQ.take(); + if (flushPage == POISON_PILL) { + continue; } flushPage.flush(); // TODO: pool large pages if (flushPage.getLogBufferSize() == flushPage.getReplicationManager().getLogPageSize()) { emptyQ.offer(flushPage); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.log(Level.SEVERE, "TxnLogReplicator is terminating abnormally. Logs Replication Stopped.", + e); + } + throw e; } - } catch (Exception e) { - e.printStackTrace(); - LOGGER.severe("ReplicationLogFlusher is terminating abnormally. Logs Replication Stopped."); - throw e; } } } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java index 6023cb1..62c1e4a 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java @@ -21,6 +21,7 @@ import java.io.EOFException; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.net.SocketException; import java.nio.ByteBuffer; @@ -30,6 +31,10 @@ import java.util.Enumeration; public class NetworkingUtil { + + private NetworkingUtil() { + throw new AssertionError("This util class should not be initialized."); + } public static void readBytes(SocketChannel socketChannel, ByteBuffer byteBuffer, int length) throws IOException { byteBuffer.clear(); @@ -88,7 +93,8 @@ return hostName; } - public static void transferBufferToChannel(SocketChannel 
socketChannel, ByteBuffer requestBuffer) throws IOException { + public static void transferBufferToChannel(SocketChannel socketChannel, ByteBuffer requestBuffer) + throws IOException { while (requestBuffer.hasRemaining()) { socketChannel.write(requestBuffer); } @@ -107,4 +113,10 @@ long fileSize = fileChannel.size(); fileChannel.transferFrom(socketChannel, pos, fileSize); } + + public static InetSocketAddress getSocketAddress(SocketChannel socketChannel) { + String hostAddress = socketChannel.socket().getInetAddress().getHostAddress(); + int port = socketChannel.socket().getPort(); + return InetSocketAddress.createUnresolved(hostAddress, port); + } } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java index a152f6c..cabfc77 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java @@ -83,6 +83,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { private static final Logger LOGGER = Logger.getLogger(ReplicationChannel.class.getName()); + private static final int LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE = 1; private final ExecutorService replicationThreads; private final String localNodeID; private final ILogManager logManager; @@ -91,8 +92,8 @@ private ServerSocketChannel serverSocketChannel = null; private final IReplicationManager replicationManager; private final AsterixReplicationProperties replicationProperties; - private final IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider; - private final static int INTIAL_BUFFER_SIZE = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE); + private final IAsterixAppRuntimeContextProvider appContextProvider; + private 
static final int INTIAL_BUFFER_SIZE = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE); private final LinkedBlockingQueue<LSMComponentLSNSyncTask> lsmComponentRemoteLSN2LocalLSNMappingTaskQ; private final LinkedBlockingQueue<LogRecord> pendingNotificationRemoteLogsQ; private final Map<String, LSMComponentProperties> lsmComponentId2PropertiesMap; @@ -110,10 +111,10 @@ this.replicaResourcesManager = (ReplicaResourcesManager) replicaResoucesManager; this.replicationManager = replicationManager; this.replicationProperties = replicationProperties; - this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider; - lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new LinkedBlockingQueue<LSMComponentLSNSyncTask>(); + this.appContextProvider = asterixAppRuntimeContextProvider; + lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new LinkedBlockingQueue<>(); pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>(); - lsmComponentId2PropertiesMap = new ConcurrentHashMap<String, LSMComponentProperties>(); + lsmComponentId2PropertiesMap = new ConcurrentHashMap<>(); replicaUniqueLSN2RemoteMapping = new ConcurrentHashMap<>(); lsmComponentLSNMappingService = new LSMComponentsSyncService(); replicationNotifier = new ReplicationNotifier(); @@ -166,16 +167,17 @@ //clean up when all the LSM component files have been received. if (remainingFile == 0) { - if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null) { - //if this LSN wont be used for any other index, remove it - if (replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) { - int remainingIndexes = replicaUniqueLSN2RemoteMapping - .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet(); - if (remainingIndexes == 0) { - //Note: there is a chance that this is never deleted because some index in the dataset was not flushed because it is empty. 
- //This could be solved by passing only the number of successfully flushed indexes - replicaUniqueLSN2RemoteMapping.remove(lsmCompProp.getNodeUniqueLSN()); - } + if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null + && replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) { + int remainingIndexes = replicaUniqueLSN2RemoteMapping + .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet(); + if (remainingIndexes == 0) { + /** + * Note: there is a chance that this will never be removed because some + * index in the dataset was not flushed because it is empty. This could + * be solved by passing only the number of successfully flushed indexes. + */ + replicaUniqueLSN2RemoteMapping.remove(lsmCompProp.getNodeUniqueLSN()); } } @@ -242,20 +244,23 @@ case FLUSH_INDEX: handleFlushIndex(); break; - default: { + default: throw new IllegalStateException("Unknown replication request"); - } } replicationFunction = ReplicationProtocol.getRequestType(socketChannel, inBuffer); } } catch (Exception e) { - e.printStackTrace(); + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.log(Level.WARNING, "Unexpectedly error during replication.", e); + } } finally { if (socketChannel.isOpen()) { try { socketChannel.close(); } catch (IOException e) { - e.printStackTrace(); + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.log(Level.WARNING, "Filed to close replication socket.", e); + } } } } @@ -263,15 +268,17 @@ private void handleFlushIndex() throws IOException { inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer); - //1. read which indexes are requested to be flushed from remote replica + //read which indexes are requested to be flushed from remote replica ReplicaIndexFlushRequest request = ReplicationProtocol.readReplicaIndexFlushRequest(inBuffer); Set<Long> requestedIndexesToBeFlushed = request.getLaggingRescouresIds(); - //2. 
check which indexes can be flushed (open indexes) and which cannot be flushed (closed or have empty memory component) - IDatasetLifecycleManager datasetLifeCycleManager = asterixAppRuntimeContextProvider - .getDatasetLifecycleManager(); + /** + * check which indexes can be flushed (open indexes) and which cannot be + * flushed (closed or have empty memory component). + */ + IDatasetLifecycleManager datasetLifeCycleManager = appContextProvider.getDatasetLifecycleManager(); List<IndexInfo> openIndexesInfo = datasetLifeCycleManager.getOpenIndexesInfo(); - Set<Integer> datasetsToForceFlush = new HashSet<Integer>(); + Set<Integer> datasetsToForceFlush = new HashSet<>(); for (IndexInfo iInfo : openIndexesInfo) { if (requestedIndexesToBeFlushed.contains(iInfo.getResourceId())) { AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex() @@ -281,7 +288,10 @@ //remove index to indicate that it will be flushed requestedIndexesToBeFlushed.remove(iInfo.getResourceId()); } else if (!((AbstractLSMIndex) iInfo.getIndex()).isCurrentMutableComponentEmpty()) { - //if an index has something to be flushed, then the request to flush it will succeed and we need to schedule it to be flushed. + /** + * if an index has something to be flushed, then the request to flush it + * will succeed and we need to schedule it to be flushed. + */ datasetsToForceFlush.add(iInfo.getDatasetId()); //remove index to indicate that it will be flushed requestedIndexesToBeFlushed.remove(iInfo.getResourceId()); @@ -289,13 +299,13 @@ } } - //3. force flush datasets requested to be flushed + //schedule flush for datasets requested to be flushed for (int datasetId : datasetsToForceFlush) { datasetLifeCycleManager.flushDataset(datasetId, true); } //the remaining indexes in the requested set are those which cannot be flushed. - //4. 
respond back to the requester that those indexes cannot be flushed + //respond back to the requester that those indexes cannot be flushed ReplicaIndexFlushRequest laggingIndexesResponse = new ReplicaIndexFlushRequest(requestedIndexesToBeFlushed); outBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, laggingIndexesResponse); NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer); @@ -363,7 +373,7 @@ List<String> filesList; Set<String> replicaIds = request.getReplicaIds(); Set<String> requesterExistingFiles = request.getExistingFiles(); - Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider + Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) appContextProvider .getAppContext()).getMetadataProperties().getNodePartitions(); for (String replicaId : replicaIds) { //get replica partitions @@ -414,50 +424,70 @@ } private void handleLogReplication() throws IOException, ACIDException { - inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer); + //set initial buffer size to a log buffer page size + inBuffer = ByteBuffer.allocate(logManager.getLogPageSize()); + while (true) { + //read a batch of logs + inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer); + //check if it is end of handshake (a single byte log) + if (inBuffer.remaining() == LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE) { + break; + } - //Deserialize log - remoteLog.readRemoteLog(inBuffer, false); - remoteLog.setLogSource(LogSource.REMOTE); - - switch (remoteLog.getLogType()) { - case LogType.UPDATE: - case LogType.ENTITY_COMMIT: - case LogType.UPSERT_ENTITY_COMMIT: - //if the log partition belongs to a partitions hosted on this node, replicate it - if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) { - logManager.log(remoteLog); - } - break; - case LogType.JOB_COMMIT: - case LogType.ABORT: - LogRecord jobTerminationLog = new LogRecord(); - 
TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getJobId(), - remoteLog.getLogType() == LogType.JOB_COMMIT); - jobTerminationLog.setReplicationThread(this); - jobTerminationLog.setLogSource(LogSource.REMOTE); - logManager.log(jobTerminationLog); - break; - case LogType.FLUSH: - //store mapping information for flush logs to use them in incoming LSM components. - RemoteLogMapping flushLogMap = new RemoteLogMapping(); - flushLogMap.setRemoteNodeID(remoteLog.getNodeId()); - flushLogMap.setRemoteLSN(remoteLog.getLSN()); - logManager.log(remoteLog); - //the log LSN value is updated by logManager.log(.) to a local value - flushLogMap.setLocalLSN(remoteLog.getLSN()); - flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes()); - replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap); - synchronized (flushLogslock) { - flushLogslock.notify(); - } - break; - default: - LOGGER.severe("Unsupported LogType: " + remoteLog.getLogType()); + processLogsBatch(inBuffer); } } - //this method is called sequentially by LogPage (notifyReplicationTerminator) for JOB_COMMIT and JOB_ABORT log types. 
+        /**
+         * Deserializes and processes every log record contained in a received batch.
+         * Each record in the batch is preceded by an int holding its size, which is
+         * skipped before the record is deserialized into {@code remoteLog}.
+         *
+         * @param buffer
+         *            the buffer holding the batch; it is consumed until no bytes remain.
+         * @throws ACIDException
+         *             declared for failures raised while processing a record
+         *             (presumably by the log manager - confirm against ILogManager).
+         */
+        private void processLogsBatch(ByteBuffer buffer) throws ACIDException {
+            while (buffer.hasRemaining()) {
+                //get rid of log size
+                //read from the buffer being iterated (not the inBuffer field) so the loop condition always advances
+                buffer.getInt();
+                //Deserialize log
+                remoteLog.readRemoteLog(buffer);
+                remoteLog.setLogSource(LogSource.REMOTE);
+
+                switch (remoteLog.getLogType()) {
+                    case LogType.UPDATE:
+                    case LogType.ENTITY_COMMIT:
+                    case LogType.UPSERT_ENTITY_COMMIT:
+                        //if the log partition belongs to a partitions hosted on this node, replicate it
+                        if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
+                            logManager.log(remoteLog);
+                        }
+                        break;
+                    case LogType.JOB_COMMIT:
+                    case LogType.ABORT:
+                        //a fresh record is logged (instead of remoteLog) and tagged with this replication thread
+                        LogRecord jobTerminationLog = new LogRecord();
+                        TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getJobId(),
+                                remoteLog.getLogType() == LogType.JOB_COMMIT);
+                        jobTerminationLog.setReplicationThread(this);
+                        jobTerminationLog.setLogSource(LogSource.REMOTE);
+                        logManager.log(jobTerminationLog);
+                        break;
+                    case LogType.FLUSH:
+                        //store mapping information for flush logs to use them in incoming LSM components.
+                        RemoteLogMapping flushLogMap = new RemoteLogMapping();
+                        flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
+                        //the remote LSN must be captured before logging, which overwrites it
+                        flushLogMap.setRemoteLSN(remoteLog.getLSN());
+                        logManager.log(remoteLog);
+                        //the log LSN value is updated by logManager.log(.) to a local value
+                        flushLogMap.setLocalLSN(remoteLog.getLSN());
+                        flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
+                        replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
+                        //wake up a thread waiting on flushLogslock for this flush log mapping
+                        synchronized (flushLogslock) {
+                            flushLogslock.notify();
+                        }
+                        break;
+                    default:
+                        LOGGER.severe("Unsupported LogType: " + remoteLog.getLogType());
+                }
+            }
+        }
+
+    /**
+     * this method is called sequentially by LogPage (notifyReplicationTerminator)
+     * for JOB_COMMIT and JOB_ABORT log types.
+ */ @Override public void notifyLogReplicationRequester(LogRecord logRecord) { pendingNotificationRemoteLogsQ.offer(logRecord); @@ -480,24 +510,27 @@ try { LogRecord logRecord = pendingNotificationRemoteLogsQ.take(); //send ACK to requester - try { - logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream() - .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getJobId() - + System.lineSeparator()).getBytes()); - } catch (IOException e) { - LOGGER.severe("Failed to send job replication ACK " + logRecord.getLogRecordForDisplay()); + logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream() + .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getJobId() + + System.lineSeparator()).getBytes()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (IOException e) { + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.log(Level.WARNING, "Failed to send job replication ACK", e); } - } catch (InterruptedException e1) { - LOGGER.severe("ReplicationNotifier Thread interrupted."); } } } } /** - * This thread is responsible for synchronizing the LSN of the received LSM components to a local LSN. + * This thread is responsible for synchronizing the LSN of + * the received LSM components to a local LSN. 
*/ private class LSMComponentsSyncService extends Thread { + private static final int BULKLOAD_LSN = 0; + @Override public void run() { Thread.currentThread().setName("LSMComponentsSyncService Thread"); @@ -506,23 +539,24 @@ try { LSMComponentLSNSyncTask syncTask = lsmComponentRemoteLSN2LocalLSNMappingTaskQ.take(); LSMComponentProperties lsmCompProp = lsmComponentId2PropertiesMap.get(syncTask.getComponentId()); - try { - syncLSMComponentFlushLSN(lsmCompProp, syncTask); - updateLSMComponentRemainingFiles(lsmCompProp.getComponentId()); - } catch (Exception e) { - e.printStackTrace(); - } + syncLSMComponentFlushLSN(lsmCompProp, syncTask); + updateLSMComponentRemainingFiles(lsmCompProp.getComponentId()); } catch (InterruptedException e) { - //ignore + Thread.currentThread().interrupt(); + } catch (Exception e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.log(Level.SEVERE, "Unexpected exception during LSN synchronization", e); + } } + } } private void syncLSMComponentFlushLSN(LSMComponentProperties lsmCompProp, LSMComponentLSNSyncTask syncTask) - throws Exception { + throws InterruptedException, IOException { long remoteLSN = lsmCompProp.getOriginalLSN(); //LSN=0 (bulkload) does not need to be updated and there is no flush log corresponding to it - if (remoteLSN == 0) { + if (remoteLSN == BULKLOAD_LSN) { //since this is the first LSM component of this index, //then set the mapping in the LSN_MAP to the current log LSN because //no other log could've been received for this index since bulkload replication is synchronous. @@ -536,16 +570,21 @@ if (lsmCompProp.getOpType() == LSMOperationType.FLUSH) { //need to look up LSN mapping from memory RemoteLogMapping remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN()); - if (remoteLogMap == null) { + //wait until flush log arrives, and verify the LSM component file still exists + //The component file could be deleted if its NC fails. 
+ while (remoteLogMap == null && Files.exists(path)) { synchronized (flushLogslock) { - remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN()); - //wait until flush log arrives, and verify the LSM component file still exists - //The component file could be deleted if its NC fails. - while (remoteLogMap == null && Files.exists(path)) { - flushLogslock.wait(); - remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN()); - } + flushLogslock.wait(); } + remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN()); + } + + /** + * file has been deleted due to its remote primary replica failure + * before its LSN could've been synchronized. + */ + if (remoteLogMap == null) { + return; } lsmCompProp.setReplicaLSN(remoteLogMap.getLocalLSN()); } else if (lsmCompProp.getOpType() == LSMOperationType.MERGE) { @@ -554,13 +593,14 @@ .getReplicaIndexLSNMap(lsmCompProp.getReplicaComponentPath(replicaResourcesManager)); Long mappingLSN = lsmMap.get(lsmCompProp.getOriginalLSN()); if (mappingLSN == null) { - /* - * this shouldn't happen unless this node just recovered and the first component it received - * is a merged component due to an on-going merge operation while recovery on the remote replica. - * In this case, we use the current append LSN since no new records exist for this index, - * otherwise they would've been flushed. - * This could be prevented by waiting for any IO to finish on the remote replica during recovery. - * + /** + * this shouldn't happen unless this node just recovered and + * the first component it received is a merged component due + * to an on-going merge operation while recovery on the remote + * replica. In this case, we use the current append LSN since + * no new records exist for this index, otherwise they would've + * been flushed. This could be prevented by waiting for any IO + * to finish on the remote replica during recovery. 
*/ mappingLSN = logManager.getAppendLSN(); } @@ -569,9 +609,10 @@ } if (Files.notExists(path)) { - /* - * This could happen when a merged component arrives and deletes the flushed - * component (which we are trying to update) before its flush log arrives since logs and components are received + /** + * This could happen when a merged component arrives and deletes + * the flushed component (which we are trying to update) before + * its flush log arrives since logs and components are received * on different threads. */ return; @@ -594,4 +635,4 @@ } } } -} \ No newline at end of file +} diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java index ee872a5..5ba6ad2 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java @@ -71,7 +71,7 @@ import org.apache.asterix.replication.functions.ReplicationProtocol; import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType; import org.apache.asterix.replication.logging.ReplicationLogBuffer; -import org.apache.asterix.replication.logging.ReplicationLogFlusher; +import org.apache.asterix.replication.logging.TxnLogReplicator; import org.apache.asterix.replication.storage.LSMComponentProperties; import org.apache.asterix.replication.storage.LSMIndexFileProperties; import org.apache.asterix.replication.storage.ReplicaResourcesManager; @@ -86,6 +86,8 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; +import org.apache.hyracks.util.StorageUtil; +import 
org.apache.hyracks.util.StorageUtil.StorageUnit; /** * This class is used to process replication jobs and maintain remote replicas states @@ -93,7 +95,7 @@ public class ReplicationManager implements IReplicationManager { private static final Logger LOGGER = Logger.getLogger(ReplicationManager.class.getName()); - private final int INITIAL_REPLICATION_FACTOR = 1; + private static final int INITIAL_REPLICATION_FACTOR = 1; private final String nodeId; private ExecutorService replicationListenerThreads; private final Map<Integer, Set<String>> jobCommitAcks; @@ -114,7 +116,7 @@ private final AtomicBoolean replicationSuspended; private AtomicBoolean terminateJobsReplication; private AtomicBoolean jobsReplicationSuspended; - private final static int INITIAL_BUFFER_SIZE = 4000; //4KB + private static final int INITIAL_BUFFER_SIZE = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE); private final Set<String> shuttingDownReplicaIds; //replication threads private ReplicationJobsProccessor replicationJobsProcessor; @@ -128,9 +130,10 @@ private LinkedBlockingQueue<ReplicationLogBuffer> emptyLogBuffersQ; private LinkedBlockingQueue<ReplicationLogBuffer> pendingFlushLogBuffersQ; protected ReplicationLogBuffer currentTxnLogBuffer; - private ReplicationLogFlusher txnlogsReplicator; + private TxnLogReplicator txnlogReplicator; private Future<? extends Object> txnLogReplicatorTask; - private Map<String, SocketChannel> logsReplicaSockets = null; + private SocketChannel[] logsRepSockets; + private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES); //TODO this class needs to be refactored by moving its private classes to separate files //and possibly using MessageBroker to send/receive remote replicas events. 
@@ -143,15 +146,15 @@ this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider; this.hostIPAddressFirstOctet = replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3); this.logManager = logManager; - replicationJobsQ = new LinkedBlockingQueue<IReplicationJob>(); - replicaEventsQ = new LinkedBlockingQueue<ReplicaEvent>(); + replicationJobsQ = new LinkedBlockingQueue<>(); + replicaEventsQ = new LinkedBlockingQueue<>(); terminateJobsReplication = new AtomicBoolean(false); jobsReplicationSuspended = new AtomicBoolean(true); replicationSuspended = new AtomicBoolean(true); - replicas = new HashMap<String, Replica>(); - jobCommitAcks = new ConcurrentHashMap<Integer, Set<String>>(); - replicationJobsPendingAcks = new ConcurrentHashMap<Integer, ILogRecord>(); - shuttingDownReplicaIds = new HashSet<String>(); + replicas = new HashMap<>(); + jobCommitAcks = new ConcurrentHashMap<>(); + replicationJobsPendingAcks = new ConcurrentHashMap<>(); + shuttingDownReplicaIds = new HashSet<>(); dataBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE); //Used as async listeners from replicas @@ -179,13 +182,14 @@ clientPartitonsSet.addAll(clientPartitions); replica2PartitionsMap.put(replica.getId(), clientPartitonsSet); } - int numLogBuffers = logManager.getNumLogPages(); - emptyLogBuffersQ = new LinkedBlockingQueue<ReplicationLogBuffer>(numLogBuffers); - pendingFlushLogBuffersQ = new LinkedBlockingQueue<ReplicationLogBuffer>(numLogBuffers); + int numLogBuffers = replicationProperties.getLogBufferNumOfPages(); + emptyLogBuffersQ = new LinkedBlockingQueue<>(numLogBuffers); + pendingFlushLogBuffersQ = new LinkedBlockingQueue<>(numLogBuffers); - int logBufferSize = logManager.getLogPageSize(); + int logBufferSize = replicationProperties.getLogBufferPageSize(); for (int i = 0; i < numLogBuffers; i++) { - emptyLogBuffersQ.offer(new ReplicationLogBuffer(this, logBufferSize)); + emptyLogBuffersQ + .offer(new ReplicationLogBuffer(this, logBufferSize, 
replicationProperties.getLogBatchSize())); } } @@ -200,7 +204,7 @@ try { replicationSuspended.wait(); } catch (InterruptedException e) { - //ignore + Thread.currentThread().interrupt(); } } } @@ -209,16 +213,12 @@ } @Override - public void replicateLog(ILogRecord logRecord) { + public void replicateLog(ILogRecord logRecord) throws InterruptedException { if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) { //if replication is suspended, wait until it is resumed. while (replicationSuspended.get()) { synchronized (replicationSuspended) { - try { - replicationSuspended.wait(); - } catch (InterruptedException e) { - //ignore - } + replicationSuspended.wait(); } } Set<String> replicaIds = Collections.synchronizedSet(new HashSet<String>()); @@ -232,29 +232,23 @@ protected void getAndInitNewLargePage(int pageSize) { // for now, alloc a new buffer for each large page // TODO: consider pooling large pages - currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize); - currentTxnLogBuffer.setReplicationSockets(logsReplicaSockets); + currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize, replicationProperties.getLogBufferPageSize()); pendingFlushLogBuffersQ.offer(currentTxnLogBuffer); } - protected void getAndInitNewPage() { + protected void getAndInitNewPage() throws InterruptedException { currentTxnLogBuffer = null; while (currentTxnLogBuffer == null) { - try { - currentTxnLogBuffer = emptyLogBuffersQ.take(); - } catch (InterruptedException e) { - //ignore - } + currentTxnLogBuffer = emptyLogBuffersQ.take(); } currentTxnLogBuffer.reset(); - currentTxnLogBuffer.setReplicationSockets(logsReplicaSockets); pendingFlushLogBuffersQ.offer(currentTxnLogBuffer); } - private synchronized void appendToLogBuffer(ILogRecord logRecord) { + private synchronized void appendToLogBuffer(ILogRecord logRecord) throws InterruptedException { if (!currentTxnLogBuffer.hasSpace(logRecord)) { currentTxnLogBuffer.isFull(true); - if 
(logRecord.getLogSize() > logManager.getLogPageSize()) { + if (logRecord.getLogSize() > getLogPageSize()) { getAndInitNewLargePage(logRecord.getLogSize()); } else { getAndInitNewPage(); @@ -326,7 +320,10 @@ long fileSize = fileChannel.size(); if (LSMComponentJob != null) { - //since this is LSM_COMPONENT REPLICATE job, the job will contain only the component being replicated. + /** + * since this is LSM_COMPONENT REPLICATE job, the job will contain + * only the component being replicated. + */ ILSMComponent diskComponent = LSMComponentJob.getLSMIndexOperationContext() .getComponentsToBeReplicated().get(0); long LSNByteOffset = AsterixLSMIndexUtil.getComponentFileLSNOffset( @@ -362,7 +359,7 @@ } } } catch (IOException e) { - reportFailedReplica(entry.getKey()); + handleReplicationFailure(socketChannel, e); iterator.remove(); } finally { requestBuffer.position(0); @@ -392,7 +389,7 @@ waitForResponse(socketChannel, responseBuffer); } } catch (IOException e) { - reportFailedReplica(entry.getKey()); + handleReplicationFailure(socketChannel, e); iterator.remove(); } finally { requestBuffer.position(0); @@ -458,7 +455,7 @@ } /** - * Suspends proccessing replication jobs. + * Suspends processing replication jobs/logs. * * @param force * a flag indicates if replication should be suspended right away or when the pending jobs are completed. @@ -477,60 +474,134 @@ try { jobsReplicationSuspended.wait(); } catch (InterruptedException e) { - //ignore + Thread.currentThread().interrupt(); } } } } //suspend logs replication - if (txnlogsReplicator != null) { - terminateTxnLogsReplicator(); + if (txnlogReplicator != null) { + endTxnLogReplicationHandshake(); } } /** * Opens a new connection with Active remote replicas and starts a listen thread per connection. 
*/ - private void establishTxnLogsReplicationConnection() { - logsReplicaSockets = getActiveRemoteReplicasSockets(); + private void establishTxnLogReplicationHandshake() { + Map<String, SocketChannel> activeRemoteReplicasSockets = getActiveRemoteReplicasSockets(); + logsRepSockets = new SocketChannel[activeRemoteReplicasSockets.size()]; + int i = 0; //start a listener thread per connection - for (Entry<String, SocketChannel> entry : logsReplicaSockets.entrySet()) { + for (Entry<String, SocketChannel> entry : activeRemoteReplicasSockets.entrySet()) { + logsRepSockets[i] = entry.getValue(); replicationListenerThreads .execute(new TxnLogsReplicationResponseListener(entry.getKey(), entry.getValue())); + i++; + } + + /** + * establish log replication handshake + */ + ByteBuffer handshakeBuffer = ByteBuffer.allocate(ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE) + .putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal()); + handshakeBuffer.flip(); + //send handshake request + for (SocketChannel replicaSocket : logsRepSockets) { + try { + NetworkingUtil.transferBufferToChannel(replicaSocket, handshakeBuffer); + } catch (IOException e) { + handleReplicationFailure(replicaSocket, e); + } finally { + handshakeBuffer.position(0); + } } } + private void handleReplicationFailure(SocketChannel socketChannel, Throwable t) { + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.log(Level.WARNING, "Could not complete replication request.", t); + } + if (socketChannel.isOpen()) { + try { + socketChannel.close(); + } catch (IOException e) { + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.log(Level.WARNING, "Could not close socket.", e); + } + } + } + reportFailedReplica(getReplicaIdBySocket(socketChannel)); + } + /** - * Stops ReplicationFlusherThread and closes the sockets used to replicate logs. + * Stops TxnLogReplicator and closes the sockets used to replicate logs. 
*/ - private void terminateTxnLogsReplicator() { - LOGGER.log(Level.INFO, "Terminating ReplicationLogFlusher thread ..."); - txnlogsReplicator.terminate(); + private void endTxnLogReplicationHandshake() { + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("Terminating TxnLogReplicator thread ..."); + } + txnlogReplicator.terminate(); try { txnLogReplicatorTask.get(); } catch (ExecutionException | InterruptedException e) { - LOGGER.log(Level.WARNING, "RepicationLogFlusher thread terminated abnormally"); - e.printStackTrace(); + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.log(Level.SEVERE, "TxnLogReplicator thread terminated abnormally", e); + } } - LOGGER.log(Level.INFO, "LogFlusher thread is terminated."); - if (logsReplicaSockets != null) { - //wait for any ACK to arrive before closing sockets. + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("TxnLogReplicator thread was terminated."); + } + + /** + * End log replication handshake (by sending a dummy log with a single byte) + */ + ByteBuffer endLogRepHandshake = ByteBuffer.allocate(Integer.SIZE + 1).putInt(1).put((byte) 0); + endLogRepHandshake.flip(); + for (SocketChannel replicaSocket : logsRepSockets) { + try { + NetworkingUtil.transferBufferToChannel(replicaSocket, endLogRepHandshake); + } catch (IOException e) { + handleReplicationFailure(replicaSocket, e); + } finally { + endLogRepHandshake.position(0); + } + } + + //wait for any ACK to arrive before closing sockets. 
+ if (logsRepSockets != null) { synchronized (jobCommitAcks) { - while (jobCommitAcks.size() != 0) { - try { + try { + while (jobCommitAcks.size() != 0) { jobCommitAcks.wait(); - } catch (InterruptedException e) { - //ignore } + } catch (InterruptedException e) { + if (LOGGER.isLoggable(Level.SEVERE)) { + LOGGER.log(Level.SEVERE, "Interrupted while waiting for jobs ACK", e); + } + Thread.currentThread().interrupt(); } } - - //close log replication sockets - closeReplicaSockets(logsReplicaSockets); - logsReplicaSockets = null; } + + /** + * Close log replication sockets + */ + ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer(); + for (SocketChannel replicaSocket : logsRepSockets) { + try { + //send goodbye to remote replica + NetworkingUtil.transferBufferToChannel(replicaSocket, goodbyeBuffer); + replicaSocket.close(); + } catch (IOException e) { + handleReplicationFailure(replicaSocket, e); + } finally { + goodbyeBuffer.position(0); + } + } + logsRepSockets = null; } /** @@ -567,14 +638,7 @@ try { NetworkingUtil.transferBufferToChannel(clientSocket, requestBuffer); } catch (IOException e) { - if (clientSocket.isOpen()) { - try { - clientSocket.close(); - } catch (IOException e2) { - e2.printStackTrace(); - } - } - reportFailedReplica(replicaSocket.getKey()); + handleReplicationFailure(clientSocket, e); iterator.remove(); } finally { requestBuffer.position(0); @@ -600,7 +664,7 @@ try { clientSocket.close(); } catch (IOException e) { - e.printStackTrace(); + handleReplicationFailure(clientSocket, e); } } } @@ -636,7 +700,7 @@ try { Thread.sleep(1000); } catch (InterruptedException e) { - e.printStackTrace(); + Thread.currentThread().interrupt(); } } } @@ -651,8 +715,10 @@ * The new state of the replica. * @param suspendReplication * a flag indicating whether to suspend replication on state change or not. 
+ * @throws InterruptedException */ - public synchronized void updateReplicaState(String replicaId, ReplicaState newState, boolean suspendReplication) { + public synchronized void updateReplicaState(String replicaId, ReplicaState newState, boolean suspendReplication) + throws InterruptedException { Replica replica = replicas.get(replicaId); if (replica.getState() == newState) { @@ -680,10 +746,8 @@ if (newState == ReplicaState.ACTIVE) { replicationFactor++; - } else if (newState == ReplicaState.DEAD) { - if (replicationFactor > INITIAL_REPLICATION_FACTOR) { - replicationFactor--; - } + } else if (newState == ReplicaState.DEAD && replicationFactor > INITIAL_REPLICATION_FACTOR) { + replicationFactor--; } LOGGER.log(Level.WARNING, "Replica " + replicaId + " state changed to: " + newState.name() @@ -702,22 +766,24 @@ * The remote replica id the ACK received from. */ private void addAckToJob(int jobId, String replicaId) { - //add ACK to the job - if (jobCommitAcks.containsKey(jobId)) { - Set<String> replicaIds = jobCommitAcks.get(jobId); - replicaIds.add(replicaId); - } else { - throw new IllegalStateException("Job ID not found in pending job commits " + jobId); - } + synchronized (jobCommitAcks) { + //add ACK to the job + if (jobCommitAcks.containsKey(jobId)) { + Set<String> replicaIds = jobCommitAcks.get(jobId); + replicaIds.add(replicaId); + } else { + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.warning("Invalid job replication ACK received for jobId(" + jobId + ")"); + } + return; + } - //if got ACKs from all remote replicas, notify pending jobs if any - if (jobCommitAcks.get(jobId).size() == replicationFactor) { - synchronized (replicationJobsPendingAcks) { - if (replicationJobsPendingAcks.containsKey(jobId)) { - ILogRecord pendingLog = replicationJobsPendingAcks.get(jobId); - synchronized (pendingLog) { - pendingLog.notify(); - } + //if got ACKs from all remote replicas, notify pending jobs if any + + if (jobCommitAcks.get(jobId).size() == replicationFactor 
&& replicationJobsPendingAcks.containsKey(jobId)) { + ILogRecord pendingLog = replicationJobsPendingAcks.get(jobId); + synchronized (pendingLog) { + pendingLog.notify(); } } } @@ -725,26 +791,25 @@ @Override public boolean hasBeenReplicated(ILogRecord logRecord) { - if (jobCommitAcks.containsKey(logRecord.getJobId())) { - //check if all ACKs have been received - if (jobCommitAcks.get(logRecord.getJobId()).size() == replicationFactor) { - jobCommitAcks.remove(logRecord.getJobId()); + int jobId = logRecord.getJobId(); + if (jobCommitAcks.containsKey(jobId)) { + synchronized (jobCommitAcks) { + //check if all ACKs have been received + if (jobCommitAcks.get(jobId).size() == replicationFactor) { + jobCommitAcks.remove(jobId); - if (replicationJobsPendingAcks.containsKey(logRecord.getJobId())) { - replicationJobsPendingAcks.remove(logRecord); - } + //remove from pending jobs if exists + replicationJobsPendingAcks.remove(jobId); - //notify any threads waiting for all jobs to finish - if (jobCommitAcks.size() == 0) { - synchronized (jobCommitAcks) { + //notify any threads waiting for all jobs to finish + if (jobCommitAcks.size() == 0) { jobCommitAcks.notifyAll(); } + return true; + } else { + replicationJobsPendingAcks.putIfAbsent(jobId, logRecord); + return false; } - - return true; - } else { - replicationJobsPendingAcks.putIfAbsent(logRecord.getJobId(), logRecord); - return false; } } @@ -753,13 +818,16 @@ } private Map<String, SocketChannel> getActiveRemoteReplicasSockets() { - Map<String, SocketChannel> replicaNodesSockets = new HashMap<String, SocketChannel>(); + Map<String, SocketChannel> replicaNodesSockets = new HashMap<>(); for (Replica replica : replicas.values()) { if (replica.getState() == ReplicaState.ACTIVE) { try { SocketChannel sc = getReplicaSocket(replica.getId()); replicaNodesSockets.put(replica.getId(), sc); } catch (IOException e) { + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.log(Level.WARNING, "Could not get replica socket", e); + } 
reportFailedReplica(replica.getId()); } } @@ -776,7 +844,7 @@ * @throws IOException */ private SocketChannel getReplicaSocket(String replicaId) throws IOException { - Replica replica = replicas.get(replicaId); + Replica replica = replicationProperties.getReplicaById(replicaId); SocketChannel sc = SocketChannel.open(); sc.configureBlocking(true); InetSocketAddress address = replica.getAddress(replicationProperties); @@ -786,7 +854,7 @@ @Override public Set<String> getDeadReplicasIds() { - Set<String> replicasIds = new HashSet<String>(); + Set<String> replicasIds = new HashSet<>(); for (Replica replica : replicas.values()) { if (replica.getState() == ReplicaState.DEAD) { replicasIds.add(replica.getNode().getId()); @@ -797,7 +865,7 @@ @Override public Set<String> getActiveReplicasIds() { - Set<String> replicasIds = new HashSet<String>(); + Set<String> replicasIds = new HashSet<>(); for (Replica replica : replicas.values()) { if (replica.getState() == ReplicaState.ACTIVE) { replicasIds.add(replica.getNode().getId()); @@ -823,40 +891,35 @@ /** * Called during NC shutdown to notify remote replicas about the shutdown - * and wait for remote replicas shutdown notification then closes the local replication channel. + * and wait for remote replicas shutdown notification then closes the local + * replication channel. 
*/ @Override public void stop(boolean dumpState, OutputStream ouputStream) throws IOException { - try { - //stop replication thread afters all jobs/logs have been processed - suspendReplication(false); - //send shutdown event to remote replicas - sendShutdownNotifiction(); - //wait until all shutdown events come from all remote replicas - synchronized (shuttingDownReplicaIds) { - while (!shuttingDownReplicaIds.containsAll(getActiveReplicasIds())) { - try { - shuttingDownReplicaIds.wait(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + //stop replication thread afters all jobs/logs have been processed + suspendReplication(false); + //send shutdown event to remote replicas + sendShutdownNotifiction(); + //wait until all shutdown events come from all remote replicas + synchronized (shuttingDownReplicaIds) { + while (!shuttingDownReplicaIds.containsAll(getActiveReplicasIds())) { + try { + shuttingDownReplicaIds.wait(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } - LOGGER.log(Level.INFO, "Got shutdown notification from all remote replicas"); - //close replication channel - asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close(); - - LOGGER.log(Level.INFO, "Replication manager stopped."); - } catch (Exception e) { - e.printStackTrace(); } + LOGGER.log(Level.INFO, "Got shutdown notification from all remote replicas"); + //close replication channel + asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close(); + + LOGGER.log(Level.INFO, "Replication manager stopped."); } @Override public void reportReplicaEvent(ReplicaEvent event) { - synchronized (replicaEventsQ) { - replicaEventsQ.offer(event); - } + replicaEventsQ.offer(event); } /** @@ -867,6 +930,9 @@ */ public void reportFailedReplica(String replicaId) { Replica replica = replicas.get(replicaId); + if (replica == null) { + return; + } if (replica.getState() == ReplicaState.DEAD) { return; } @@ -878,16 +944,28 
@@ reportReplicaEvent(event); } + private String getReplicaIdBySocket(SocketChannel socketChannel) { + InetSocketAddress socketAddress = NetworkingUtil.getSocketAddress(socketChannel); + for (Replica replica : replicas.values()) { + InetSocketAddress replicaAddress = replica.getAddress(replicationProperties); + if (replicaAddress.getHostName().equals(socketAddress.getHostName()) + && replicaAddress.getPort() == socketAddress.getPort()) { + return replica.getId(); + } + } + return null; + } + @Override - public void startReplicationThreads() { + public void startReplicationThreads() throws InterruptedException { replicationJobsProcessor = new ReplicationJobsProccessor(); //start/continue processing jobs/logs - if (logsReplicaSockets == null) { - establishTxnLogsReplicationConnection(); + if (logsRepSockets == null) { + establishTxnLogReplicationHandshake(); getAndInitNewPage(); - txnlogsReplicator = new ReplicationLogFlusher(emptyLogBuffersQ, pendingFlushLogBuffersQ); - txnLogReplicatorTask = asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogsReplicator); + txnlogReplicator = new TxnLogReplicator(emptyLogBuffersQ, pendingFlushLogBuffersQ); + txnLogReplicatorTask = asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogReplicator); } replicationJobsProcessor.start(); @@ -936,7 +1014,11 @@ ReplicationProtocol.sendGoodbye(socketChannel); } - //4. update the LSN_MAP for indexes that were not flushed to the current append LSN to indicate no operations happend. + /** + * 4. update the LSN_MAP for indexes that were not flushed + * to the current append LSN to indicate no operations happened + * since the checkpoint start. 
+ */ if (laggingIndexesResponse != null) { for (Long resouceId : laggingIndexesResponse.getLaggingRescouresIds()) { String indexPath = laggingIndexes.get(resouceId); @@ -955,7 +1037,7 @@ long maxRemoteLSN = 0; ReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer); - Map<String, SocketChannel> replicaSockets = new HashMap<String, SocketChannel>(); + Map<String, SocketChannel> replicaSockets = new HashMap<>(); try { for (String replicaId : remoteReplicas) { replicaSockets.put(replicaId, getReplicaSocket(replicaId)); @@ -1037,7 +1119,42 @@ } public int getLogPageSize() { - return logManager.getLogPageSize(); + return replicationProperties.getLogBufferPageSize(); + } + + @Override + public void replicateTxnLogBatch(final ByteBuffer buffer) { + //if replication is suspended, wait until it is resumed + try { + while (replicationSuspended.get()) { + synchronized (replicationSuspended) { + replicationSuspended.wait(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + //prepare the batch size buffer + txnLogsBatchSizeBuffer.clear(); + txnLogsBatchSizeBuffer.putInt(buffer.remaining()); + txnLogsBatchSizeBuffer.flip(); + + buffer.mark(); + for (SocketChannel replicaSocket : logsRepSockets) { + try { + //send batch size + NetworkingUtil.transferBufferToChannel(replicaSocket, txnLogsBatchSizeBuffer); + //send log + NetworkingUtil.transferBufferToChannel(replicaSocket, buffer); + } catch (IOException e) { + handleReplicationFailure(replicaSocket, e); + } finally { + txnLogsBatchSizeBuffer.position(0); + buffer.reset(); + } + } + //move the buffer position to the sent limit + buffer.position(buffer.limit()); } //supporting classes @@ -1068,12 +1185,12 @@ break; } } catch (InterruptedException e) { - //ignore + Thread.currentThread().interrupt(); } } } - public void handleReplicaFailure(String replicaId) { + public void handleReplicaFailure(String replicaId) throws InterruptedException { Replica replica = replicas.get(replicaId); if 
(replica.getState() == ReplicaState.DEAD) { @@ -1127,12 +1244,16 @@ processJob(job, replicaSockets, reusableBuffer); //if no more jobs to process, close sockets - if (replicationJobsQ.size() == 0) { + if (replicationJobsQ.isEmpty()) { LOGGER.log(Level.INFO, "No pending replication jobs. Closing connections to replicas"); closeSockets(); } - } catch (Exception e) { - e.printStackTrace(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (IOException e) { + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.log(Level.WARNING, "Couldn't complete processing replication job", e); + } } } @@ -1169,25 +1290,25 @@ Thread.currentThread().setName("TxnLogs Replication Listener Thread"); LOGGER.log(Level.INFO, "Started listening on socket: " + replicaSocket.socket().getRemoteSocketAddress()); - try { - BufferedReader incomingResponse = new BufferedReader( - new InputStreamReader(replicaSocket.socket().getInputStream())); - String responseLine = ""; + try (BufferedReader incomingResponse = new BufferedReader( + new InputStreamReader(replicaSocket.socket().getInputStream()))) { while (true) { - responseLine = incomingResponse.readLine(); + String responseLine = incomingResponse.readLine(); if (responseLine == null) { break; } //read ACK for job commit log - String replicaId = ReplicationProtocol.getNodeIdFromLogAckMessage(responseLine); + String ackFrom = ReplicationProtocol.getNodeIdFromLogAckMessage(responseLine); int jobId = ReplicationProtocol.getJobIdFromLogAckMessage(responseLine); - addAckToJob(jobId, replicaId); + addAckToJob(jobId, ackFrom); } - } catch (AsynchronousCloseException e1) { - LOGGER.log(Level.INFO, "Replication listener stopped for remote replica: " + replicaId); - } catch (IOException e2) { - reportFailedReplica(replicaId); + } catch (AsynchronousCloseException e) { + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.log(Level.INFO, "Replication listener stopped for remote replica: " + replicaId, e); + } + } catch (IOException 
e) { + handleReplicationFailure(replicaSocket, e); } } } -} +} \ No newline at end of file diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java index 47e60b2..4da5fd4 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java @@ -63,8 +63,8 @@ Set<String> nodes = replicationProperties.getNodeReplicationClients(localNodeId); - Map<String, Set<String>> recoveryCandidates = new HashMap<String, Set<String>>(); - Map<String, Integer> candidatesScore = new HashMap<String, Integer>(); + Map<String, Set<String>> recoveryCandidates = new HashMap<>(); + Map<String, Integer> candidatesScore = new HashMap<>(); //2. identify which nodes has backup per lost node data for (String node : nodes) { @@ -80,7 +80,7 @@ } //no active replicas to recover from - if (locations.size() == 0) { + if (locations.isEmpty()) { throw new IllegalStateException("Could not find any ACTIVE replica to recover " + node + " data."); } @@ -94,7 +94,7 @@ recoveryCandidates.put(node, locations); } - Map<String, Set<String>> recoveryList = new HashMap<String, Set<String>>(); + Map<String, Set<String>> recoveryList = new HashMap<>(); //3. 
find best candidate to recover from per lost replica data for (Entry<String, Set<String>> entry : recoveryCandidates.entrySet()) { @@ -113,7 +113,7 @@ if (recoveryList.containsKey(winner)) { recoveryList.get(winner).add(entry.getKey()); } else { - Set<String> nodesToRecover = new HashSet<String>(); + Set<String> nodesToRecover = new HashSet<>(); nodesToRecover.add(entry.getKey()); recoveryList.put(winner, nodesToRecover); } @@ -196,15 +196,16 @@ } break; } catch (IOException e) { - e.printStackTrace(); - LOGGER.log(Level.WARNING, "Failed during remote recovery. Attempting again..."); + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.log(Level.WARNING, "Failed during remote recovery. Attempting again...", e); + } maxRecoveryAttempts--; } } } @Override - public void completeFailbackProcess() throws IOException { + public void completeFailbackProcess() throws IOException, InterruptedException { ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager(); ReplicaResourcesManager replicaResourcesManager = (ReplicaResourcesManager) runtimeContext .getReplicaResourcesManager(); @@ -237,7 +238,9 @@ * in case of failure during failback completion process we need to construct a new plan * and get all the files from the start since the remote replicas will change in the new plan. */ - e.printStackTrace(); + if (LOGGER.isLoggable(Level.WARNING)) { + LOGGER.log(Level.WARNING, "Failed during completing failback. 
Restarting failback process...", e); + } startFailbackProcess(); } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java index 872adcd..1245674 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java @@ -76,9 +76,9 @@ appendOffset = 0; flushOffset = 0; isLastPage = false; - syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE); - flushQ = new LinkedBlockingQueue<ILogRecord>(); - remoteJobsQ = new LinkedBlockingQueue<ILogRecord>(); + syncCommitQ = new LinkedBlockingQueue<>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE); + flushQ = new LinkedBlockingQueue<>(); + remoteJobsQ = new LinkedBlockingQueue<>(); reusableDsId = new DatasetId(-1); reusableJobId = new JobId(-1); } @@ -113,7 +113,7 @@ @Override public void appendWithReplication(ILogRecord logRecord, long appendLSN) { - logRecord.writeLogRecord(appendBuffer, appendLSN); + logRecord.writeLogRecord(appendBuffer); if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH && logRecord.getLogType() != LogType.WAIT) { @@ -135,10 +135,9 @@ logRecord.isFlushed(false); flushQ.offer(logRecord); } - } else if (logRecord.getLogSource() == LogSource.REMOTE) { - if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) { - remoteJobsQ.offer(logRecord); - } + } else if (logRecord.getLogSource() == LogSource.REMOTE + && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) { + remoteJobsQ.offer(logRecord); } this.notify(); } @@ -347,11 +346,7 @@ IReplicationThread replicationThread = 
logRecord.getReplicationThread(); if (replicationThread != null) { - try { - replicationThread.notifyLogReplicationRequester(logRecord); - } catch (Exception e) { - e.printStackTrace(); - } + replicationThread.notifyLogReplicationRequester(logRecord); } } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java index cacd036..0c4cb88 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java @@ -37,10 +37,6 @@ @Override public void log(ILogRecord logRecord) throws ACIDException { - if (logRecord.getLogSize() > logPageSize) { - throw new IllegalStateException(); - } - //only locally generated logs should be replicated logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT); @@ -58,7 +54,11 @@ syncAppendToLogTail(logRecord); if (logRecord.isReplicated()) { - replicationManager.replicateLog(logRecord); + try { + replicationManager.replicateLog(logRecord); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } if (logRecord.getLogSource() == LogSource.LOCAL) { @@ -69,7 +69,7 @@ try { logRecord.wait(); } catch (InterruptedException e) { - //ignore + Thread.currentThread().interrupt(); } } @@ -79,7 +79,7 @@ try { logRecord.wait(); } catch (InterruptedException e) { - //ignore + Thread.currentThread().interrupt(); } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/883 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582 Gerrit-PatchSet: 5 
Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
