This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 186722b7b40 Refactor PipelineContextKey (#28996)
186722b7b40 is described below

commit 186722b7b40dff9785c026b1d33b448a9520b690
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 9 23:17:55 2023 +0800

    Refactor PipelineContextKey (#28996)
---
 .../common/context/PipelineContextKey.java         | 35 +++-------------------
 .../common/context/PipelineContextManager.java     |  3 +-
 .../PipelineContextManagerLifecycleListener.java   |  4 +--
 .../data/pipeline/core/job/PipelineJobIdUtils.java |  2 +-
 .../common/context/PipelineContextKeyTest.java     |  8 ++---
 .../handler/query/ShowStreamingListExecutor.java   |  3 +-
 .../handler/query/ShowMigrationListExecutor.java   |  3 +-
 .../ShowMigrationSourceStorageUnitsExecutor.java   |  3 +-
 .../handler/update/MigrateTableUpdater.java        |  3 +-
 .../RegisterMigrationSourceStorageUnitUpdater.java |  3 +-
 ...nregisterMigrationSourceStorageUnitUpdater.java |  3 +-
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  3 +-
 .../data/pipeline/cdc/core/job/CDCJobIdTest.java   |  2 +-
 .../pipeline/core/job/PipelineJobIdUtilsTest.java  |  2 +-
 .../ral/queryable/ShowMigrationRuleExecutor.java   |  3 +-
 .../AlterInventoryIncrementalRuleUpdater.java      |  3 +-
 .../core/util/JobConfigurationBuilder.java         |  2 +-
 .../pipeline/core/util/PipelineContextUtils.java   |  3 +-
 .../consistencycheck/ConsistencyCheckJobTest.java  |  3 +-
 19 files changed, 38 insertions(+), 53 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextKey.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextKey.java
index 179095afb55..45f4ab61421 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextKey.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextKey.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.common.context;
 
-import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -27,7 +26,7 @@ import java.util.Objects;
 /**
  * Pipeline context key.
  */
-@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+@RequiredArgsConstructor
 @Getter
 public final class PipelineContextKey {
     
@@ -35,34 +34,8 @@ public final class PipelineContextKey {
     
     private final InstanceType instanceType;
     
-    /**
-     * Build context key.
-     *
-     * @param databaseName database name
-     * @param instanceType instance type
-     * @return context key
-     */
-    public static PipelineContextKey build(final String databaseName, final 
InstanceType instanceType) {
-        return new PipelineContextKey(databaseName, instanceType);
-    }
-    
-    /**
-     * Build context key for proxy.
-     *
-     * @return context key
-     */
-    public static PipelineContextKey buildForProxy() {
-        return new PipelineContextKey("", InstanceType.PROXY);
-    }
-    
-    /**
-     * Build context key for proxy.
-     *
-     * @param databaseName database name
-     * @return context key
-     */
-    public static PipelineContextKey buildForProxy(final String databaseName) {
-        return new PipelineContextKey(databaseName, InstanceType.PROXY);
+    public PipelineContextKey(final InstanceType instanceType) {
+        this("", instanceType);
     }
     
     @Override
@@ -78,7 +51,7 @@ public final class PipelineContextKey {
     }
     
     private String filterDatabaseName(final PipelineContextKey contextKey) {
-        return contextKey.getInstanceType() == InstanceType.PROXY ? "" : 
contextKey.getDatabaseName();
+        return InstanceType.PROXY == contextKey.getInstanceType() ? "" : 
contextKey.getDatabaseName();
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextManager.java
index bcf9f6819c9..1be109f6784 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextManager.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.common.context;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -47,7 +48,7 @@ public final class PipelineContextManager {
      * @return context
      */
     public static PipelineContext getProxyContext() {
-        return CONTEXT_MAP.get(PipelineContextKey.buildForProxy());
+        return CONTEXT_MAP.get(new PipelineContextKey(InstanceType.PROXY));
     }
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/listener/PipelineContextManagerLifecycleListener.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/listener/PipelineContextManagerLifecycleListener.java
index 9abebcd9ce3..e22b29891f9 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/listener/PipelineContextManagerLifecycleListener.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/listener/PipelineContextManagerLifecycleListener.java
@@ -47,7 +47,7 @@ public final class PipelineContextManagerLifecycleListener 
implements ContextMan
         if (DefaultDatabase.LOGIC_NAME.equals(databaseName)) {
             return;
         }
-        PipelineContextKey contextKey = PipelineContextKey.build(databaseName, 
contextManager.getInstanceContext().getInstance().getMetaData().getType());
+        PipelineContextKey contextKey = new PipelineContextKey(databaseName, 
contextManager.getInstanceContext().getInstance().getMetaData().getType());
         PipelineContextManager.putContext(contextKey, new 
PipelineContext(modeConfig, contextManager));
         PipelineMetaDataNodeWatcher.getInstance(contextKey);
         ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
@@ -55,6 +55,6 @@ public final class PipelineContextManagerLifecycleListener 
implements ContextMan
     
     @Override
     public void onDestroyed(final String databaseName, final InstanceType 
instanceType) {
-        
PipelineContextManager.removeContext(PipelineContextKey.build(databaseName, 
instanceType));
+        PipelineContextManager.removeContext(new 
PipelineContextKey(databaseName, instanceType));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index 08f06b3ddbe..0840073bf3d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -84,6 +84,6 @@ public final class PipelineJobIdUtils {
         char instanceType = jobId.charAt(5);
         short databaseNameLength = 
Shorts.fromByteArray(Hex.decodeHex(jobId.substring(6, 10)));
         String databaseName = new String(Hex.decodeHex(jobId.substring(10, 10 
+ databaseNameLength)), StandardCharsets.UTF_8);
-        return PipelineContextKey.build(databaseName, 
InstanceTypeUtils.decode(instanceType));
+        return new PipelineContextKey(databaseName, 
InstanceTypeUtils.decode(instanceType));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextKeyTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextKeyTest.java
index fed520da329..b690776815e 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextKeyTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextKeyTest.java
@@ -28,16 +28,16 @@ class PipelineContextKeyTest {
     
     @Test
     void assertHashCodeEqualsForProxyMode() {
-        PipelineContextKey contextKey1 = PipelineContextKey.build(null, 
InstanceType.PROXY);
-        PipelineContextKey contextKey2 = 
PipelineContextKey.build("sharding_db", InstanceType.PROXY);
+        PipelineContextKey contextKey1 = new PipelineContextKey(null, 
InstanceType.PROXY);
+        PipelineContextKey contextKey2 = new PipelineContextKey("sharding_db", 
InstanceType.PROXY);
         assertThat(contextKey1.hashCode(), is(contextKey2.hashCode()));
         assertThat(contextKey1, is(contextKey2));
     }
     
     @Test
     void assertHashCodeEqualsForJdbcMode() {
-        PipelineContextKey contextKey1 = PipelineContextKey.build("logic_db", 
InstanceType.JDBC);
-        PipelineContextKey contextKey2 = 
PipelineContextKey.build("sharding_db", InstanceType.JDBC);
+        PipelineContextKey contextKey1 = new PipelineContextKey("logic_db", 
InstanceType.JDBC);
+        PipelineContextKey contextKey2 = new PipelineContextKey("sharding_db", 
InstanceType.JDBC);
         assertThat(contextKey1.hashCode(), not(contextKey2.hashCode()));
         assertThat(contextKey1, not(contextKey2));
     }
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
index 8d83b648daa..fe9676f5119 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
 
 import java.util.Arrays;
@@ -38,7 +39,7 @@ public final class ShowStreamingListExecutor implements 
QueryableRALExecutor<Sho
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowStreamingListStatement sqlStatement) {
-        return 
jobAPI.list(PipelineContextKey.buildForProxy()).stream().map(each -> new 
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+        return jobAPI.list(new 
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new 
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
                 ((TableBasedPipelineJobInfo) each).getDatabaseName(), 
((TableBasedPipelineJobInfo) each).getTable(),
                 each.getJobMetaData().getJobItemCount(), 
each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : 
Boolean.FALSE.toString(),
                 each.getJobMetaData().getCreateTime(), 
Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList());
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
index 45ed253bdf0..94585e89753 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
@@ -21,6 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
 import 
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationListStatement;
 
@@ -37,7 +38,7 @@ public final class ShowMigrationListExecutor implements 
QueryableRALExecutor<Sho
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowMigrationListStatement sqlStatement) {
-        return 
jobAPI.list(PipelineContextKey.buildForProxy()).stream().map(each -> new 
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+        return jobAPI.list(new 
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new 
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
                 ((TableBasedPipelineJobInfo) each).getTable(), 
each.getJobMetaData().getJobItemCount(),
                 each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : 
Boolean.FALSE.toString(),
                 each.getJobMetaData().getCreateTime(), 
each.getJobMetaData().getStopTime())).collect(Collectors.toList());
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
index 52d40f5c7c2..d4b5e537464 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.migration.distsql.handler.query;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
 import 
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationSourceStorageUnitsStatement;
 
@@ -38,7 +39,7 @@ public final class ShowMigrationSourceStorageUnitsExecutor 
implements QueryableR
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowMigrationSourceStorageUnitsStatement sqlStatement) {
-        Iterator<Collection<Object>> data = 
jobAPI.listMigrationSourceResources(PipelineContextKey.buildForProxy()).iterator();
+        Iterator<Collection<Object>> data = 
jobAPI.listMigrationSourceResources(new 
PipelineContextKey(InstanceType.PROXY)).iterator();
         Collection<LocalDataQueryResultRow> result = new LinkedList<>();
         while (data.hasNext()) {
             result.add(new LocalDataQueryResultRow((List<Object>) 
data.next()));
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
index 854dcffe1ba..ba8c3947445 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequire
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
 
 /**
@@ -37,7 +38,7 @@ public final class MigrateTableUpdater implements 
RALUpdater<MigrateTableStateme
     public void executeUpdate(final String databaseName, final 
MigrateTableStatement sqlStatement) {
         String targetDatabaseName = null == 
sqlStatement.getTargetDatabaseName() ? databaseName : 
sqlStatement.getTargetDatabaseName();
         ShardingSpherePreconditions.checkNotNull(targetDatabaseName, 
MissingRequiredTargetDatabaseException::new);
-        jobAPI.createJobAndStart(PipelineContextKey.buildForProxy(), new 
MigrateTableStatement(sqlStatement.getSourceTargetEntries(), 
targetDatabaseName));
+        jobAPI.createJobAndStart(new PipelineContextKey(InstanceType.PROXY), 
new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), 
targetDatabaseName));
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
index 8dde82c6b6b..b1c43f9e905 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
@@ -30,6 +30,7 @@ import 
org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
 import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.migration.distsql.statement.RegisterMigrationSourceStorageUnitStatement;
 
 import java.util.ArrayList;
@@ -54,7 +55,7 @@ public final class RegisterMigrationSourceStorageUnitUpdater 
implements RALUpdat
         DatabaseType databaseType = 
DatabaseTypeFactory.get(urlBasedDataSourceSegment.getUrl());
         Map<String, DataSourcePoolProperties> propsMap = 
DataSourceSegmentsConverter.convert(databaseType, dataSources);
         validateHandler.validate(propsMap);
-        jobAPI.addMigrationSourceResources(PipelineContextKey.buildForProxy(), 
propsMap);
+        jobAPI.addMigrationSourceResources(new 
PipelineContextKey(InstanceType.PROXY), propsMap);
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
index f74d6f0eeca..975c218a37d 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.migration.distsql.handler.update;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.migration.distsql.statement.UnregisterMigrationSourceStorageUnitStatement;
 
 /**
@@ -31,7 +32,7 @@ public final class 
UnregisterMigrationSourceStorageUnitUpdater implements RALUpd
     
     @Override
     public void executeUpdate(final String databaseName, final 
UnregisterMigrationSourceStorageUnitStatement sqlStatement) {
-        
jobAPI.dropMigrationSourceResources(PipelineContextKey.buildForProxy(), 
sqlStatement.getNames());
+        jobAPI.dropMigrationSourceResources(new 
PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames());
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 15b5f7cc3f5..09ceb2debd5 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -78,6 +78,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -117,7 +118,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
      * @return job id
      */
     public String createJob(final StreamDataParameter param, final CDCSinkType 
sinkType, final Properties sinkProps) {
-        PipelineContextKey contextKey = 
PipelineContextKey.buildForProxy(param.getDatabaseName());
+        PipelineContextKey contextKey = new 
PipelineContextKey(param.getDatabaseName(), InstanceType.PROXY);
         YamlCDCJobConfiguration yamlJobConfig = 
getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey);
         extendYamlJobConfiguration(contextKey, yamlJobConfig);
         CDCJobConfiguration jobConfig = new 
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
index e61cce62ab4..70856a72c1c 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
@@ -34,7 +34,7 @@ class CDCJobIdTest {
     
     @Test
     void assertParseJobType() {
-        PipelineContextKey contextKey = 
PipelineContextKey.build("sharding_db", InstanceType.PROXY);
+        PipelineContextKey contextKey = new PipelineContextKey("sharding_db", 
InstanceType.PROXY);
         CDCJobId pipelineJobId = new CDCJobId(contextKey, 
Arrays.asList("test", "t_order"), false, CDCSinkType.SOCKET.name());
         String jobId = 
PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
         JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
diff --git 
a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
 
b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
index 92c53973eb6..2b20d73649c 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
@@ -40,7 +40,7 @@ class PipelineJobIdUtilsTest {
     }
     
     private void assertParse0(final InstanceType instanceType) {
-        PipelineContextKey contextKey = 
PipelineContextKey.build("sharding_db", instanceType);
+        PipelineContextKey contextKey = new PipelineContextKey("sharding_db", 
instanceType);
         MigrationJobId pipelineJobId = new MigrationJobId(contextKey, 
Collections.singletonList("t_order:ds_0.t_order_0,ds_0.t_order_1"));
         String jobId = new MigrationJobAPI().marshalJobId(pipelineJobId);
         assertThat(PipelineJobIdUtils.parseJobType(jobId), 
instanceOf(MigrationJobType.class));
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
index ffb025ccbe8..e592dd7ee12 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncreme
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import 
org.apache.shardingsphere.distsql.statement.ral.queryable.ShowMigrationRuleStatement;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.json.JsonUtils;
@@ -39,7 +40,7 @@ public final class ShowMigrationRuleExecutor implements 
QueryableRALExecutor<Sho
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowMigrationRuleStatement sqlStatement) {
         PipelineProcessConfiguration processConfig = 
((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, 
"MIGRATION"))
-                .showProcessConfiguration(PipelineContextKey.buildForProxy());
+                .showProcessConfiguration(new 
PipelineContextKey(InstanceType.PROXY));
         Collection<LocalDataQueryResultRow> result = new LinkedList<>();
         result.add(new 
LocalDataQueryResultRow(getString(processConfig.getRead()), 
getString(processConfig.getWrite()), 
getString(processConfig.getStreamChannel())));
         return result;
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
index c4e277cc2d2..4931418c499 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncreme
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import 
org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import 
org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.converter.InventoryIncrementalProcessConfigurationSegmentConverter;
 
@@ -35,7 +36,7 @@ public final class AlterInventoryIncrementalRuleUpdater 
implements RALUpdater<Al
     public void executeUpdate(final String databaseName, final 
AlterInventoryIncrementalRuleStatement sqlStatement) {
         InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) 
TypedSPILoader.getService(PipelineJobAPI.class, sqlStatement.getJobTypeName());
         PipelineProcessConfiguration processConfig = 
InventoryIncrementalProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
-        jobAPI.alterProcessConfiguration(PipelineContextKey.buildForProxy(), 
processConfig);
+        jobAPI.alterProcessConfiguration(new 
PipelineContextKey(InstanceType.PROXY), processConfig);
     }
     
     @Override
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
index 9a44512b41d..09abcb219d2 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -78,7 +78,7 @@ public final class JobConfigurationBuilder {
         result.setTargetTableSchemaMap(targetTableSchemaMap);
         result.setTablesFirstDataNodes("t_order:ds_0.t_order");
         
result.setJobShardingDataNodes(Collections.singletonList("t_order:ds_0.t_order"));
-        PipelineContextKey contextKey = 
PipelineContextKey.build(RandomStringUtils.randomAlphabetic(32), 
InstanceType.PROXY);
+        PipelineContextKey contextKey = new 
PipelineContextKey(RandomStringUtils.randomAlphabetic(32), InstanceType.PROXY);
         result.setJobId(generateMigrationJobId(contextKey, result));
         Map<String, YamlPipelineDataSourceConfiguration> sources = new 
LinkedHashMap<>();
         String databaseNameSuffix = RandomStringUtils.randomAlphabetic(9);
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index 8993efaacc8..ba528a95c1f 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -38,6 +38,7 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.Migrat
 import 
org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
 import 
org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -65,7 +66,7 @@ import java.util.Map;
  */
 public final class PipelineContextUtils {
     
-    private static final PipelineContextKey CONTEXT_KEY = 
PipelineContextKey.buildForProxy();
+    private static final PipelineContextKey CONTEXT_KEY = new 
PipelineContextKey(InstanceType.PROXY);
     
     private static final ExecuteEngine EXECUTE_ENGINE = 
ExecuteEngine.newCachedThreadInstance(PipelineContextUtils.class.getSimpleName());
     
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index a8c3af697f2..f918adf24df 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.imp
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
@@ -48,7 +49,7 @@ class ConsistencyCheckJobTest {
     
     @Test
     void assertBuildPipelineJobItemContext() {
-        ConsistencyCheckJobId pipelineJobId = new 
ConsistencyCheckJobId(PipelineContextKey.buildForProxy(), 
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
+        ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new 
PipelineContextKey(InstanceType.PROXY), 
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
         String checkJobId = new 
ConsistencyCheckJobAPI().marshalJobId(pipelineJobId);
         Map<String, Object> expectTableCheckPosition = 
Collections.singletonMap("t_order", 100);
         
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(checkJobId,
 0,


Reply via email to