This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 79750a8 [ZEPPELIN-5510]Thread leakage in JobManager of flink interpreter. 79750a8 is described below commit 79750a88025bd64cff5830cf2ec8e6a6181454f5 Author: nicolasgong <nicolasgon...@gmail.com> AuthorDate: Mon Sep 27 12:43:57 2021 +0800 [ZEPPELIN-5510]Thread leakage in JobManager of flink interpreter. [ZEPPELIN-5510] Thread leakage in JobManager of flink interpreter. ### What is this PR for? Fix logic errors in JobManager that can leak progress-poller threads: addJob now logs a warning and returns early, without starting another FlinkJobProgressPoller thread, when the paragraph is already associated with a job; removeJob now logs a warning and returns when no poller is registered for the job, instead of calling cancel() and interrupt() on a null poller. ### What type of PR is it? [Improvement] ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-5510 Author: nicolasgong <nicolasgon...@gmail.com> Author: YiningGong <nicolasgon...@gmail.com> Closes #4216 from NicolasGong/bugfix-0.2 and squashes the following commits: 57835a8985 [nicolasgong] Added null - value verification for parameters a9bb128c22 [YiningGong] Thread leakage in JobManager of flink interpreter. --- .../main/java/org/apache/zeppelin/flink/JobManager.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java index d60ff1a..49c803a 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java @@ -62,19 +62,20 @@ public class JobManager { LOGGER.info("Creating JobManager at flinkWebUrl: {}, displayedFlinkWebUrl: {}", flinkWebUrl, displayedFlinkWebUrl); } - + public void addJob(InterpreterContext context, JobClient jobClient) { String paragraphId = context.getParagraphId(); JobClient previousJobClient = this.jobs.put(paragraphId, jobClient); + if (previousJobClient != null) { + LOGGER.warn("There's another Job {} that is associated with paragraph {}", + jobClient.getJobID(), 
paragraphId); + return; + } long checkInterval = Long.parseLong(properties.getProperty("zeppelin.flink.job.check_interval", "1000")); FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context, checkInterval); thread.setName("JobProgressPoller-Thread-" + paragraphId); thread.start(); this.jobProgressPollerMap.put(jobClient.getJobID(), thread); - if (previousJobClient != null) { - LOGGER.warn("There's another Job {} that is associated with paragraph {}", - jobClient.getJobID(), paragraphId); - } } public void removeJob(String paragraphId) { @@ -87,6 +88,12 @@ public class JobManager { } FlinkJobProgressPoller jobProgressPoller = this.jobProgressPollerMap.remove(jobClient.getJobID()); + if (jobProgressPoller == null) { + LOGGER.warn("Unable to remove poller, because no poller is associated with paragraph: " + + paragraphId); + return; + } + jobProgressPoller.cancel(); jobProgressPoller.interrupt(); }