This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 9d26e07ff53 Improve cdc stop job when channel inactive (#25474)
9d26e07ff53 is described below
commit 9d26e07ff53a79b05a5da87a3c4f97aee2a543d2
Author: Xinze Guo <[email protected]>
AuthorDate: Sat May 6 09:52:21 2023 +0800
Improve cdc stop job when channel inactive (#25474)
* Improve cdc stop job when channel inactive
* Fix getJobItemInfos lose error message
---
.../cdc/client/handler/CDCRequestHandler.java | 1 +
.../pipeline/cdc/handler/CDCBackendHandler.java | 29 +++++++++++++++++++---
.../AbstractInventoryIncrementalJobAPIImpl.java | 4 +--
.../frontend/netty/CDCChannelInboundHandler.java | 4 +--
4 files changed, 30 insertions(+), 8 deletions(-)
diff --git
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
index 1a2cce307bc..7ae88320240 100644
---
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
+++
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
@@ -94,6 +94,7 @@ public final class CDCRequestHandler extends
ChannelInboundHandlerAdapter {
@Override
public void channelInactive(final ChannelHandlerContext ctx) {
+ log.info("Request handler channel inactive");
ctx.fireChannelInactive();
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 99766be2e85..04d86bea04f 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -19,13 +19,16 @@ package org.apache.shardingsphere.data.pipeline.cdc.handler;
import com.google.common.base.Strings;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
+import
org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
import
org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
import
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
@@ -50,6 +53,7 @@ import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
import org.apache.shardingsphere.sharding.rule.ShardingRule;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
@@ -57,6 +61,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -143,15 +148,31 @@ public final class CDCBackendHandler {
/**
* Stop streaming.
*
- * @param jobId job id
+ * @param jobId job id
+ * @param channelId channel id
*/
- public void stopStreaming(final String jobId) {
+ public void stopStreaming(final String jobId, final ChannelId channelId) {
if (Strings.isNullOrEmpty(jobId)) {
log.warn("job id is null or empty, ignored");
return;
}
- PipelineJobCenter.stop(jobId);
- jobAPI.updateJobConfigurationDisabled(jobId, true);
+ List<Integer> shardingItems = new
ArrayList<>(PipelineJobCenter.getShardingItems(jobId));
+ if (0 == shardingItems.size()) {
+ return;
+ }
+ Optional<PipelineJobItemContext> jobItemContext =
PipelineJobCenter.getJobItemContext(jobId, shardingItems.get(0));
+ if (!jobItemContext.isPresent()) {
+ return;
+ }
+ CDCJobItemContext cdcJobItemContext = (CDCJobItemContext)
jobItemContext.get();
+ if (cdcJobItemContext.getImporterConnector() instanceof
SocketSinkImporterConnector) {
+ Channel channel = (Channel)
cdcJobItemContext.getImporterConnector().getConnector();
+ if (channelId.equals(channel.id())) {
+ log.info("close CDC job, channel id: {}", channelId);
+ PipelineJobCenter.stop(jobId);
+ jobAPI.updateJobConfigurationDisabled(jobId, true);
+ }
+ }
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index e6970343b0f..cb65c7538b0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -126,8 +126,9 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
int shardingItem = entry.getKey();
TableBasedPipelineJobInfo jobInfo = getJobInfo(jobId);
InventoryIncrementalJobItemProgress jobItemProgress =
entry.getValue();
+ String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
if (null == jobItemProgress) {
- result.add(new InventoryIncrementalJobItemInfo(shardingItem,
jobInfo.getTable(), null, startTimeMillis, 0, ""));
+ result.add(new InventoryIncrementalJobItemInfo(shardingItem,
jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
continue;
}
int inventoryFinishedPercentage = 0;
@@ -136,7 +137,6 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
} else if (0 != jobItemProgress.getProcessedRecordsCount() && 0 !=
jobItemProgress.getInventoryRecordsCount()) {
inventoryFinishedPercentage = (int) Math.min(100,
jobItemProgress.getProcessedRecordsCount() * 100 /
jobItemProgress.getInventoryRecordsCount());
}
- String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
result.add(new InventoryIncrementalJobItemInfo(shardingItem,
jobInfo.getTable(), jobItemProgress, startTimeMillis,
inventoryFinishedPercentage, errorMessage));
}
return result;
diff --git
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 54310daea95..00d05403f55 100644
---
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -84,7 +84,7 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
public void channelInactive(final ChannelHandlerContext ctx) {
CDCConnectionContext connectionContext =
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
if (null != connectionContext.getJobId()) {
- backendHandler.stopStreaming(connectionContext.getJobId());
+ backendHandler.stopStreaming(connectionContext.getJobId(),
ctx.channel().id());
}
ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(null);
}
@@ -216,7 +216,7 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
StopStreamingRequestBody requestBody =
request.getStopStreamingRequestBody();
String database =
backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
checkPrivileges(request.getRequestId(),
connectionContext.getCurrentUser().getGrantee(), database);
- backendHandler.stopStreaming(connectionContext.getJobId());
+ backendHandler.stopStreaming(connectionContext.getJobId(),
ctx.channel().id());
connectionContext.setJobId(null);
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
}