azexcy commented on code in PR #23168:
URL: https://github.com/apache/shardingsphere/pull/23168#discussion_r1059699996
##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java:
##########
@@ -18,24 +18,201 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector;
import io.netty.channel.Channel;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.holder.CDCAckHolder;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
+import org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
/**
 * CDC importer connector.
 */
-@RequiredArgsConstructor
+@Slf4j
public final class CDCImporterConnector implements ImporterConnector {
+    private static final long DEFAULT_TIMEOUT_MILLISECONDS = 200L;
+
+    private final Lock lock = new ReentrantLock();
+
+    private final Condition condition = lock.newCondition();
+
+    @Setter
+    private volatile boolean running = true;
+
+    @Getter
+    private final String database;
+
    private final Channel channel;

+    private final int jobShardingCount;
+
+    private final Comparator<DataRecord> dataRecordComparator;
+
+    private final Map<String, String> tableNameSchemaMap = new HashMap<>();
+
+    private final Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap = new ConcurrentHashMap<>();
+
+    private final AtomicInteger runningIncrementalTaskCount = new AtomicInteger(0);
+
+    private Thread incrementalImporterTask;
+
+    public CDCImporterConnector(final Channel channel, final String database, final int jobShardingCount, final List<String> tableNames, final Comparator<DataRecord> dataRecordComparator) {
+        this.channel = channel;
+        this.database = database;
+        this.jobShardingCount = jobShardingCount;
+        tableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
+            String[] split = each.split("\\.");
+            // each qualified name is "schema.table"; map table name -> schema name for lookup below
+            tableNameSchemaMap.put(split[1], split[0]);
+        });
+        this.dataRecordComparator = dataRecordComparator;
+    }
+
    @Override
    public Object getConnector() {
        return channel;
    }
+
+    /**
+     * Write data records into channel.
+     *
+     * @param recordList data records
+     * @param cdcImporter cdc importer
+     * @param importerType importer type
+     */
+    public void write(final List<Record> recordList, final CDCImporter cdcImporter, final ImporterType importerType) {
+        if (ImporterType.INVENTORY == importerType || null == dataRecordComparator) {
+            Map<CDCImporter, Record> importerDataRecordMap = new HashMap<>();
+            importerDataRecordMap.put(cdcImporter, recordList.get(recordList.size() - 1));
+            writeImmediately(recordList, importerDataRecordMap);
+        } else if (ImporterType.INCREMENTAL == importerType) {
+            writeIntoQueue(recordList, cdcImporter);
+        }
+    }
+
+    private void writeImmediately(final List<Record> recordList, final Map<CDCImporter, Record> importerDataRecordMap) {
+        while (!channel.isWritable() && channel.isActive()) {
+            doAwait();
+        }
+        List<DataRecordResult.Record> records = new LinkedList<>();
+        for (Record each : recordList) {
+            if (each instanceof DataRecord) {
+                DataRecord dataRecord = (DataRecord) each;
+                records.add(DataRecordResultConvertUtil.convertDataRecordToRecord(database, tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord));
+            }
+        }
+        String ackId = CDCAckHolder.getInstance().bindAckId(importerDataRecordMap);
+        DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecords(records).setAckId(ackId).build();
+        channel.writeAndFlush(CDCResponseGenerator.succeedBuilder("").setDataRecordResult(dataRecordResult).build());
+    }
+
+    private void doAwait() {
+        lock.lock();
+        try {
+            condition.await(DEFAULT_TIMEOUT_MILLISECONDS, TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException ignored) {
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @SneakyThrows(InterruptedException.class)
+    private void writeIntoQueue(final List<Record> dataRecords, final CDCImporter cdcImporter) {
+        BlockingQueue<Record> blockingQueue = incrementalRecordMap.computeIfAbsent(cdcImporter, ignored -> new ArrayBlockingQueue<>(500));
+        for (Record each : dataRecords) {
+            blockingQueue.put(each);
+        }
+    }
+
+    /**
+     * Send incremental start event.
+     *
+     * @param batchSize batch size
+     */
+    public void sendIncrementalStartEvent(final int batchSize) {
+        int count = runningIncrementalTaskCount.incrementAndGet();
+        if (count < jobShardingCount || null == dataRecordComparator) {
+            return;
+        }
+        log.debug("start CDC incremental importer");
+        if (null == incrementalImporterTask) {
+            incrementalImporterTask = new Thread(new CDCIncrementalImporterTask(batchSize));
+            incrementalImporterTask.start();
+        }
+    }
+
    @Override
    public String getType() {
        return "CDC";
    }
+
+    @RequiredArgsConstructor
+    private final class CDCIncrementalImporterTask implements Runnable {
+
+        private final int batchSize;
+
+        @Override
+        public void run() {
+            while (running && null != dataRecordComparator) {
+                int index = 0;
+                List<Record> dataRecords = new LinkedList<>();
+                Map<CDCImporter, Record> pipelineChannelPositions = new HashMap<>(incrementalRecordMap.size(), 1);
+                while (index < batchSize) {
+                    Map<Record, CDCImporter> recordChannelMap = new HashMap<>(incrementalRecordMap.size(), 1);
+                    for (Entry<CDCImporter, BlockingQueue<Record>> entry : incrementalRecordMap.entrySet()) {
+                        BlockingQueue<Record> blockingQueue = entry.getValue();
+                        if (null == blockingQueue.peek()) {
+                            continue;
+                        }
+                        Record record = blockingQueue.poll();
+                        if (record instanceof FinishedRecord) {
+                            continue;
+                        }
+                        recordChannelMap.put(record, entry.getKey());
+                        pipelineChannelPositions.computeIfAbsent(entry.getKey(), key -> record);
+                    }
+                    List<DataRecord> filterRecord = recordChannelMap.keySet().stream().filter(each -> each instanceof DataRecord).map(each -> (DataRecord) each).collect(Collectors.toList());
+                    if (filterRecord.isEmpty()) {
+                        break;
+                    }
+                    DataRecord minDataRecord = Collections.min(filterRecord, dataRecordComparator);
+                    dataRecords.add(minDataRecord);
Review Comment:
Improved, using peek first.
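
For context, a minimal sketch of the peek-before-poll pattern referenced here, assuming a plain BlockingQueue (class and variable names below are illustrative, not from the PR): peek() inspects the head without removing it, so an empty queue is detected cheaply, and poll() only runs once a head record is known to exist.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch only (not PR code): peek before poll on a BlockingQueue.
public final class PeekFirstExample {
    
    public static void main(final String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
        queue.put("record-1");
        queue.put("record-2");
        // peek() returns the head without removing it; null means the queue is empty.
        while (null != queue.peek()) {
            // poll() removes the head only after peek() confirmed one exists.
            System.out.println("consumed: " + queue.poll());
        }
    }
}
```

Presumably this is what lets the run() loop above skip empty queues without blocking on them.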
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]