azexcy commented on code in PR #23168:
URL: https://github.com/apache/shardingsphere/pull/23168#discussion_r1059697360


##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java:
##########
@@ -19,42 +19,101 @@
 
 import lombok.AccessLevel;
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
 import 
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
 /**
  * CDC importer.
  */
+@Slf4j
 public final class CDCImporter extends AbstractLifecycleExecutor implements 
Importer {
     
     @Getter(AccessLevel.PROTECTED)
     private final ImporterConfiguration importerConfig;
     
     private final PipelineChannel channel;
     
+    private final CDCImporterConnector importerConnector;
+    
     private final PipelineJobProgressListener jobProgressListener;
     
+    @Getter
+    private final ImporterType importerType;
+    
     private final JobRateLimitAlgorithm rateLimitAlgorithm;
     
-    public CDCImporter(final ImporterConfiguration importerConfig, final 
ImporterConnector importerConnector, final PipelineChannel channel, final 
PipelineJobProgressListener jobProgressListener) {
+    public CDCImporter(final ImporterConfiguration importerConfig, final 
ImporterConnector importerConnector, final PipelineChannel channel, final 
PipelineJobProgressListener jobProgressListener,
+                       final ImporterType importerType) {
         this.importerConfig = importerConfig;
         rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
         this.channel = channel;
+        this.importerConnector = (CDCImporterConnector) importerConnector;
         this.jobProgressListener = jobProgressListener;
+        this.importerType = importerType;
     }
     
     @Override
     protected void runBlocking() {
-        // TODO to be implemented
+        int batchSize = importerConfig.getBatchSize();
+        if (ImporterType.INCREMENTAL == importerType) {
+            importerConnector.sendIncrementalStartEvent(batchSize);
+        }
+        while (isRunning()) {
+            List<Record> records = channel.fetchRecords(batchSize, 3);
+            if (null != records && !records.isEmpty()) {
+                List<Record> recordList = records.stream().filter(each -> 
!(each instanceof PlaceholderRecord)).collect(Collectors.toList());
+                try {
+                    processDataRecords(recordList);
+                } catch (final SQLException ex) {
+                    log.error("process data records failed", ex);
+                }
+                if (FinishedRecord.class.equals(records.get(records.size() - 
1).getClass())) {
+                    break;
+                }
+            }
+        }
+    }
+    
+    private void processDataRecords(final List<Record> recordList) throws 
SQLException {
+        if (null == recordList || recordList.isEmpty()) {
+            return;
+        }
+        if (null != rateLimitAlgorithm) {
+            rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
+        }
+        importerConnector.write(recordList, this, importerType);
+    }
+    
+    /**
+     * Ack with last data record.
+     *
+     * @param lastDataRecord last data record
+     */
+    public void ackWithLastDataRecord(final Record lastDataRecord) {
+        channel.ack(Collections.singletonList(lastDataRecord));
+        jobProgressListener.onProgressUpdated(new 
PipelineJobProgressUpdatedParameter(0));

Review Comment:
   Fixed, but `processedRecordsCount` is not used now; maybe it will be used later.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to