GEODE-2398: Retry oplog channel.write on silent failures

Implemented limited retries in two forms of Oplog.flush() when channel.write() 
is called.
If write() returns a bytes-written count that is less than the change in the
ByteBuffer positions, then reset the
buffer positions and retry the write a limited number of times. Throws an
IOException if the write doesn't succeed after a few retries (the maximum
number of retries is defined by a static constant).

Added new unit tests.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/9b0f1657
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/9b0f1657
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/9b0f1657

Branch: refs/heads/feature/GEODE-2449
Commit: 9b0f16570aad4abc82b71d0d16167a9774449d41
Parents: 5d98a8c
Author: Ken Howe <kh...@pivotal.io>
Authored: Wed Feb 1 09:16:40 2017 -0800
Committer: Ken Howe <kh...@pivotal.io>
Committed: Mon Feb 13 13:50:07 2017 -0800

----------------------------------------------------------------------
 .../org/apache/geode/internal/cache/Oplog.java  |  66 ++++-
 .../geode/internal/cache/OplogFlushTest.java    | 258 +++++++++++++++++++
 2 files changed, 321 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/9b0f1657/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
index 0b98364..4e426a0 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
@@ -5186,7 +5186,14 @@ public final class Oplog implements CompactableOplog, 
Flushable {
     // flush(olf, true);
   }
 
