azexcy commented on code in PR #23168:
URL: https://github.com/apache/shardingsphere/pull/23168#discussion_r1059697360
##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java:
##########
@@ -19,42 +19,101 @@
import lombok.AccessLevel;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
/**
* CDC importer.
*/
+@Slf4j
public final class CDCImporter extends AbstractLifecycleExecutor implements
Importer {
@Getter(AccessLevel.PROTECTED)
private final ImporterConfiguration importerConfig;
private final PipelineChannel channel;
+ private final CDCImporterConnector importerConnector;
+
private final PipelineJobProgressListener jobProgressListener;
+ @Getter
+ private final ImporterType importerType;
+
private final JobRateLimitAlgorithm rateLimitAlgorithm;
- public CDCImporter(final ImporterConfiguration importerConfig, final
ImporterConnector importerConnector, final PipelineChannel channel, final
PipelineJobProgressListener jobProgressListener) {
+ public CDCImporter(final ImporterConfiguration importerConfig, final
ImporterConnector importerConnector, final PipelineChannel channel, final
PipelineJobProgressListener jobProgressListener,
+ final ImporterType importerType) {
this.importerConfig = importerConfig;
rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
this.channel = channel;
+ this.importerConnector = (CDCImporterConnector) importerConnector;
this.jobProgressListener = jobProgressListener;
+ this.importerType = importerType;
}
@Override
protected void runBlocking() {
- // TODO to be implemented
+ int batchSize = importerConfig.getBatchSize();
+ if (ImporterType.INCREMENTAL == importerType) {
+ importerConnector.sendIncrementalStartEvent(batchSize);
+ }
+ while (isRunning()) {
+ List<Record> records = channel.fetchRecords(batchSize, 3);
+ if (null != records && !records.isEmpty()) {
+ List<Record> recordList = records.stream().filter(each ->
!(each instanceof PlaceholderRecord)).collect(Collectors.toList());
+ try {
+ processDataRecords(recordList);
+ } catch (final SQLException ex) {
+ log.error("process data records failed", ex);
+ }
+ if (FinishedRecord.class.equals(records.get(records.size() -
1).getClass())) {
+ break;
+ }
+ }
+ }
+ }
+
+ private void processDataRecords(final List<Record> recordList) throws
SQLException {
+ if (null == recordList || recordList.isEmpty()) {
+ return;
+ }
+ if (null != rateLimitAlgorithm) {
+ rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
+ }
+ importerConnector.write(recordList, this, importerType);
+ }
+
+ /**
+ * Ack with last data record.
+ *
+ * @param lastDataRecord last data record
+ */
+ public void ackWithLastDataRecord(final Record lastDataRecord) {
+ channel.ack(Collections.singletonList(lastDataRecord));
+ jobProgressListener.onProgressUpdated(new
PipelineJobProgressUpdatedParameter(0));
Review Comment:
Fixed, but `processedRecordsCount` not used now, maybe it will be used later
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]