This is an automated email from the ASF dual-hosted git repository.
bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new d3551e2 HDDS-3475. Use transactionInfo table to persist transaction information. (#856)
d3551e2 is described below
commit d3551e28bd76715e4009992f625da9b0d7d5b499
Author: Bharat Viswanadham <[email protected]>
AuthorDate: Thu May 28 18:38:20 2020 -0700
HDDS-3475. Use transactionInfo table to persist transaction information. (#856)
---
.../ozone/om/ratis/OzoneManagerDoubleBuffer.java | 79 ++++++++++++++++++++--
.../ozone/om/ratis/OzoneManagerStateMachine.java | 11 +++
...OzoneManagerProtocolServerSideTranslatorPB.java | 32 ++++++---
...tOzoneManagerDoubleBufferWithDummyResponse.java | 14 ++++
...TestOzoneManagerDoubleBufferWithOMResponse.java | 13 ++++
5 files changed, 132 insertions(+), 17 deletions(-)
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index 8ea145e..fcc4ab4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -26,9 +26,11 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.function.SupplierWithIOException;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
@@ -44,6 +46,8 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.ratis.util.ExitUtils;
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+
/**
* This class implements DoubleBuffer implementation of OMClientResponse's. In
* DoubleBuffer it has 2 buffers one is currentBuffer and other is
@@ -92,6 +96,13 @@ public final class OzoneManagerDoubleBuffer {
private final boolean isTracingEnabled;
/**
+ * function which will get term associated with the transaction index.
+ */
+ private Function<Long, Long> indexToTerm;
+
+
+
+ /**
* Builder for creating OzoneManagerDoubleBuffer.
*/
public static class Builder {
@@ -99,6 +110,7 @@ public final class OzoneManagerDoubleBuffer {
private OzoneManagerRatisSnapshot rs;
private boolean isRatisEnabled = false;
private boolean isTracingEnabled = false;
+ private Function<Long, Long> indexToTerm = null;
public Builder setOmMetadataManager(OMMetadataManager omm) {
this.mm = omm;
@@ -121,15 +133,27 @@ public final class OzoneManagerDoubleBuffer {
return this;
}
+ public Builder setIndexToTerm(Function<Long, Long> termGet) {
+ this.indexToTerm = termGet;
+ return this;
+ }
+
public OzoneManagerDoubleBuffer build() {
+ if (isRatisEnabled) {
+ Preconditions.checkNotNull(rs, "When ratis is enabled, " +
+ "OzoneManagerRatisSnapshot should not be null");
+ Preconditions.checkNotNull(indexToTerm, "When ratis is enabled " +
+ "indexToTerm should not be null");
+ }
return new OzoneManagerDoubleBuffer(mm, rs, isRatisEnabled,
- isTracingEnabled);
+ isTracingEnabled, indexToTerm);
}
}
private OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot,
- boolean isRatisEnabled, boolean isTracingEnabled) {
+ boolean isRatisEnabled, boolean isTracingEnabled,
+ Function<Long, Long> indexToTerm) {
this.currentBuffer = new ConcurrentLinkedQueue<>();
this.readyBuffer = new ConcurrentLinkedQueue<>();
@@ -147,6 +171,7 @@ public final class OzoneManagerDoubleBuffer {
this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapShot;
this.ozoneManagerDoubleBufferMetrics =
OzoneManagerDoubleBufferMetrics.create();
+ this.indexToTerm = indexToTerm;
isRunning.set(true);
// Daemon thread which runs in back ground and flushes transactions to DB.
@@ -185,6 +210,19 @@ public final class OzoneManagerDoubleBuffer {
}
/**
+ * Add to writeBatch {@link OMTransactionInfo}.
+ */
+ private Void addToBatchTransactionInfoWithTrace(String parentName,
+ long transactionIndex, SupplierWithIOException<Void> supplier)
+ throws IOException {
+ if (!isTracingEnabled) {
+ return supplier.get();
+ }
+ String spanName = "DB-addWriteBatch-transactioninfo-" + transactionIndex;
+ return TracingUtil.executeAsChildSpan(spanName, parentName, supplier);
+ }
+
+ /**
* Runs in a background thread and batches the transaction in currentBuffer
* and commit to DB.
*/
@@ -193,6 +231,7 @@ public final class OzoneManagerDoubleBuffer {
try {
if (canFlush()) {
setReadyBuffer();
+ List<Long> flushedEpochs = null;
try(BatchOperation batchOperation = omMetadataManager.getStore()
.initBatchOperation()) {
@@ -214,6 +253,27 @@ public final class OzoneManagerDoubleBuffer {
}
});
+ // Only when ratis is enabled commit transaction info to DB.
+ if (isRatisEnabled) {
+ flushedEpochs =
+ readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
+ .sorted().collect(Collectors.toList());
+ long lastRatisTransactionIndex =
+ flushedEpochs.get(flushedEpochs.size() - 1);
+ long term = indexToTerm.apply(lastRatisTransactionIndex);
+
+ addToBatchTransactionInfoWithTrace(lastTraceId.get(),
+ lastRatisTransactionIndex,
+ (SupplierWithIOException<Void>) () -> {
+ omMetadataManager.getTransactionInfoTable().putWithBatch(
+ batchOperation, TRANSACTION_INFO_KEY,
+ new OMTransactionInfo.Builder()
+ .setTransactionIndex(lastRatisTransactionIndex)
+ .setCurrentTerm(term).build());
+ return null;
+ });
+ }
+
long startTime = Time.monotonicNowNanos();
flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(),
(SupplierWithIOException<Void>) () -> {
@@ -246,13 +306,20 @@ public final class OzoneManagerDoubleBuffer {
flushedTransactionsSize);
}
- List<Long> flushedEpochs =
- readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
- .sorted().collect(Collectors.toList());
+ // When non-HA do the sort step here, as the sorted list is not
+ // required for flush to DB. As in non-HA we want to complete
+ // futures as quick as possible after flush to DB, to release rpc
+ // handler threads.
+ if (!isRatisEnabled) {
+ flushedEpochs =
+ readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
+ .sorted().collect(Collectors.toList());
+ }
+
cleanupCache(flushedEpochs);
-
+ // Clean up committed transactions.
readyBuffer.clear();
// update the last updated index in OzoneManagerStateMachine.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 3cf22f6..33ac961 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -112,6 +112,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
.setOzoneManagerRatisSnapShot(this::updateLastAppliedIndex)
.enableRatis(true)
.enableTracing(isTracingEnabled)
+ .setIndexToTerm(this::getTermForIndex)
.build();
this.handler = new OzoneManagerRequestHandler(ozoneManager,
@@ -561,4 +562,14 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
void addApplyTransactionTermIndex(long term, long index) {
applyTransactionMap.put(index, term);
}
+
+ /**
+ * Return term associated with transaction index.
+ * @param transactionIndex
+ * @return
+ */
+ public long getTermForIndex(long transactionIndex) {
+ return applyTransactionMap.get(transactionIndex);
+ }
+
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 6a6cdc4..d64db61 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -74,17 +74,27 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
boolean enableRatis) {
this.ozoneManager = impl;
this.isRatisEnabled = enableRatis;
- this.ozoneManagerDoubleBuffer = new OzoneManagerDoubleBuffer.Builder()
- .setOmMetadataManager(ozoneManager.getMetadataManager())
- // Do nothing.
- // For OM NON-HA code, there is no need to save transaction index.
- // As we wait until the double buffer flushes DB to disk.
- .setOzoneManagerRatisSnapShot((i) -> {})
- .enableRatis(isRatisEnabled)
- .enableTracing(TracingUtil.isTracingEnabled(
- ozoneManager.getConfiguration()))
- .build();
- handler = new OzoneManagerRequestHandler(impl, ozoneManagerDoubleBuffer);
+
+ if (isRatisEnabled) {
+ // In case of ratis is enabled, handler in ServerSideTransaltorPB is used
+ // only for read requests and read requests does not require
+ // double-buffer to be initialized.
+ this.ozoneManagerDoubleBuffer = null;
+ handler = new OzoneManagerRequestHandler(impl, null);
+ } else {
+ this.ozoneManagerDoubleBuffer = new OzoneManagerDoubleBuffer.Builder()
+ .setOmMetadataManager(ozoneManager.getMetadataManager())
+ // Do nothing.
+ // For OM NON-HA code, there is no need to save transaction index.
+ // As we wait until the double buffer flushes DB to disk.
+ .setOzoneManagerRatisSnapShot((i) -> {
+ })
+ .enableRatis(isRatisEnabled)
+ .enableTracing(TracingUtil.isTracingEnabled(
+ ozoneManager.getConfiguration()))
+ .build();
+ handler = new OzoneManagerRequestHandler(impl, ozoneManagerDoubleBuffer);
+ }
this.omRatisServer = ratisServer;
dispatcher = new OzoneProtocolMessageDispatcher<>("OzoneProtocol",
metrics, LOG);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
index 12de31a..81e69a4 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
@@ -23,6 +23,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -43,8 +44,10 @@ import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
import static org.apache.hadoop.test.GenericTestUtils.waitFor;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
@@ -57,6 +60,7 @@ public class TestOzoneManagerDoubleBufferWithDummyResponse {
private OzoneManagerDoubleBuffer doubleBuffer;
private final AtomicLong trxId = new AtomicLong(0);
private long lastAppliedIndex;
+ private long term = 1L;
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@@ -75,6 +79,7 @@ public class TestOzoneManagerDoubleBufferWithDummyResponse {
.setOmMetadataManager(omMetadataManager)
.setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
.enableRatis(true)
+ .setIndexToTerm((val) -> term)
.build();
}
@@ -124,6 +129,15 @@ public class TestOzoneManagerDoubleBufferWithDummyResponse {
// Check lastAppliedIndex is updated correctly or not.
assertEquals(bucketCount, lastAppliedIndex);
+
+
+ OMTransactionInfo omTransactionInfo =
+ omMetadataManager.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+ assertNotNull(omTransactionInfo);
+
+ Assert.assertEquals(lastAppliedIndex,
+ omTransactionInfo.getTransactionIndex());
+ Assert.assertEquals(term, omTransactionInfo.getCurrentTerm());
}
/**
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
index 5c274f0..b369341 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
@@ -61,6 +61,8 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon;
import org.mockito.Mockito;
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@@ -81,6 +83,7 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
private final AtomicLong trxId = new AtomicLong(0);
private OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot;
private volatile long lastAppliedIndex;
+ private long term = 1L;
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@@ -107,6 +110,7 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
setOmMetadataManager(omMetadataManager).
setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
.enableRatis(true)
+ .setIndexToTerm((i) -> term)
.build();
ozoneManagerDoubleBufferHelper = doubleBuffer::add;
}
@@ -190,6 +194,15 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
// Check lastAppliedIndex is updated correctly or not.
Assert.assertEquals(bucketCount + deleteCount + 1, lastAppliedIndex);
+
+
+ OMTransactionInfo omTransactionInfo =
+ omMetadataManager.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+ assertNotNull(omTransactionInfo);
+
+ Assert.assertEquals(lastAppliedIndex,
+ omTransactionInfo.getTransactionIndex());
+ Assert.assertEquals(term, omTransactionInfo.getCurrentTerm());
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]