This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new d3551e2  HDDS-3475. Use transactionInfo table to persist transaction 
information. (#856)
d3551e2 is described below

commit d3551e28bd76715e4009992f625da9b0d7d5b499
Author: Bharat Viswanadham <[email protected]>
AuthorDate: Thu May 28 18:38:20 2020 -0700

    HDDS-3475. Use transactionInfo table to persist transaction information. 
(#856)
---
 .../ozone/om/ratis/OzoneManagerDoubleBuffer.java   | 79 ++++++++++++++++++++--
 .../ozone/om/ratis/OzoneManagerStateMachine.java   | 11 +++
 ...OzoneManagerProtocolServerSideTranslatorPB.java | 32 ++++++---
 ...tOzoneManagerDoubleBufferWithDummyResponse.java | 14 ++++
 ...TestOzoneManagerDoubleBufferWithOMResponse.java | 13 ++++
 5 files changed, 132 insertions(+), 17 deletions(-)

diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index 8ea145e..fcc4ab4 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -26,9 +26,11 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.function.SupplierWithIOException;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
@@ -44,6 +46,8 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.ratis.util.ExitUtils;
 
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+
 /**
  * This class implements DoubleBuffer implementation of OMClientResponse's. In
  * DoubleBuffer it has 2 buffers one is currentBuffer and other is
@@ -92,6 +96,13 @@ public final class OzoneManagerDoubleBuffer {
   private final boolean isTracingEnabled;
 
   /**
+   * Function which returns the term associated with a transaction index.
+   */
+  private Function<Long, Long> indexToTerm;
+
+
+
+  /**
    *  Builder for creating OzoneManagerDoubleBuffer.
    */
   public static class Builder {
@@ -99,6 +110,7 @@ public final class OzoneManagerDoubleBuffer {
     private OzoneManagerRatisSnapshot rs;
     private boolean isRatisEnabled = false;
     private boolean isTracingEnabled = false;
+    private Function<Long, Long> indexToTerm = null;
 
     public Builder setOmMetadataManager(OMMetadataManager omm) {
       this.mm = omm;
@@ -121,15 +133,27 @@ public final class OzoneManagerDoubleBuffer {
       return this;
     }
 
+    public Builder setIndexToTerm(Function<Long, Long> termGet) {
+      this.indexToTerm = termGet; // required by build() when ratis is on
+      return this;
+    }
+
     public OzoneManagerDoubleBuffer build() {
+      if (isRatisEnabled) {
+        Preconditions.checkNotNull(rs, "When ratis is enabled, " +
+                "OzoneManagerRatisSnapshot should not be null");
+        Preconditions.checkNotNull(indexToTerm, "When ratis is enabled " +
+            "indexToTerm should not be null");
+      }
       return new OzoneManagerDoubleBuffer(mm, rs, isRatisEnabled,
-          isTracingEnabled);
+          isTracingEnabled, indexToTerm);
     }
   }
 
   private OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
       OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot,
-      boolean isRatisEnabled, boolean isTracingEnabled) {
+      boolean isRatisEnabled, boolean isTracingEnabled,
+      Function<Long, Long> indexToTerm) {
     this.currentBuffer = new ConcurrentLinkedQueue<>();
     this.readyBuffer = new ConcurrentLinkedQueue<>();
 
@@ -147,6 +171,7 @@ public final class OzoneManagerDoubleBuffer {
     this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapShot;
     this.ozoneManagerDoubleBufferMetrics =
         OzoneManagerDoubleBufferMetrics.create();
+    this.indexToTerm = indexToTerm;
 
     isRunning.set(true);
     // Daemon thread which runs in back ground and flushes transactions to DB.
@@ -185,6 +210,19 @@ public final class OzoneManagerDoubleBuffer {
   }
 
   /**
+   * Run the {@link OMTransactionInfo} batch-add supplier, traced if enabled.
+   */
+  private Void addToBatchTransactionInfoWithTrace(String parentName,
+      long transactionIndex, SupplierWithIOException<Void> supplier)
+      throws IOException {
+    if (!isTracingEnabled) {
+      return supplier.get(); // tracing off: run the supplier directly
+    }
+    String spanName = "DB-addWriteBatch-transactioninfo-" + transactionIndex;
+    return TracingUtil.executeAsChildSpan(spanName, parentName, supplier);
+  }
+
+  /**
    * Runs in a background thread and batches the transaction in currentBuffer
    * and commit to DB.
    */
@@ -193,6 +231,7 @@ public final class OzoneManagerDoubleBuffer {
       try {
         if (canFlush()) {
           setReadyBuffer();
+          List<Long> flushedEpochs = null;
           try(BatchOperation batchOperation = omMetadataManager.getStore()
               .initBatchOperation()) {
 
@@ -214,6 +253,27 @@ public final class OzoneManagerDoubleBuffer {
               }
             });
 
+            // Only when ratis is enabled commit transaction info to DB.
+            if (isRatisEnabled) {
+              flushedEpochs =
+                  readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
+                      .sorted().collect(Collectors.toList());
+              long lastRatisTransactionIndex =
+                  flushedEpochs.get(flushedEpochs.size() - 1);
+              long term = indexToTerm.apply(lastRatisTransactionIndex);
+
+              addToBatchTransactionInfoWithTrace(lastTraceId.get(),
+                  lastRatisTransactionIndex,
+                  (SupplierWithIOException<Void>) () -> {
+                  omMetadataManager.getTransactionInfoTable().putWithBatch(
+                      batchOperation, TRANSACTION_INFO_KEY,
+                      new OMTransactionInfo.Builder()
+                      .setTransactionIndex(lastRatisTransactionIndex)
+                      .setCurrentTerm(term).build());
+                  return null;
+                });
+            }
+
             long startTime = Time.monotonicNowNanos();
             flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(),
                 (SupplierWithIOException<Void>) () -> {
@@ -246,13 +306,20 @@ public final class OzoneManagerDoubleBuffer {
                 flushedTransactionsSize);
           }
 
-          List<Long> flushedEpochs =
-              readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
-                  .sorted().collect(Collectors.toList());
+          // In non-HA, do the sort step here, as the sorted list is not
+          // required for the flush to DB. In non-HA we want to complete
+          // futures as quickly as possible after the flush to DB, to release
+          // rpc handler threads.
+          if (!isRatisEnabled) {
+            flushedEpochs =
+                readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
+                    .sorted().collect(Collectors.toList());
+          }
+
 
           cleanupCache(flushedEpochs);
 
-
+          // Clean up committed transactions.
           readyBuffer.clear();
 
           // update the last updated index in OzoneManagerStateMachine.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 3cf22f6..33ac961 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -112,6 +112,7 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
         .setOzoneManagerRatisSnapShot(this::updateLastAppliedIndex)
         .enableRatis(true)
         .enableTracing(isTracingEnabled)
+        .setIndexToTerm(this::getTermForIndex)
         .build();
 
     this.handler = new OzoneManagerRequestHandler(ozoneManager,
@@ -561,4 +562,14 @@ public class OzoneManagerStateMachine extends 
BaseStateMachine {
   void addApplyTransactionTermIndex(long term, long index) {
     applyTransactionMap.put(index, term);
   }
+
+  /**
+   * Look up the term stored in applyTransactionMap for a transaction index.
+   * @param transactionIndex index used as the map key.
+   * @return stored term. NOTE(review): missing key looks unhandled - confirm.
+   */
+  public long getTermForIndex(long transactionIndex) {
+    return applyTransactionMap.get(transactionIndex);
+  }
+
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 6a6cdc4..d64db61 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -74,17 +74,27 @@ public class OzoneManagerProtocolServerSideTranslatorPB 
implements
       boolean enableRatis) {
     this.ozoneManager = impl;
     this.isRatisEnabled = enableRatis;
-    this.ozoneManagerDoubleBuffer = new OzoneManagerDoubleBuffer.Builder()
-        .setOmMetadataManager(ozoneManager.getMetadataManager())
-        // Do nothing.
-        // For OM NON-HA code, there is no need to save transaction index.
-        // As we wait until the double buffer flushes DB to disk.
-        .setOzoneManagerRatisSnapShot((i) -> {})
-        .enableRatis(isRatisEnabled)
-        .enableTracing(TracingUtil.isTracingEnabled(
-            ozoneManager.getConfiguration()))
-        .build();
-    handler = new OzoneManagerRequestHandler(impl, ozoneManagerDoubleBuffer);
+
+    if (isRatisEnabled) {
+      // When ratis is enabled, the handler in ServerSideTranslatorPB is used
+      // only for read requests, and read requests do not require the
+      // double-buffer to be initialized.
+      this.ozoneManagerDoubleBuffer = null;
+      handler = new OzoneManagerRequestHandler(impl, null);
+    } else {
+      this.ozoneManagerDoubleBuffer = new OzoneManagerDoubleBuffer.Builder()
+          .setOmMetadataManager(ozoneManager.getMetadataManager())
+          // Do nothing.
+          // For OM NON-HA code, there is no need to save transaction index.
+          // As we wait until the double buffer flushes DB to disk.
+          .setOzoneManagerRatisSnapShot((i) -> {
+          })
+          .enableRatis(isRatisEnabled)
+          .enableTracing(TracingUtil.isTracingEnabled(
+              ozoneManager.getConfiguration()))
+          .build();
+      handler = new OzoneManagerRequestHandler(impl, ozoneManagerDoubleBuffer);
+    }
     this.omRatisServer = ratisServer;
     dispatcher = new OzoneProtocolMessageDispatcher<>("OzoneProtocol",
         metrics, LOG);
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
index 12de31a..81e69a4 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java
@@ -23,6 +23,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -43,8 +44,10 @@ import org.apache.hadoop.util.Time;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
 import static org.apache.hadoop.test.GenericTestUtils.waitFor;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -57,6 +60,7 @@ public class TestOzoneManagerDoubleBufferWithDummyResponse {
   private OzoneManagerDoubleBuffer doubleBuffer;
   private final AtomicLong trxId = new AtomicLong(0);
   private long lastAppliedIndex;
+  private long term = 1L;
 
   @Rule
   public TemporaryFolder folder = new TemporaryFolder();
@@ -75,6 +79,7 @@ public class TestOzoneManagerDoubleBufferWithDummyResponse {
         .setOmMetadataManager(omMetadataManager)
         .setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
         .enableRatis(true)
+        .setIndexToTerm((val) -> term)
         .build();
   }
 
@@ -124,6 +129,15 @@ public class TestOzoneManagerDoubleBufferWithDummyResponse 
{
 
     // Check lastAppliedIndex is updated correctly or not.
     assertEquals(bucketCount, lastAppliedIndex);
+
+
+    OMTransactionInfo omTransactionInfo =
+        omMetadataManager.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+    assertNotNull(omTransactionInfo);
+
+    Assert.assertEquals(lastAppliedIndex,
+        omTransactionInfo.getTransactionIndex());
+    Assert.assertEquals(term, omTransactionInfo.getCurrentTerm());
   }
 
   /**
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
index 5c274f0..b369341 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java
@@ -61,6 +61,8 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Daemon;
 import org.mockito.Mockito;
 
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
@@ -81,6 +83,7 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
   private final AtomicLong trxId = new AtomicLong(0);
   private OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot;
   private volatile long lastAppliedIndex;
+  private long term = 1L;
 
   @Rule
   public TemporaryFolder folder = new TemporaryFolder();
@@ -107,6 +110,7 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
         setOmMetadataManager(omMetadataManager).
         setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
         .enableRatis(true)
+        .setIndexToTerm((i) -> term)
         .build();
     ozoneManagerDoubleBufferHelper = doubleBuffer::add;
   }
@@ -190,6 +194,15 @@ public class TestOzoneManagerDoubleBufferWithOMResponse {
 
     // Check lastAppliedIndex is updated correctly or not.
     Assert.assertEquals(bucketCount + deleteCount + 1, lastAppliedIndex);
+
+
+    OMTransactionInfo omTransactionInfo =
+        omMetadataManager.getTransactionInfoTable().get(TRANSACTION_INFO_KEY);
+    assertNotNull(omTransactionInfo);
+
+    Assert.assertEquals(lastAppliedIndex,
+        omTransactionInfo.getTransactionIndex());
+    Assert.assertEquals(term, omTransactionInfo.getCurrentTerm());
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to