This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new f2d4b4170e8 Revert "Add database name for PipelineContextUtils (#36868)" (#36871) f2d4b4170e8 is described below commit f2d4b4170e869ba0c8cd2fee0d14dbcbad16c90c Author: Haoran Meng <menghaora...@gmail.com> AuthorDate: Mon Oct 13 14:51:56 2025 +0800 Revert "Add database name for PipelineContextUtils (#36868)" (#36871) This reverts commit 88c8e1bf6a1bccd8a54f2e07ac14deff56a9cd63. --- .../datasource/PipelineDataSourceManagerTest.java | 2 +- ...lineProcessConfigurationPersistServiceTest.java | 9 +++----- .../splitter/InventoryTaskSplitterTest.java | 2 +- .../repository/PipelineGovernanceFacadeTest.java | 8 +++---- .../pipeline/core/task/IncrementalTaskTest.java | 2 +- .../data/pipeline/core/task/InventoryTaskTest.java | 2 +- .../core/util/PipelineDistributedBarrierTest.java | 8 +++---- .../ConsistencyCheckJobExecutorCallbackTest.java | 6 ++--- .../api/ConsistencyCheckJobAPITest.java | 10 ++++---- .../migration/api/MigrationJobAPITest.java | 27 +++++++++------------- .../MigrationDataConsistencyCheckerTest.java | 8 +++---- .../pipeline/core/util/PipelineContextUtils.java | 13 +++++------ 12 files changed, 39 insertions(+), 58 deletions(-) diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java index e27ac430ae8..37f1b64ccbc 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java @@ -41,7 +41,7 @@ class PipelineDataSourceManagerTest { @BeforeAll static void beforeClass() { - PipelineContextUtils.initPipelineContextManager(PipelineDataSourceManagerTest.class.getSimpleName()); + 
PipelineContextUtils.initPipelineContextManager(); } @BeforeEach diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java index 37ad37954ac..3a19079d14f 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java @@ -33,11 +33,9 @@ import static org.hamcrest.MatcherAssert.assertThat; class PipelineProcessConfigurationPersistServiceTest { - private static final String TEST_DATABASE_NAME = PipelineProcessConfigurationPersistServiceTest.class.getSimpleName(); - @BeforeAll static void beforeClass() { - PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME); + PipelineContextUtils.initPipelineContextManager(); } @Test @@ -54,9 +52,8 @@ class PipelineProcessConfigurationPersistServiceTest { String expectedYamlText = YamlEngine.marshal(yamlProcessConfig); PipelineProcessConfiguration processConfig = new YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig); PipelineProcessConfigurationPersistService persistService = new PipelineProcessConfigurationPersistService(); - persistService.persist(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME), "MIGRATION", processConfig); - String actualYamlText = YamlEngine.marshal(new YamlPipelineProcessConfigurationSwapper() - .swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME), "MIGRATION"))); + persistService.persist(PipelineContextUtils.getContextKey(), "MIGRATION", processConfig); + String actualYamlText = YamlEngine.marshal(new 
YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(), "MIGRATION"))); assertThat(actualYamlText, is(expectedYamlText)); } } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java index 0fc5ad1fff6..36def006535 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java @@ -59,7 +59,7 @@ class InventoryTaskSplitterTest { @BeforeAll static void beforeClass() { - PipelineContextUtils.initPipelineContextManager(InventoryTaskSplitterTest.class.getSimpleName()); + PipelineContextUtils.initPipelineContextManager(); } @BeforeEach diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java index af3590328e4..06052dd6b68 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/registrycenter/repository/PipelineGovernanceFacadeTest.java @@ -67,12 +67,10 @@ class PipelineGovernanceFacadeTest { private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1); - private static final String TEST_DATABASE_NAME = PipelineGovernanceFacadeTest.class.getSimpleName(); - @BeforeAll static void beforeClass() { - 
PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME); - governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)); + PipelineContextUtils.initPipelineContextManager(); + governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()); watch(); } @@ -190,7 +188,7 @@ class PipelineGovernanceFacadeTest { } private ClusterPersistRepository getClusterPersistRepository() { - return (ClusterPersistRepository) PipelineContextManager.getContext(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getPersistServiceFacade().getRepository(); + return (ClusterPersistRepository) PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getPersistServiceFacade().getRepository(); } private MigrationJobItemContext mockJobItemContext() { diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java index 0eae6053cd0..de7d2a158dd 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java @@ -46,7 +46,7 @@ class IncrementalTaskTest { @BeforeAll static void beforeClass() { - PipelineContextUtils.initPipelineContextManager(IncrementalTaskTest.class.getSimpleName()); + PipelineContextUtils.initPipelineContextManager(); } @BeforeEach diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java index b080c126684..368f3dc1852 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java +++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java @@ -56,7 +56,7 @@ class InventoryTaskTest { @BeforeAll static void beforeClass() { - PipelineContextUtils.initPipelineContextManager(InventoryTaskTest.class.getSimpleName()); + PipelineContextUtils.initPipelineContextManager(); } @AfterAll diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java index 2fb96e733bb..5422ce2f32c 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java @@ -35,17 +35,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class PipelineDistributedBarrierTest { - private static final String TEST_DATABASE_NAME = PipelineDistributedBarrierTest.class.getSimpleName(); - @BeforeAll static void setUp() { - PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME); + PipelineContextUtils.initPipelineContextManager(); } @Test void assertRegisterAndRemove() throws ReflectiveOperationException { String jobId = JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId(); - PipelineContextKey contextKey = PipelineContextUtils.getContextKey(TEST_DATABASE_NAME); + PipelineContextKey contextKey = PipelineContextUtils.getContextKey(); PipelineContextManager.getContext(contextKey).getPersistServiceFacade().getRepository().persist(PipelineMetaDataNode.getJobRootPath(jobId), ""); PipelineDistributedBarrier instance = PipelineDistributedBarrier.getInstance(contextKey); String parentPath = "/barrier"; @@ -60,7 +58,7 @@ class PipelineDistributedBarrierTest { @Test void assertAwait() { String jobId = 
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId(); - PipelineContextKey contextKey = PipelineContextUtils.getContextKey(TEST_DATABASE_NAME); + PipelineContextKey contextKey = PipelineContextUtils.getContextKey(); PipelineContextManager.getContext(contextKey).getPersistServiceFacade().getRepository().persist(PipelineMetaDataNode.getJobRootPath(jobId), ""); PipelineDistributedBarrier instance = PipelineDistributedBarrier.getInstance(contextKey); String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java index 4c742d1608c..32c4db66fd4 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java @@ -48,11 +48,9 @@ import static org.hamcrest.Matchers.is; class ConsistencyCheckJobExecutorCallbackTest { - private static final String TEST_DATABASE_NAME = ConsistencyCheckJobExecutorCallbackTest.class.getSimpleName(); - @BeforeAll static void beforeClass() { - PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME); + PipelineContextUtils.initPipelineContextManager(); } @Test @@ -60,7 +58,7 @@ class ConsistencyCheckJobExecutorCallbackTest { ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new PipelineContextKey(InstanceType.PROXY), JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId()); String checkJobId = PipelineJobIdUtils.marshal(pipelineJobId); List<YamlTableCheckRangePosition> expectedYamlTableCheckRangePositions = 
Collections.singletonList(createYamlTableCheckRangePosition()); - PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getJobItemFacade().getProcess().persist(checkJobId, 0, + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId, 0, YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectedYamlTableCheckRangePositions))); ConsistencyCheckJobExecutorCallback callback = new ConsistencyCheckJobExecutorCallback(); ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(createYamlConsistencyCheckJobConfiguration(checkJobId)); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java index f0faac443af..bb8ba7c779c 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPITest.java @@ -55,8 +55,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; class ConsistencyCheckJobAPITest { - private static final String TEST_DATABASE_NAME = ConsistencyCheckJobAPITest.class.getSimpleName(); - private final ConsistencyCheckJobType jobType = new ConsistencyCheckJobType(); private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI(jobType); @@ -67,7 +65,7 @@ class ConsistencyCheckJobAPITest { @BeforeAll static void beforeClass() { - PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME); + PipelineContextUtils.initPipelineContextManager(); } @Test @@ -83,7 +81,7 @@ class ConsistencyCheckJobAPITest { 
assertNull(checkJobConfig.getAlgorithmTypeName()); int sequence = ConsistencyCheckJobId.parseSequence(expectCheckJobId); assertThat(sequence, is(expectedSequence)); - PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)); + PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()); Collection<String> actualCheckJobIds = governanceFacade.getJobFacade().getCheck().listCheckJobIds(parentJobId); assertThat(actualCheckJobIds.size(), is(1)); assertThat(actualCheckJobIds.iterator().next(), is(expectCheckJobId)); @@ -95,7 +93,7 @@ class ConsistencyCheckJobAPITest { void assertDropByParentJobId() { MigrationJobConfiguration parentJobConfig = jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration()); String parentJobId = parentJobConfig.getJobId(); - PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)); + PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()); int expectedSequence = 1; for (int i = 0; i < 3; i++) { String checkJobId = jobAPI.start(new CreateConsistencyCheckJobParameter(parentJobId, null, null, @@ -237,7 +235,7 @@ class ConsistencyCheckJobAPITest { private void persistCheckJobResult(final String parentJobId, final String checkJobId) { Map<String, TableDataConsistencyCheckResult> dataConsistencyCheckResult = Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true)); - PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)); + PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()); 
governanceFacade.getJobFacade().getCheck().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult); } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java index 83fb82cbe9e..91b60d1e063 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPITest.java @@ -108,11 +108,9 @@ class MigrationJobAPITest { private static DatabaseType databaseType; - private static final String TEST_DATABASE_NAME = MigrationJobAPITest.class.getSimpleName(); - @BeforeAll static void beforeClass() { - PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME); + PipelineContextUtils.initPipelineContextManager(); jobType = new MigrationJobType(); jobAPI = (MigrationJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION"); jobConfigManager = new PipelineJobConfigurationManager(jobType); @@ -125,13 +123,12 @@ class MigrationJobAPITest { props.put("jdbcUrl", jdbcUrl); props.put("username", "root"); props.put("password", "root"); - jobAPI.registerMigrationSourceStorageUnits(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME), Collections.singletonMap("ds_0", - new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props))); + jobAPI.registerMigrationSourceStorageUnits(PipelineContextUtils.getContextKey(), Collections.singletonMap("ds_0", new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props))); } @AfterAll static void afterClass() { - jobAPI.dropMigrationSourceResources(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME), Collections.singletonList("ds_0")); + jobAPI.dropMigrationSourceResources(PipelineContextUtils.getContextKey(), 
Collections.singletonList("ds_0")); } @Test @@ -254,7 +251,7 @@ class MigrationJobAPITest { @Test void assertAddMigrationSourceResources() { PipelineDataSourcePersistService persistService = new PipelineDataSourcePersistService(); - Map<String, DataSourcePoolProperties> actual = persistService.load(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME), "MIGRATION"); + Map<String, DataSourcePoolProperties> actual = persistService.load(PipelineContextUtils.getContextKey(), "MIGRATION"); assertTrue(actual.containsKey("ds_0")); } @@ -262,20 +259,20 @@ class MigrationJobAPITest { void assertCreateJobConfigFailedOnMoreThanOneSourceTable() { Collection<MigrationSourceTargetEntry> sourceTargetEntries = Stream.of("t_order_0", "t_order_1") .map(each -> new MigrationSourceTargetEntry(new DataNode("ds_0", each), "t_order")).collect(Collectors.toList()); - assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.schedule(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME), sourceTargetEntries, "logic_db")); + assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.schedule(PipelineContextUtils.getContextKey(), sourceTargetEntries, "logic_db")); } @Test void assertCreateJobConfigFailedOnDataSourceNotExist() { Collection<MigrationSourceTargetEntry> sourceTargetEntries = Collections.singleton(new MigrationSourceTargetEntry(new DataNode("ds_not_exists", "t_order"), "t_order")); - assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.schedule(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME), sourceTargetEntries, "logic_db")); + assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.schedule(PipelineContextUtils.getContextKey(), sourceTargetEntries, "logic_db")); } @Test void assertCreateJobConfig() throws SQLException { initIntPrimaryEnvironment(); MigrationSourceTargetEntry sourceTargetEntry = new MigrationSourceTargetEntry(new DataNode("ds_0", "t_order"), "t_order"); - String jobId = 
jobAPI.schedule(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME), Collections.singleton(sourceTargetEntry), "logic_db"); + String jobId = jobAPI.schedule(PipelineContextUtils.getContextKey(), Collections.singleton(sourceTargetEntry), "logic_db"); MigrationJobConfiguration actual = jobConfigManager.getJobConfiguration(jobId); assertThat(actual.getTargetDatabaseName(), is("logic_db")); List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes(); @@ -290,7 +287,7 @@ class MigrationJobAPITest { } private void initIntPrimaryEnvironment() throws SQLException { - Map<String, DataSourcePoolProperties> metaDataDataSource = new PipelineDataSourcePersistService().load(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME), "MIGRATION"); + Map<String, DataSourcePoolProperties> metaDataDataSource = new PipelineDataSourcePersistService().load(PipelineContextUtils.getContextKey(), "MIGRATION"); DataSourcePoolProperties props = metaDataDataSource.get("ds_0"); try ( PipelineDataSource dataSource = new PipelineDataSource(DataSourcePoolCreator.create(props), databaseType); @@ -303,7 +300,7 @@ class MigrationJobAPITest { @Test void assertShowMigrationSourceResources() { - Collection<Collection<Object>> actual = jobAPI.listMigrationSourceResources(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)); + Collection<Collection<Object>> actual = jobAPI.listMigrationSourceResources(PipelineContextUtils.getContextKey()); assertThat(actual.size(), is(1)); Collection<Object> objects = actual.iterator().next(); assertThat(objects.toArray()[0], is("ds_0")); @@ -316,8 +313,7 @@ class MigrationJobAPITest { YamlTransmissionJobItemProgress yamlJobItemProgress = new YamlTransmissionJobItemProgress(); yamlJobItemProgress.setStatus(JobStatus.RUNNING.name()); yamlJobItemProgress.setSourceDatabaseType("MySQL"); - PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 0, - 
YamlEngine.marshal(yamlJobItemProgress)); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 0, YamlEngine.marshal(yamlJobItemProgress)); Collection<TransmissionJobItemInfo> jobItemInfos = transmissionJobManager.getJobItemInfos(jobConfig.getJobId()); assertThat(jobItemInfos.size(), is(1)); TransmissionJobItemInfo jobItemInfo = jobItemInfos.iterator().next(); @@ -334,8 +330,7 @@ class MigrationJobAPITest { yamlJobItemProgress.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK.name()); yamlJobItemProgress.setProcessedRecordsCount(100L); yamlJobItemProgress.setInventoryRecordsCount(50L); - PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 0, - YamlEngine.marshal(yamlJobItemProgress)); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 0, YamlEngine.marshal(yamlJobItemProgress)); Collection<TransmissionJobItemInfo> jobItemInfos = transmissionJobManager.getJobItemInfos(jobConfig.getJobId()); TransmissionJobItemInfo jobItemInfo = jobItemInfos.stream().iterator().next(); assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.EXECUTE_INCREMENTAL_TASK)); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java index 3720397f7e0..04f4574ad55 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java +++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java @@ -50,11 +50,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class MigrationDataConsistencyCheckerTest { - private static final String TEST_DATABASE_NAME = MigrationDataConsistencyCheckerTest.class.getSimpleName(); - @BeforeAll static void beforeClass() { - PipelineContextUtils.initPipelineContextManager(TEST_DATABASE_NAME); + PipelineContextUtils.initPipelineContextManager(); } @Test @@ -81,7 +79,7 @@ class MigrationDataConsistencyCheckerTest { jobConfigurationPOJO.setJobParameter(YamlEngine.marshal(new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(jobConfig))); jobConfigurationPOJO.setJobName(jobConfig.getJobId()); jobConfigurationPOJO.setShardingTotalCount(1); - PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)); + PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()); getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO)); governanceFacade.getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 0, ""); return new MigrationDataConsistencyChecker(jobConfig, new TransmissionProcessContext(jobConfig.getJobId(), null), @@ -89,7 +87,7 @@ class MigrationDataConsistencyCheckerTest { } private ClusterPersistRepository getClusterPersistRepository() { - return (ClusterPersistRepository) PipelineContextManager.getContext(PipelineContextUtils.getContextKey(TEST_DATABASE_NAME)).getPersistServiceFacade().getRepository(); + return (ClusterPersistRepository) PipelineContextManager.getContext(PipelineContextUtils.getContextKey()).getPersistServiceFacade().getRepository(); } private ConsistencyCheckJobItemProgressContext 
createConsistencyCheckJobItemProgressContext(final String jobId) { diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java index ded82dca6a9..e58fb2b4122 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java @@ -92,16 +92,16 @@ import static org.mockito.Mockito.mock; */ public final class PipelineContextUtils { + private static final PipelineContextKey CONTEXT_KEY = new PipelineContextKey(InstanceType.PROXY); + private static final PipelineExecuteEngine EXECUTE_ENGINE = PipelineExecuteEngine.newCachedThreadInstance(PipelineContextUtils.class.getSimpleName()); /** * Init pipeline context manager. - * - * @param databaseName database name */ - public static void initPipelineContextManager(final String databaseName) { + public static void initPipelineContextManager() { EmbedTestingServer.start(); - PipelineContextKey contextKey = getContextKey(databaseName); + PipelineContextKey contextKey = getContextKey(); if (null != PipelineContextManager.getContext(contextKey)) { return; } @@ -171,11 +171,10 @@ public final class PipelineContextUtils { /** * Get context key. * - * @param databaseName database name * @return context key */ - public static PipelineContextKey getContextKey(final String databaseName) { - return new PipelineContextKey(databaseName, InstanceType.PROXY); + public static PipelineContextKey getContextKey() { + return CONTEXT_KEY; } /**