Re: [PR] HDDS-14793. Fix race condition in XceiverClientGrpc#connectToDatanode causing intermittent NPEs. [ozone]
adoroszlai commented on code in PR #9997:
URL: https://github.com/apache/ozone/pull/9997#discussion_r3031846821
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##
@@ -739,30 +738,13 @@ private void decreasePendingMetricsAndReleaseSemaphore() {
private void checkOpen(DatanodeDetails dn)
throws IOException {
-if (isClosed.get()) {
- throw new IOException("This channel is not connected.");
-}
-
-ManagedChannel channel = channels.get(dn.getID());
-// If the channel doesn't exist for this specific datanode or the channel
-// is closed, just reconnect
-if (!isConnected(channel)) {
- reconnect(dn);
-}
-
- }
-
- private void reconnect(DatanodeDetails dn)
- throws IOException {
-ManagedChannel channel;
try {
connectToDatanode(dn);
- channel = channels.get(dn.getID());
} catch (Exception e) {
throw new IOException("Error while connecting", e);
}
Review Comment:
With the simplified `checkOpen`, two test cases need to be updated; sorry for
missing that.
```
Expecting actual:
"Error while connecting"
to contain:
"This channel is not connected"
at org.apache.hadoop.hdds.scm.TestXceiverClientManager.testFreeByReference(TestXceiverClientManager.java:162)
...
at org.apache.hadoop.hdds.scm.TestXceiverClientManager.testFreeByEviction(TestXceiverClientManager.java:211)
```
Let's remove this `try-catch`; we don't need to wrap the exception from
`connectToDatanode`.
With that, the exception will have the correct message (`Client is closed`).
Please update the tests to reflect that.
https://github.com/apache/ozone/blob/8fbe16b4efaa1e40b90d890028e447317cae3d01/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientManager.java#L162
https://github.com/apache/ozone/blob/8fbe16b4efaa1e40b90d890028e447317cae3d01/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientManager.java#L211
```diff
diff --git hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index a9dbcb9456..1f9ac0a122 100644
--- hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -738,11 +738,7 @@ private void decreasePendingMetricsAndReleaseSemaphore() {
private void checkOpen(DatanodeDetails dn)
throws IOException {
-try {
- connectToDatanode(dn);
-} catch (Exception e) {
- throw new IOException("Error while connecting", e);
-}
+connectToDatanode(dn);
if (!isConnected(dn)) {
throw new IOException("This channel is not connected.");
diff --git hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientManager.java hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientManager.java
index 06d19e5575..9468cec94f 100644
--- hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientManager.java
+++ hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientManager.java
@@ -159,7 +159,7 @@ public void testFreeByReference(@TempDir Path metaDir)
throws IOException {
Throwable t = assertThrows(IOException.class,
() -> ContainerProtocolCalls.createContainer(client1,
container1.getContainerInfo().getContainerID(), null));
- assertThat(t.getMessage()).contains("This channel is not connected");
+ assertThat(t.getMessage()).contains("Client is closed");
clientManager.releaseClient(client2, false);
}
@@ -208,7 +208,7 @@ public void testFreeByEviction(@TempDir Path metaDir)
throws IOException {
Throwable t = assertThrows(IOException.class,
() -> ContainerProtocolCalls.createContainer(client1,
container1.getContainerInfo().getContainerID(), null));
- assertThat(t.getMessage()).contains("This channel is not connected");
+ assertThat(t.getMessage()).contains("Client is closed");
clientManager.releaseClient(client2, false);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected].
Re: [PR] HDDS-14793. Fix race condition in XceiverClientGrpc#connectToDatanode causing intermittent NPEs. [ozone]
ptlrs commented on code in PR #9997:
URL: https://github.com/apache/ozone/pull/9997#discussion_r3030959630
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##
@@ -267,13 +255,16 @@ public void close() {
return;
}
-for (ManagedChannel channel : channels.values()) {
- channel.shutdown();
+for (ChannelInfo channelInfo : dnChannelInfoMap.values()) {
+ channelInfo.getChannel().shutdown();
}
final long maxWaitNanos =
TimeUnit.SECONDS.toNanos(SHUTDOWN_WAIT_MAX_SECONDS);
long deadline = System.nanoTime() + maxWaitNanos;
-List<ManagedChannel> nonTerminatedChannels = new ArrayList<>(channels.values());
+List<ManagedChannel> nonTerminatedChannels = dnChannelInfoMap.values()
+.stream()
+.map(ChannelInfo::getChannel)
+.collect(Collectors.toList());
Review Comment:
Fixed
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##
@@ -161,49 +161,37 @@ public void connect() throws Exception {
connectToDatanode(dn);
}
- private void connectToDatanode(DatanodeDetails dn)
- throws IOException {
+ private void connectToDatanode(DatanodeDetails dn) throws IOException {
if (isClosed.get()) {
throw new IOException("Client is closed.");
}
-if (isConnected(dn)) {
- return;
-}
-// read port from the data node, on failure use default configured port
-int port = dn.getStandalonePort().getValue();
-if (port == 0) {
- port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
- OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
-}
-final int finalPort = port;
-
-LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn,
pipeline.getNodes());
-
-channels.computeIfPresent(dn.getID(), (dnId, channel) -> {
- if (channel.isTerminated() || channel.isShutdown()) {
-asyncStubs.remove(dnId);
-return null; // removes from channels map
+dnChannelInfoMap.compute(dn.getID(), (dnId, channelInfo) -> {
+ // channel is absent or stale
+ if (channelInfo == null || channelInfo.isChannelInactive()) {
+LOG.debug("Connecting to server: {}; nodes in pipeline: {}", dn,
pipeline.getNodes());
+try {
+ return generateNewChannel(dn);
+} catch (IOException e) {
+ throw new UncheckedIOException(e);
+}
}
- return channel;
+ // channel is present and active
+ return channelInfo;
});
+ }
-ManagedChannel channel;
-try {
- channel = channels.computeIfAbsent(dn.getID(), dnId -> {
-try {
- return createChannel(dn, finalPort).build();
-} catch (IOException e) {
- throw new RuntimeException(e);
-}
- });
-} catch (RuntimeException e) {
- LOG.error("Failed to create channel to datanode {}", dn, e);
- throw new IOException(e.getCause());
Review Comment:
Fixed
Re: [PR] HDDS-14793. Fix race condition in XceiverClientGrpc#connectToDatanode causing intermittent NPEs. [ozone]
ptlrs commented on PR #9997:
URL: https://github.com/apache/ozone/pull/9997#issuecomment-4181185894
@adoroszlai thanks for the [instructions](https://issues.apache.org/jira/browse/HDDS-14793?focusedCommentId=18069352&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-18069352) on how to run the Flaky test CI. After running it on master and a test branch which has the latest changes of this PR we see the following results:
| Branch | Failures |
|-|--|
| [master](https://github.com/ptlrs/ozone/actions/runs/23834282035) | 7/100 failed |
| [FlakyTest-HDDS-14793-Intermittent-NPE-in-XceiverClientGrpc](https://github.com/ptlrs/ozone/actions/runs/23928375732) | 0/100 failed |
Re: [PR] HDDS-14793. Fix race condition in XceiverClientGrpc#connectToDatanode causing intermittent NPEs. [ozone]
adoroszlai commented on code in PR #9997:
URL: https://github.com/apache/ozone/pull/9997#discussion_r3020908085
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##
@@ -267,13 +255,16 @@ public void close() {
return;
}
-for (ManagedChannel channel : channels.values()) {
- channel.shutdown();
+for (ChannelInfo channelInfo : dnChannelInfoMap.values()) {
+ channelInfo.getChannel().shutdown();
}
final long maxWaitNanos =
TimeUnit.SECONDS.toNanos(SHUTDOWN_WAIT_MAX_SECONDS);
long deadline = System.nanoTime() + maxWaitNanos;
-List<ManagedChannel> nonTerminatedChannels = new ArrayList<>(channels.values());
+List<ManagedChannel> nonTerminatedChannels = dnChannelInfoMap.values()
+.stream()
+.map(ChannelInfo::getChannel)
+.collect(Collectors.toList());
Review Comment:
`ChannelInfo.channel` may be `null` (or at least there are checks for that
elsewhere).
```suggestion
.map(ChannelInfo::getChannel)
.filter(Objects::nonNull)
.collect(Collectors.toList());
```
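A minimal, JDK-only sketch of the null-filtering pattern in the suggestion above. `Holder` and the plain `String` values are hypothetical stand-ins for `ChannelInfo` and `ManagedChannel`; they are not the PR's types.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Illustrative only: Holder stands in for ChannelInfo (assumption).
final class NonNullFilterSketch {
  static final class Holder {
    private final String channel; // may be null

    Holder(String channel) {
      this.channel = channel;
    }

    String getChannel() {
      return channel;
    }
  }

  // Collects the wrapped values, dropping holders whose value is null.
  static List<String> channelsOf(List<Holder> holders) {
    return holders.stream()
        .map(Holder::getChannel)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
  }
}
```

Without the `filter` step the resulting list would contain `null` entries, and any later call on an element would fail with a `NullPointerException`.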
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##
@@ -743,26 +735,21 @@ private void checkOpen(DatanodeDetails dn)
throw new IOException("This channel is not connected.");
}
-ManagedChannel channel = channels.get(dn.getID());
-// If the channel doesn't exist for this specific datanode or the channel
-// is closed, just reconnect
-if (!isConnected(channel)) {
+// If the channel doesn't exist for this specific datanode or the channel
is closed, just reconnect
+if (!isConnected(dn)) {
reconnect(dn);
}
-
}
private void reconnect(DatanodeDetails dn)
throws IOException {
-ManagedChannel channel;
try {
connectToDatanode(dn);
- channel = channels.get(dn.getID());
} catch (Exception e) {
throw new IOException("Error while connecting", e);
}
-if (!isConnected(channel)) {
+if (!isConnected(dn)) {
throw new IOException("This channel is not connected.");
}
Review Comment:
Given that `connectToDatanode` handles all cases ("closed", "already
connected", "needs new connection"), `checkOpen` can be simplified:
```java
connectToDatanode(dn);
if (!isConnected(dn)) {
throw new IOException("This channel is not connected.");
}
```
and `reconnect` can be removed.
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##
@@ -161,49 +161,37 @@ public void connect() throws Exception {
connectToDatanode(dn);
}
- private void connectToDatanode(DatanodeDetails dn)
- throws IOException {
+ private void connectToDatanode(DatanodeDetails dn) throws IOException {
if (isClosed.get()) {
throw new IOException("Client is closed.");
}
-if (isConnected(dn)) {
- return;
-}
-// read port from the data node, on failure use default configured port
-int port = dn.getStandalonePort().getValue();
-if (port == 0) {
- port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
- OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
-}
-final int finalPort = port;
-
-LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn,
pipeline.getNodes());
-
-channels.computeIfPresent(dn.getID(), (dnId, channel) -> {
- if (channel.isTerminated() || channel.isShutdown()) {
-asyncStubs.remove(dnId);
-return null; // removes from channels map
+dnChannelInfoMap.compute(dn.getID(), (dnId, channelInfo) -> {
+ // channel is absent or stale
+ if (channelInfo == null || channelInfo.isChannelInactive()) {
+LOG.debug("Connecting to server: {}; nodes in pipeline: {}", dn,
pipeline.getNodes());
+try {
+ return generateNewChannel(dn);
+} catch (IOException e) {
+ throw new UncheckedIOException(e);
+}
}
- return channel;
+ // channel is present and active
+ return channelInfo;
});
+ }
-ManagedChannel channel;
-try {
- channel = channels.computeIfAbsent(dn.getID(), dnId -> {
-try {
- return createChannel(dn, finalPort).build();
-} catch (IOException e) {
- throw new RuntimeException(e);
-}
- });
-} catch (RuntimeException e) {
- LOG.error("Failed to create channel to datanode {}", dn, e);
- throw new IOException(e.getCause());
Review Comment:
We need to keep the translation back from unchecked exception to
`IOException`, because callers may not handle the former.
```java
try {
dnChannelInfoMap.compute(...);
} catch (UncheckedIOException e) {
LOG.error("Failed to create channel to datanode {}", dn, e);
throw e.getCause();
}
```
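For reference, a self-contained sketch of the same translation using only the JDK. The class, the `String` values, and `openChannel` are hypothetical placeholders for the client, its channels, and `generateNewChannel`; only the wrap-and-unwrap pattern is the point.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative only: names and types here are assumptions, not the PR's code.
final class ExceptionTranslationSketch {
  private final ConcurrentMap<String, String> channels = new ConcurrentHashMap<>();

  // Stand-in for channel creation, which may fail with a checked exception.
  private static String openChannel(String id) throws IOException {
    if (id.isEmpty()) {
      throw new IOException("cannot open channel: empty id");
    }
    return "channel-" + id;
  }

  String connect(String id) throws IOException {
    try {
      return channels.compute(id, (key, existing) -> {
        if (existing != null) {
          return existing;
        }
        try {
          return openChannel(key);
        } catch (IOException e) {
          // The remapping function cannot throw checked exceptions.
          throw new UncheckedIOException(e);
        }
      });
    } catch (UncheckedIOException e) {
      // getCause() is declared to return IOException, so callers see
      // the original checked exception again.
      throw e.getCause();
    }
  }
}
```

When the remapping function throws, `compute` rethrows the exception and leaves the mapping unchanged, so a failed attempt does not leave a stale entry behind.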
##
hadoop-hdds/client/src/main/java/o
Re: [PR] HDDS-14793. Fix race condition in XceiverClientGrpc#connectToDatanode causing intermittent NPEs. [ozone]
ptlrs commented on code in PR #9997:
URL: https://github.com/apache/ozone/pull/9997#discussion_r3011808618
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##
@@ -161,49 +161,37 @@ public void connect() throws Exception {
connectToDatanode(dn);
}
- private void connectToDatanode(DatanodeDetails dn)
- throws IOException {
+ private void connectToDatanode(DatanodeDetails dn) throws IOException {
if (isClosed.get()) {
throw new IOException("Client is closed.");
}
-if (isConnected(dn)) {
- return;
-}
-// read port from the data node, on failure use default configured port
-int port = dn.getStandalonePort().getValue();
-if (port == 0) {
- port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
- OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
-}
-final int finalPort = port;
-
-LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn,
pipeline.getNodes());
-
-channels.computeIfPresent(dn.getID(), (dnId, channel) -> {
- if (channel.isTerminated() || channel.isShutdown()) {
-asyncStubs.remove(dnId);
-return null; // removes from channels map
+dnChannelInfoMap.compute(dn.getID(), (dnId, channelInfo) -> {
+ // channel is absent or stale
+ if (channelInfo == null || channelInfo.isChannelInactive()) {
Review Comment:
If the channel is inactive, it is already either terminated or shut down. In
both cases the underlying resources have already been released.
Re: [PR] HDDS-14793. Fix race condition in XceiverClientGrpc#connectToDatanode causing intermittent NPEs. [ozone]
yandrey321 commented on code in PR #9997:
URL: https://github.com/apache/ozone/pull/9997#discussion_r3011327472
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##
@@ -161,49 +161,37 @@ public void connect() throws Exception {
connectToDatanode(dn);
}
- private void connectToDatanode(DatanodeDetails dn)
- throws IOException {
+ private void connectToDatanode(DatanodeDetails dn) throws IOException {
if (isClosed.get()) {
throw new IOException("Client is closed.");
}
-if (isConnected(dn)) {
- return;
-}
-// read port from the data node, on failure use default configured port
-int port = dn.getStandalonePort().getValue();
-if (port == 0) {
- port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
- OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
-}
-final int finalPort = port;
-
-LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn,
pipeline.getNodes());
-
-channels.computeIfPresent(dn.getID(), (dnId, channel) -> {
- if (channel.isTerminated() || channel.isShutdown()) {
-asyncStubs.remove(dnId);
-return null; // removes from channels map
+dnChannelInfoMap.compute(dn.getID(), (dnId, channelInfo) -> {
+ // channel is absent or stale
+ if (channelInfo == null || channelInfo.isChannelInactive()) {
Review Comment:
Do we need to close the inactive channel?
Re: [PR] HDDS-14793. Fix race condition in XceiverClientGrpc#connectToDatanode causing intermittent NPEs. [ozone]
ptlrs commented on PR #9997:
URL: https://github.com/apache/ozone/pull/9997#issuecomment-4151262600
Thanks for the review @adoroszlai, I have pushed the fixes.
Re: [PR] HDDS-14793. Fix race condition in XceiverClientGrpc#connectToDatanode causing intermittent NPEs. [ozone]
adoroszlai commented on code in PR #9997:
URL: https://github.com/apache/ozone/pull/9997#discussion_r3005851164
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##
@@ -161,49 +159,55 @@ public void connect() throws Exception {
connectToDatanode(dn);
}
- private void connectToDatanode(DatanodeDetails dn)
- throws IOException {
+ private void connectToDatanode(DatanodeDetails dn) throws IOException {
if (isClosed.get()) {
throw new IOException("Client is closed.");
}
if (isConnected(dn)) {
return;
}
-// read port from the data node, on failure use default configured port
-int port = dn.getStandalonePort().getValue();
-if (port == 0) {
- port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
- OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
-}
-final int finalPort = port;
-LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn,
pipeline.getNodes());
+LOG.debug("Connecting to server: {}; nodes in pipeline: {}", dn,
pipeline.getNodes());
-channels.computeIfPresent(dn.getID(), (dnId, channel) -> {
- if (channel.isTerminated() || channel.isShutdown()) {
-asyncStubs.remove(dnId);
-return null; // removes from channels map
- }
+removeStaleChannel(dn);
+generateNewChannel(dn);
+ }
- return channel;
-});
+ /**
+ * Checks if the client has a live connection channel to the specified
Datanode.
+ *
+ * @return True if the connection is alive, false otherwise.
+ */
+ @VisibleForTesting
+ public boolean isConnected(DatanodeDetails details) {
Review Comment:
nit: please don't move `isConnected` unnecessarily, it inflates the diff
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##
@@ -161,49 +159,55 @@ public void connect() throws Exception {
connectToDatanode(dn);
}
- private void connectToDatanode(DatanodeDetails dn)
- throws IOException {
+ private void connectToDatanode(DatanodeDetails dn) throws IOException {
if (isClosed.get()) {
throw new IOException("Client is closed.");
}
if (isConnected(dn)) {
return;
}
-// read port from the data node, on failure use default configured port
-int port = dn.getStandalonePort().getValue();
-if (port == 0) {
- port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
- OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
-}
-final int finalPort = port;
-LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn,
pipeline.getNodes());
+LOG.debug("Connecting to server: {}; nodes in pipeline: {}", dn,
pipeline.getNodes());
-channels.computeIfPresent(dn.getID(), (dnId, channel) -> {
- if (channel.isTerminated() || channel.isShutdown()) {
-asyncStubs.remove(dnId);
-return null; // removes from channels map
- }
+removeStaleChannel(dn);
+generateNewChannel(dn);
+ }
Review Comment:
`connectToDatanode` still performs multiple map operations in a non-atomic
way:
- calls `isConnected` twice, once directly, once via `removeStaleChannel`
- `isConnected` calls `containsKey` and `get` separately
- `removeStaleChannel` calls `isConnected` and `remove` separately
- `generateNewChannel` calls `put` unconditionally
Use only one `compute` operation, and distinguish between the three cases in
that call:
- absent
- present but stale
- present and active
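A rough sketch of what a single `compute` call covering the three cases could look like, using only the JDK. `FakeChannel`, the `String` keys, and the `created` counter are hypothetical and exist only to make the example observable.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: FakeChannel stands in for a real channel (assumption).
final class SingleComputeSketch {
  static final class FakeChannel {
    private volatile boolean shutdown;

    void shutdown() {
      shutdown = true;
    }

    boolean isInactive() {
      return shutdown;
    }
  }

  private final ConcurrentMap<String, FakeChannel> channels = new ConcurrentHashMap<>();
  final AtomicInteger created = new AtomicInteger();

  // The check and the replacement happen inside one atomic map operation,
  // so no other thread can observe a state between "removed" and "re-added".
  FakeChannel connect(String id) {
    return channels.compute(id, (key, existing) -> {
      if (existing == null || existing.isInactive()) {
        // absent, or present but stale: create a replacement
        created.incrementAndGet();
        return new FakeChannel();
      }
      // present and active: keep the current mapping
      return existing;
    });
  }
}
```

With `ConcurrentHashMap`, the remapping function runs atomically for the given key, which is what removes the window between separate `containsKey`, `get`, `remove`, and `put` calls.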
##
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##
@@ -784,4 +772,25 @@ public ConfigurationSource getConfig() {
public void setTimeout(long timeout) {
this.timeout = timeout;
}
+
+ /**
+ * Group the channel and stub so that they are published together.
+ */
+ private static class ChannelInfo {
+private ManagedChannel channel;
+private XceiverClientProtocolServiceStub stub;
Review Comment:
nit: these can be `final`
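A small sketch of the holder with `final` fields, under the assumption that both values are known at construction time. Plain `String` fields stand in for `ManagedChannel` and the stub type, and the class name is hypothetical.

```java
// Illustrative only: String fields replace the real channel and stub types.
final class ChannelPairSketch {
  private final String channel;
  private final String stub;

  ChannelPairSketch(String channel, String stub) {
    this.channel = channel;
    this.stub = stub;
  }

  String getChannel() {
    return channel;
  }

  String getStub() {
    return stub;
  }
}
```

Because the fields are `final` and assigned in the constructor, any thread that obtains a reference to a fully constructed instance sees both values together, which fits the stated goal of publishing channel and stub as one unit.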
