sandynz commented on code in PR #24798:
URL: https://github.com/apache/shardingsphere/pull/24798#discussion_r1147189499


##########
proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java:
##########
@@ -131,17 +130,17 @@ public String getDatabaseNameByJobId(final String jobId) {
     /**
      * Start streaming.
      *
-     * @param requestId request id
      * @param jobId job id
      * @param channel channel
      * @param connectionContext connection context
-     * @return CDC response
      */
-    // TODO not return CDCResponse
-    public CDCResponse startStreaming(final String requestId, final String 
jobId, final CDCConnectionContext connectionContext, final Channel channel) {
+    public void startStreaming(final String jobId, final CDCConnectionContext 
connectionContext, final Channel channel) {
         CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(jobId);
         if (null == cdcJobConfig) {
-            return CDCResponseGenerator.failed(jobId, 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, String.format("the %s job config 
doesn't exist", jobId));
+            throw new PipelineJobNotFoundException(jobId);
+        }
+        if (PipelineJobCenter.isJobExisting(jobId)) {
+            throw new PipelineJobHasAlreadyStartedException(jobId);
         }

Review Comment:
   If existing job is stopping, then we could not just throw exception



##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java:
##########
@@ -165,59 +168,49 @@ private String getHostAddress(final ChannelHandlerContext 
context) {
     
     private void processStreamDataRequest(final ChannelHandlerContext ctx, 
final CDCRequest request, final CDCConnectionContext connectionContext) {
         if (!request.hasStreamDataRequestBody()) {
-            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss stream data request body"));
-            return;
+            throw CDCExceptionWrapper.wrapper(request.getRequestId(), new 
PipelineInvalidParameterException("stream data request body is empty"));
         }
         StreamDataRequestBody requestBody = request.getStreamDataRequestBody();
         if (requestBody.getDatabase().isEmpty()) {
-            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "database not allowed to be 
empty"));
-            return;
+            throw CDCExceptionWrapper.wrapper(request.getRequestId(), new 
PipelineInvalidParameterException("database is empty"));
         }
         if (requestBody.getSourceSchemaTableList().isEmpty()) {
-            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal stream data request 
parameter"));
-            return;
+            throw CDCExceptionWrapper.wrapper(request.getRequestId(), new 
PipelineInvalidParameterException("stream data request is empty"));
         }

Review Comment:
   Error message starting letter could be uppercase, and also other ones



##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java:
##########
@@ -165,59 +168,49 @@ private String getHostAddress(final ChannelHandlerContext 
context) {
     
     private void processStreamDataRequest(final ChannelHandlerContext ctx, 
final CDCRequest request, final CDCConnectionContext connectionContext) {
         if (!request.hasStreamDataRequestBody()) {
-            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss stream data request body"));
-            return;
+            throw CDCExceptionWrapper.wrapper(request.getRequestId(), new 
PipelineInvalidParameterException("stream data request body is empty"));
         }
         StreamDataRequestBody requestBody = request.getStreamDataRequestBody();
         if (requestBody.getDatabase().isEmpty()) {
-            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "database not allowed to be 
empty"));
-            return;
+            throw CDCExceptionWrapper.wrapper(request.getRequestId(), new 
PipelineInvalidParameterException("database is empty"));
         }
         if (requestBody.getSourceSchemaTableList().isEmpty()) {
-            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal stream data request 
parameter"));
-            return;
+            throw CDCExceptionWrapper.wrapper(request.getRequestId(), new 
PipelineInvalidParameterException("stream data request is empty"));
         }

Review Comment:
   Error message doesn't match the if condition block



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.exception;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.infra.util.exception.external.sql.ShardingSphereSQLException;
+
+/**
+ * CDC exception wrapper.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CDCExceptionWrapper extends RuntimeException {
+    
+    private final String requestId;
+    
+    private final ShardingSphereSQLException exception;
+    
+    /**
+     * Wrapper the exception.
+     *
+     * @param requestId request id
+     * @param exception ShardingSphereCDCException
+     * @return Wrapped ShardingSphereCDCException
+     */
+    public static CDCExceptionWrapper wrapper(final String requestId, final 
ShardingSphereSQLException exception) {
+        return new CDCExceptionWrapper(requestId, exception);
+    }

Review Comment:
   `wrapper` method is not required, we could use constructor



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to