This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b3b8c0784d Fix kill dynamic task doesn't kill the wait to run workflow 
instances (#15896)
b3b8c0784d is described below

commit b3b8c0784dfc31b516b13601a311bc78550bf931
Author: Wenjun Ruan <wen...@apache.org>
AuthorDate: Thu Apr 25 11:05:51 2024 +0800

    Fix kill dynamic task doesn't kill the wait to run workflow instances 
(#15896)
---
 .../dolphinscheduler/dao/mapper/CommandMapper.java |  9 +++-
 .../dolphinscheduler/dao/mapper/CommandMapper.xml  |  7 +++
 .../dao/mapper/CommandMapperTest.java              | 11 ++++-
 .../runner/task/dynamic/DynamicLogicTask.java      | 51 +++++++++++++++++++++-
 4 files changed, 74 insertions(+), 4 deletions(-)

diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
index a8490cbef7..9fb6643227 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
@@ -34,8 +34,9 @@ public interface CommandMapper extends BaseMapper<Command> {
 
     /**
      * count command state
-     * @param startTime startTime
-     * @param endTime endTime
+     *
+     * @param startTime    startTime
+     * @param endTime      endTime
      * @param projectCodes projectCodes
      * @return CommandCount list
      */
@@ -46,15 +47,19 @@ public interface CommandMapper extends BaseMapper<Command> {
 
     /**
      * query command page
+     *
      * @return
      */
     List<Command> queryCommandPage(@Param("limit") int limit, @Param("offset") 
int offset);
 
     /**
      * query command page by slot
+     *
      * @return command list
      */
     List<Command> queryCommandPageBySlot(@Param("limit") int limit,
                                          @Param("masterCount") int masterCount,
                                          @Param("thisMasterSlot") int 
thisMasterSlot);
+
+    /**
+     * delete the commands whose process instance id is contained in the given list
+     *
+     * @param workflowInstanceIds ids of the workflow instances whose commands should be deleted;
+     *                            NOTE(review): the mapper SQL expands this into an IN (...) list,
+     *                            so an empty list presumably yields invalid SQL - confirm callers guard it
+     */
+    void deleteByWorkflowInstanceIds(@Param("workflowInstanceIds") 
List<Integer> workflowInstanceIds);
 }
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
index c950f66413..56db890ef0 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
@@ -47,4 +47,11 @@
         order by process_instance_priority, id asc
             limit #{limit}
     </select>
+    <!--
+        Deletes every t_ds_command row whose process_instance_id is one of workflowInstanceIds.
+        NOTE(review): there is no guard for an empty collection here; the foreach would presumably
+        render "in ()" in that case - confirm that every caller checks for emptiness first.
+    -->
+    <delete id="deleteByWorkflowInstanceIds" >
+        delete from t_ds_command
+        where process_instance_id in
+        <foreach collection="workflowInstanceIds" index="index" item="i" 
open="(" close=")" separator=",">
+            #{i}
+        </foreach>
+    </delete>
 </mapper>
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
index 3d45477d85..2d367e46e4 100644
--- 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.dao.mapper;
 
+import static com.google.common.truth.Truth.assertThat;
+
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
@@ -173,6 +175,14 @@ public class CommandMapperTest extends BaseDaoTest {
         toTestQueryCommandPageBySlot(masterCount, thisMasterSlot);
     }
 
+    @Test
+    void deleteByWorkflowInstanceIds() {
+        // given: one command has been created and is visible through the mapper
+        Command createdCommand = createCommand();
+        assertThat(commandMapper.selectList(null)).isNotEmpty();
+
+        // when: the commands of that command's process instance are deleted
+        commandMapper.deleteByWorkflowInstanceIds(
+                Lists.newArrayList(createdCommand.getProcessInstanceId()));
+
+        // then: no command is left
+        assertThat(commandMapper.selectList(null)).isEmpty();
+    }
+
     private boolean toTestQueryCommandPageBySlot(int masterCount, int 
thisMasterSlot) {
         Command command = createCommand();
         Integer id = command.getId();
@@ -280,5 +290,4 @@ public class CommandMapperTest extends BaseDaoTest {
 
         return command;
     }
-
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
index 3baa10b343..12cae5c53e 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java
@@ -252,12 +252,61 @@ public class DynamicLogicTask extends 
BaseAsyncLogicTask<DynamicParameters> {
     @Override
     public void kill() {
         try {
-            
changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus.READY_STOP);
+            doKillSubWorkflowInstances();
         } catch (MasterTaskExecuteException e) {
+            // The failure is only logged and not rethrown, so kill() never propagates this exception.
             log.error("kill {} error", taskInstance.getName(), e);
         }
     }
 
+    /**
+     * Kill the sub workflow instances of this dynamic task.
+     * <p>
+     * Steps, in order: load all dynamic sub workflow instances of this task, delete the commands
+     * that reference them, stop the running ones, then mark the wait-to-run ones as stopped.
+     * Returns without doing anything when no sub workflow instance exists.
+     * <p>
+     * NOTE(review): haveBeenCanceled is only set after every step succeeded; it stays untouched on the
+     * early return and when an exception is thrown - confirm this is intended.
+     *
+     * @throws MasterTaskExecuteException if stopping a running sub workflow instance fails
+     */
+    private void doKillSubWorkflowInstances() throws MasterTaskExecuteException {
+        List<ProcessInstance> subWorkflowInstances =
+                subWorkflowService.getAllDynamicSubWorkflow(processInstance.getId(), taskInstance.getTaskCode());
+        if (CollectionUtils.isEmpty(subWorkflowInstances)) {
+            return;
+        }
+
+        // Remove the commands of these instances before touching the instances themselves.
+        List<Integer> subWorkflowInstanceIds = subWorkflowInstances.stream()
+                .map(ProcessInstance::getId)
+                .collect(Collectors.toList());
+        commandMapper.deleteByWorkflowInstanceIds(subWorkflowInstanceIds);
+
+        doKillRunningSubWorkflowInstances(
+                subWorkflowService.filterRunningProcessInstances(subWorkflowInstances));
+        doKillWaitToRunSubWorkflowInstances(
+                subWorkflowService.filterWaitToRunProcessInstances(subWorkflowInstances));
+
+        this.haveBeenCanceled = true;
+    }
+
+    /**
+     * Mark each given sub workflow instance as READY_STOP, persist it, and send the stop request
+     * to the master recorded on that instance.
+     * <p>
+     * An instance whose state is already finished is skipped. The check runs before the state is
+     * overwritten; checking afterwards would always inspect the READY_STOP value that was just assigned.
+     *
+     * @param runningSubProcessInstanceList the sub workflow instances to stop
+     * @throws MasterTaskExecuteException if sending the stop request for an instance fails
+     */
+    private void doKillRunningSubWorkflowInstances(List<ProcessInstance> runningSubProcessInstanceList)
+            throws MasterTaskExecuteException {
+        for (ProcessInstance subProcessInstance : runningSubProcessInstanceList) {
+            // Must happen before setState(...): otherwise a finished state is lost and the check is dead.
+            if (subProcessInstance.getState().isFinished()) {
+                log.info("The process instance [{}] is finished, no need to stop", subProcessInstance.getId());
+                continue;
+            }
+            subProcessInstance.setState(WorkflowExecutionStatus.READY_STOP);
+            processInstanceDao.updateById(subProcessInstance);
+            try {
+                sendToSubProcess(taskExecutionContext, subProcessInstance);
+                log.info("Success send [{}] request to SubWorkflow's master: {}", WorkflowExecutionStatus.READY_STOP,
+                        subProcessInstance.getHost());
+            } catch (Exception e) {
+                // Keep the cause so the caller's log shows why the request failed.
+                throw new MasterTaskExecuteException(
+                        String.format("Send stop request to SubWorkflow's master: %s failed",
+                                subProcessInstance.getHost()),
+                        e);
+            }
+        }
+    }
+
+    /**
+     * Set each given sub workflow instance to STOP and persist the change.
+     *
+     * @param waitToRunWorkflowInstances the sub workflow instances to mark as stopped
+     */
+    private void doKillWaitToRunSubWorkflowInstances(List<ProcessInstance> waitToRunWorkflowInstances) {
+        waitToRunWorkflowInstances.forEach(waitingInstance -> {
+            waitingInstance.setState(WorkflowExecutionStatus.STOP);
+            processInstanceDao.updateById(waitingInstance);
+        });
+    }
+
     private void 
changeRunningSubprocessInstancesToStop(WorkflowExecutionStatus stopStatus) 
throws MasterTaskExecuteException {
         this.haveBeenCanceled = true;
         List<ProcessInstance> existsSubProcessInstanceList =

Reply via email to