This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3da1aa02918 Refactor PipelineDataSourceSink (#29521)
3da1aa02918 is described below

commit 3da1aa0291805e90e7c4dbc3eec7e7402a96da96
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 24 12:07:09 2023 +0800

    Refactor PipelineDataSourceSink (#29521)
    
    * Refactor PipelineDataSourceSink
    
    * Refactor PipelineDataSourceSink
    
    * Refactor PipelineDataSourceSink
    
    * Refactor PipelineDataSourceSink
    
    * Refactor PipelineDataSourceSink
    
    * Refactor PipelineDataSourceSink
---
 .../sink/{ => type}/PipelineDataSourceSink.java    | 152 +++++++++------------
 .../ingest/record/group/GroupedDataRecord.java     |  10 +-
 .../record/group/DataRecordGroupEngineTest.java    |  33 ++---
 .../migration/context/MigrationJobItemContext.java |   2 +-
 .../core/importer/PipelineDataSourceSinkTest.java  |   2 +-
 5 files changed, 89 insertions(+), 110 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
similarity index 64%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
index e58dd63ad6b..d1785edc690 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
@@ -15,16 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.importer.sink;
+package org.apache.shardingsphere.data.pipeline.core.importer.sink.type;
 
-import lombok.AccessLevel;
-import lombok.Getter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
@@ -32,7 +31,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.RecordUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.group.DataRecordGroupEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.group.GroupedDataRecord;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
-import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineImportSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 
@@ -40,10 +38,10 @@ import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -54,72 +52,58 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class PipelineDataSourceSink implements PipelineSink {
     
-    @Getter(AccessLevel.PROTECTED)
     private final ImporterConfiguration importerConfig;
     
     private final PipelineDataSourceManager dataSourceManager;
     
-    private final JobRateLimitAlgorithm rateLimitAlgorithm;
-    
     private final PipelineImportSQLBuilder importSQLBuilder;
     
     private final DataRecordGroupEngine groupEngine;
     
-    private final AtomicReference<Statement> batchInsertStatement = new 
AtomicReference<>();
+    private final AtomicReference<PreparedStatement> 
runningBatchInsertStatement;
     
-    private final AtomicReference<Statement> updateStatement = new 
AtomicReference<>();
+    private final AtomicReference<PreparedStatement> runningUpdateStatement;
     
-    private final AtomicReference<Statement> batchDeleteStatement = new 
AtomicReference<>();
+    private final AtomicReference<PreparedStatement> 
runningBatchDeleteStatement;
     
     public PipelineDataSourceSink(final ImporterConfiguration importerConfig, 
final PipelineDataSourceManager dataSourceManager) {
         this.importerConfig = importerConfig;
         this.dataSourceManager = dataSourceManager;
-        rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
         importSQLBuilder = new 
PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
         groupEngine = new DataRecordGroupEngine();
+        runningBatchInsertStatement = new AtomicReference<>();
+        runningUpdateStatement = new AtomicReference<>();
+        runningBatchDeleteStatement = new AtomicReference<>();
     }
     
     @Override
     public PipelineJobProgressUpdatedParameter write(final String ackId, final 
Collection<Record> records) {
-        return 
flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), 
records);
-    }
-    
-    private PipelineJobProgressUpdatedParameter flush(final DataSource 
dataSource, final Collection<Record> buffer) {
-        Collection<DataRecord> dataRecords = 
buffer.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
+        DataSource dataSource = 
dataSourceManager.getDataSource(importerConfig.getDataSourceConfig());
+        Collection<DataRecord> dataRecords = 
records.stream().filter(DataRecord.class::isInstance).map(DataRecord.class::cast).collect(Collectors.toList());
         if (dataRecords.isEmpty()) {
             return new PipelineJobProgressUpdatedParameter(0);
         }
-        int insertRecordNumber = 0;
-        for (DataRecord each : dataRecords) {
-            if (PipelineSQLOperationType.INSERT == each.getType()) {
-                insertRecordNumber++;
-            }
-        }
-        Collection<GroupedDataRecord> groupedDataRecords = 
groupEngine.group(dataRecords);
-        for (GroupedDataRecord each : groupedDataRecords) {
-            flushInternal(dataSource, each.getBatchDeleteDataRecords());
-            flushInternal(dataSource, each.getBatchInsertDataRecords());
-            flushInternal(dataSource, each.getBatchUpdateDataRecords());
-            sequentialFlush(dataSource, each.getNonBatchRecords());
+        for (GroupedDataRecord each : groupEngine.group(dataRecords)) {
+            batchWrite(dataSource, each.getBatchDeleteDataRecords());
+            batchWrite(dataSource, each.getBatchInsertDataRecords());
+            batchWrite(dataSource, each.getBatchUpdateDataRecords());
+            sequentialWrite(dataSource, each.getNonBatchRecords());
         }
-        return new PipelineJobProgressUpdatedParameter(insertRecordNumber);
+        return new PipelineJobProgressUpdatedParameter((int) 
dataRecords.stream().filter(each -> PipelineSQLOperationType.INSERT == 
each.getType()).count());
     }
     
-    private void flushInternal(final DataSource dataSource, final 
List<DataRecord> buffer) {
-        if (null == buffer || buffer.isEmpty()) {
+    @SuppressWarnings("BusyWait")
+    @SneakyThrows(InterruptedException.class)
+    private void batchWrite(final DataSource dataSource, final 
Collection<DataRecord> records) {
+        if (records.isEmpty()) {
             return;
         }
-        tryFlush(dataSource, buffer);
-    }
-    
-    @SneakyThrows(InterruptedException.class)
-    private void tryFlush(final DataSource dataSource, final List<DataRecord> 
buffer) {
         for (int i = 0; !Thread.interrupted() && i <= 
importerConfig.getRetryTimes(); i++) {
             try {
-                doFlush(dataSource, buffer);
-                return;
+                doWrite(dataSource, records);
+                break;
             } catch (final SQLException ex) {
-                log.error("flush failed {}/{} times.", i, 
importerConfig.getRetryTimes(), ex);
+                log.error("Flush failed {}/{} times.", i, 
importerConfig.getRetryTimes(), ex);
                 if (i == importerConfig.getRetryTimes()) {
                     throw new PipelineImporterJobWriteException(ex);
                 }
@@ -128,30 +112,35 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
         }
     }
     
-    private void doFlush(final DataSource dataSource, final List<DataRecord> 
buffer) throws SQLException {
+    private void sequentialWrite(final DataSource dataSource, final 
Collection<DataRecord> records) {
+        // TODO it's better use transaction, but execute delete maybe not 
effect when open transaction of PostgreSQL sometimes
+        try {
+            for (DataRecord each : records) {
+                doWrite(dataSource, Collections.singleton(each));
+            }
+        } catch (final SQLException ex) {
+            throw new PipelineImporterJobWriteException(ex);
+        }
+    }
+    
+    private void doWrite(final DataSource dataSource, final 
Collection<DataRecord> records) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            boolean enableTransaction = buffer.size() > 1;
+            boolean enableTransaction = records.size() > 1;
             if (enableTransaction) {
                 connection.setAutoCommit(false);
             }
-            switch (buffer.get(0).getType()) {
+            switch (records.iterator().next().getType()) {
                 case INSERT:
-                    if (null != rateLimitAlgorithm) {
-                        
rateLimitAlgorithm.intercept(PipelineSQLOperationType.INSERT, 1);
-                    }
-                    executeBatchInsert(connection, buffer);
+                    
Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional 
-> optional.intercept(PipelineSQLOperationType.INSERT, 1));
+                    executeBatchInsert(connection, records);
                     break;
                 case UPDATE:
-                    if (null != rateLimitAlgorithm) {
-                        
rateLimitAlgorithm.intercept(PipelineSQLOperationType.UPDATE, 1);
-                    }
-                    executeUpdate(connection, buffer);
+                    
Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional 
-> optional.intercept(PipelineSQLOperationType.UPDATE, 1));
+                    executeUpdate(connection, records);
                     break;
                 case DELETE:
-                    if (null != rateLimitAlgorithm) {
-                        
rateLimitAlgorithm.intercept(PipelineSQLOperationType.DELETE, 1);
-                    }
-                    executeBatchDelete(connection, buffer);
+                    
Optional.ofNullable(importerConfig.getRateLimitAlgorithm()).ifPresent(optional 
-> optional.intercept(PipelineSQLOperationType.DELETE, 1));
+                    executeBatchDelete(connection, records);
                     break;
                 default:
                     break;
@@ -162,11 +151,11 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
         }
     }
     
-    private void executeBatchInsert(final Connection connection, final 
List<DataRecord> dataRecords) throws SQLException {
-        DataRecord dataRecord = dataRecords.get(0);
-        String insertSql = 
importSQLBuilder.buildInsertSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null),
 dataRecord);
-        try (PreparedStatement preparedStatement = 
connection.prepareStatement(insertSql)) {
-            batchInsertStatement.set(preparedStatement);
+    private void executeBatchInsert(final Connection connection, final 
Collection<DataRecord> dataRecords) throws SQLException {
+        DataRecord dataRecord = dataRecords.iterator().next();
+        String sql = 
importSQLBuilder.buildInsertSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null),
 dataRecord);
+        try (PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            runningBatchInsertStatement.set(preparedStatement);
             preparedStatement.setQueryTimeout(30);
             for (DataRecord each : dataRecords) {
                 for (int i = 0; i < each.getColumnCount(); i++) {
@@ -176,11 +165,11 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
             }
             preparedStatement.executeBatch();
         } finally {
-            batchInsertStatement.set(null);
+            runningBatchInsertStatement.set(null);
         }
     }
     
-    private void executeUpdate(final Connection connection, final 
List<DataRecord> dataRecords) throws SQLException {
+    private void executeUpdate(final Connection connection, final 
Collection<DataRecord> dataRecords) throws SQLException {
         for (DataRecord each : dataRecords) {
             executeUpdate(connection, each);
         }
@@ -190,9 +179,9 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
         Set<String> shardingColumns = 
importerConfig.getShardingColumns(dataRecord.getTableName());
         List<Column> conditionColumns = 
RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
         List<Column> setColumns = 
dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
-        String updateSql = 
importSQLBuilder.buildUpdateSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null),
 dataRecord, conditionColumns);
-        try (PreparedStatement preparedStatement = 
connection.prepareStatement(updateSql)) {
-            updateStatement.set(preparedStatement);
+        String sql = 
importSQLBuilder.buildUpdateSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null),
 dataRecord, conditionColumns);
+        try (PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            runningUpdateStatement.set(preparedStatement);
             for (int i = 0; i < setColumns.size(); i++) {
                 preparedStatement.setObject(i + 1, 
setColumns.get(i).getValue());
             }
@@ -208,19 +197,19 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
             // TODO if table without unique key the conditionColumns before 
values is null, so update will fail at PostgreSQL
             int updateCount = preparedStatement.executeUpdate();
             if (1 != updateCount) {
-                log.warn("executeUpdate failed, updateCount={}, updateSql={}, 
updatedColumns={}, conditionColumns={}", updateCount, updateSql, setColumns, 
conditionColumns);
+                log.warn("executeUpdate failed, updateCount={}, updateSql={}, 
updatedColumns={}, conditionColumns={}", updateCount, sql, setColumns, 
conditionColumns);
             }
         } finally {
-            updateStatement.set(null);
+            runningUpdateStatement.set(null);
         }
     }
     
-    private void executeBatchDelete(final Connection connection, final 
List<DataRecord> dataRecords) throws SQLException {
-        DataRecord dataRecord = dataRecords.get(0);
-        String deleteSQL = 
importSQLBuilder.buildDeleteSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null),
 dataRecord,
+    private void executeBatchDelete(final Connection connection, final 
Collection<DataRecord> dataRecords) throws SQLException {
+        DataRecord dataRecord = dataRecords.iterator().next();
+        String sql = 
importSQLBuilder.buildDeleteSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null),
 dataRecord,
                 RecordUtils.extractConditionColumns(dataRecord, 
importerConfig.getShardingColumns(dataRecord.getTableName())));
-        try (PreparedStatement preparedStatement = 
connection.prepareStatement(deleteSQL)) {
-            batchDeleteStatement.set(preparedStatement);
+        try (PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            runningBatchDeleteStatement.set(preparedStatement);
             preparedStatement.setQueryTimeout(30);
             for (DataRecord each : dataRecords) {
                 List<Column> conditionColumns = 
RecordUtils.extractConditionColumns(each, 
importerConfig.getShardingColumns(dataRecord.getTableName()));
@@ -235,25 +224,14 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
             }
             preparedStatement.executeBatch();
         } finally {
-            batchDeleteStatement.set(null);
-        }
-    }
-    
-    private void sequentialFlush(final DataSource dataSource, final 
List<DataRecord> buffer) {
-        // TODO it's better use transaction, but execute delete maybe not 
effect when open transaction of PostgreSQL sometimes
-        try {
-            for (DataRecord each : buffer) {
-                doFlush(dataSource, Collections.singletonList(each));
-            }
-        } catch (final SQLException ex) {
-            throw new PipelineImporterJobWriteException(ex);
+            runningBatchDeleteStatement.set(null);
         }
     }
     
     @Override
     public void close() {
-        PipelineJdbcUtils.cancelStatement(batchInsertStatement.get());
-        PipelineJdbcUtils.cancelStatement(updateStatement.get());
-        PipelineJdbcUtils.cancelStatement(batchDeleteStatement.get());
+        PipelineJdbcUtils.cancelStatement(runningBatchInsertStatement.get());
+        PipelineJdbcUtils.cancelStatement(runningUpdateStatement.get());
+        PipelineJdbcUtils.cancelStatement(runningBatchDeleteStatement.get());
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
index daa0e47c177..ef54b3c7743 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/GroupedDataRecord.java
@@ -21,7 +21,7 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 
-import java.util.List;
+import java.util.Collection;
 
 @RequiredArgsConstructor
 @Getter
@@ -29,11 +29,11 @@ public final class GroupedDataRecord {
     
     private final String tableName;
     
-    private final List<DataRecord> batchInsertDataRecords;
+    private final Collection<DataRecord> batchInsertDataRecords;
     
-    private final List<DataRecord> batchUpdateDataRecords;
+    private final Collection<DataRecord> batchUpdateDataRecords;
     
-    private final List<DataRecord> batchDeleteDataRecords;
+    private final Collection<DataRecord> batchDeleteDataRecords;
     
-    private final List<DataRecord> nonBatchRecords;
+    private final Collection<DataRecord> nonBatchRecords;
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
index 17e2e9fede5..64a6b84769b 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/group/DataRecordGroupEngineTest.java
@@ -87,20 +87,21 @@ class DataRecordGroupEngineTest {
         assertThat(actual.size(), is(1));
         GroupedDataRecord actualGroupedDataRecord = actual.iterator().next();
         assertThat(actualGroupedDataRecord.getBatchUpdateDataRecords().size(), 
is(2));
-        DataRecord actualFirstDataRecord = 
actualGroupedDataRecord.getBatchUpdateDataRecords().get(0);
-        assertThat(actualFirstDataRecord.getType(), 
is(PipelineSQLOperationType.UPDATE));
-        assertThat(actualFirstDataRecord.getTableName(), is("order"));
-        assertThat(actualFirstDataRecord.getColumn(0).getOldValue(), is(1));
-        assertThat(actualFirstDataRecord.getColumn(0).getValue(), is(2));
-        assertThat(actualFirstDataRecord.getColumn(1).getValue(), is(1));
-        assertThat(actualFirstDataRecord.getColumn(2).getValue(), is(1));
-        DataRecord actualSecondDataRecord = 
actualGroupedDataRecord.getBatchUpdateDataRecords().get(1);
-        assertThat(actualSecondDataRecord.getType(), 
is(PipelineSQLOperationType.UPDATE));
-        assertThat(actualSecondDataRecord.getTableName(), is("order"));
-        assertThat(actualSecondDataRecord.getColumn(0).getOldValue(), is(2));
-        assertThat(actualSecondDataRecord.getColumn(0).getValue(), is(3));
-        assertThat(actualSecondDataRecord.getColumn(1).getValue(), is(2));
-        assertThat(actualSecondDataRecord.getColumn(2).getValue(), is(2));
+        Iterator<DataRecord> batchUpdateDataRecords = 
actualGroupedDataRecord.getBatchUpdateDataRecords().iterator();
+        DataRecord actualDataRecord1 = batchUpdateDataRecords.next();
+        assertThat(actualDataRecord1.getType(), 
is(PipelineSQLOperationType.UPDATE));
+        assertThat(actualDataRecord1.getTableName(), is("order"));
+        assertThat(actualDataRecord1.getColumn(0).getOldValue(), is(1));
+        assertThat(actualDataRecord1.getColumn(0).getValue(), is(2));
+        assertThat(actualDataRecord1.getColumn(1).getValue(), is(1));
+        assertThat(actualDataRecord1.getColumn(2).getValue(), is(1));
+        DataRecord actualDataRecord2 = batchUpdateDataRecords.next();
+        assertThat(actualDataRecord2.getType(), 
is(PipelineSQLOperationType.UPDATE));
+        assertThat(actualDataRecord2.getTableName(), is("order"));
+        assertThat(actualDataRecord2.getColumn(0).getOldValue(), is(2));
+        assertThat(actualDataRecord2.getColumn(0).getValue(), is(3));
+        assertThat(actualDataRecord2.getColumn(1).getValue(), is(2));
+        assertThat(actualDataRecord2.getColumn(2).getValue(), is(2));
     }
     
     @Test
@@ -130,9 +131,9 @@ class DataRecordGroupEngineTest {
         
assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), 
Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
-    private void assertDataRecordsMatched(final List<DataRecord> 
actualRecords, final List<DataRecord> expectedRecords) {
+    private void assertDataRecordsMatched(final Collection<DataRecord> 
actualRecords, final List<DataRecord> expectedRecords) {
         for (int i = 0; i < actualRecords.size(); i++) {
-            assertThat(actualRecords.get(0), 
sameInstance(expectedRecords.get(0)));
+            assertThat(actualRecords.iterator().next(), 
sameInstance(expectedRecords.get(0)));
         }
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index ea77dd55b4b..a7caa8aa922 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -31,7 +31,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineDataSourceSink;
+import 
org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index f091e71c5e9..3e61f66f537 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -24,7 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
-import 
org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineDataSourceSink;
+import 
org.apache.shardingsphere.data.pipeline.core.importer.sink.type.PipelineDataSourceSink;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;

Reply via email to