This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 76ba8a14e06 Merge DefaultPipelineDataSourceManager and PipelineDataSourceManager (#29348)
76ba8a14e06 is described below
commit 76ba8a14e06d38152c5c802f16ff347505fdc0b8
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 10 14:58:05 2023 +0800
Merge DefaultPipelineDataSourceManager and PipelineDataSourceManager (#29348)
---
.../DefaultPipelineDataSourceManager.java | 70 ----------------------
.../core/datasource/PipelineDataSourceManager.java | 44 +++++++++++++-
.../core/job/engine/PipelineJobRunnerManager.java | 3 +-
.../mysql/ingest/MySQLIncrementalDumperTest.java | 11 ++--
.../postgresql/ingest/PostgreSQLWALDumperTest.java | 9 ++-
.../ingest/wal/WALEventConverterTest.java | 21 ++++---
.../data/pipeline/cdc/api/CDCJobAPI.java | 13 ++--
.../MigrationDataConsistencyChecker.java | 21 ++++---
...est.java => PipelineDataSourceManagerTest.java} | 9 ++-
.../data/pipeline/core/task/InventoryTaskTest.java | 13 ++--
.../pipeline/core/util/PipelineContextUtils.java | 24 ++++----
.../MigrationDataConsistencyCheckerTest.java | 9 ++-
12 files changed, 107 insertions(+), 140 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
deleted file mode 100644
index ddc807d7c9f..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.datasource;
-
-import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-
-import java.sql.SQLException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Default pipeline data source manager.
- */
-@Slf4j
-public final class DefaultPipelineDataSourceManager implements
PipelineDataSourceManager {
-
- private final Map<PipelineDataSourceConfiguration,
PipelineDataSourceWrapper> cachedDataSources = new ConcurrentHashMap<>();
-
- @Override
- public PipelineDataSourceWrapper getDataSource(final
PipelineDataSourceConfiguration dataSourceConfig) {
- PipelineDataSourceWrapper result =
cachedDataSources.get(dataSourceConfig);
- if (null != result) {
- return result;
- }
- synchronized (cachedDataSources) {
- result = cachedDataSources.get(dataSourceConfig);
- if (null != result) {
- if (!result.isClosed()) {
- return result;
- } else {
- log.warn("{} is already closed, create again", result);
- }
- }
- result = PipelineDataSourceFactory.newInstance(dataSourceConfig);
- cachedDataSources.put(dataSourceConfig, result);
- return result;
- }
- }
-
- @Override
- public void close() {
- for (PipelineDataSourceWrapper each : cachedDataSources.values()) {
- if (each.isClosed()) {
- continue;
- }
- try {
- each.close();
- } catch (final SQLException ex) {
- log.error("An exception occurred while closing the data
source", ex);
- }
- }
- cachedDataSources.clear();
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
index f7f1359db7b..db31eefe755 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
@@ -17,12 +17,20 @@
package org.apache.shardingsphere.data.pipeline.core.datasource;
+import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
* Pipeline data source manager.
*/
-public interface PipelineDataSourceManager extends AutoCloseable {
+@Slf4j
+public final class PipelineDataSourceManager implements AutoCloseable {
+
+ private final Map<PipelineDataSourceConfiguration,
PipelineDataSourceWrapper> cachedDataSources = new ConcurrentHashMap<>();
/**
* Get cached data source.
@@ -30,8 +38,38 @@ public interface PipelineDataSourceManager extends
AutoCloseable {
* @param dataSourceConfig data source configuration
* @return data source
*/
- PipelineDataSourceWrapper getDataSource(PipelineDataSourceConfiguration
dataSourceConfig);
+ public PipelineDataSourceWrapper getDataSource(final
PipelineDataSourceConfiguration dataSourceConfig) {
+ PipelineDataSourceWrapper result =
cachedDataSources.get(dataSourceConfig);
+ if (null != result) {
+ return result;
+ }
+ synchronized (cachedDataSources) {
+ result = cachedDataSources.get(dataSourceConfig);
+ if (null != result) {
+ if (!result.isClosed()) {
+ return result;
+ } else {
+ log.warn("{} is already closed, create again", result);
+ }
+ }
+ result = PipelineDataSourceFactory.newInstance(dataSourceConfig);
+ cachedDataSources.put(dataSourceConfig, result);
+ return result;
+ }
+ }
@Override
- void close();
+ public void close() {
+ for (PipelineDataSourceWrapper each : cachedDataSources.values()) {
+ if (each.isClosed()) {
+ continue;
+ }
+ try {
+ each.close();
+ } catch (final SQLException ex) {
+ log.error("An exception occurred while closing the data
source", ex);
+ }
+ }
+ cachedDataSources.clear();
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
index 6f51ba89525..f39c70ffb46 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
@@ -20,7 +20,6 @@ package
org.apache.shardingsphere.data.pipeline.core.job.engine;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner.PipelineJobRunnerCleaner;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
@@ -58,7 +57,7 @@ public final class PipelineJobRunnerManager {
private final Map<Integer, PipelineTasksRunner> tasksRunners = new
ConcurrentHashMap<>();
@Getter
- private final PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
+ private final PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
private final PipelineJobRunnerCleaner cleaner;
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 6b77ba07ed5..cdfb39e1b56 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -18,22 +18,21 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
@@ -105,7 +104,7 @@ class MySQLIncrementalDumperTest {
private void initTableData(final IncrementalDumperContext dumperContext)
throws SQLException {
try (
- PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
+ PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index 5d52b2815c6..eff5fbc0d23 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -18,17 +18,16 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
@@ -79,7 +78,7 @@ class PostgreSQLWALDumperTest {
private SimpleMemoryPipelineChannel channel;
- private final PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
+ private final PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
@BeforeEach
void setUp() {
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 66824896827..b399d2112d2 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -18,14 +18,9 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
@@ -33,6 +28,10 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
@@ -42,6 +41,7 @@ import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.Place
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
import
org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.internal.configuration.plugins.Plugins;
@@ -75,10 +75,12 @@ class WALEventConverterTest {
private PipelineTableMetaData pipelineTableMetaData;
+ private PipelineDataSourceManager dataSourceManager;
+
@BeforeEach
void setUp() throws SQLException {
IncrementalDumperContext dumperContext = mockDumperContext();
- PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
+ dataSourceManager = new PipelineDataSourceManager();
walEventConverter = new WALEventConverter(dumperContext, new
StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())));
initTableData(dumperContext);
pipelineTableMetaData = new PipelineTableMetaData("t_order",
mockOrderColumnsMetaDataMap(), Collections.emptyList());
@@ -94,7 +96,7 @@ class WALEventConverterTest {
private void initTableData(final IncrementalDumperContext dumperContext)
throws SQLException {
try (
- PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
+ PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
@@ -116,6 +118,11 @@ class WALEventConverterTest {
return result;
}
+ @AfterEach
+ void clean() {
+ dataSourceManager.close();
+ }
+
@Test
void assertWriteRowEvent() throws ReflectiveOperationException {
DataRecord actual = getDataRecord(createWriteRowEvent());
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 0398be69743..7567053f893 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -33,14 +33,9 @@ import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManag
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceConfigurationFactory;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
-import
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
@@ -49,13 +44,17 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Dumper
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
+import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
+import
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
@@ -170,7 +169,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
- try (PipelineDataSourceManager pipelineDataSourceManager = new
DefaultPipelineDataSourceManager()) {
+ try (PipelineDataSourceManager pipelineDataSourceManager = new
PipelineDataSourceManager()) {
for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
if (jobItemManager.getProgress(jobId, i).isPresent()) {
continue;
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index 82d06118847..5914737803b 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -18,15 +18,23 @@
package
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datanode.DataNodeUtils;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
@@ -34,15 +42,6 @@ import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipe
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
-import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.infra.datanode.DataNode;
@@ -89,7 +88,7 @@ public final class MigrationDataConsistencyChecker implements
PipelineDataConsis
progressContext.onProgressUpdated(new
PipelineJobProgressUpdatedParameter(0));
Map<CaseInsensitiveQualifiedTable, TableDataConsistencyCheckResult>
result = new LinkedHashMap<>();
try (
- PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
+ PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
TableDataConsistencyChecker tableChecker =
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps)) {
for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
checkTableInventoryData(each, tableChecker, result,
dataSourceManager);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/DefaultPipelineDataSourceManagerTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/PipelineDataSourceManagerTest.java
similarity index 88%
rename from
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/DefaultPipelineDataSourceManagerTest.java
rename to
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/PipelineDataSourceManagerTest.java
index 7b256a4fcd9..ac4fa1cc98a 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/DefaultPipelineDataSourceManagerTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/common/PipelineDataSourceManagerTest.java
@@ -19,7 +19,6 @@ package
org.apache.shardingsphere.test.it.data.pipeline.common;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceConfigurationFactory;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -38,7 +37,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
-class DefaultPipelineDataSourceManagerTest {
+class PipelineDataSourceManagerTest {
private MigrationJobConfiguration jobConfig;
@@ -54,7 +53,7 @@ class DefaultPipelineDataSourceManagerTest {
@Test
void assertGetDataSource() {
- try (PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager()) {
+ try (PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager()) {
PipelineDataSourceConfiguration source =
jobConfig.getSources().values().iterator().next();
DataSource actual =
dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(source.getType(),
source.getParameter()));
assertThat(actual, instanceOf(PipelineDataSourceWrapper.class));
@@ -64,10 +63,10 @@ class DefaultPipelineDataSourceManagerTest {
@Test
void assertClose() throws ReflectiveOperationException {
PipelineDataSourceConfiguration source =
jobConfig.getSources().values().iterator().next();
- try (PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager()) {
+ try (PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager()) {
dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(source.getType(),
source.getParameter()));
dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(),
jobConfig.getTarget().getParameter()));
- Map<?, ?> cachedDataSources = (Map<?, ?>)
Plugins.getMemberAccessor().get(DefaultPipelineDataSourceManager.class.getDeclaredField("cachedDataSources"),
dataSourceManager);
+ Map<?, ?> cachedDataSources = (Map<?, ?>)
Plugins.getMemberAccessor().get(PipelineDataSourceManager.class.getDeclaredField("cachedDataSources"),
dataSourceManager);
assertThat(cachedDataSources.size(), is(2));
dataSourceManager.close();
assertTrue(cachedDataSources.isEmpty());
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
index e47946375dc..c15047698d4 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
@@ -17,15 +17,14 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.task;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.type.IntegerPrimaryKeyPosition;
-import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
@@ -52,7 +51,7 @@ import static org.mockito.Mockito.mock;
class InventoryTaskTest {
- private static final PipelineDataSourceManager DATA_SOURCE_MANAGER = new
DefaultPipelineDataSourceManager();
+ private static final PipelineDataSourceManager DATA_SOURCE_MANAGER = new
PipelineDataSourceManager();
private MigrationTaskConfiguration taskConfig;
@@ -85,7 +84,7 @@ class InventoryTaskTest {
}
private void initTableData(final IncrementalDumperContext dumperContext)
throws SQLException {
- PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
+ PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
try (
PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
Connection connection = dataSource.getConnection();
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index d9f7b8018c0..939ff8881a7 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -20,31 +20,31 @@ package
org.apache.shardingsphere.test.it.data.pipeline.core.util;
import lombok.SneakyThrows;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineReadConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.swapper.YamlPipelineProcessConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineReadConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.swapper.YamlPipelineProcessConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.MigrationIncrementalDumperContextCreator;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.MigrationIncrementalDumperContextCreator;
import
org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
import
org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -184,7 +184,7 @@ public final class PipelineContextUtils {
TransmissionProcessContext processContext = new
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
int jobShardingItem = 0;
MigrationTaskConfiguration taskConfig =
buildTaskConfiguration(jobConfig, jobShardingItem, processConfig);
- return new MigrationJobItemContext(jobConfig, jobShardingItem, null,
processContext, taskConfig, new DefaultPipelineDataSourceManager());
+ return new MigrationJobItemContext(jobConfig, jobShardingItem, null,
processContext, taskConfig, new PipelineDataSourceManager());
}
private static PipelineProcessConfiguration
mockPipelineProcessConfiguration() {
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index fd9f1e22031..02d6722a8a1 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -18,15 +18,14 @@
package
org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.check.consistency;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
-import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
@@ -89,7 +88,7 @@ class MigrationDataConsistencyCheckerTest {
private void initTableData(final PipelineDataSourceConfiguration
dataSourceConfig) throws SQLException {
try (
- PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
+ PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dataSourceConfig);
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {