zhaoguangqin opened a new issue, #4152:
URL: https://github.com/apache/hertzbeat/issues/4152

   ### Is there an existing issue for this?
   
   - [x] I have searched the existing issues
   
   ### Current Behavior
   
   - [问题一:SSH 连接未断开导致内存持续飙升](#问题一ssh-连接未断开导致内存持续飙升)
   - [问题二:SSH 密钥错误导致无限重试](#问题二ssh-密钥错误导致无限重试)
   - [修复方案总结](#修复方案总结)
   - [修改文件清单](#修改文件清单)
   
   ---
   
   ## 问题一:SSH 连接未断开导致内存持续飙升
   
   ### 现象
   
   服务器内存持续增长,SSH 相关的hertzbeat日志中出现大量密钥重新交换信息:
   
   ```
   requestNewKeysExchange(ClientSessionImpl[admin@/192.168.1.3:22]) Initiating 
key re-exchange
   ```
   
   ### 根因分析
   
   #### 1.1 心跳过于频繁(原 2 秒/次)
   
   `CommonSshClient.java` 原代码:
   
   ```java
   // 每 2 秒发送一次心跳
   PropertyResolverUtils.updateProperty(
           SSH_CLIENT, CoreModuleProperties.HEARTBEAT_INTERVAL.getName(), 2000);
   PropertyResolverUtils.updateProperty(
           SSH_CLIENT, CoreModuleProperties.HEARTBEAT_NO_REPLY_MAX.getName(), 
30);
   ```
   
   200 个 SSH 监控目标 × 每 2 秒一次心跳 = **每秒 100 次心跳**,产生大量 NIO ByteBuffer 分配和加密操作。
   
   ### 复现条件
   
   1. 在 HertzBeat 中配置若干 SSH 监控目标,开启 `reuseConnection=true`
   2. 采集频率设置为 60 秒
   3. 持续运行数小时后观察:
      - hertzbeat日志中 `requestNewKeysExchange` 和 `ServerKeyVerifier` 相关 
INFO/WARN 消息持续大量出现
      - JVM 内存(特别是老年代)持续增长,不回落
      - 通过 `jmap -histo <pid>` 可查看 `byte[]`、`SessionListener` 等对象数量只增不减
      - 目标服务器侧 `ss -tn | grep :22` 可观察到大量 ESTABLISHED 连接长时间不释放
   
   #### 1.2 Session 永不过期
   
   `CommonSshClient.java` 原代码中**没有配置 `IDLE_TIMEOUT`**,Session 没有空闲超时机制。
   
   同时 `SshHelper.getConnectSession()` 中缓存命中时传入 `refreshCache=true`,每次访问都刷新缓存 
TTL(600 秒),导致缓存永远不会过期:
   
   ```java
   // SshHelper.java 原代码 - 没有 session 最大存活期限制
   Optional<AbstractConnection<?>> cacheOption = 
CONNECTION_COMMON_CACHE.getCache(identifier, true);
   if (cacheOption.isPresent()) {
       SshConnect sshConnect = (SshConnect) cacheOption.get();
       clientSession = sshConnect.getConnection();
       try {
           // 只检查了 isClosed/isClosing,没有检查存活时间
           if (clientSession == null || clientSession.isClosed() || 
clientSession.isClosing()) {
               clientSession = null;
               CONNECTION_COMMON_CACHE.removeCache(identifier);
           }
       } ...
   }
   ```
   
   **结果**:心跳保活 + 缓存 TTL 不断刷新 = Session 永远存活,内部资源(NIO 管道、加密上下文、密钥交换状态)持续累积。
   
   #### 1.3 每次采集创建新 Channel,异步关闭导致堆积
   
   `SshCollectImpl.java` 原代码中,每次采集都在同一个 Session 上创建新的 exec channel:
   
   ```java
   // 第94行 - 每次采集都在 Session 上创建新的 exec channel
   channel = clientSession.createExecChannel(sshProtocol.getScript());
   channel.setOut(response);
   channel.setErr(new NoCloseOutputStream(System.err));
   channel.open().verify(timeout);
   ```
   
   `createExecChannel` 会在 Session 内部的 channel map 
中注册一个新通道,分配独立的缓冲区(InputStream/OutputStream)、NIO 管道和加密上下文。**这是内存资源的实际产生点。**
   
   而 finally 块中,channel 的关闭是异步的:
   
   ```java
   // finally 块 - 异步关闭 channel,不等完成就返回
   channel.close(false).addListener(future ->
           log.debug("channel is closed in {} ms", System.currentTimeMillis() - 
st));
   ```
   
   `channel.close(false)` 返回 `CloseFuture`,是异步非阻塞的。方法立即返回,Session 内部的 channel 
引用并没有被移除。
   
   **完整的内存泄漏链路**:
   
   ```
   Session 被缓存复用,永远不会关闭
       ↓
   每 60 秒调用一次 createExecChannel(),在 Session 上新建一个 exec channel
       ↓
   channel.close(false) 异步关闭,不等完成
       ↓
   如果远端 sshd 响应慢,旧 channel 尚未完成关闭,新 channel 已创建
       ↓
   Session 内部 channel map 中堆积越来越多的“正在关闭中”的 channel
       ↓
   每个 channel 持有自己的缓冲区、IO 流、加密上下文 → 内存持续飙升
   ```
   
   简而言之:**Channel 是资源的产生者,Session 是资源的持有者,异步关闭是资源堆积的直接原因。**
   
   ---
   
   ## 问题二:SSH 密钥错误导致无限重试
   
   ### 现象
   
   SSH 密钥/密码配置错误后,系统仍然每隔 60 秒尝试连接,远端服务器的 SSH 连接数被打满,其他用户无法登录,表现为"服务器被攻击"。
   
   ### 根因分析
   
   #### 2.1 没有熔断机制
   
   `SshHelper.java` 原代码中,认证失败后直接抛异常,没有任何失败计数或熔断逻辑:
   
   ```java
   // auth
   if (!clientSession.auth().verify(timeout, 
TimeUnit.MILLISECONDS).isSuccess()) {
       clientSession.close();
       throw new IllegalArgumentException("ssh auth failed.");  // 直接抛出,无记录
   }
   ```
   
   `SshCollectImpl.java` 中捕获异常后只设置采集状态为 FAIL,下次采集周期照常重试:
   
   ```java
   } catch (Exception exception) {
       String errorMsg = CommonUtil.getMessageFromThrowable(exception);
       log.warn(errorMsg, exception);
       builder.setCode(CollectRep.Code.FAIL);
       builder.setMsg(errorMsg);
       // 没有任何熔断处理,60秒后继续重试
   }
   ```
   
   ### 复现条件
   
   1. 在 HertzBeat 中新增一个 SSH 监控目标,**故意填写错误的密码或密钥**
   2. 对该目标配置多个采集指标(如 CPU、内存、磁盘各一个 SSH 指标),采集频率 60 秒
   3. 在目标服务器上执行 `ss -tn | grep :22 | wc -l` 观察连接数
   4. 每 60 秒可观察到:
      - 连接数周期性增加,远端 sshd 的 `MaxStartups` 限制被打满
      - 其他正常用户无法 SSH 登录目标服务器,报 `Connection closed by remote host`
      - 目标服务器 `/var/log/auth.log` 或 `/var/log/secure` 中出现大量 `Failed password` 记录
      - HertzBeat 日志中持续输出 `ssh auth failed` 但没有任何收敛趋势
   
   #### 2.2 没有负缓存
   
   认证失败时,异常在缓存写入之前抛出,失败结果不会被缓存:
   
   ```java
   // auth 失败 → 抛异常
   if (!clientSession.auth().verify(...).isSuccess()) {
       clientSession.close();
       throw new IllegalArgumentException("ssh auth failed.");  // ← 在这里抛出
   }
   // 下面这行永远执行不到
   if (reuseConnection) {
       CONNECTION_COMMON_CACHE.addCache(identifier, sshConnect);  // ← 不会缓存失败结果
   }
   ```
   
   **结果**:每次采集周期都从 TCP 握手开始,完整走一遍连接流程。N 个指标 × 每 60 秒 = 每分钟 N 次完整的 SSH 暴力连接。
   
   #### 2.3 Auth 失败时 Session 异步关闭
   
   ```java
   if (!clientSession.auth().verify(...).isSuccess()) {
       clientSession.close();  // ← 异步!不等完成就抛异常
       throw new IllegalArgumentException("ssh auth failed.");
   }
   ```
   
   `close()` 是异步的,紧接着就抛异常。远端 sshd 可能还没完全释放连接资源,下一次连接又来了,导致 `MaxStartups` 被打满。
   
   ---
   
   ## 修复方案总结
   
   ### 方案一:Session 生命周期管理(解决问题一)
   
   #### 修改 1:心跳降频 + 添加空闲超时
   
   **文件**:`CommonSshClient.java`(`hertzbeat-collector-common` 模块)
   
   ```java
   // 心跳从 2 秒降为 30 秒
   PropertyResolverUtils.updateProperty(
           SSH_CLIENT, CoreModuleProperties.HEARTBEAT_INTERVAL.getName(), 
30000);
   
   // 新增:session 空闲 10 分钟自动关闭
   PropertyResolverUtils.updateProperty(
           SSH_CLIENT, CoreModuleProperties.IDLE_TIMEOUT.getName(), 
TimeUnit.MINUTES.toMillis(10));
   ```
   
   #### 修改 2:记录 Session 创建时间
   
   **文件**:`SshConnect.java`(`hertzbeat-collector-common` 模块)
   
   ```java
   public class SshConnect extends AbstractConnection<ClientSession> {
       private final ClientSession clientSession;
       private final long createTime;  // 新增:记录创建时间
   
       public SshConnect(ClientSession clientSession) {
           this.clientSession = clientSession;
           this.createTime = System.currentTimeMillis();
       }
   
       public long getCreateTime() {
           return createTime;
       }
       // ...
   }
   ```
   
   #### 修改 3:Session 最大存活期(30 分钟定期换血)
   
   **文件**:`SshHelper.java`(`hertzbeat-collector-common` 模块)
   
   ```java
   // 新增常量
   private static final long MAX_SESSION_LIFETIME = 
TimeUnit.MINUTES.toMillis(30);
   
   // 缓存命中检查中增加存活期判断(两个 getConnectSession 方法均修改)
   if (clientSession == null || clientSession.isClosed() || 
clientSession.isClosing()
           || System.currentTimeMillis() - sshConnect.getCreateTime() > 
MAX_SESSION_LIFETIME) {
       clientSession = null;
       CONNECTION_COMMON_CACHE.removeCache(identifier);
   }
   ```
   
   #### 修改 4:Channel 关闭改为同步等待(核心修复)
   
   **文件**:`SshCollectImpl.java`(`hertzbeat-collector-basic` 模块)
   
   `createExecChannel` 每次采集都会在 Session 上创建新的 exec 
channel,是内存资源的实际产生点。修复的关键在于:确保每次采集周期结束时,channel 已经完全关闭并从 Session 中移除,再进入下一次采集。
   
   ```java
   // 旧:异步关闭,不等完成
   channel.close(false).addListener(future ->
           log.debug("channel is closed in {} ms", ...));
   
   // 新:同步等待关闭完成
   channel.close(false).await(timeout);
   log.debug("channel is closed in {} ms", System.currentTimeMillis() - st);
   ```
   
   ### 方案二:认证失败熔断机制(解决问题二)
   
   #### 修改 5:新增 SshCircuitBreaker 熔断器
   
   **文件**:`SshCircuitBreaker.java`(新增,`hertzbeat-collector-common` 模块)
   
   | 配置项 | 值 | 说明 |
   |---|---|---|
   | `MAX_FAILURES` | 3 | 连续失败 3 次后触发熔断 |
   | `COOLDOWN_MS` | 5 分钟 | 熔断冷却期,期间不再尝试连接 |
   
   完整类实现如下:
   
   ```java
   package org.apache.hertzbeat.collector.collect.common.ssh;
   
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   import lombok.extern.slf4j.Slf4j;
   
   /**
    * SSH connection circuit breaker to prevent repeated failed connection 
attempts.
    * After MAX_FAILURES consecutive auth failures for a target, further 
attempts are blocked
    * for COOLDOWN_MS milliseconds to avoid flooding the remote server.
    */
   @Slf4j
   public class SshCircuitBreaker {
   
       private static final int MAX_FAILURES = 3;
       private static final long COOLDOWN_MS = TimeUnit.MINUTES.toMillis(5);
   
       private static final ConcurrentHashMap<String, FailureRecord> RECORDS = 
new ConcurrentHashMap<>();
   
       /**
        * Build a circuit breaker key from SSH target info.
        */
       public static String buildKey(String host, String port, String username) 
{
           return username + "@" + host + ":" + port;
       }
   
       /**
        * Check whether the circuit for the given target is open (blocked).
        * If open, throws IllegalArgumentException immediately.
        */
       public static void checkOpen(String key) {
           FailureRecord record = RECORDS.get(key);
           if (record == null) {
               return;
           }
           if (record.failureCount.get() >= MAX_FAILURES) {
               long elapsed = System.currentTimeMillis() - 
record.lastFailureTime;
               if (elapsed < COOLDOWN_MS) {
                   long remainingSeconds = (COOLDOWN_MS - elapsed) / 1000;
                   String msg = "SSH circuit breaker is OPEN for " + key
                           + ", " + record.failureCount.get() + " consecutive 
failures"
                           + ", retry available in " + remainingSeconds + "s";
                   log.warn(msg);
                   throw new IllegalArgumentException(msg);
               }
               // cooldown expired, reset
               RECORDS.remove(key);
           }
       }
   
       /**
        * Record a connection/auth failure for the given target.
        */
       public static void recordFailure(String key) {
           FailureRecord record = RECORDS.computeIfAbsent(key, k -> new 
FailureRecord());
           int count = record.failureCount.incrementAndGet();
           record.lastFailureTime = System.currentTimeMillis();
           if (count >= MAX_FAILURES) {
               log.warn("SSH circuit breaker OPEN for {} after {} consecutive 
failures, cooldown {}min",
                       key, count, 
TimeUnit.MILLISECONDS.toMinutes(COOLDOWN_MS));
           }
       }
   
       /**
        * Record a successful connection for the given target, resetting the 
failure counter.
        */
       public static void recordSuccess(String key) {
           RECORDS.remove(key);
       }
   
       private static class FailureRecord {
           final AtomicInteger failureCount = new AtomicInteger(0);
           volatile long lastFailureTime;
       }
   }
   ```
   
   #### 修改 6:SshHelper 集成熔断器
   
   **文件**:`SshHelper.java`(`hertzbeat-collector-common` 模块)
   
   在两个 `getConnectSession` 方法中各加 3 处调用:
   
   ```java
   // 1. 连接前检查熔断
   String cbKey = SshCircuitBreaker.buildKey(host, port, username);
   SshCircuitBreaker.checkOpen(cbKey);
   
   // 2. 认证失败时记录
   if (!clientSession.auth().verify(...).isSuccess()) {
       clientSession.close();
       SshCircuitBreaker.recordFailure(cbKey);  // 新增
       throw new IllegalArgumentException("ssh auth failed.");
   }
   
   // 3. 认证成功后重置
   SshCircuitBreaker.recordSuccess(cbKey);  // 新增
   ```
   
   ### 熔断效果
   
   ```
   第1次采集:认证失败 → recordFailure(计数=1)→ 正常关闭
   第2次采集:认证失败 → recordFailure(计数=2)→ 正常关闭
   第3次采集:认证失败 → recordFailure(计数=3)→ 触发熔断
   第4~N次采集(5分钟内):checkOpen 直接抛异常 → 不建立 TCP 连接 → 远端无感知
   5分钟后:冷却期结束 → 允许再次尝试 → 若密码已修正则成功
   ```
   
   ---
   
   ## 修改文件清单
   
   | 序号 | 模块 | 文件 | 修改类型 | 说明 |
   |---|---|---|---|---|
   | 1 | `hertzbeat-collector-common` | `CommonSshClient.java` | 修改 | 心跳降频 
30s,新增 IDLE_TIMEOUT 10min |
   | 2 | `hertzbeat-collector-common` | `SshConnect.java` | 修改 | 新增 createTime 
字段和 getter |
   | 3 | `hertzbeat-collector-common` | `SshHelper.java` | 修改 | 新增 
MAX_SESSION_LIFETIME 30min,集成熔断器 |
   | 4 | `hertzbeat-collector-common` | `SshCircuitBreaker.java` | **新增** | 
认证失败熔断器 |
   | 5 | `hertzbeat-collector-basic` | `SshCollectImpl.java` | 修改 | Channel 
关闭改为同步 await |
   
   ### Expected Behavior
   
   _No response_
   
   ### Steps To Reproduce
   
   _No response_
   
   ### Environment
   
   ```markdown
   HertzBeat version(s):v1.8.0
   ```
   
   ### Debug logs
   
   _No response_
   
   ### Anything else?
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: 
[email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to