http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnAckTracker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnAckTracker.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnAckTracker.java new file mode 100644 index 0000000..23e6f3c --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnAckTracker.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.replication.logging; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.asterix.common.replication.IReplicationDestination; +import org.apache.asterix.common.transactions.ILogRecord; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class TxnAckTracker { + + private static final Logger LOGGER = LogManager.getLogger(); + private final Map<Long, TxnAck> txnsAcks = new HashMap<>(); + + public synchronized void track(ILogRecord logRecord, Set<IReplicationDestination> replicas) { + if (replicas.isEmpty()) { + logRecord.setReplicated(true); + return; + } + final long txnId = logRecord.getTxnId(); + //TODO use LSN instead of txnId when all logs have LSN + txnsAcks.put(txnId, new TxnAck(logRecord, replicas)); + } + + public synchronized void ack(long txnId, IReplicationDestination replica) { + if (!txnsAcks.containsKey(txnId)) { + LOGGER.warn("Received ack for unknown txn {}", txnId); + return; + } + TxnAck txnAcks = txnsAcks.get(txnId); + txnAcks.ack(replica); + if (txnAcks.allAcked()) { + txnsAcks.remove(txnId); + } + } + + public synchronized void unregister(IReplicationDestination replica) { + // assume the ack was received from leaving replicas + final HashSet<Long> pendingTxn = new HashSet<>(txnsAcks.keySet()); + pendingTxn.forEach(txnId -> ack(txnId, replica)); + } +}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java deleted file mode 100644 index f51a64d..0000000 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.replication.logging; - -public class TxnLogUtil { - - private TxnLogUtil() { - //prevent util class construction - } - - /** - * @param nodeId - * @param LSN - * @return Concatenation of nodeId and LSN - */ - public static String getNodeUniqueLSN(String nodeId, long LSN) { - return nodeId + LSN; - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java new file mode 100644 index 0000000..b6f752f --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.replication.management; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.exceptions.ReplicationException; +import org.apache.asterix.common.replication.IPartitionReplica; +import org.apache.asterix.common.replication.IReplicationDestination; +import org.apache.asterix.common.replication.IReplicationManager; +import org.apache.asterix.common.replication.IReplicationStrategy; +import org.apache.asterix.common.storage.DatasetResourceReference; +import org.apache.asterix.common.storage.ResourceReference; +import org.apache.asterix.replication.api.PartitionReplica; +import org.apache.asterix.replication.api.ReplicationDestination; +import org.apache.asterix.replication.sync.IndexSynchronizer; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.replication.IReplicationJob; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class IndexReplicationManager { + + private static final Logger LOGGER = LogManager.getLogger(); + private final IReplicationManager replicationManager; + private final Set<ReplicationDestination> destinations = new HashSet<>(); + private final LinkedBlockingQueue<IReplicationJob> replicationJobsQ = new LinkedBlockingQueue<>(); + private final IReplicationStrategy replicationStrategy; + private final PersistentLocalResourceRepository resourceRepository; + private final INcApplicationContext appCtx; + private final Object transferLock = new Object(); + private final Set<ReplicationDestination> failedDest = new HashSet<>(); + + public 
IndexReplicationManager(INcApplicationContext appCtx, IReplicationManager replicationManager) { + this.appCtx = appCtx; + this.replicationManager = replicationManager; + this.resourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); + replicationStrategy = replicationManager.getReplicationStrategy(); + appCtx.getThreadExecutor().execute(new ReplicationJobsProcessor()); + } + + public void register(ReplicationDestination dest) { + synchronized (transferLock) { + LOGGER.info(() -> "register " + dest); + destinations.add(dest); + failedDest.remove(dest); + } + } + + public void unregister(IReplicationDestination dest) { + synchronized (transferLock) { + LOGGER.info(() -> "unregister " + dest); + destinations.remove(dest); + failedDest.remove(dest); + } + } + + private void handleFailure(ReplicationDestination dest, Exception e) { + synchronized (transferLock) { + if (failedDest.contains(dest)) { + return; + } + LOGGER.error("Replica failed", e); + if (destinations.contains(dest)) { + failedDest.add(dest); + } + replicationManager.notifyFailure(dest, e); + } + } + + public void accept(IReplicationJob job) { + if (job.getExecutionType() == IReplicationJob.ReplicationExecutionType.ASYNC) { + replicationJobsQ.add(job); + return; + } + process(job); + } + + private void process(IReplicationJob job) { + try { + if (skip(job)) { + return; + } + synchronized (transferLock) { + if (destinations.isEmpty()) { + return; + } + final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx); + final int indexPartition = getJobPartition(job); + for (ReplicationDestination dest : destinations) { + try { + Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition); + if (!partitionReplica.isPresent()) { + continue; + } + PartitionReplica replica = (PartitionReplica) partitionReplica.get(); + synchronizer.sync(replica); + } catch (Exception e) { + handleFailure(dest, e); + } + } + closeChannels(); + } + } 
finally { + afterReplication(job); + } + } + + private boolean skip(IReplicationJob job) { + try { + final DatasetResourceReference indexFileRef = + resourceRepository.getLocalResourceReference(job.getAnyFile()); + return !replicationStrategy.isMatch(indexFileRef.getDatasetId()); + } catch (HyracksDataException e) { + throw new IllegalStateException("Couldn't find resource for " + job.getAnyFile(), e); + } + } + + private int getJobPartition(IReplicationJob job) { + return ResourceReference.of(job.getAnyFile()).getPartitionNum(); + } + + private void closeChannels() { + if (!replicationJobsQ.isEmpty()) { + return; + } + LOGGER.log(Level.INFO, "No pending replication jobs. Closing connections to replicas"); + for (ReplicationDestination dest : destinations) { + dest.getReplicas().stream().map(PartitionReplica.class::cast).forEach(PartitionReplica::close); + } + } + + private static void afterReplication(IReplicationJob job) { + try { + if (job.getOperation() == IReplicationJob.ReplicationOperation.REPLICATE + && job instanceof ILSMIndexReplicationJob) { + ((ILSMIndexReplicationJob) job).endReplication(); + } + } catch (HyracksDataException e) { + throw new ReplicationException(e); + } + } + + private class ReplicationJobsProcessor implements Runnable { + + @Override + public void run() { + Thread.currentThread().setName(ReplicationJobsProcessor.class.getSimpleName()); + while (!Thread.currentThread().isInterrupted()) { + try { + final IReplicationJob job = replicationJobsQ.take(); + process(job); + } catch (InterruptedException e) { + LOGGER.warn(() -> ReplicationJobsProcessor.class.getSimpleName() + " interrupted.", e); + Thread.currentThread().interrupt(); + } + } + LOGGER.warn("{} stopped.", ReplicationJobsProcessor.class.getSimpleName()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java new file mode 100644 index 0000000..b28c2f7 --- /dev/null +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
 */
package org.apache.asterix.replication.management;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.exceptions.ReplicationException;
import org.apache.asterix.common.replication.IReplicationDestination;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.replication.api.ReplicationDestination;
import org.apache.asterix.replication.logging.ReplicationLogBuffer;
import org.apache.asterix.replication.logging.TxnAckTracker;
import org.apache.asterix.replication.logging.TxnLogReplicator;
import org.apache.asterix.replication.messaging.ReplicateLogsTask;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Ships txn log batches to the log replication channel of every registered
 * {@link ReplicationDestination} and tracks JOB_COMMIT/ABORT acknowledgements
 * via {@link TxnAckTracker}.
 *
 * Locking: {@code transferLock} serializes batch transfers and socket-set changes;
 * the {@code destinations} map is additionally guarded by its own monitor; buffer
 * swapping is guarded by this instance's monitor. register/unregister take
 * {@code transferLock} before {@code destinations}.
 */
public class LogReplicationManager {

    private static final Logger LOGGER = LogManager.getLogger();
    // pool of reusable empty log buffers; appendToLogBuffer takes from here
    private final LinkedBlockingQueue<ReplicationLogBuffer> emptyLogBuffersQ;
    // buffers queued for flushing/sending by the TxnLogReplicator thread
    private final LinkedBlockingQueue<ReplicationLogBuffer> pendingFlushLogBuffersQ;
    // scratch buffer holding each batch's byte size; reused per batch (guarded by transferLock)
    private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES);
    // active destinations and their log replication sockets (guarded by its own monitor)
    private final Map<ReplicationDestination, SocketChannel> destinations = new HashMap<>();
    private final IReplicationManager replicationManager;
    private final Executor executor;
    // tracks pending commit/abort acks per txn
    private final TxnAckTracker ackTracker = new TxnAckTracker();
    // sockets already reported as failed, to avoid duplicate failure notifications
    private final Set<SocketChannel> failedSockets = new HashSet<>();
    private final Object transferLock = new Object();
    private final INcApplicationContext appCtx;
    private final int logPageSize;
    private final int logBatchSize;
    // buffer currently accepting appended log records (guarded by this)
    private ReplicationLogBuffer currentTxnLogBuffer;
    // snapshot array of destination sockets used by transferBatch; rebuilt on (un)register
    private SocketChannel[] destSockets;

    public LogReplicationManager(INcApplicationContext appCtx, IReplicationManager replicationManager) {
        this.appCtx = appCtx;
        this.replicationManager = replicationManager;
        final ReplicationProperties replicationProperties = appCtx.getReplicationProperties();
        logPageSize = replicationProperties.getLogBufferPageSize();
        logBatchSize = replicationProperties.getLogBatchSize();
        executor = appCtx.getThreadExecutor();
        emptyLogBuffersQ = new LinkedBlockingQueue<>();
        pendingFlushLogBuffersQ = new LinkedBlockingQueue<>();
        initBuffers(replicationProperties.getLogBufferNumOfPages());
        // start the flusher that drains pendingFlushLogBuffersQ and calls transferBatch
        TxnLogReplicator txnlogReplicator = new TxnLogReplicator(emptyLogBuffersQ, pendingFlushLogBuffersQ);
        ((ExecutorService) executor).submit(txnlogReplicator);
    }

    // pre-allocates the buffer pool and installs the first current buffer
    private void initBuffers(int buffers) {
        for (int i = 0; i < buffers; i++) {
            emptyLogBuffersQ.add(new ReplicationLogBuffer(this, logPageSize, logBatchSize));
        }
        try {
            getAndInitNewPage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException(e);
        }
    }

    /**
     * Opens log replication to {@code dest}: performs the handshake on its log
     * channel, starts an ack listener, and adds its socket to the transfer set.
     * No-op if the destination is already registered.
     */
    public void register(ReplicationDestination dest) {
        synchronized (transferLock) {
            synchronized (destinations) {
                if (destinations.containsKey(dest)) {
                    return;
                }
                LOGGER.info(() -> "register " + dest);
                SocketChannel socketChannel = dest.getLogReplicationChannel();
                handshake(dest, socketChannel);
                destinations.put(dest, socketChannel);
                failedSockets.remove(socketChannel);
                // rebuild the snapshot array used by transferBatch
                destSockets = destinations.values().toArray(new SocketChannel[destinations.size()]);
            }
        }
    }

    /**
     * Stops log replication to {@code dest}: treats it as having acked all
     * pending txns, removes its socket, and sends the end-of-replication marker.
     * No-op if the destination is not registered.
     */
    public void unregister(IReplicationDestination dest) {
        synchronized (transferLock) {
            synchronized (destinations) {
                if (!destinations.containsKey(dest)) {
                    return;
                }
                LOGGER.info(() -> "unregister " + dest);
                ackTracker.unregister(dest);
                SocketChannel destSocket = destinations.remove(dest);
                failedSockets.remove(destSocket);
                destSockets = destinations.values().toArray(new SocketChannel[destinations.size()]);
                endReplication(destSocket);
            }
        }
    }

    /**
     * Appends {@code logRecord} to the current replication buffer; commit/abort
     * records are first registered with the ack tracker against the current
     * destination set.
     *
     * @throws InterruptedException if interrupted while waiting for a free buffer
     */
    public void replicate(ILogRecord logRecord) throws InterruptedException {
        if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
            synchronized (destinations) {
                // snapshot the destinations that must ack this record
                ackTracker.track(logRecord, new HashSet<>(destinations.keySet()));
            }
        }
        appendToLogBuffer(logRecord);
    }

    /**
     * Sends a size-prefixed log batch to every destination socket. A failing
     * socket is reported but does not prevent sending to the remaining ones;
     * the batch buffer is rewound between sockets via mark/reset.
     */
    public void transferBatch(final ByteBuffer buffer) {
        // prepare the batch size buffer
        txnLogsBatchSizeBuffer.clear();
        txnLogsBatchSizeBuffer.putInt(buffer.remaining());
        txnLogsBatchSizeBuffer.flip();

        buffer.mark();
        synchronized (transferLock) {
            if (destSockets != null) {
                for (SocketChannel replicaSocket : destSockets) {
                    try {
                        // send batch size then the batch itself
                        NetworkingUtil.transferBufferToChannel(replicaSocket, txnLogsBatchSizeBuffer);
                        NetworkingUtil.transferBufferToChannel(replicaSocket, buffer);
                    } catch (IOException e) {
                        handleFailure(replicaSocket, e);
                    } finally {
                        // rewind both buffers for the next socket
                        txnLogsBatchSizeBuffer.position(0);
                        buffer.reset();
                    }
                }
            }
        }
        // move the buffer position to the sent limit
        buffer.position(buffer.limit());
    }

    public int getLogPageSize() {
        return logPageSize;
    }

    // appends the record to the current buffer, swapping in a new (possibly
    // oversized) buffer first when the record does not fit
    private synchronized void appendToLogBuffer(ILogRecord logRecord) throws InterruptedException {
        if (!currentTxnLogBuffer.hasSpace(logRecord)) {
            // hand the full buffer to the flusher
            currentTxnLogBuffer.isFull(true);
            if (logRecord.getLogSize() > logPageSize) {
                getAndInitNewLargePage(logRecord.getLogSize());
            } else {
                getAndInitNewPage();
            }
        }
        currentTxnLogBuffer.append(logRecord);
    }

    // takes a pooled buffer as the new current buffer and queues it for flushing;
    // take() blocks until a buffer is available, so the null re-check is defensive
    private void getAndInitNewPage() throws InterruptedException {
        currentTxnLogBuffer = null;
        while (currentTxnLogBuffer == null) {
            currentTxnLogBuffer = emptyLogBuffersQ.take();
        }
        currentTxnLogBuffer.reset();
        pendingFlushLogBuffersQ.add(currentTxnLogBuffer);
    }

    // allocates a one-off buffer for a record larger than the pooled page size
    private void getAndInitNewLargePage(int pageSize) {
        // for now, alloc a new buffer for each large page
        currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize, logBatchSize);
        pendingFlushLogBuffersQ.add(currentTxnLogBuffer);
    }

    // sends the ReplicateLogsTask handshake and starts the per-destination ack listener
    private void handshake(ReplicationDestination dest, SocketChannel socketChannel) {
        final String nodeId = appCtx.getServiceContext().getNodeId();
        final ReplicateLogsTask task = new ReplicateLogsTask(nodeId);
        ReplicationProtocol.sendTo(socketChannel, task, null);
        executor.execute(new TxnAckListener(dest, socketChannel));
    }

    // signals end of log replication to a replica with a single-byte dummy log
    private void endReplication(SocketChannel socketChannel) {
        if (socketChannel.isConnected()) {
            // end log replication (by sending a dummy log with a single byte)
            final ByteBuffer endLogRepBuffer = ReplicationProtocol.getEndLogReplicationBuffer();
            try {
                NetworkingUtil.transferBufferToChannel(socketChannel, endLogRepBuffer);
            } catch (IOException e) {
                LOGGER.warn("Failed to end txn log", e);
            }
        }
    }

    // reports a socket failure at most once and notifies the replication manager
    // for the owning destination, if still registered.
    // NOTE(review): reads `destinations` under `this`, not under the destinations
    // monitor used elsewhere — confirm the intended lock discipline.
    private synchronized void handleFailure(SocketChannel replicaSocket, IOException e) {
        if (failedSockets.contains(replicaSocket)) {
            return;
        }
        LOGGER.error("Replica failed", e);
        failedSockets.add(replicaSocket);
        Optional<ReplicationDestination> socketDest =
                destinations.entrySet().stream().filter(entry -> entry.getValue().equals(replicaSocket))
                        .map(Map.Entry::getKey).findFirst();
        socketDest.ifPresent(dest -> replicationManager.notifyFailure(dest, e));
    }

    /**
     * Reads line-based txn acks from one replica socket and feeds them to the
     * ack tracker until the stream ends or the socket is closed.
     */
    private class TxnAckListener implements Runnable {
        private final ReplicationDestination dest;
        private final SocketChannel replicaSocket;

        TxnAckListener(ReplicationDestination dest, SocketChannel replicaSocket) {
            this.dest = dest;
            this.replicaSocket = replicaSocket;
        }

        @Override
        public void run() {
            Thread.currentThread().setName("TxnAckListener (" + dest + ")");
            LOGGER.info("Started listening on socket: {}", dest);
            try (BufferedReader incomingResponse = new BufferedReader(
                    new InputStreamReader(replicaSocket.socket().getInputStream()))) {
                while (true) {
                    final String response = incomingResponse.readLine();
                    if (response == null) {
                        // stream ended unexpectedly; treat as replica failure
                        handleFailure(replicaSocket, new IOException("Unexpected response from replica " + dest));
                        break;
                    }
                    // read ACK
                    final int txnId = ReplicationProtocol.getTxnIdFromLogAckMessage(response);
                    ackTracker.ack(txnId, dest);
                }
            } catch (AsynchronousCloseException e) {
                // expected when the socket is closed during unregister
                LOGGER.debug(() -> "Stopped listening on socket:" + dest, e);
            } catch (IOException e) {
                handleFailure(replicaSocket, e);
            }
        }
    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
deleted file mode 100644
index e260de5..0000000
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.replication.management; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.nio.channels.UnresolvedAddressException; -import java.util.concurrent.Callable; - -import org.apache.asterix.common.replication.Replica; -import org.apache.asterix.common.replication.Replica.ReplicaState; -import org.apache.asterix.replication.functions.ReplicationProtocol; - -public class ReplicaStateChecker implements Callable<Void> { - - private final int WAIT_TIME = 2000; - private final Replica replica; - private final int replicationTimeOut; - private final ReplicationManager replicationManager; - private final boolean suspendReplication; - - public ReplicaStateChecker(Replica replica, int replicationTimeOut, ReplicationManager replicationManager, - boolean suspendReplication) { - this.replica = replica; - this.replicationTimeOut = replicationTimeOut; - this.replicationManager = replicationManager; - this.suspendReplication = suspendReplication; - } - - @Override - public Void call() throws Exception { - Thread.currentThread().setName("ReplicaConnector Thread"); - - long startTime = System.currentTimeMillis(); - InetSocketAddress replicaAddress = InetSocketAddress.createUnresolved(replica.getClusterIp(), - replica.getPort()); - - while (true) { - try (SocketChannel connection = SocketChannel.open()) { - connection.configureBlocking(true); - connection.connect(new InetSocketAddress(replicaAddress.getHostString(), replicaAddress.getPort())); 
- ByteBuffer buffer = ReplicationProtocol.getGoodbyeBuffer(); - connection.write(buffer); - replicationManager.updateReplicaState(replica.getId(), ReplicaState.ACTIVE, suspendReplication); - return null; - } catch (IOException | UnresolvedAddressException e) { - Thread.sleep(WAIT_TIME); - - //check if connection to replica timed out - if (((System.currentTimeMillis() - startTime) / 1000) >= replicationTimeOut) { - replicationManager.updateReplicaState(replica.getId(), ReplicaState.DEAD, suspendReplication); - return null; - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java index c0863e2..f19f1cd 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java @@ -18,79 +18,23 @@ */ package org.apache.asterix.replication.management; -import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; -import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import 
java.util.concurrent.LinkedBlockingQueue; -import java.util.function.Predicate; -import java.util.stream.Collectors; -import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.ReplicationProperties; -import org.apache.asterix.common.context.IndexInfo; -import org.apache.asterix.common.dataflow.DatasetLocalResource; -import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; -import org.apache.asterix.common.replication.IReplicaResourcesManager; import org.apache.asterix.common.replication.IReplicationChannel; -import org.apache.asterix.common.replication.IReplicationManager; -import org.apache.asterix.common.replication.IReplicationStrategy; -import org.apache.asterix.common.replication.IReplicationThread; -import org.apache.asterix.common.replication.Replica; -import org.apache.asterix.common.replication.ReplicaEvent; -import org.apache.asterix.common.storage.DatasetResourceReference; -import org.apache.asterix.common.storage.IIndexCheckpointManager; -import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; -import org.apache.asterix.common.storage.ResourceReference; -import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; -import org.apache.asterix.common.transactions.ILogManager; -import org.apache.asterix.common.transactions.LogRecord; -import org.apache.asterix.common.transactions.LogSource; -import org.apache.asterix.common.transactions.LogType; -import org.apache.asterix.common.utils.StoragePathUtil; -import org.apache.asterix.common.utils.TransactionUtil; -import org.apache.asterix.replication.functions.ReplicaFilesRequest; -import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest; -import org.apache.asterix.replication.functions.ReplicationProtocol; -import 
org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType; -import org.apache.asterix.replication.logging.RemoteLogMapping; -import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask; -import org.apache.asterix.replication.messaging.DeleteFileTask; -import org.apache.asterix.replication.messaging.PartitionResourcesListTask; -import org.apache.asterix.replication.messaging.ReplicateFileTask; -import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask; -import org.apache.asterix.replication.storage.LSMComponentProperties; -import org.apache.asterix.replication.storage.LSMIndexFileProperties; -import org.apache.asterix.replication.storage.ReplicaResourcesManager; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; -import org.apache.asterix.transaction.management.service.logging.LogBuffer; -import org.apache.hyracks.api.application.INCServiceContext; -import org.apache.hyracks.control.common.controllers.NCConfig; -import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.asterix.replication.api.IReplicaTask; +import org.apache.asterix.replication.api.IReplicationWorker; +import org.apache.asterix.replication.logging.RemoteLogsProcessor; +import org.apache.asterix.replication.messaging.ReplicationProtocol; +import org.apache.asterix.replication.messaging.ReplicationProtocol.ReplicationRequestType; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; -import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; -import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; -import org.apache.hyracks.storage.common.LocalResource; -import org.apache.hyracks.util.StorageUtil; -import org.apache.hyracks.util.StorageUtil.StorageUnit; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import 
org.apache.logging.log4j.Logger; @@ -101,200 +45,74 @@ import org.apache.logging.log4j.Logger; public class ReplicationChannel extends Thread implements IReplicationChannel { private static final Logger LOGGER = LogManager.getLogger(); - private static final int LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE = 1; - private final ExecutorService replicationThreads; - private final String localNodeID; - private final ILogManager logManager; - private final ReplicaResourcesManager replicaResourcesManager; private ServerSocketChannel serverSocketChannel = null; - private final IReplicationManager replicationManager; - private final ReplicationProperties replicationProperties; - private final IAppRuntimeContextProvider appContextProvider; - private static final int INTIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUnit.KILOBYTE); - private final LinkedBlockingQueue<LSMComponentLSNSyncTask> lsmComponentRemoteLSN2LocalLSNMappingTaskQ; - private final LinkedBlockingQueue<LogRecord> pendingNotificationRemoteLogsQ; - private final Map<String, LSMComponentProperties> lsmComponentId2PropertiesMap; - private final Map<Long, RemoteLogMapping> localLsn2RemoteMapping; - private final Map<String, RemoteLogMapping> replicaUniqueLSN2RemoteMapping; - private final LSMComponentsSyncService lsmComponentLSNMappingService; - private final ReplicationNotifier replicationNotifier; - private final Object flushLogslock = new Object(); - private final IDatasetLifecycleManager dsLifecycleManager; - private final PersistentLocalResourceRepository localResourceRep; - private final IReplicationStrategy replicationStrategy; - private final NCConfig ncConfig; - private Set nodeHostedPartitions; - private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; private final INcApplicationContext appCtx; + private final RemoteLogsProcessor logsProcessor; - public ReplicationChannel(String nodeId, ReplicationProperties replicationProperties, ILogManager logManager, - 
IReplicaResourcesManager replicaResoucesManager, IReplicationManager replicationManager, - INCServiceContext ncServiceContext, IAppRuntimeContextProvider asterixAppRuntimeContextProvider, - IReplicationStrategy replicationStrategy) { - this.logManager = logManager; - this.localNodeID = nodeId; - this.replicaResourcesManager = (ReplicaResourcesManager) replicaResoucesManager; - this.replicationManager = replicationManager; - this.replicationProperties = replicationProperties; - this.appContextProvider = asterixAppRuntimeContextProvider; - this.dsLifecycleManager = asterixAppRuntimeContextProvider.getDatasetLifecycleManager(); - this.localResourceRep = (PersistentLocalResourceRepository) asterixAppRuntimeContextProvider - .getLocalResourceRepository(); - this.replicationStrategy = replicationStrategy; - this.ncConfig = ((NodeControllerService) ncServiceContext.getControllerService()).getConfiguration(); - lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new LinkedBlockingQueue<>(); - pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>(); - lsmComponentId2PropertiesMap = new ConcurrentHashMap<>(); - replicaUniqueLSN2RemoteMapping = new ConcurrentHashMap<>(); - localLsn2RemoteMapping = new ConcurrentHashMap<>(); - lsmComponentLSNMappingService = new LSMComponentsSyncService(); - replicationNotifier = new ReplicationNotifier(); - replicationThreads = Executors.newCachedThreadPool(ncServiceContext.getThreadFactory()); - Map<String, ClusterPartition[]> nodePartitions = asterixAppRuntimeContextProvider.getAppContext() - .getMetadataProperties().getNodePartitions(); - Set<String> nodeReplicationClients = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream() - .map(Replica::getId).collect(Collectors.toSet()); - List<Integer> clientsPartitions = new ArrayList<>(); - for (String clientId : nodeReplicationClients) { - for (ClusterPartition clusterPartition : nodePartitions.get(clientId)) { - clientsPartitions.add(clusterPartition.getPartitionId()); - } - } - 
nodeHostedPartitions = new HashSet<>(clientsPartitions.size()); - nodeHostedPartitions.addAll(clientsPartitions); - this.indexCheckpointManagerProvider = - ((INcApplicationContext) ncServiceContext.getApplicationContext()).getIndexCheckpointManagerProvider(); - this.appCtx = (INcApplicationContext) ncServiceContext.getApplicationContext(); + public ReplicationChannel(INcApplicationContext appCtx) { + this.appCtx = appCtx; + logsProcessor = new RemoteLogsProcessor(appCtx); } @Override public void run() { - Thread.currentThread().setName("Replication Channel Thread"); - - String nodeIP = replicationProperties.getNodeIpFromId(localNodeID); - int dataPort = ncConfig.getReplicationPublicPort(); + final String nodeId = appCtx.getServiceContext().getNodeId(); + Thread.currentThread().setName(nodeId + " Replication Channel Thread"); + final ReplicationProperties replicationProperties = appCtx.getReplicationProperties(); + final String nodeIP = replicationProperties.getReplicationAddress(); + final int dataPort = replicationProperties.getReplicationPort(); try { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(true); - InetSocketAddress replicationChannelAddress = new InetSocketAddress(InetAddress.getByName(nodeIP), - dataPort); + InetSocketAddress replicationChannelAddress = + new InetSocketAddress(InetAddress.getByName(nodeIP), dataPort); serverSocketChannel.socket().bind(replicationChannelAddress); - lsmComponentLSNMappingService.start(); - replicationNotifier.start(); LOGGER.log(Level.INFO, "opened Replication Channel @ IP Address: " + nodeIP + ":" + dataPort); - - //start accepting replication requests while (serverSocketChannel.isOpen()) { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(true); //start a new thread to handle the request - replicationThreads.execute(new ReplicationThread(socketChannel)); + appCtx.getThreadExecutor().execute(new ReplicationWorker(socketChannel)); } } catch 
(AsynchronousCloseException e) { - LOGGER.warn("Replication channel closed", e); + LOGGER.debug("Replication channel closed", e); } catch (IOException e) { - throw new IllegalStateException( - "Could not open replication channel @ IP Address: " + nodeIP + ":" + dataPort, e); + throw new IllegalStateException("Failed to bind replication channel @ " + nodeIP + ":" + dataPort, e); } } - private void updateLSMComponentRemainingFiles(String lsmComponentId) throws IOException { - LSMComponentProperties lsmCompProp = lsmComponentId2PropertiesMap.get(lsmComponentId); - int remainingFile = lsmCompProp.markFileComplete(); - - //clean up when all the LSM component files have been received. - if (remainingFile == 0) { - if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null - && replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) { - int remainingIndexes = replicaUniqueLSN2RemoteMapping - .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet(); - if (remainingIndexes == 0) { - /** - * Note: there is a chance that this will never be removed because some - * index in the dataset was not flushed because it is empty. This could - * be solved by passing only the number of successfully flushed indexes. - */ - replicaUniqueLSN2RemoteMapping.remove(lsmCompProp.getNodeUniqueLSN()); - } - } - - //delete mask to indicate that this component is now valid. 
- replicaResourcesManager.markLSMComponentReplicaAsValid(lsmCompProp); - lsmComponentId2PropertiesMap.remove(lsmComponentId); - LOGGER.log(Level.INFO, "Completed LSMComponent " + lsmComponentId + " Replication."); - } + public RemoteLogsProcessor getRemoteLogsProcessor() { + return logsProcessor; } @Override public void close() throws IOException { - serverSocketChannel.close(); - LOGGER.log(Level.INFO, "Replication channel closed."); + if (serverSocketChannel != null) { + serverSocketChannel.close(); + LOGGER.info("Replication channel closed."); + } } - /** - * A replication thread is created per received replication request. - */ - private class ReplicationThread implements IReplicationThread { + private class ReplicationWorker implements IReplicationWorker { private final SocketChannel socketChannel; - private final LogRecord remoteLog; - private ByteBuffer inBuffer; - private ByteBuffer outBuffer; + private final ByteBuffer inBuffer; + private final ByteBuffer outBuffer; - public ReplicationThread(SocketChannel socketChannel) { + public ReplicationWorker(SocketChannel socketChannel) { this.socketChannel = socketChannel; - inBuffer = ByteBuffer.allocate(INTIAL_BUFFER_SIZE); - outBuffer = ByteBuffer.allocate(INTIAL_BUFFER_SIZE); - remoteLog = new LogRecord(); + inBuffer = ByteBuffer.allocate(ReplicationProtocol.INITIAL_BUFFER_SIZE); + outBuffer = ByteBuffer.allocate(ReplicationProtocol.INITIAL_BUFFER_SIZE); } @Override public void run() { - Thread.currentThread().setName("Replication Thread"); + final String oldName = Thread.currentThread().getName(); + Thread.currentThread().setName("Replication Workerً"); try { - ReplicationRequestType replicationFunction = ReplicationProtocol.getRequestType(socketChannel, - inBuffer); - while (replicationFunction != ReplicationRequestType.GOODBYE) { - switch (replicationFunction) { - case REPLICATE_LOG: - handleLogReplication(); - break; - case LSM_COMPONENT_PROPERTIES: - handleLSMComponentProperties(); - break; - case 
REPLICATE_FILE: - handleReplicateFile(); - break; - case DELETE_FILE: - handleDeleteFile(); - break; - case REPLICA_EVENT: - handleReplicaEvent(); - break; - case GET_REPLICA_MAX_LSN: - handleGetReplicaMaxLSN(); - break; - case GET_REPLICA_FILES: - handleGetReplicaFiles(); - break; - case FLUSH_INDEX: - handleFlushIndex(); - break; - case PARTITION_RESOURCES_REQUEST: - handleGetPartitionResources(); - break; - case REPLICATE_RESOURCE_FILE: - handleReplicateResourceFile(); - break; - case DELETE_RESOURCE_FILE: - handleDeleteResourceFile(); - break; - case CHECKPOINT_PARTITION: - handleCheckpointPartition(); - break; - default: - throw new IllegalStateException("Unknown replication request"); - } - replicationFunction = ReplicationProtocol.getRequestType(socketChannel, inBuffer); + ReplicationRequestType requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer); + while (requestType != ReplicationRequestType.GOODBYE) { + handle(requestType); + requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer); } } catch (Exception e) { LOGGER.warn("Unexpectedly error during replication.", e); @@ -303,261 +121,10 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { try { socketChannel.close(); } catch (IOException e) { - LOGGER.warn("Filed to close replication socket.", e); - } - } - } - } - - private void handleFlushIndex() throws IOException { - inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer); - //read which indexes are requested to be flushed from remote replica - ReplicaIndexFlushRequest request = ReplicationProtocol.readReplicaIndexFlushRequest(inBuffer); - Set<Long> requestedIndexesToBeFlushed = request.getLaggingRescouresIds(); - - /** - * check which indexes can be flushed (open indexes) and which cannot be - * flushed (closed or have empty memory component). 
- */ - IDatasetLifecycleManager datasetLifeCycleManager = appContextProvider.getDatasetLifecycleManager(); - List<IndexInfo> openIndexesInfo = datasetLifeCycleManager.getOpenIndexesInfo(); - Set<Integer> datasetsToForceFlush = new HashSet<>(); - for (IndexInfo iInfo : openIndexesInfo) { - if (requestedIndexesToBeFlushed.contains(iInfo.getResourceId())) { - AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex() - .getIOOperationCallback(); - //if an index has a pending flush, then the request to flush it will succeed. - if (ioCallback.hasPendingFlush()) { - //remove index to indicate that it will be flushed - requestedIndexesToBeFlushed.remove(iInfo.getResourceId()); - } else if (!((AbstractLSMIndex) iInfo.getIndex()).isCurrentMutableComponentEmpty()) { - /** - * if an index has something to be flushed, then the request to flush it - * will succeed and we need to schedule it to be flushed. - */ - datasetsToForceFlush.add(iInfo.getDatasetId()); - //remove index to indicate that it will be flushed - requestedIndexesToBeFlushed.remove(iInfo.getResourceId()); + LOGGER.warn("Failed to close replication socket.", e); } } - } - - //schedule flush for datasets requested to be flushed - for (int datasetId : datasetsToForceFlush) { - datasetLifeCycleManager.flushDataset(datasetId, true); - } - - //the remaining indexes in the requested set are those which cannot be flushed. 
- //respond back to the requester that those indexes cannot be flushed - ReplicaIndexFlushRequest laggingIndexesResponse = new ReplicaIndexFlushRequest(requestedIndexesToBeFlushed); - outBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, laggingIndexesResponse); - NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer); - } - - private void handleLSMComponentProperties() throws IOException { - inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer); - LSMComponentProperties lsmCompProp = ReplicationProtocol.readLSMPropertiesRequest(inBuffer); - //create mask to indicate that this component is not valid yet - replicaResourcesManager.createRemoteLSMComponentMask(lsmCompProp); - lsmComponentId2PropertiesMap.put(lsmCompProp.getComponentId(), lsmCompProp); - } - - private void handleReplicateFile() throws IOException { - inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer); - LSMIndexFileProperties afp = ReplicationProtocol.readFileReplicationRequest(inBuffer); - - //get index path - String indexPath = replicaResourcesManager.getIndexPath(afp); - String replicaFilePath = indexPath + File.separator + afp.getFileName(); - - //create file - File destFile = new File(replicaFilePath); - destFile.createNewFile(); - - try (RandomAccessFile fileOutputStream = new RandomAccessFile(destFile, "rw"); - FileChannel fileChannel = fileOutputStream.getChannel()) { - fileOutputStream.setLength(afp.getFileSize()); - NetworkingUtil.downloadFile(fileChannel, socketChannel); - fileChannel.force(true); - - if (afp.requiresAck()) { - ReplicationProtocol.sendAck(socketChannel); - } - if (afp.isLSMComponentFile()) { - String componentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath()); - final LSMComponentProperties lsmComponentProperties = lsmComponentId2PropertiesMap.get(componentId); - // merge operations do not generate flush logs - if (afp.requiresAck() && lsmComponentProperties.getOpType() == LSMOperationType.FLUSH) { 
- LSMComponentLSNSyncTask syncTask = - new LSMComponentLSNSyncTask(componentId, destFile.getAbsolutePath()); - lsmComponentRemoteLSN2LocalLSNMappingTaskQ.offer(syncTask); - } else { - updateLSMComponentRemainingFiles(componentId); - } - } else { - //index metadata file - final ResourceReference indexRef = ResourceReference.of(destFile.getAbsolutePath()); - indexCheckpointManagerProvider.get(indexRef).init(logManager.getAppendLSN()); - } - } - } - - private void handleGetReplicaMaxLSN() throws IOException { - long maxLNS = logManager.getAppendLSN(); - outBuffer.clear(); - outBuffer.putLong(maxLNS); - outBuffer.flip(); - NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer); - } - - private void handleGetReplicaFiles() throws IOException { - inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer); - ReplicaFilesRequest request = ReplicationProtocol.readReplicaFileRequest(inBuffer); - - LSMIndexFileProperties fileProperties = new LSMIndexFileProperties(); - - List<String> filesList; - Set<Integer> partitionIds = request.getPartitionIds(); - Set<String> requesterExistingFiles = request.getExistingFiles(); - Map<Integer, ClusterPartition> clusterPartitions = appContextProvider.getAppContext() - .getMetadataProperties().getClusterPartitions(); - - // Flush replicated datasets to generate the latest LSM components - dsLifecycleManager.flushDataset(replicationStrategy); - for (Integer partitionId : partitionIds) { - ClusterPartition partition = clusterPartitions.get(partitionId); - filesList = replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId(), false); - //start sending files - for (String filePath : filesList) { - // Send only files of datasets that are replciated. 
- DatasetResourceReference indexFileRef = localResourceRep.getLocalResourceReference(filePath); - if (!replicationStrategy.isMatch(indexFileRef.getDatasetId())) { - continue; - } - String relativeFilePath = StoragePathUtil.getIndexFileRelativePath(filePath); - //if the file already exists on the requester, skip it - if (!requesterExistingFiles.contains(relativeFilePath)) { - try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r"); - FileChannel fileChannel = fromFile.getChannel();) { - long fileSize = fileChannel.size(); - fileProperties.initialize(filePath, fileSize, partition.getNodeId(), false, false); - outBuffer = ReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties, - ReplicationRequestType.REPLICATE_FILE); - - //send file info - NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer); - - //transfer file - NetworkingUtil.sendFile(fileChannel, socketChannel); - } - } - } - } - - //send goodbye (end of files) - ReplicationProtocol.sendGoodbye(socketChannel); - } - - private void handleReplicaEvent() throws IOException { - inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer); - ReplicaEvent event = ReplicationProtocol.readReplicaEventRequest(inBuffer); - replicationManager.reportReplicaEvent(event); - } - - private void handleDeleteFile() throws IOException { - inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer); - LSMIndexFileProperties fileProp = ReplicationProtocol.readFileReplicationRequest(inBuffer); - replicaResourcesManager.deleteIndexFile(fileProp); - if (fileProp.requiresAck()) { - ReplicationProtocol.sendAck(socketChannel); - } - } - - private void handleLogReplication() throws IOException, ACIDException { - //set initial buffer size to a log buffer page size - inBuffer = ByteBuffer.allocate(logManager.getLogPageSize()); - while (true) { - //read a batch of logs - inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer); - //check if it is end of handshake (a single 
byte log) - if (inBuffer.remaining() == LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE) { - break; - } - - processLogsBatch(inBuffer); - } - } - - private void processLogsBatch(ByteBuffer buffer) throws ACIDException { - while (buffer.hasRemaining()) { - //get rid of log size - inBuffer.getInt(); - //Deserialize log - remoteLog.readRemoteLog(inBuffer); - remoteLog.setLogSource(LogSource.REMOTE); - - switch (remoteLog.getLogType()) { - case LogType.UPDATE: - case LogType.ENTITY_COMMIT: - logManager.log(remoteLog); - break; - case LogType.JOB_COMMIT: - case LogType.ABORT: - LogRecord jobTerminationLog = new LogRecord(); - TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getTxnId(), - remoteLog.getLogType() == LogType.JOB_COMMIT); - jobTerminationLog.setReplicationThread(this); - jobTerminationLog.setLogSource(LogSource.REMOTE); - logManager.log(jobTerminationLog); - break; - case LogType.FLUSH: - //store mapping information for flush logs to use them in incoming LSM components. - RemoteLogMapping flushLogMap = new RemoteLogMapping(); - LogRecord flushLog = new LogRecord(); - TransactionUtil.formFlushLogRecord(flushLog, remoteLog.getDatasetId(), null, - remoteLog.getNodeId(), remoteLog.getNumOfFlushedIndexes()); - flushLog.setReplicationThread(this); - flushLog.setLogSource(LogSource.REMOTE); - flushLogMap.setRemoteNodeID(remoteLog.getNodeId()); - flushLogMap.setRemoteLSN(remoteLog.getLSN()); - synchronized (localLsn2RemoteMapping) { - logManager.log(flushLog); - //the log LSN value is updated by logManager.log(.) 
to a local value - flushLogMap.setLocalLSN(flushLog.getLSN()); - flushLogMap.numOfFlushedIndexes.set(flushLog.getNumOfFlushedIndexes()); - replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap); - localLsn2RemoteMapping.put(flushLog.getLSN(), flushLogMap); - } - synchronized (flushLogslock) { - flushLogslock.notify(); - } - break; - default: - LOGGER.error("Unsupported LogType: " + remoteLog.getLogType()); - } - } - } - - /** - * this method is called sequentially by {@link LogBuffer#notifyReplicationTermination()} - * for JOB_COMMIT, JOB_ABORT, and FLUSH log types. - */ - @Override - public void notifyLogReplicationRequester(LogRecord logRecord) { - switch (logRecord.getLogType()) { - case LogType.JOB_COMMIT: - case LogType.ABORT: - pendingNotificationRemoteLogsQ.offer(logRecord); - break; - case LogType.FLUSH: - final RemoteLogMapping remoteLogMapping; - synchronized (localLsn2RemoteMapping) { - remoteLogMapping = localLsn2RemoteMapping.remove(logRecord.getLSN()); - } - checkpointReplicaIndexes(remoteLogMapping, logRecord.getDatasetId()); - break; - default: - throw new IllegalStateException("Unexpected log type: " + logRecord.getLogType()); + Thread.currentThread().setName(oldName); } } @@ -571,115 +138,10 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { return outBuffer; } - private void checkpointReplicaIndexes(RemoteLogMapping remoteLogMapping, int datasetId) { - try { - Predicate<LocalResource> replicaIndexesPredicate = lr -> { - DatasetLocalResource dls = (DatasetLocalResource) lr.getResource(); - return dls.getDatasetId() == datasetId && !localResourceRep.getActivePartitions() - .contains(dls.getPartition()); - }; - final Map<Long, LocalResource> resources = localResourceRep.getResources(replicaIndexesPredicate); - final List<DatasetResourceReference> replicaIndexesRef = - resources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList()); - for (DatasetResourceReference 
replicaIndexRef : replicaIndexesRef) { - final IIndexCheckpointManager indexCheckpointManager = - indexCheckpointManagerProvider.get(replicaIndexRef); - synchronized (indexCheckpointManager) { - indexCheckpointManager - .masterFlush(remoteLogMapping.getRemoteLSN(), remoteLogMapping.getLocalLSN()); - } - } - } catch (Exception e) { - LOGGER.error("Failed to checkpoint replica indexes", e); - } - } - - private void handleGetPartitionResources() throws IOException { - final PartitionResourcesListTask task = (PartitionResourcesListTask) ReplicationProtocol - .readMessage(ReplicationRequestType.PARTITION_RESOURCES_REQUEST, socketChannel, inBuffer); - task.perform(appCtx, this); - } - - private void handleReplicateResourceFile() throws HyracksDataException { - ReplicateFileTask task = (ReplicateFileTask) ReplicationProtocol - .readMessage(ReplicationRequestType.REPLICATE_RESOURCE_FILE, socketChannel, inBuffer); + private void handle(ReplicationRequestType requestType) throws HyracksDataException { + final IReplicaTask task = + (IReplicaTask) ReplicationProtocol.readMessage(requestType, socketChannel, inBuffer); task.perform(appCtx, this); } - - private void handleDeleteResourceFile() throws HyracksDataException { - DeleteFileTask task = (DeleteFileTask) ReplicationProtocol - .readMessage(ReplicationRequestType.DELETE_RESOURCE_FILE, socketChannel, inBuffer); - task.perform(appCtx, this); - } - - private void handleCheckpointPartition() throws HyracksDataException { - CheckpointPartitionIndexesTask task = (CheckpointPartitionIndexesTask) ReplicationProtocol - .readMessage(ReplicationRequestType.CHECKPOINT_PARTITION, socketChannel, inBuffer); - task.perform(appCtx, this); - } - } - - /** - * This thread is responsible for sending JOB_COMMIT/ABORT ACKs to replication clients. 
- */ - private class ReplicationNotifier extends Thread { - @Override - public void run() { - Thread.currentThread().setName("ReplicationNotifier Thread"); - while (true) { - try { - LogRecord logRecord = pendingNotificationRemoteLogsQ.take(); - //send ACK to requester - logRecord.getReplicationThread().getChannel().socket().getOutputStream() - .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getTxnId() - + System.lineSeparator()).getBytes()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (IOException e) { - LOGGER.warn("Failed to send job replication ACK", e); - } - } - } - } - - /** - * This thread is responsible for synchronizing the LSN of - * the received LSM components to a local LSN. - */ - private class LSMComponentsSyncService extends Thread { - - @Override - public void run() { - Thread.currentThread().setName("LSMComponentsSyncService Thread"); - - while (true) { - try { - LSMComponentLSNSyncTask syncTask = lsmComponentRemoteLSN2LocalLSNMappingTaskQ.take(); - LSMComponentProperties lsmCompProp = lsmComponentId2PropertiesMap.get(syncTask.getComponentId()); - syncLSMComponentFlushLSN(lsmCompProp, syncTask); - updateLSMComponentRemainingFiles(lsmCompProp.getComponentId()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - LOGGER.error("Unexpected exception during LSN synchronization", e); - } - } - } - - private void syncLSMComponentFlushLSN(LSMComponentProperties lsmCompProp, LSMComponentLSNSyncTask syncTask) - throws InterruptedException, IOException { - final String componentFilePath = syncTask.getComponentFilePath(); - final ResourceReference indexRef = ResourceReference.of(componentFilePath); - final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(indexRef); - synchronized (indexCheckpointManager) { - long masterLsn = lsmCompProp.getOriginalLSN(); - // wait until the lsn mapping is flushed to disk - 
while (!indexCheckpointManager.isFlushed(masterLsn)) { - indexCheckpointManager.wait(); - } - indexCheckpointManager - .replicated(AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName()), masterLsn); - } - } } }