This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a0500704570 Remove heldByConnection flag and call connect and disconnect in ShardingSphereConnection (#30067)
a0500704570 is described below

commit a0500704570c764ba31581962ec7913cf959ae13
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Thu Feb 8 12:35:26 2024 +0800

    Remove heldByConnection flag and call connect and disconnect in ShardingSphereConnection (#30067)
    
    * Remove heldByConnection flag and call connect and disconnect in ShardingSphereConnection
    
    * Remove heldByConnection flag
---
 .../kernel/model/ExecutionGroupReportContext.java  | 11 ----------
 .../infra/executor/sql/process/Process.java        | 13 +++++-------
 .../infra/executor/sql/process/ProcessEngine.java  | 24 +++++++++-------------
 .../executor/sql/process/yaml/YamlProcess.java     |  2 --
 .../process/yaml/swapper/YamlProcessSwapper.java   |  4 +---
 .../yaml/swapper/YamlProcessListSwapperTest.java   | 10 ++++-----
 .../yaml/swapper/YamlProcessSwapperTest.java       | 10 ++++-----
 .../batch/BatchPreparedStatementExecutor.java      |  4 +++-
 .../core/connection/ShardingSphereConnection.java  | 11 +++++++++-
 .../statement/ShardingSpherePreparedStatement.java | 11 ++++++----
 .../core/statement/ShardingSphereStatement.java    |  9 +++++---
 .../batch/BatchPreparedStatementExecutorTest.java  |  6 +++++-
 .../enumerable/EnumerableScanExecutor.java         |  9 +++++---
 .../ProcessListChangedSubscriberTest.java          |  1 -
 .../executor/ShowProcessListExecutorTest.java      |  2 +-
 15 files changed, 64 insertions(+), 63 deletions(-)

diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupReportContext.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupReportContext.java
index 7f68da7a739..9f85bcc9990 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupReportContext.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupReportContext.java
@@ -21,9 +21,6 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-
 /**
  * Execution group report context.
  */
@@ -37,12 +34,4 @@ public final class ExecutionGroupReportContext {
     private final String databaseName;
     
     private final Grantee grantee;
-    
-    public ExecutionGroupReportContext(final String databaseName) {
-        this(databaseName, new Grantee("", ""));
-    }
-    
-    public ExecutionGroupReportContext(final String databaseName, final 
Grantee grantee) {
-        this(new UUID(ThreadLocalRandom.current().nextLong(), 
ThreadLocalRandom.current().nextLong()).toString().replace("-", ""), 
databaseName, grantee);
-    }
 }
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
index 13cdc11ac93..517cbb19186 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
@@ -61,19 +61,17 @@ public final class Process {
     
     private final boolean idle;
     
-    private final boolean heldByConnection;
-    
     private final AtomicBoolean interrupted;
     
-    public Process(final ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext, final boolean heldByConnection) {
-        this("", executionGroupContext, true, heldByConnection);
+    public Process(final ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext) {
+        this("", executionGroupContext, true);
     }
     
-    public Process(final String sql, final ExecutionGroupContext<? extends 
SQLExecutionUnit> executionGroupContext, final boolean heldByConnection) {
-        this(sql, executionGroupContext, false, heldByConnection);
+    public Process(final String sql, final ExecutionGroupContext<? extends 
SQLExecutionUnit> executionGroupContext) {
+        this(sql, executionGroupContext, false);
     }
     
-    private Process(final String sql, final ExecutionGroupContext<? extends 
SQLExecutionUnit> executionGroupContext, final boolean idle, final boolean 
heldByConnection) {
+    private Process(final String sql, final ExecutionGroupContext<? extends 
SQLExecutionUnit> executionGroupContext, final boolean idle) {
         id = executionGroupContext.getReportContext().getProcessId();
         startMillis = System.currentTimeMillis();
         this.sql = sql;
@@ -85,7 +83,6 @@ public final class Process {
         
processStatements.putAll(createProcessStatements(executionGroupContext));
         completedUnitCount = new AtomicInteger(0);
         this.idle = idle;
-        this.heldByConnection = heldByConnection;
         interrupted = new AtomicBoolean();
     }
     
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
index d7b501fa7e9..a5d3e3ee48a 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
@@ -28,6 +28,8 @@ import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatemen
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.MySQLStatement;
 
 import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * Process engine.
@@ -42,8 +44,11 @@ public final class ProcessEngine {
      * @return process ID
      */
     public String connect(final Grantee grantee, final String databaseName) {
-        ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(), 
new ExecutionGroupReportContext(databaseName, grantee));
-        Process process = new Process(executionGroupContext, true);
+        String processId = new UUID(ThreadLocalRandom.current().nextLong(), 
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
+        ProcessIdContext.set(processId);
+        ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext =
+                new ExecutionGroupContext<>(Collections.emptyList(), new 
ExecutionGroupReportContext(processId, databaseName, grantee));
+        Process process = new Process(executionGroupContext);
         ProcessRegistry.getInstance().add(process);
         return executionGroupContext.getReportContext().getProcessId();
     }
@@ -55,7 +60,7 @@ public final class ProcessEngine {
      */
     public void disconnect(final String processId) {
         ProcessRegistry.getInstance().remove(processId);
-        
+        ProcessIdContext.remove();
     }
     
     /**
@@ -66,11 +71,7 @@ public final class ProcessEngine {
      */
     public void executeSQL(final ExecutionGroupContext<? extends 
SQLExecutionUnit> executionGroupContext, final QueryContext queryContext) {
         if 
(isMySQLDDLOrDMLStatement(queryContext.getSqlStatementContext().getSqlStatement()))
 {
-            String processId = 
executionGroupContext.getReportContext().getProcessId();
-            // TODO remove heldByConnection when jdbc connection support 
generate processId and call connect and disconnect
-            boolean heldByConnection = null != 
ProcessRegistry.getInstance().get(processId) && 
ProcessRegistry.getInstance().get(processId).isHeldByConnection();
-            ProcessIdContext.set(processId);
-            ProcessRegistry.getInstance().add(new 
Process(queryContext.getSql(), executionGroupContext, heldByConnection));
+            ProcessRegistry.getInstance().add(new 
Process(queryContext.getSql(), executionGroupContext));
         }
     }
     
@@ -97,12 +98,7 @@ public final class ProcessEngine {
         }
         ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext = new ExecutionGroupContext<>(
                 Collections.emptyList(), new 
ExecutionGroupReportContext(ProcessIdContext.get(), process.getDatabaseName(), 
new Grantee(process.getUsername(), process.getHostname())));
-        if (process.isHeldByConnection()) {
-            ProcessRegistry.getInstance().add(new 
Process(executionGroupContext, true));
-        } else {
-            ProcessRegistry.getInstance().remove(ProcessIdContext.get());
-        }
-        ProcessIdContext.remove();
+        ProcessRegistry.getInstance().add(new Process(executionGroupContext));
     }
     
     private boolean isMySQLDDLOrDMLStatement(final SQLStatement sqlStatement) {
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
index 2c33485f12a..39aa9b0d09f 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
@@ -46,7 +46,5 @@ public final class YamlProcess implements YamlConfiguration {
     
     private boolean idle;
     
-    private boolean heldByConnection;
-    
     private boolean interrupted;
 }
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
index 2606086904c..9cfa079314d 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
@@ -41,7 +41,6 @@ public final class YamlProcessSwapper implements 
YamlConfigurationSwapper<YamlPr
         result.setTotalUnitCount(data.getTotalUnitCount());
         result.setCompletedUnitCount(data.getCompletedUnitCount());
         result.setIdle(data.isIdle());
-        result.setHeldByConnection(data.isHeldByConnection());
         result.setInterrupted(data.getInterrupted());
         return result;
     }
@@ -49,7 +48,6 @@ public final class YamlProcessSwapper implements 
YamlConfigurationSwapper<YamlPr
     @Override
     public Process swapToObject(final YamlProcess yamlConfig) {
         return new Process(yamlConfig.getId(), yamlConfig.getStartMillis(), 
yamlConfig.getSql(), yamlConfig.getDatabaseName(), yamlConfig.getUsername(), 
yamlConfig.getHostname(),
-                yamlConfig.getTotalUnitCount(), new 
AtomicInteger(yamlConfig.getCompletedUnitCount()), yamlConfig.isIdle(), 
yamlConfig.isHeldByConnection(),
-                new AtomicBoolean(yamlConfig.isInterrupted()));
+                yamlConfig.getTotalUnitCount(), new 
AtomicInteger(yamlConfig.getCompletedUnitCount()), yamlConfig.isIdle(), new 
AtomicBoolean(yamlConfig.isInterrupted()));
     }
 }
diff --git 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
index ab6a9f0935e..7144ef090d7 100644
--- 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
+++ 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
@@ -28,6 +28,8 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -40,9 +42,10 @@ class YamlProcessListSwapperTest {
     
     @Test
     void assertSwapToYamlConfiguration() {
-        ExecutionGroupReportContext reportContext = new 
ExecutionGroupReportContext("foo_db", new Grantee("root", "localhost"));
+        String processId = new UUID(ThreadLocalRandom.current().nextLong(), 
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
+        ExecutionGroupReportContext reportContext = new 
ExecutionGroupReportContext(processId, "foo_db", new Grantee("root", 
"localhost"));
         ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(), 
reportContext);
-        Process process = new Process("SELECT 1", executionGroupContext, 
false);
+        Process process = new Process("SELECT 1", executionGroupContext);
         YamlProcessList actual = new 
YamlProcessListSwapper().swapToYamlConfiguration(Collections.singleton(process));
         assertThat(actual.getProcesses().size(), is(1));
         assertYamlProcessContext(actual.getProcesses().iterator().next());
@@ -57,7 +60,6 @@ class YamlProcessListSwapperTest {
         assertThat(actual.getHostname(), is("localhost"));
         assertThat(actual.getCompletedUnitCount(), is(0));
         assertThat(actual.getTotalUnitCount(), is(0));
-        assertThat(actual.isHeldByConnection(), is(false));
         assertFalse(actual.isIdle());
     }
     
@@ -81,7 +83,6 @@ class YamlProcessListSwapperTest {
         result.setTotalUnitCount(10);
         result.setCompletedUnitCount(5);
         result.setIdle(true);
-        result.setHeldByConnection(true);
         return result;
     }
     
@@ -94,7 +95,6 @@ class YamlProcessListSwapperTest {
         assertThat(actual.getHostname(), is("localhost"));
         assertThat(actual.getTotalUnitCount(), is(10));
         assertThat(actual.getCompletedUnitCount(), is(5));
-        assertThat(actual.isHeldByConnection(), is(true));
         assertTrue(actual.isIdle());
     }
 }
diff --git 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
index 2debede78b8..561b13b4093 100644
--- 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
+++ 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
@@ -26,6 +26,8 @@ import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -38,9 +40,10 @@ class YamlProcessSwapperTest {
     
     @Test
     void assertSwapToYamlConfiguration() {
-        ExecutionGroupReportContext reportContext = new 
ExecutionGroupReportContext("foo_db", new Grantee("root", "localhost"));
+        String processId = new UUID(ThreadLocalRandom.current().nextLong(), 
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
+        ExecutionGroupReportContext reportContext = new 
ExecutionGroupReportContext(processId, "foo_db", new Grantee("root", 
"localhost"));
         ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(), 
reportContext);
-        Process process = new Process("SELECT 1", executionGroupContext, true);
+        Process process = new Process("SELECT 1", executionGroupContext);
         YamlProcess actual = new 
YamlProcessSwapper().swapToYamlConfiguration(process);
         assertNotNull(actual.getId());
         assertThat(actual.getStartMillis(), 
lessThanOrEqualTo(System.currentTimeMillis()));
@@ -50,7 +53,6 @@ class YamlProcessSwapperTest {
         assertThat(actual.getHostname(), is("localhost"));
         assertThat(actual.getCompletedUnitCount(), is(0));
         assertThat(actual.getTotalUnitCount(), is(0));
-        assertThat(actual.isHeldByConnection(), is(true));
         assertFalse(actual.isIdle());
     }
     
@@ -65,7 +67,6 @@ class YamlProcessSwapperTest {
         assertThat(actual.getHostname(), is("localhost"));
         assertThat(actual.getTotalUnitCount(), is(10));
         assertThat(actual.getCompletedUnitCount(), is(5));
-        assertThat(actual.isHeldByConnection(), is(false));
         assertTrue(actual.isIdle());
     }
     
@@ -80,7 +81,6 @@ class YamlProcessSwapperTest {
         result.setTotalUnitCount(10);
         result.setCompletedUnitCount(5);
         result.setIdle(true);
-        result.setHeldByConnection(false);
         return result;
     }
 }
diff --git 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
index 47fdc92f4bc..9173d58b68c 100644
--- 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
+++ 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
@@ -29,6 +29,8 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorEx
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -67,7 +69,7 @@ public final class BatchPreparedStatementExecutor {
         this.databaseName = databaseName;
         this.metaDataContexts = metaDataContexts;
         this.jdbcExecutor = jdbcExecutor;
-        executionGroupContext = new ExecutionGroupContext<>(new 
LinkedList<>(), new ExecutionGroupReportContext(databaseName));
+        executionGroupContext = new ExecutionGroupContext<>(new 
LinkedList<>(), new ExecutionGroupReportContext(ProcessIdContext.get(), 
databaseName, new Grantee("", "")));
         batchExecutionUnits = new LinkedList<>();
     }
     
diff --git 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index 764c9dcf43d..532a3743029 100644
--- 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++ 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -23,8 +23,10 @@ import 
org.apache.shardingsphere.driver.jdbc.core.datasource.metadata.ShardingSp
 import 
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement;
 import 
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
 import 
org.apache.shardingsphere.driver.jdbc.exception.connection.ConnectionClosedException;
-import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.transaction.api.TransactionType;
 
@@ -42,6 +44,8 @@ import java.sql.Statement;
  */
 public final class ShardingSphereConnection extends AbstractConnectionAdapter {
     
+    private final ProcessEngine processEngine = new ProcessEngine();
+    
     @Getter
     private final String databaseName;
     
@@ -51,6 +55,9 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter {
     @Getter
     private final DriverDatabaseConnectionManager databaseConnectionManager;
     
+    @Getter
+    private final String processId;
+    
     private boolean autoCommit = true;
     
     private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
@@ -63,6 +70,7 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter {
         this.databaseName = databaseName;
         this.contextManager = contextManager;
         databaseConnectionManager = new 
DriverDatabaseConnectionManager(databaseName, contextManager);
+        processId = processEngine.connect(new Grantee("", ""), databaseName);
     }
     
     /**
@@ -301,6 +309,7 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter {
     public void close() throws SQLException {
         closed = true;
         databaseConnectionManager.close();
+        processEngine.disconnect(processId);
     }
     
     private ConnectionContext getConnectionContext() {
diff --git 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 39a6a36f411..a2075b54b28 100644
--- 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -64,6 +64,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecuti
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
 import org.apache.shardingsphere.infra.hint.HintManager;
 import org.apache.shardingsphere.infra.hint.HintValueContext;
 import org.apache.shardingsphere.infra.hint.SQLHintUtils;
@@ -72,6 +73,7 @@ import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
 import org.apache.shardingsphere.infra.route.context.RouteContext;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
@@ -279,7 +281,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine();
         ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new 
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
         ExecutionGroupContext<JDBCExecutionUnit> context =
-                prepareEngine.prepare(new RouteContext(), 
Collections.singleton(executionUnit), new 
ExecutionGroupReportContext(databaseName));
+                prepareEngine.prepare(new RouteContext(), 
Collections.singleton(executionUnit), new 
ExecutionGroupReportContext(ProcessIdContext.get(), databaseName, new 
Grantee("", "")));
         if (context.getInputGroups().isEmpty() || 
context.getInputGroups().iterator().next().getInputs().isEmpty()) {
             throw new EmptyTrafficExecutionUnitException();
         }
@@ -451,7 +453,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionGroupContext(final ExecutionContext executionContext) throws 
SQLException {
         int maxConnectionsSizePerQuery = 
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
-                .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(databaseName));
+                .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(ProcessIdContext.get(), databaseName, new 
Grantee("", "")));
     }
     
     private boolean executeWithExecutionContext(final ExecutionContext 
executionContext) throws SQLException {
@@ -526,7 +528,8 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     
     private ExecutionGroupContext<JDBCExecutionUnit> 
createExecutionGroupContext(final ExecutionContext executionContext) throws 
SQLException {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine();
-        return prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(databaseName));
+        return prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
+                new ExecutionGroupReportContext(ProcessIdContext.get(), 
databaseName, new Grantee("", "")));
     }
     
     @Override
@@ -718,7 +721,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
             ExecutionUnit executionUnit = each.getExecutionUnit();
             executionUnits.add(executionUnit);
         }
-        
batchExecutor.init(prepareEngine.prepare(executionContext.getRouteContext(), 
executionUnits, new ExecutionGroupReportContext(databaseName)));
+        
batchExecutor.init(prepareEngine.prepare(executionContext.getRouteContext(), 
executionUnits, new ExecutionGroupReportContext(ProcessIdContext.get(), 
databaseName, new Grantee("", ""))));
         setBatchParametersForStatements(batchExecutor);
     }
     
diff --git 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 99fa09d1848..e8daac199cd 100644
--- 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -61,6 +61,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecuti
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
 import org.apache.shardingsphere.infra.hint.HintValueContext;
 import org.apache.shardingsphere.infra.hint.SQLHintUtils;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
@@ -68,6 +69,7 @@ import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.infra.route.context.RouteContext;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.RawExecutionRule;
@@ -476,7 +478,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine();
         ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new 
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
         ExecutionGroupContext<JDBCExecutionUnit> context =
-                prepareEngine.prepare(new RouteContext(), 
Collections.singletonList(executionUnit), new 
ExecutionGroupReportContext(databaseName));
+                prepareEngine.prepare(new RouteContext(), 
Collections.singletonList(executionUnit), new 
ExecutionGroupReportContext(ProcessIdContext.get(), databaseName, new 
Grantee("", "")));
         return context.getInputGroups().stream().flatMap(each -> 
each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new);
     }
     
@@ -522,13 +524,14 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     
     private ExecutionGroupContext<JDBCExecutionUnit> 
createExecutionGroupContext(final ExecutionContext executionContext) throws 
SQLException {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine();
-        return prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(databaseName));
+        return prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
+                new ExecutionGroupReportContext(ProcessIdContext.get(), 
databaseName, new Grantee("", "")));
     }
     
     private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionContext(final ExecutionContext executionContext) throws 
SQLException {
         int maxConnectionsSizePerQuery = 
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
-                .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(databaseName));
+                .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(ProcessIdContext.get(), databaseName, new 
Grantee("", "")));
     }
     
     private boolean executeWithExecutionContext(final ExecuteCallback 
executeCallback, final ExecutionContext executionContext) throws SQLException {
diff --git a/jdbc/core/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java b/jdbc/core/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
index 5a8e8716a30..6c8cb0acb12 100644
--- a/jdbc/core/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
+++ b/jdbc/core/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorEx
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
@@ -58,6 +59,8 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -193,8 +196,9 @@ class BatchPreparedStatementExecutorTest {
     
     @SneakyThrows(ReflectiveOperationException.class)
     private void setFields(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups, final Collection<BatchExecutionUnit> batchExecutionUnits) {
+        String processId = new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
         Plugins.getMemberAccessor().set(BatchPreparedStatementExecutor.class.getDeclaredField("executionGroupContext"), executor, new ExecutionGroupContext<>(executionGroups,
-                new ExecutionGroupReportContext("logic_db")));
+                new ExecutionGroupReportContext(processId, "logic_db", new Grantee("", ""))));
         Plugins.getMemberAccessor().set(BatchPreparedStatementExecutor.class.getDeclaredField("batchExecutionUnits"), executor, batchExecutionUnits);
         Plugins.getMemberAccessor().set(BatchPreparedStatementExecutor.class.getDeclaredField("batchCount"), executor, 2);
     }
diff --git a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
index 0b494ea7f34..6bb47cac627 100644
--- a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
+++ b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
@@ -41,6 +41,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult
 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
 import org.apache.shardingsphere.infra.hint.HintValueContext;
 import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
@@ -53,6 +54,7 @@ import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData
 import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaData;
 import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
 import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
 import org.apache.shardingsphere.infra.parser.sql.SQLStatementParserEngine;
 import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
@@ -60,10 +62,10 @@ import org.apache.shardingsphere.infra.session.query.QueryContext;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import org.apache.shardingsphere.sqlfederation.executor.SQLFederationExecutorContext;
 import org.apache.shardingsphere.sqlfederation.executor.TableScanExecutorContext;
-import org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.EmptyRowEnumerator;
 import org.apache.shardingsphere.sqlfederation.executor.row.MemoryEnumerator;
 import org.apache.shardingsphere.sqlfederation.executor.row.SQLFederationRowEnumerator;
 import org.apache.shardingsphere.sqlfederation.optimizer.context.OptimizerContext;
+import org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.EmptyRowEnumerator;
 import org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.ScanExecutor;
 import org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.ScanExecutorContext;
 
@@ -151,8 +153,9 @@ public final class EnumerableScanExecutor implements ScanExecutor {
             @Override
             public Enumerator<Object> enumerator() {
                 computeConnectionOffsets(context);
-                ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
-                        prepareEngine.prepare(context.getRouteContext(), executorContext.getConnectionOffsets(), context.getExecutionUnits(), new ExecutionGroupReportContext(database.getName()));
+                // TODO pass grantee from proxy and jdbc adapter
+                ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepareEngine.prepare(context.getRouteContext(), executorContext.getConnectionOffsets(), context.getExecutionUnits(),
+                        new ExecutionGroupReportContext(ProcessIdContext.get(), database.getName(), new Grantee("", "")));
                 setParameters(executionGroupContext.getInputGroups());
                 processEngine.executeSQL(executionGroupContext, context.getQueryContext());
                 List<QueryResult> queryResults = jdbcExecutor.execute(executionGroupContext, callback).stream().map(QueryResult.class::cast).collect(Collectors.toList());
diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index 74ea27249a3..6f38546e90e 100644
--- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -118,7 +118,6 @@ class ProcessListChangedSubscriberTest {
         ClusterPersistRepository repository = registryCenter.getRepository();
         verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
                 "processes:" + System.lineSeparator() + "- completedUnitCount: 0" + System.lineSeparator()
-                        + "  heldByConnection: false" + System.lineSeparator()
                         + "  id: foo_id" + System.lineSeparator()
                         + "  idle: false" + System.lineSeparator()
                         + "  interrupted: false" + System.lineSeparator()
diff --git a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
index 04d8c60069c..a186876c806 100644
--- a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
+++ b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
@@ -67,7 +67,7 @@ class ShowProcessListExecutorTest {
     
     private void setupProcesses(final ShowProcessListExecutor showProcessListExecutor) throws ReflectiveOperationException {
         Process process = new Process("f6c2336a-63ba-41bf-941e-2e3504eb2c80", 1617939785160L,
-                "ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id", "foo_db", "root", "127.0.0.1", 2, new AtomicInteger(1), false, false, new AtomicBoolean());
+                "ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id", "foo_db", "root", "127.0.0.1", 2, new AtomicInteger(1), false, new AtomicBoolean());
         Plugins.getMemberAccessor().set(
                 showProcessListExecutor.getClass().getDeclaredField("processes"), showProcessListExecutor, Collections.singleton(process));
     }


Reply via email to