+  private static final int MAX_CHANNEL_RETRIES = 5;
+
   private final void flush(OplogFile olf, boolean doSync) throws IOException {
+    int flushed;
+    int channelBytesWritten;
+    int numChannelRetries = 0;
+    int bbStartPos;
+    long channelStartPos;
     try {
       synchronized (this.lock/* olf */) {
         if (olf.RAFClosed) {
@@ -5195,9 +5202,27 @@ public final class Oplog implements CompactableOplog, 
Flushable {
         ByteBuffer bb = olf.writeBuf;
         if (bb != null && bb.position() != 0) {
           bb.flip();
-          int flushed = 0;
+          flushed = 0;
           do {
-            flushed += olf.channel.write(bb);
+            channelBytesWritten = 0;
+            bbStartPos = bb.position();
+            channelStartPos = olf.channel.position();
+            // differentiate between bytes written on this channel.write() 
iteration and the
+            // total number of bytes written to the channel on this call
+            channelBytesWritten = olf.channel.write(bb);
+            flushed += channelBytesWritten;
+            // Expect channelBytesWritten and the changes in pp.position() and 
channel.position() to
+            // be the same. If they are not, then the channel.write() silently 
failed. The following
+            // retry separates spurious failures from permanent channel 
failures.
+            if (channelBytesWritten != bb.position() - bbStartPos) {
+              if (numChannelRetries++ < MAX_CHANNEL_RETRIES) {
+                // Reset the ByteBuffer position, but take into account 
anything that did get
+                // written to the channel
+                bb.position(bbStartPos + (int) (olf.channel.position() - 
channelStartPos));
+              } else {
+                throw new IOException("Failed to write Oplog entry to" + 
olf.f.getName());
+              }
+            }
           } while (bb.hasRemaining());
           // update bytesFlushed after entire writeBuffer is flushed to fix bug
           // 41201
@@ -5222,6 +5247,11 @@ public final class Oplog implements CompactableOplog, 
Flushable {
 
   private final void flush(OplogFile olf, ByteBuffer b1, ByteBuffer b2) throws 
IOException {
     try {
+      long channelStartPos;
+      long expectedWritten;
+      long flushed;
+      int numChannelRetries = 0;
+      boolean retryWrite = false;
       synchronized (this.lock/* olf */) {
         if (olf.RAFClosed) {
           return;
@@ -5229,7 +5259,25 @@ public final class Oplog implements CompactableOplog, 
Flushable {
         this.bbArray[0] = b1;
         this.bbArray[1] = b2;
         b1.flip();
-        long flushed = olf.channel.write(this.bbArray);
+        int b1StartPos = b1.position();
+        int b2StartPos = b2.position();
+        expectedWritten = b1.limit() - b1StartPos + b2.limit() - b2StartPos;
+        channelStartPos = olf.channel.position();
+
+        do {
+          retryWrite = false;
+          flushed = olf.channel.write(this.bbArray);
+          if (flushed != expectedWritten) {
+            if (numChannelRetries++ < MAX_CHANNEL_RETRIES) {
+              retryWrite = true;
+              olf.channel.position(channelStartPos);
+              b1.position(b1StartPos);
+              b2.position(b2StartPos);
+            } else {
+              throw new IOException("Failed to write Oplog entry to" + 
olf.f.getName());
+            }
+          }
+        } while (retryWrite);
         this.bbArray[0] = null;
         this.bbArray[1] = null;
         // update bytesFlushed after entire writeBuffer is flushed to fix bug 
41201
@@ -6382,6 +6430,18 @@ public final class Oplog implements CompactableOplog, 
Flushable {
     return "oplog#" + getOplogId() /* + "DEBUG" + 
System.identityHashCode(this) */;
   }
 
+  /**
+   * Method to be used only for testing
+   * 
+   * @param ch Object to replace the channel in the Oplog.crf
+   * @return original channel object
+   */
+  UninterruptibleFileChannel testSetCrfChannel(UninterruptibleFileChannel ch) {
+    UninterruptibleFileChannel chPrev = this.crf.channel;
+    this.crf.channel = ch;
+    return chPrev;
+  }
+
   // //////// Methods used during recovery //////////////
 
   // ////////////////////Inner Classes //////////////////////

http://git-wip-us.apache.org/repos/asf/geode/blob/9b0f1657/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
new file mode 100644
index 0000000..1d484e4
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.geode.cache.DiskAccessException;
+import org.apache.geode.internal.cache.persistence.UninterruptibleFileChannel;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+/**
+ * Testing recovery from failures writing Oplog entries
+ */
+@Category(IntegrationTest.class)
+public class OplogFlushTest extends DiskRegionTestingBase {
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  // How many times to fake the write failures
+  private int nFakeChannelWrites = 0;
+  private Oplog ol = null;
+  private ByteBuffer bb1 = null;
+  private ByteBuffer bb2 = null;
+  private ByteBuffer[] bbArray = new ByteBuffer[2];
+  private UninterruptibleFileChannel ch;
+  private UninterruptibleFileChannel spyCh;
+
+  @Rule
+  public TestName name = new TestName();
+
+  class FakeChannelWriteBB implements Answer<Integer> {
+
+    @Override
+    public Integer answer(InvocationOnMock invocation) throws Throwable {
+      return fakeWriteBB(ol, bb1);
+    }
+  }
+
+  private int fakeWriteBB(Oplog ol, ByteBuffer bb) throws IOException {
+    if (nFakeChannelWrites > 0) {
+      bb.position(bb.limit());
+      --nFakeChannelWrites;
+      return 0;
+    }
+    doCallRealMethod().when(spyCh).write(bb);
+    return spyCh.write(bb);
+  }
+
+  private void verifyBB(ByteBuffer bb, byte[] src) {
+    bb.flip();
+    for (int i = 0; i < src.length; ++i) {
+      assertEquals("Channel contents does not match expected at index " + i, 
src[i], bb.get());
+    }
+  }
+
+  private void doChannelFlushWithFailures(Oplog[] oplogs, int numFailures) 
throws IOException {
+    nFakeChannelWrites = numFailures;
+    ol = oplogs[0];
+    ch = ol.getFileChannel();
+    spyCh = spy(ch);
+    ol.testSetCrfChannel(spyCh);
+
+    byte[] entry1 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 
17, 18, 19};
+    byte[] entry2 = {100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 
111, 112, 113, 114, 115,
+        116, 117, 118, 119};
+
+    bb1 = ol.getWriteBuf();
+    try {
+      // Force channel.write() failures when writing the first entry
+      doAnswer(new FakeChannelWriteBB()).when(spyCh).write(bb1);
+      long chStartPos = ol.getFileChannel().position();
+      bb1.clear();
+      bb1.put(entry1);
+      ol.flushAll(true);
+
+      // Write the 2nd entry without forced channel failures
+      nFakeChannelWrites = 0;
+      bb1 = ol.getWriteBuf();
+      bb1.clear();
+      bb1.put(entry2);
+      ol.flushAll(true);
+      long chEndPos = ol.getFileChannel().position();
+      assertEquals("Change in channel position does not equal the size of the 
data flushed",
+          entry1.length + entry2.length, chEndPos - chStartPos);
+      ByteBuffer dst = ByteBuffer.allocateDirect(entry1.length);
+      ol.getFileChannel().position(chStartPos);
+      ol.getFileChannel().read(dst);
+      verifyBB(dst, entry1);
+    } finally {
+      region.destroyRegion();
+    }
+  }
+
+  @Test
+  public void testAsyncChannelWriteRetriesOnFailureDuringFlush() throws 
Exception {
+    region = DiskRegionHelperFactory.getAsyncOverFlowAndPersistRegion(cache, 
null);
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
+    assertNotNull("Unexpected null Oplog", oplogs[0]);
+
+    doChannelFlushWithFailures(oplogs, 1 /* write failure */);
+  }
+
+  @Test
+  public void testChannelWriteRetriesOnFailureDuringFlush() throws Exception {
+    region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, 
null);
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
+    assertNotNull("Unexpected null Oplog", oplogs[0]);
+
+    doChannelFlushWithFailures(oplogs, 1 /* write failure */);
+  }
+
+  @Test
+  public void testChannelRecoversFromWriteFailureRepeatedRetriesDuringFlush() 
throws Exception {
+    region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, 
null);
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
+    assertNotNull("Unexpected null Oplog", oplogs[0]);
+
+    doChannelFlushWithFailures(oplogs, 3 /* write failures */);
+  }
+
+  @Test
+  public void 
testOplogFlushThrowsIOExceptioniWhenNumberOfChannelWriteRetriesExceedsLimit()
+      throws Exception {
+    expectedException.expect(DiskAccessException.class);
+    expectedException.expectCause(instanceOf(IOException.class));
+    region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, 
null);
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
+    assertNotNull("Unexpected null Oplog", oplogs[0]);
+
+    doChannelFlushWithFailures(oplogs, 6 /* exceeds the retry limit in Oplog 
*/);
+  }
+
+  class FakeChannelWriteBBArray implements Answer<Long> {
+
+    @Override
+    public Long answer(InvocationOnMock invocation) throws Throwable {
+      bbArray = ol.bbArray;
+      return fakeWriteBBArray(ol, bbArray);
+    }
+  }
+
+  private long fakeWriteBBArray(Oplog ol, ByteBuffer[] bbA) throws IOException 
{
+    if (nFakeChannelWrites > 0) {
+      for (int i = 0; i < bbA.length; ++i) {
+        bbA[i].position(bbA[i].limit());
+      }
+      --nFakeChannelWrites;
+      return 0;
+    }
+    doCallRealMethod().when(spyCh).write(bbA);
+    return spyCh.write(bbA);
+  }
+
+  private void doChannelBBArrayFlushWithFailures(Oplog[] oplogs, int 
numFailures)
+      throws IOException {
+    nFakeChannelWrites = numFailures;
+    ol = oplogs[0];
+    ch = ol.getFileChannel();
+    spyCh = spy(ch);
+    ol.testSetCrfChannel(spyCh);
+
+    byte[] entry1 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
+    byte[] entry2 = {16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 
30, 31};
+    byte[] entry3 = {32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 
46, 47};
+
+    bb1 = ByteBuffer.allocateDirect(entry1.length);
+    bb2 = ByteBuffer.allocateDirect(entry2.length);
+    ByteBuffer[] bbArray = ol.bbArray;
+    try {
+      // Force channel.write() failures when writing the first entry
+      doAnswer(new FakeChannelWriteBBArray()).when(spyCh).write(bbArray);
+      long chStartPos = ol.getFileChannel().position();
+      bb1.clear();
+      bb1.put(entry1);
+      bb2.clear();
+      bb2.put(entry2);
+      bb2.flip();
+      ol.flush(bb1, bb2);
+
+      // Write the 2nd entry without forced channel failures
+      nFakeChannelWrites = 0;
+      bb1.clear();
+      bb1.put(entry2);
+      bb1 = ol.getWriteBuf();
+      ol.flushAll(true);
+      long chEndPos = ol.getFileChannel().position();
+      assertEquals("Change in channel position does not equal the size of the 
data flushed",
+          entry1.length + entry2.length, chEndPos - chStartPos);
+      ByteBuffer dst = ByteBuffer.allocateDirect(entry1.length);
+      ol.getFileChannel().position(chStartPos);
+      ol.getFileChannel().read(dst);
+      verifyBB(dst, entry1);
+    } finally {
+      region.destroyRegion();
+    }
+  }
+
+  @Test
+  public void testChannelRecoversFromWriteFailureOfByteBufferArray() throws 
Exception {
+    region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, 
null);
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
+    assertNotNull("Unexpected null Oplog", oplogs[0]);
+
+    doChannelBBArrayFlushWithFailures(oplogs, 1 /* write failures */);
+  }
+
+  @Test
+  public void 
testOplogFlushOfByteBufferArrayThrowsIOExceptionWhenNumberOfChannelWriteRetriesExceedsLimit()
+      throws Exception {
+    expectedException.expect(instanceOf(IOException.class));
+    region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, 
null);
+    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
+    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
+    assertNotNull("Unexpected null Oplog", oplogs[0]);
+
+    doChannelBBArrayFlushWithFailures(oplogs, 6 /* exceeds the retry limit in 
Oplog */);
+  }
+}

Reply via email to