GEODE-2398: Updates from review

https://reviews.apache.org/r/56506/


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/fb14e9aa
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/fb14e9aa
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/fb14e9aa

Branch: refs/heads/feature/GEODE-2402
Commit: fb14e9aab263654ed0176dcc3c9738be1b208a82
Parents: 9b0f165
Author: Ken Howe <kh...@pivotal.io>
Authored: Fri Feb 10 16:08:09 2017 -0800
Committer: Ken Howe <kh...@pivotal.io>
Committed: Mon Feb 13 13:50:07 2017 -0800

----------------------------------------------------------------------
 .../org/apache/geode/internal/cache/Oplog.java  | 49 +++--------
 .../geode/internal/cache/OplogFlushTest.java    | 89 --------------------
 2 files changed, 13 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/fb14e9aa/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
index 4e426a0..270c833 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
@@ -5189,11 +5189,6 @@ public final class Oplog implements CompactableOplog, Flushable {
   private static final int MAX_CHANNEL_RETRIES = 5;
 
   private final void flush(OplogFile olf, boolean doSync) throws IOException {
-    int flushed;
-    int channelBytesWritten;
-    int numChannelRetries = 0;
-    int bbStartPos;
-    long channelStartPos;
     try {
       synchronized (this.lock/* olf */) {
         if (olf.RAFClosed) {
@@ -5202,15 +5197,15 @@ public final class Oplog implements CompactableOplog, Flushable {
         ByteBuffer bb = olf.writeBuf;
         if (bb != null && bb.position() != 0) {
           bb.flip();
-          flushed = 0;
+          int flushed = 0;
+          int numChannelRetries = 0;
           do {
-            channelBytesWritten = 0;
-            bbStartPos = bb.position();
-            channelStartPos = olf.channel.position();
+            int channelBytesWritten = 0;
+            final int bbStartPos = bb.position();
+            final long channelStartPos = olf.channel.position();
             // differentiate between bytes written on this channel.write() 
iteration and the
             // total number of bytes written to the channel on this call
             channelBytesWritten = olf.channel.write(bb);
-            flushed += channelBytesWritten;
             // Expect channelBytesWritten and the changes in pp.position() and 
channel.position() to
             // be the same. If they are not, then the channel.write() silently 
failed. The following
             // retry separates spurious failures from permanent channel 
failures.
@@ -5218,11 +5213,16 @@ public final class Oplog implements CompactableOplog, Flushable {
               if (numChannelRetries++ < MAX_CHANNEL_RETRIES) {
                 // Reset the ByteBuffer position, but take into account 
anything that did get
                 // written to the channel
-                bb.position(bbStartPos + (int) (olf.channel.position() - 
channelStartPos));
+                channelBytesWritten = (int) (olf.channel.position() - 
channelStartPos);
+                bb.position(bbStartPos + channelBytesWritten);
               } else {
-                throw new IOException("Failed to write Oplog entry to" + 
olf.f.getName());
+                throw new IOException("Failed to write Oplog entry to" + 
olf.f.getName() + ": "
+                    + "channel.write() returned " + channelBytesWritten + ", "
+                    + "change in channel position = " + 
(olf.channel.position() - channelStartPos)
+                    + ", " + "change in source buffer position = " + 
(bb.position() - bbStartPos));
               }
             }
+            flushed += channelBytesWritten;
           } while (bb.hasRemaining());
           // update bytesFlushed after entire writeBuffer is flushed to fix bug
           // 41201
@@ -5247,11 +5247,6 @@ public final class Oplog implements CompactableOplog, Flushable {
 
   private final void flush(OplogFile olf, ByteBuffer b1, ByteBuffer b2) throws 
IOException {
     try {
-      long channelStartPos;
-      long expectedWritten;
-      long flushed;
-      int numChannelRetries = 0;
-      boolean retryWrite = false;
       synchronized (this.lock/* olf */) {
         if (olf.RAFClosed) {
           return;
@@ -5259,25 +5254,7 @@ public final class Oplog implements CompactableOplog, Flushable {
         this.bbArray[0] = b1;
         this.bbArray[1] = b2;
         b1.flip();
-        int b1StartPos = b1.position();
-        int b2StartPos = b2.position();
-        expectedWritten = b1.limit() - b1StartPos + b2.limit() - b2StartPos;
-        channelStartPos = olf.channel.position();
-
-        do {
-          retryWrite = false;
-          flushed = olf.channel.write(this.bbArray);
-          if (flushed != expectedWritten) {
-            if (numChannelRetries++ < MAX_CHANNEL_RETRIES) {
-              retryWrite = true;
-              olf.channel.position(channelStartPos);
-              b1.position(b1StartPos);
-              b2.position(b2StartPos);
-            } else {
-              throw new IOException("Failed to write Oplog entry to" + 
olf.f.getName());
-            }
-          }
-        } while (retryWrite);
+        long flushed = olf.channel.write(this.bbArray);
         this.bbArray[0] = null;
         this.bbArray[1] = null;
         // update bytesFlushed after entire writeBuffer is flushed to fix bug 
41201

http://git-wip-us.apache.org/repos/asf/geode/blob/fb14e9aa/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
index 1d484e4..d24e66d 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
@@ -166,93 +166,4 @@ public class OplogFlushTest extends DiskRegionTestingBase {
 
     doChannelFlushWithFailures(oplogs, 6 /* exceeds the retry limit in Oplog 
*/);
   }
-
-  class FakeChannelWriteBBArray implements Answer<Long> {
-
-    @Override
-    public Long answer(InvocationOnMock invocation) throws Throwable {
-      bbArray = ol.bbArray;
-      return fakeWriteBBArray(ol, bbArray);
-    }
-  }
-
-  private long fakeWriteBBArray(Oplog ol, ByteBuffer[] bbA) throws IOException 
{
-    if (nFakeChannelWrites > 0) {
-      for (int i = 0; i < bbA.length; ++i) {
-        bbA[i].position(bbA[i].limit());
-      }
-      --nFakeChannelWrites;
-      return 0;
-    }
-    doCallRealMethod().when(spyCh).write(bbA);
-    return spyCh.write(bbA);
-  }
-
-  private void doChannelBBArrayFlushWithFailures(Oplog[] oplogs, int 
numFailures)
-      throws IOException {
-    nFakeChannelWrites = numFailures;
-    ol = oplogs[0];
-    ch = ol.getFileChannel();
-    spyCh = spy(ch);
-    ol.testSetCrfChannel(spyCh);
-
-    byte[] entry1 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
-    byte[] entry2 = {16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 
30, 31};
-    byte[] entry3 = {32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 
46, 47};
-
-    bb1 = ByteBuffer.allocateDirect(entry1.length);
-    bb2 = ByteBuffer.allocateDirect(entry2.length);
-    ByteBuffer[] bbArray = ol.bbArray;
-    try {
-      // Force channel.write() failures when writing the first entry
-      doAnswer(new FakeChannelWriteBBArray()).when(spyCh).write(bbArray);
-      long chStartPos = ol.getFileChannel().position();
-      bb1.clear();
-      bb1.put(entry1);
-      bb2.clear();
-      bb2.put(entry2);
-      bb2.flip();
-      ol.flush(bb1, bb2);
-
-      // Write the 2nd entry without forced channel failures
-      nFakeChannelWrites = 0;
-      bb1.clear();
-      bb1.put(entry2);
-      bb1 = ol.getWriteBuf();
-      ol.flushAll(true);
-      long chEndPos = ol.getFileChannel().position();
-      assertEquals("Change in channel position does not equal the size of the 
data flushed",
-          entry1.length + entry2.length, chEndPos - chStartPos);
-      ByteBuffer dst = ByteBuffer.allocateDirect(entry1.length);
-      ol.getFileChannel().position(chStartPos);
-      ol.getFileChannel().read(dst);
-      verifyBB(dst, entry1);
-    } finally {
-      region.destroyRegion();
-    }
-  }
-
-  @Test
-  public void testChannelRecoversFromWriteFailureOfByteBufferArray() throws 
Exception {
-    region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, 
null);
-    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
-    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
-    assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
-    assertNotNull("Unexpected null Oplog", oplogs[0]);
-
-    doChannelBBArrayFlushWithFailures(oplogs, 1 /* write failures */);
-  }
-
-  @Test
-  public void 
testOplogFlushOfByteBufferArrayThrowsIOExceptionWhenNumberOfChannelWriteRetriesExceedsLimit()
-      throws Exception {
-    expectedException.expect(instanceOf(IOException.class));
-    region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, 
null);
-    DiskRegion dr = ((LocalRegion) region).getDiskRegion();
-    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
-    assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
-    assertNotNull("Unexpected null Oplog", oplogs[0]);
-
-    doChannelBBArrayFlushWithFailures(oplogs, 6 /* exceeds the retry limit in 
Oplog */);
-  }
 }

Reply via email to