This is an automated email from the ASF dual-hosted git repository. zhonghongsheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 778161eed71 Refactor for code format of CDC (#26737) 778161eed71 is described below commit 778161eed71dab4e16c63d88a02fff9e226335fe Author: Xinze Guo <101622833+aze...@users.noreply.github.com> AuthorDate: Mon Jul 3 12:34:55 2023 +0800 Refactor for code format of CDC (#26737) * Refactor CDCJobItemContext package name * Refactor CDCResponseGenerator * Rename prepareIncremental to initIncrementalPosition * Improve CDC swapper package name --- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 6 +- .../cdc/context/{job => }/CDCJobItemContext.java | 3 +- .../cdc/core/importer/sink/CDCSocketSink.java | 5 +- .../data/pipeline/cdc/core/job/CDCJob.java | 4 +- .../pipeline/cdc/core/prepare/CDCJobPreparer.java | 6 +- .../cdc/generator/CDCResponseGenerator.java | 53 ------------- .../pipeline/cdc/generator/CDCResponseUtils.java | 88 ++++++++++++++++++++++ .../pipeline/cdc/handler/CDCBackendHandler.java | 5 +- .../{job => config}/YamlCDCJobConfiguration.java | 2 +- .../YamlCDCJobConfigurationSwapper.java | 5 +- .../job/YamlCDCJobConfigurationSwapperTest.java | 4 +- .../frontend/netty/CDCChannelInboundHandler.java | 14 ++-- 12 files changed, 117 insertions(+), 78 deletions(-) diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index e979b33b6a3..bb9c398c4fd 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -38,9 +38,9 @@ import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType; import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext; import 
org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob; import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId; -import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration; -import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration.YamlSinkConfiguration; -import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration; +import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration; +import org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java similarity index 97% rename from kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java rename to kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java index eb4f902fd14..4c08d89209b 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.shardingsphere.data.pipeline.cdc.context.job; +package org.apache.shardingsphere.data.pipeline.cdc.context; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -26,7 +26,6 @@ import org.apache.commons.lang3.concurrent.LazyInitializer; import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration; -import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext; import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java index ef48f9be588..0db35459a89 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java @@ -22,7 +22,8 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record; -import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator; +import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase; import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult; import org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtils; import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter; @@ -92,7 +93,7 @@ public final class CDCSocketSink implements PipelineSink { resultRecords.add(DataRecordResultConvertUtils.convertDataRecordToRecord(database.getName(), tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord)); } DataRecordResult dataRecordResult = DataRecordResult.newBuilder().addAllRecord(resultRecords).setAckId(ackId).build(); - channel.writeAndFlush(CDCResponseGenerator.succeedBuilder("").setDataRecordResult(dataRecordResult).build()); + channel.writeAndFlush(CDCResponseUtils.succeed("", ResponseCase.DATA_RECORD_RESULT, dataRecordResult)); return new PipelineJobProgressUpdatedParameter(resultRecords.size()); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java index b67eda9c1da..1d7ee243f9a 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java @@ -24,10 +24,10 @@ import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI; import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext; -import org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext; +import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext; import 
org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer; import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner; -import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager; diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java index edd5e8a4d9e..28d9617a341 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java @@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI; import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext; -import org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext; +import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext; import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgressPair; import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter; import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCIncrementalTask; @@ -92,14 +92,14 @@ public final class CDCJobPreparer 
{ PipelineJobCenter.stop(jobItemContext.getJobId()); return; } - prepareIncremental(jobItemContext); + initIncrementalPosition(jobItemContext); if (jobItemContext.getJobConfig().isFull()) { initInventoryTasks(jobItemContext, inventoryImporterUsed, inventoryChannelProgressPairs); } initIncrementalTask(jobItemContext, incrementalImporterUsed, incrementalChannelProgressPairs); } - private void prepareIncremental(final CDCJobItemContext jobItemContext) { + private void initIncrementalPosition(final CDCJobItemContext jobItemContext) { CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental(); try { diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java deleted file mode 100644 index 9983609f30c..00000000000 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.cdc.generator; - -import lombok.AccessLevel; -import lombok.NoArgsConstructor; -import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse; -import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Builder; -import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status; - -/** - * CDC response message generator. - */ -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class CDCResponseGenerator { - - /** - * Succeed response builder. - * - * @param requestId request id - * @return succeed response builder - */ - public static Builder succeedBuilder(final String requestId) { - return CDCResponse.newBuilder().setStatus(Status.SUCCEED).setRequestId(requestId); - } - - /** - * Failed response. - * - * @param requestId request id - * @param errorCode error code - * @param errorMessage error message - * @return failed response - */ - public static CDCResponse failed(final String requestId, final String errorCode, final String errorMessage) { - return CDCResponse.newBuilder().setStatus(Status.FAILED).setRequestId(requestId).setErrorCode(errorCode).setErrorMessage(errorMessage).build(); - } -} diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java new file mode 100644 index 00000000000..1c4f0f73b8c --- /dev/null +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.cdc.generator; + +import com.google.protobuf.Message; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Builder; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult; +import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; + +/** + * CDC response utils. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class CDCResponseUtils { + + /** + * Succeed response. + * + * @param requestId request id + * @return CDC response + */ + public static CDCResponse succeed(final String requestId) { + return succeed(requestId, ResponseCase.RESPONSE_NOT_SET, null); + } + + /** + * Succeed response. 
+ * + * @param requestId request id + * @param responseCase response case + * @param response response + * @return CDC response + * @throws PipelineInvalidParameterException pipeline invalid parameter exception + */ + public static CDCResponse succeed(final String requestId, final CDCResponse.ResponseCase responseCase, final Message response) { + Builder result = CDCResponse.newBuilder().setStatus(Status.SUCCEED).setRequestId(requestId); + switch (responseCase) { + case SERVER_GREETING_RESULT: + result.setServerGreetingResult((ServerGreetingResult) response); + break; + case DATA_RECORD_RESULT: + result.setDataRecordResult((DataRecordResult) response); + break; + case STREAM_DATA_RESULT: + result.setStreamDataResult((StreamDataResult) response); + break; + case RESPONSE_NOT_SET: + break; + default: + throw new PipelineInvalidParameterException(responseCase.name()); + } + return result.build(); + } + + /** + * Failed response. + * + * @param requestId request id + * @param errorCode error code + * @param errorMessage error message + * @return failed response + */ + public static CDCResponse failed(final String requestId, final String errorCode, final String errorMessage) { + return CDCResponse.newBuilder().setStatus(Status.FAILED).setRequestId(requestId).setErrorCode(errorCode).setErrorMessage(errorMessage).build(); + } +} diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java index a083e753ee4..e733e0e5488 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java @@ -34,11 +34,12 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob; import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper; import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException; import org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException; -import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator; +import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable; import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase; import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult; import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataNodeUtils; import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils; @@ -111,7 +112,7 @@ public final class CDCBackendHandler { String jobId = jobAPI.createJob(parameter, CDCSinkType.SOCKET, new Properties()); connectionContext.setJobId(jobId); startStreaming(jobId, connectionContext, channel); - return CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build(); + return CDCResponseUtils.succeed(requestId, ResponseCase.STREAM_DATA_RESULT, StreamDataResult.newBuilder().setStreamingId(jobId).build()); } /** diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/config/YamlCDCJobConfiguration.java 
similarity index 96% rename from kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java rename to kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/config/YamlCDCJobConfiguration.java index 246cebb9ab5..9c2580970af 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/config/YamlCDCJobConfiguration.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.cdc.yaml.job; +package org.apache.shardingsphere.data.pipeline.cdc.yaml.config; import lombok.Getter; import lombok.Setter; diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java similarity index 95% rename from kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java rename to kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java index 4348c97d7c5..f4cb413f470 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java @@ -15,14 +15,15 @@ * limitations under the License. 
*/ -package org.apache.shardingsphere.data.pipeline.cdc.yaml.job; +package org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration.SinkConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType; -import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration.YamlSinkConfiguration; +import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration; +import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper; diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java index eabd72bc50b..a4888939630 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java @@ -19,7 +19,9 @@ package org.apache.shardingsphere.data.pipeline.cdc.yaml.job; import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration; import 
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType; -import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration.YamlSinkConfiguration; +import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration; +import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration; +import org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper; import org.junit.jupiter.api.Test; import java.util.Arrays; diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java index 390bfa5736e..41f925ed363 100644 --- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java +++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java @@ -29,7 +29,7 @@ import org.apache.shardingsphere.authority.rule.AuthorityRule; import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext; import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper; import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginException; -import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator; +import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils; import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody; import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest; @@ -90,9 +90,9 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter if (cause instanceof CDCExceptionWrapper) { CDCExceptionWrapper wrapper = (CDCExceptionWrapper) cause; 
ShardingSphereSQLException exception = wrapper.getException(); - channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed(wrapper.getRequestId(), exception.toSQLException().getSQLState(), exception.getMessage())); + channelFuture = ctx.writeAndFlush(CDCResponseUtils.failed(wrapper.getRequestId(), exception.toSQLException().getSQLState(), exception.getMessage())); } else { - channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), String.valueOf(cause.getMessage()))); + channelFuture = ctx.writeAndFlush(CDCResponseUtils.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), String.valueOf(cause.getMessage()))); } CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get(); if (null == connectionContext) { @@ -139,7 +139,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter Optional<ShardingSphereUser> user = authorityRule.findUser(new Grantee(body.getUsername(), getHostAddress(ctx))); if (user.isPresent() && Objects.equals(Hashing.sha256().hashBytes(user.get().getPassword().getBytes()).toString().toUpperCase(), body.getPassword())) { ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(new CDCConnectionContext(user.get())); - ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build()); + ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId())); } else { throw new CDCExceptionWrapper(request.getRequestId(), new CDCLoginException("Illegal username or password")); } @@ -197,7 +197,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()); checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database); backendHandler.startStreaming(requestBody.getStreamingId(), connectionContext, ctx.channel()); - 
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build()); + ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId())); } private void processStopStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) { @@ -206,7 +206,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database); backendHandler.stopStreaming(connectionContext.getJobId(), ctx.channel().id()); connectionContext.setJobId(null); - ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build()); + ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId())); } private void processDropStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) { @@ -214,6 +214,6 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId())); backendHandler.dropStreaming(connectionContext.getJobId()); connectionContext.setJobId(null); - ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build()); + ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId())); } }