hbase git commit: HBASE-18137 Replication gets stuck for empty WALs

2017-06-10 Thread apurtell
Repository: hbase
Updated Branches:
  refs/heads/branch-1.2 96e48c3df -> c1289960d


HBASE-18137 Replication gets stuck for empty WALs

Signed-off-by: Andrew Purtell 

Conflicts:

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c1289960
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c1289960
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c1289960

Branch: refs/heads/branch-1.2
Commit: c1289960dd7ebf94c615d00ef9c77c27737496e1
Parents: 96e48c3
Author: Vincent 
Authored: Wed Jun 7 14:48:45 2017 -0700
Committer: Andrew Purtell 
Committed: Sat Jun 10 12:47:18 2017 -0700

--
 .../regionserver/ReplicationSource.java | 16 ++--
 .../hbase/replication/TestReplicationBase.java  |  1 +
 .../replication/TestReplicationSmallTests.java  | 83 
 3 files changed, 94 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hbase/blob/c1289960/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
--
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 2285a5e..a59c3c8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -512,9 +512,9 @@ public class ReplicationSource extends Thread
   terminate("Couldn't get the position of this recovered queue " + 
peerClusterZnode, e);
 }
   }
+  int sleepMultiplier = 1;
   // Loop until we close down
   while (isWorkerActive()) {
-int sleepMultiplier = 1;
 // Sleep until replication is enabled again
 if (!isPeerEnabled()) {
   if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
@@ -591,7 +591,7 @@ public class ReplicationSource extends Thread
 
 if (considerDumping &&
 sleepMultiplier == maxRetriesMultiplier &&
-processEndOfFile()) {
+processEndOfFile(false)) {
   continue;
 }
   }
@@ -717,7 +717,7 @@ public class ReplicationSource extends Thread
   }
   // If we didn't get anything and the queue has an object, it means we
   // hit the end of the file for sure
-  return seenEntries == 0 && processEndOfFile();
+  return seenEntries == 0 && processEndOfFile(false);
 }
 
 /**
@@ -846,11 +846,12 @@ public class ReplicationSource extends Thread
   // which throws a NPE if we open a file before any data node has the 
most recent block
   // Just sleep and retry. Will require re-reading compressed WALs for 
compressionContext.
   LOG.warn("Got NPE opening reader, will retry.");
-} else if (sleepMultiplier >= maxRetriesMultiplier) {
+} else if (sleepMultiplier >= maxRetriesMultiplier
+&& conf.getBoolean("replication.source.eof.autorecovery", false)) {
   // TODO Need a better way to determine if a file is really gone but
   // TODO without scanning all logs dir
   LOG.warn("Waited too long for this file, considering dumping");
-  return !processEndOfFile();
+  return !processEndOfFile(true);
 }
   }
   return true;
@@ -990,7 +991,7 @@ public class ReplicationSource extends Thread
  */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"DE_MIGHT_IGNORE",
 justification = "Yeah, this is how it works")
-protected boolean processEndOfFile() {
+protected boolean processEndOfFile(boolean dumpOnlyIfZeroLength) {
   // We presume this means the file we're reading is closed.
   if (this.queue.size() != 0) {
 // -1 means the wal wasn't closed cleanly.
@@ -1025,6 +1026,9 @@ public class ReplicationSource extends Thread
   LOG.trace("Reached the end of log " + this.currentPath + ", stats: " 
+ getStats()
   + ", and the length of the file is " + (stat == null ? "N/A" : 
stat.getLen()));
 }
+if (dumpOnlyIfZeroLength && stat.getLen() != 0) {
+  return false;
+}
 this.currentPath = null;
 this.repLogReader.finishCurrentFile();
 this.reader = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c1289960/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java

[3/4] hbase git commit: HBASE-18137 Replication gets stuck for empty WALs

2017-06-10 Thread apurtell
HBASE-18137 Replication gets stuck for empty WALs

Signed-off-by: Andrew Purtell 


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6782dfca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6782dfca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6782dfca

Branch: refs/heads/branch-1.3
Commit: 6782dfca4f3a2f5e02cc60a7c04d8d5d95ebc36e
Parents: 6a216c7
Author: Vincent 
Authored: Wed Jun 7 14:48:45 2017 -0700
Committer: Andrew Purtell 
Committed: Sat Jun 10 12:26:12 2017 -0700

--
 .../regionserver/ReplicationSource.java | 16 ++--
 .../hbase/replication/TestReplicationBase.java  |  1 +
 .../replication/TestReplicationSmallTests.java  | 83 
 3 files changed, 94 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hbase/blob/6782dfca/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
--
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 65f581a..2285292 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -542,9 +542,9 @@ public class ReplicationSource extends Thread
   terminate("Couldn't get the position of this recovered queue " + 
peerClusterZnode, e);
 }
   }
