This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 778161eed71 Refactor for code format of CDC (#26737)
778161eed71 is described below

commit 778161eed71dab4e16c63d88a02fff9e226335fe
Author: Xinze Guo <101622833+aze...@users.noreply.github.com>
AuthorDate: Mon Jul 3 12:34:55 2023 +0800

    Refactor for code format of CDC (#26737)
    
    * Refactor CDCJobItemContext package name
    
    * Refactor CDCResponseGenerator
    
    * Rename prepareIncremental to initIncrementalPosition
    
    * Improve CDC swapper package name
---
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  6 +-
 .../cdc/context/{job => }/CDCJobItemContext.java   |  3 +-
 .../cdc/core/importer/sink/CDCSocketSink.java      |  5 +-
 .../data/pipeline/cdc/core/job/CDCJob.java         |  4 +-
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |  6 +-
 .../cdc/generator/CDCResponseGenerator.java        | 53 -------------
 .../pipeline/cdc/generator/CDCResponseUtils.java   | 88 ++++++++++++++++++++++
 .../pipeline/cdc/handler/CDCBackendHandler.java    |  5 +-
 .../{job => config}/YamlCDCJobConfiguration.java   |  2 +-
 .../YamlCDCJobConfigurationSwapper.java            |  5 +-
 .../job/YamlCDCJobConfigurationSwapperTest.java    |  4 +-
 .../frontend/netty/CDCChannelInboundHandler.java   | 14 ++--
 12 files changed, 117 insertions(+), 78 deletions(-)

diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index e979b33b6a3..bb9c398c4fd 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -38,9 +38,9 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
-import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration.YamlSinkConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
similarity index 97%
rename from 
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
rename to 
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
index eb4f902fd14..4c08d89209b 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.context.job;
+package org.apache.shardingsphere.data.pipeline.cdc.context;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
@@ -26,7 +26,6 @@ import org.apache.commons.lang3.concurrent.LazyInitializer;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
index ef48f9be588..0db35459a89 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
@@ -22,7 +22,8 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import 
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
 import 
org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
@@ -92,7 +93,7 @@ public final class CDCSocketSink implements PipelineSink {
             
resultRecords.add(DataRecordResultConvertUtils.convertDataRecordToRecord(database.getName(),
 tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord));
         }
         DataRecordResult dataRecordResult = 
DataRecordResult.newBuilder().addAllRecord(resultRecords).setAckId(ackId).build();
-        
channel.writeAndFlush(CDCResponseGenerator.succeedBuilder("").setDataRecordResult(dataRecordResult).build());
+        channel.writeAndFlush(CDCResponseUtils.succeed("", 
ResponseCase.DATA_RECORD_RESULT, dataRecordResult));
         return new PipelineJobProgressUpdatedParameter(resultRecords.size());
     }
     
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index b67eda9c1da..1d7ee243f9a 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -24,10 +24,10 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
-import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index edd5e8a4d9e..28d9617a341 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -26,7 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgressPair;
 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCIncrementalTask;
@@ -92,14 +92,14 @@ public final class CDCJobPreparer {
             PipelineJobCenter.stop(jobItemContext.getJobId());
             return;
         }
-        prepareIncremental(jobItemContext);
+        initIncrementalPosition(jobItemContext);
         if (jobItemContext.getJobConfig().isFull()) {
             initInventoryTasks(jobItemContext, inventoryImporterUsed, 
inventoryChannelProgressPairs);
         }
         initIncrementalTask(jobItemContext, incrementalImporterUsed, 
incrementalChannelProgressPairs);
     }
     
