This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3da1aa02918 Refactor PipelineDataSourceSink (#29521)
3da1aa02918 is described below
commit 3da1aa0291805e90e7c4dbc3eec7e7402a96da96
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 24 12:07:09 2023 +0800
Refactor PipelineDataSourceSink (#29521)
* Refactor PipelineDataSourceSink
* Refactor PipelineDataSourceSink
* Refactor PipelineDataSourceSink
* Refactor PipelineDataSourceSink
* Refactor PipelineDataSourceSink
* Refactor PipelineDataSourceSink
---
.../sink/{ => type}/PipelineDataSourceSink.java | 152 +++++++++------------
.../ingest/record/group/GroupedDataRecord.java | 10 +-
.../record/group/DataRecordGroupEngineTest.java | 33 ++---
.../migration/context/MigrationJobItemContext.java | 2 +-
.../core/importer/PipelineDataSourceSinkTest.java | 2 +-
5 files changed, 89 insertions(+), 110 deletions(-)
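
As illustration only, a minimal sketch of how the relocated sink is constructed and driven after this commit, following the constructor, write(...) and close() signatures shown in the diff below; how importerConfig, dataSourceManager, ackId and records are obtained here is assumed and not part of this change:

    import java.util.Collection;

    import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
    import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
    import org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
    import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
    import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;

    // Hypothetical caller; only the sink API below comes from the diff.
    final class PipelineDataSourceSinkUsageSketch {

        PipelineJobProgressUpdatedParameter writeAndClose(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager,
                                                          final String ackId, final Collection<Record> records) {
            // The sink now lives in the importer.sink.type package; its constructor arguments are unchanged.
            PipelineDataSourceSink sink = new PipelineDataSourceSink(importerConfig, dataSourceManager);
            // write(...) groups the data records, flushes them, and reports how many INSERT records were written.
            PipelineJobProgressUpdatedParameter result = sink.write(ackId, records);
            // close() cancels any statement that is still running.
            sink.close();
            return result;
        }
    }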
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
similarity index 64%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
index e58dd63ad6b..d1785edc690 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
@@ -15,16 +15,15 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.importer.sink;
+package org.apache.shardingsphere.data.pipeline.core.importer.sink.type;
-import lombok.AccessLevel;
-import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
@@ -32,7 +31,6 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
-import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
@@ -40,10 +38,10 @@ import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -54,72 +52,58 @@ import java.util.stream.Collectors;
@Slf4j
public final class PipelineDataSourceSink implements PipelineSink {
- @Getter(AccessLevel.PROTECTED)
private final ImporterConfiguration importerConfig;
private final PipelineDataSourceManager dataSourceManager;
- private final JobRateLimitAlgorithm rateLimitAlgorithm;
-
private final PipelineImportSQLBuilder importSQLBuilder;
private final DataRecordGroupEngine groupEngine;
- private final AtomicReference<Statement> batchInsertStatement = new AtomicReference<>();
+ private final AtomicReference<PreparedStatement> runningBatchInsertStatement;
- private final AtomicReference<Statement> updateStatement = new AtomicReference<>();
+ private final AtomicReference<PreparedStatement> runningUpdateStatement;
- private final AtomicReference<Statement> batchDeleteStatement = new AtomicReference<>();
+ private final AtomicReference<PreparedStatement> runningBatchDeleteStatement;
public PipelineDataSourceSink(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager) {
this.importerConfig = importerConfig;
this.dataSourceManager = dataSourceManager;
- rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
importSQLBuilder = new PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
groupEngine = new DataRecordGroupEngine();
+ runningBatchInsertStatement = new AtomicReference<>();
+ runningUpdateStatement = new AtomicReference<>();
+ runningBatchDeleteStatement = new AtomicReference<>();
}
@Override
public PipelineJobProgressUpdatedParameter write(final String ackId, final Collection<Record> records) {
- return flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), records);
- }
-
- private PipelineJobProgressUpdatedParameter flush(final DataSource dataSource, final Collection<Record> buffer) {
- Collection<DataRecord> dataRecords = buffer.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
+ DataSource dataSource = dataSourceManager.getDataSource(importerConfig.getDataSourceConfig());
+ Collection<DataRecord> dataRecords = records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
if (dataRecords.isEmpty()) {
return new PipelineJobProgressUpdatedParameter(0);
}
- int insertRecordNumber = 0;
- for (DataRecord each : dataRecords) {
- if (PipelineSQLOperationType.INSERT == each.getType()) {
- insertRecordNumber++;
- }
- }
- Collection<GroupedDataRecord> groupedDataRecords = groupEngine.group(dataRecords);
- for (GroupedDataRecord each : groupedDataRecords) {
- flushInternal(dataSource, each.getBatchDeleteDataRecords());
- flushInternal(dataSource, each.getBatchInsertDataRecords());
- flushInternal(dataSource, each.getBatchUpdateDataRecords());
- sequentialFlush(dataSource, each.getNonBatchRecords());
+ for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
+ batchWrite(dataSource, each.getBatchDeleteDataRecords());
+ batchWrite(dataSource, each.getBatchInsertDataRecords());
+ batchWrite(dataSource, each.getBatchUpdateDataRecords());
+ sequentialWrite(dataSource, each.getNonBatchRecords());
}
- return new PipelineJobProgressUpdatedParameter(insertRecordNumber);
+ return new PipelineJobProgressUpdatedParameter((int) dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == each.getType()).count());
}
- private void flushInternal(final DataSource dataSource, final List<DataRecord> buffer) {
- if (null == buffer || buffer.isEmpty()) {
+ @SuppressWarnings("BusyWait")
+ @SneakyThrows(InterruptedException.class)
+ private void batchWrite(final DataSource dataSource, final Collection<DataRecord> records) {
+ if (records.isEmpty()) {
return;
}
- tryFlush(dataSource, buffer);
- }
-
- @SneakyThrows(InterruptedException.class)
- private void tryFlush(final DataSource dataSource, final List<DataRecord> buffer) {
for (int i = 0; !Thread.interrupted() && i <= importerConfig.getRetryTimes(); i++) {
try {
- doFlush(dataSource, buffer);
- return;
+ doWrite(dataSource, records);
+ break;
} catch (final SQLException ex) {
- log.error("flush failed {}/{} times.", i,
importerConfig.getRetryTimes(), ex);
+ log.error("Flush failed {}/{} times.", i,
importerConfig.getRetryTimes(), ex);
if (i == importerConfig.getRetryTimes()) {
throw new PipelineImporterJobWriteException(ex);
}
@@ -128,30 +112,35 @@ public final class PipelineDataSourceSink implements PipelineSink {
}
}
- private void doFlush(final DataSource dataSource, final List<DataRecord> buffer) throws SQLException {
+ private void sequentialWrite(final DataSource dataSource, final Collection<DataRecord> records) {
+ // TODO it's better use transaction, but execute delete maybe not effect when open transaction of PostgreSQL sometimes
+ try {
+ for (DataRecord each : records) {
+ doWrite(dataSource, Collections.singleton(each));
+ }
+ } catch (final SQLException ex) {
+ throw new PipelineImporterJobWriteException(ex);
+ }
+ }
+
+ private void doWrite(final DataSource dataSource, final Collection<DataRecord> records) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- boolean enableTransaction = buffer.size() > 1;
+ boolean enableTransaction = records.size() > 1;
if (enableTransaction) {
connection.setAutoCommit(false);
}
- switch (buffer.get(0).getType()) {
+ switch (records.iterator().next().getType()) {
case INSERT:
- if (null != rateLimitAlgorithm) {
- rateLimitAlgorithm.intercept(PipelineSQLOperationType.INSERT, 1);
- }
- executeBatchInsert(connection, buffer);
+ Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.INSERT, 1));
+ executeBatchInsert(connection, records);
break;
case UPDATE:
- if (null != rateLimitAlgorithm) {
- rateLimitAlgorithm.intercept(PipelineSQLOperationType.UPDATE, 1);
- }
- executeUpdate(connection, buffer);
+ Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.UPDATE, 1));
+ executeUpdate(connection, records);
break;
case DELETE:
- if (null != rateLimitAlgorithm) {
- rateLimitAlgorithm.intercept(PipelineSQLOperationType.DELETE, 1);
- }
- executeBatchDelete(connection, buffer);
+ Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.DELETE, 1));
+ executeBatchDelete(connection, records);
break;
default:
break;
@@ -162,11 +151,11 @@ public final class PipelineDataSourceSink implements PipelineSink {
}
}
- private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
- DataRecord dataRecord = dataRecords.get(0);
- String insertSql = importSQLBuilder.buildInsertSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord);
- try (PreparedStatement preparedStatement = connection.prepareStatement(insertSql)) {
- batchInsertStatement.set(preparedStatement);
+ private void executeBatchInsert(final Connection connection, final Collection<DataRecord> dataRecords) throws SQLException {
+ DataRecord dataRecord = dataRecords.iterator().next();
+ String sql = importSQLBuilder.buildInsertSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord);
+ try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+ runningBatchInsertStatement.set(preparedStatement);
preparedStatement.setQueryTimeout(30);
for (DataRecord each : dataRecords) {
for (int i = 0; i < each.getColumnCount(); i++) {
@@ -176,11 +165,11 @@ public final class PipelineDataSourceSink implements PipelineSink {
}
preparedStatement.executeBatch();
} finally {
- batchInsertStatement.set(null);
+ runningBatchInsertStatement.set(null);
}
}
- private void executeUpdate(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
+ private void executeUpdate(final Connection connection, final Collection<DataRecord> dataRecords) throws SQLException {
for (DataRecord each : dataRecords) {
executeUpdate(connection, each);
}
@@ -190,9 +179,9 @@ public final class PipelineDataSourceSink implements PipelineSink {
Set<String> shardingColumns = importerConfig.getShardingColumns(dataRecord.getTableName());
List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
List<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
- String updateSql = importSQLBuilder.buildUpdateSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord, conditionColumns);
- try (PreparedStatement preparedStatement = connection.prepareStatement(updateSql)) {
- updateStatement.set(preparedStatement);
+ String sql = importSQLBuilder.buildUpdateSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord, conditionColumns);
+ try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+ runningUpdateStatement.set(preparedStatement);
for (int i = 0; i < setColumns.size(); i++) {
preparedStatement.setObject(i + 1, setColumns.get(i).getValue());
}
@@ -208,19 +197,19 @@ public final class PipelineDataSourceSink implements PipelineSink {
// TODO if table without unique key the conditionColumns before values is null, so update will fail at PostgreSQL
int updateCount = preparedStatement.executeUpdate();
if (1 != updateCount) {
- log.warn("executeUpdate failed, updateCount={}, updateSql={},
updatedColumns={}, conditionColumns={}", updateCount, updateSql, setColumns,
conditionColumns);
+ log.warn("executeUpdate failed, updateCount={}, updateSql={},
updatedColumns={}, conditionColumns={}", updateCount, sql, setColumns,
conditionColumns);
}
} finally {
- updateStatement.set(null);
+ runningUpdateStatement.set(null);
}
}
- private void executeBatchDelete(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
- DataRecord dataRecord = dataRecords.get(0);
- String deleteSQL = importSQLBuilder.buildDeleteSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord,
+ private void executeBatchDelete(final Connection connection, final Collection<DataRecord> dataRecords) throws SQLException {
+ DataRecord dataRecord = dataRecords.iterator().next();
+ String sql = importSQLBuilder.buildDeleteSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord,
RecordUtils.extractConditionColumns(dataRecord, importerConfig.getShardingColumns(dataRecord.getTableName())));
- try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
- batchDeleteStatement.set(preparedStatement);
+ try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+ runningBatchDeleteStatement.set(preparedStatement);
preparedStatement.setQueryTimeout(30);
for (DataRecord each : dataRecords) {
List<Column> conditionColumns = RecordUtils.extractConditionColumns(each, importerConfig.getShardingColumns(dataRecord.getTableName()));
@@ -235,25 +224,14 @@ public final class PipelineDataSourceSink implements PipelineSink {
}
preparedStatement.executeBatch();
} finally {
- batchDeleteStatement.set(null);
- }
- }
-
- private void sequentialFlush(final DataSource dataSource, final List<DataRecord> buffer) {
- // TODO it's better use transaction, but execute delete maybe not effect when open transaction of PostgreSQL sometimes
- try {
- for (DataRecord each : buffer) {
- doFlush(dataSource, Collections.singletonList(each));
- }
- } catch (final SQLException ex) {
- throw new PipelineImporterJobWriteException(ex);
+ runningBatchDeleteStatement.set(null);
}
}
@Override
public void close() {
- PipelineJdbcUtils.cancelStatement(batchInsertStatement.get());
- PipelineJdbcUtils.cancelStatement(updateStatement.get());
- PipelineJdbcUtils.cancelStatement(batchDeleteStatement.get());
+ PipelineJdbcUtils.cancelStatement(runningBatchInsertStatement.get());
+ PipelineJdbcUtils.cancelStatement(runningUpdateStatement.get());
+ PipelineJdbcUtils.cancelStatement(runningBatchDeleteStatement.get());
}
}
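
Isolated from the hunks above: the refactoring drops the cached rateLimitAlgorithm field and its null checks in favor of an inline Optional over importerConfig.getRateLimitAlgorithm(). A minimal before/after sketch of that guard; the JobRateLimitAlgorithm instance passed in here is just a stand-in:

    import java.util.Optional;

    import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
    import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;

    final class RateLimitGuardSketch {

        void guard(final JobRateLimitAlgorithm rateLimitAlgorithm) {
            // Old style: explicit null check before intercepting the INSERT operation.
            if (null != rateLimitAlgorithm) {
                rateLimitAlgorithm.intercept(PipelineSQLOperationType.INSERT, 1);
            }
            // New style used in doWrite(...): the same guard expressed with Optional.
            Optional.ofNullable(rateLimitAlgorithm).ifPresent(optional -> optional.intercept(PipelineSQLOperationType.INSERT, 1));
        }
    }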
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
index daa0e47c177..ef54b3c7743 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
@@ -21,7 +21,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import java.util.List;
+import java.util.Collection;
@RequiredArgsConstructor
@Getter
@@ -29,11 +29,11 @@ public final class GroupedDataRecord {
private final String tableName;
- private final List<DataRecord> batchInsertDataRecords;
+ private final Collection<DataRecord> batchInsertDataRecords;
- private final List<DataRecord> batchUpdateDataRecords;
+ private final Collection<DataRecord> batchUpdateDataRecords;
- private final List<DataRecord> batchDeleteDataRecords;
+ private final Collection<DataRecord> batchDeleteDataRecords;
- private final List<DataRecord> nonBatchRecords;
+ private final Collection<DataRecord> nonBatchRecords;
}
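
With GroupedDataRecord now exposing Collection<DataRecord> instead of List<DataRecord>, callers read the first record through an Iterator rather than get(0), as the test diff below does; a minimal sketch, assuming the grouped instance is non-empty:

    import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
    import org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;

    final class GroupedDataRecordAccessSketch {

        DataRecord firstBatchInsertRecord(final GroupedDataRecord groupedDataRecord) {
            // Collection offers no positional access, so the first element comes from the iterator.
            return groupedDataRecord.getBatchInsertDataRecords().iterator().next();
        }
    }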
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
index 17e2e9fede5..64a6b84769b 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
@@ -87,20 +87,21 @@ class DataRecordGroupEngineTest {
assertThat(actual.size(), is(1));
GroupedDataRecord actualGroupedDataRecord = actual.iterator().next();
assertThat(actualGroupedDataRecord.getBatchUpdateDataRecords().size(), is(2));
- DataRecord actualFirstDataRecord = actualGroupedDataRecord.getBatchUpdateDataRecords().get(0);
- assertThat(actualFirstDataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
- assertThat(actualFirstDataRecord.getTableName(), is("order"));
- assertThat(actualFirstDataRecord.getColumn(0).getOldValue(), is(1));
- assertThat(actualFirstDataRecord.getColumn(0).getValue(), is(2));
- assertThat(actualFirstDataRecord.getColumn(1).getValue(), is(1));
- assertThat(actualFirstDataRecord.getColumn(2).getValue(), is(1));
- DataRecord actualSecondDataRecord = actualGroupedDataRecord.getBatchUpdateDataRecords().get(1);
- assertThat(actualSecondDataRecord.getType(), is(PipelineSQLOperationType.UPDATE));
- assertThat(actualSecondDataRecord.getTableName(), is("order"));
- assertThat(actualSecondDataRecord.getColumn(0).getOldValue(), is(2));
- assertThat(actualSecondDataRecord.getColumn(0).getValue(), is(3));
- assertThat(actualSecondDataRecord.getColumn(1).getValue(), is(2));
- assertThat(actualSecondDataRecord.getColumn(2).getValue(), is(2));
+ Iterator<DataRecord> batchUpdateDataRecords = actualGroupedDataRecord.getBatchUpdateDataRecords().iterator();
+ DataRecord actualDataRecord1 = batchUpdateDataRecords.next();
+ assertThat(actualDataRecord1.getType(), is(PipelineSQLOperationType.UPDATE));
+ assertThat(actualDataRecord1.getTableName(), is("order"));
+ assertThat(actualDataRecord1.getColumn(0).getOldValue(), is(1));
+ assertThat(actualDataRecord1.getColumn(0).getValue(), is(2));
+ assertThat(actualDataRecord1.getColumn(1).getValue(), is(1));
+ assertThat(actualDataRecord1.getColumn(2).getValue(), is(1));
+ DataRecord actualDataRecord2 = batchUpdateDataRecords.next();
+ assertThat(actualDataRecord2.getType(), is(PipelineSQLOperationType.UPDATE));
+ assertThat(actualDataRecord2.getTableName(), is("order"));
+ assertThat(actualDataRecord2.getColumn(0).getOldValue(), is(2));
+ assertThat(actualDataRecord2.getColumn(0).getValue(), is(3));
+ assertThat(actualDataRecord2.getColumn(1).getValue(), is(2));
+ assertThat(actualDataRecord2.getColumn(2).getValue(), is(2));
}
@Test
@@ -130,9 +131,9 @@ class DataRecordGroupEngineTest {
assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
}
- private void assertDataRecordsMatched(final List<DataRecord> actualRecords, final List<DataRecord> expectedRecords) {
+ private void assertDataRecordsMatched(final Collection<DataRecord> actualRecords, final List<DataRecord> expectedRecords) {
for (int i = 0; i < actualRecords.size(); i++) {
- assertThat(actualRecords.get(0), sameInstance(expectedRecords.get(0)));
+ assertThat(actualRecords.iterator().next(), sameInstance(expectedRecords.get(0)));
}
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index ea77dd55b4b..a7caa8aa922 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -31,7 +31,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineDataSourceSink;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index f091e71c5e9..3e61f66f537 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
-import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineDataSourceSink;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;