[GitHub] [hbase] sandeepvinayak commented on a change in pull request #3376: HBASE-25992 Polish the ReplicationSourceWALReader code for 2.x after …
sandeepvinayak commented on a change in pull request #3376:
URL: https://github.com/apache/hbase/pull/3376#discussion_r649673898

File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java

```diff
@@ -122,65 +122,51 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    WALEntryBatch batch = null;
-    WALEntryStream entryStream = null;
-    try {
-      // we only loop back here if something fatal happened to our stream
-      while (isReaderRunning()) {
-        try {
-          entryStream =
-            new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
-              source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
-          while (isReaderRunning()) { // loop here to keep reusing stream while we can
-            if (!source.isPeerEnabled()) {
-              Threads.sleep(sleepForRetries);
-              continue;
-            }
-            if (!checkQuota()) {
-              continue;
-            }
-
-            batch = createBatch(entryStream);
-            batch = readWALEntries(entryStream, batch);
+    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
+      WALEntryBatch batch = null;
+      try (WALEntryStream entryStream =
+          new WALEntryStream(logQueue, conf, currentPosition,
+              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
+              source.getSourceMetrics(), walGroupId)) {
+        while (isReaderRunning()) { // loop here to keep reusing stream while we can
+          batch = null;
+          if (!source.isPeerEnabled()) {
+            Threads.sleep(sleepForRetries);
+            continue;
+          }
+          if (!checkQuota()) {
+            continue;
+          }
+          batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
+          if (batch == null) {
+            // got no entries and didn't advance position in WAL
+            handleEmptyWALEntryBatch();
+            entryStream.reset(); // reuse stream
+            continue;
+          }
+          // if we have already switched a file, skip reading and put it directly to the ship queue
+          if (!batch.isEndOfFile()) {
+            readWALEntries(entryStream, batch);
             currentPosition = entryStream.getPosition();
-            if (batch == null) {
-              // either the queue have no WAL to read
-              // or got no new entries (didn't advance position in WAL)
-              handleEmptyWALEntryBatch();
-              entryStream.reset(); // reuse stream
-            } else {
-              addBatchToShippingQueue(batch);
-            }
           }
-        } catch (WALEntryFilterRetryableException | IOException e) { // stream related
-          if (handleEofException(e, batch)) {
-            sleepMultiplier = 1;
-          } else {
-            LOG.warn("Failed to read stream of replication entries "
-              + "or replication filter is recovering", e);
-            if (sleepMultiplier < maxRetriesMultiplier) {
-              sleepMultiplier++;
-            }
-            Threads.sleep(sleepForRetries * sleepMultiplier);
+          // need to propagate the batch even it has no entries since it may carry the last
+          // sequence id information for serial replication.
+          LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+          entryBatchQueue.put(batch);
+          sleepMultiplier = 1;
+        }
+      } catch (IOException e) { // stream related
+        if (!handleEofException(e, batch)) {
```

Review comment:
I believe `handleEofException` returns true if we are able to gracefully recover from a bad empty WAL, which means we recovered from the exception, so at that point resetting the sleep multiplier makes sense. What do you think?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
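The reset question in the comment above can be made concrete with a small model of the reader's backoff bookkeeping. This is a minimal sketch, not HBase code: `ReaderBackoff`, `onStreamFailure` and `onBatchQueued` are hypothetical names, and the constructor parameters only stand in for the reader's `sleepForRetries` and `maxRetriesMultiplier` fields. It assumes, as the pre-patch branch in the hunk does, that a successful EOF recovery resets the multiplier to 1 without sleeping, while any other stream failure grows the multiplier up to the cap.

```java
// Hypothetical model of the reader's retry backoff; not an HBase class.
final class ReaderBackoff {
  private final long sleepForRetriesMs;
  private final int maxRetriesMultiplier;
  private int sleepMultiplier = 1;

  ReaderBackoff(long sleepForRetriesMs, int maxRetriesMultiplier) {
    this.sleepForRetriesMs = sleepForRetriesMs;
    this.maxRetriesMultiplier = maxRetriesMultiplier;
  }

  // A batch was read and queued for shipping: back off from scratch next time.
  void onBatchQueued() {
    sleepMultiplier = 1;
  }

  // Returns how long (ms) the caller should sleep before reopening the stream.
  long onStreamFailure(boolean recoveredFromEof) {
    if (recoveredFromEof) {
      // The 0-length WAL was removed, so treat this as progress:
      // reset the multiplier and retry immediately (mirrors the pre-patch branch).
      sleepMultiplier = 1;
      return 0L;
    }
    // Real failure: grow the multiplier, capped at maxRetriesMultiplier.
    if (sleepMultiplier < maxRetriesMultiplier) {
      sleepMultiplier++;
    }
    return sleepForRetriesMs * sleepMultiplier;
  }

  int sleepMultiplier() {
    return sleepMultiplier;
  }

  public static void main(String[] args) {
    ReaderBackoff backoff = new ReaderBackoff(1000L, 3);
    System.out.println(backoff.onStreamFailure(false)); // 2000
    System.out.println(backoff.onStreamFailure(false)); // 3000
    System.out.println(backoff.onStreamFailure(false)); // 3000, capped
    System.out.println(backoff.onStreamFailure(true));  // 0, multiplier back to 1
    System.out.println(backoff.onStreamFailure(false)); // 2000
  }
}
```

In this model, skipping the reset in the recovered branch would leave a reader that had already backed off to the cap sleeping the maximum interval on its next, unrelated failure.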
sandeepvinayak commented on a change in pull request #3376:
URL: https://github.com/apache/hbase/pull/3376#discussion_r649435799

File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java

```diff
@@ -270,43 +236,63 @@ private void handleEmptyWALEntryBatch() throws InterruptedException {
     }
   }
 
+  private WALEntryBatch tryAdvanceStreamAndCreateWALBatch(WALEntryStream entryStream)
+    throws IOException {
+    Path currentPath = entryStream.getCurrentPath();
+    if (!entryStream.hasNext()) {
+      // check whether we have switched a file
+      if (currentPath != null && switched(entryStream, currentPath)) {
+        return WALEntryBatch.endOfFile(currentPath);
+      } else {
+        return null;
+      }
+    }
+    if (currentPath != null) {
+      if (switched(entryStream, currentPath)) {
+        return WALEntryBatch.endOfFile(currentPath);
+      }
+    }
+    return createBatch(entryStream);
+  }
+
   /**
    * This is to handle the EOFException from the WAL entry stream. EOFException should
    * be handled carefully because there are chances of data loss because of never replicating
    * the data. Thus we should always try to ship existing batch of entries here.
    * If there was only one log in the queue before EOF, we ship the empty batch here
    * and since reader is still active, in the next iteration of reader we will
    * stop the reader.
+   *
    * If there was more than one log in the queue before EOF, we ship the existing batch
    * and reset the wal patch and position to the log with EOF, so shipper can remove
    * logs from replication queue
    * @return true only the IOE can be handled
    */
-  private boolean handleEofException(Exception e, WALEntryBatch batch)
-      throws InterruptedException {
+  private boolean handleEofException(Exception e, WALEntryBatch batch) {
     PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
     // Dump the log even if logQueue size is 1 if the source is from recovered Source
     // since we don't add current log to recovered source queue so it is safe to remove.
-    if ((e instanceof EOFException || e.getCause() instanceof EOFException)
-      && (source.isRecovered() || queue.size() > 1)
-      && this.eofAutoRecovery) {
+    if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
+      (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
       Path head = queue.peek();
       try {
         if (fs.getFileStatus(head).getLen() == 0) {
           // head of the queue is an empty log file
           LOG.warn("Forcing removal of 0 length log in queue: {}", head);
           logQueue.remove(walGroupId);
           currentPosition = 0;
-          // After we removed the WAL from the queue, we should
-          // try shipping the existing batch of entries and set the wal position
-          // and path to the wal just dequeued to correctly remove logs from the zk
-          batch.setLastWalPath(head);
-          batch.setLastWalPosition(currentPosition);
-          addBatchToShippingQueue(batch);
+          if (batch != null) {
+            // After we removed the WAL from the queue, we should try shipping the existing batch of
+            // entries
+            addBatchToShippingQueue(batch);
```

Review comment:
I don't think we need this now if we are sure the batch is empty, since a batch can only hold one WAL's data.
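The comment above rests on one invariant: a batch is filled from a single WAL, so a batch built while the head of the queue is a 0-length WAL cannot contain entries. The toy model below is a sketch under that assumption only; `Wal`, `Batch`, `readFromHead` and `dropEmptyHead` are hypothetical names and do not correspond to HBase classes or methods.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model with hypothetical names; these are not HBase classes.
final class OneWalPerBatch {

  static final class Wal {
    final String path;
    final List<String> entries; // empty list == 0-length WAL file

    Wal(String path, List<String> entries) {
      this.path = path;
      this.entries = entries;
    }
  }

  static final class Batch {
    final String walPath; // the single WAL this batch was filled from
    final List<String> entries = new ArrayList<>();

    Batch(String walPath) {
      this.walPath = walPath;
    }
  }

  // Fill a batch from the head WAL only; reading never continues into the next WAL.
  static Batch readFromHead(Deque<Wal> queue, int maxEntries) {
    Wal head = queue.peek();
    if (head == null) {
      return null;
    }
    Batch batch = new Batch(head.path);
    for (String entry : head.entries) {
      if (batch.entries.size() >= maxEntries) {
        break;
      }
      batch.entries.add(entry);
    }
    return batch;
  }

  // Sketch of the EOF-recovery branch: remove a 0-length head WAL. Returns false when the
  // head is missing or non-empty, i.e. the case this branch cannot recover from.
  static boolean dropEmptyHead(Deque<Wal> queue) {
    Wal head = queue.peek();
    if (head == null || !head.entries.isEmpty()) {
      return false;
    }
    queue.poll();
    return true;
  }

  public static void main(String[] args) {
    Deque<Wal> queue = new ArrayDeque<>();
    queue.add(new Wal("wal.1", List.of()));          // 0-length WAL at the head
    queue.add(new Wal("wal.2", List.of("e1", "e2")));
    Batch batch = readFromHead(queue, 10);
    // The batch came from wal.1 only, so it is empty: nothing is lost by not shipping it.
    System.out.println(batch.walPath + " " + batch.entries.size()); // wal.1 0
    System.out.println(dropEmptyHead(queue));                       // true
    System.out.println(queue.peek().path);                          // wal.2
  }
}
```

If reading could spill into `wal.2`, the batch would carry real entries and dropping it would lose data; the one-WAL-per-batch rule is what makes skipping the ship safe in this model.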
sandeepvinayak commented on a change in pull request #3376:
URL: https://github.com/apache/hbase/pull/3376#discussion_r649431795

File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java

```diff
@@ -122,65 +122,51 @@ public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
   @Override
   public void run() {
     int sleepMultiplier = 1;
-    WALEntryBatch batch = null;
-    WALEntryStream entryStream = null;
-    try {
-      // we only loop back here if something fatal happened to our stream
-      while (isReaderRunning()) {
-        try {
-          entryStream =
-            new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
-              source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId);
-          while (isReaderRunning()) { // loop here to keep reusing stream while we can
-            if (!source.isPeerEnabled()) {
-              Threads.sleep(sleepForRetries);
-              continue;
-            }
-            if (!checkQuota()) {
-              continue;
-            }
-
-            batch = createBatch(entryStream);
-            batch = readWALEntries(entryStream, batch);
+    while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
+      WALEntryBatch batch = null;
+      try (WALEntryStream entryStream =
+          new WALEntryStream(logQueue, conf, currentPosition,
+              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
+              source.getSourceMetrics(), walGroupId)) {
+        while (isReaderRunning()) { // loop here to keep reusing stream while we can
+          batch = null;
+          if (!source.isPeerEnabled()) {
+            Threads.sleep(sleepForRetries);
+            continue;
+          }
+          if (!checkQuota()) {
+            continue;
+          }
+          batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
+          if (batch == null) {
+            // got no entries and didn't advance position in WAL
+            handleEmptyWALEntryBatch();
+            entryStream.reset(); // reuse stream
+            continue;
+          }
+          // if we have already switched a file, skip reading and put it directly to the ship queue
+          if (!batch.isEndOfFile()) {
+            readWALEntries(entryStream, batch);
             currentPosition = entryStream.getPosition();
-            if (batch == null) {
-              // either the queue have no WAL to read
-              // or got no new entries (didn't advance position in WAL)
-              handleEmptyWALEntryBatch();
-              entryStream.reset(); // reuse stream
-            } else {
-              addBatchToShippingQueue(batch);
-            }
           }
-        } catch (WALEntryFilterRetryableException | IOException e) { // stream related
-          if (handleEofException(e, batch)) {
-            sleepMultiplier = 1;
-          } else {
-            LOG.warn("Failed to read stream of replication entries "
-              + "or replication filter is recovering", e);
-            if (sleepMultiplier < maxRetriesMultiplier) {
-              sleepMultiplier++;
-            }
-            Threads.sleep(sleepForRetries * sleepMultiplier);
+          // need to propagate the batch even it has no entries since it may carry the last
+          // sequence id information for serial replication.
+          LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+          entryBatchQueue.put(batch);
+          sleepMultiplier = 1;
+        }
+      } catch (IOException e) { // stream related
+        if (!handleEofException(e, batch)) {
```

Review comment:
I think we need to reset `sleepMultiplier` to 1 in the else branch of this.
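In the hunk above, the `WALEntryStream` moves into a try-with-resources statement inside the outer loop, so a stream that fails is closed before the `catch` block runs and a fresh one is opened on the next outer iteration. The sketch below shows that ordering with a hypothetical `FakeStream` standing in for `WALEntryStream`; it is not the real class, and the inner reuse loop is collapsed to a single read followed by a simulated failure.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; FakeStream stands in for WALEntryStream and is not an HBase class.
final class StreamReopenSketch {

  static final class FakeStream implements AutoCloseable {
    private final int id;
    private final List<String> log;

    FakeStream(int id, List<String> log) {
      this.id = id;
      this.log = log;
      log.add("open " + id);
    }

    String read(boolean fail) throws IOException {
      if (fail) {
        throw new IOException("stream " + id + " broke");
      }
      return "entry from " + id;
    }

    @Override
    public void close() {
      log.add("close " + id);
    }
  }

  // Outer loop: each iteration opens a fresh stream in try-with-resources, reads once,
  // then hits a simulated fatal error that escapes to the catch block.
  static List<String> run(int attempts) {
    List<String> log = new ArrayList<>();
    for (int attempt = 1; attempt <= attempts; attempt++) {
      try (FakeStream stream = new FakeStream(attempt, log)) {
        log.add(stream.read(false));
        stream.read(true); // simulate a fatal stream error
      } catch (IOException e) {
        // The stream is already closed here; the next iteration opens a new one.
        log.add("caught: " + e.getMessage());
      }
    }
    return log;
  }

  public static void main(String[] args) {
    run(2).forEach(System.out::println);
  }
}
```

Java closes resources declared in a try-with-resources header before any `catch` clause of that statement executes, which is why the log shows `close 1` ahead of `caught: stream 1 broke`.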