This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 79ca3bc112e Notify the CDC Client if streaming stopped due to
exception (#28998)
79ca3bc112e is described below
commit 79ca3bc112e988559513afaaa2f15478266bb8cc
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Nov 10 17:16:32 2023 +0800
Notify the CDC Client if streaming stopped due to exception (#28998)
* Add tinyint decode
* Notify the client if streaming stopped due to exception
---
.../opengauss/ingest/wal/decode/MppdbDecodingPlugin.java | 2 +-
.../ingest/wal/decode/MppdbDecodingPluginTest.java | 14 ++++++++++++++
.../pipeline/cdc/core/importer/sink/CDCSocketSink.java | 2 ++
.../shardingsphere/data/pipeline/cdc/core/job/CDCJob.java | 8 +++++++-
4 files changed, 24 insertions(+), 2 deletions(-)
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
index be187823a36..8858632858a 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
@@ -163,8 +163,8 @@ public final class MppdbDecodingPlugin implements
DecodingPlugin {
return decodeString(data.substring(1));
}
switch (columnType) {
+ case "tinyint":
case "smallint":
- return Short.parseShort(data);
case "integer":
return Integer.parseInt(data);
case "bigint":
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
index 5b446bcfc7e..23d4bf31512 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
@@ -304,4 +304,18 @@ class MppdbDecodingPluginTest {
Object byteaObj = actual.getAfterRow().get(0);
assertThat(byteaObj.toString(), is("'fff' | 'faa'"));
}
+
+ @Test
+ void assertDecodeWitTinyint() {
+ MppTableData tableData = new MppTableData();
+ tableData.setTableName("public.test");
+ tableData.setOpType("INSERT");
+ tableData.setColumnsName(new String[]{"data"});
+ tableData.setColumnsType(new String[]{"tinyint"});
+ tableData.setColumnsVal(new String[]{"255"});
+ ByteBuffer data = ByteBuffer.wrap(toJSON(tableData).getBytes());
+ WriteRowEvent actual = (WriteRowEvent) new
MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+ Object byteaObj = actual.getAfterRow().get(0);
+ assertThat(byteaObj, is(255));
+ }
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
index d9f5ad83012..d45c14b4b8c 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink;
import io.netty.channel.Channel;
+import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
@@ -55,6 +56,7 @@ public final class CDCSocketSink implements PipelineSink {
private final ShardingSphereDatabase database;
+ @Getter
private final Channel channel;
private final Map<String, String> tableNameSchemaMap = new HashMap<>();
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 9858e2932e3..50e572c5a32 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -25,8 +25,10 @@ import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguratio
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
+import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
import
org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
@@ -184,7 +186,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
private final String identifier;
- private final PipelineJobItemContext jobItemContext;
+ private final CDCJobItemContext jobItemContext;
@Override
public void onSuccess() {
@@ -200,6 +202,10 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
log.error("onFailure, {} task execute failed.", identifier,
throwable);
String jobId = jobItemContext.getJobId();
jobAPI.updateJobItemErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
+ if (jobItemContext.getSink() instanceof CDCSocketSink) {
+ CDCSocketSink cdcSink = (CDCSocketSink)
jobItemContext.getSink();
+ cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("",
"", throwable.getMessage()));
+ }
PipelineJobCenter.stop(jobId);
jobAPI.updateJobConfigurationDisabled(jobId, true);
}