Copilot commented on code in PR #3926:
URL: https://github.com/apache/hertzbeat/pull/3926#discussion_r2643085760


##########
hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/collect/common/ssh/SshHelper.java:
##########
@@ -136,71 +142,125 @@ public static ClientSession 
getConnectSession(SshProtocol sshProtocol, int timeo
                 return clientSession;
             }
         }
-        SshClient sshClient = CommonSshClient.getSshClient();
-        HostConfigEntry proxyConfig = new HostConfigEntry();
+
+        SshClient sshClient;
+        boolean isScopedClient = false;
         if (useProxy && StringUtils.hasText(sshProtocol.getProxyHost())) {
-            String proxySpec = String.format("%s@%s:%d", 
sshProtocol.getProxyUsername(), sshProtocol.getProxyHost(), 
Integer.parseInt(sshProtocol.getProxyPort()));
-            proxyConfig.setHostName(sshProtocol.getHost());
-            proxyConfig.setHost(sshProtocol.getHost());
-            proxyConfig.setPort(Integer.parseInt(sshProtocol.getPort()));
-            proxyConfig.setUsername(sshProtocol.getUsername());
-            proxyConfig.setProxyJump(proxySpec);
+            // create a dedicated client instance to avoid contaminating the 
global singleton
+            sshClient = SshClient.setUpDefaultClient();
+            // accept all server key verifier, will print warn log : Server at 
{} presented unverified {} key: {}
+            AcceptAllServerKeyVerifier verifier = 
AcceptAllServerKeyVerifier.INSTANCE;
+            sshClient.setServerKeyVerifier(verifier);
+            // set connection heartbeat interval time 2000ms, wait for 
heartbeat response timeout 300_000ms
+            PropertyResolverUtils.updateProperty(
+                sshClient, CoreModuleProperties.HEARTBEAT_INTERVAL.getName(), 
2000);
+            PropertyResolverUtils.updateProperty(
+                sshClient, 
CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX.getName(), 30);
+            PropertyResolverUtils.updateProperty(
+                sshClient, CoreModuleProperties.SOCKET_KEEPALIVE.getName(), 
true);
+            // set support all KeyExchange
+            
sshClient.setKeyExchangeFactories(NamedFactory.setUpTransformedFactories(
+                false,
+                BuiltinDHFactories.VALUES,
+                ClientBuilder.DH2KEX
+            ));
+            sshClient.start();
+            isScopedClient = true;
+            log.debug("Created scoped SshClient for proxy connection to avoid 
race condition.");
+        } else {
+            sshClient = CommonSshClient.getSshClient();
+        }
 
-            // Apache SSHD requires the password for the proxy to be preloaded 
into the sshClient instance before connecting
-            if (StringUtils.hasText(sshProtocol.getProxyPassword())) {
-                sshClient.addPasswordIdentity(sshProtocol.getProxyPassword());
-                log.debug("Loaded proxy server password authentication: 
{}@{}", sshProtocol.getProxyUsername(), sshProtocol.getProxyHost());
-            }
-            if (StringUtils.hasText(sshProtocol.getProxyPrivateKey())) {
-                
proxyConfig.setIdentities(List.of(sshProtocol.getProxyPrivateKey()));
-                log.debug("Proxy private key loaded into HostConfigEntry");
+        try {
+            HostConfigEntry proxyConfig = new HostConfigEntry();
+            if (useProxy && StringUtils.hasText(sshProtocol.getProxyHost())) {
+                String proxySpec = String.format("%s@%s:%d", 
sshProtocol.getProxyUsername(), sshProtocol.getProxyHost(),
+                                                 
Integer.parseInt(sshProtocol.getProxyPort()));
+                proxyConfig.setHostName(sshProtocol.getHost());
+                proxyConfig.setHost(sshProtocol.getHost());
+                proxyConfig.setPort(Integer.parseInt(sshProtocol.getPort()));
+                proxyConfig.setUsername(sshProtocol.getUsername());
+                proxyConfig.setProxyJump(proxySpec);
+
+                // Apache SSHD requires the password for the proxy to be 
preloaded into the sshClient instance before connecting
+                if (StringUtils.hasText(sshProtocol.getProxyPassword())) {
+                    
sshClient.addPasswordIdentity(sshProtocol.getProxyPassword());
+                    log.debug("Loaded proxy server password authentication: 
{}@{}", sshProtocol.getProxyUsername(),
+                              sshProtocol.getProxyHost());
+                }
+                if (StringUtils.hasText(sshProtocol.getProxyPrivateKey())) {
+                    
proxyConfig.setIdentities(List.of(sshProtocol.getProxyPrivateKey()));
+                    log.debug("Proxy private key loaded into HostConfigEntry");
+                }
             }
-        }
 
-        if (useProxy && StringUtils.hasText(sshProtocol.getProxyHost())) {
-            try {
+            if (useProxy && StringUtils.hasText(sshProtocol.getProxyHost())) {
                 clientSession = sshClient.connect(proxyConfig)
                                          .verify(timeout, 
TimeUnit.MILLISECONDS).getSession();
+            } else {
+                clientSession = sshClient.connect(sshProtocol.getUsername(), 
sshProtocol.getHost(), Integer.parseInt(sshProtocol.getPort()))
+                                         .verify(timeout, 
TimeUnit.MILLISECONDS).getSession();
             }
-            finally {
-                
sshClient.removePasswordIdentity(sshProtocol.getProxyPassword());
-            }
-        } else {
-            clientSession = sshClient.connect(sshProtocol.getUsername(), 
sshProtocol.getHost(), Integer.parseInt(sshProtocol.getPort()))
-                                     .verify(timeout, 
TimeUnit.MILLISECONDS).getSession();
-        }
 
-        if (StringUtils.hasText(sshProtocol.getPassword())) {
-            clientSession.addPasswordIdentity(sshProtocol.getPassword());
-        } else if (StringUtils.hasText(sshProtocol.getPrivateKey())) {
-            var resourceKey = 
PrivateKeyUtils.writePrivateKey(sshProtocol.getHost(), 
sshProtocol.getPrivateKey());
-            try (InputStream keyStream = new FileInputStream(resourceKey)) {
-                FilePasswordProvider passwordProvider = (session, resource, 
index) -> {
-                    if 
(StringUtils.hasText(sshProtocol.getPrivateKeyPassphrase())) {
-                        return sshProtocol.getPrivateKeyPassphrase();
+            if (StringUtils.hasText(sshProtocol.getPassword())) {
+                clientSession.addPasswordIdentity(sshProtocol.getPassword());
+            } else if (StringUtils.hasText(sshProtocol.getPrivateKey())) {
+                var resourceKey = 
PrivateKeyUtils.writePrivateKey(sshProtocol.getHost(), 
sshProtocol.getPrivateKey());
+                try (InputStream keyStream = new FileInputStream(resourceKey)) 
{
+                    FilePasswordProvider passwordProvider = (session, 
resource, index) -> {
+                        if 
(StringUtils.hasText(sshProtocol.getPrivateKeyPassphrase())) {
+                            return sshProtocol.getPrivateKeyPassphrase();
+                        }
+                        return null;
+                    };
+                    Iterable<KeyPair> keyPairs = 
SecurityUtils.loadKeyPairIdentities(null, () -> resourceKey, keyStream,
+                                                                               
      passwordProvider);
+                    if (keyPairs != null) {
+                        keyPairs.forEach(clientSession::addPublicKeyIdentity);
+                    } else {
+                        log.error("Failed to load private key pairs from: {}", 
resourceKey);
                     }
-                    return null;
-                };
-                Iterable<KeyPair> keyPairs = 
SecurityUtils.loadKeyPairIdentities(null, () -> resourceKey, keyStream, 
passwordProvider);
-                if (keyPairs != null) {
-                    keyPairs.forEach(clientSession::addPublicKeyIdentity);
-                } else {
-                    log.error("Failed to load private key pairs from: {}", 
resourceKey);
+                } catch (IOException e) {
+                    log.error("Error reading private key file: {}", 
e.getMessage());
                 }
-            } catch (IOException e) {
-                log.error("Error reading private key file: {}", 
e.getMessage());
-            }
-        }  // else auth with localhost private public key certificates
+            }  // else auth with localhost private public key certificates
 
-        // auth
-        if (!clientSession.auth().verify(timeout, 
TimeUnit.MILLISECONDS).isSuccess()) {
-            clientSession.close();
-            throw new IllegalArgumentException("ssh auth failed.");
-        }
-        if (reuseConnection || useProxy) {
-            SshConnect sshConnect = new SshConnect(clientSession);
-            CONNECTION_COMMON_CACHE.addCache(identifier, sshConnect);
+            // auth
+            if (!clientSession.auth().verify(timeout, 
TimeUnit.MILLISECONDS).isSuccess()) {
+                clientSession.close();
+                if (isScopedClient) {
+                    sshClient.stop();
+                }
+                throw new IllegalArgumentException("ssh auth failed.");
+            }
+            if (isScopedClient) {
+                clientSession.addCloseFutureListener(future -> {
+                    try {
+                        log.debug("Session closed, stopping scoped 
SshClient.");
+                        sshClient.stop();
+                    } catch (Exception e) {
+                        log.error("Failed to stop scoped SshClient", e);
+                    }
+                });
+            }
+            if (reuseConnection || useProxy) {
+                SshConnect sshConnect = new SshConnect(clientSession);
+                CONNECTION_COMMON_CACHE.addCache(identifier, sshConnect);
+            }
+            return clientSession;
+        } catch (Exception e) {
+            if (isScopedClient && sshClient.isStarted()) {
+                try {
+                    // If the session has been established but an error occurs 
afterward, session.close() will trigger the above listener.
+                    // If the session has not been established yet, manually 
stop
+                    if (clientSession == null || clientSession.isClosed()) {

Review Comment:
   The condition `clientSession.isClosed()` at line 256 may cause the scoped 
SSH client to be stopped twice in certain scenarios. 
   
   If the session was successfully created and authenticated (lines 198-235), 
but an exception occurs afterward, the session closing will trigger the 
`addCloseFutureListener` callback at lines 237-244, which stops the client. 
However, if the session is already closed when we reach this catch block, the 
condition at line 256 will also call `sshClient.stop()`.
   
   This creates a potential double-stop situation. While `sshClient.stop()` may 
be idempotent, it's better to avoid the redundancy. Consider changing the 
condition to check if the session was never established or if it's open (not 
closed), as the listener will handle the closed case.
   ```suggestion
                       // If the session has not been established yet, or is 
still open, manually stop
                       if (clientSession == null || clientSession.isOpen()) {
   ```



##########
hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/collect/common/ssh/SshHelper.java:
##########
@@ -136,71 +142,125 @@ public static ClientSession 
getConnectSession(SshProtocol sshProtocol, int timeo
                 return clientSession;
             }
         }
-        SshClient sshClient = CommonSshClient.getSshClient();
-        HostConfigEntry proxyConfig = new HostConfigEntry();
+
+        SshClient sshClient;
+        boolean isScopedClient = false;
         if (useProxy && StringUtils.hasText(sshProtocol.getProxyHost())) {
-            String proxySpec = String.format("%s@%s:%d", 
sshProtocol.getProxyUsername(), sshProtocol.getProxyHost(), 
Integer.parseInt(sshProtocol.getProxyPort()));
-            proxyConfig.setHostName(sshProtocol.getHost());
-            proxyConfig.setHost(sshProtocol.getHost());
-            proxyConfig.setPort(Integer.parseInt(sshProtocol.getPort()));
-            proxyConfig.setUsername(sshProtocol.getUsername());
-            proxyConfig.setProxyJump(proxySpec);
+            // create a dedicated client instance to avoid contaminating the 
global singleton
+            sshClient = SshClient.setUpDefaultClient();
+            // accept all server key verifier, will print warn log : Server at 
{} presented unverified {} key: {}
+            AcceptAllServerKeyVerifier verifier = 
AcceptAllServerKeyVerifier.INSTANCE;
+            sshClient.setServerKeyVerifier(verifier);

Review Comment:
   The scoped SSH client created for proxy connections is missing the 
forwarding filter configuration that is present in the global singleton client. 
The `CommonSshClient` sets `setForwardingFilter(new 
AcceptAllForwardingFilter())` to handle port forwarding, which is essential for 
ProxyJump functionality.
   
   Without this configuration, the scoped client may not properly handle the 
port forwarding required for proxy jump connections. Consider adding this line 
after the server key verifier setup to ensure feature parity with the global 
client.



##########
hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/collect/common/ssh/SshHelper.java:
##########
@@ -136,71 +142,125 @@ public static ClientSession 
getConnectSession(SshProtocol sshProtocol, int timeo
                 return clientSession;
             }
         }
-        SshClient sshClient = CommonSshClient.getSshClient();
-        HostConfigEntry proxyConfig = new HostConfigEntry();
+
+        SshClient sshClient;
+        boolean isScopedClient = false;
         if (useProxy && StringUtils.hasText(sshProtocol.getProxyHost())) {
-            String proxySpec = String.format("%s@%s:%d", 
sshProtocol.getProxyUsername(), sshProtocol.getProxyHost(), 
Integer.parseInt(sshProtocol.getProxyPort()));
-            proxyConfig.setHostName(sshProtocol.getHost());
-            proxyConfig.setHost(sshProtocol.getHost());
-            proxyConfig.setPort(Integer.parseInt(sshProtocol.getPort()));
-            proxyConfig.setUsername(sshProtocol.getUsername());
-            proxyConfig.setProxyJump(proxySpec);
+            // create a dedicated client instance to avoid contaminating the 
global singleton
+            sshClient = SshClient.setUpDefaultClient();
+            // accept all server key verifier, will print warn log : Server at 
{} presented unverified {} key: {}
+            AcceptAllServerKeyVerifier verifier = 
AcceptAllServerKeyVerifier.INSTANCE;
+            sshClient.setServerKeyVerifier(verifier);
+            // set connection heartbeat interval time 2000ms, wait for 
heartbeat response timeout 300_000ms
+            PropertyResolverUtils.updateProperty(
+                sshClient, CoreModuleProperties.HEARTBEAT_INTERVAL.getName(), 
2000);
+            PropertyResolverUtils.updateProperty(
+                sshClient, 
CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX.getName(), 30);
+            PropertyResolverUtils.updateProperty(
+                sshClient, CoreModuleProperties.SOCKET_KEEPALIVE.getName(), 
true);
+            // set support all KeyExchange
+            
sshClient.setKeyExchangeFactories(NamedFactory.setUpTransformedFactories(
+                false,
+                BuiltinDHFactories.VALUES,
+                ClientBuilder.DH2KEX
+            ));
+            sshClient.start();
+            isScopedClient = true;
+            log.debug("Created scoped SshClient for proxy connection to avoid 
race condition.");
+        } else {
+            sshClient = CommonSshClient.getSshClient();
+        }
 
-            // Apache SSHD requires the password for the proxy to be preloaded 
into the sshClient instance before connecting
-            if (StringUtils.hasText(sshProtocol.getProxyPassword())) {
-                sshClient.addPasswordIdentity(sshProtocol.getProxyPassword());
-                log.debug("Loaded proxy server password authentication: 
{}@{}", sshProtocol.getProxyUsername(), sshProtocol.getProxyHost());
-            }
-            if (StringUtils.hasText(sshProtocol.getProxyPrivateKey())) {
-                
proxyConfig.setIdentities(List.of(sshProtocol.getProxyPrivateKey()));
-                log.debug("Proxy private key loaded into HostConfigEntry");
+        try {
+            HostConfigEntry proxyConfig = new HostConfigEntry();
+            if (useProxy && StringUtils.hasText(sshProtocol.getProxyHost())) {
+                String proxySpec = String.format("%s@%s:%d", 
sshProtocol.getProxyUsername(), sshProtocol.getProxyHost(),
+                                                 
Integer.parseInt(sshProtocol.getProxyPort()));
+                proxyConfig.setHostName(sshProtocol.getHost());
+                proxyConfig.setHost(sshProtocol.getHost());
+                proxyConfig.setPort(Integer.parseInt(sshProtocol.getPort()));
+                proxyConfig.setUsername(sshProtocol.getUsername());
+                proxyConfig.setProxyJump(proxySpec);
+
+                // Apache SSHD requires the password for the proxy to be 
preloaded into the sshClient instance before connecting
+                if (StringUtils.hasText(sshProtocol.getProxyPassword())) {
+                    
sshClient.addPasswordIdentity(sshProtocol.getProxyPassword());
+                    log.debug("Loaded proxy server password authentication: 
{}@{}", sshProtocol.getProxyUsername(),
+                              sshProtocol.getProxyHost());
+                }
+                if (StringUtils.hasText(sshProtocol.getProxyPrivateKey())) {
+                    
proxyConfig.setIdentities(List.of(sshProtocol.getProxyPrivateKey()));
+                    log.debug("Proxy private key loaded into HostConfigEntry");
+                }
             }
-        }
 
-        if (useProxy && StringUtils.hasText(sshProtocol.getProxyHost())) {
-            try {
+            if (useProxy && StringUtils.hasText(sshProtocol.getProxyHost())) {
                 clientSession = sshClient.connect(proxyConfig)
                                          .verify(timeout, 
TimeUnit.MILLISECONDS).getSession();
+            } else {
+                clientSession = sshClient.connect(sshProtocol.getUsername(), 
sshProtocol.getHost(), Integer.parseInt(sshProtocol.getPort()))
+                                         .verify(timeout, 
TimeUnit.MILLISECONDS).getSession();
             }
-            finally {
-                
sshClient.removePasswordIdentity(sshProtocol.getProxyPassword());
-            }
-        } else {
-            clientSession = sshClient.connect(sshProtocol.getUsername(), 
sshProtocol.getHost(), Integer.parseInt(sshProtocol.getPort()))
-                                     .verify(timeout, 
TimeUnit.MILLISECONDS).getSession();
-        }
 
-        if (StringUtils.hasText(sshProtocol.getPassword())) {
-            clientSession.addPasswordIdentity(sshProtocol.getPassword());
-        } else if (StringUtils.hasText(sshProtocol.getPrivateKey())) {
-            var resourceKey = 
PrivateKeyUtils.writePrivateKey(sshProtocol.getHost(), 
sshProtocol.getPrivateKey());
-            try (InputStream keyStream = new FileInputStream(resourceKey)) {
-                FilePasswordProvider passwordProvider = (session, resource, 
index) -> {
-                    if 
(StringUtils.hasText(sshProtocol.getPrivateKeyPassphrase())) {
-                        return sshProtocol.getPrivateKeyPassphrase();
+            if (StringUtils.hasText(sshProtocol.getPassword())) {
+                clientSession.addPasswordIdentity(sshProtocol.getPassword());
+            } else if (StringUtils.hasText(sshProtocol.getPrivateKey())) {
+                var resourceKey = 
PrivateKeyUtils.writePrivateKey(sshProtocol.getHost(), 
sshProtocol.getPrivateKey());
+                try (InputStream keyStream = new FileInputStream(resourceKey)) 
{
+                    FilePasswordProvider passwordProvider = (session, 
resource, index) -> {
+                        if 
(StringUtils.hasText(sshProtocol.getPrivateKeyPassphrase())) {
+                            return sshProtocol.getPrivateKeyPassphrase();
+                        }
+                        return null;
+                    };
+                    Iterable<KeyPair> keyPairs = 
SecurityUtils.loadKeyPairIdentities(null, () -> resourceKey, keyStream,
+                                                                               
      passwordProvider);
+                    if (keyPairs != null) {
+                        keyPairs.forEach(clientSession::addPublicKeyIdentity);
+                    } else {
+                        log.error("Failed to load private key pairs from: {}", 
resourceKey);
                     }
-                    return null;
-                };
-                Iterable<KeyPair> keyPairs = 
SecurityUtils.loadKeyPairIdentities(null, () -> resourceKey, keyStream, 
passwordProvider);
-                if (keyPairs != null) {
-                    keyPairs.forEach(clientSession::addPublicKeyIdentity);
-                } else {
-                    log.error("Failed to load private key pairs from: {}", 
resourceKey);
+                } catch (IOException e) {
+                    log.error("Error reading private key file: {}", 
e.getMessage());
                 }
-            } catch (IOException e) {
-                log.error("Error reading private key file: {}", 
e.getMessage());
-            }
-        }  // else auth with localhost private public key certificates
+            }  // else auth with localhost private public key certificates
 
-        // auth
-        if (!clientSession.auth().verify(timeout, 
TimeUnit.MILLISECONDS).isSuccess()) {
-            clientSession.close();
-            throw new IllegalArgumentException("ssh auth failed.");
-        }
-        if (reuseConnection || useProxy) {
-            SshConnect sshConnect = new SshConnect(clientSession);
-            CONNECTION_COMMON_CACHE.addCache(identifier, sshConnect);
+            // auth
+            if (!clientSession.auth().verify(timeout, 
TimeUnit.MILLISECONDS).isSuccess()) {
+                clientSession.close();
+                if (isScopedClient) {
+                    sshClient.stop();
+                }
+                throw new IllegalArgumentException("ssh auth failed.");
+            }
+            if (isScopedClient) {
+                clientSession.addCloseFutureListener(future -> {
+                    try {
+                        log.debug("Session closed, stopping scoped 
SshClient.");
+                        sshClient.stop();
+                    } catch (Exception e) {
+                        log.error("Failed to stop scoped SshClient", e);
+                    }
+                });

Review Comment:
   The close listener is registered after successful authentication, but if the 
session is added to the cache and later closed by other code (e.g., from cache 
cleanup at lines 131-138), the listener will still attempt to stop the scoped 
client. This could cause the scoped client to be stopped while other sessions 
might still be using it if there's connection reuse.
   
   However, looking at line 236, the listener is only added when 
`isScopedClient` is true, and scoped clients are specifically created for proxy 
connections where `useProxy` is true. At line 246-248, when using a proxy, the 
connection is always added to cache. This means a single scoped client instance 
is tied to a single cached session.
   
   The current design appears safe because:
   1. Each scoped client is dedicated to one proxy connection
   2. When the cached session closes, the client should also stop
   3. The cache uses the same identifier for the same connection parameters
   
   This is working as intended, but it would be clearer with a comment 
explaining that scoped clients have 1:1 lifecycle with their sessions.



##########
hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/collect/common/ssh/SshHelper.java:
##########
@@ -136,71 +142,125 @@ public static ClientSession 
getConnectSession(SshProtocol sshProtocol, int timeo
                 return clientSession;
             }
         }
-        SshClient sshClient = CommonSshClient.getSshClient();
-        HostConfigEntry proxyConfig = new HostConfigEntry();
+
+        SshClient sshClient;
+        boolean isScopedClient = false;
         if (useProxy && StringUtils.hasText(sshProtocol.getProxyHost())) {
-            String proxySpec = String.format("%s@%s:%d", 
sshProtocol.getProxyUsername(), sshProtocol.getProxyHost(), 
Integer.parseInt(sshProtocol.getProxyPort()));
-            proxyConfig.setHostName(sshProtocol.getHost());
-            proxyConfig.setHost(sshProtocol.getHost());
-            proxyConfig.setPort(Integer.parseInt(sshProtocol.getPort()));
-            proxyConfig.setUsername(sshProtocol.getUsername());
-            proxyConfig.setProxyJump(proxySpec);
+            // create a dedicated client instance to avoid contaminating the 
global singleton
+            sshClient = SshClient.setUpDefaultClient();
+            // accept all server key verifier, will print warn log : Server at 
{} presented unverified {} key: {}
+            AcceptAllServerKeyVerifier verifier = 
AcceptAllServerKeyVerifier.INSTANCE;
+            sshClient.setServerKeyVerifier(verifier);
+            // set connection heartbeat interval time 2000ms, wait for 
heartbeat response timeout 300_000ms
+            PropertyResolverUtils.updateProperty(
+                sshClient, CoreModuleProperties.HEARTBEAT_INTERVAL.getName(), 
2000);
+            PropertyResolverUtils.updateProperty(
+                sshClient, 
CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX.getName(), 30);
+            PropertyResolverUtils.updateProperty(
+                sshClient, CoreModuleProperties.SOCKET_KEEPALIVE.getName(), 
true);
+            // set support all KeyExchange
+            
sshClient.setKeyExchangeFactories(NamedFactory.setUpTransformedFactories(
+                false,
+                BuiltinDHFactories.VALUES,
+                ClientBuilder.DH2KEX
+            ));
+            sshClient.start();
+            isScopedClient = true;
+            log.debug("Created scoped SshClient for proxy connection to avoid 
race condition.");
+        } else {
+            sshClient = CommonSshClient.getSshClient();
+        }
 
-            // Apache SSHD requires the password for the proxy to be preloaded 
into the sshClient instance before connecting
-            if (StringUtils.hasText(sshProtocol.getProxyPassword())) {
-                sshClient.addPasswordIdentity(sshProtocol.getProxyPassword());
-                log.debug("Loaded proxy server password authentication: 
{}@{}", sshProtocol.getProxyUsername(), sshProtocol.getProxyHost());
-            }
-            if (StringUtils.hasText(sshProtocol.getProxyPrivateKey())) {
-                
proxyConfig.setIdentities(List.of(sshProtocol.getProxyPrivateKey()));
-                log.debug("Proxy private key loaded into HostConfigEntry");
+        try {
+            HostConfigEntry proxyConfig = new HostConfigEntry();
+            if (useProxy && StringUtils.hasText(sshProtocol.getProxyHost())) {
+                String proxySpec = String.format("%s@%s:%d", 
sshProtocol.getProxyUsername(), sshProtocol.getProxyHost(),
+                                                 
Integer.parseInt(sshProtocol.getProxyPort()));
+                proxyConfig.setHostName(sshProtocol.getHost());
+                proxyConfig.setHost(sshProtocol.getHost());
+                proxyConfig.setPort(Integer.parseInt(sshProtocol.getPort()));
+                proxyConfig.setUsername(sshProtocol.getUsername());
+                proxyConfig.setProxyJump(proxySpec);
+
+                // Apache SSHD requires the password for the proxy to be 
preloaded into the sshClient instance before connecting
+                if (StringUtils.hasText(sshProtocol.getProxyPassword())) {
+                    
sshClient.addPasswordIdentity(sshProtocol.getProxyPassword());
+                    log.debug("Loaded proxy server password authentication: 
{}@{}", sshProtocol.getProxyUsername(),
+                              sshProtocol.getProxyHost());
+                }
+                if (StringUtils.hasText(sshProtocol.getProxyPrivateKey())) {
+                    
proxyConfig.setIdentities(List.of(sshProtocol.getProxyPrivateKey()));
+                    log.debug("Proxy private key loaded into HostConfigEntry");
+                }
             }
-        }
 
-        if (useProxy && StringUtils.hasText(sshProtocol.getProxyHost())) {
-            try {
+            if (useProxy && StringUtils.hasText(sshProtocol.getProxyHost())) {
                 clientSession = sshClient.connect(proxyConfig)
                                          .verify(timeout, 
TimeUnit.MILLISECONDS).getSession();
+            } else {
+                clientSession = sshClient.connect(sshProtocol.getUsername(), 
sshProtocol.getHost(), Integer.parseInt(sshProtocol.getPort()))
+                                         .verify(timeout, 
TimeUnit.MILLISECONDS).getSession();
             }
-            finally {
-                
sshClient.removePasswordIdentity(sshProtocol.getProxyPassword());
-            }
-        } else {
-            clientSession = sshClient.connect(sshProtocol.getUsername(), 
sshProtocol.getHost(), Integer.parseInt(sshProtocol.getPort()))
-                                     .verify(timeout, 
TimeUnit.MILLISECONDS).getSession();
-        }
 
-        if (StringUtils.hasText(sshProtocol.getPassword())) {
-            clientSession.addPasswordIdentity(sshProtocol.getPassword());
-        } else if (StringUtils.hasText(sshProtocol.getPrivateKey())) {
-            var resourceKey = 
PrivateKeyUtils.writePrivateKey(sshProtocol.getHost(), 
sshProtocol.getPrivateKey());
-            try (InputStream keyStream = new FileInputStream(resourceKey)) {
-                FilePasswordProvider passwordProvider = (session, resource, 
index) -> {
-                    if 
(StringUtils.hasText(sshProtocol.getPrivateKeyPassphrase())) {
-                        return sshProtocol.getPrivateKeyPassphrase();
+            if (StringUtils.hasText(sshProtocol.getPassword())) {
+                clientSession.addPasswordIdentity(sshProtocol.getPassword());
+            } else if (StringUtils.hasText(sshProtocol.getPrivateKey())) {
+                var resourceKey = 
PrivateKeyUtils.writePrivateKey(sshProtocol.getHost(), 
sshProtocol.getPrivateKey());
+                try (InputStream keyStream = new FileInputStream(resourceKey)) 
{
+                    FilePasswordProvider passwordProvider = (session, 
resource, index) -> {
+                        if 
(StringUtils.hasText(sshProtocol.getPrivateKeyPassphrase())) {
+                            return sshProtocol.getPrivateKeyPassphrase();
+                        }
+                        return null;
+                    };
+                    Iterable<KeyPair> keyPairs = 
SecurityUtils.loadKeyPairIdentities(null, () -> resourceKey, keyStream,
+                                                                               
      passwordProvider);
+                    if (keyPairs != null) {
+                        keyPairs.forEach(clientSession::addPublicKeyIdentity);
+                    } else {
+                        log.error("Failed to load private key pairs from: {}", 
resourceKey);
                     }
-                    return null;
-                };
-                Iterable<KeyPair> keyPairs = 
SecurityUtils.loadKeyPairIdentities(null, () -> resourceKey, keyStream, 
passwordProvider);
-                if (keyPairs != null) {
-                    keyPairs.forEach(clientSession::addPublicKeyIdentity);
-                } else {
-                    log.error("Failed to load private key pairs from: {}", 
resourceKey);
+                } catch (IOException e) {
+                    log.error("Error reading private key file: {}", 
e.getMessage());
                 }
-            } catch (IOException e) {
-                log.error("Error reading private key file: {}", 
e.getMessage());
-            }
-        }  // else auth with localhost private public key certificates
+            }  // else auth with localhost private public key certificates
 
-        // auth
-        if (!clientSession.auth().verify(timeout, 
TimeUnit.MILLISECONDS).isSuccess()) {
-            clientSession.close();
-            throw new IllegalArgumentException("ssh auth failed.");
-        }
-        if (reuseConnection || useProxy) {
-            SshConnect sshConnect = new SshConnect(clientSession);
-            CONNECTION_COMMON_CACHE.addCache(identifier, sshConnect);
+            // auth
+            if (!clientSession.auth().verify(timeout, 
TimeUnit.MILLISECONDS).isSuccess()) {
+                clientSession.close();
+                if (isScopedClient) {
+                    sshClient.stop();
+                }
+                throw new IllegalArgumentException("ssh auth failed.");
+            }
+            if (isScopedClient) {
+                clientSession.addCloseFutureListener(future -> {
+                    try {
+                        log.debug("Session closed, stopping scoped 
SshClient.");
+                        sshClient.stop();
+                    } catch (Exception e) {
+                        log.error("Failed to stop scoped SshClient", e);
+                    }
+                });
+            }
+            if (reuseConnection || useProxy) {
+                SshConnect sshConnect = new SshConnect(clientSession);
+                CONNECTION_COMMON_CACHE.addCache(identifier, sshConnect);
+            }
+            return clientSession;
+        } catch (Exception e) {
+            if (isScopedClient && sshClient.isStarted()) {
+                try {
+                    // If the session has been established but an error occurs 
afterward, session.close() will trigger the above listener.
+                    // If the session has not been established yet, manually 
stop
+                    if (clientSession == null || clientSession.isClosed()) {
+                        sshClient.stop();
+                    }
+                } catch (Exception ex) {
+                    log.warn("Error stopping scoped client on failure", ex);
+                }
+            }

Review Comment:
   The scoped SSH client lifecycle management logic introduces significant new 
behavior that is not covered by tests. The new code handles:
   1. Creating a dedicated SSH client for proxy connections (lines 150-169)
   2. Starting and stopping the scoped client (lines 167, 232, 240, 257)
   3. Adding a close listener to clean up resources (lines 237-244)
   4. Complex error handling with conditional cleanup (lines 252-262)
   
   This functionality is critical for preventing the "Too many authentication 
failures" issue. Consider adding unit tests to verify:
   - Scoped client is created only for proxy connections
   - Client is properly stopped when session closes
   - Client is stopped on authentication failure
   - Client is stopped when exceptions occur before session establishment
   - No resource leaks occur in various failure scenarios



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to