This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d99137d496 Rename PipelineCDCSocketSink (#29526)
1d99137d496 is described below

commit 1d99137d496bd597f714b14e419574654a35b656
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 24 13:28:31 2023 +0800

    Rename PipelineCDCSocketSink (#29526)
---
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   |  6 ++--
 ...CSocketSink.java => PipelineCDCSocketSink.java} | 36 ++++++++++++----------
 .../pipeline/cdc/handler/CDCBackendHandler.java    |  6 ++--
 3 files changed, 26 insertions(+), 22 deletions(-)

diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 99384096348..f39a246d5f1 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.PipelineCDCSocketSink;
 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
 import org.apache.shardingsphere.data.pipeline.cdc.engine.CDCJobRunnerCleaner;
@@ -166,8 +166,8 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemConte
             log.error("onFailure, {} task execute failed.", identifier, throwable);
             String jobId = jobItemContext.getJobId();
             PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable);
-            if (jobItemContext.getSink() instanceof CDCSocketSink) {
-                CDCSocketSink cdcSink = (CDCSocketSink) jobItemContext.getSink();
+            if (jobItemContext.getSink() instanceof PipelineCDCSocketSink) {
+                PipelineCDCSocketSink cdcSink = (PipelineCDCSocketSink) jobItemContext.getSink();
                 cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", throwable.getMessage()));
             }
             PipelineJobRegistry.stop(jobId);
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java
similarity index 81%
rename from kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
rename to kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java
index a5b2c85f039..c175e38812a 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java
@@ -35,7 +35,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
@@ -43,9 +42,9 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * CDC socket sink.
+ * Pipeline CDC socket sink.
  */
-public final class CDCSocketSink implements PipelineSink {
+public final class PipelineCDCSocketSink implements PipelineSink {
     
     private static final long DEFAULT_TIMEOUT_MILLISECONDS = 100L;
     
@@ -53,19 +52,20 @@ public final class CDCSocketSink implements PipelineSink {
     
     private final Condition condition = lock.newCondition();
     
-    private final ShardingSphereDatabase database;
-    
     @Getter
     private final Channel channel;
     
-    private final Map<String, String> tableNameSchemaMap = new HashMap<>();
+    private final ShardingSphereDatabase database;
+    
+    private final Map<String, String> tableSchemaNameMap;
     
-    public CDCSocketSink(final Channel channel, final ShardingSphereDatabase database, final Collection<String> schemaTableNames) {
+    public PipelineCDCSocketSink(final Channel channel, final ShardingSphereDatabase database, final Collection<String> schemaTableNames) {
         this.channel = channel;
         this.database = database;
+        tableSchemaNameMap = new HashMap<>(schemaTableNames.size(), 1F);
         schemaTableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
             String[] split = each.split("\\.");
-            tableNameSchemaMap.put(split[1], split[0]);
+            tableSchemaNameMap.put(split[1], split[0]);
         });
     }
     
@@ -80,14 +80,7 @@ public final class CDCSocketSink implements PipelineSink {
         if (!channel.isActive()) {
             return new PipelineJobProgressUpdatedParameter(0);
         }
-        List<DataRecordResult.Record> resultRecords = new LinkedList<>();
-        for (Record each : records) {
-            if (!(each instanceof DataRecord)) {
-                continue;
-            }
-            DataRecord dataRecord = (DataRecord) each;
-            resultRecords.add(DataRecordResultConvertUtils.convertDataRecordToRecord(database.getName(), tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord));
-        }
+        Collection<DataRecordResult.Record> resultRecords = getResultRecords(records);
         DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecord(resultRecords).setAckId(ackId).build();
         channel.writeAndFlush(CDCResponseUtils.succeed("", ResponseCase.DATA_RECORD_RESULT, dataRecordResult));
         return new PipelineJobProgressUpdatedParameter(resultRecords.size());
@@ -109,6 +102,17 @@ public final class CDCSocketSink implements PipelineSink {
         }
     }
     
+    private Collection<DataRecordResult.Record> getResultRecords(final Collection<Record> records) {
+        Collection<DataRecordResult.Record> result = new LinkedList<>();
+        for (Record each : records) {
+            if (each instanceof DataRecord) {
+                DataRecord dataRecord = (DataRecord) each;
+                result.add(DataRecordResultConvertUtils.convertDataRecordToRecord(database.getName(), tableSchemaNameMap.get(dataRecord.getTableName()), dataRecord));
+            }
+        }
+        return result;
+    }
+    
     @Override
     public void close() throws IOException {
         channel.writeAndFlush(CDCResponseUtils.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), "The socket channel is closed."));
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 68e17dda1df..12c628b35c9 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -31,7 +31,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporterManager;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.PipelineCDCSocketSink;
 import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
 import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
 import org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
@@ -136,7 +136,7 @@ public final class CDCBackendHandler {
         ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
         PipelineJobRegistry.stop(jobId);
         ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
-        jobAPI.start(jobId, new CDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
+        jobAPI.start(jobId, new PipelineCDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
         connectionContext.setJobId(jobId);
     }
     
@@ -155,7 +155,7 @@ public final class CDCBackendHandler {
         if (null == job) {
             return;
         }
-        if (((CDCSocketSink) job.getSink()).getChannel().id().equals(channelId)) {
+        if (((PipelineCDCSocketSink) job.getSink()).getChannel().id().equals(channelId)) {
             log.info("close CDC job, channel id: {}", channelId);
             PipelineJobRegistry.stop(jobId);
             jobAPI.disable(jobId);

Reply via email to