[jira] [Updated] (HBASE-21503) Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.

2018-11-21 Thread Duo Zhang (JIRA)


 [ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Duo Zhang updated HBASE-21503:
--
  Resolution: Fixed
Hadoop Flags: Reviewed
  Status: Resolved  (was: Patch Available)

Pushed to branch-2.0+. Thanks [~zghaobac] for reviewing.

And thanks [~wchevreuil] for the great analysis.

> Replication normal source can get stuck due potential race conditions between 
> source wal reader and wal provider initialization threads.
> 
>
> Key: HBASE-21503
> URL: https://issues.apache.org/jira/browse/HBASE-21503
> Project: HBase
>  Issue Type: Bug
>  Components: Replication
> Reporter: Wellington Chevreuil
> Assignee: Wellington Chevreuil
> Priority: Blocker
> Fix For: 3.0.0, 2.2.0, 2.0.3, 2.1.2
>
> Attachments: HBASE-21503-master.001.patch, HBASE-21503.patch
>
>
> Noticed replication sources could get stuck while running some tests that 
> involved RS restart. In these cases, upon RS restart, the newly created 
> normal source was reaching the wal end without recognising that the wal was 
> still open for write, which leads to the wal being removed from the source 
> queue. Thus, no new OPs get replicated unless this log goes to a recovery queue.
> Checking this further, my understanding is that, during restart, the RS will 
> start replication services, which initialise ReplicationSourceManager and 
> ReplicationSources for each wal group id, in the sequence below:
> {noformat}
> HRegionServer -> Replication.startReplicationService() -> ReplicationSourceManager.init() -> add ReplicationSource
> {noformat}
> At this point, ReplicationSources have no paths yet, so the WAL reader thread is 
> not running. ReplicationSourceManager is registered as a WAL listener, in 
> order to get notified whenever a new wal file becomes available. While the 
> ReplicationSourceManager and ReplicationSource instances are being created, a 
> WALFileLengthProvider instance is obtained from WALProvider and cached by 
> both ReplicationSourceManager and ReplicationSource. The default 
> implementation for this WALFileLengthProvider, defined on the WALProvider 
> interface, is:
> {noformat}
>   default WALFileLengthProvider getWALFileLengthProvider() {
>     return path -> getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path))
>         .filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
>   }
> {noformat}
> Notice that if WALProvider.getWALs returns an empty list, this 
> WALFileLengthProvider instance is always going to return an empty result. This is 
> relevant because when ReplicationSource finally starts the 
> ReplicationSourceWALReader thread, it passes along this WALFileLengthProvider, 
> which is used by WALEntryStream (inside the wal reader) to determine whether the 
> wal is still being written (and therefore should be kept in the queue), here:
> {noformat}
>   private void tryAdvanceEntry() throws IOException {
>     if (checkReader()) {
>       boolean beingWritten = readNextEntryAndRecordReaderPosition();
>       LOG.trace("reading wal file {}. Current open for write: {}", this.currentPath, beingWritten);
>       if (currentEntry == null && !beingWritten) {
>         // no more entries in this log file, and the file is already closed, i.e, rolled
>         // Before dequeueing, we should always get one more attempt at reading.
>         // This is in case more entries came in after we opened the reader, and the log is rolled
>         // while we were reading. See HBASE-6758
>         resetReader();
>         readNextEntryAndRecordReaderPosition();
>         if (currentEntry == null) {
>           if (checkAllBytesParsed()) { // now we're certain we're done with this log file
>             dequeueCurrentLog();
>             if (openNextLog()) {
>               readNextEntryAndRecordReaderPosition();
>             }
>           }
>         }
>       }
> ...
> {noformat}
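> To make the consequence concrete, here is a simplified standalone sketch of the decision above (illustrative only; the real WALEntryStream carries much more state): with a provider that always answers empty, beingWritten is false, so a wal with no readable entries takes the dequeue branch even though it may in fact still be open for write.
> {noformat}
> import java.util.OptionalLong;
> import java.util.function.Function;
>
> public class DequeueDecisionSketch {
>
>   // Simplified stand-ins for WALEntryStream state.
>   static Object currentEntry = null;   // nothing readable yet in the freshly opened wal
>   static Function<String, OptionalLong> walFileLengthProvider = path -> OptionalLong.empty();
>
>   // Mirrors the return value of readNextEntryAndRecordReaderPosition():
>   // "being written" is inferred purely from whether the provider knows a length.
>   static boolean readNextEntryAndRecordReaderPosition(String path) {
>     OptionalLong fileLength = walFileLengthProvider.apply(path);
>     return fileLength.isPresent();
>   }
>
>   public static void main(String[] args) {
>     boolean beingWritten = readNextEntryAndRecordReaderPosition("current-wal");
>     if (currentEntry == null && !beingWritten) {
>       // This is the branch taken in the scenario described here: the wal looks
>       // rolled/closed, so dequeueCurrentLog() would run against the live wal.
>       System.out.println("dequeueCurrentLog() would be called");
>     }
>   }
> }
> {noformat}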
> Here is the code snippet for the WALEntryStream.readNextEntryAndRecordReaderPosition() 
> method that relies on the WALFileLengthProvider:
> {noformat}
> ...
> #1   OptionalLong fileLength = 
> walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
> if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
>   // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible 
> that we read uncommitted
>   // data, so we need to make sure that we do not read beyond the 
> committed file length.
>   if (LOG.isDebugEnabled()) {
> LOG.debug("The provider tells us the valid length for " + currentPath 
> + " is " +
> fileLength.getAsLong() + ", but we have advanced to " + 
> readerPos);
>   }
>   resetReader();
>   return true;
> }
> if (readEntry != null) {
>   metrics.incrLogEditsRead();
>   
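> For contrast, a sketch of the intended behaviour once getWALs() does contain the live wal (again illustrative, with made-up values): the length comes back present, readNextEntryAndRecordReaderPosition() returns true, and the wal is treated as open for write and kept in the queue.
> {noformat}
> import java.util.Collections;
> import java.util.List;
> import java.util.OptionalLong;
> import java.util.function.Function;
>
> public class HealthyProviderSketch {
>
>   interface Wal {
>     OptionalLong getLogFileSizeIfBeingWritten(String path);
>   }
>
>   public static void main(String[] args) {
>     // One live wal that knows its own committed length while it is being written.
>     Wal liveWal = path -> path.equals("wal.1") ? OptionalLong.of(4096L) : OptionalLong.empty();
>     List<Wal> wals = Collections.singletonList(liveWal);
>
>     // Same default-provider shape as before, but now backed by a non-empty list.
>     Function<String, OptionalLong> provider = path -> wals.stream()
>         .map(w -> w.getLogFileSizeIfBeingWritten(path))
>         .filter(OptionalLong::isPresent)
>         .findAny()
>         .orElse(OptionalLong.empty());
>
>     // A present length means the reader keeps the wal queued instead of dequeueing it.
>     System.out.println(provider.apply("wal.1"));   // prints OptionalLong[4096]
>   }
> }
> {noformat}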

[jira] [Updated] (HBASE-21503) Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.

2018-11-20 Thread Duo Zhang (JIRA)


 [ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Duo Zhang updated HBASE-21503:
--
Attachment: HBASE-21503.patch


[jira] [Updated] (HBASE-21503) Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.

2018-11-20 Thread Duo Zhang (JIRA)


 [ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Duo Zhang updated HBASE-21503:
--
Priority: Blocker  (was: Critical)


[jira] [Updated] (HBASE-21503) Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.

2018-11-20 Thread Duo Zhang (JIRA)


 [ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Duo Zhang updated HBASE-21503:
--
Component/s: Replication


[jira] [Updated] (HBASE-21503) Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.

2018-11-20 Thread Duo Zhang (JIRA)


 [ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Duo Zhang updated HBASE-21503:
--
Priority: Critical  (was: Major)


[jira] [Updated] (HBASE-21503) Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.

2018-11-20 Thread Duo Zhang (JIRA)


 [ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Duo Zhang updated HBASE-21503:
--
Fix Version/s: 2.1.2
   2.0.3
   2.2.0
   3.0.0


[jira] [Updated] (HBASE-21503) Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.

2018-11-20 Thread Wellington Chevreuil (JIRA)


 [ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wellington Chevreuil updated HBASE-21503:
-
Status: Patch Available  (was: Open)


[jira] [Updated] (HBASE-21503) Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.

2018-11-20 Thread Wellington Chevreuil (JIRA)


 [ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wellington Chevreuil updated HBASE-21503:
-
Attachment: HBASE-21503-master.001.patch
