HBASE-18960 A few bug fixes and minor improvements around batchMutate

* batch validation and preparation are now done before we start iterating over
  operations for writes
* durability, familyCellMaps and observedExceptions are batch-wide and are now
  stored in BatchOperation; as a result, durability is consistent across all
  operations in a batch (see the sketch below)
* for all operations handled by the preBatchMutate() CP hook, operation status
  is updated to success
* doWALAppend() is modified to handle replay and is used from
  doMiniBatchMutate()
* minor improvements
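
The batch-wide durability in the second bullet is resolved inside
BatchOperation.checkAndPrepare(), as the diff below shows. A minimal sketch of
the resolution logic (Durability and getEffectiveDurability() are the real
HBase names; "batchDurability" is an illustrative stand-in for the durability
field stored on BatchOperation):

    // Durability ordinals are ordered so a higher ordinal is a stronger
    // guarantee (USE_DEFAULT < SKIP_WAL < ASYNC_WAL < SYNC_WAL < FSYNC_WAL),
    // so taking the max ordinal picks the strongest durability requested.
    Durability batchDurability = Durability.USE_DEFAULT;
    for (Mutation mutation : mutations) {
      // getEffectiveDurability() maps USE_DEFAULT to the table/region default.
      Durability tmpDur = getEffectiveDurability(mutation.getDurability());
      if (tmpDur.ordinal() > batchDurability.ordinal()) {
        batchDurability = tmpDur;
      }
    }
    // The WAL append for a mini-batch is then synced once with batchDurability;
    // mutations marked SKIP_WAL are still excluded from the WALEdit itself.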

Signed-off-by: Michael Stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e1941aa6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e1941aa6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e1941aa6

Branch: refs/heads/HBASE-18410
Commit: e1941aa6d14afd116a555fc93a3149f3e7c20af2
Parents: 70f4c5d
Author: Umesh Agashe <uaga...@cloudera.com>
Authored: Fri Oct 6 15:40:05 2017 -0700
Committer: Michael Stack <st...@apache.org>
Committed: Tue Oct 17 13:57:00 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 420 ++++++++-----------
 .../regionserver/TestHRegionReplayEvents.java   |  21 +
 2 files changed, 207 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e1941aa6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 0bef925..1cbb689 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -661,7 +661,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   private final MetricsRegion metricsRegion;
   private final MetricsRegionWrapperImpl metricsRegionWrapper;
-  private final Durability durability;
+  private final Durability regionDurability;
   private final boolean regionStatsEnabled;
   // Stores the replication scope of the various column families of the table
   // that has non-default scope
@@ -787,9 +787,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
      */
     this.rowProcessorTimeout = conf.getLong(
         "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
-    this.durability = htd.getDurability() == Durability.USE_DEFAULT
-        ? DEFAULT_DURABILITY
-        : htd.getDurability();
+    this.regionDurability = htd.getDurability() == Durability.USE_DEFAULT ?
+        DEFAULT_DURABILITY : htd.getDurability();
     if (rsServices != null) {
       this.rsAccounting = this.rsServices.getRegionServerAccounting();
       // don't initialize coprocessors if not running within a regionserver
@@ -1945,13 +1944,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // upkeep.
   //////////////////////////////////////////////////////////////////////////////
   /**
-   * @return returns size of largest HStore.
-   */
-  public long getLargestHStoreSize() {
-    return stores.values().stream().mapToLong(HStore::getSize).max().orElse(0L);
-  }
-
-  /**
    * Do preparation for pending compaction.
    * @throws IOException
    */
@@ -3018,21 +3010,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   /**
-   * Struct-like class that tracks the progress of a batch operation,
-   * accumulating status codes and tracking the index at which processing
-   * is proceeding.
+   * Struct-like class that tracks the progress of a batch operation, accumulating status codes
+   * and tracking the index at which processing is proceeding. These batch operations may get
+   * split into mini-batches for processing.
    */
   private abstract static class BatchOperation<T> {
     T[] operations;
     int nextIndexToProcess = 0;
     OperationStatus[] retCodeDetails;
     WALEdit[] walEditsFromCoprocessors;
+    // reference family cell maps directly so coprocessors can mutate them if desired
+    Map<byte[], List<Cell>>[] familyCellMaps;
+    ObservedExceptionsInBatch observedExceptions;
+    Durability durability;  // Durability of the batch (highest durability of all operations)
 
     public BatchOperation(T[] operations) {
       this.operations = operations;
       this.retCodeDetails = new OperationStatus[operations.length];
       this.walEditsFromCoprocessors = new WALEdit[operations.length];
       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
+      familyCellMaps = new Map[operations.length];
+      observedExceptions = new ObservedExceptionsInBatch();
+      durability = Durability.USE_DEFAULT;
     }
 
     public abstract Mutation getMutation(int index);
@@ -3046,12 +3045,69 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     public boolean isDone() {
       return nextIndexToProcess == operations.length;
     }
+
+    /**
+     * Validates each mutation and prepares a batch for write.
+     * NOTE: As CP prePut()/ preDelete() hooks may modify mutations, this method should be called
+     * after prePut()/ preDelete() CP hooks are run for all mutations in a batch.
+     */
+    public void checkAndPrepare(final HRegion region) throws IOException {
+      long now = EnvironmentEdgeManager.currentTime();
+      for (int i = 0; i < operations.length; i++) {
+        // Skip anything that "ran" already
+        if (retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
+          Mutation mutation = getMutation(i);
+
+          try {
+            region.checkAndPrepareMutation(mutation, isInReplay(), now);
+
+            // store the family map reference to allow for mutations
+            familyCellMaps[i] = mutation.getFamilyCellMap();
+            // store durability for the batch (highest durability of all operations in the batch)
+            Durability tmpDur = region.getEffectiveDurability(mutation.getDurability());
+            if (tmpDur.ordinal() > durability.ordinal()) {
+              durability = tmpDur;
+            }
+          } catch (NoSuchColumnFamilyException nscf) {
+            final String msg = "No such column family in batch mutation. ";
+            if (observedExceptions.hasSeenNoSuchFamily()) {
+              LOG.warn(msg + nscf.getMessage());
+            } else {
+              LOG.warn(msg, nscf);
+              observedExceptions.sawNoSuchFamily();
+            }
+            retCodeDetails[i] = new OperationStatus(
+                OperationStatusCode.BAD_FAMILY, nscf.getMessage());
+          } catch (FailedSanityCheckException fsce) {
+            final String msg = "Batch Mutation did not pass sanity check. ";
+            if (observedExceptions.hasSeenFailedSanityCheck()) {
+              LOG.warn(msg + fsce.getMessage());
+            } else {
+              LOG.warn(msg, fsce);
+              observedExceptions.sawFailedSanityCheck();
+            }
+            retCodeDetails[i] = new OperationStatus(
+                OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
+          } catch (WrongRegionException we) {
+            final String msg = "Batch mutation had a row that does not belong to this region. ";
+            if (observedExceptions.hasSeenWrongRegion()) {
+              LOG.warn(msg + we.getMessage());
+            } else {
+              LOG.warn(msg, we);
+              observedExceptions.sawWrongRegion();
+            }
+            retCodeDetails[i] = new OperationStatus(
+                OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
+          }
+        }
+      }
+    }
   }
 
-  private static class MutationBatch extends BatchOperation<Mutation> {
+  private static class MutationBatchOperation extends BatchOperation<Mutation> {
     private long nonceGroup;
     private long nonce;
-    public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
+    public MutationBatchOperation(Mutation[] operations, long nonceGroup, long nonce) {
       super(operations);
       this.nonceGroup = nonceGroup;
       this.nonce = nonce;
@@ -3088,9 +3144,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-  private static class ReplayBatch extends BatchOperation<MutationReplay> {
+  private static class ReplayBatchOperation extends BatchOperation<MutationReplay> {
     private long replaySeqId = 0;
-    public ReplayBatch(MutationReplay[] operations, long seqId) {
+    public ReplayBatchOperation(MutationReplay[] operations, long seqId) {
       super(operations);
       this.replaySeqId = seqId;
     }
@@ -3133,10 +3189,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     //  * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
     //  * coprocessor calls (see ex. BulkDeleteEndpoint).
     // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
-    return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
+    return batchMutate(new MutationBatchOperation(mutations, nonceGroup, nonce));
   }
 
-  @Override
   public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
     return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
@@ -3162,12 +3217,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       return statuses;
     }
-    return batchMutate(new ReplayBatch(mutations, replaySeqId));
+    return batchMutate(new ReplayBatchOperation(mutations, replaySeqId));
   }
 
   /**
    * Perform a batch of mutations.
-   * It supports only Put and Delete mutations and will ignore other types passed.
+   * It supports only Put and Delete mutations and will ignore other types passed. Operations in
+   * a batch are stored with the highest durability specified for any operation in the batch,
+   * except for {@link Durability#SKIP_WAL}.
    * @param batchOp contains the list of mutations
    * @return an array of OperationStatus which internally contains the
    *         OperationStatusCode and the exceptionMessage if any.
@@ -3187,8 +3244,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (!initialized) {
           this.writeRequestsCount.add(batchOp.operations.length);
           if (!batchOp.isInReplay()) {
-            doPreBatchMutateHook(batchOp);
+            callPreMutateCPHooks(batchOp);
           }
+          // validate and prepare batch for write, after CP pre-hooks
+          batchOp.checkAndPrepare(this);
           initialized = true;
         }
         doMiniBatchMutate(batchOp);
@@ -3201,8 +3260,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return batchOp.retCodeDetails;
   }
 
-  private void doPreBatchMutateHook(BatchOperation<?> batchOp)
-      throws IOException {
+  /**
+   * Runs prePut/ preDelete coprocessor hooks for each mutation in a batch.
+   * @param batchOp
+   */
+  private void callPreMutateCPHooks(BatchOperation<?> batchOp) throws IOException {
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
     WALEdit walEdit = new WALEdit();
     if (coprocessorHost != null) {
@@ -3252,27 +3314,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     long currentNonce = HConstants.NO_NONCE;
     WALEdit walEdit = null;
     boolean locked = false;
-    // reference family maps directly so coprocessors can mutate them if desired
-    Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
     int firstIndex = batchOp.nextIndexToProcess;
     int lastIndexExclusive = firstIndex;
     boolean success = false;
+    boolean doneByCoprocessor = false;
     int noOfPuts = 0;
     int noOfDeletes = 0;
     WriteEntry writeEntry = null;
     int cellCount = 0;
    /** Keep track of the locks we hold so we can release them in finally clause */
    List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
-    MemStoreSize memstoreSize = new MemStoreSize();
-    final ObservedExceptionsInBatch observedExceptions = new ObservedExceptionsInBatch();
+    MemStoreSize memStoreSize = new MemStoreSize();
     try {
      // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.
       int numReadyToWrite = 0;
-      long now = EnvironmentEdgeManager.currentTime();
-      while (lastIndexExclusive < batchOp.operations.length) {
-        if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now, observedExceptions)) {
-          lastIndexExclusive++;
+      for (; lastIndexExclusive < batchOp.operations.length; lastIndexExclusive++) {
+        if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
+            != OperationStatusCode.NOT_RUN) {
           continue;
         }
         Mutation mutation = batchOp.getMutation(lastIndexExclusive);
@@ -3293,9 +3352,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           acquiredRowLocks.add(rowLock);
         }
 
-        lastIndexExclusive++;
         numReadyToWrite++;
-        if (replay) {
+        if (replay || getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) {
           for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
             cellCount += cells.size();
           }
@@ -3303,42 +3361,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
 
       // We've now grabbed as many mutations off the list as we can
-
-      // STEP 2. Update any LATEST_TIMESTAMP timestamps
-      // We should record the timestamp only after we have acquired the rowLock,
-      // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
-      now = EnvironmentEdgeManager.currentTime();
-      byte[] byteNow = Bytes.toBytes(now);
-
       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
       if (numReadyToWrite <= 0) {
         return;
       }
 
-      for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) {
-        // skip invalid
-        if (batchOp.retCodeDetails[i].getOperationStatusCode()
-            != OperationStatusCode.NOT_RUN) {
-          // lastIndexExclusive was incremented above.
-          continue;
-        }
+      // STEP 2. Update any LATEST_TIMESTAMP timestamps
+      // We should record the timestamp only after we have acquired the rowLock,
+      // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
+      long now = EnvironmentEdgeManager.currentTime();
+      if (!replay) {
+        byte[] byteNow = Bytes.toBytes(now);
+        for (int i = firstIndex; i < lastIndexExclusive; i++) {
+          // skip invalid
+          if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
+            // lastIndexExclusive was incremented above.
+            continue;
+          }
 
-        Mutation mutation = batchOp.getMutation(i);
-        if (mutation instanceof Put) {
-          updateCellTimestamps(familyMaps[i].values(), byteNow);
-          noOfPuts++;
-        } else {
-          prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
-          noOfDeletes++;
-        }
-        rewriteCellTags(familyMaps[i], mutation);
-        WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
-        if (fromCP != null) {
-          cellCount += fromCP.size();
-        }
-        if (getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) {
-          for (List<Cell> cells : familyMaps[i].values()) {
-            cellCount += cells.size();
+          Mutation mutation = batchOp.getMutation(i);
+          if (mutation instanceof Put) {
+            updateCellTimestamps(batchOp.familyCellMaps[i].values(), byteNow);
+            noOfPuts++;
+          } else {
+            prepareDeleteTimestamps(mutation, batchOp.familyCellMaps[i], byteNow);
+            noOfDeletes++;
+          }
+          rewriteCellTags(batchOp.familyCellMaps[i], mutation);
+          WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
+          if (fromCP != null) {
+            cellCount += fromCP.size();
           }
         }
       }
@@ -3351,6 +3403,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
          batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
         if (coprocessorHost.preBatchMutate(miniBatchOp)) {
+          doneByCoprocessor = true;
           return;
         } else {
           for (int i = firstIndex; i < lastIndexExclusive; i++) {
@@ -3368,15 +3421,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             // Else Coprocessor added more Mutations corresponding to the Mutation at this index.
             for (int j = 0; j < cpMutations.length; j++) {
               Mutation cpMutation = cpMutations[j];
-              Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();
-              checkAndPrepareMutation(cpMutation, replay, cpFamilyMap, now);
+              checkAndPrepareMutation(cpMutation, replay, now);

               // Acquire row locks. If not, the whole batch will fail.
               acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));

               // Returned mutations from coprocessor correspond to the Mutation at index i. We can
               // directly add the cells from those mutations to the familyMaps of this mutation.
-              mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later
+              Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();
+              // will get added to the memStore later
+              mergeFamilyMaps(batchOp.familyCellMaps[i], cpFamilyMap);

               // The durability of returned mutation is replaced by the corresponding mutation.
               // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the
@@ -3393,7 +3447,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       // STEP 3. Build WAL edit
       walEdit = new WALEdit(cellCount, replay);
-      Durability durability = Durability.USE_DEFAULT;
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
         // Skip puts that were determined to be invalid during preprocessing
        if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
@@ -3401,12 +3454,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
 
         Mutation m = batchOp.getMutation(i);
-        Durability tmpDur = getEffectiveDurability(m.getDurability());
-        if (tmpDur.ordinal() > durability.ordinal()) {
-          durability = tmpDur;
-        }
        // we use durability of the original mutation for the mutation passed by CP.
-        if (tmpDur == Durability.SKIP_WAL) {
+        if (getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {
           recordMutationWithoutWal(m.getFamilyCellMap());
           continue;
         }
@@ -3431,58 +3480,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             walEdit.add(cell);
           }
         }
-        addFamilyMapToWALEdit(familyMaps[i], walEdit);
+        addFamilyMapToWALEdit(batchOp.familyCellMaps[i], walEdit);
       }
 
       // STEP 4. Append the final edit to WAL and sync.
       Mutation mutation = batchOp.getMutation(firstIndex);
-      WALKey walKey = null;
-      long txid;
-      if (replay) {
-        // use wal key from the original
-        walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
-          this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
-          mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
-        walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId());
-        if (!walEdit.isEmpty()) {
-          txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
-          if (txid != 0) {
-            sync(txid, durability);
-          }
-        }
-      } else {
-        try {
-          if (!walEdit.isEmpty()) {
-            // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-            walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
-                this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
-                mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc,
-                this.getReplicationScope());
-            // TODO: Use the doAppend methods below... complicated by the replay stuff above.
-            txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
-            if (txid != 0) {
-              sync(txid, durability);
-            }
-            if (writeEntry == null) {
-              // if MVCC not preassigned, wait here until assigned
-              writeEntry = walKey.getWriteEntry();
-            }
-          }
-        } catch (IOException ioe) {
-          if (walKey != null && writeEntry == null) {
-            // the writeEntry is not preassigned and error occurred during append or sync
-            mvcc.complete(walKey.getWriteEntry());
-          }
-          throw ioe;
-        }
-      }
-      if (walKey == null) {
-        // If no walKey, then not in replay and skipping WAL or some such. Begin an MVCC transaction
-        // to get sequence id.
+      writeEntry = doWALAppend(walEdit, batchOp.durability, mutation.getClusterIds(), now,
+          currentNonceGroup, currentNonce,
+          replay ? batchOp.getReplaySequenceId() : WALKey.NO_SEQUENCE_ID);
+      if (!replay && writeEntry == null) {
+        // If no writeEntry, then not in replay and skipping WAL or some such. Begin an MVCC
+        // transaction to get sequence id.
         writeEntry = mvcc.begin();
       }
 
-      // STEP 5. Write back to memstore
+      // STEP 5. Write back to memStore
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
        if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
           continue;
@@ -3493,14 +3505,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
        // we use durability of the original mutation for the mutation passed by CP.
        boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
        if (updateSeqId) {
-          this.updateSequenceId(familyMaps[i].values(),
+          this.updateSequenceId(batchOp.familyCellMaps[i].values(),
            replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
         }
-        applyFamilyMapToMemStore(familyMaps[i], memstoreSize);
+        applyFamilyMapToMemStore(batchOp.familyCellMaps[i], memStoreSize);
       }
 
       // update memstore size
-      this.addAndGetMemStoreSize(memstoreSize);
+      this.addAndGetMemStoreSize(memStoreSize);
 
       // calling the post CP hook for batch mutation
       if (!replay && coprocessorHost != null) {
@@ -3511,30 +3523,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
 
       // STEP 6. Complete mvcc.
-      if (replay) {
-        this.mvcc.advanceTo(batchOp.getReplaySequenceId());
-      } else {
-        // writeEntry won't be empty if not in replay mode
+      if (writeEntry != null) {
         mvcc.completeAndWait(writeEntry);
         writeEntry = null;
       }
+      if (replay) {
+        this.mvcc.advanceTo(batchOp.getReplaySequenceId());
+      }
+
+      success = true;
+    } finally {
+      // Call complete rather than completeAndWait because we probably had error if walKey != null
+      if (writeEntry != null) mvcc.complete(writeEntry);
 
-      // STEP 7. Release row locks, etc.
       if (locked) {
         this.updatesLock.readLock().unlock();
-        locked = false;
       }
       releaseRowLocks(acquiredRowLocks);
 
-      for (int i = firstIndex; i < lastIndexExclusive; i ++) {
+      for (int i = firstIndex; i < lastIndexExclusive; i++) {
         if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
-          batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+          batchOp.retCodeDetails[i] =
+              success || doneByCoprocessor ? OperationStatus.SUCCESS : OperationStatus.FAILURE;
         }
       }
 
-      // STEP 8. Run coprocessor post hooks. This should be done after the wal is
       // synced so that the coprocessor contract is adhered to.
-      if (!replay && coprocessorHost != null) {
+      if (!replay && coprocessorHost != null && !doneByCoprocessor) {
         for (int i = firstIndex; i < lastIndexExclusive; i++) {
           // only for successful puts
           if (batchOp.retCodeDetails[i].getOperationStatusCode()
@@ -3550,15 +3565,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
       }
 
-      success = true;
-    } finally {
-      // Call complete rather than completeAndWait because we probably had error if walKey != null
-      if (writeEntry != null) mvcc.complete(writeEntry);
-      if (locked) {
-        this.updatesLock.readLock().unlock();
-      }
-      releaseRowLocks(acquiredRowLocks);
-
       // See if the column families were consistent through the whole thing.
       // if they were then keep them. If they were not then pass a null.
       // null will be treated as unknown.
@@ -3577,13 +3583,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           this.metricsRegion.updateDelete();
         }
       }
-      if (!success) {
-        for (int i = firstIndex; i < lastIndexExclusive; i++) {
-          if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
-            batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
-          }
-        }
-      }
+
       if (coprocessorHost != null && !batchOp.isInReplay()) {
         // call the coprocessor hook to do any finalization steps
         // after the put is done
@@ -3622,75 +3622,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     this.mvcc.complete(walKey.getWriteEntry());
   }
 
-  private boolean checkBatchOp(BatchOperation<?> batchOp, final int lastIndexExclusive,
-      final Map<byte[], List<Cell>>[] familyMaps, final long now,
-      final ObservedExceptionsInBatch observedExceptions)
-  throws IOException {
-    boolean skip = false;
-    // Skip anything that "ran" already
-    if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
-        != OperationStatusCode.NOT_RUN) {
-      return true;
-    }
-    Mutation mutation = batchOp.getMutation(lastIndexExclusive);
-    Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
-    // store the family map reference to allow for mutations
-    familyMaps[lastIndexExclusive] = familyMap;
-
-    try {
-      checkAndPrepareMutation(mutation, batchOp.isInReplay(), familyMap, now);
-    } catch (NoSuchColumnFamilyException nscf) {
-      final String msg = "No such column family in batch mutation. ";
-      if (observedExceptions.hasSeenNoSuchFamily()) {
-        LOG.warn(msg + nscf.getMessage());
-      } else {
-        LOG.warn(msg, nscf);
-        observedExceptions.sawNoSuchFamily();
-      }
-      batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
-          OperationStatusCode.BAD_FAMILY, nscf.getMessage());
-      skip = true;
-    } catch (FailedSanityCheckException fsce) {
-      final String msg = "Batch Mutation did not pass sanity check. ";
-      if (observedExceptions.hasSeenFailedSanityCheck()) {
-        LOG.warn(msg + fsce.getMessage());
-      } else {
-        LOG.warn(msg, fsce);
-        observedExceptions.sawFailedSanityCheck();
-      }
-      batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
-          OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
-      skip = true;
-    } catch (WrongRegionException we) {
-      final String msg = "Batch mutation had a row that does not belong to this region. ";
-      if (observedExceptions.hasSeenWrongRegion()) {
-        LOG.warn(msg + we.getMessage());
-      } else {
-        LOG.warn(msg, we);
-        observedExceptions.sawWrongRegion();
-      }
-      batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
-          OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
-      skip = true;
-    }
-    return skip;
-  }
-
-  private void checkAndPrepareMutation(Mutation mutation, boolean replay,
-      final Map<byte[], List<Cell>> familyMap, final long now)
-          throws IOException {
+  private void checkAndPrepareMutation(Mutation mutation, boolean replay, final long now)
+      throws IOException {
+    checkRow(mutation.getRow(), "doMiniBatchMutation");
     if (mutation instanceof Put) {
       // Check the families in the put. If bad, skip this one.
       if (replay) {
-        removeNonExistentColumnFamilyForReplay(familyMap);
+        removeNonExistentColumnFamilyForReplay(mutation.getFamilyCellMap());
       } else {
-        checkFamilies(familyMap.keySet());
+        checkFamilies(mutation.getFamilyCellMap().keySet());
       }
       checkTimestamps(mutation.getFamilyCellMap(), now);
     } else {
       prepareDelete((Delete)mutation);
     }
-    checkRow(mutation.getRow(), "doMiniBatchMutation");
   }
 
   /**
@@ -3721,7 +3666,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * the table descriptor.
    */
   protected Durability getEffectiveDurability(Durability d) {
-    return d == Durability.USE_DEFAULT ? this.durability : d;
+    return d == Durability.USE_DEFAULT ? this.regionDurability : d;
   }
 
   @Override
@@ -7430,28 +7375,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       nonceGroup, nonce);
   }
 
+  private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
+      long now, long nonceGroup, long nonce) throws IOException {
+    return doWALAppend(walEdit, durability, clusterIds, now, nonceGroup, nonce,
+        WALKey.NO_SEQUENCE_ID);
+  }
+
   /**
    * @return writeEntry associated with this append
    */
   private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
-      long now, long nonceGroup, long nonce)
-  throws IOException {
+      long now, long nonceGroup, long nonce, long replaySeqId) throws IOException {
+    Preconditions.checkArgument(!walEdit.isReplay() || replaySeqId != WALKey.NO_SEQUENCE_ID,
+        "Invalid replay sequence Id for replay WALEdit!");
     WriteEntry writeEntry = null;
-    // Using default cluster id, as this can only happen in the originating cluster.
-    // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
-    // here instead of WALKey directly to support legacy coprocessors.
-    WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
-      this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
-      nonceGroup, nonce, mvcc, this.getReplicationScope());
-    try {
-      long txid =
-        this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
-      // Call sync on our edit.
-      if (txid != 0) sync(txid, durability);
-      writeEntry = walKey.getWriteEntry();
-    } catch (IOException ioe) {
-      if (walKey != null) mvcc.complete(walKey.getWriteEntry());
-      throw ioe;
+    if (!walEdit.isEmpty()) {
+      // Using default cluster id, as this can only happen in the originating cluster.
+      // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
+      // here instead of WALKey directly to support legacy coprocessors.
+      WALKey walKey = walEdit.isReplay() ? new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
+          this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, nonceGroup,
+          nonce, mvcc) :
+          new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
+          this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
+          nonceGroup, nonce, mvcc, this.getReplicationScope());
+      if (walEdit.isReplay()) {
+        walKey.setOrigLogSeqNum(replaySeqId);
+      }
+      try {
+        long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
+        // Call sync on our edit.
+        if (txid != 0) sync(txid, durability);
+        writeEntry = walKey.getWriteEntry();
+      } catch (IOException ioe) {
+        if (walKey != null) mvcc.complete(walKey.getWriteEntry());
+        throw ioe;
+      }
     }
     return writeEntry;
   }
@@ -7846,13 +7805,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   /**
-   * Give the region a chance to prepare before it is split.
-   */
-  protected void prepareToSplit() {
-    // nothing
-  }
-
-  /**
    * Return the splitpoint. null indicates the region isn't splittable
    * If the splitpoint isn't explicitly specified, it will go over the stores
    * to find the best splitpoint. Currently the criteria of best splitpoint
@@ -8115,7 +8067,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * Check whether we should sync the wal from the table's durability settings
    */
   private boolean shouldSyncWAL() {
-    return durability.ordinal() >  Durability.ASYNC_WAL.ordinal();
+    return regionDurability.ordinal() >  Durability.ASYNC_WAL.ordinal();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/e1941aa6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 7cb28f9..f1d9475 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -310,6 +310,27 @@ public class TestHRegionReplayEvents {
   }
 
   @Test
+  public void testBatchReplayWithMultipleNonces() throws IOException {
+    try {
+      MutationReplay[] mutations = new MutationReplay[100];
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put(Bytes.toBytes(i));
+        put.setDurability(Durability.SYNC_WAL);
+        for (byte[] family : this.families) {
+          put.addColumn(family, this.cq, null);
+          long nonceNum = i / 10;
+          mutations[i] = new MutationReplay(MutationType.PUT, put, nonceNum, nonceNum);
+        }
+      }
+      primaryRegion.batchReplay(mutations, 20);
+    } catch (Exception e) {
      String msg = "Error while replaying a batch with multiple nonces. ";
+      LOG.error(msg, e);
+      fail(msg + e.getMessage());
+    }
+  }
+
+  @Test
   public void testReplayFlushesAndCompactions() throws IOException {
     // initiate a secondary region with some data.
 
