[GitHub] [hbase] wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated

2019-11-29 Thread GitBox
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly 
update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r352175795
 
 

 ##
 File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
 ##
 @@ -436,6 +439,56 @@ public void testWALKeySerialization() throws Exception {
     }
   }
 
+  @Test
+  public void testReplicationSourceWALReaderThreadWithFilter() throws Exception {
+    final byte[] notReplicatedCf = Bytes.toBytes("notReplicated");
+    final Map<TableName, List<String>> tableCfs = new HashMap<>();
+    tableCfs.put(tableName, Collections.singletonList(Bytes.toString(family)));
+    ReplicationPeer peer = mock(ReplicationPeer.class);
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
+
+    appendToLogPlus(3, notReplicatedCf);
+
+    Path firstWAL = walQueue.peek();
+    ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    final ReplicationSourceWALReaderThread reader =
+        new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
+          0, fs, conf, filter, new MetricsSource("1"));
+    reader.start();
+
+    // reader won't put any batch, even if EOF reached.
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future<WALEntryBatch> future = executor.submit(new Callable<WALEntryBatch>() {
+      @Override
+      public WALEntryBatch call() throws Exception {
+        return reader.take();
+      }
+    });
+    Thread.sleep(2000);
+    assertFalse(future.isDone());
 
 Review comment:
  Just a _heads up_ here: we may simplify this part if we decide to expose `ReplicationSourceWALReaderThread.lastReadPosition` via a _getter_ method.
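  For illustration, a rough sketch of how this part could look with such a getter (`getLastReadPosition()` is an assumed name, not something the patch defines today):

    // Sketch only: poll the assumed reader.getLastReadPosition() accessor instead of
    // blocking on reader.take() through an executor, a Future and a fixed sleep.
    long deadline = System.currentTimeMillis() + 20000;
    while (reader.getLastReadPosition() <= 0 && System.currentTimeMillis() < deadline) {
      Thread.sleep(100);
    }
    assertTrue(reader.getLastReadPosition() > 0);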




[GitHub] [hbase] wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated

2019-11-29 Thread GitBox
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly 
update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r352161498
 
 

 ##
 File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
 ##
 @@ -378,37 +380,35 @@ public void testReplicationSourceWALReaderThread() throws Exception {
   }
 
   @Test
-  public void testReplicationSourceUpdatesLogPositionOnFilteredEntries() throws Exception {
+  public void testReplicationSourceWALReaderThreadRecoveredQueue() throws Exception {
 
 Review comment:
  Actually yeah, this is indeed simulating a recovered queue, since the queue is created as recovered. I think this test is fine.




[GitHub] [hbase] wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated

2019-11-29 Thread GitBox
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly 
update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r352168908
 
 

 ##
 File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
 ##
 @@ -135,59 +127,46 @@ public void run() {
     try (WALEntryStream entryStream =
         new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
       while (isReaderRunning()) { // loop here to keep reusing stream while we can
-        if (!checkQuota()) {
+        if (manager.isBufferQuotaReached()) {
+          Threads.sleep(sleepForRetries);
           continue;
         }
-        WALEntryBatch batch = null;
-        while (entryStream.hasNext()) {
-          if (batch == null) {
-            batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-          }
+        WALEntryBatch batch =
+            new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+        boolean hasNext;
+        while ((hasNext = entryStream.hasNext()) == true) {
           Entry entry = entryStream.next();
           entry = filterEntry(entry);
           if (entry != null) {
             WALEdit edit = entry.getEdit();
             if (edit != null && !edit.isEmpty()) {
-              long entrySize = getEntrySizeIncludeBulkLoad(entry);
-              long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-              batch.addEntry(entry);
-              replicationSourceManager.setPendingShipment(true);
-              updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-              boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+              long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+              boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
               // Stop if too many entries or too big
-              if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                  || batch.getNbEntries() >= replicationBatchCountCapacity) {
+              if (totalBufferTooLarge || batch.isLimitReached()) {
                 break;
               }
             }
-          } else {
 
 Review comment:
  I guess this is fine for the replication progress problem. One additional issue, though, is monitoring. IIRC, DumpReplicationQueues relies on the replication info available in ZK, so it may no longer show an accurate position for the log queue. We may need to expose `ReplicationSourceWALReaderThread.lastReadPosition` via a _getter_ method for monitoring purposes.
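  As a rough sketch of that idea (method name and wiring assumed, not part of the current patch):

    // Hypothetical accessor on ReplicationSourceWALReaderThread, so tooling such as
    // DumpReplicationQueues could report the in-memory read position even when the
    // position stored in ZK lags behind.
    public long getLastReadPosition() {
      return this.lastReadPosition;
    }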




[GitHub] [hbase] wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated

2019-11-19 Thread GitBox
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly 
update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r348096334
 
 

 ##
 File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
 ##
 @@ -135,59 +127,46 @@ public void run() {
     try (WALEntryStream entryStream =
         new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
       while (isReaderRunning()) { // loop here to keep reusing stream while we can
-        if (!checkQuota()) {
+        if (manager.isBufferQuotaReached()) {
+          Threads.sleep(sleepForRetries);
           continue;
         }
-        WALEntryBatch batch = null;
-        while (entryStream.hasNext()) {
-          if (batch == null) {
-            batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-          }
+        WALEntryBatch batch =
+            new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+        boolean hasNext;
+        while ((hasNext = entryStream.hasNext()) == true) {
           Entry entry = entryStream.next();
           entry = filterEntry(entry);
           if (entry != null) {
             WALEdit edit = entry.getEdit();
             if (edit != null && !edit.isEmpty()) {
-              long entrySize = getEntrySizeIncludeBulkLoad(entry);
-              long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-              batch.addEntry(entry);
-              replicationSourceManager.setPendingShipment(true);
-              updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-              boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+              long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+              boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
               // Stop if too many entries or too big
-              if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                  || batch.getNbEntries() >= replicationBatchCountCapacity) {
+              if (totalBufferTooLarge || batch.isLimitReached()) {
                 break;
               }
             }
-          } else {
 
 Review comment:
  I think the answer to my question above is in _resetStream()_, which gets called at the end of the second while loop and updates the _lastReadPosition_ variable that is now used for reading here.
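  Roughly, the flow I'm describing would be (a sketch based on my reading, not the actual patch code):

    // Sketch: once the inner read loop finishes, the stream is reset and the position
    // it reached is remembered for the next pass over the WAL.
    private void resetStream(WALEntryStream stream) throws IOException {
      lastReadPosition = stream.getPosition(); // remember how far we read
      stream.reset();                          // reuse the stream on the next iteration
    }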




[GitHub] [hbase] wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated

2019-11-19 Thread GitBox
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly 
update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r348092931
 
 

 ##
 File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
 ##
 @@ -135,59 +127,46 @@ public void run() {
     try (WALEntryStream entryStream =
         new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
       while (isReaderRunning()) { // loop here to keep reusing stream while we can
-        if (!checkQuota()) {
+        if (manager.isBufferQuotaReached()) {
+          Threads.sleep(sleepForRetries);
           continue;
         }
-        WALEntryBatch batch = null;
-        while (entryStream.hasNext()) {
-          if (batch == null) {
-            batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-          }
+        WALEntryBatch batch =
+            new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+        boolean hasNext;
+        while ((hasNext = entryStream.hasNext()) == true) {
           Entry entry = entryStream.next();
           entry = filterEntry(entry);
           if (entry != null) {
             WALEdit edit = entry.getEdit();
             if (edit != null && !edit.isEmpty()) {
-              long entrySize = getEntrySizeIncludeBulkLoad(entry);
-              long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-              batch.addEntry(entry);
-              replicationSourceManager.setPendingShipment(true);
-              updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-              boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+              long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+              boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
               // Stop if too many entries or too big
-              if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                  || batch.getNbEntries() >= replicationBatchCountCapacity) {
+              if (totalBufferTooLarge || batch.isLimitReached()) {
                 break;
               }
             }
-          } else {
-            replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
-              this.replicationQueueInfo.getPeerClusterZnode(),
-              entryStream.getPosition(),
-              this.replicationQueueInfo.isQueueRecovered(), false);
           }
         }
-        if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(String.format("Read %s WAL entries eligible for replication",
-              batch.getNbEntries()));
-          }
-          entryBatchQueue.put(batch);
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(String.format("Read %s WAL entries eligible for replication",
+              batch.getNbEntries()));
+        }
+
+        updateBatch(entryStream, batch, hasNext);
+        if (isShippable(batch)) {
           sleepMultiplier = 1;
-        } else { // got no entries and didn't advance position in WAL
-          LOG.trace("Didn't read any new entries from WAL");
-          if (replicationQueueInfo.isQueueRecovered()) {
-            // we're done with queue recovery, shut ourself down
+          entryBatchQueue.put(batch);
+          if (!batch.hasMoreEntries()) {
+            // we're done with queue recovery, shut ourselves down
             setReaderRunning(false);
-            // shuts down shipper thread immediately
-            entryBatchQueue.put(batch != null ? batch
-                : new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()));
-          } else {
-            Thread.sleep(sleepForRetries);
           }
+        } else {
 
 Review comment:
   Ok, so _batch.hasMoreEntries()_ returns true if this is isn't a recovered 
queue.
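  So, as I read it, the semantics are roughly (sketch only, field names assumed):

    // A live (non-recovered) queue can always receive more WAL entries, while a
    // recovered queue is finite and eventually drains.
    boolean hasMoreEntries() {
      return !isRecoveredQueue || streamHasMoreEntries;
    }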




[GitHub] [hbase] wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated

2019-11-19 Thread GitBox
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly 
update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r348087671
 
 

 ##
 File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
 ##
 @@ -135,59 +127,46 @@ public void run() {
     try (WALEntryStream entryStream =
         new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
       while (isReaderRunning()) { // loop here to keep reusing stream while we can
-        if (!checkQuota()) {
+        if (manager.isBufferQuotaReached()) {
+          Threads.sleep(sleepForRetries);
           continue;
         }
-        WALEntryBatch batch = null;
-        while (entryStream.hasNext()) {
-          if (batch == null) {
-            batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-          }
+        WALEntryBatch batch =
+            new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+        boolean hasNext;
+        while ((hasNext = entryStream.hasNext()) == true) {
           Entry entry = entryStream.next();
           entry = filterEntry(entry);
           if (entry != null) {
             WALEdit edit = entry.getEdit();
             if (edit != null && !edit.isEmpty()) {
-              long entrySize = getEntrySizeIncludeBulkLoad(entry);
-              long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-              batch.addEntry(entry);
-              replicationSourceManager.setPendingShipment(true);
-              updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-              boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+              long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+              boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
               // Stop if too many entries or too big
-              if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                  || batch.getNbEntries() >= replicationBatchCountCapacity) {
+              if (totalBufferTooLarge || batch.isLimitReached()) {
                 break;
               }
             }
-          } else {
 
 Review comment:
  > If some use cases means "no mutations come for a long time, but a batch has entries"
  
  What if the whole WAL section read yields no entries for replication? In that case the batch would be empty, so _ReplicationSourceManager.logPositionAndCleanOldLogs_ never gets called (at least, I guess, until the log is rolled).
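  One possible way to cover that case (just a sketch, the surrounding guard and the `positionAdvanced` flag are assumptions) would be to persist the position even when the whole batch was filtered out:

    // Sketch only: flush the read position even when nothing is shippable, so the
    // stored position does not fall behind when a whole WAL section is filtered.
    if (batch.getNbEntries() == 0 && positionAdvanced) {
      manager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
          replicationQueueInfo.getPeerClusterZnode(),
          entryStream.getPosition(),
          replicationQueueInfo.isQueueRecovered(), false);
    }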




[GitHub] [hbase] wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated

2019-11-17 Thread GitBox
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly 
update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r347131612
 
 

 ##
 File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
 ##
 @@ -135,59 +127,46 @@ public void run() {
     try (WALEntryStream entryStream =
         new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
       while (isReaderRunning()) { // loop here to keep reusing stream while we can
-        if (!checkQuota()) {
+        if (manager.isBufferQuotaReached()) {
+          Threads.sleep(sleepForRetries);
           continue;
         }
-        WALEntryBatch batch = null;
-        while (entryStream.hasNext()) {
-          if (batch == null) {
-            batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-          }
+        WALEntryBatch batch =
+            new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+        boolean hasNext;
+        while ((hasNext = entryStream.hasNext()) == true) {
           Entry entry = entryStream.next();
           entry = filterEntry(entry);
           if (entry != null) {
             WALEdit edit = entry.getEdit();
             if (edit != null && !edit.isEmpty()) {
-              long entrySize = getEntrySizeIncludeBulkLoad(entry);
-              long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-              batch.addEntry(entry);
-              replicationSourceManager.setPendingShipment(true);
-              updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-              boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+              long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+              boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
               // Stop if too many entries or too big
-              if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                  || batch.getNbEntries() >= replicationBatchCountCapacity) {
+              if (totalBufferTooLarge || batch.isLimitReached()) {
                 break;
               }
             }
-          } else {
 
 Review comment:
  So if a filtered edit came in, any subsequent non-filterable one would need to wait for a log roll? That could take too long for some use cases.




[GitHub] [hbase] wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated

2019-11-17 Thread GitBox
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly 
update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r347130863
 
 

 ##
 File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
 ##
 @@ -135,59 +127,46 @@ public void run() {
     try (WALEntryStream entryStream =
         new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
       while (isReaderRunning()) { // loop here to keep reusing stream while we can
-        if (!checkQuota()) {
+        if (manager.isBufferQuotaReached()) {
+          Threads.sleep(sleepForRetries);
           continue;
         }
-        WALEntryBatch batch = null;
-        while (entryStream.hasNext()) {
-          if (batch == null) {
-            batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-          }
+        WALEntryBatch batch =
+            new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+        boolean hasNext;
+        while ((hasNext = entryStream.hasNext()) == true) {
           Entry entry = entryStream.next();
           entry = filterEntry(entry);
           if (entry != null) {
             WALEdit edit = entry.getEdit();
             if (edit != null && !edit.isEmpty()) {
-              long entrySize = getEntrySizeIncludeBulkLoad(entry);
-              long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-              batch.addEntry(entry);
-              replicationSourceManager.setPendingShipment(true);
-              updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-              boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+              long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+              boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
               // Stop if too many entries or too big
-              if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                  || batch.getNbEntries() >= replicationBatchCountCapacity) {
+              if (totalBufferTooLarge || batch.isLimitReached()) {
                 break;
              }
            }
-          } else {
-            replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
-              this.replicationQueueInfo.getPeerClusterZnode(),
-              entryStream.getPosition(),
-              this.replicationQueueInfo.isQueueRecovered(), false);
           }
         }
-        if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(String.format("Read %s WAL entries eligible for replication",
-              batch.getNbEntries()));
-          }
-          entryBatchQueue.put(batch);
+
+        if (LOG.isTraceEnabled()) {
 
 Review comment:
  This can lead to confusion and adds extra complexity for troubleshooting; it is even misleading, since the message says the entries are eligible for replication even when they got filtered, which means just the opposite.
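  Keeping the original guard would avoid that, something along these lines (sketch):

    // Only emit the trace message when the batch actually carries entries that are
    // eligible for replication, as the previous code did.
    if (LOG.isTraceEnabled() && batch.getNbEntries() > 0) {
      LOG.trace(String.format("Read %s WAL entries eligible for replication",
          batch.getNbEntries()));
    }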




[GitHub] [hbase] wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated

2019-10-30 Thread GitBox
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly 
update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r340533598
 
 

 ##
 File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 ##
 @@ -466,8 +461,27 @@ void postLogRoll(Path newLog) throws IOException {
   }
 
   @VisibleForTesting
-  public AtomicLong getTotalBufferUsed() {
-    return totalBufferUsed;
+  public long getTotalBufferUsed() {
 
 Review comment:
  These are all a separate issue, not related to the WAL position update, right? If so, please remove them from this PR and create a separate Jira/PR for it.




[GitHub] [hbase] wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated

2019-10-30 Thread GitBox
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly 
update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r340530744
 
 

 ##
 File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
 ##
 @@ -829,8 +830,9 @@ public void testEmptyWALRecovery() throws Exception {
       WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
       Path currentWalPath = DefaultWALProvider.getCurrentFileName(wal);
       String walGroupId = DefaultWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
-      Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
-      utility1.getTestFileSystem().create(emptyWalPath).close();
+      Path emptyWalPath = new Path(currentWalPath.getParent(), walGroupId + "." + ts);
 
 Review comment:
   Why is this change needed?




[GitHub] [hbase] wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated

2019-10-30 Thread GitBox
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly 
update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r340547200
 
 

 ##
 File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
 ##
 @@ -135,59 +127,46 @@ public void run() {
     try (WALEntryStream entryStream =
         new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
       while (isReaderRunning()) { // loop here to keep reusing stream while we can
-        if (!checkQuota()) {
+        if (manager.isBufferQuotaReached()) {
+          Threads.sleep(sleepForRetries);
           continue;
         }
-        WALEntryBatch batch = null;
-        while (entryStream.hasNext()) {
-          if (batch == null) {
-            batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-          }
+        WALEntryBatch batch =
+            new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+        boolean hasNext;
+        while ((hasNext = entryStream.hasNext()) == true) {
           Entry entry = entryStream.next();
           entry = filterEntry(entry);
           if (entry != null) {
             WALEdit edit = entry.getEdit();
             if (edit != null && !edit.isEmpty()) {
-              long entrySize = getEntrySizeIncludeBulkLoad(entry);
-              long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-              batch.addEntry(entry);
-              replicationSourceManager.setPendingShipment(true);
-              updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-              boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+              long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+              boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
               // Stop if too many entries or too big
-              if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                  || batch.getNbEntries() >= replicationBatchCountCapacity) {
+              if (totalBufferTooLarge || batch.isLimitReached()) {
                 break;
              }
            }
-          } else {
-            replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
-              this.replicationQueueInfo.getPeerClusterZnode(),
-              entryStream.getPosition(),
-              this.replicationQueueInfo.isQueueRecovered(), false);
           }
         }
-        if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(String.format("Read %s WAL entries eligible for replication",
-              batch.getNbEntries()));
-          }
-          entryBatchQueue.put(batch);
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(String.format("Read %s WAL entries eligible for replication",
+              batch.getNbEntries()));
+        }
+
+        updateBatch(entryStream, batch, hasNext);
+        if (isShippable(batch)) {
           sleepMultiplier = 1;
-        } else { // got no entries and didn't advance position in WAL
-          LOG.trace("Didn't read any new entries from WAL");
-          if (replicationQueueInfo.isQueueRecovered()) {
-            // we're done with queue recovery, shut ourself down
+          entryBatchQueue.put(batch);
+          if (!batch.hasMoreEntries()) {
+            // we're done with queue recovery, shut ourselves down
            setReaderRunning(false);
-            // shuts down shipper thread immediately
-            entryBatchQueue.put(batch != null ? batch
-                : new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()));
-          } else {
-            Thread.sleep(sleepForRetries);
           }
+        } else {
 
 Review comment:
  What about the recovered queue condition? If a source reading a recovered queue has reached the end of all WALs, it should stop running, right?
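  In other words, I would expect something along these lines (sketch only, the exact condition is an assumption):

    // A recovered queue is finite: once the stream is exhausted and no WALs remain,
    // the reader should shut itself down so queue recovery can complete.
    if (replicationQueueInfo.isQueueRecovered() && !hasNext && logQueue.isEmpty()) {
      setReaderRunning(false); // done with queue recovery
    }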




[GitHub] [hbase] wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated

2019-10-30 Thread GitBox
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly 
update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r340546055
 
 

 ##
 File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
 ##
 @@ -135,59 +127,46 @@ public void run() {
     try (WALEntryStream entryStream =
         new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
       while (isReaderRunning()) { // loop here to keep reusing stream while we can
-        if (!checkQuota()) {
+        if (manager.isBufferQuotaReached()) {
+          Threads.sleep(sleepForRetries);
           continue;
         }
-        WALEntryBatch batch = null;
-        while (entryStream.hasNext()) {
-          if (batch == null) {
-            batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-          }
+        WALEntryBatch batch =
+            new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+        boolean hasNext;
+        while ((hasNext = entryStream.hasNext()) == true) {
           Entry entry = entryStream.next();
           entry = filterEntry(entry);
           if (entry != null) {
             WALEdit edit = entry.getEdit();
             if (edit != null && !edit.isEmpty()) {
-              long entrySize = getEntrySizeIncludeBulkLoad(entry);
-              long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-              batch.addEntry(entry);
-              replicationSourceManager.setPendingShipment(true);
-              updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-              boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+              long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+              boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
               // Stop if too many entries or too big
-              if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                  || batch.getNbEntries() >= replicationBatchCountCapacity) {
+              if (totalBufferTooLarge || batch.isLimitReached()) {
                 break;
              }
            }
-          } else {
-            replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
-              this.replicationQueueInfo.getPeerClusterZnode(),
-              entryStream.getPosition(),
-              this.replicationQueueInfo.isQueueRecovered(), false);
           }
         }
-        if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace(String.format("Read %s WAL entries eligible for replication",
-              batch.getNbEntries()));
-          }
-          entryBatchQueue.put(batch);
+
+        if (LOG.isTraceEnabled()) {
 
 Review comment:
  Is this being logged all the time now? What if the edit got filtered because it is not targeted for replication?




[GitHub] [hbase] wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated

2019-10-30 Thread GitBox
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly 
update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r340538075
 
 

 ##
 File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
 ##
 @@ -135,59 +127,46 @@ public void run() {
     try (WALEntryStream entryStream =
         new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
       while (isReaderRunning()) { // loop here to keep reusing stream while we can
-        if (!checkQuota()) {
+        if (manager.isBufferQuotaReached()) {
+          Threads.sleep(sleepForRetries);
           continue;
         }
-        WALEntryBatch batch = null;
-        while (entryStream.hasNext()) {
-          if (batch == null) {
-            batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-          }
+        WALEntryBatch batch =
+            new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+        boolean hasNext;
+        while ((hasNext = entryStream.hasNext()) == true) {
           Entry entry = entryStream.next();
           entry = filterEntry(entry);
           if (entry != null) {
             WALEdit edit = entry.getEdit();
             if (edit != null && !edit.isEmpty()) {
-              long entrySize = getEntrySizeIncludeBulkLoad(entry);
-              long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-              batch.addEntry(entry);
-              replicationSourceManager.setPendingShipment(true);
-              updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-              boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+              long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+              boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
               // Stop if too many entries or too big
-              if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                  || batch.getNbEntries() >= replicationBatchCountCapacity) {
+              if (totalBufferTooLarge || batch.isLimitReached()) {
                 break;
               }
             }
-          } else {
 
 Review comment:
  Where does the log position get updated now if the current edit is not targeted for replication?




[GitHub] [hbase] wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated

2019-10-30 Thread GitBox
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly 
update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r340532286
 
 

 ##
 File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 ##
 @@ -466,8 +461,27 @@ void postLogRoll(Path newLog) throws IOException {
   }
 
   @VisibleForTesting
-  public AtomicLong getTotalBufferUsed() {
-    return totalBufferUsed;
+  public long getTotalBufferUsed() {
 
 Review comment:
   Trivial, unnecessary change that just adds overhead to review. Please revert 
it.




[GitHub] [hbase] wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated

2019-10-30 Thread GitBox
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly 
update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r340550929
 
 

 ##
 File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
 ##
 @@ -378,37 +380,35 @@ public void testReplicationSourceWALReaderThread() throws Exception {
   }
 
   @Test
-  public void testReplicationSourceUpdatesLogPositionOnFilteredEntries() throws Exception {
+  public void testReplicationSourceWALReaderThreadRecoveredQueue() throws Exception {
 
 Review comment:
   Why this test has been changed from it's original purpose of checking for 
upating wal position when no edits targeted to replication are read? Also the 
name does not seem accurate, it does not seem to create a recovered queue 
scenario.