+  int sleepMultiplier = 1;
   // Loop until we close down
   while (isWorkerActive()) {
-int sleepMultiplier = 1;
 // Sleep until replication is enabled again
 if (!isPeerEnabled()) {
   if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
@@ -622,7 +622,7 @@ public class ReplicationSource extends Thread
 
 if (considerDumping &&
 sleepMultiplier == maxRetriesMultiplier &&
-processEndOfFile()) {
+processEndOfFile(false)) {
   continue;
 }
   }
@@ -749,7 +749,7 @@ public class ReplicationSource extends Thread
   }
   // If we didn't get anything and the queue has an object, it means we
   // hit the end of the file for sure
-  return seenEntries == 0 && processEndOfFile();
+  return seenEntries == 0 && processEndOfFile(false);
 }
 
 /**
@@ -930,11 +930,12 @@ public class ReplicationSource extends Thread
   // which throws a NPE if we open a file before any data node has the 
most recent block
   // Just sleep and retry. Will require re-reading compressed WALs for 
compressionContext.
   LOG.warn("Got NPE opening reader, will retry.");
-} else if (sleepMultiplier >= maxRetriesMultiplier) {
+} else if (sleepMultiplier >= maxRetriesMultiplier
+&& conf.getBoolean("replication.source.eof.autorecovery", false)) {
   // TODO Need a better way to determine if a file is really gone but
   // TODO without scanning all logs dir
   LOG.warn("Waited too long for this file, considering dumping");
-  return !processEndOfFile();
+  return !processEndOfFile(true);
 }
   }
   return true;
@@ -1100,7 +1101,7 @@ public class ReplicationSource extends Thread
  */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"DE_MIGHT_IGNORE",
 justification = "Yeah, this is how it works")
-protected boolean processEndOfFile() {
+protected boolean processEndOfFile(boolean dumpOnlyIfZeroLength) {
   // We presume this means the file we're reading is closed.
   if (this.queue.size() != 0) {
 // -1 means the wal wasn't closed cleanly.
@@ -1135,6 +1136,9 @@ public class ReplicationSource extends Thread
   LOG.trace("Reached the end of log " + this.currentPath + ", stats: " 
+ getStats()
   + ", and the length of the file is " + (stat == null ? "N/A" : 
stat.getLen()));
 }
+if (dumpOnlyIfZeroLength && stat.getLen() != 0) {
+  return false;
+}
 this.currentPath = null;
 this.repLogReader.finishCurrentFile();
 this.reader = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/6782dfca/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
--
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
 

[4/4] hbase git commit: HBASE-18137 Replication gets stuck for empty WALs

2017-06-10 Thread apurtell
HBASE-18137 Replication gets stuck for empty WALs

Signed-off-by: Andrew Purtell 


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/385b7924
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/385b7924
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/385b7924

Branch: refs/heads/branch-2
Commit: 385b792446ea1b0c58b7365904d677ba48eec930
Parents: eca1ec3
Author: Vincent 
Authored: Fri Jun 9 18:47:14 2017 -0700
Committer: Andrew Purtell 
Committed: Sat Jun 10 12:45:40 2017 -0700

--
 .../ReplicationSourceShipperThread.java |  2 +-
 .../ReplicationSourceWALReaderThread.java   | 30 
 .../hbase/replication/TestReplicationBase.java  |  1 +
 .../replication/TestReplicationSmallTests.java  | 80 
 4 files changed, 112 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/hbase/blob/385b7924/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
--
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
index d1a8ac2..6807da2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
@@ -303,7 +303,7 @@ public class ReplicationSourceShipperThread extends Thread {
   }
 
   public Path getCurrentPath() {
-return this.currentPath;
+return this.entryReader.getCurrentPath();
   }
 
   public long getCurrentPosition() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/385b7924/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
--
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index ad08866..c1af6e6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -189,6 +190,7 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
   sleepMultiplier++;
 } else {
   LOG.error("Failed to read stream of replication entries", e);
+  handleEofException(e);
 }
 Threads.sleep(sleepForRetries * sleepMultiplier);
   } catch (InterruptedException e) {
@@ -198,6 +200,34 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
 }
   }
 
