This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 79750a8  [ZEPPELIN-5510] Thread leakage in JobManager of flink interpreter.
79750a8 is described below

commit 79750a88025bd64cff5830cf2ec8e6a6181454f5
Author: nicolasgong <nicolasgon...@gmail.com>
AuthorDate: Mon Sep 27 12:43:57 2021 +0800

    [ZEPPELIN-5510] Thread leakage in JobManager of flink interpreter.
    
    [ZEPPELIN-5510] Thread leakage in JobManager of flink interpreter.
    
    ### What is this PR for?
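    JobManager has logic problems that may lead to thread leaks: addJob
    started a new progress poller thread even when a job was already
    registered for the paragraph, and removeJob called cancel() on the
    poller without first checking it for null. This PR fixes both issues.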
    
    ### What type of PR is it?
    [Improvement]
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/ZEPPELIN-5510
    
    Author: nicolasgong <nicolasgon...@gmail.com>
    Author: YiningGong <nicolasgon...@gmail.com>
    
    Closes #4216 from NicolasGong/bugfix-0.2 and squashes the following commits:
    
    57835a8985 [nicolasgong] Added null-value verification for parameters
    a9bb128c22 [YiningGong] Thread leakage in JobManager of flink interpreter.
---
 .../main/java/org/apache/zeppelin/flink/JobManager.java | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
index d60ff1a..49c803a 100644
--- 
a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ 
b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -62,19 +62,20 @@ public class JobManager {
     LOGGER.info("Creating JobManager at flinkWebUrl: {}, displayedFlinkWebUrl: 
{}",
             flinkWebUrl, displayedFlinkWebUrl);
   }
-
+  
   public void addJob(InterpreterContext context, JobClient jobClient) {
     String paragraphId = context.getParagraphId();
     JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
+    if (previousJobClient != null) {
+      LOGGER.warn("There's another Job {} that is associated with paragraph 
{}",
+              jobClient.getJobID(), paragraphId);
+      return;
+    }
     long checkInterval = 
Long.parseLong(properties.getProperty("zeppelin.flink.job.check_interval", 
"1000"));
     FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, 
jobClient.getJobID(), context, checkInterval);
     thread.setName("JobProgressPoller-Thread-" + paragraphId);
     thread.start();
     this.jobProgressPollerMap.put(jobClient.getJobID(), thread);
-    if (previousJobClient != null) {
-      LOGGER.warn("There's another Job {} that is associated with paragraph 
{}",
-              jobClient.getJobID(), paragraphId);
-    }
   }
 
   public void removeJob(String paragraphId) {
@@ -87,6 +88,12 @@ public class JobManager {
     }
     FlinkJobProgressPoller jobProgressPoller =
             this.jobProgressPollerMap.remove(jobClient.getJobID());
+    if (jobProgressPoller == null) {
+        LOGGER.warn("Unable to remove poller, because no poller is associated 
with paragraph: "
+                + paragraphId);
+        return;
+    }
+
     jobProgressPoller.cancel();
     jobProgressPoller.interrupt();
   }

Reply via email to