This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 42528f7 ExecuteProcessEngine.finish (#9706)
42528f7 is described below
commit 42528f7fbec7ddfa59543254b6d7779af4ced79a
Author: Juan Pan(Trista) <[email protected]>
AuthorDate: Wed Mar 17 17:43:44 2021 +0800
ExecuteProcessEngine.finish (#9706)
* ExecuteProcessEngine.finish
* fix test
* rename parameters
---
.../process/GovernanceExecuteProcessReporter.java | 6 +--
.../engine/driver/jdbc/JDBCExecutorCallback.java | 10 ++++
.../executor/sql/process/ExecuteProcessEngine.java | 12 +++--
.../process/event/ExecuteProcessCreatedEvent.java | 54 ----------------------
.../event/ExecuteProcessUnitCreatedEvent.java | 39 ----------------
...essStatus.java => ExecuteProcessConstants.java} | 6 +--
.../sql/process/model/ExecuteProcessUnit.java | 2 +-
.../sql/process/spi/ExecuteProcessReporter.java | 10 ++--
.../engine/jdbc/JDBCExecutorCallbackTest.java | 4 +-
9 files changed, 31 insertions(+), 112 deletions(-)
diff --git a/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/process/GovernanceExecuteProcessReporter.java b/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/process/GovernanceExecuteProcessReporter.java
index acd092e..ce2b5b9 100644
--- a/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/process/GovernanceExecuteProcessReporter.java
+++ b/shardingsphere-governance/shardingsphere-governance-context/src/main/java/org/apache/shardingsphere/governance/context/process/GovernanceExecuteProcessReporter.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.governance.context.process;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
+import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
import
org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter;
/**
@@ -29,12 +29,12 @@ import
org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessRe
public final class GovernanceExecuteProcessReporter implements
ExecuteProcessReporter {
@Override
- public void report(final SQLStatementContext<?> context, final
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final
ExecuteProcessStatus status) {
+ public void report(final SQLStatementContext<?> context, final
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final
ExecuteProcessConstants constants) {
// TODO :Call API of configCenter
}
@Override
- public void report(final String executionID, final SQLExecutionUnit
executionUnit, final ExecuteProcessStatus status) {
+ public void report(final String executionID, final SQLExecutionUnit
executionUnit, final ExecuteProcessConstants constants) {
// TODO :Call API of configCenter
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index 4e7de95..8ad00c4 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -23,9 +23,12 @@ import
org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import org.apache.shardingsphere.infra.executor.sql.hook.SPISQLExecutionHook;
import org.apache.shardingsphere.infra.executor.sql.hook.SQLExecutionHook;
+import
org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine;
+import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.DatabaseMetaData;
@@ -81,6 +84,7 @@ public abstract class JDBCExecutorCallback<T> implements
ExecutorCallback<JDBCEx
sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(),
sqlUnit.getSql(), sqlUnit.getParameters(), dataSourceMetaData, isTrunkThread,
dataMap);
T result = executeSQL(sqlUnit.getSql(),
jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode());
sqlExecutionHook.finishSuccess();
+ finishReport(dataMap, jdbcExecutionUnit);
return result;
} catch (final SQLException ex) {
if (!isTrunkThread) {
@@ -106,6 +110,12 @@ public abstract class JDBCExecutorCallback<T> implements
ExecutorCallback<JDBCEx
return result;
}
+ private void finishReport(final Map<String, Object> dataMap, final
SQLExecutionUnit executionUnit) {
+ if (dataMap.containsKey(ExecuteProcessConstants.EXECUTE_ID.name())) {
+
ExecuteProcessEngine.finish(dataMap.get(ExecuteProcessConstants.EXECUTE_ID.name()).toString(),
executionUnit);
+ }
+ }
+
protected abstract T executeSQL(String sql, Statement statement,
ConnectionMode connectionMode) throws SQLException;
protected abstract Optional<T> getSaneResult(SQLStatement sqlStatement);
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
index 64c6ace..6aa70c4 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ExecuteProcessEngine.java
@@ -21,8 +21,9 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorDataMap;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
+import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
import
org.apache.shardingsphere.infra.executor.sql.process.spi.ExecuteProcessReporter;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
@@ -49,19 +50,20 @@ public final class ExecuteProcessEngine {
*/
public static void initialize(final SQLStatementContext<?> context, final
ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
if (!HANDLERS.isEmpty() &&
ExecuteProcessStrategyEvaluator.evaluate(context, executionGroupContext)) {
- HANDLERS.iterator().next().report(context, executionGroupContext,
ExecuteProcessStatus.DOING);
+
ExecutorDataMap.getValue().put(ExecuteProcessConstants.EXECUTE_ID.name(),
executionGroupContext.getExecutionID());
+ HANDLERS.iterator().next().report(context, executionGroupContext,
ExecuteProcessConstants.EXECUTE_STATUS_START);
}
}
/**
- * Complete.
+ * Finish.
*
* @param executionID execution ID
* @param executionUnit execution unit
*/
- public static void complete(final String executionID, final
SQLExecutionUnit executionUnit) {
+ public static void finish(final String executionID, final SQLExecutionUnit
executionUnit) {
if (!HANDLERS.isEmpty()) {
- HANDLERS.iterator().next().report(executionID, executionUnit,
ExecuteProcessStatus.DONE);
+ HANDLERS.iterator().next().report(executionID, executionUnit,
ExecuteProcessConstants.EXECUTE_STATUS_DONE);
}
}
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/event/ExecuteProcessCreatedEvent.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/event/ExecuteProcessCreatedEvent.java
deleted file mode 100644
index ab74d89..0000000
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/event/ExecuteProcessCreatedEvent.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.process.event;
-
-import lombok.Getter;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
-
-import java.util.Collection;
-import java.util.LinkedList;
-
-/**
- * Execution process created event.
- */
-@Getter
-public final class ExecuteProcessCreatedEvent {
-
- private final String executionID;
-
- private final Collection<ExecuteProcessUnit> unitStatuses;
-
- public ExecuteProcessCreatedEvent(final
ExecutionGroupContext<SQLExecutionUnit> executionGroupContext) {
- this.executionID = executionGroupContext.getExecutionID();
- unitStatuses = createExecutionUnitStatuses(executionGroupContext);
- }
-
- private Collection<ExecuteProcessUnit> createExecutionUnitStatuses(final
ExecutionGroupContext<SQLExecutionUnit> executionGroupContext) {
- Collection<ExecuteProcessUnit> result = new LinkedList<>();
- for (ExecutionGroup<SQLExecutionUnit> group :
executionGroupContext.getInputGroups()) {
- for (SQLExecutionUnit each : group.getInputs()) {
- result.add(new
ExecuteProcessUnit(String.valueOf(each.getExecutionUnit().hashCode()),
ExecuteProcessStatus.DOING));
- }
- }
- return result;
- }
-}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/event/ExecuteProcessUnitCreatedEvent.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/event/ExecuteProcessUnitCreatedEvent.java
deleted file mode 100644
index df3d22c..0000000
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/event/ExecuteProcessUnitCreatedEvent.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.process.event;
-
-import lombok.Getter;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessUnit;
-
-/**
- * Execute process unit created event.
- */
-@Getter
-public final class ExecuteProcessUnitCreatedEvent {
-
- private final String executionID;
-
- private final ExecuteProcessUnit unitStatuses;
-
- public ExecuteProcessUnitCreatedEvent(final String executionID, final
SQLExecutionUnit executionUnit, final ExecuteProcessStatus status) {
- this.executionID = executionID;
- this.unitStatuses = new
ExecuteProcessUnit(String.valueOf(executionUnit.getExecutionUnit().hashCode()),
status);
- }
-}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessStatus.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessConstants.java
similarity index 87%
rename from shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessStatus.java
rename to shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessConstants.java
index 466aecc..e551380 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessStatus.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessConstants.java
@@ -18,9 +18,9 @@
package org.apache.shardingsphere.infra.executor.sql.process.model;
/**
- * Execute process status.
+ * Execute process constants.
*/
-public enum ExecuteProcessStatus {
+public enum ExecuteProcessConstants {
- DOING, DONE
+ EXECUTE_ID, EXECUTE_STATUS_START, EXECUTE_STATUS_DONE
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessUnit.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessUnit.java
index c769f76..30d2b71 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessUnit.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/model/ExecuteProcessUnit.java
@@ -29,5 +29,5 @@ public final class ExecuteProcessUnit {
private final String unitID;
- private final ExecuteProcessStatus status;
+ private final ExecuteProcessConstants status;
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/spi/ExecuteProcessReporter.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/spi/ExecuteProcessReporter.java
index 978cf1f..4d7169b 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/spi/ExecuteProcessReporter.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/spi/ExecuteProcessReporter.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.infra.executor.sql.process.spi;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessStatus;
+import
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessConstants;
/**
* Execute process report.
@@ -31,15 +31,15 @@ public interface ExecuteProcessReporter {
* Report the summary of this task.
* @param context context
* @param executionGroupContext execution group context
- * @param status status
+ * @param constants constants
*/
- void report(SQLStatementContext<?> context, ExecutionGroupContext<?
extends SQLExecutionUnit> executionGroupContext, ExecuteProcessStatus status);
+ void report(SQLStatementContext<?> context, ExecutionGroupContext<?
extends SQLExecutionUnit> executionGroupContext, ExecuteProcessConstants
constants);
/**
* Report a unit of this task.
* @param executionID execution ID
* @param executionUnit execution unit
- * @param status status
+ * @param constants constants
*/
- void report(String executionID, SQLExecutionUnit executionUnit,
ExecuteProcessStatus status);
+ void report(String executionID, SQLExecutionUnit executionUnit,
ExecuteProcessConstants constants);
}
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
index 48e2369..b1ad8d1 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
@@ -90,9 +90,9 @@ public final class JDBCExecutorCallbackTest {
field.setAccessible(true);
Map<String, DataSourceMetaData> cachedDataSourceMetaData =
(Map<String, DataSourceMetaData>) field.get(jdbcExecutorCallback);
assertThat(cachedDataSourceMetaData.size(), is(0));
- jdbcExecutorCallback.execute(units, true, null);
+ jdbcExecutorCallback.execute(units, true, Collections.emptyMap());
assertThat(cachedDataSourceMetaData.size(), is(1));
- jdbcExecutorCallback.execute(units, true, null);
+ jdbcExecutorCallback.execute(units, true, Collections.emptyMap());
assertThat(cachedDataSourceMetaData.size(), is(1));
}
}