[jira] [Updated] (HBASE-21503) Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.
[ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Duo Zhang updated HBASE-21503:
------------------------------
    Resolution: Fixed
    Hadoop Flags: Reviewed
    Status: Resolved  (was: Patch Available)

Pushed to branch-2.0+.

Thanks [~zghaobac] for reviewing. And thanks [~wchevreuil] for the great analysis.

> Replication normal source can get stuck due potential race conditions between
> source wal reader and wal provider initialization threads.
> ------------------------------------------------------------------------------
>
>                 Key: HBASE-21503
>                 URL: https://issues.apache.org/jira/browse/HBASE-21503
>             Project: HBase
>          Issue Type: Bug
>          Components: Replication
>            Reporter: Wellington Chevreuil
>            Assignee: Wellington Chevreuil
>            Priority: Blocker
>             Fix For: 3.0.0, 2.2.0, 2.0.3, 2.1.2
>
>         Attachments: HBASE-21503-master.001.patch, HBASE-21503.patch
>
>
> Noticed replication sources could get stuck while doing some tests that
> involved RS restart. In these cases, upon RS restart, the newly created
> normal source was reaching the wal end and not recognising that it was open
> for write, which leads to its removal from the source queue. Thus, no new OP
> gets replicated unless this log goes to a recovery queue.
> Checking this further, my understanding is that, during restart, RS will
> start replication services, which inits ReplicationSourceManager and
> ReplicationSources for each wal group id, in the sequence below:
> {noformat}
> HRegionServer -> Replication.startReplicationService() ->
> ReplicationSourceManager.init() -> add ReplicationSource
> {noformat}
> At this point, ReplicationSources have no paths yet, so the WAL reader thread
> is not running. ReplicationSourceManager is registered as a WAL listener, in
> order to get notified whenever a new wal file is available. During creation
> of the ReplicationSourceManager and ReplicationSource instances, a
> WALFileLengthProvider instance is obtained from WALProvider and cached by
> both ReplicationSourceManager and ReplicationSource. The default
> implementation for this WALFileLengthProvider is below, on the WALProvider
> interface:
> {noformat}
> default WALFileLengthProvider getWALFileLengthProvider() {
>   return path -> getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path))
>       .filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
> }
> {noformat}
> Notice that if WALProvider.getWALs returns an empty list, this
> WALFileLengthProvider instance is always going to return nothing. This is
> relevant because when ReplicationSource finally starts the
> ReplicationSourceWALReader thread, it passes this WALFileLengthProvider,
> which is used by WALEntryStream (inside the wal reader) to determine if the
> wal is being written (and should be kept in the queue) here:
> {noformat}
> private void tryAdvanceEntry() throws IOException {
>   if (checkReader()) {
>     boolean beingWritten = readNextEntryAndRecordReaderPosition();
>     LOG.trace("reading wal file {}. Current open for write: {}", this.currentPath, beingWritten);
>     if (currentEntry == null && !beingWritten) {
>       // no more entries in this log file, and the file is already closed, i.e, rolled
>       // Before dequeueing, we should always get one more attempt at reading.
>       // This is in case more entries came in after we opened the reader, and the log is rolled
>       // while we were reading. See HBASE-6758
>       resetReader();
>       readNextEntryAndRecordReaderPosition();
>       if (currentEntry == null) {
>         if (checkAllBytesParsed()) { // now we're certain we're done with this log file
>           dequeueCurrentLog();
>           if (openNextLog()) {
>             readNextEntryAndRecordReaderPosition();
>           }
>         }
>       }
>     }
> ...
> {noformat}
> Here is the code snippet for the
> WALEntryStream.readNextEntryAndRecordReaderPosition() method that relies on
> the WALFileLengthProvider:
> {noformat}
> ...
> #1   OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
>      if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
>        // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
>        // data, so we need to make sure that we do not read beyond the committed file length.
>        if (LOG.isDebugEnabled()) {
>          LOG.debug("The provider tells us the valid length for " + currentPath + " is " +
>              fileLength.getAsLong() + ", but we have advanced to " + readerPos);
>        }
>        resetReader();
>        return true;
>      }
>      if (readEntry != null) {
>        metrics.incrLogEditsRead();
>        metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
>      }
>      currentEntry = readEntry; // could be null
>      this.currentPositionOfReader = readerPos;
>      return fileLength.isPresent();
> ...
> {noformat}
> The problem can occur because when
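
The empty-list case described in the issue can be reproduced outside HBase. What follows is a minimal sketch, not HBase code: FakeWal, lengthIfBeingWritten and the path "wal.1" are hypothetical stand-ins introduced for illustration, and only the stream pipeline mirrors the default getWALFileLengthProvider() quoted in the description. It shows that a lookup made before any WAL is registered reports "not being written", which is the answer that would let the reader dequeue a log that is in fact still open.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical sketch; none of these types are the real HBase classes.
class WalLengthProviderSketch {

  /** Stand-in for a WAL that knows which file it is currently writing. */
  static final class FakeWal {
    private final String currentPath;
    private final long committedLength;

    FakeWal(String currentPath, long committedLength) {
      this.currentPath = currentPath;
      this.committedLength = committedLength;
    }

    /** Returns the committed length only for the file this WAL is writing. */
    OptionalLong getLogFileSizeIfBeingWritten(String path) {
      return currentPath.equals(path)
          ? OptionalLong.of(committedLength)
          : OptionalLong.empty();
    }
  }

  /** Same pipeline shape as the quoted default provider, over a plain list. */
  static OptionalLong lengthIfBeingWritten(List<FakeWal> wals, String path) {
    return wals.stream()
        .map(w -> w.getLogFileSizeIfBeingWritten(path))
        .filter(OptionalLong::isPresent)
        .findAny()
        .orElse(OptionalLong.empty());
  }

  public static void main(String[] args) {
    List<FakeWal> wals = new ArrayList<>();
    String path = "wal.1";

    // No WAL registered yet: the lookup is empty, so a reader would
    // conclude that the file is closed.
    System.out.println(lengthIfBeingWritten(wals, path).isPresent()); // prints "false"

    // Once the WAL is registered, the same lookup reports the open file.
    wals.add(new FakeWal(path, 128L));
    System.out.println(lengthIfBeingWritten(wals, path).isPresent()); // prints "true"
  }
}
```

The sketch is single-threaded on purpose. It only demonstrates that the answer depends on whether the WAL list has been populated at the time of the call, which is the ordering the issue title refers to.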
[jira] [Updated] (HBASE-21503) Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.
[ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Duo Zhang updated HBASE-21503:
------------------------------
    Attachment: HBASE-21503.patch
[jira] [Updated] (HBASE-21503) Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.
[ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Duo Zhang updated HBASE-21503:
------------------------------
    Priority: Blocker  (was: Critical)
[jira] [Updated] (HBASE-21503) Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.
[ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Duo Zhang updated HBASE-21503:
------------------------------
    Component/s: Replication
[jira] [Updated] (HBASE-21503) Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.
[ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Duo Zhang updated HBASE-21503:
------------------------------
    Priority: Critical  (was: Major)
[jira] [Updated] (HBASE-21503) Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.
[ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Duo Zhang updated HBASE-21503:
------------------------------
    Fix Version/s: 2.1.2
                   2.0.3
                   2.2.0
                   3.0.0
[jira] [Updated] (HBASE-21503) Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.
[ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wellington Chevreuil updated HBASE-21503:
-----------------------------------------
    Status: Patch Available  (was: Open)
[jira] [Updated] (HBASE-21503) Replication normal source can get stuck due potential race conditions between source wal reader and wal provider initialization threads.
[ https://issues.apache.org/jira/browse/HBASE-21503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wellington Chevreuil updated HBASE-21503:
    Attachment: HBASE-21503-master.001.patch

> Replication normal source can get stuck due potential race conditions between
> source wal reader and wal provider initialization threads.
>
> Key: HBASE-21503
> URL: https://issues.apache.org/jira/browse/HBASE-21503
> Project: HBase
> Issue Type: Bug
> Reporter: Wellington Chevreuil
> Assignee: Wellington Chevreuil
> Priority: Major
> Attachments: HBASE-21503-master.001.patch
>
> Noticed replication sources could get stuck while doing some tests that
> involved RS restart. In these cases, upon RS restart, the newly created
> normal source was reaching the wal end and not recognising that it was open
> for write, which leads to it being removed from the source queue. Thus, no new
> OP gets replicated unless this log goes to a recovery queue.
> Checking this further, my understanding is that, during restart, the RS will
> start replication services, which inits ReplicationSourceManager and
> ReplicationSources for each wal group id, in the sequence below:
> {noformat}
> HRegionServer -> Replication.startReplicationService() ->
> ReplicationSourceManager.init() -> add ReplicationSource
> {noformat}
> At this point, ReplicationSources have no paths yet, so the WAL reader thread
> is not running. ReplicationSourceManager is registered as a WAL listener, in
> order to get notified whenever a new wal file is available. During
> ReplicationSourceManager and ReplicationSource instance creation, a
> WALFileLengthProvider instance is obtained from WALProvider and cached by
> both ReplicationSourceManager and ReplicationSource.