-    private void prepareIncremental(final CDCJobItemContext jobItemContext) {
+    private void initIncrementalPosition(final CDCJobItemContext 
jobItemContext) {
         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         JobItemIncrementalTasksProgress initIncremental = null == 
jobItemContext.getInitProgress() ? null : 
jobItemContext.getInitProgress().getIncremental();
         try {
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java
deleted file mode 100644
index 9983609f30c..00000000000
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.generator;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Builder;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
-
-/**
- * CDC response message generator.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class CDCResponseGenerator {
-    
-    /**
-     * Succeed response builder.
-     *
-     * @param requestId request id
-     * @return succeed response builder
-     */
-    public static Builder succeedBuilder(final String requestId) {
-        return 
CDCResponse.newBuilder().setStatus(Status.SUCCEED).setRequestId(requestId);
-    }
-    
-    /**
-     * Failed response.
-     *
-     * @param requestId request id
-     * @param errorCode error code
-     * @param errorMessage error message
-     * @return failed response
-     */
-    public static CDCResponse failed(final String requestId, final String 
errorCode, final String errorMessage) {
-        return 
CDCResponse.newBuilder().setStatus(Status.FAILED).setRequestId(requestId).setErrorCode(errorCode).setErrorMessage(errorMessage).build();
-    }
-}
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java
new file mode 100644
index 00000000000..1c4f0f73b8c
--- /dev/null
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.generator;
+
+import com.google.protobuf.Message;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Builder;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
+
+/**
+ * CDC response utils.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class CDCResponseUtils {
+    
+    /**
+     * Succeed response.
+     *
+     * @param requestId request id
+     * @return CDC response
+     */
+    public static CDCResponse succeed(final String requestId) {
+        return succeed(requestId, ResponseCase.RESPONSE_NOT_SET, null);
+    }
+    
+    /**
+     * Succeed response.
+     *
+     * @param requestId request id
+     * @param responseCase response case
+     * @param response response
+     * @return succeed response
+     * @throws PipelineInvalidParameterException pipeline invalid parameter 
exception
+     */
+    public static CDCResponse succeed(final String requestId, final 
CDCResponse.ResponseCase responseCase, final Message response) {
+        Builder result = 
CDCResponse.newBuilder().setStatus(Status.SUCCEED).setRequestId(requestId);
+        switch (responseCase) {
+            case SERVER_GREETING_RESULT:
+                result.setServerGreetingResult((ServerGreetingResult) 
response);
+                break;
+            case DATA_RECORD_RESULT:
+                result.setDataRecordResult((DataRecordResult) response);
+                break;
+            case STREAM_DATA_RESULT:
+                result.setStreamDataResult((StreamDataResult) response);
+                break;
+            case RESPONSE_NOT_SET:
+                break;
+            default:
+                throw new 
PipelineInvalidParameterException(responseCase.name());
+        }
+        return result.build();
+    }
+    
+    /**
+     * Failed response.
+     *
+     * @param requestId request id
+     * @param errorCode error code
+     * @param errorMessage error message
+     * @return failed response
+     */
+    public static CDCResponse failed(final String requestId, final String 
errorCode, final String errorMessage) {
+        return 
CDCResponse.newBuilder().setStatus(Status.FAILED).setRequestId(requestId).setErrorCode(errorCode).setErrorMessage(errorMessage).build();
+    }
+}
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index a083e753ee4..e733e0e5488 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -34,11 +34,12 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
 import 
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
 import 
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
 import 
org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
-import 
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
 import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataNodeUtils;
 import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
@@ -111,7 +112,7 @@ public final class CDCBackendHandler {
         String jobId = jobAPI.createJob(parameter, CDCSinkType.SOCKET, new 
Properties());
         connectionContext.setJobId(jobId);
         startStreaming(jobId, connectionContext, channel);
-        return 
CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
+        return CDCResponseUtils.succeed(requestId, 
ResponseCase.STREAM_DATA_RESULT, 
StreamDataResult.newBuilder().setStreamingId(jobId).build());
     }
     
     /**
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/config/YamlCDCJobConfiguration.java
similarity index 96%
rename from 
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
rename to 
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/config/YamlCDCJobConfiguration.java
index 246cebb9ab5..9c2580970af 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/config/YamlCDCJobConfiguration.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.yaml.job;
+package org.apache.shardingsphere.data.pipeline.cdc.yaml.config;
 
 import lombok.Getter;
 import lombok.Setter;
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java
similarity index 95%
rename from 
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
rename to 
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java
index 4348c97d7c5..f4cb413f470 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.yaml.job;
+package org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper;
 
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration.SinkConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
-import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration.YamlSinkConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
index eabd72bc50b..a4888939630 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
@@ -19,7 +19,9 @@ package org.apache.shardingsphere.data.pipeline.cdc.yaml.job;
 
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
-import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration.YamlSinkConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
diff --git 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 390bfa5736e..41f925ed363 100644
--- 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -29,7 +29,7 @@ import org.apache.shardingsphere.authority.rule.AuthorityRule;
 import 
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
 import 
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
 import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginException;
-import 
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
 import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
@@ -90,9 +90,9 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
         if (cause instanceof CDCExceptionWrapper) {
             CDCExceptionWrapper wrapper = (CDCExceptionWrapper) cause;
             ShardingSphereSQLException exception = wrapper.getException();
-            channelFuture = 
ctx.writeAndFlush(CDCResponseGenerator.failed(wrapper.getRequestId(), 
exception.toSQLException().getSQLState(), exception.getMessage()));
+            channelFuture = 
ctx.writeAndFlush(CDCResponseUtils.failed(wrapper.getRequestId(), 
exception.toSQLException().getSQLState(), exception.getMessage()));
         } else {
-            channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("", 
XOpenSQLState.GENERAL_ERROR.getValue(), String.valueOf(cause.getMessage())));
+            channelFuture = ctx.writeAndFlush(CDCResponseUtils.failed("", 
XOpenSQLState.GENERAL_ERROR.getValue(), String.valueOf(cause.getMessage())));
         }
         CDCConnectionContext connectionContext = 
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
         if (null == connectionContext) {
@@ -139,7 +139,7 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
         Optional<ShardingSphereUser> user = authorityRule.findUser(new 
Grantee(body.getUsername(), getHostAddress(ctx)));
         if (user.isPresent() && 
Objects.equals(Hashing.sha256().hashBytes(user.get().getPassword().getBytes()).toString().toUpperCase(),
 body.getPassword())) {
             ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(new 
CDCConnectionContext(user.get()));
-            
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+            
ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
         } else {
             throw new CDCExceptionWrapper(request.getRequestId(), new 
CDCLoginException("Illegal username or password"));
         }
@@ -197,7 +197,7 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
         String database = 
backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
         checkPrivileges(request.getRequestId(), 
connectionContext.getCurrentUser().getGrantee(), database);
         backendHandler.startStreaming(requestBody.getStreamingId(), 
connectionContext, ctx.channel());
-        
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+        ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
     }
     
     private void processStopStreamingRequest(final ChannelHandlerContext ctx, 
final CDCRequest request, final CDCConnectionContext connectionContext) {
@@ -206,7 +206,7 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
         checkPrivileges(request.getRequestId(), 
connectionContext.getCurrentUser().getGrantee(), database);
         backendHandler.stopStreaming(connectionContext.getJobId(), 
ctx.channel().id());
         connectionContext.setJobId(null);
-        
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+        ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
     }
     
     private void processDropStreamingRequest(final ChannelHandlerContext ctx, 
final CDCRequest request, final CDCConnectionContext connectionContext) {
@@ -214,6 +214,6 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
         checkPrivileges(request.getRequestId(), 
connectionContext.getCurrentUser().getGrantee(), 
backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()));
         backendHandler.dropStreaming(connectionContext.getJobId());
         connectionContext.setJobId(null);
-        
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+        ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
     }
 }

Reply via email to