This is an automated email from the ASF dual-hosted git repository. Aias00 pushed a commit to branch 2.0.0 in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
commit d1e533f23d521d4b5d1418500b302cf7d8c6568d Author: liuhy <[email protected]> AuthorDate: Fri Jun 5 14:26:08 2026 +0800 fix(collector): fix resource leaks in WebSocket, script, FTP, and JDBC collectors - WebsocketCollectImpl: add finally block to close socket and I/O streams on all code paths (previously only closed on success, leaking on exception) - ScriptCollectImpl: add finally block to close BufferedReader and destroy Process; restore interrupt flag on InterruptedException - FtpCollectImpl: add finally block to disconnect FTPClient on exception - JdbcCommonCollect: close JDBC Connection if setReadOnly/createStatement fails (previously leaked because statement remained null and caller's finally block skipped cleanup) Co-Authored-By: Claude Opus 4.8 <[email protected]> --- .../collect/database/JdbcCommonCollect.java | 14 +++++++-- .../collector/collect/ftp/FtpCollectImpl.java | 8 +++++ .../collect/script/ScriptCollectImpl.java | 18 +++++++++-- .../collect/websocket/WebsocketCollectImpl.java | 35 +++++++++++++++++----- 4 files changed, 64 insertions(+), 11 deletions(-) diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/JdbcCommonCollect.java b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/JdbcCommonCollect.java index 42529c7284..e34a8b28b1 100644 --- a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/JdbcCommonCollect.java +++ b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/JdbcCommonCollect.java @@ -324,8 +324,18 @@ public class JdbcCommonCollect extends AbstractCollect { } // renew connection when failed Connection connection = DriverManager.getConnection(url, username, password); - connection.setReadOnly(true); - statement = connection.createStatement(); + try { + connection.setReadOnly(true); + statement = connection.createStatement(); + } catch (Exception e) { + // Close connection if statement creation fails to prevent connection leak + try { + connection.close(); + } catch (Exception closeEx) { + log.warn("Failed to close connection after statement creation error: {}", closeEx.getMessage()); + } + throw e; + } int timeoutSecond = timeout / 1000; timeoutSecond = timeoutSecond <= 0 ? 1 : timeoutSecond; statement.setQueryTimeout(timeoutSecond); diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/ftp/FtpCollectImpl.java b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/ftp/FtpCollectImpl.java index 03307e1c7a..66c43cfbca 100644 --- a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/ftp/FtpCollectImpl.java +++ b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/ftp/FtpCollectImpl.java @@ -191,6 +191,14 @@ public class FtpCollectImpl extends AbstractCollect { builder.setCode(CollectRep.Code.UN_CONNECTABLE); builder.setMsg(e.getMessage()); return; + } finally { + if (ftpClient.isConnected()) { + try { + ftpClient.disconnect(); + } catch (IOException e) { + log.warn("[FTPClient] error while disconnecting: {}", e.getMessage()); + } + } } builder.addValueRow(valueRowBuilder.build()); } diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/script/ScriptCollectImpl.java b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/script/ScriptCollectImpl.java index 3d797b53a1..5d85bcf8be 100644 --- a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/script/ScriptCollectImpl.java +++ b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/script/ScriptCollectImpl.java @@ -110,9 +110,11 @@ public class ScriptCollectImpl extends AbstractCollect { processBuilder.directory(new File(workDirectory)); } // execute command + Process process = null; + BufferedReader reader = null; try { - Process process = processBuilder.start(); - BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), Charset.forName(scriptProtocol.getCharset()))); + process = processBuilder.start(); + reader = new BufferedReader(new InputStreamReader(process.getInputStream(), Charset.forName(scriptProtocol.getCharset()))); StringBuilder response = new StringBuilder(); String line; while ((line = reader.readLine()) != null) { @@ -148,11 +150,23 @@ public class ScriptCollectImpl extends AbstractCollect { log.warn(errorMsg); builder.setCode(CollectRep.Code.FAIL); builder.setMsg("Peer interrupt this script: " + errorMsg); + Thread.currentThread().interrupt(); } catch (Exception exception) { String errorMsg = CommonUtil.getMessageFromThrowable(exception); log.warn(errorMsg); builder.setCode(CollectRep.Code.FAIL); builder.setMsg(errorMsg); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + log.warn(e.getMessage()); + } + } + if (process != null) { + process.destroy(); + } } } diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/websocket/WebsocketCollectImpl.java b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/websocket/WebsocketCollectImpl.java index 1ff8c1aa4d..4bb461f136 100644 --- a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/websocket/WebsocketCollectImpl.java +++ b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/websocket/WebsocketCollectImpl.java @@ -73,6 +73,8 @@ public class WebsocketCollectImpl extends AbstractCollect { String host = websocketProtocol.getHost(); String port = websocketProtocol.getPort(); Socket socket = null; + OutputStream out = null; + InputStream in = null; try { socket = new Socket(); SocketAddress socketAddress = new InetSocketAddress(host, Integer.parseInt(port)); @@ -80,17 +82,13 @@ public class WebsocketCollectImpl extends AbstractCollect { if (socket.isConnected()) { long responseTime = System.currentTimeMillis() - startTime; - OutputStream out = socket.getOutputStream(); - InputStream in = socket.getInputStream(); - + out = socket.getOutputStream(); + in = socket.getInputStream(); + send(out, websocketProtocol); Map<String, String> resultMap = readHeaders(in); resultMap.put(CollectorConstants.RESPONSE_TIME, Long.toString(responseTime)); - // Close the output stream and socket connection - in.close(); - out.close(); - socket.close(); List<String> aliasFields = metrics.getAliasFields(); CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); for (String field : aliasFields) { @@ -117,6 +115,29 @@ public class WebsocketCollectImpl extends AbstractCollect { log.info(errorMsg); builder.setCode(CollectRep.Code.UN_CONNECTABLE); builder.setMsg("Connect may fail:" + errorMsg); + } finally { + // Ensure socket and streams are always closed to prevent resource leaks + if (in != null) { + try { + in.close(); + } catch (IOException e) { + log.warn(e.getMessage()); + } + } + if (out != null) { + try { + out.close(); + } catch (IOException e) { + log.warn(e.getMessage()); + } + } + if (socket != null) { + try { + socket.close(); + } catch (IOException e) { + log.warn(e.getMessage()); + } + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
