Murtadha Hubail has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/3469 )
Change subject: [NO ISSUE][REPL] Suspend Dataset Checkpointing on Replica Sync
......................................................................
[NO ISSUE][REPL] Suspend Dataset Checkpointing on Replica Sync
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Before synchronizing replicas, suspend dataset checkpointing to
prevent new files from being generated by the async IO operations
that checkpointing triggers.
- Instead of sync'ing the current files to replicas, then scheduling a
flush and sync'ing any newly generated files, just flush the datasets
before the initial sync and then sync all the files in one go.
Change-Id: I058fd48bc0fb89a1e16448ce516c3410bb4d681d
---
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
3 files changed, 37 insertions(+), 11 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/69/3469/1
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
index 36cea55..954e399 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -58,4 +58,14 @@
* @param id
*/
void completed(TxnId id);
+
+ /**
+ * Suspends checkpointing datasets
+ */
+ void suspend();
+
+ /**
+ * Resumes checkpointing datasets
+ */
+ void resume();
}
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 0f0b5bd..123709b 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -22,6 +22,7 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.replication.api.PartitionReplica;
import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
@@ -45,21 +46,25 @@
public void sync() throws IOException {
final Object syncLock =
appCtx.getReplicaManager().getReplicaSyncLock();
synchronized (syncLock) {
- syncFiles();
- checkpointReplicaIndexes();
- appCtx.getReplicationManager().register(replica);
+ final ICheckpointManager checkpointManager =
appCtx.getTransactionSubsystem().getCheckpointManager();
+ try {
+ // suspend checkpointing datasets to prevent async IO
operations while sync'ing replicas
+ checkpointManager.suspend();
+ syncFiles();
+ checkpointReplicaIndexes();
+ appCtx.getReplicationManager().register(replica);
+ } finally {
+ checkpointManager.resume();
+ }
}
}
private void syncFiles() throws IOException {
final ReplicaFilesSynchronizer fileSync = new
ReplicaFilesSynchronizer(appCtx, replica);
- waitForReplicatedDatasetsIO();
- fileSync.sync();
// flush replicated dataset to generate disk component for any
remaining in-memory components
final IReplicationStrategy replStrategy =
appCtx.getReplicationManager().getReplicationStrategy();
appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
waitForReplicatedDatasetsIO();
- // sync any newly generated files
fileSync.sync();
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index ce523db..b85742e 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -18,6 +18,10 @@
*/
package org.apache.asterix.transaction.management.service.recovery;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.transactions.CheckpointProperties;
import org.apache.asterix.common.transactions.ICheckpointManager;
@@ -26,10 +30,6 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
/**
* An implementation of {@link ICheckpointManager} that defines the logic
@@ -40,6 +40,7 @@
private static final Logger LOGGER = LogManager.getLogger();
private static final long NO_SECURED_LSN = -1L;
private final Map<TxnId, Long> securedLSNs;
+ private boolean suspended = false;
public CheckpointManager(ITransactionSubsystem txnSubsystem,
CheckpointProperties checkpointProperties) {
super(txnSubsystem, checkpointProperties);
@@ -76,7 +77,7 @@
}
final long minFirstLSN =
txnSubsystem.getRecoveryManager().getMinFirstLSN();
boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
- if (!checkpointSucceeded) {
+ if (!checkpointSucceeded && !suspended) {
// Flush datasets with indexes behind target checkpoint LSN
IDatasetLifecycleManager datasetLifecycleManager =
txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
@@ -100,6 +101,16 @@
securedLSNs.remove(id);
}
+ @Override
+ public synchronized void suspend() {
+ suspended = true;
+ }
+
+ @Override
+ public synchronized void resume() {
+ suspended = false;
+ }
+
private synchronized long getMinSecuredLSN() {
return securedLSNs.isEmpty() ? NO_SECURED_LSN :
Collections.min(securedLSNs.values());
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/3469
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: stabilization-f69489
Gerrit-MessageType: newchange
Gerrit-Change-Id: I058fd48bc0fb89a1e16448ce516c3410bb4d681d
Gerrit-Change-Number: 3469
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Hubail <[email protected]>