This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1d99137d496 Rename PipelineCDCSocketSink (#29526)
1d99137d496 is described below
commit 1d99137d496bd597f714b14e419574654a35b656
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 24 13:28:31 2023 +0800
Rename PipelineCDCSocketSink (#29526)
---
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 6 ++--
...CSocketSink.java => PipelineCDCSocketSink.java} | 36 ++++++++++++----------
.../pipeline/cdc/handler/CDCBackendHandler.java | 6 ++--
3 files changed, 26 insertions(+), 22 deletions(-)
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 99384096348..f39a246d5f1 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.PipelineCDCSocketSink;
import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
import org.apache.shardingsphere.data.pipeline.cdc.engine.CDCJobRunnerCleaner;
@@ -166,8 +166,8 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemConte
log.error("onFailure, {} task execute failed.", identifier, throwable);
String jobId = jobItemContext.getJobId();
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, jobItemContext.getShardingItem(), throwable);
- if (jobItemContext.getSink() instanceof CDCSocketSink) {
- CDCSocketSink cdcSink = (CDCSocketSink) jobItemContext.getSink();
+ if (jobItemContext.getSink() instanceof PipelineCDCSocketSink) {
+ PipelineCDCSocketSink cdcSink = (PipelineCDCSocketSink) jobItemContext.getSink();
cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", throwable.getMessage()));
}
PipelineJobRegistry.stop(jobId);
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java
similarity index 81%
rename from kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
rename to kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java
index a5b2c85f039..c175e38812a 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSink.java
@@ -35,7 +35,6 @@ import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@@ -43,9 +42,9 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
- * CDC socket sink.
+ * Pipeline CDC socket sink.
*/
-public final class CDCSocketSink implements PipelineSink {
+public final class PipelineCDCSocketSink implements PipelineSink {
private static final long DEFAULT_TIMEOUT_MILLISECONDS = 100L;
@@ -53,19 +52,20 @@ public final class CDCSocketSink implements PipelineSink {
private final Condition condition = lock.newCondition();
- private final ShardingSphereDatabase database;
-
@Getter
private final Channel channel;
- private final Map<String, String> tableNameSchemaMap = new HashMap<>();
+ private final ShardingSphereDatabase database;
+
+ private final Map<String, String> tableSchemaNameMap;
- public CDCSocketSink(final Channel channel, final ShardingSphereDatabase database, final Collection<String> schemaTableNames) {
+ public PipelineCDCSocketSink(final Channel channel, final ShardingSphereDatabase database, final Collection<String> schemaTableNames) {
this.channel = channel;
this.database = database;
+ tableSchemaNameMap = new HashMap<>(schemaTableNames.size(), 1F);
schemaTableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
String[] split = each.split("\\.");
- tableNameSchemaMap.put(split[1], split[0]);
+ tableSchemaNameMap.put(split[1], split[0]);
});
}
@@ -80,14 +80,7 @@ public final class CDCSocketSink implements PipelineSink {
if (!channel.isActive()) {
return new PipelineJobProgressUpdatedParameter(0);
}
- List<DataRecordResult.Record> resultRecords = new LinkedList<>();
- for (Record each : records) {
- if (!(each instanceof DataRecord)) {
- continue;
- }
- DataRecord dataRecord = (DataRecord) each;
- resultRecords.add(DataRecordResultConvertUtils.convertDataRecordToRecord(database.getName(), tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord));
- }
+ Collection<DataRecordResult.Record> resultRecords = getResultRecords(records);
DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecord(resultRecords).setAckId(ackId).build();
channel.writeAndFlush(CDCResponseUtils.succeed("", ResponseCase.DATA_RECORD_RESULT, dataRecordResult));
return new PipelineJobProgressUpdatedParameter(resultRecords.size());
@@ -109,6 +102,17 @@ public final class CDCSocketSink implements PipelineSink {
}
}
+ private Collection<DataRecordResult.Record> getResultRecords(final Collection<Record> records) {
+ Collection<DataRecordResult.Record> result = new LinkedList<>();
+ for (Record each : records) {
+ if (each instanceof DataRecord) {
+ DataRecord dataRecord = (DataRecord) each;
+ result.add(DataRecordResultConvertUtils.convertDataRecordToRecord(database.getName(), tableSchemaNameMap.get(dataRecord.getTableName()), dataRecord));
+ }
+ }
+ return result;
+ }
+
@Override
public void close() throws IOException {
channel.writeAndFlush(CDCResponseUtils.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), "The socket channel is closed."));
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 68e17dda1df..12c628b35c9 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -31,7 +31,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporterManager;
-import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.PipelineCDCSocketSink;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
import org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
@@ -136,7 +136,7 @@ public final class CDCBackendHandler {
ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
PipelineJobRegistry.stop(jobId);
ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
- jobAPI.start(jobId, new CDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
+ jobAPI.start(jobId, new PipelineCDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
connectionContext.setJobId(jobId);
}
@@ -155,7 +155,7 @@ public final class CDCBackendHandler {
if (null == job) {
return;
}
- if (((CDCSocketSink) job.getSink()).getChannel().id().equals(channelId)) {
+ if (((PipelineCDCSocketSink) job.getSink()).getChannel().id().equals(channelId)) {
log.info("close CDC job, channel id: {}", channelId);
PipelineJobRegistry.stop(jobId);
jobAPI.disable(jobId);