This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 186722b7b40 Refactor PipelineContextKey (#28996)
186722b7b40 is described below
commit 186722b7b40dff9785c026b1d33b448a9520b690
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 9 23:17:55 2023 +0800
Refactor PipelineContextKey (#28996)
---
.../common/context/PipelineContextKey.java | 35 +++-------------------
.../common/context/PipelineContextManager.java | 3 +-
.../PipelineContextManagerLifecycleListener.java | 4 +--
.../data/pipeline/core/job/PipelineJobIdUtils.java | 2 +-
.../common/context/PipelineContextKeyTest.java | 8 ++---
.../handler/query/ShowStreamingListExecutor.java | 3 +-
.../handler/query/ShowMigrationListExecutor.java | 3 +-
.../ShowMigrationSourceStorageUnitsExecutor.java | 3 +-
.../handler/update/MigrateTableUpdater.java | 3 +-
.../RegisterMigrationSourceStorageUnitUpdater.java | 3 +-
...nregisterMigrationSourceStorageUnitUpdater.java | 3 +-
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 3 +-
.../data/pipeline/cdc/core/job/CDCJobIdTest.java | 2 +-
.../pipeline/core/job/PipelineJobIdUtilsTest.java | 2 +-
.../ral/queryable/ShowMigrationRuleExecutor.java | 3 +-
.../AlterInventoryIncrementalRuleUpdater.java | 3 +-
.../core/util/JobConfigurationBuilder.java | 2 +-
.../pipeline/core/util/PipelineContextUtils.java | 3 +-
.../consistencycheck/ConsistencyCheckJobTest.java | 3 +-
19 files changed, 38 insertions(+), 53 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextKey.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextKey.java
index 179095afb55..45f4ab61421 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextKey.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextKey.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.common.context;
-import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -27,7 +26,7 @@ import java.util.Objects;
/**
* Pipeline context key.
*/
-@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+@RequiredArgsConstructor
@Getter
public final class PipelineContextKey {
@@ -35,34 +34,8 @@ public final class PipelineContextKey {
private final InstanceType instanceType;
- /**
- * Build context key.
- *
- * @param databaseName database name
- * @param instanceType instance type
- * @return context key
- */
- public static PipelineContextKey build(final String databaseName, final
InstanceType instanceType) {
- return new PipelineContextKey(databaseName, instanceType);
- }
-
- /**
- * Build context key for proxy.
- *
- * @return context key
- */
- public static PipelineContextKey buildForProxy() {
- return new PipelineContextKey("", InstanceType.PROXY);
- }
-
- /**
- * Build context key for proxy.
- *
- * @param databaseName database name
- * @return context key
- */
- public static PipelineContextKey buildForProxy(final String databaseName) {
- return new PipelineContextKey(databaseName, InstanceType.PROXY);
+ public PipelineContextKey(final InstanceType instanceType) {
+ this("", instanceType);
}
@Override
@@ -78,7 +51,7 @@ public final class PipelineContextKey {
}
private String filterDatabaseName(final PipelineContextKey contextKey) {
- return contextKey.getInstanceType() == InstanceType.PROXY ? "" :
contextKey.getDatabaseName();
+ return InstanceType.PROXY == contextKey.getInstanceType() ? "" :
contextKey.getDatabaseName();
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextManager.java
index bcf9f6819c9..1be109f6784 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextManager.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.common.context;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -47,7 +48,7 @@ public final class PipelineContextManager {
* @return context
*/
public static PipelineContext getProxyContext() {
- return CONTEXT_MAP.get(PipelineContextKey.buildForProxy());
+ return CONTEXT_MAP.get(new PipelineContextKey(InstanceType.PROXY));
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/listener/PipelineContextManagerLifecycleListener.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/listener/PipelineContextManagerLifecycleListener.java
index 9abebcd9ce3..e22b29891f9 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/listener/PipelineContextManagerLifecycleListener.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/listener/PipelineContextManagerLifecycleListener.java
@@ -47,7 +47,7 @@ public final class PipelineContextManagerLifecycleListener
implements ContextMan
if (DefaultDatabase.LOGIC_NAME.equals(databaseName)) {
return;
}
- PipelineContextKey contextKey = PipelineContextKey.build(databaseName,
contextManager.getInstanceContext().getInstance().getMetaData().getType());
+ PipelineContextKey contextKey = new PipelineContextKey(databaseName,
contextManager.getInstanceContext().getInstance().getMetaData().getType());
PipelineContextManager.putContext(contextKey, new
PipelineContext(modeConfig, contextManager));
PipelineMetaDataNodeWatcher.getInstance(contextKey);
ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
@@ -55,6 +55,6 @@ public final class PipelineContextManagerLifecycleListener
implements ContextMan
@Override
public void onDestroyed(final String databaseName, final InstanceType
instanceType) {
-
PipelineContextManager.removeContext(PipelineContextKey.build(databaseName,
instanceType));
+ PipelineContextManager.removeContext(new
PipelineContextKey(databaseName, instanceType));
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index 08f06b3ddbe..0840073bf3d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -84,6 +84,6 @@ public final class PipelineJobIdUtils {
char instanceType = jobId.charAt(5);
short databaseNameLength =
Shorts.fromByteArray(Hex.decodeHex(jobId.substring(6, 10)));
String databaseName = new String(Hex.decodeHex(jobId.substring(10, 10
+ databaseNameLength)), StandardCharsets.UTF_8);
- return PipelineContextKey.build(databaseName,
InstanceTypeUtils.decode(instanceType));
+ return new PipelineContextKey(databaseName,
InstanceTypeUtils.decode(instanceType));
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextKeyTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextKeyTest.java
index fed520da329..b690776815e 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextKeyTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/context/PipelineContextKeyTest.java
@@ -28,16 +28,16 @@ class PipelineContextKeyTest {
@Test
void assertHashCodeEqualsForProxyMode() {
- PipelineContextKey contextKey1 = PipelineContextKey.build(null,
InstanceType.PROXY);
- PipelineContextKey contextKey2 =
PipelineContextKey.build("sharding_db", InstanceType.PROXY);
+ PipelineContextKey contextKey1 = new PipelineContextKey(null,
InstanceType.PROXY);
+ PipelineContextKey contextKey2 = new PipelineContextKey("sharding_db",
InstanceType.PROXY);
assertThat(contextKey1.hashCode(), is(contextKey2.hashCode()));
assertThat(contextKey1, is(contextKey2));
}
@Test
void assertHashCodeEqualsForJdbcMode() {
- PipelineContextKey contextKey1 = PipelineContextKey.build("logic_db",
InstanceType.JDBC);
- PipelineContextKey contextKey2 =
PipelineContextKey.build("sharding_db", InstanceType.JDBC);
+ PipelineContextKey contextKey1 = new PipelineContextKey("logic_db",
InstanceType.JDBC);
+ PipelineContextKey contextKey2 = new PipelineContextKey("sharding_db",
InstanceType.JDBC);
assertThat(contextKey1.hashCode(), not(contextKey2.hashCode()));
assertThat(contextKey1, not(contextKey2));
}
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
index 8d83b648daa..fe9676f5119 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import java.util.Arrays;
@@ -38,7 +39,7 @@ public final class ShowStreamingListExecutor implements
QueryableRALExecutor<Sho
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowStreamingListStatement sqlStatement) {
- return
jobAPI.list(PipelineContextKey.buildForProxy()).stream().map(each -> new
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+ return jobAPI.list(new
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
((TableBasedPipelineJobInfo) each).getDatabaseName(),
((TableBasedPipelineJobInfo) each).getTable(),
each.getJobMetaData().getJobItemCount(),
each.getJobMetaData().isActive() ? Boolean.TRUE.toString() :
Boolean.FALSE.toString(),
each.getJobMetaData().getCreateTime(),
Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList());
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
index 45ed253bdf0..94585e89753 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
@@ -21,6 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey
import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationListStatement;
@@ -37,7 +38,7 @@ public final class ShowMigrationListExecutor implements
QueryableRALExecutor<Sho
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationListStatement sqlStatement) {
- return
jobAPI.list(PipelineContextKey.buildForProxy()).stream().map(each -> new
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+ return jobAPI.list(new
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
((TableBasedPipelineJobInfo) each).getTable(),
each.getJobMetaData().getJobItemCount(),
each.getJobMetaData().isActive() ? Boolean.TRUE.toString() :
Boolean.FALSE.toString(),
each.getJobMetaData().getCreateTime(),
each.getJobMetaData().getStopTime())).collect(Collectors.toList());
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
index 52d40f5c7c2..d4b5e537464 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.migration.distsql.handler.query;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationSourceStorageUnitsStatement;
@@ -38,7 +39,7 @@ public final class ShowMigrationSourceStorageUnitsExecutor
implements QueryableR
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationSourceStorageUnitsStatement sqlStatement) {
- Iterator<Collection<Object>> data =
jobAPI.listMigrationSourceResources(PipelineContextKey.buildForProxy()).iterator();
+ Iterator<Collection<Object>> data =
jobAPI.listMigrationSourceResources(new
PipelineContextKey(InstanceType.PROXY)).iterator();
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
while (data.hasNext()) {
result.add(new LocalDataQueryResultRow((List<Object>)
data.next()));
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
index 854dcffe1ba..ba8c3947445 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequire
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
/**
@@ -37,7 +38,7 @@ public final class MigrateTableUpdater implements
RALUpdater<MigrateTableStateme
public void executeUpdate(final String databaseName, final
MigrateTableStatement sqlStatement) {
String targetDatabaseName = null ==
sqlStatement.getTargetDatabaseName() ? databaseName :
sqlStatement.getTargetDatabaseName();
ShardingSpherePreconditions.checkNotNull(targetDatabaseName,
MissingRequiredTargetDatabaseException::new);
- jobAPI.createJobAndStart(PipelineContextKey.buildForProxy(), new
MigrateTableStatement(sqlStatement.getSourceTargetEntries(),
targetDatabaseName));
+ jobAPI.createJobAndStart(new PipelineContextKey(InstanceType.PROXY),
new MigrateTableStatement(sqlStatement.getSourceTargetEntries(),
targetDatabaseName));
}
@Override
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
index 8dde82c6b6b..b1c43f9e905 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
@@ -30,6 +30,7 @@ import
org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
import
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.migration.distsql.statement.RegisterMigrationSourceStorageUnitStatement;
import java.util.ArrayList;
@@ -54,7 +55,7 @@ public final class RegisterMigrationSourceStorageUnitUpdater
implements RALUpdat
DatabaseType databaseType =
DatabaseTypeFactory.get(urlBasedDataSourceSegment.getUrl());
Map<String, DataSourcePoolProperties> propsMap =
DataSourceSegmentsConverter.convert(databaseType, dataSources);
validateHandler.validate(propsMap);
- jobAPI.addMigrationSourceResources(PipelineContextKey.buildForProxy(),
propsMap);
+ jobAPI.addMigrationSourceResources(new
PipelineContextKey(InstanceType.PROXY), propsMap);
}
@Override
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
index f74d6f0eeca..975c218a37d 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.migration.distsql.handler.update;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.migration.distsql.statement.UnregisterMigrationSourceStorageUnitStatement;
/**
@@ -31,7 +32,7 @@ public final class
UnregisterMigrationSourceStorageUnitUpdater implements RALUpd
@Override
public void executeUpdate(final String databaseName, final
UnregisterMigrationSourceStorageUnitStatement sqlStatement) {
-
jobAPI.dropMigrationSourceResources(PipelineContextKey.buildForProxy(),
sqlStatement.getNames());
+ jobAPI.dropMigrationSourceResources(new
PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames());
}
@Override
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 15b5f7cc3f5..09ceb2debd5 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -78,6 +78,7 @@ import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -117,7 +118,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
* @return job id
*/
public String createJob(final StreamDataParameter param, final CDCSinkType
sinkType, final Properties sinkProps) {
- PipelineContextKey contextKey =
PipelineContextKey.buildForProxy(param.getDatabaseName());
+ PipelineContextKey contextKey = new
PipelineContextKey(param.getDatabaseName(), InstanceType.PROXY);
YamlCDCJobConfiguration yamlJobConfig =
getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey);
extendYamlJobConfiguration(contextKey, yamlJobConfig);
CDCJobConfiguration jobConfig = new
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
index e61cce62ab4..70856a72c1c 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
@@ -34,7 +34,7 @@ class CDCJobIdTest {
@Test
void assertParseJobType() {
- PipelineContextKey contextKey =
PipelineContextKey.build("sharding_db", InstanceType.PROXY);
+ PipelineContextKey contextKey = new PipelineContextKey("sharding_db",
InstanceType.PROXY);
CDCJobId pipelineJobId = new CDCJobId(contextKey,
Arrays.asList("test", "t_order"), false, CDCSinkType.SOCKET.name());
String jobId =
PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
diff --git
a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
index 92c53973eb6..2b20d73649c 100644
---
a/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
+++
b/kernel/data-pipeline/scenario/migration/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
@@ -40,7 +40,7 @@ class PipelineJobIdUtilsTest {
}
private void assertParse0(final InstanceType instanceType) {
- PipelineContextKey contextKey =
PipelineContextKey.build("sharding_db", instanceType);
+ PipelineContextKey contextKey = new PipelineContextKey("sharding_db",
instanceType);
MigrationJobId pipelineJobId = new MigrationJobId(contextKey,
Collections.singletonList("t_order:ds_0.t_order_0,ds_0.t_order_1"));
String jobId = new MigrationJobAPI().marshalJobId(pipelineJobId);
assertThat(PipelineJobIdUtils.parseJobType(jobId),
instanceOf(MigrationJobType.class));
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
index ffb025ccbe8..e592dd7ee12 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncreme
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import
org.apache.shardingsphere.distsql.statement.ral.queryable.ShowMigrationRuleStatement;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.json.JsonUtils;
@@ -39,7 +40,7 @@ public final class ShowMigrationRuleExecutor implements
QueryableRALExecutor<Sho
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationRuleStatement sqlStatement) {
PipelineProcessConfiguration processConfig =
((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class,
"MIGRATION"))
- .showProcessConfiguration(PipelineContextKey.buildForProxy());
+ .showProcessConfiguration(new
PipelineContextKey(InstanceType.PROXY));
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
result.add(new
LocalDataQueryResultRow(getString(processConfig.getRead()),
getString(processConfig.getWrite()),
getString(processConfig.getStreamChannel())));
return result;
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
index c4e277cc2d2..4931418c499 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncreme
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import
org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import
org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.converter.InventoryIncrementalProcessConfigurationSegmentConverter;
@@ -35,7 +36,7 @@ public final class AlterInventoryIncrementalRuleUpdater
implements RALUpdater<Al
public void executeUpdate(final String databaseName, final
AlterInventoryIncrementalRuleStatement sqlStatement) {
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, sqlStatement.getJobTypeName());
PipelineProcessConfiguration processConfig =
InventoryIncrementalProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
- jobAPI.alterProcessConfiguration(PipelineContextKey.buildForProxy(),
processConfig);
+ jobAPI.alterProcessConfiguration(new
PipelineContextKey(InstanceType.PROXY), processConfig);
}
@Override
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
index 9a44512b41d..09abcb219d2 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -78,7 +78,7 @@ public final class JobConfigurationBuilder {
result.setTargetTableSchemaMap(targetTableSchemaMap);
result.setTablesFirstDataNodes("t_order:ds_0.t_order");
result.setJobShardingDataNodes(Collections.singletonList("t_order:ds_0.t_order"));
- PipelineContextKey contextKey =
PipelineContextKey.build(RandomStringUtils.randomAlphabetic(32),
InstanceType.PROXY);
+ PipelineContextKey contextKey = new
PipelineContextKey(RandomStringUtils.randomAlphabetic(32), InstanceType.PROXY);
result.setJobId(generateMigrationJobId(contextKey, result));
Map<String, YamlPipelineDataSourceConfiguration> sources = new
LinkedHashMap<>();
String databaseNameSuffix = RandomStringUtils.randomAlphabetic(9);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index 8993efaacc8..ba528a95c1f 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -38,6 +38,7 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.Migrat
import
org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
import
org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -65,7 +66,7 @@ import java.util.Map;
*/
public final class PipelineContextUtils {
- private static final PipelineContextKey CONTEXT_KEY =
PipelineContextKey.buildForProxy();
+ private static final PipelineContextKey CONTEXT_KEY = new
PipelineContextKey(InstanceType.PROXY);
private static final ExecuteEngine EXECUTE_ENGINE =
ExecuteEngine.newCachedThreadInstance(PipelineContextUtils.class.getSimpleName());
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
index a8c3af697f2..f918adf24df 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobTest.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.imp
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
@@ -48,7 +49,7 @@ class ConsistencyCheckJobTest {
@Test
void assertBuildPipelineJobItemContext() {
- ConsistencyCheckJobId pipelineJobId = new
ConsistencyCheckJobId(PipelineContextKey.buildForProxy(),
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
+ ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new
PipelineContextKey(InstanceType.PROXY),
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
String checkJobId = new
ConsistencyCheckJobAPI().marshalJobId(pipelineJobId);
Map<String, Object> expectTableCheckPosition =
Collections.singletonMap("t_order", 100);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(checkJobId,
0,