This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5b22c0db386 Remove AbstractImporter sub-classes and simplify
ImporterFactory (#19949)
5b22c0db386 is described below
commit 5b22c0db386d89ade4f4607f8d73a14e0fc9f036
Author: Da Xiang Huang <[email protected]>
AuthorDate: Wed Aug 10 11:36:40 2022 +0800
Remove AbstractImporter sub-classes and simplify ImporterFactory (#19949)
---
...{AbstractImporter.java => DefaultImporter.java} | 8 +-
.../data/pipeline/core/task/IncrementalTask.java | 5 +-
.../data/pipeline/core/task/InventoryTask.java | 4 +-
.../core/job/importer/DefaultImporterCreator.java} | 32 ++++++--
.../core/job/importer/ImporterCreator.java} | 26 ++++---
.../importer/ImporterCreatorFactory.java} | 43 +++++------
.../scaling/core/job/importer/ImporterFactory.java | 57 --------------
.../scaling/core/spi/ScalingEntry.java | 8 --
...phere.scaling.core.job.importer.ImporterCreator | 18 +++++
.../core/spi/fixture/ScalingEntryFixture.java | 8 +-
...phere.scaling.core.job.importer.ImporterCreator | 18 +++++
.../data/pipeline/mysql/MySQLScalingEntry.java | 8 +-
.../data/pipeline/mysql/MySQLScalingEntryTest.java | 2 -
.../pipeline/opengauss/OpenGaussScalingEntry.java | 8 +-
.../opengauss/OpenGaussScalingEntryTest.java | 2 -
.../postgresql/PostgreSQLScalingEntry.java | 8 +-
.../postgresql/PostgreSQLScalingEntryTest.java | 2 -
.../core/fixture/FixtureImporterCreator.java | 22 ++++--
.../core/fixture/H2ScalingEntryFixture.java | 6 --
...tImporterTest.java => DefaultImporterTest.java} | 7 +-
.../job/importer/ImporterCreatorFactoryTest.java | 87 ++++++++++++++++++++++
...phere.scaling.core.job.importer.ImporterCreator | 18 +++++
22 files changed, 230 insertions(+), 167 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
similarity index 96%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index dbfb221d2eb..28b87b12eb2 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -48,10 +48,10 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
- * Abstract importer.
+ * Default importer.
*/
@Slf4j
-public abstract class AbstractImporter extends AbstractLifecycleExecutor
implements Importer {
+public final class DefaultImporter extends AbstractLifecycleExecutor
implements Importer {
private static final DataRecordMerger MERGER = new DataRecordMerger();
@@ -66,8 +66,8 @@ public abstract class AbstractImporter extends
AbstractLifecycleExecutor impleme
private final PipelineJobProgressListener jobProgressListener;
- protected AbstractImporter(final ImporterConfiguration importerConfig,
final PipelineDataSourceManager dataSourceManager, final PipelineChannel
channel,
- final PipelineJobProgressListener
jobProgressListener) {
+ public DefaultImporter(final ImporterConfiguration importerConfig, final
PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+ final PipelineJobProgressListener
jobProgressListener) {
this.importerConfig = importerConfig;
this.dataSourceManager = dataSourceManager;
this.channel = channel;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index abafe1c9600..ce61d09e89c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -38,7 +38,7 @@ import
org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
-import org.apache.shardingsphere.scaling.core.job.importer.ImporterFactory;
+import
org.apache.shardingsphere.scaling.core.job.importer.ImporterCreatorFactory;
import java.util.Collection;
import java.util.LinkedList;
@@ -97,7 +97,8 @@ public final class IncrementalTask extends
AbstractLifecycleExecutor implements
final
PipelineJobProgressListener jobProgressListener) {
Collection<Importer> result = new LinkedList<>();
for (int i = 0; i < concurrency; i++) {
- result.add(ImporterFactory.createImporter(importerConfig,
dataSourceManager, channel, jobProgressListener));
+
result.add(ImporterCreatorFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType()).createImporter(importerConfig,
dataSourceManager, channel,
+ jobProgressListener));
}
return result;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 85a1f9cdfde..1671ba01bb8 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -38,7 +38,7 @@ import
org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
-import org.apache.shardingsphere.scaling.core.job.importer.ImporterFactory;
+import
org.apache.shardingsphere.scaling.core.job.importer.ImporterCreatorFactory;
import javax.sql.DataSource;
import java.util.List;
@@ -73,7 +73,7 @@ public final class InventoryTask extends
AbstractLifecycleExecutor implements Pi
taskId = generateTaskId(inventoryDumperConfig);
channel = createChannel(pipelineChannelCreator);
dumper = DumperFactory.createInventoryDumper(inventoryDumperConfig,
channel, sourceDataSource, sourceMetaDataLoader);
- importer = ImporterFactory.createImporter(importerConfig,
dataSourceManager, channel, jobProgressListener);
+ importer =
ImporterCreatorFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType()).createImporter(importerConfig,
dataSourceManager, channel, jobProgressListener);
position = inventoryDumperConfig.getPosition();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/DefaultImporterCreator.java
similarity index 52%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/DefaultImporterCreator.java
index 773786ab0e9..0aaed77b0f5 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/DefaultImporterCreator.java
@@ -15,21 +15,39 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.mysql.importer;
+package org.apache.shardingsphere.scaling.core.job.importer;
+import java.util.Collection;
+import java.util.LinkedList;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
+import org.apache.shardingsphere.data.pipeline.core.importer.DefaultImporter;
+import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
/**
- * MySQL importer.
+ * Default importer creator.
*/
-public final class MySQLImporter extends AbstractImporter {
+public final class DefaultImporterCreator implements ImporterCreator {
- public MySQLImporter(final ImporterConfiguration importerConfig, final
PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
- final PipelineJobProgressListener
jobProgressListener) {
- super(importerConfig, dataSourceManager, channel, jobProgressListener);
+ @Override
+ public Importer createImporter(final ImporterConfiguration importerConfig,
+ final PipelineDataSourceManager
dataSourceManager, final PipelineChannel channel,
+ final PipelineJobProgressListener
jobProgressListener) {
+ return new DefaultImporter(importerConfig, dataSourceManager, channel,
jobProgressListener);
+ }
+
+ @Override
+ public String getType() {
+ return "MySQL";
+ }
+
+ @Override
+ public Collection<String> getTypeAliases() {
+ Collection<String> aliases = new LinkedList<>();
+ aliases.add("PostgreSQL");
+ aliases.add("openGauss");
+ return aliases;
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreator.java
similarity index 59%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreator.java
index 3fb304ee247..20a96972b3d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreator.java
@@ -15,21 +15,27 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.opengauss.importer;
+package org.apache.shardingsphere.scaling.core.job.importer;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
+import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
-/**
- * Importer of openGauss.
- */
-public final class OpenGaussImporter extends AbstractImporter {
+@SingletonSPI
+public interface ImporterCreator extends TypedSPI {
- public OpenGaussImporter(final ImporterConfiguration importerConfig, final
PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
- final PipelineJobProgressListener
jobProgressListener) {
- super(importerConfig, dataSourceManager, channel, jobProgressListener);
- }
+ /**
+ * Create importer.
+ * @param importerConfig importer configuration
+ * @param dataSourceManager data source manager
+ * @param channel channel
+ * @param jobProgressListener job progress listener
+ * @return importer
+ */
+ Importer createImporter(ImporterConfiguration importerConfig,
PipelineDataSourceManager dataSourceManager, PipelineChannel channel,
+ PipelineJobProgressListener jobProgressListener);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreatorFactory.java
similarity index 50%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreatorFactory.java
index 8f327d99ccf..b10924edc3e 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreatorFactory.java
@@ -15,38 +15,29 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.spi;
+package org.apache.shardingsphere.scaling.core.job.importer;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
-import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
-import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
-import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
/**
- * Scaling entry.
+ * Importer creator factory.
*/
-@SingletonSPI
-public interface ScalingEntry extends TypedSPI {
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ImporterCreatorFactory {
- /**
- * Get inventory dumper type.
- *
- * @return inventory dumper type
- */
- Class<? extends InventoryDumper> getInventoryDumperClass();
-
- /**
- * Get incremental dumper type.
- *
- * @return incremental dumper type
- */
- Class<? extends IncrementalDumper> getIncrementalDumperClass();
+ static {
+ ShardingSphereServiceLoader.register(ImporterCreator.class);
+ }
/**
- * Get importer type.
- *
- * @return importer type
+ * Get importer creator instance.
+ * @param databaseType database type
+ * @return importer creator
*/
- Class<? extends Importer> getImporterClass();
+ public static ImporterCreator getInstance(final String databaseType) {
+ return TypedSPIRegistry.getRegisteredService(ImporterCreator.class,
databaseType);
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
deleted file mode 100644
index fbe1bfeb305..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.job.importer;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.SneakyThrows;
-import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
-import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
-import org.apache.shardingsphere.scaling.core.spi.ScalingEntryFactory;
-
-import java.lang.reflect.Constructor;
-
-/**
- * Importer factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ImporterFactory {
-
- /**
- * Create importer.
- *
- * @param importerConfig importer configuration
- * @param dataSourceManager data source manager
- * @param channel channel
- * @param jobProgressListener job progress listener
- * @return importer
- */
- @SneakyThrows(ReflectiveOperationException.class)
- public static Importer createImporter(final ImporterConfiguration
importerConfig, final PipelineDataSourceManager dataSourceManager, final
PipelineChannel channel,
- final PipelineJobProgressListener
jobProgressListener) {
- String databaseType =
importerConfig.getDataSourceConfig().getDatabaseType().getType();
- ScalingEntry scalingEntry =
ScalingEntryFactory.getInstance(databaseType);
- Constructor<? extends Importer> constructor =
scalingEntry.getImporterClass().getConstructor(ImporterConfiguration.class,
PipelineDataSourceManager.class, PipelineChannel.class,
- PipelineJobProgressListener.class);
- return constructor.newInstance(importerConfig, dataSourceManager,
channel, jobProgressListener);
- }
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
index 8f327d99ccf..5c1dfd02e6a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.core.spi;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
@@ -42,11 +41,4 @@ public interface ScalingEntry extends TypedSPI {
* @return incremental dumper type
*/
Class<? extends IncrementalDumper> getIncrementalDumperClass();
-
- /**
- * Get importer type.
- *
- * @return importer type
- */
- Class<? extends Importer> getImporterClass();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
new file mode 100644
index 00000000000..f7c30c08c48
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.scaling.core.job.importer.DefaultImporterCreator
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/fixture/ScalingEntryFixture.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/fixture/ScalingEntryFixture.java
index 0e9792ae259..800563bb8d8 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/fixture/ScalingEntryFixture.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/fixture/ScalingEntryFixture.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.scaling.core.spi.fixture;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -33,12 +32,7 @@ public final class ScalingEntryFixture implements
ScalingEntry {
public Class<? extends IncrementalDumper> getIncrementalDumperClass() {
return IncrementalDumper.class;
}
-
- @Override
- public Class<? extends Importer> getImporterClass() {
- return Importer.class;
- }
-
+
@Override
public String getType() {
return "FIXTURE";
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
new file mode 100644
index 00000000000..f7c30c08c48
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.scaling.core.job.importer.DefaultImporterCreator
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
index ce69a1a824b..c0c5b2d3878 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntry.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.mysql;
-import org.apache.shardingsphere.data.pipeline.mysql.importer.MySQLImporter;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -36,12 +35,7 @@ public final class MySQLScalingEntry implements ScalingEntry
{
public Class<MySQLIncrementalDumper> getIncrementalDumperClass() {
return MySQLIncrementalDumper.class;
}
-
- @Override
- public Class<MySQLImporter> getImporterClass() {
- return MySQLImporter.class;
- }
-
+
@Override
public String getType() {
return "MySQL";
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntryTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntryTest.java
index 326f443dc7d..5ce566cb32e 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntryTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLScalingEntryTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.mysql;
-import org.apache.shardingsphere.data.pipeline.mysql.importer.MySQLImporter;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -34,7 +33,6 @@ public final class MySQLScalingEntryTest {
public void assertGetScalingEntryByDatabaseType() {
ScalingEntry scalingEntry = ScalingEntryFactory.getInstance("MySQL");
assertThat(scalingEntry, instanceOf(MySQLScalingEntry.class));
- assertThat(scalingEntry.getImporterClass(),
equalTo(MySQLImporter.class));
assertThat(scalingEntry.getInventoryDumperClass(),
equalTo(MySQLInventoryDumper.class));
assertThat(scalingEntry.getIncrementalDumperClass(),
equalTo(MySQLIncrementalDumper.class));
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
index 2f0d3e16487..294364f41f0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntry.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.opengauss;
-import
org.apache.shardingsphere.data.pipeline.opengauss.importer.OpenGaussImporter;
import
org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -36,12 +35,7 @@ public final class OpenGaussScalingEntry implements
ScalingEntry {
public Class<OpenGaussWalDumper> getIncrementalDumperClass() {
return OpenGaussWalDumper.class;
}
-
- @Override
- public Class<OpenGaussImporter> getImporterClass() {
- return OpenGaussImporter.class;
- }
-
+
@Override
public String getType() {
return "openGauss";
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntryTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntryTest.java
index 6ba5321dfb5..8c648087a4f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntryTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussScalingEntryTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.opengauss;
-import org.apache.shardingsphere.data.pipeline.opengauss.importer.OpenGaussImporter;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -34,7 +33,6 @@ public final class OpenGaussScalingEntryTest {
public void assertGetScalingEntryByDatabaseType() {
ScalingEntry actual = ScalingEntryFactory.getInstance("openGauss");
assertThat(actual, instanceOf(OpenGaussScalingEntry.class));
- assertThat(actual.getImporterClass(), equalTo(OpenGaussImporter.class));
assertThat(actual.getInventoryDumperClass(), equalTo(PostgreSQLInventoryDumper.class));
assertThat(actual.getIncrementalDumperClass(), equalTo(OpenGaussWalDumper.class));
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
index 90e5c714226..89b3f415732 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntry.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.postgresql;
-import org.apache.shardingsphere.data.pipeline.postgresql.importer.PostgreSQLImporter;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -36,12 +35,7 @@ public final class PostgreSQLScalingEntry implements ScalingEntry {
public Class<PostgreSQLWalDumper> getIncrementalDumperClass() {
return PostgreSQLWalDumper.class;
}
-
- @Override
- public Class<PostgreSQLImporter> getImporterClass() {
- return PostgreSQLImporter.class;
- }
-
+
@Override
public String getType() {
return "PostgreSQL";
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntryTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntryTest.java
index f1fbec9d60c..700e139d702 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntryTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLScalingEntryTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.postgresql;
-import org.apache.shardingsphere.data.pipeline.postgresql.importer.PostgreSQLImporter;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -34,7 +33,6 @@ public final class PostgreSQLScalingEntryTest {
public void assertGetScalingEntryByDatabaseType() {
ScalingEntry scalingEntry = ScalingEntryFactory.getInstance("PostgreSQL");
assertThat(scalingEntry, instanceOf(PostgreSQLScalingEntry.class));
- assertThat(scalingEntry.getImporterClass(), equalTo(PostgreSQLImporter.class));
assertThat(scalingEntry.getInventoryDumperClass(), equalTo(PostgreSQLInventoryDumper.class));
assertThat(scalingEntry.getIncrementalDumperClass(), equalTo(PostgreSQLWalDumper.class));
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
similarity index 60%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
rename to
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
index ff63036612b..0413a02845a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
@@ -15,21 +15,29 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.postgresql.importer;
+package org.apache.shardingsphere.data.pipeline.core.fixture;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
+import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator;
/**
- * PostgreSQL importer.
+ * Fixture importer creator.
*/
-public final class PostgreSQLImporter extends AbstractImporter {
+public final class FixtureImporterCreator implements ImporterCreator {
- public PostgreSQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
- final PipelineJobProgressListener jobProgressListener) {
- super(importerConfig, dataSourceManager, channel, jobProgressListener);
+ @Override
+ public Importer createImporter(final ImporterConfiguration importerConfig,
+ final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+ final PipelineJobProgressListener jobProgressListener) {
+ return new FixtureImporter(importerConfig, dataSourceManager, channel, jobProgressListener);
+ }
+
+ @Override
+ public String getType() {
+ return "H2";
}
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/H2ScalingEntryFixture.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/H2ScalingEntryFixture.java
index 459da7dac8d..d599be9af00 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/H2ScalingEntryFixture.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/H2ScalingEntryFixture.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -34,11 +33,6 @@ public final class H2ScalingEntryFixture implements ScalingEntry {
return FixtureIncrementalDumper.class;
}
- @Override
- public Class<? extends Importer> getImporterClass() {
- return FixtureImporter.class;
- }
-
@Override
public String getType() {
return "H2";
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
similarity index 97%
rename from
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
rename to
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
index ff0b7280b7d..49ab326e7a9 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
@@ -56,7 +56,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public final class AbstractImporterTest {
+public final class DefaultImporterTest {
private static final String TABLE_NAME = "test_table";
@@ -78,12 +78,11 @@ public final class AbstractImporterTest {
@Mock
private PreparedStatement preparedStatement;
- private AbstractImporter jdbcImporter;
+ private DefaultImporter jdbcImporter;
@Before
public void setUp() throws SQLException {
- jdbcImporter = new AbstractImporter(mockImporterConfiguration(), dataSourceManager, channel, new FixturePipelineJobProgressListener()) {
- };
+ jdbcImporter = new DefaultImporter(mockImporterConfiguration(), dataSourceManager, channel, new FixturePipelineJobProgressListener());
when(dataSourceManager.getDataSource(dataSourceConfig)).thenReturn(dataSource);
when(dataSource.getConnection()).thenReturn(connection);
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/scaling/cor/job/importer/ImporterCreatorFactoryTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/scaling/cor/job/importer/ImporterCreatorFactoryTest.java
new file mode 100644
index 00000000000..2f54c27594d
--- /dev/null
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/scaling/cor/job/importer/ImporterCreatorFactoryTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.cor.job.importer;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureImporter;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.core.importer.DefaultImporter;
+import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import org.apache.shardingsphere.scaling.core.job.importer.ImporterCreatorFactory;
+import org.junit.Test;
+import org.mockito.Mock;
+
+public final class ImporterCreatorFactoryTest {
+
+ @Mock
+ private PipelineDataSourceManager dataSourceManager;
+
+ @Mock
+ private PipelineChannel channel;
+
+ private final PipelineDataSourceConfiguration dataSourceConfig = new StandardPipelineDataSourceConfiguration(
+ "jdbc:h2:mem:test_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL;USER=root;PASSWORD=root", "root", "root");
+
+ @Test
+ public void assertCreateImporterForMysql() {
+ Importer importer = ImporterCreatorFactory.getInstance("MySQL")
+ .createImporter(mockImporterConfiguration(), dataSourceManager, channel,
+ new FixturePipelineJobProgressListener());
+ assertThat(importer, instanceOf(DefaultImporter.class));
+ }
+
+ @Test
+ public void assertCreateImporterForPostgreSQL() {
+ Importer importer = ImporterCreatorFactory.getInstance("PostgreSQL")
+ .createImporter(mockImporterConfiguration(), dataSourceManager, channel,
+ new FixturePipelineJobProgressListener());
+ assertThat(importer, instanceOf(DefaultImporter.class));
+ }
+
+ @Test
+ public void assertCreateImporterForOpenGauss() {
+ Importer importer = ImporterCreatorFactory.getInstance("openGauss")
+ .createImporter(mockImporterConfiguration(), dataSourceManager, channel,
+ new FixturePipelineJobProgressListener());
+ assertThat(importer, instanceOf(DefaultImporter.class));
+ }
+
+ @Test
+ public void assertCreateImporterForH2() {
+ Importer importer = ImporterCreatorFactory.getInstance("H2")
+ .createImporter(mockImporterConfiguration(), dataSourceManager, channel,
+ new FixturePipelineJobProgressListener());
+ assertThat(importer, instanceOf(FixtureImporter.class));
+ }
+
+ private ImporterConfiguration mockImporterConfiguration() {
+ Map<LogicTableName, Set<String>> shardingColumnsMap = Collections.singletonMap(new LogicTableName("test_table"), Collections.singleton("user"));
+ return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, 3, 3);
+ }
+}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
new file mode 100644
index 00000000000..7ba874250f7
--- /dev/null
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.fixture.FixtureImporterCreator