+  // if we get an EOF due to a zero-length log, and there are other logs in 
queue
+  // (highly likely we've closed the current log), we've hit the max retries, 
and autorecovery is
+  // enabled, then dump the log
+  private void handleEofException(Exception e) {
+if (e.getCause() instanceof EOFException && logQueue.size() > 1
+&& conf.getBoolean("replication.source.eof.autorecovery", false)) {
+  try {
+if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
+  LOG.warn("Forcing removal of 0 length log in queue: " + 
logQueue.peek());
+  logQueue.remove();
+  currentPosition = 0;
+}
+  } catch (IOException ioe) {
+LOG.warn("Couldn't get file length information about log " + 
logQueue.peek());
+  }
+}
+  }
+
+  public Path getCurrentPath() {
+// if we've read some WAL entries, get the Path we read from
+WALEntryBatch batchQueueHead = entryBatchQueue.peek();
+if (batchQueueHead != null) {
+  return batchQueueHead.lastWalPath;
+}
+// otherwise, we must be currently reading from the head of the log queue
+return logQueue.peek();
+  }
+
   //returns false if we've already exceeded the global quota
   private boolean checkQuota() {
 // try not to go over total quota

http://git-wip-us.apache.org/repos/asf/hbase/blob/385b7924/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
--
diff 

[1/4] hbase git commit: HBASE-18137 Replication gets stuck for empty WALs

2017-06-10 Thread apurtell
Repository: hbase
Updated Branches:
  refs/heads/branch-1 6860ddca9 -> 650ef5cf5
  refs/heads/branch-1.3 6a216c787 -> 6782dfca4
  refs/heads/branch-2 eca1ec335 -> 385b79244
  refs/heads/master ea64dbef7 -> 384e308e9


HBASE-18137 Replication gets stuck for empty WALs

Signed-off-by: Andrew Purtell 


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/384e308e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/384e308e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/384e308e

Branch: refs/heads/master
Commit: 384e308e9f2387422e76ceb1432d6b2b85a973cf
Parents: ea64dbe
Author: Vincent 
Authored: Fri Jun 9 18:47:14 2017 -0700
Committer: Andrew Purtell 
Committed: Sat Jun 10 10:30:40 2017 -0700

--
 .../ReplicationSourceShipperThread.java |  2 +-
 .../ReplicationSourceWALReaderThread.java   | 30 
 .../hbase/replication/TestReplicationBase.java  |  1 +
 .../replication/TestReplicationSmallTests.java  | 80 
 4 files changed, 112 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/hbase/blob/384e308e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
--
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
index d1a8ac2..6807da2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java
@@ -303,7 +303,7 @@ public class ReplicationSourceShipperThread extends Thread {
   }
 
   public Path getCurrentPath() {
-return this.currentPath;
+return this.entryReader.getCurrentPath();
   }
 
   public long getCurrentPosition() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/384e308e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
--
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index ad08866..c1af6e6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -189,6 +190,7 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
   sleepMultiplier++;
 } else {
   LOG.error("Failed to read stream of replication entries", e);
+  handleEofException(e);
 }
 Threads.sleep(sleepForRetries * sleepMultiplier);
   } catch (InterruptedException e) {
@@ -198,6 +200,34 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
 }
   }
 
+  // if we get an EOF due to a zero-length log, and there are other logs in 
queue
+  // (highly likely we've closed the current log), we've hit the max retries, 
and autorecovery is
+  // enabled, then dump the log
+  private void handleEofException(Exception e) {
+if (e.getCause() instanceof EOFException && logQueue.size() > 1
+&& conf.getBoolean("replication.source.eof.autorecovery", false)) {
+  try {
+if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
+  LOG.warn("Forcing removal of 0 length log in queue: " + 
logQueue.peek());
+  logQueue.remove();
+  currentPosition = 0;
+}
+  } catch (IOException ioe) {
+LOG.warn("Couldn't get file length information about log " + 
logQueue.peek());
+  }
+}
+  }
+
+  public Path getCurrentPath() {
+// if we've read some WAL entries, get the Path we read from
+WALEntryBatch batchQueueHead = entryBatchQueue.peek();
+if (batchQueueHead != null) {
+  return batchQueueHead.lastWalPath;
+}
+// otherwise, we must be currently reading from the head of the log queue
+return logQueue.peek();
+  }
+
   //returns false if we've already exceeded the global quota
   private boolean checkQuota() {
 // try not to go over total quota


[2/4] hbase git commit: HBASE-18137 Replication gets stuck for empty WALs

2017-06-10 Thread apurtell
HBASE-18137 Replication gets stuck for empty WALs

Signed-off-by: Andrew Purtell 


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/650ef5cf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/650ef5cf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/650ef5cf

Branch: refs/heads/branch-1
Commit: 650ef5cf59ee7f6e4c219a9043b66a814da52f19
Parents: 6860ddc
Author: Vincent 
Authored: Fri Jun 9 18:36:23 2017 -0700
Committer: Andrew Purtell 
Committed: Sat Jun 10 11:29:51 2017 -0700

--
 .../regionserver/ReplicationSource.java |  2 +-
 .../ReplicationSourceWALReaderThread.java   | 30 +++
 .../hbase/replication/TestReplicationBase.java  |  1 +
 .../replication/TestReplicationSmallTests.java  | 82 
 4 files changed, 114 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/hbase/blob/650ef5cf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
--
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 6954ea2..8378b9b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -931,7 +931,7 @@ public class ReplicationSource extends Thread implements 
ReplicationSourceInterf
 }
 
 public Path getCurrentPath() {
-  return this.currentPath;
+  return this.entryReader.getCurrentPath();
 }
 
 public long getCurrentPosition() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/650ef5cf/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
--
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 6f1c641..40828b7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -188,6 +189,7 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
   sleepMultiplier++;
 } else {
   LOG.error("Failed to read stream of replication entries", e);
+  handleEofException(e);
 }
 Threads.sleep(sleepForRetries * sleepMultiplier);
   } catch (InterruptedException e) {
@@ -197,6 +199,34 @@ public class ReplicationSourceWALReaderThread extends 
Thread {
 }
   }
 
+  // if we get an EOF due to a zero-length log, and there are other logs in 
queue
+  // (highly likely we've closed the current log), we've hit the max retries, 
and autorecovery is
+  // enabled, then dump the log
+  private void handleEofException(Exception e) {
+if (e.getCause() instanceof EOFException && logQueue.size() > 1
+&& conf.getBoolean("replication.source.eof.autorecovery", false)) {
+  try {
+if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
+  LOG.warn("Forcing removal of 0 length log in queue: " + 
logQueue.peek());
+  logQueue.remove();
+  currentPosition = 0;
+}
+  } catch (IOException ioe) {
+LOG.warn("Couldn't get file length information about log " + 
logQueue.peek());
+  }
+}
+  }
+
+  public Path getCurrentPath() {
+// if we've read some WAL entries, get the Path we read from
+WALEntryBatch batchQueueHead = entryBatchQueue.peek();
+if (batchQueueHead != null) {
+  return batchQueueHead.lastWalPath;
+}
+// otherwise, we must be currently reading from the head of the log queue
+return logQueue.peek();
+  }
+
   //returns false if we've already exceeded the global quota
   private boolean checkQuota() {
 // try not to go over total quota

http://git-wip-us.apache.org/repos/asf/hbase/blob/650ef5cf/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
--
diff --git