>From Murtadha Al Hubail <[email protected]>:
Murtadha Al Hubail has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17632 )
Change subject: [ASTERIXDB-3219][REPL] Add timeout to log replication
......................................................................
[ASTERIXDB-3219][REPL] Add timeout to log replication
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Do not wait indefinitely for logs to be replicated.
Change-Id: I53b3a0d23514fce09082556e031f822dfe426a35
---
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
1 file changed, 35 insertions(+), 1 deletion(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/32/17632/1
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 8a1cc65..1c614c9 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.transaction.management.service.logging;
+import java.util.concurrent.TimeUnit;
+
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -26,6 +28,7 @@
import org.apache.asterix.common.transactions.LogSource;
import org.apache.asterix.common.transactions.LogType;
import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.logging.log4j.Logger;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;
@@ -33,12 +36,16 @@
public class LogManagerWithReplication extends LogManager {
+ private static final Logger LOGGER =
org.apache.logging.log4j.LogManager.getLogger();
private IReplicationManager replicationManager;
private IReplicationStrategy replicationStrategy;
private final LongSet replicatedTxn = LongSets.synchronize(new
LongOpenHashSet());
+ private final long replicationTimeoutMillis;
public LogManagerWithReplication(ITransactionSubsystem txnSubsystem) {
super(txnSubsystem);
+ replicationTimeoutMillis = TimeUnit.SECONDS
+
.toMillis(txnSubsystem.getApplicationContext().getReplicationProperties().getReplicationTimeOut());
}
@SuppressWarnings("squid:S2445")
@@ -94,8 +101,18 @@
//wait for job Commit/Abort ACK from replicas
if (logRecord.isReplicate() && (logRecord.getLogType() ==
LogType.JOB_COMMIT
|| logRecord.getLogType() == LogType.ABORT)) {
+ long replicationTimeOut = replicationTimeoutMillis;
while (!logRecord.isReplicated()) {
- logRecord.wait();
+ if (replicationTimeOut <= 0) {
+ LOGGER.warn(
+ "{} ms passed without receiving acks
for log {}; setting log as replicated due to timeout",
+ replicationTimeoutMillis,
logRecord.getLogRecordForDisplay());
+ logRecord.setReplicated(true);
+ continue;
+ }
+ final long startTime = System.nanoTime();
+ logRecord.wait(replicationTimeOut);
+ replicationTimeOut -=
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
}
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17632
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: neo
Gerrit-Change-Id: I53b3a0d23514fce09082556e031f822dfe426a35
Gerrit-Change-Number: 17632
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Al Hubail <[email protected]>
Gerrit-MessageType: newchange