This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 414964caa8a Revise ImporterCreator and ImporterCreatorFactoryTest
(#20144)
414964caa8a is described below
commit 414964caa8a43e53a5dc3adb7158d550750ca534
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Aug 13 21:39:58 2022 +0800
Revise ImporterCreator and ImporterCreatorFactoryTest (#20144)
* Revise ImporterCreator and ImporterCreatorFactoryTest
* Fix rat
* Refactor PipelineDataSourceManager as interface, add default impl
* Remove ImporterCreator and factory to api module spi package
---
.../datasource/PipelineDataSourceManager.java} | 21 +++++-
.../pipeline/{spi => api}/importer/Importer.java | 2 +-
.../pipeline/spi}/importer/ImporterCreator.java | 16 ++--
.../spi}/importer/ImporterCreatorFactory.java | 11 +--
....java => DefaultPipelineDataSourceManager.java} | 5 +-
.../pipeline/core/importer/DefaultImporter.java | 4 +-
.../core}/importer/DefaultImporterCreator.java | 21 +++---
.../datasource/AbstractDataSourcePreparer.java | 15 ++--
.../datasource/PrepareTargetSchemasParameter.java | 5 +-
.../datasource/PrepareTargetTablesParameter.java | 2 +-
.../data/pipeline/core/task/IncrementalTask.java | 8 +-
.../data/pipeline/core/task/InventoryTask.java | 6 +-
.../core/util/PipelineJobPreparerUtils.java | 2 +-
.../scenario/rulealtered/RuleAlteredJob.java | 7 +-
.../rulealtered/RuleAlteredJobContext.java | 2 +-
.../rulealtered/RuleAlteredJobPreparer.java | 2 +-
.../rulealtered/prepare/InventoryTaskSplitter.java | 2 +-
...ere.data.pipeline.spi.importer.ImporterCreator} | 2 +-
...phere.scaling.core.job.importer.ImporterCreator | 18 -----
.../datasource/MySQLDataSourcePreparer.java | 2 +-
.../mysql/ingest/MySQLIncrementalDumperTest.java | 7 +-
.../mysql/ingest/MySQLInventoryDumperTest.java | 5 +-
.../datasource/MySQLDataSourcePreparerTest.java | 2 +-
.../ingest/PostgreSQLJdbcDumperTest.java | 5 +-
.../postgresql/ingest/PostgreSQLWalDumperTest.java | 5 +-
.../ingest/wal/WalEventConverterTest.java | 7 +-
.../shardingsphere-pipeline-test/pom.xml | 12 ++-
.../api/impl/GovernanceRepositoryAPIImplTest.java | 9 ++-
.../core/api/impl/RuleAlteredJobAPIImplTest.java | 12 +--
.../consistency/DataConsistencyCheckerTest.java | 6 +-
...a => DefaultPipelineDataSourceManagerTest.java} | 13 +++-
.../pipeline/core/fixture/FixtureImporter.java | 4 +-
.../core/fixture/FixtureImporterCreator.java | 6 +-
.../FixturePipelineDataSourceConfiguration.java | 36 ++++-----
.../core/importer/DefaultImporterTest.java | 2 +-
.../core/importer/ImporterCreatorFactoryTest.java | 71 ++++++++++++++++++
.../pipeline/core/task/IncrementalTaskTest.java | 6 +-
.../data/pipeline/core/task/InventoryTaskTest.java | 12 +--
.../rulealtered/RuleAlteredJobWorkerTest.java | 6 +-
.../prepare/InventoryTaskSplitterTest.java | 7 +-
.../job/importer/ImporterCreatorFactoryTest.java | 87 ----------------------
...ere.data.pipeline.spi.importer.ImporterCreator} | 0
42 files changed, 245 insertions(+), 228 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/PipelineDataSourceManager.java
similarity index 59%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/PipelineDataSourceManager.java
index 2ecb413685d..41f52958bda 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/PipelineDataSourceManager.java
@@ -15,12 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.spi.importer;
+package org.apache.shardingsphere.data.pipeline.api.datasource;
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
/**
- * Importer.
+ * Pipeline data source manager.
*/
-public interface Importer extends LifecycleExecutor {
+public interface PipelineDataSourceManager {
+
+ /**
+ * Get cached data source.
+ *
+ * @param dataSourceConfig data source configuration
+ * @return data source
+ */
+ PipelineDataSourceWrapper getDataSource(PipelineDataSourceConfiguration
dataSourceConfig);
+
+ /**
+ * Close, close cached data source.
+ */
+ void close();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/importer/Importer.java
similarity index 93%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/importer/Importer.java
index 2ecb413685d..29a1a104998 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/importer/Importer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.spi.importer;
+package org.apache.shardingsphere.data.pipeline.api.importer;
import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
similarity index 79%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreator.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
index 20a96972b3d..8289e767199 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreator.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
@@ -15,25 +15,29 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.job.importer;
+package org.apache.shardingsphere.data.pipeline.spi.importer;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+/**
+ * Importer creator.
+ */
@SingletonSPI
public interface ImporterCreator extends TypedSPI {
/**
* Create importer.
- * @param importerConfig importerConfig
- * @param dataSourceManager dataSourceManager
+ *
+ * @param importerConfig importer configuration
+ * @param dataSourceManager data source manager
* @param channel channel
- * @param jobProgressListener jobProgressListener
+ * @param jobProgressListener job progress listener
* @return importer
*/
Importer createImporter(ImporterConfiguration importerConfig,
PipelineDataSourceManager dataSourceManager, PipelineChannel channel,
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreatorFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreatorFactory.java
similarity index 87%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreatorFactory.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreatorFactory.java
index b10924edc3e..35e85f0d5f7 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreatorFactory.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreatorFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.job.importer;
+package org.apache.shardingsphere.data.pipeline.spi.importer;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -23,7 +23,7 @@ import
org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
/**
- * Importer factory.
+ * Importer creator factory.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ImporterCreatorFactory {
@@ -33,9 +33,10 @@ public final class ImporterCreatorFactory {
}
/**
- * Get ImporterCreator.
- * @param databaseType databaseType
- * @return ImporterCreator
+ * Get importer creator instance.
+ *
+ * @param databaseType database type
+ * @return importer creator
*/
public static ImporterCreator getInstance(final String databaseType) {
return TypedSPIRegistry.getRegisteredService(ImporterCreator.class,
databaseType);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
similarity index 91%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
index 8659174fe4b..66ad7498d3c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.datasource;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -26,10 +27,10 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
- * Pipeline data source manager.
+ * Default pipeline data source manager.
*/
@Slf4j
-public final class PipelineDataSourceManager implements AutoCloseable {
+public final class DefaultPipelineDataSourceManager implements
PipelineDataSourceManager {
private final Map<PipelineDataSourceConfiguration,
PipelineDataSourceWrapper> cachedDataSources = new ConcurrentHashMap<>();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index 28b87b12eb2..2e069cc5e4b 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -21,7 +21,9 @@ import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
@@ -30,13 +32,11 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataReco
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import javax.sql.DataSource;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/DefaultImporterCreator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterCreator.java
similarity index 80%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/DefaultImporterCreator.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterCreator.java
index 2e407671b8c..c8d5b4ce50f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/DefaultImporterCreator.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterCreator.java
@@ -15,22 +15,26 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.job.importer;
+package org.apache.shardingsphere.data.pipeline.core.importer;
-import java.util.Collection;
-import java.util.LinkedList;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.importer.DefaultImporter;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
/**
* Default importer creator.
*/
public final class DefaultImporterCreator implements ImporterCreator {
+ private static final Collection<String> TYPE_ALIASES =
Collections.unmodifiableList(Arrays.asList("PostgreSQL", "openGauss"));
+
@Override
public Importer createImporter(final ImporterConfiguration importerConfig,
final PipelineDataSourceManager
dataSourceManager, final PipelineChannel channel,
@@ -45,9 +49,6 @@ public final class DefaultImporterCreator implements
ImporterCreator {
@Override
public Collection<String> getTypeAliases() {
- Collection<String> aliases = new LinkedList<>();
- aliases.add("PostgreSQL");
- aliases.add("openGauss");
- return aliases;
+ return TYPE_ALIASES;
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index a5bde33ac74..ed7a73235ea 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -17,23 +17,24 @@
package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
+
/**
* Abstract data source preparer.
*/
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
index 5c4a4b382bc..df94812d018 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
@@ -17,14 +17,15 @@
package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
-import java.util.List;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import java.util.List;
+
/**
* Prepare target schemas parameter.
*/
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
index cf399464508..9aaf034ba3a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
@@ -21,8 +21,8 @@ import lombok.Getter;
import lombok.NonNull;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
/**
* Prepare target tables parameter.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index f21ca5e85d2..df8d0859487 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -22,23 +22,23 @@ import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import
org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
-import
org.apache.shardingsphere.scaling.core.job.importer.ImporterCreatorFactory;
import java.util.Collection;
import java.util.LinkedList;
@@ -49,7 +49,7 @@ import java.util.concurrent.Future;
* Incremental task.
*/
@Slf4j
-@ToString(exclude = {"incrementalDumperExecuteEngine", "channel", "dumper",
"importers", "progress"})
+@ToString(exclude = {"incrementalDumperExecuteEngine", "channel", "dumper",
"importers", "taskProgress"})
public final class IncrementalTask extends AbstractLifecycleExecutor
implements PipelineTask, AutoCloseable {
@Getter
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 401841be999..ffc9f328832 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -22,23 +22,23 @@ import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import
org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
-import
org.apache.shardingsphere.scaling.core.job.importer.ImporterCreatorFactory;
import javax.sql.DataSource;
import java.util.List;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java
index de6dcb4b892..71f590b1b32 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java
@@ -20,13 +20,13 @@ package org.apache.shardingsphere.data.pipeline.core.util;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparerFactory;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index 58768c0ac95..358e0af29b7 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -19,14 +19,15 @@ package
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
@@ -42,7 +43,7 @@ import
org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@RequiredArgsConstructor
public final class RuleAlteredJob extends AbstractPipelineJob implements
SimpleJob, PipelineJob {
- private final PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
+ private final PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
// Shared by all sharding items
private final RuleAlteredJobPreparer jobPreparer = new
RuleAlteredJobPreparer();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index e2b2e4b8772..142497c64d8 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -25,11 +25,11 @@ import
org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 304bc4fcc32..ebf4b7d2b68 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
@@ -28,7 +29,6 @@ import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrement
import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 23c1a6e3d6a..5740d80df71 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -25,6 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfigu
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
@@ -34,7 +35,6 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.position.StringPrimary
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
similarity index 91%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
index f7c30c08c48..377062ac343 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.scaling.core.job.importer.DefaultImporterCreator
+org.apache.shardingsphere.data.pipeline.core.importer.DefaultImporterCreator
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
deleted file mode 100644
index f7c30c08c48..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.scaling.core.job.importer.DefaultImporterCreator
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 5c282fbfe0d..84a3879e5cf 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -19,8 +19,8 @@ package
org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 8ecb162cc34..09d0e2bc24d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -20,13 +20,14 @@ package
org.apache.shardingsphere.data.pipeline.mysql.ingest;
import lombok.SneakyThrows;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
@@ -70,7 +71,7 @@ public final class MySQLIncrementalDumperTest {
private MultiplexMemoryPipelineChannel channel;
- private final PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
+ private final PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
@Mock
private PipelineTableMetaData pipelineTableMetaData;
@@ -97,7 +98,7 @@ public final class MySQLIncrementalDumperTest {
@SneakyThrows(SQLException.class)
private void initTableData(final DumperConfiguration dumperConfig) {
- DataSource dataSource = new
PipelineDataSourceManager().getDataSource(dumperConfig.getDataSourceConfig());
+ DataSource dataSource = new
DefaultPipelineDataSourceManager().getDataSource(dumperConfig.getDataSourceConfig());
try (
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
index a8bffd315fb..61892fd483a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
@@ -20,9 +20,10 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest;
import lombok.SneakyThrows;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.junit.Before;
@@ -50,7 +51,7 @@ public final class MySQLInventoryDumperTest {
@Before
public void setUp() {
- PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
+ PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
InventoryDumperConfiguration dumperConfig =
mockInventoryDumperConfiguration();
PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
mysqlJdbcDumper = new
MySQLInventoryDumper(mockInventoryDumperConfiguration(), new
SimpleMemoryPipelineChannel(100), dataSource, new
PipelineTableMetaDataLoader(dataSource));
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 98c6d100a99..64995eeecfc 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -19,11 +19,11 @@ package
org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import org.junit.Before;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
index ea3f6b9be84..d4388d1473f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
@@ -20,9 +20,10 @@ package
org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import lombok.SneakyThrows;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.junit.Before;
@@ -45,7 +46,7 @@ public final class PostgreSQLJdbcDumperTest {
@Before
public void setUp() {
- PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
+ PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
InventoryDumperConfiguration dumperConfig =
mockInventoryDumperConfiguration();
dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
jdbcDumper = new
PostgreSQLInventoryDumper(mockInventoryDumperConfiguration(), new
SimpleMemoryPipelineChannel(100),
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index 8c0912d1874..fef17423483 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -19,11 +19,12 @@ package
org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
@@ -72,7 +73,7 @@ public final class PostgreSQLWalDumperTest {
private MultiplexMemoryPipelineChannel channel;
- private final PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
+ private final PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
@Before
public void setUp() {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
index d0f7b954a82..16681b8b044 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
@@ -20,13 +20,14 @@ package
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
import lombok.SneakyThrows;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
@@ -53,7 +54,7 @@ public final class WalEventConverterTest {
private WalEventConverter walEventConverter;
- private final PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
+ private final PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
@Before
public void setUp() {
@@ -77,7 +78,7 @@ public final class WalEventConverterTest {
@SneakyThrows(SQLException.class)
private void initTableData(final DumperConfiguration dumperConfig) {
- DataSource dataSource = new
PipelineDataSourceManager().getDataSource(dumperConfig.getDataSourceConfig());
+ DataSource dataSource = new
DefaultPipelineDataSourceManager().getDataSource(dumperConfig.getDataSourceConfig());
try (
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/pom.xml
b/shardingsphere-test/shardingsphere-pipeline-test/pom.xml
index 7f0534a81c9..046ef5758df 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/pom.xml
+++ b/shardingsphere-test/shardingsphere-pipeline-test/pom.xml
@@ -30,7 +30,17 @@
<dependencies>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
- <artifactId>shardingsphere-data-pipeline-core</artifactId>
+ <artifactId>shardingsphere-data-pipeline-mysql</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-data-pipeline-postgresql</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-data-pipeline-opengauss</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 3f8a59ac51f..41335152a85 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -26,7 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -126,7 +126,8 @@ public final class GovernanceRepositoryAPIImplTest {
}
private RuleAlteredJobContext mockJobItemContext() {
- RuleAlteredJobContext result = new
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new
InventoryIncrementalJobItemProgress(), new PipelineDataSourceManager());
+ RuleAlteredJobContext result = new
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(),
+ 0, new InventoryIncrementalJobItemProgress(), new
DefaultPipelineDataSourceManager());
TaskConfiguration taskConfig = result.getTaskConfig();
result.getInventoryTasks().add(mockInventoryTask(taskConfig));
result.getIncrementalTasks().add(mockIncrementalTask(taskConfig));
@@ -144,7 +145,7 @@ public final class GovernanceRepositoryAPIImplTest {
PipelineDataSourceWrapper dataSource =
mock(PipelineDataSourceWrapper.class);
PipelineTableMetaDataLoader metaDataLoader = new
PipelineTableMetaDataLoader(dataSource);
return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(),
- new PipelineDataSourceManager(), dataSource, metaDataLoader,
PipelineContextUtil.getExecuteEngine(), new
FixturePipelineJobProgressListener());
+ new DefaultPipelineDataSourceManager(), dataSource,
metaDataLoader, PipelineContextUtil.getExecuteEngine(), new
FixturePipelineJobProgressListener());
}
private IncrementalTask mockIncrementalTask(final TaskConfiguration
taskConfig) {
@@ -152,6 +153,6 @@ public final class GovernanceRepositoryAPIImplTest {
dumperConfig.setPosition(new PlaceholderPosition());
PipelineTableMetaDataLoader metaDataLoader = new
PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
return new IncrementalTask(3, dumperConfig,
taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(),
- new PipelineDataSourceManager(), metaDataLoader,
PipelineContextUtil.getExecuteEngine(), new
FixturePipelineJobProgressListener());
+ new DefaultPipelineDataSourceManager(), metaDataLoader,
PipelineContextUtil.getExecuteEngine(), new
FixturePipelineJobProgressListener());
}
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index 9a465a59670..8a0ce93370d 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -19,8 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.api.impl;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
-import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
@@ -32,7 +30,9 @@ import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -200,7 +200,7 @@ public final class RuleAlteredJobAPIImplTest {
Optional<String> jobId = ruleAlteredJobAPI.start(jobConfig);
assertTrue(jobId.isPresent());
final GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
- RuleAlteredJobContext jobItemContext = new
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(),
new PipelineDataSourceManager());
+ RuleAlteredJobContext jobItemContext = new
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(),
new DefaultPipelineDataSourceManager());
ruleAlteredJobAPI.persistJobItemProgress(jobItemContext);
repositoryAPI.persistJobCheckResult(jobId.get(), true);
ruleAlteredJobAPI.updateJobItemStatus(jobId.get(), 0,
JobStatus.FINISHED);
@@ -213,7 +213,7 @@ public final class RuleAlteredJobAPIImplTest {
Optional<String> jobId = ruleAlteredJobAPI.start(jobConfig);
assertTrue(jobId.isPresent());
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
- RuleAlteredJobContext jobItemContext = new
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(),
new PipelineDataSourceManager());
+ RuleAlteredJobContext jobItemContext = new
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(),
new DefaultPipelineDataSourceManager());
ruleAlteredJobAPI.persistJobItemProgress(jobItemContext);
repositoryAPI.persistJobCheckResult(jobId.get(), true);
ruleAlteredJobAPI.updateJobItemStatus(jobId.get(),
jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
@@ -257,7 +257,7 @@ public final class RuleAlteredJobAPIImplTest {
@Test
public void assertRenewJobStatus() {
final RuleAlteredJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
- RuleAlteredJobContext jobItemContext = new
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(),
new PipelineDataSourceManager());
+ RuleAlteredJobContext jobItemContext = new
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(),
new DefaultPipelineDataSourceManager());
ruleAlteredJobAPI.persistJobItemProgress(jobItemContext);
ruleAlteredJobAPI.updateJobItemStatus(jobConfig.getJobId(), 0,
JobStatus.FINISHED);
InventoryIncrementalJobItemProgress actual =
ruleAlteredJobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
index fbffa7cb359..743e5ee4f06 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
@@ -21,7 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.fixture.DataConsistencyCalculateAlgorithmFixture;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -55,7 +55,7 @@ public final class DataConsistencyCheckerTest {
private RuleAlteredJobConfiguration createJobConfiguration() throws
SQLException {
RuleAlteredJobContext jobItemContext = new
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0,
- new InventoryIncrementalJobItemProgress(), new
PipelineDataSourceManager());
+ new InventoryIncrementalJobItemProgress(), new
DefaultPipelineDataSourceManager());
initTableData(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
initTableData(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
return jobItemContext.getJobConfig();
@@ -63,7 +63,7 @@ public final class DataConsistencyCheckerTest {
private void initTableData(final PipelineDataSourceConfiguration
dataSourceConfig) throws SQLException {
try (
- Connection connection = new
PipelineDataSourceManager().getDataSource(dataSourceConfig).getConnection();
+ Connection connection = new
DefaultPipelineDataSourceManager().getDataSource(dataSourceConfig).getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY,
user_id VARCHAR(12))");
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManagerTest.java
similarity index 86%
rename from
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
rename to
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManagerTest.java
index 7cf6418e7df..d7b80a38fac 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManagerTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.datasource;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -36,7 +37,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-public final class PipelineDataSourceManagerTest {
+public final class DefaultPipelineDataSourceManagerTest {
private RuleAlteredJobConfiguration jobConfig;
@@ -52,7 +53,7 @@ public final class PipelineDataSourceManagerTest {
@Test
public void assertGetDataSource() {
- PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
+ PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
DataSource actual = dataSourceManager.getDataSource(
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(),
jobConfig.getSource().getParameter()));
assertThat(actual, instanceOf(PipelineDataSourceWrapper.class));
@@ -60,7 +61,9 @@ public final class PipelineDataSourceManagerTest {
@Test
public void assertClose() throws NoSuchFieldException,
IllegalAccessException {
- try (PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager()) {
+ PipelineDataSourceManager dataSourceManager = null;
+ try {
+ dataSourceManager = new DefaultPipelineDataSourceManager();
dataSourceManager.getDataSource(
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(),
jobConfig.getSource().getParameter()));
dataSourceManager.getDataSource(
@@ -70,6 +73,10 @@ public final class PipelineDataSourceManagerTest {
assertThat(cachedDataSources.size(), is(2));
dataSourceManager.close();
assertTrue(cachedDataSources.isEmpty());
+ } finally {
+ if (null != dataSourceManager) {
+ dataSourceManager.close();
+ }
}
}
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
index fdbd30b03dc..ec3d2ef7220 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
@@ -18,10 +18,10 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
public final class FixtureImporter implements Importer {
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
index 36484d23195..9d0ae14fdfb 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
@@ -18,11 +18,11 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
-import org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
/**
* Fixture importer creator.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineDataSourceConfiguration.java
similarity index 58%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
copy to
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineDataSourceConfiguration.java
index 5c4a4b382bc..1d03f7f84ca 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineDataSourceConfiguration.java
@@ -15,32 +15,34 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
+package org.apache.shardingsphere.data.pipeline.core.fixture;
-import java.util.List;
-import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-/**
- * Prepare target schemas parameter.
- */
@RequiredArgsConstructor
-@Getter
-public final class PrepareTargetSchemasParameter {
-
- private final List<String> logicTableNames;
+public final class FixturePipelineDataSourceConfiguration implements
PipelineDataSourceConfiguration {
- private final DatabaseType targetDatabaseType;
+ private final DatabaseType databaseType;
- private final String databaseName;
+ @Override
+ public String getParameter() {
+ return null;
+ }
- private final PipelineDataSourceConfiguration dataSourceConfig;
+ @Override
+ public Object getDataSourceConfiguration() {
+ return null;
+ }
- private final PipelineDataSourceManager dataSourceManager;
+ @Override
+ public DatabaseType getDatabaseType() {
+ return databaseType;
+ }
- private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
+ @Override
+ public String getType() {
+ return "FIXTURE";
+ }
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
index 49ab326e7a9..e1f29a5f5b3 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.importer;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -29,7 +30,6 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
import org.junit.Before;
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
new file mode 100644
index 00000000000..9b09961c71c
--- /dev/null
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.importer;
+
+import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureImporter;
+import
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
+import
org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+public final class ImporterCreatorFactoryTest {
+
+ @Mock
+ private PipelineDataSourceManager dataSourceManager;
+
+ @Mock
+ private PipelineChannel channel;
+
+ @Test
+ public void assertCreateImporter() {
+ for (String each : Arrays.asList("MySQL", "PostgreSQL", "openGauss")) {
+ Importer actual =
ImporterCreatorFactory.getInstance(each).createImporter(createImporterConfiguration(each),
dataSourceManager, channel, new FixturePipelineJobProgressListener());
+ assertThat(actual, instanceOf(DefaultImporter.class));
+ }
+ }
+
+ @Test
+ public void assertCreateImporterForH2() {
+ Importer actual =
ImporterCreatorFactory.getInstance("H2").createImporter(createImporterConfiguration("H2"),
dataSourceManager, channel, new FixturePipelineJobProgressListener());
+ assertThat(actual, instanceOf(FixtureImporter.class));
+ }
+
+ private ImporterConfiguration createImporterConfiguration(final String
databaseType) {
+ Map<LogicTableName, Set<String>> shardingColumnsMap =
Collections.singletonMap(new LogicTableName("t_order"), new
HashSet<>(Arrays.asList("order_id", "user_id", "status")));
+ PipelineDataSourceConfiguration dataSourceConfig = new
FixturePipelineDataSourceConfiguration(DatabaseTypeFactory.getInstance(databaseType));
+ return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap,
new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, 3, 3);
+ }
+}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index 6eac12dddb6..bbbd8b323e7 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -21,7 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfig
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -49,12 +49,12 @@ public final class IncrementalTaskTest {
@Before
public void setUp() {
TaskConfiguration taskConfig = new
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new
InventoryIncrementalJobItemProgress(),
- new PipelineDataSourceManager()).getTaskConfig();
+ new DefaultPipelineDataSourceManager()).getTaskConfig();
taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
PipelineTableMetaDataLoader metaDataLoader = new
PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(),
taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(),
- new PipelineDataSourceManager(), metaDataLoader,
PipelineContextUtil.getExecuteEngine(), new
FixturePipelineJobProgressListener());
+ new DefaultPipelineDataSourceManager(), metaDataLoader,
PipelineContextUtil.getExecuteEngine(), new
FixturePipelineJobProgressListener());
}
@Test
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 0a0b42f39b1..fa507f89363 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -20,10 +20,11 @@ package org.apache.shardingsphere.data.pipeline.core.task;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
@@ -45,7 +46,7 @@ import static org.junit.Assert.assertThat;
public final class InventoryTaskTest {
- private static final PipelineDataSourceManager DATA_SOURCE_MANAGER = new
PipelineDataSourceManager();
+ private static final PipelineDataSourceManager DATA_SOURCE_MANAGER = new
DefaultPipelineDataSourceManager();
private TaskConfiguration taskConfig;
@@ -61,7 +62,7 @@ public final class InventoryTaskTest {
@Before
public void setUp() {
- taskConfig = new
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new
InventoryIncrementalJobItemProgress(), new
PipelineDataSourceManager()).getTaskConfig();
+ taskConfig = new
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new
InventoryIncrementalJobItemProgress(), new
DefaultPipelineDataSourceManager()).getTaskConfig();
}
@Test(expected = IngestException.class)
@@ -87,15 +88,15 @@ public final class InventoryTaskTest {
try (
InventoryTask inventoryTask = new
InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
PipelineContextUtil.getPipelineChannelCreator(),
- new PipelineDataSourceManager(), dataSource,
metaDataLoader, PipelineContextUtil.getExecuteEngine(), new
FixturePipelineJobProgressListener())) {
+ new DefaultPipelineDataSourceManager(), dataSource,
metaDataLoader, PipelineContextUtil.getExecuteEngine(), new
FixturePipelineJobProgressListener())) {
inventoryTask.start();
assertThat(inventoryTask.getTaskProgress().getPosition(),
instanceOf(IntegerPrimaryKeyPosition.class));
}
}
private void initTableData(final DumperConfiguration dumperConfig) throws
SQLException {
+ PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
try (
- PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
PipelineDataSourceWrapper dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
@@ -103,6 +104,7 @@ public final class InventoryTaskTest {
statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY,
user_id VARCHAR(12))");
statement.execute("INSERT INTO t_order (order_id, user_id) VALUES
(1, 'xxx'), (999, 'yyy')");
}
+ dataSourceManager.close();
}
private InventoryDumperConfiguration
createInventoryDumperConfiguration(final String logicTableName, final String
actualTableName) {
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index d94acec2c48..a05b8849877 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import org.apache.commons.io.FileUtils;
-import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
@@ -27,7 +26,8 @@ import
org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.util.ConfigurationFileUtil;
@@ -98,7 +98,7 @@ public final class RuleAlteredJobWorkerTest {
// @Test
public void assertHasUncompletedJob() throws InvocationTargetException,
NoSuchMethodException, IllegalAccessException, IOException {
final RuleAlteredJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
- RuleAlteredJobContext jobItemContext = new
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(),
new PipelineDataSourceManager());
+ RuleAlteredJobContext jobItemContext = new
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(),
new DefaultPipelineDataSourceManager());
jobItemContext.setStatus(JobStatus.PREPARING);
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
RuleAlteredJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext);
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index c7606c53531..449e484a0ac 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -17,13 +17,14 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare;
-import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -68,7 +69,7 @@ public final class InventoryTaskSplitterTest {
private void initJobItemContext() {
RuleAlteredJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
InventoryIncrementalJobItemProgress initProgress =
RuleAlteredJobAPIFactory.getInstance().getJobItemProgress(jobConfig.getJobId(),
0);
- jobItemContext = new RuleAlteredJobContext(jobConfig, 0, initProgress,
new PipelineDataSourceManager());
+ jobItemContext = new RuleAlteredJobContext(jobConfig, 0, initProgress,
new DefaultPipelineDataSourceManager());
dataSourceManager = jobItemContext.getDataSourceManager();
taskConfig = jobItemContext.getTaskConfig();
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/scaling/cor/job/importer/ImporterCreatorFactoryTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/scaling/cor/job/importer/ImporterCreatorFactoryTest.java
deleted file mode 100644
index 2f54c27594d..00000000000
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/scaling/cor/job/importer/ImporterCreatorFactoryTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.cor.job.importer;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureImporter;
-import
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.importer.DefaultImporter;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
-import
org.apache.shardingsphere.scaling.core.job.importer.ImporterCreatorFactory;
-import org.junit.Test;
-import org.mockito.Mock;
-
-public final class ImporterCreatorFactoryTest {
-
- @Mock
- private PipelineDataSourceManager dataSourceManager;
-
- @Mock
- private PipelineChannel channel;
-
- private final PipelineDataSourceConfiguration dataSourceConfig = new
StandardPipelineDataSourceConfiguration(
-
"jdbc:h2:mem:test_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL;USER=root;PASSWORD=root",
"root", "root");
-
- @Test
- public void assertCreateImporterForMysql() {
- Importer importer = ImporterCreatorFactory.getInstance("MySQL")
- .createImporter(mockImporterConfiguration(),
dataSourceManager, channel,
- new FixturePipelineJobProgressListener());
- assertThat(importer, instanceOf(DefaultImporter.class));
- }
-
- @Test
- public void assertCreateImporterForPostgreSQL() {
- Importer importer = ImporterCreatorFactory.getInstance("PostgreSQL")
- .createImporter(mockImporterConfiguration(),
dataSourceManager, channel,
- new FixturePipelineJobProgressListener());
- assertThat(importer, instanceOf(DefaultImporter.class));
- }
-
- @Test
- public void assertCreateImporterForOpenGauss() {
- Importer importer = ImporterCreatorFactory.getInstance("openGauss")
- .createImporter(mockImporterConfiguration(),
dataSourceManager, channel,
- new FixturePipelineJobProgressListener());
- assertThat(importer, instanceOf(DefaultImporter.class));
- }
-
- @Test
- public void assertCreateImporterForH2() {
- Importer importer = ImporterCreatorFactory.getInstance("H2")
- .createImporter(mockImporterConfiguration(),
dataSourceManager, channel,
- new FixturePipelineJobProgressListener());
- assertThat(importer, instanceOf(FixtureImporter.class));
- }
-
- private ImporterConfiguration mockImporterConfiguration() {
- Map<LogicTableName, Set<String>> shardingColumnsMap =
Collections.singletonMap(new LogicTableName("test_table"),
Collections.singleton("user"));
- return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap,
new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, 3, 3);
- }
-}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
similarity index 100%
rename from
shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
rename to
shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator