[GitHub] [hbase] wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r352175795

File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java

```diff
@@ -436,6 +439,56 @@ public void testWALKeySerialization() throws Exception {
     }
   }
 
+  @Test
+  public void testReplicationSourceWALReaderThreadWithFilter() throws Exception {
+    final byte[] notReplicatedCf = Bytes.toBytes("notReplicated");
+    final Map<TableName, List<String>> tableCfs = new HashMap<>();
+    tableCfs.put(tableName, Collections.singletonList(Bytes.toString(family)));
+    ReplicationPeer peer = mock(ReplicationPeer.class);
+    when(peer.getTableCFs()).thenReturn(tableCfs);
+    WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
+
+    appendToLogPlus(3, notReplicatedCf);
+
+    Path firstWAL = walQueue.peek();
+    ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
+    when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+    final ReplicationSourceWALReaderThread reader =
+        new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
+            0, fs, conf, filter, new MetricsSource("1"));
+    reader.start();
+
+    // reader won't put any batch, even if EOF reached.
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future<WALEntryBatch> future = executor.submit(new Callable<WALEntryBatch>() {
+      @Override
+      public WALEntryBatch call() throws Exception {
+        return reader.take();
+      }
+    });
+    Thread.sleep(2000);
+    assertFalse(future.isDone());
```

Review comment:
Just a _heads up_ here: we may be able to simplify this part if we decide to expose `ReplicationSourceWALReaderThread.lastReadPosition` via a _getter_ method.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
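The fixed `Thread.sleep(2000)` followed by `assertFalse(future.isDone())` only shows that nothing was shipped within two seconds. If `lastReadPosition` were exposed through a getter, as suggested in the comment above, the test could instead wait until the reader has actually consumed the filtered entries and only then check that no batch was produced. A minimal sketch, assuming a hypothetical `PositionTrackingReader` with a `getLastReadPosition()` getter and a hypothetical `PollingWait.waitForPosition` helper (neither is part of the PR as shown):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a reader that exposes its last read position via a getter.
class PositionTrackingReader {
  private final AtomicLong lastReadPosition = new AtomicLong(0);

  // Called by the reader loop after it consumes entries, filtered or not.
  void advanceTo(long position) {
    lastReadPosition.set(position);
  }

  long getLastReadPosition() {
    return lastReadPosition.get();
  }
}

final class PollingWait {
  private PollingWait() {}

  /**
   * Polls the supplied position every intervalMs until it reaches target or timeoutMs
   * elapses. Returns true if the target was reached in time.
   */
  static boolean waitForPosition(LongSupplier position, long target, long timeoutMs,
      long intervalMs) throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (System.nanoTime() < deadline) {
      if (position.getAsLong() >= target) {
        return true;
      }
      Thread.sleep(intervalMs);
    }
    // One last check once the deadline has passed.
    return position.getAsLong() >= target;
  }
}
```

In the test, the wait target would be the end offset of `firstWAL`; waiting on an observable condition rather than a fixed sleep makes the test both faster and less timing-sensitive.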
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r352161498

File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java

```diff
@@ -378,37 +380,35 @@ public void testReplicationSourceWALReaderThread() throws Exception {
   }
 
   @Test
-  public void testReplicationSourceUpdatesLogPositionOnFilteredEntries() throws Exception {
+  public void testReplicationSourceWALReaderThreadRecoveredQueue() throws Exception {
```

Review comment:
Actually, yes: this does simulate a recovered queue, since the queue is created as recovered. I think this test is fine.
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r352168908

File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java

```diff
@@ -135,59 +127,46 @@ public void run() {
       try (WALEntryStream entryStream =
           new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!checkQuota()) {
+          if (manager.isBufferQuotaReached()) {
+            Threads.sleep(sleepForRetries);
             continue;
           }
-          WALEntryBatch batch = null;
-          while (entryStream.hasNext()) {
-            if (batch == null) {
-              batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-            }
+          WALEntryBatch batch =
+              new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+          boolean hasNext;
+          while ((hasNext = entryStream.hasNext()) == true) {
             Entry entry = entryStream.next();
             entry = filterEntry(entry);
             if (entry != null) {
               WALEdit edit = entry.getEdit();
               if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry);
-                replicationSourceManager.setPendingShipment(true);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+                long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+                boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
                 // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                    || batch.getNbEntries() >= replicationBatchCountCapacity) {
+                if (totalBufferTooLarge || batch.isLimitReached()) {
                   break;
                 }
               }
-            } else {
```

Review comment:
I guess this is fine for the replication progress problem. One additional issue, though, concerns monitoring. IIRC, DumpReplicationQueues relies on the replication info stored in ZK, so it may no longer show an accurate position for the log queue. We may need to expose `ReplicationSourceWALReaderThread.lastReadPosition` via a _getter_ method for monitoring purposes.
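The monitoring concern can be made concrete: once filtered entries no longer trigger `logPositionAndCleanOldLogs`, the position persisted in ZooKeeper can lag behind what the reader has actually consumed. A minimal sketch, assuming hypothetical names (`ReaderProgress`, `onEntryRead`, `onBatchShipped`, `describe`) rather than HBase's actual classes, of tracking both values so a tool like DumpReplicationQueues could report them side by side:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical progress tracker; not HBase's API.
class ReaderProgress {
  // Advances for every entry read, including entries dropped by the WAL entry filter.
  private final AtomicLong lastReadPosition = new AtomicLong();
  // Advances only when a batch is shipped and its position is persisted (e.g. to ZK).
  private final AtomicLong persistedPosition = new AtomicLong();

  void onEntryRead(long endOffset) {
    lastReadPosition.set(endOffset);
  }

  void onBatchShipped(long batchEndOffset) {
    persistedPosition.set(batchEndOffset);
  }

  long getLastReadPosition() {
    return lastReadPosition.get();
  }

  long getPersistedPosition() {
    return persistedPosition.get();
  }

  // Bytes already read but not yet reflected in the persisted position.
  long getUnpersistedBytes() {
    return lastReadPosition.get() - persistedPosition.get();
  }

  String describe(String walName) {
    return String.format("%s: read=%d persisted=%d lag=%d", walName,
        getLastReadPosition(), getPersistedPosition(), getUnpersistedBytes());
  }
}
```

Reporting the lag explicitly, rather than only the persisted offset, would make a stuck position caused by filtered-only WAL sections visible to operators.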
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r348096334

File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java

```diff
@@ -135,59 +127,46 @@ public void run() {
       try (WALEntryStream entryStream =
           new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!checkQuota()) {
+          if (manager.isBufferQuotaReached()) {
+            Threads.sleep(sleepForRetries);
             continue;
           }
-          WALEntryBatch batch = null;
-          while (entryStream.hasNext()) {
-            if (batch == null) {
-              batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-            }
+          WALEntryBatch batch =
+              new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+          boolean hasNext;
+          while ((hasNext = entryStream.hasNext()) == true) {
             Entry entry = entryStream.next();
             entry = filterEntry(entry);
             if (entry != null) {
               WALEdit edit = entry.getEdit();
               if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry);
-                replicationSourceManager.setPendingShipment(true);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+                long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+                boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
                 // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                    || batch.getNbEntries() >= replicationBatchCountCapacity) {
+                if (totalBufferTooLarge || batch.isLimitReached()) {
                   break;
                 }
               }
-            } else {
```

Review comment:
I think the answer to my question above lies in the _resetStream()_ call at the end of the second while loop, which updates the _lastReadPosition_ variable that is now used for reading here.
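In the reworked inner loop shown in this hunk, the stop condition moves into the batch itself: `addEntry` returns the size used for quota accounting, and `isLimitReached()` replaces the inline count and heap-size comparison. A minimal sketch of that shape, assuming a simplified, hypothetical `SimpleEntryBatch` that stores entry sizes as plain `long`s (the real `WALEntryBatch` holds `Entry` objects and computes sizes differently):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-in for WALEntryBatch.
class SimpleEntryBatch {
  private final int maxCount;
  private final long maxHeapSize;
  private final List<Long> entrySizes = new ArrayList<>();
  private long heapSize;

  SimpleEntryBatch(int maxCount, long maxHeapSize) {
    this.maxCount = maxCount;
    this.maxHeapSize = maxHeapSize;
  }

  /** Adds an entry and returns its size so the caller can charge it against a shared quota. */
  long addEntry(long entrySize) {
    entrySizes.add(entrySize);
    heapSize += entrySize;
    return entrySize;
  }

  int getNbEntries() {
    return entrySizes.size();
  }

  long getHeapSize() {
    return heapSize;
  }

  /** Same check the old loop did inline: too many entries or too many bytes. */
  boolean isLimitReached() {
    return heapSize >= maxHeapSize || entrySizes.size() >= maxCount;
  }
}
```

Keeping both capacities inside the batch means the reader loop only has to combine `isLimitReached()` with the shared buffer-quota result, as the `+` lines above do.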
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r348092931

File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java

```diff
@@ -135,59 +127,46 @@ public void run() {
       try (WALEntryStream entryStream =
           new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!checkQuota()) {
+          if (manager.isBufferQuotaReached()) {
+            Threads.sleep(sleepForRetries);
             continue;
           }
-          WALEntryBatch batch = null;
-          while (entryStream.hasNext()) {
-            if (batch == null) {
-              batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-            }
+          WALEntryBatch batch =
+              new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+          boolean hasNext;
+          while ((hasNext = entryStream.hasNext()) == true) {
             Entry entry = entryStream.next();
             entry = filterEntry(entry);
             if (entry != null) {
               WALEdit edit = entry.getEdit();
               if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry);
-                replicationSourceManager.setPendingShipment(true);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+                long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+                boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
                 // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                    || batch.getNbEntries() >= replicationBatchCountCapacity) {
+                if (totalBufferTooLarge || batch.isLimitReached()) {
                   break;
                 }
               }
-            } else {
-              replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
-                  this.replicationQueueInfo.getPeerClusterZnode(),
-                  entryStream.getPosition(),
-                  this.replicationQueueInfo.isQueueRecovered(), false);
             }
           }
-          if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace(String.format("Read %s WAL entries eligible for replication",
-                  batch.getNbEntries()));
-            }
-            entryBatchQueue.put(batch);
+
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(String.format("Read %s WAL entries eligible for replication",
+                batch.getNbEntries()));
+          }
+
+          updateBatch(entryStream, batch, hasNext);
+          if (isShippable(batch)) {
             sleepMultiplier = 1;
-          } else { // got no entries and didn't advance position in WAL
-            LOG.trace("Didn't read any new entries from WAL");
-            if (replicationQueueInfo.isQueueRecovered()) {
-              // we're done with queue recovery, shut ourself down
+            entryBatchQueue.put(batch);
+            if (!batch.hasMoreEntries()) {
+              // we're done with queue recovery, shut ourselves down
               setReaderRunning(false);
-              // shuts down shipper thread immediately
-              entryBatchQueue.put(batch != null ? batch
-                  : new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()));
-            } else {
-              Thread.sleep(sleepForRetries);
             }
+          } else {
```

Review comment:
OK, so _batch.hasMoreEntries()_ returns true if this isn't a recovered queue.
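Based on that reading, the shutdown decision after shipping a batch reduces to: a live (non-recovered) queue always has more entries to come, while a recovered queue is finished once its stream is exhausted. A minimal sketch under that assumption; `ReaderShutdownPolicy`, `hasMoreEntries` and `shouldStopReader` here are hypothetical, and the `streamHasNext` flag stands in for the `hasNext` value captured by the loop in this hunk:

```java
// Hypothetical model of the shutdown decision; not the PR's actual code.
final class ReaderShutdownPolicy {
  private ReaderShutdownPolicy() {}

  /**
   * Mirrors the reading of batch.hasMoreEntries() in the comment: always true for a live
   * queue, and true for a recovered queue only while the stream still has entries.
   */
  static boolean hasMoreEntries(boolean queueRecovered, boolean streamHasNext) {
    return !queueRecovered || streamHasNext;
  }

  /** The reader stops only after a recovered queue has been fully drained. */
  static boolean shouldStopReader(boolean queueRecovered, boolean streamHasNext) {
    return !hasMoreEntries(queueRecovered, streamHasNext);
  }
}
```

Of the four combinations, only "recovered and exhausted" stops the reader, which matches the `setReaderRunning(false)` branch in the new code.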
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r348087671

File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java

```diff
@@ -135,59 +127,46 @@ public void run() {
       try (WALEntryStream entryStream =
           new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!checkQuota()) {
+          if (manager.isBufferQuotaReached()) {
+            Threads.sleep(sleepForRetries);
             continue;
           }
-          WALEntryBatch batch = null;
-          while (entryStream.hasNext()) {
-            if (batch == null) {
-              batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-            }
+          WALEntryBatch batch =
+              new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+          boolean hasNext;
+          while ((hasNext = entryStream.hasNext()) == true) {
             Entry entry = entryStream.next();
             entry = filterEntry(entry);
             if (entry != null) {
               WALEdit edit = entry.getEdit();
               if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry);
-                replicationSourceManager.setPendingShipment(true);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+                long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+                boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
                 // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                    || batch.getNbEntries() >= replicationBatchCountCapacity) {
+                if (totalBufferTooLarge || batch.isLimitReached()) {
                   break;
                 }
               }
-            } else {
```

Review comment:
> If some use cases means "no mutations come for a long time, but a batch has entries"

What if none of the entries in the WAL section just read are eligible for replication? In that case the batch would be empty, so _ReplicationSourceManager.logPositionAndCleanOldLogs_ would never get called (at least, I guess, not until the log is rolled).
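One way to close that gap, sketched here under assumptions, is to treat a batch as shippable when it either carries entries or advances the WAL position, so a section in which every entry was filtered still gets its end offset persisted. The `PositionBatch` type and both rules below are hypothetical illustrations; the PR's own `isShippable(batch)` may already cover this, but its body is not shown in the hunk:

```java
// Hypothetical batch that records where in the WAL it starts and ends.
class PositionBatch {
  final int nbEntries;
  final long startPosition;
  final long endPosition;

  PositionBatch(int nbEntries, long startPosition, long endPosition) {
    this.nbEntries = nbEntries;
    this.startPosition = startPosition;
    this.endPosition = endPosition;
  }
}

final class ShipPolicy {
  private ShipPolicy() {}

  /** Entries-only rule: an all-filtered section never reaches the shipper. */
  static boolean shippableByEntriesOnly(PositionBatch batch) {
    return batch.nbEntries > 0;
  }

  /**
   * Alternative rule: also ship an empty batch if reading advanced the position, so the
   * persisted offset moves past filtered-out entries without waiting for a log roll.
   */
  static boolean isShippable(PositionBatch batch) {
    return batch.nbEntries > 0 || batch.endPosition > batch.startPosition;
  }
}
```

The trade-off is a small amount of extra shipper and ZK traffic for position-only batches, in exchange for not leaving the recorded position stuck until the next roll.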
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r347131612

File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java

```diff
@@ -135,59 +127,46 @@ public void run() {
       try (WALEntryStream entryStream =
           new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!checkQuota()) {
+          if (manager.isBufferQuotaReached()) {
+            Threads.sleep(sleepForRetries);
             continue;
           }
-          WALEntryBatch batch = null;
-          while (entryStream.hasNext()) {
-            if (batch == null) {
-              batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-            }
+          WALEntryBatch batch =
+              new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+          boolean hasNext;
+          while ((hasNext = entryStream.hasNext()) == true) {
             Entry entry = entryStream.next();
             entry = filterEntry(entry);
             if (entry != null) {
               WALEdit edit = entry.getEdit();
               if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry);
-                replicationSourceManager.setPendingShipment(true);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+                long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+                boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
                 // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                    || batch.getNbEntries() >= replicationBatchCountCapacity) {
+                if (totalBufferTooLarge || batch.isLimitReached()) {
                   break;
                 }
               }
-            } else {
```

Review comment:
So if a filtered edit arrives, would any subsequent non-filtered edit need to wait for a log roll? That could take too long for some use cases.
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r347130863

File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java

```diff
@@ -135,59 +127,46 @@ public void run() {
       try (WALEntryStream entryStream =
           new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!checkQuota()) {
+          if (manager.isBufferQuotaReached()) {
+            Threads.sleep(sleepForRetries);
             continue;
           }
-          WALEntryBatch batch = null;
-          while (entryStream.hasNext()) {
-            if (batch == null) {
-              batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-            }
+          WALEntryBatch batch =
+              new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+          boolean hasNext;
+          while ((hasNext = entryStream.hasNext()) == true) {
             Entry entry = entryStream.next();
             entry = filterEntry(entry);
             if (entry != null) {
               WALEdit edit = entry.getEdit();
               if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry);
-                replicationSourceManager.setPendingShipment(true);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+                long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+                boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
                 // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                    || batch.getNbEntries() >= replicationBatchCountCapacity) {
+                if (totalBufferTooLarge || batch.isLimitReached()) {
                   break;
                 }
               }
-            } else {
-              replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
-                  this.replicationQueueInfo.getPeerClusterZnode(),
-                  entryStream.getPosition(),
-                  this.replicationQueueInfo.isQueueRecovered(), false);
             }
           }
-          if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace(String.format("Read %s WAL entries eligible for replication",
-                  batch.getNbEntries()));
-            }
-            entryBatchQueue.put(batch);
+
+          if (LOG.isTraceEnabled()) {
```

Review comment:
This adds extra complexity to troubleshooting and is even misleading: the message says the entries are eligible for replication even when they were filtered out, which means just the opposite.
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r347130863

File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java

```diff
@@ -135,59 +127,46 @@ public void run() {
       try (WALEntryStream entryStream =
           new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!checkQuota()) {
+          if (manager.isBufferQuotaReached()) {
+            Threads.sleep(sleepForRetries);
             continue;
           }
-          WALEntryBatch batch = null;
-          while (entryStream.hasNext()) {
-            if (batch == null) {
-              batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-            }
+          WALEntryBatch batch =
+              new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+          boolean hasNext;
+          while ((hasNext = entryStream.hasNext()) == true) {
             Entry entry = entryStream.next();
             entry = filterEntry(entry);
             if (entry != null) {
               WALEdit edit = entry.getEdit();
               if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry);
-                replicationSourceManager.setPendingShipment(true);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+                long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+                boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
                 // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                    || batch.getNbEntries() >= replicationBatchCountCapacity) {
+                if (totalBufferTooLarge || batch.isLimitReached()) {
                   break;
                 }
               }
-            } else {
-              replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
-                  this.replicationQueueInfo.getPeerClusterZnode(),
-                  entryStream.getPosition(),
-                  this.replicationQueueInfo.isQueueRecovered(), false);
             }
           }
-          if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace(String.format("Read %s WAL entries eligible for replication",
-                  batch.getNbEntries()));
-            }
-            entryBatchQueue.put(batch);
+
+          if (LOG.isTraceEnabled()) {
```

Review comment:
This can lead to confusion, adds extra complexity to troubleshooting, and is even misleading: the message says the entries are eligible for replication even when they were filtered out, which means just the opposite.
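A trace line that separates what was read from what survived filtering would remove the ambiguity. A minimal sketch, assuming the loop keeps two hypothetical counters (`entriesRead`, `entriesFiltered`) alongside the batch size; only the message formatting is shown, and the call would still sit behind the existing `LOG.isTraceEnabled()` guard:

```java
// Hypothetical helper that builds an unambiguous trace message.
final class ReaderTraceMessages {
  private ReaderTraceMessages() {}

  static String describeRead(int entriesRead, int entriesFiltered, int entriesInBatch) {
    if (entriesInBatch == 0) {
      // Nothing survived filtering: say so instead of reporting "0 eligible" entries.
      return String.format(
          "Read %d WAL entries, %d filtered out; nothing eligible for replication",
          entriesRead, entriesFiltered);
    }
    return String.format("Read %d WAL entries, %d filtered out, %d eligible for replication",
        entriesRead, entriesFiltered, entriesInBatch);
  }
}
```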
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r340533598

File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

```diff
@@ -466,8 +461,27 @@ void postLogRoll(Path newLog) throws IOException {
   }
 
   @VisibleForTesting
-  public AtomicLong getTotalBufferUsed() {
-    return totalBufferUsed;
+  public long getTotalBufferUsed() {
```

Review comment:
These all belong to a separate issue, unrelated to the WAL position update, right? If so, please remove them from this PR and create a separate JIRA/PR for them.
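For context on the change being split out: the diff moves buffer accounting behind `ReplicationSourceManager` (`isBufferQuotaReached()`, `acquireBufferQuota(...)`, and a `getTotalBufferUsed()` that returns a plain `long`). A minimal sketch of such an encapsulated quota, assuming a single shared byte limit; the `BufferQuota` class and `releaseBufferQuota` are hypothetical, and the real manager's implementation may differ:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical shared buffer quota; not the actual ReplicationSourceManager code.
class BufferQuota {
  private final long limit;
  private final AtomicLong used = new AtomicLong();

  BufferQuota(long limit) {
    this.limit = limit;
  }

  /** Charges size bytes and returns true if the total now meets or exceeds the limit. */
  boolean acquireBufferQuota(long size) {
    return used.addAndGet(size) >= limit;
  }

  /** Returns size bytes to the pool, e.g. after a batch has been shipped. */
  void releaseBufferQuota(long size) {
    used.addAndGet(-size);
  }

  boolean isBufferQuotaReached() {
    return used.get() >= limit;
  }

  /** Exposes a snapshot value instead of the mutable AtomicLong itself. */
  long getTotalBufferUsed() {
    return used.get();
  }
}
```

Returning a snapshot `long` keeps callers from mutating the shared counter directly, which is a design change independent of the WAL position fix.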
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r340530744

File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java

```diff
@@ -829,8 +830,9 @@ public void testEmptyWALRecovery() throws Exception {
       WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
       Path currentWalPath = DefaultWALProvider.getCurrentFileName(wal);
       String walGroupId = DefaultWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
-      Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
-      utility1.getTestFileSystem().create(emptyWalPath).close();
+      Path emptyWalPath = new Path(currentWalPath.getParent(), walGroupId + "." + ts);
```

Review comment:
Why is this change needed?
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r340547200

File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java

```diff
@@ -135,59 +127,46 @@ public void run() {
       try (WALEntryStream entryStream =
           new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!checkQuota()) {
+          if (manager.isBufferQuotaReached()) {
+            Threads.sleep(sleepForRetries);
             continue;
           }
-          WALEntryBatch batch = null;
-          while (entryStream.hasNext()) {
-            if (batch == null) {
-              batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-            }
+          WALEntryBatch batch =
+              new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+          boolean hasNext;
+          while ((hasNext = entryStream.hasNext()) == true) {
             Entry entry = entryStream.next();
             entry = filterEntry(entry);
             if (entry != null) {
               WALEdit edit = entry.getEdit();
               if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry);
-                replicationSourceManager.setPendingShipment(true);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+                long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+                boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
                 // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                    || batch.getNbEntries() >= replicationBatchCountCapacity) {
+                if (totalBufferTooLarge || batch.isLimitReached()) {
                   break;
                 }
               }
-            } else {
-              replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
-                  this.replicationQueueInfo.getPeerClusterZnode(),
-                  entryStream.getPosition(),
-                  this.replicationQueueInfo.isQueueRecovered(), false);
             }
           }
-          if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace(String.format("Read %s WAL entries eligible for replication",
-                  batch.getNbEntries()));
-            }
-            entryBatchQueue.put(batch);
+
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(String.format("Read %s WAL entries eligible for replication",
+                batch.getNbEntries()));
+          }
+
+          updateBatch(entryStream, batch, hasNext);
+          if (isShippable(batch)) {
             sleepMultiplier = 1;
-          } else { // got no entries and didn't advance position in WAL
-            LOG.trace("Didn't read any new entries from WAL");
-            if (replicationQueueInfo.isQueueRecovered()) {
-              // we're done with queue recovery, shut ourself down
+            entryBatchQueue.put(batch);
+            if (!batch.hasMoreEntries()) {
+              // we're done with queue recovery, shut ourselves down
               setReaderRunning(false);
-              // shuts down shipper thread immediately
-              entryBatchQueue.put(batch != null ? batch
-                  : new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()));
-            } else {
-              Thread.sleep(sleepForRetries);
             }
+          } else {
```

Review comment:
What about the recovered queue condition? If a source reading a recovered queue reaches the end of all WALs, it should stop running, right?
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r340546055

## File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java ##
@@ -135,59 +127,46 @@ public void run() {
       try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!checkQuota()) {
+          if (manager.isBufferQuotaReached()) {
+            Threads.sleep(sleepForRetries);
             continue;
           }
-          WALEntryBatch batch = null;
-          while (entryStream.hasNext()) {
-            if (batch == null) {
-              batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-            }
+          WALEntryBatch batch =
+              new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+          boolean hasNext;
+          while ((hasNext = entryStream.hasNext()) == true) {
             Entry entry = entryStream.next();
             entry = filterEntry(entry);
             if (entry != null) {
               WALEdit edit = entry.getEdit();
               if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry);
-                replicationSourceManager.setPendingShipment(true);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+                long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+                boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
                 // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                    || batch.getNbEntries() >= replicationBatchCountCapacity) {
+                if (totalBufferTooLarge || batch.isLimitReached()) {
                   break;
                 }
               }
-            } else {
-              replicationSourceManager.logPositionAndCleanOldLogs(entryStream.getCurrentPath(),
-                  this.replicationQueueInfo.getPeerClusterZnode(),
-                  entryStream.getPosition(),
-                  this.replicationQueueInfo.isQueueRecovered(), false);
             }
           }
-          if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace(String.format("Read %s WAL entries eligible for replication",
-                  batch.getNbEntries()));
-            }
-            entryBatchQueue.put(batch);
+
+          if (LOG.isTraceEnabled()) {

Review comment: Is this being logged all the time now? What if the edit was filtered out because it is not a target for replication?
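One possible way to address the logging question, sketched in plain Java: report entries as "eligible for replication" only when the pass actually produced some, and fall back to the message the removed else-branch used when everything was filtered out. `BatchTraceSketch`, `traceMessage` and `logPass` are hypothetical names, and `java.util.logging` at level `FINEST` stands in for the `LOG.isTraceEnabled()`/`LOG.trace(...)` calls in the patch.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper, not HBase code: picks the trace message after a read pass. */
final class BatchTraceSketch {
  private static final Logger LOG = Logger.getLogger(BatchTraceSketch.class.getName());

  private BatchTraceSketch() {
  }

  /**
   * Only reports entries as eligible when there are some; a pass in which every edit
   * was filtered out gets the message the removed else-branch used instead.
   */
  static String traceMessage(int eligibleEntries) {
    if (eligibleEntries > 0) {
      return String.format("Read %s WAL entries eligible for replication", eligibleEntries);
    }
    return "Didn't read any new entries from WAL";
  }

  /** Mirrors the LOG.isTraceEnabled() guard using java.util.logging's FINEST level. */
  static void logPass(int eligibleEntries) {
    if (LOG.isLoggable(Level.FINEST)) {
      LOG.finest(traceMessage(eligibleEntries));
    }
  }

  public static void main(String[] args) {
    System.out.println(traceMessage(3)); // Read 3 WAL entries eligible for replication
    System.out.println(traceMessage(0)); // Didn't read any new entries from WAL
    logPass(0); // silent unless FINEST logging is enabled for this logger
  }
}
```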
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r340538075

## File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java ##
@@ -135,59 +127,46 @@ public void run() {
       try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition, metrics)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
-          if (!checkQuota()) {
+          if (manager.isBufferQuotaReached()) {
+            Threads.sleep(sleepForRetries);
             continue;
           }
-          WALEntryBatch batch = null;
-          while (entryStream.hasNext()) {
-            if (batch == null) {
-              batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
-            }
+          WALEntryBatch batch =
+              new WALEntryBatch(replicationBatchCountCapacity, replicationBatchSizeCapacity);
+          boolean hasNext;
+          while ((hasNext = entryStream.hasNext()) == true) {
             Entry entry = entryStream.next();
             entry = filterEntry(entry);
             if (entry != null) {
               WALEdit edit = entry.getEdit();
               if (edit != null && !edit.isEmpty()) {
-                long entrySize = getEntrySizeIncludeBulkLoad(entry);
-                long entrySizeExlucdeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
-                batch.addEntry(entry);
-                replicationSourceManager.setPendingShipment(true);
-                updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
-                boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExlucdeBulkLoad);
+                long entrySizeExcludeBulkLoad = batch.addEntry(entry);
+                boolean totalBufferTooLarge = manager.acquireBufferQuota(entrySizeExcludeBulkLoad);
                 // Stop if too many entries or too big
-                if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
-                    || batch.getNbEntries() >= replicationBatchCountCapacity) {
+                if (totalBufferTooLarge || batch.isLimitReached()) {
                   break;
                 }
               }
-            } else {

Review comment: Where is the log position updated now if the current edit is not targeted for replication?
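A minimal model of one answer to this question, assuming the reader tracks the position of every entry it reads, including entries the filter drops, and returns that position alongside the batch so it can be persisted even when nothing is shipped. `FilteredPositionSketch`, `Entry`, `Batch` and `read` are hypothetical names, not HBase APIs; positions are treated as byte offsets into a single WAL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical model, not HBase code: positions are byte offsets into a single WAL. */
final class FilteredPositionSketch {

  /** A WAL entry: the offset just past it in the file and the column family it writes. */
  static final class Entry {
    final long endPosition;
    final String family;

    Entry(long endPosition, String family) {
      this.endPosition = endPosition;
      this.family = family;
    }
  }

  /** Result of one read pass: the entries to ship and the position to persist. */
  static final class Batch {
    final List<Entry> entries = new ArrayList<>();
    long lastReadPosition;
  }

  private FilteredPositionSketch() {
  }

  /**
   * Reads every entry after startPosition and keeps only those the filter accepts, but
   * advances lastReadPosition past filtered-out entries as well, so the caller can
   * persist the position even when the batch carries no entries.
   */
  static Batch read(List<Entry> wal, long startPosition, Predicate<Entry> filter) {
    Batch batch = new Batch();
    batch.lastReadPosition = startPosition;
    for (Entry entry : wal) {
      if (entry.endPosition <= startPosition) {
        continue; // already consumed by an earlier pass
      }
      if (filter.test(entry)) {
        batch.entries.add(entry);
      }
      batch.lastReadPosition = entry.endPosition; // advance whether or not it was kept
    }
    return batch;
  }

  public static void main(String[] args) {
    List<Entry> wal = Arrays.asList(
        new Entry(100, "notReplicated"), new Entry(200, "notReplicated"));
    Batch batch = read(wal, 0, e -> e.family.equals("family"));
    System.out.println(batch.entries.size() + " entries, position " + batch.lastReadPosition);
  }
}
```

Returning the position with the batch, rather than writing it from inside the filter branch, leaves the decision of when to persist it to a single place in the caller.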
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r340532286

## File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ##
@@ -466,8 +461,27 @@ void postLogRoll(Path newLog) throws IOException {
   }
   @VisibleForTesting
-  public AtomicLong getTotalBufferUsed() {
-    return totalBufferUsed;
+  public long getTotalBufferUsed() {

Review comment: Trivial, unnecessary change that just adds overhead to review. Please revert it.
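For context on why the signature change is avoidable: a caller that only needs a number can take a snapshot from the existing `AtomicLong` getter with `get()`, so neither the method signature nor test stubs written against it have to change. The sketch below is a hypothetical stand-in (`BufferAccountingSketch` is not HBase code), and its quota accounting is assumed to be a simple add-and-compare against a fixed limit.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for shared buffer accounting; not HBase code. */
class BufferAccountingSketch {
  private final AtomicLong totalBufferUsed = new AtomicLong();
  private final long totalBufferQuota;

  BufferAccountingSketch(long totalBufferQuota) {
    this.totalBufferQuota = totalBufferQuota;
  }

  /**
   * Keeps the original AtomicLong return type. Callers that only need a number call
   * get() at the call site, so the signature does not have to change.
   */
  AtomicLong getTotalBufferUsed() {
    return totalBufferUsed;
  }

  /** Adds size to the shared counter; returns true once usage reaches the quota. */
  boolean acquireBufferQuota(long size) {
    return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
  }

  public static void main(String[] args) {
    BufferAccountingSketch accounting = new BufferAccountingSketch(100);
    accounting.acquireBufferQuota(60);
    long snapshot = accounting.getTotalBufferUsed().get(); // plain long snapshot
    System.out.println(snapshot); // 60
  }
}
```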
wchevreuil commented on a change in pull request #749: HBASE-23205 Correctly update the position of WALs currently being replicated
URL: https://github.com/apache/hbase/pull/749#discussion_r340550929

## File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java ##
@@ -378,37 +380,35 @@ public void testReplicationSourceWALReaderThread() throws Exception {
   }
   @Test
-  public void testReplicationSourceUpdatesLogPositionOnFilteredEntries() throws Exception {
+  public void testReplicationSourceWALReaderThreadRecoveredQueue() throws Exception {

Review comment: Why has this test been changed from its original purpose of checking that the WAL position is updated when no edits targeted for replication are read? Also, the name does not seem accurate: the test does not appear to create a recovered-queue scenario.
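On the naming point: as far as we understand HBase's `ReplicationQueueInfo`, a queue is treated as recovered when its queue id carries dead-server names after the peer id, separated by the first `-`, while a bare peer id denotes a normal queue. The sketch below models only that convention; treat it as an assumption and check `ReplicationQueueInfo` itself before relying on it. `QueueIdSketch` and the sample server name are hypothetical. Under this assumption, a test meant to cover recovery would presumably need a queue id of the second form.

```java
/**
 * Hypothetical helper, not HBase code. Assumes a queue id is either a bare peer id
 * ("1") for a normal queue or "<peerId>-<dead server names>" for a recovered one.
 */
final class QueueIdSketch {
  private QueueIdSketch() {
  }

  /** A queue id with anything after the first '-' is treated as recovered. */
  static boolean isRecovered(String queueId) {
    return queueId.split("-", 2).length != 1;
  }

  /** The peer id is the part before the first '-', or the whole id otherwise. */
  static String peerId(String queueId) {
    return queueId.split("-", 2)[0];
  }

  public static void main(String[] args) {
    String normal = "1";
    String recovered = "1-host1.example.com,16020,1572000000000"; // hypothetical server name
    System.out.println(isRecovered(normal) + " " + isRecovered(recovered)); // false true
    System.out.println(peerId(recovered)); // 1
  }
}
```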