This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 9e28038a846 Add job sharding info to migration list (#35053)
9e28038a846 is described below
commit 9e28038a846dd7829d8e33c67178edbb8c1ebebb
Author: Raigor <[email protected]>
AuthorDate: Fri Mar 21 11:15:11 2025 +0800
Add job sharding info to migration list (#35053)
* Add job sharding info to migration list
* Add job sharding nodes to the result set of `SHOW MIGRATION LIST`
---
RELEASE-NOTES.md | 1 +
.../syntax/ral/migration/show-migration-list.cn.md | 29 +++++++++++-----------
.../syntax/ral/migration/show-migration-list.en.md | 29 +++++++++++-----------
.../pipeline/core/job/api/PipelineAPIFactory.java | 15 +++++++++++
.../core/job/service/PipelineJobManager.java | 17 +++++++++++++
.../migration/query/ShowMigrationListExecutor.java | 24 +++++++++++++++---
6 files changed, 84 insertions(+), 31 deletions(-)
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index beb32e75200..640c69001ab 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -28,6 +28,7 @@
1. Encrypt: Use EncryptDerivedColumnSuffix to enhance encrypt table subquery
rewrite logic - [#34829](https://github.com/apache/shardingsphere/pull/34829)
1. Encrypt: Add quotes to encrypt rewrite derived columns -
[#34950](https://github.com/apache/shardingsphere/pull/34950)
1. SQL Router: Add check for select with union all routing to multi data
sources - [#35037](https://github.com/apache/shardingsphere/pull/35037)
+1. DistSQL: Add job sharding nodes to the result set of `SHOW MIGRATION LIST`
- [#35053](https://github.com/apache/shardingsphere/pull/35053)
### Bug Fixes
diff --git
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/migration/show-migration-list.cn.md
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/migration/show-migration-list.cn.md
index 17ed7188a96..3d37c3752fb 100644
---
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/migration/show-migration-list.cn.md
+++
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/migration/show-migration-list.cn.md
@@ -24,14 +24,15 @@ ShowMigrationList ::=
### 返回值说明
-| 列 | 说明 |
-|----------------|------------|
-| id | 数据迁移作业ID |
-| tables | 迁移表 |
-| job_item_count | 数据迁移作业分片数量 |
-| active | 数据迁移作业状态 |
-| create_time | 数据迁移作业创建时间 |
-| stop_time | 数据迁移作业停止时间 |
+| 列 | 说明 |
+|----------------|--------------|
+| id | 数据迁移作业ID |
+| tables | 迁移表 |
+| active | 数据迁移作业状态 |
+| create_time | 数据迁移作业创建时间 |
+| stop_time | 数据迁移作业停止时间 |
+| job_item_count | 数据迁移作业分片数量 |
+| job_sharding_nodes | 数据迁移作业分片运行节点 |
### 示例
@@ -43,12 +44,12 @@ SHOW MIGRATION LIST;
```sql
mysql> SHOW MIGRATION LIST;
-+---------------------------------------+---------+----------------+--------+---------------------+---------------------+
-| id | tables | job_item_count | active |
create_time | stop_time |
-+---------------------------------------+---------+----------------+--------+---------------------+---------------------+
-| j01013a38b0184e07c864627b5bb05da09ee0 | t_order | 1 | false |
2022-10-31 18:18:24 | 2022-10-31 18:18:31 |
-+---------------------------------------+---------+----------------+--------+---------------------+---------------------+
-1 row in set (0.28 sec)
++--------------------------------------------+---------------------+--------+---------------------+-----------+----------------+--------------------+
+| id | tables | active |
create_time | stop_time | job_item_count | job_sharding_nodes |
++--------------------------------------------+---------------------+--------+---------------------+-----------+----------------+--------------------+
+| j0102p00001d029afca1fd960d567fed6cddc9b4a2 | source_ds.t_order | true |
2022-10-31 18:18:24 | | 1 | 10.7.5.76@-@27808 |
++--------------------------------------------+---------------------+--------+---------------------+-----------+----------------+--------------------+
+1 row in set (0.06 sec)
```
### 保留字
diff --git
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/migration/show-migration-list.en.md
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/migration/show-migration-list.en.md
index e30f6cffc8b..80ed0773792 100644
---
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/migration/show-migration-list.en.md
+++
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/migration/show-migration-list.en.md
@@ -23,14 +23,15 @@ ShowMigrationList ::=
### Return Values Description
-| Columns | Description |
-|----------------|--------------------------------|
-| id | migration job id |
-| tables | migration tables |
-| job_item_count | migration job sharding number |
-| active | migration job states |
-| create_time | migration job create time |
-| stop_time | migration job stop time |
+| Columns | Description |
+|--------------------|-------------------------------|
+| id | migration job id |
+| tables | migration tables |
+| active | migration job states |
+| create_time | migration job create time |
+| stop_time | migration job stop time |
+| job_item_count | migration job sharding number |
+| job_sharding_nodes | migration job sharding nodes |
### Example
@@ -42,12 +43,12 @@ SHOW MIGRATION LIST;
```sql
mysql> SHOW MIGRATION LIST;
-+---------------------------------------+---------+----------------+--------+---------------------+---------------------+
-| id | tables | job_item_count | active |
create_time | stop_time |
-+---------------------------------------+---------+----------------+--------+---------------------+---------------------+
-| j01013a38b0184e07c864627b5bb05da09ee0 | t_order | 1 | false |
2022-10-31 18:18:24 | 2022-10-31 18:18:31 |
-+---------------------------------------+---------+----------------+--------+---------------------+---------------------+
-1 row in set (0.28 sec)
++--------------------------------------------+---------------------+--------+---------------------+-----------+----------------+--------------------+
+| id | tables | active |
create_time | stop_time | job_item_count | job_sharding_nodes |
++--------------------------------------------+---------------------+--------+---------------------+-----------+----------------+--------------------+
+| j0102p00001d029afca1fd960d567fed6cddc9b4a2 | source_ds.t_order | true |
2022-10-31 18:18:24 | | 1 | 10.7.5.76@-@27808 |
++--------------------------------------------+---------------------+--------+---------------------+-----------+----------------+--------------------+
+1 row in set (0.06 sec)
```
### Reserved word
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/api/PipelineAPIFactory.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/api/PipelineAPIFactory.java
index a941c8a3cc7..eba438ee6a4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/api/PipelineAPIFactory.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/api/PipelineAPIFactory.java
@@ -31,9 +31,11 @@ import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.Pi
import
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobOperateAPI;
import
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobStatisticsAPI;
+import
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.ShardingStatisticsAPI;
import
org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.operate.JobOperateAPIImpl;
import
org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.settings.JobConfigurationAPIImpl;
import
org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.statistics.JobStatisticsAPIImpl;
+import
org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.statistics.ShardingStatisticsAPIImpl;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
@@ -99,6 +101,16 @@ public final class PipelineAPIFactory {
return ElasticJobAPIHolder.getInstance(contextKey).jobOperateAPI;
}
+ /**
+ * Get sharding statistics API.
+ *
+ * @param contextKey context key used to look up the ElasticJobAPIHolder instance
+ * @return sharding statistics API held by the ElasticJobAPIHolder of the context key
+ */
+ public static ShardingStatisticsAPI getShardingStatisticsAPI(final
PipelineContextKey contextKey) {
+ return
ElasticJobAPIHolder.getInstance(contextKey).shardingStatisticsAPI;
+ }
+
/**
* Get registry center.
*
@@ -119,11 +131,14 @@ public final class PipelineAPIFactory {
private final JobOperateAPI jobOperateAPI;
+ private final ShardingStatisticsAPI shardingStatisticsAPI;
+
private ElasticJobAPIHolder(final PipelineContextKey contextKey) {
CoordinatorRegistryCenter registryCenter =
getRegistryCenter(contextKey);
jobStatisticsAPI = new JobStatisticsAPIImpl(registryCenter);
jobConfigurationAPI = new JobConfigurationAPIImpl(registryCenter);
jobOperateAPI = new JobOperateAPIImpl(registryCenter);
+ shardingStatisticsAPI = new
ShardingStatisticsAPIImpl(registryCenter);
}
public static ElasticJobAPIHolder getInstance(final PipelineContextKey
contextKey) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 581523c0821..c9d11ea3a7d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -32,11 +32,13 @@ import
org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.ShardingInfo;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
import java.time.LocalDateTime;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -173,4 +175,19 @@ public final class PipelineJobManager {
return Collections.emptyList();
}
}
+
+ /**
+ * Get pipeline job sharding info from the sharding statistics API.
+ *
+ * @param contextKey context key
+ * @param jobId job id
+ * @return job sharding info, or an empty list if the lookup throws NullPointerException or UnsupportedOperationException
+ */
+ public Collection<ShardingInfo> getJobShardingInfos(final
PipelineContextKey contextKey, final String jobId) {
+ try {
+ return
PipelineAPIFactory.getShardingStatisticsAPI(contextKey).getShardingInfo(jobId);
+ } catch (final NullPointerException | UnsupportedOperationException
ex) {
+ return Collections.emptyList();
+ }
+ }
}
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/distsql/handler/migration/query/ShowMigrationListExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/distsql/handler/migration/query/ShowMigrationListExecutor.java
index b90398420c0..fc57e46c7a8 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/distsql/handler/migration/query/ShowMigrationListExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/distsql/handler/migration/query/ShowMigrationListExecutor.java
@@ -19,9 +19,11 @@ package
org.apache.shardingsphere.data.pipeline.distsql.handler.migration.query;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationListStatement;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.distsql.handler.engine.query.DistSQLQueryExecutor;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.ShardingInfo;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -39,13 +41,29 @@ public final class ShowMigrationListExecutor implements
DistSQLQueryExecutor<Sho
@Override
public Collection<String> getColumnNames(final ShowMigrationListStatement
sqlStatement) {
- return Arrays.asList("id", "tables", "job_item_count", "active",
"create_time", "stop_time");
+ return Arrays.asList("id", "tables", "active", "create_time",
"stop_time", "job_item_count", "job_sharding_nodes");
}
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationListStatement sqlStatement, final ContextManager contextManager) {
- return pipelineJobManager.getJobInfos(new
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new
LocalDataQueryResultRow(each.getJobMetaData().getJobId(), each.getTableName(),
- each.getJobMetaData().getJobItemCount(),
each.getJobMetaData().isActive(), each.getJobMetaData().getCreateTime(),
each.getJobMetaData().getStopTime())).collect(Collectors.toList());
+ PipelineContextKey contextKey = new
PipelineContextKey(InstanceType.PROXY);
+ return pipelineJobManager.getJobInfos(contextKey).stream().map(each ->
getRow(contextKey, each)).collect(Collectors.toList());
+ }
+
+ private LocalDataQueryResultRow getRow(final PipelineContextKey
contextKey, final PipelineJobInfo jobInfo) {
+ return new
LocalDataQueryResultRow(jobInfo.getJobMetaData().getJobId(),
jobInfo.getTableName(), jobInfo.getJobMetaData().isActive(),
jobInfo.getJobMetaData().getCreateTime(),
+ jobInfo.getJobMetaData().getStopTime(),
jobInfo.getJobMetaData().getJobItemCount(), getJobShardingNodes(contextKey,
jobInfo.getJobMetaData().getJobId()));
+ }
+
+ private String getJobShardingNodes(final PipelineContextKey contextKey,
final String jobId) {
+ Collection<ShardingInfo> shardingInfos =
pipelineJobManager.getJobShardingInfos(contextKey, jobId);
+ return shardingInfos.isEmpty() ? "" :
getJobShardingNodes(shardingInfos);
+ }
+
+ private String getJobShardingNodes(final Collection<ShardingInfo>
shardingInfos) {
+ return 1 == shardingInfos.size()
+ ? shardingInfos.iterator().next().getInstanceId()
+ : shardingInfos.stream().map(each -> each.getItem() + "=" +
each.getInstanceId()).collect(Collectors.joining(","));
}
@Override