This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a0500704570 Remove heldByConnection flag and call connect and
disconnect in ShardingSphereConnection (#30067)
a0500704570 is described below
commit a0500704570c764ba31581962ec7913cf959ae13
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Thu Feb 8 12:35:26 2024 +0800
Remove heldByConnection flag and call connect and disconnect in
ShardingSphereConnection (#30067)
* Remove heldByConnection flag and call connect and disconnect in
ShardingSphereConnection
* Remove heldByConnection flag
---
.../kernel/model/ExecutionGroupReportContext.java | 11 ----------
.../infra/executor/sql/process/Process.java | 13 +++++-------
.../infra/executor/sql/process/ProcessEngine.java | 24 +++++++++-------------
.../executor/sql/process/yaml/YamlProcess.java | 2 --
.../process/yaml/swapper/YamlProcessSwapper.java | 4 +---
.../yaml/swapper/YamlProcessListSwapperTest.java | 10 ++++-----
.../yaml/swapper/YamlProcessSwapperTest.java | 10 ++++-----
.../batch/BatchPreparedStatementExecutor.java | 4 +++-
.../core/connection/ShardingSphereConnection.java | 11 +++++++++-
.../statement/ShardingSpherePreparedStatement.java | 11 ++++++----
.../core/statement/ShardingSphereStatement.java | 9 +++++---
.../batch/BatchPreparedStatementExecutorTest.java | 6 +++++-
.../enumerable/EnumerableScanExecutor.java | 9 +++++---
.../ProcessListChangedSubscriberTest.java | 1 -
.../executor/ShowProcessListExecutorTest.java | 2 +-
15 files changed, 64 insertions(+), 63 deletions(-)
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupReportContext.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupReportContext.java
index 7f68da7a739..9f85bcc9990 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupReportContext.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutionGroupReportContext.java
@@ -21,9 +21,6 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-
/**
* Execution group report context.
*/
@@ -37,12 +34,4 @@ public final class ExecutionGroupReportContext {
private final String databaseName;
private final Grantee grantee;
-
- public ExecutionGroupReportContext(final String databaseName) {
- this(databaseName, new Grantee("", ""));
- }
-
- public ExecutionGroupReportContext(final String databaseName, final
Grantee grantee) {
- this(new UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", ""),
databaseName, grantee);
- }
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
index 13cdc11ac93..517cbb19186 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java
@@ -61,19 +61,17 @@ public final class Process {
private final boolean idle;
- private final boolean heldByConnection;
-
private final AtomicBoolean interrupted;
- public Process(final ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext, final boolean heldByConnection) {
- this("", executionGroupContext, true, heldByConnection);
+ public Process(final ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext) {
+ this("", executionGroupContext, true);
}
- public Process(final String sql, final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext, final boolean heldByConnection) {
- this(sql, executionGroupContext, false, heldByConnection);
+ public Process(final String sql, final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext) {
+ this(sql, executionGroupContext, false);
}
- private Process(final String sql, final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext, final boolean idle, final boolean
heldByConnection) {
+ private Process(final String sql, final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext, final boolean idle) {
id = executionGroupContext.getReportContext().getProcessId();
startMillis = System.currentTimeMillis();
this.sql = sql;
@@ -85,7 +83,6 @@ public final class Process {
processStatements.putAll(createProcessStatements(executionGroupContext));
completedUnitCount = new AtomicInteger(0);
this.idle = idle;
- this.heldByConnection = heldByConnection;
interrupted = new AtomicBoolean();
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
index d7b501fa7e9..a5d3e3ee48a 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
@@ -28,6 +28,8 @@ import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatemen
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.MySQLStatement;
import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
/**
* Process engine.
@@ -42,8 +44,11 @@ public final class ProcessEngine {
* @return process ID
*/
public String connect(final Grantee grantee, final String databaseName) {
- ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(),
new ExecutionGroupReportContext(databaseName, grantee));
- Process process = new Process(executionGroupContext, true);
+ String processId = new UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
+ ProcessIdContext.set(processId);
+ ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext =
+ new ExecutionGroupContext<>(Collections.emptyList(), new
ExecutionGroupReportContext(processId, databaseName, grantee));
+ Process process = new Process(executionGroupContext);
ProcessRegistry.getInstance().add(process);
return executionGroupContext.getReportContext().getProcessId();
}
@@ -55,7 +60,7 @@ public final class ProcessEngine {
*/
public void disconnect(final String processId) {
ProcessRegistry.getInstance().remove(processId);
-
+ ProcessIdContext.remove();
}
/**
@@ -66,11 +71,7 @@ public final class ProcessEngine {
*/
public void executeSQL(final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext, final QueryContext queryContext) {
if
(isMySQLDDLOrDMLStatement(queryContext.getSqlStatementContext().getSqlStatement()))
{
- String processId =
executionGroupContext.getReportContext().getProcessId();
- // TODO remove heldByConnection when jdbc connection support
generate processId and call connect and disconnect
- boolean heldByConnection = null !=
ProcessRegistry.getInstance().get(processId) &&
ProcessRegistry.getInstance().get(processId).isHeldByConnection();
- ProcessIdContext.set(processId);
- ProcessRegistry.getInstance().add(new
Process(queryContext.getSql(), executionGroupContext, heldByConnection));
+ ProcessRegistry.getInstance().add(new
Process(queryContext.getSql(), executionGroupContext));
}
}
@@ -97,12 +98,7 @@ public final class ProcessEngine {
}
ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = new ExecutionGroupContext<>(
Collections.emptyList(), new
ExecutionGroupReportContext(ProcessIdContext.get(), process.getDatabaseName(),
new Grantee(process.getUsername(), process.getHostname())));
- if (process.isHeldByConnection()) {
- ProcessRegistry.getInstance().add(new
Process(executionGroupContext, true));
- } else {
- ProcessRegistry.getInstance().remove(ProcessIdContext.get());
- }
- ProcessIdContext.remove();
+ ProcessRegistry.getInstance().add(new Process(executionGroupContext));
}
private boolean isMySQLDDLOrDMLStatement(final SQLStatement sqlStatement) {
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
index 2c33485f12a..39aa9b0d09f 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/YamlProcess.java
@@ -46,7 +46,5 @@ public final class YamlProcess implements YamlConfiguration {
private boolean idle;
- private boolean heldByConnection;
-
private boolean interrupted;
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
index 2606086904c..9cfa079314d 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapper.java
@@ -41,7 +41,6 @@ public final class YamlProcessSwapper implements
YamlConfigurationSwapper<YamlPr
result.setTotalUnitCount(data.getTotalUnitCount());
result.setCompletedUnitCount(data.getCompletedUnitCount());
result.setIdle(data.isIdle());
- result.setHeldByConnection(data.isHeldByConnection());
result.setInterrupted(data.getInterrupted());
return result;
}
@@ -49,7 +48,6 @@ public final class YamlProcessSwapper implements
YamlConfigurationSwapper<YamlPr
@Override
public Process swapToObject(final YamlProcess yamlConfig) {
return new Process(yamlConfig.getId(), yamlConfig.getStartMillis(),
yamlConfig.getSql(), yamlConfig.getDatabaseName(), yamlConfig.getUsername(),
yamlConfig.getHostname(),
- yamlConfig.getTotalUnitCount(), new
AtomicInteger(yamlConfig.getCompletedUnitCount()), yamlConfig.isIdle(),
yamlConfig.isHeldByConnection(),
- new AtomicBoolean(yamlConfig.isInterrupted()));
+ yamlConfig.getTotalUnitCount(), new
AtomicInteger(yamlConfig.getCompletedUnitCount()), yamlConfig.isIdle(), new
AtomicBoolean(yamlConfig.isInterrupted()));
}
}
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
index ab6a9f0935e..7144ef090d7 100644
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
+++
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessListSwapperTest.java
@@ -28,6 +28,8 @@ import org.junit.jupiter.api.Test;
import java.util.Collection;
import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -40,9 +42,10 @@ class YamlProcessListSwapperTest {
@Test
void assertSwapToYamlConfiguration() {
- ExecutionGroupReportContext reportContext = new
ExecutionGroupReportContext("foo_db", new Grantee("root", "localhost"));
+ String processId = new UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
+ ExecutionGroupReportContext reportContext = new
ExecutionGroupReportContext(processId, "foo_db", new Grantee("root",
"localhost"));
ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(),
reportContext);
- Process process = new Process("SELECT 1", executionGroupContext,
false);
+ Process process = new Process("SELECT 1", executionGroupContext);
YamlProcessList actual = new
YamlProcessListSwapper().swapToYamlConfiguration(Collections.singleton(process));
assertThat(actual.getProcesses().size(), is(1));
assertYamlProcessContext(actual.getProcesses().iterator().next());
@@ -57,7 +60,6 @@ class YamlProcessListSwapperTest {
assertThat(actual.getHostname(), is("localhost"));
assertThat(actual.getCompletedUnitCount(), is(0));
assertThat(actual.getTotalUnitCount(), is(0));
- assertThat(actual.isHeldByConnection(), is(false));
assertFalse(actual.isIdle());
}
@@ -81,7 +83,6 @@ class YamlProcessListSwapperTest {
result.setTotalUnitCount(10);
result.setCompletedUnitCount(5);
result.setIdle(true);
- result.setHeldByConnection(true);
return result;
}
@@ -94,7 +95,6 @@ class YamlProcessListSwapperTest {
assertThat(actual.getHostname(), is("localhost"));
assertThat(actual.getTotalUnitCount(), is(10));
assertThat(actual.getCompletedUnitCount(), is(5));
- assertThat(actual.isHeldByConnection(), is(true));
assertTrue(actual.isIdle());
}
}
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
index 2debede78b8..561b13b4093 100644
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
+++
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/yaml/swapper/YamlProcessSwapperTest.java
@@ -26,6 +26,8 @@ import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.junit.jupiter.api.Test;
import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -38,9 +40,10 @@ class YamlProcessSwapperTest {
@Test
void assertSwapToYamlConfiguration() {
- ExecutionGroupReportContext reportContext = new
ExecutionGroupReportContext("foo_db", new Grantee("root", "localhost"));
+ String processId = new UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
+ ExecutionGroupReportContext reportContext = new
ExecutionGroupReportContext(processId, "foo_db", new Grantee("root",
"localhost"));
ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = new ExecutionGroupContext<>(Collections.emptyList(),
reportContext);
- Process process = new Process("SELECT 1", executionGroupContext, true);
+ Process process = new Process("SELECT 1", executionGroupContext);
YamlProcess actual = new
YamlProcessSwapper().swapToYamlConfiguration(process);
assertNotNull(actual.getId());
assertThat(actual.getStartMillis(),
lessThanOrEqualTo(System.currentTimeMillis()));
@@ -50,7 +53,6 @@ class YamlProcessSwapperTest {
assertThat(actual.getHostname(), is("localhost"));
assertThat(actual.getCompletedUnitCount(), is(0));
assertThat(actual.getTotalUnitCount(), is(0));
- assertThat(actual.isHeldByConnection(), is(true));
assertFalse(actual.isIdle());
}
@@ -65,7 +67,6 @@ class YamlProcessSwapperTest {
assertThat(actual.getHostname(), is("localhost"));
assertThat(actual.getTotalUnitCount(), is(10));
assertThat(actual.getCompletedUnitCount(), is(5));
- assertThat(actual.isHeldByConnection(), is(false));
assertTrue(actual.isIdle());
}
@@ -80,7 +81,6 @@ class YamlProcessSwapperTest {
result.setTotalUnitCount(10);
result.setCompletedUnitCount(5);
result.setIdle(true);
- result.setHeldByConnection(false);
return result;
}
}
diff --git
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
index 47fdc92f4bc..9173d58b68c 100644
---
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
+++
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
@@ -29,6 +29,8 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorEx
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -67,7 +69,7 @@ public final class BatchPreparedStatementExecutor {
this.databaseName = databaseName;
this.metaDataContexts = metaDataContexts;
this.jdbcExecutor = jdbcExecutor;
- executionGroupContext = new ExecutionGroupContext<>(new
LinkedList<>(), new ExecutionGroupReportContext(databaseName));
+ executionGroupContext = new ExecutionGroupContext<>(new
LinkedList<>(), new ExecutionGroupReportContext(ProcessIdContext.get(),
databaseName, new Grantee("", "")));
batchExecutionUnits = new LinkedList<>();
}
diff --git
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index 764c9dcf43d..532a3743029 100644
---
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -23,8 +23,10 @@ import
org.apache.shardingsphere.driver.jdbc.core.datasource.metadata.ShardingSp
import
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement;
import
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
import
org.apache.shardingsphere.driver.jdbc.exception.connection.ConnectionClosedException;
-import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.transaction.api.TransactionType;
@@ -42,6 +44,8 @@ import java.sql.Statement;
*/
public final class ShardingSphereConnection extends AbstractConnectionAdapter {
+ private final ProcessEngine processEngine = new ProcessEngine();
+
@Getter
private final String databaseName;
@@ -51,6 +55,9 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter {
@Getter
private final DriverDatabaseConnectionManager databaseConnectionManager;
+ @Getter
+ private final String processId;
+
private boolean autoCommit = true;
private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
@@ -63,6 +70,7 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter {
this.databaseName = databaseName;
this.contextManager = contextManager;
databaseConnectionManager = new
DriverDatabaseConnectionManager(databaseName, contextManager);
+ processId = processEngine.connect(new Grantee("", ""), databaseName);
}
/**
@@ -301,6 +309,7 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter {
public void close() throws SQLException {
closed = true;
databaseConnectionManager.close();
+ processEngine.disconnect(processId);
}
private ConnectionContext getConnectionContext() {
diff --git
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 39a6a36f411..a2075b54b28 100644
---
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -64,6 +64,7 @@ import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecuti
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
import org.apache.shardingsphere.infra.hint.HintManager;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.hint.SQLHintUtils;
@@ -72,6 +73,7 @@ import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import org.apache.shardingsphere.infra.route.context.RouteContext;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
@@ -279,7 +281,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
ExecutionGroupContext<JDBCExecutionUnit> context =
- prepareEngine.prepare(new RouteContext(),
Collections.singleton(executionUnit), new
ExecutionGroupReportContext(databaseName));
+ prepareEngine.prepare(new RouteContext(),
Collections.singleton(executionUnit), new
ExecutionGroupReportContext(ProcessIdContext.get(), databaseName, new
Grantee("", "")));
if (context.getInputGroups().isEmpty() ||
context.getInputGroups().iterator().next().getInputs().isEmpty()) {
throw new EmptyTrafficExecutionUnitException();
}
@@ -451,7 +453,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionGroupContext(final ExecutionContext executionContext) throws
SQLException {
int maxConnectionsSizePerQuery =
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
- .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(databaseName));
+ .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(ProcessIdContext.get(), databaseName, new
Grantee("", "")));
}
private boolean executeWithExecutionContext(final ExecutionContext
executionContext) throws SQLException {
@@ -526,7 +528,8 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private ExecutionGroupContext<JDBCExecutionUnit>
createExecutionGroupContext(final ExecutionContext executionContext) throws
SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
- return prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(databaseName));
+ return prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
+ new ExecutionGroupReportContext(ProcessIdContext.get(),
databaseName, new Grantee("", "")));
}
@Override
@@ -718,7 +721,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
ExecutionUnit executionUnit = each.getExecutionUnit();
executionUnits.add(executionUnit);
}
-
batchExecutor.init(prepareEngine.prepare(executionContext.getRouteContext(),
executionUnits, new ExecutionGroupReportContext(databaseName)));
+
batchExecutor.init(prepareEngine.prepare(executionContext.getRouteContext(),
executionUnits, new ExecutionGroupReportContext(ProcessIdContext.get(),
databaseName, new Grantee("", ""))));
setBatchParametersForStatements(batchExecutor);
}
diff --git
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 99fa09d1848..e8daac199cd 100644
---
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -61,6 +61,7 @@ import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecuti
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.hint.SQLHintUtils;
import org.apache.shardingsphere.infra.instance.InstanceContext;
@@ -68,6 +69,7 @@ import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.route.context.RouteContext;
import
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.RawExecutionRule;
@@ -476,7 +478,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
ExecutionGroupContext<JDBCExecutionUnit> context =
- prepareEngine.prepare(new RouteContext(),
Collections.singletonList(executionUnit), new
ExecutionGroupReportContext(databaseName));
+ prepareEngine.prepare(new RouteContext(),
Collections.singletonList(executionUnit), new
ExecutionGroupReportContext(ProcessIdContext.get(), databaseName, new
Grantee("", "")));
return context.getInputGroups().stream().flatMap(each ->
each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new);
}
@@ -522,13 +524,14 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private ExecutionGroupContext<JDBCExecutionUnit>
createExecutionGroupContext(final ExecutionContext executionContext) throws
SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
- return prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(databaseName));
+ return prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
+ new ExecutionGroupReportContext(ProcessIdContext.get(),
databaseName, new Grantee("", "")));
}
private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionContext(final ExecutionContext executionContext) throws
SQLException {
int maxConnectionsSizePerQuery =
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
- .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(databaseName));
+ .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(ProcessIdContext.get(), databaseName, new
Grantee("", "")));
}
private boolean executeWithExecutionContext(final ExecuteCallback
executeCallback, final ExecutionContext executionContext) throws SQLException {
diff --git
a/jdbc/core/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
b/jdbc/core/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
index 5a8e8716a30..6c8cb0acb12 100644
---
a/jdbc/core/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
+++
b/jdbc/core/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorEx
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
@@ -58,6 +59,8 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -193,8 +196,9 @@ class BatchPreparedStatementExecutorTest {
@SneakyThrows(ReflectiveOperationException.class)
private void setFields(final Collection<ExecutionGroup<JDBCExecutionUnit>>
executionGroups, final Collection<BatchExecutionUnit> batchExecutionUnits) {
+ String processId = new UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
Plugins.getMemberAccessor().set(BatchPreparedStatementExecutor.class.getDeclaredField("executionGroupContext"),
executor, new ExecutionGroupContext<>(executionGroups,
- new ExecutionGroupReportContext("logic_db")));
+ new ExecutionGroupReportContext(processId, "logic_db", new
Grantee("", ""))));
Plugins.getMemberAccessor().set(BatchPreparedStatementExecutor.class.getDeclaredField("batchExecutionUnits"),
executor, batchExecutionUnits);
Plugins.getMemberAccessor().set(BatchPreparedStatementExecutor.class.getDeclaredField("batchCount"),
executor, 2);
}
diff --git
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
index 0b494ea7f34..6bb47cac627 100644
---
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
+++
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
@@ -41,6 +41,7 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
@@ -53,6 +54,7 @@ import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaData;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
import org.apache.shardingsphere.infra.parser.sql.SQLStatementParserEngine;
import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
@@ -60,10 +62,10 @@ import
org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sqlfederation.executor.SQLFederationExecutorContext;
import
org.apache.shardingsphere.sqlfederation.executor.TableScanExecutorContext;
-import
org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.EmptyRowEnumerator;
import org.apache.shardingsphere.sqlfederation.executor.row.MemoryEnumerator;
import
org.apache.shardingsphere.sqlfederation.executor.row.SQLFederationRowEnumerator;
import
org.apache.shardingsphere.sqlfederation.optimizer.context.OptimizerContext;
+import
org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.EmptyRowEnumerator;
import
org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.ScanExecutor;
import
org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.ScanExecutorContext;
@@ -151,8 +153,9 @@ public final class EnumerableScanExecutor implements
ScanExecutor {
@Override
public Enumerator<Object> enumerator() {
computeConnectionOffsets(context);
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext
=
- prepareEngine.prepare(context.getRouteContext(),
executorContext.getConnectionOffsets(), context.getExecutionUnits(), new
ExecutionGroupReportContext(database.getName()));
+ // TODO pass grantee from proxy and jdbc adapter
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext
= prepareEngine.prepare(context.getRouteContext(),
executorContext.getConnectionOffsets(), context.getExecutionUnits(),
+ new
ExecutionGroupReportContext(ProcessIdContext.get(), database.getName(), new
Grantee("", "")));
setParameters(executionGroupContext.getInputGroups());
processEngine.executeSQL(executionGroupContext,
context.getQueryContext());
List<QueryResult> queryResults =
jdbcExecutor.execute(executionGroupContext,
callback).stream().map(QueryResult.class::cast).collect(Collectors.toList());
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index 74ea27249a3..6f38546e90e 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -118,7 +118,6 @@ class ProcessListChangedSubscriberTest {
ClusterPersistRepository repository = registryCenter.getRepository();
verify(repository).persist("/execution_nodes/foo_id/" + instanceId,
"processes:" + System.lineSeparator() + "- completedUnitCount:
0" + System.lineSeparator()
- + " heldByConnection: false" + System.lineSeparator()
+ " id: foo_id" + System.lineSeparator()
+ " idle: false" + System.lineSeparator()
+ " interrupted: false" + System.lineSeparator()
diff --git
a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
index 04d8c60069c..a186876c806 100644
---
a/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
+++
b/proxy/backend/type/mysql/src/test/java/org/apache/shardingsphere/proxy/backend/mysql/handler/admin/executor/ShowProcessListExecutorTest.java
@@ -67,7 +67,7 @@ class ShowProcessListExecutorTest {
private void setupProcesses(final ShowProcessListExecutor
showProcessListExecutor) throws ReflectiveOperationException {
Process process = new Process("f6c2336a-63ba-41bf-941e-2e3504eb2c80",
1617939785160L,
- "ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id",
"foo_db", "root", "127.0.0.1", 2, new AtomicInteger(1), false, false, new
AtomicBoolean());
+ "ALTER TABLE t_order ADD COLUMN a varchar(64) AFTER order_id",
"foo_db", "root", "127.0.0.1", 2, new AtomicInteger(1), false, new
AtomicBoolean());
Plugins.getMemberAccessor().set(
showProcessListExecutor.getClass().getDeclaredField("processes"),
showProcessListExecutor, Collections.singleton(process));
}