This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 42528f7  ExecuteProcessEngine.finish (#9706)
42528f7 is described below

commit 42528f7fbec7ddfa59543254b6d7779af4ced79a
Author: Juan Pan(Trista) <[email protected]>
AuthorDate: Wed Mar 17 17:43:44 2021 +0800

    ExecuteProcessEngine.finish (#9706)
    
    * ExecuteProcessEngine.finish
    
    * fix test
    
    * rename parameters
---
 .../process/GovernanceExecuteProcessReporter.java  |  6 +--
 .../engine/driver/jdbc/JDBCExecutorCallback.java   | 10 ++++
 .../executor/sql/process/ExecuteProcessEngine.java | 12 +++--
 .../process/event/ExecuteProcessCreatedEvent.java  | 54 ----------------------
 .../event/ExecuteProcessUnitCreatedEvent.java      | 39 ----------------
 ...essStatus.java => ExecuteProcessConstants.java} |  6 +--
 .../sql/process/model/ExecuteProcessUnit.java      |  2 +-
 .../sql/process/spi/ExecuteProcessReporter.java    | 10 ++--
 .../engine/jdbc/JDBCExecutorCallbackTest.java      |  4 +-
 9 files changed, 31 insertions(+), 112 deletions(-)

diff --git 
a/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/process/GovernanceExecuteProcessReporter.java
 
b/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/process/GovernanceExecuteProcessReporter.java
index acd092e..ce2b5b9 100644
--- 
a/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/process/GovernanceExecuteProcessReporter.java
+++ 
b/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/process/GovernanceExecuteProcessReporter.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.governance.context.process;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
+import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
 import 
org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter;
 
 /**
@@ -29,12 +29,12 @@ import 
org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessRe
 public final class GovernanceExecuteProcessReporter implements 
ExecuteProcessReporter {
     
     @Override
-    public void report(final SQLStatementContext<?> context, final 
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final 
ExecuteProcessStatus status) {
+    public void report(final SQLStatementContext<?> context, final 
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final 
ExecuteProcessConstants constants) {
         // TODO :Call API of configCenter
     }
     
     @Override
-    public void report(final String executionID, final SQLExecutionUnit 
executionUnit, final ExecuteProcessStatus status) {
+    public void report(final String executionID, final SQLExecutionUnit 
executionUnit, final ExecuteProcessConstants constants) {
         // TODO :Call API of configCenter
     }
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index 4e7de95..8ad00c4 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -23,9 +23,12 @@ import 
org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
 import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
 import org.apache.shardingsphere.infra.executor.sql.hook.SPISQLExecutionHook;
 import org.apache.shardingsphere.infra.executor.sql.hook.SQLExecutionHook;
+import 
org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
+import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.sql.DatabaseMetaData;
@@ -81,6 +84,7 @@ public abstract class JDBCExecutorCallback<T> implements 
ExecutorCallback<JDBCEx
             
sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(),
 sqlUnit.getSql(), sqlUnit.getParameters(), dataSourceMetaData, isTrunkThread, 
dataMap);
             T result = executeSQL(sqlUnit.getSql(), 
jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode());
             sqlExecutionHook.finishSuccess();
+            finishReport(dataMap, jdbcExecutionUnit);
             return result;
         } catch (final SQLException ex) {
             if (!isTrunkThread) {
@@ -106,6 +110,12 @@ public abstract class JDBCExecutorCallback<T> implements 
ExecutorCallback<JDBCEx
         return result;
     }
     
+    private void finishReport(final Map<String, Object> dataMap, final 
SQLExecutionUnit executionUnit) {
+        if (dataMap.containsKey(ExecuteProcessConstants.EXECUTE_ID.name())) {
+            
ExecuteProcessEngine.finish(dataMap.get(ExecuteProcessConstants.EXECUTE_ID.name()).toString(),
 executionUnit);
+        }
+    }
+    
     protected abstract T executeSQL(String sql, Statement statement, 
ConnectionMode connectionMode) throws SQLException;
     
     protected abstract Optional<T> getSaneResult(SQLStatement sqlStatement);
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
index 64c6ace..6aa70c4 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
@@ -21,8 +21,9 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorDataMap;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
+import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
 import 
org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 
@@ -49,19 +50,20 @@ public final class ExecuteProcessEngine {
      */
     public static void initialize(final SQLStatementContext<?> context, final 
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
         if (!HANDLERS.isEmpty() && 
ExecuteProcessStrategyEvaluator.evaluate(context, executionGroupContext)) {
-            HANDLERS.iterator().next().report(context, executionGroupContext, 
ExecuteProcessStatus.DOING);
+            
ExecutorDataMap.getValue().put(ExecuteProcessConstants.EXECUTE_ID.name(), 
executionGroupContext.getExecutionID());
+            HANDLERS.iterator().next().report(context, executionGroupContext, 
ExecuteProcessConstants.EXECUTE_STATUS_START);
         }
     }
     
     /**
-     * Complete.
+     * Finish.
      *
      * @param executionID execution ID
      * @param executionUnit execution unit
      */
-    public static void complete(final String executionID, final 
SQLExecutionUnit executionUnit) {
+    public static void finish(final String executionID, final SQLExecutionUnit 
executionUnit) {
         if (!HANDLERS.isEmpty()) {
-            HANDLERS.iterator().next().report(executionID, executionUnit, 
ExecuteProcessStatus.DONE);
+            HANDLERS.iterator().next().report(executionID, executionUnit, 
ExecuteProcessConstants.EXECUTE_STATUS_DONE);
         }
     }
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/event/ExecuteProcessCreatedEvent.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/event/ExecuteProcessCreatedEvent.java
deleted file mode 100644
index ab74d89..0000000
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/event/ExecuteProcessCreatedEvent.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.process.event;
-
-import lombok.Getter;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
-import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
-
-import java.util.Collection;
-import java.util.LinkedList;
-
-/**
- * Execution process created event.
- */
-@Getter
-public final class ExecuteProcessCreatedEvent {
-    
-    private final String executionID;
-    
-    private final Collection<ExecuteProcessUnit> unitStatuses;
-    
-    public ExecuteProcessCreatedEvent(final 
ExecutionGroupContext<SQLExecutionUnit> executionGroupContext) {
-        this.executionID = executionGroupContext.getExecutionID();
-        unitStatuses = createExecutionUnitStatuses(executionGroupContext);
-    }
-    
-    private Collection<ExecuteProcessUnit> createExecutionUnitStatuses(final 
ExecutionGroupContext<SQLExecutionUnit> executionGroupContext) {
-        Collection<ExecuteProcessUnit> result = new LinkedList<>();
-        for (ExecutionGroup<SQLExecutionUnit> group : 
executionGroupContext.getInputGroups()) {
-            for (SQLExecutionUnit each : group.getInputs()) {
-                result.add(new 
ExecuteProcessUnit(String.valueOf(each.getExecutionUnit().hashCode()), 
ExecuteProcessStatus.DOING));
-            }
-        }
-        return result;
-    }
-}
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/event/ExecuteProcessUnitCreatedEvent.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/event/ExecuteProcessUnitCreatedEvent.java
deleted file mode 100644
index df3d22c..0000000
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/event/ExecuteProcessUnitCreatedEvent.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.process.event;
-
-import lombok.Getter;
-import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
-import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
-
-/**
- * Execute process unit created event.
- */
-@Getter
-public final class ExecuteProcessUnitCreatedEvent {
-    
-    private final String executionID;
-    
-    private final ExecuteProcessUnit unitStatuses;
-    
-    public ExecuteProcessUnitCreatedEvent(final String executionID, final 
SQLExecutionUnit executionUnit, final ExecuteProcessStatus status) {
-        this.executionID = executionID;
-        this.unitStatuses = new 
ExecuteProcessUnit(String.valueOf(executionUnit.getExecutionUnit().hashCode()), 
status);
-    }
-}
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessStatus.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessConstants.java
similarity index 87%
rename from 
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessStatus.java
rename to 
shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessConstants.java
index 466aecc..e551380 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessStatus.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessConstants.java
@@ -18,9 +18,9 @@
 package org.apache.shardingsphere.infra.executor.sql.process.model;
 
 /**
- * Execute process status.
+ * Execute process constants.
  */
-public enum ExecuteProcessStatus {
+public enum ExecuteProcessConstants {
     
-    DOING, DONE
+    EXECUTE_ID, EXECUTE_STATUS_START, EXECUTE_STATUS_DONE
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessUnit.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessUnit.java
index c769f76..30d2b71 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessUnit.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessUnit.java
@@ -29,5 +29,5 @@ public final class ExecuteProcessUnit {
     
     private final String unitID;
     
-    private final ExecuteProcessStatus status;
+    private final ExecuteProcessConstants status;
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/spi/ExecuteProcessReporter.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/spi/ExecuteProcessReporter.java
index 978cf1f..4d7169b 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/spi/ExecuteProcessReporter.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/spi/ExecuteProcessReporter.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.infra.executor.sql.process.spi;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
+import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
 
 /**
  * Execute process report.
@@ -31,15 +31,15 @@ public interface ExecuteProcessReporter {
      * Report the summary of this task.
      * @param context context
      * @param executionGroupContext execution group context
-     * @param status status
+     * @param constants constants
      */
-    void report(SQLStatementContext<?> context, ExecutionGroupContext<? 
extends SQLExecutionUnit> executionGroupContext, ExecuteProcessStatus status);
+    void report(SQLStatementContext<?> context, ExecutionGroupContext<? 
extends SQLExecutionUnit> executionGroupContext, ExecuteProcessConstants 
constants);
     
     /**
      * Report a unit of this task.
      * @param executionID execution ID
      * @param executionUnit execution unit
-     * @param status status
+     * @param constants constants
      */
-    void report(String executionID, SQLExecutionUnit executionUnit, 
ExecuteProcessStatus status);
+    void report(String executionID, SQLExecutionUnit executionUnit, 
ExecuteProcessConstants constants);
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
 
b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
index 48e2369..b1ad8d1 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
@@ -90,9 +90,9 @@ public final class JDBCExecutorCallbackTest {
         field.setAccessible(true);
         Map<String, DataSourceMetaData> cachedDataSourceMetaData = 
(Map<String, DataSourceMetaData>) field.get(jdbcExecutorCallback);
         assertThat(cachedDataSourceMetaData.size(), is(0));
-        jdbcExecutorCallback.execute(units, true, null);
+        jdbcExecutorCallback.execute(units, true, Collections.emptyMap());
         assertThat(cachedDataSourceMetaData.size(), is(1));
-        jdbcExecutorCallback.execute(units, true, null);
+        jdbcExecutorCallback.execute(units, true, Collections.emptyMap());
         assertThat(cachedDataSourceMetaData.size(), is(1));
     }
 }

Reply via email to