This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 56f8ee0f2e1 Improve CDCE2EIT and CreateTableSQLGeneratorIT for
openGauss nightly E2E (#37822)
56f8ee0f2e1 is described below
commit 56f8ee0f2e11ee8bc2c22eedabaecb280a2ec898
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Jan 23 18:54:49 2026 +0800
Improve CDCE2EIT and CreateTableSQLGeneratorIT for openGauss nightly E2E
(#37822)
* Remove schema name in openGauss create-table-sql-generator.xml
* Add log, used to verify `date` type return `datetime` type from
pg_get_tabledef
* Add log for CDC; Increase streamData timeout on client
---
.../pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java | 3 +++
.../org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java | 2 ++
.../data/pipeline/cdc/client/parameter/StartStreamingParameter.java | 2 ++
.../shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java | 2 ++
.../shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java | 4 ++++
.../test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java | 2 +-
.../create-table-generator/opengauss/create-table-sql-generator.xml | 4 ++--
7 files changed, 16 insertions(+), 3 deletions(-)
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index c06d11f5d95..bb6a72ec797 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
+import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.CreateTableSQLGenerateException;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
@@ -35,6 +36,7 @@ import java.util.stream.Collectors;
/**
* Pipeline SQL builder of openGauss.
*/
+@Slf4j
public final class OpenGaussPipelineSQLBuilder implements
DialectPipelineSQLBuilder {
@Override
@@ -78,6 +80,7 @@ public final class OpenGaussPipelineSQLBuilder implements
DialectPipelineSQLBuil
if (resultSet.next()) {
// TODO use ";" to split is not always correct if return
value's comments contains ";"
String tableDefinition =
resultSet.getString("pg_get_tabledef");
+ log.info("Generate create table definition for {}.{}: {}",
schemaName, tableName, tableDefinition);
return Arrays.asList(tableDefinition.split(";"));
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index b18fce5aa22..9aa819e48ac 100644
---
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -56,6 +56,7 @@ import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
@@ -145,6 +146,7 @@ public final class CDCClient implements AutoCloseable {
ResponseFuture responseFuture = new ResponseFuture(requestId,
Type.STREAM_DATA);
connectionContext.getResponseFutureMap().put(requestId,
responseFuture);
channel.writeAndFlush(request);
+ log.info("Sending start streaming request, param: {}, timeout: {} s",
parameter, TimeUnit.MILLISECONDS.toSeconds(config.getTimeoutMillis()));
String result =
responseFuture.waitResponseResult(config.getTimeoutMillis(),
connectionContext).toString();
log.info("Start streaming success, streaming id: {}", result);
return result;
diff --git
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartStreamingParameter.java
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartStreamingParameter.java
index 1a7abd78a3f..0ff2d4a2cd3 100644
---
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartStreamingParameter.java
+++
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartStreamingParameter.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import lombok.ToString;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
import java.util.Set;
@@ -28,6 +29,7 @@ import java.util.Set;
*/
@RequiredArgsConstructor
@Getter
+@ToString
public final class StartStreamingParameter {
private final String database;
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 9f7ca7a8788..49bfd12beb3 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -121,7 +121,9 @@ public final class CDCBackendHandler {
StreamDataParameter parameter = new
StreamDataParameter(requestBody.getDatabase(), new
ArrayList<>(schemaTableNames), requestBody.getFull(), tableAndDataNodesMap,
isDecodeWithTransaction);
String jobId = jobAPI.create(parameter, CDCSinkType.SOCKET, new
Properties());
connectionContext.setJobId(jobId);
+ log.info("Stream data, jobId={}, database={}, schemaTableNames={},
full={}", jobId, requestBody.getDatabase(), schemaTableNames,
requestBody.getFull());
startStreaming(jobId, connectionContext, channel);
+ log.info("Stream data, started, jobId={}", jobId);
return CDCResponseUtils.succeed(requestId,
ResponseCase.STREAM_DATA_RESULT,
StreamDataResult.newBuilder().setStreamingId(jobId).build());
}
diff --git
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index f6f933f8eee..206e1a587c4 100644
---
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -73,6 +73,7 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
@Override
public void channelActive(final ChannelHandlerContext ctx) {
+ log.info("channel active: {}", ctx.channel().remoteAddress());
CDCResponse response =
CDCResponse.newBuilder().setServerGreetingResult(ServerGreetingResult.newBuilder().setServerVersion(ShardingSphereVersion.VERSION).setProtocolVersion("1").build())
.setStatus(Status.SUCCEED).build();
ctx.writeAndFlush(response);
@@ -80,6 +81,7 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
@Override
public void channelInactive(final ChannelHandlerContext ctx) {
+ log.info("channel inactive: {}", ctx.channel().remoteAddress());
CDCConnectionContext connectionContext =
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
if (null != connectionContext && null != connectionContext.getJobId())
{
backendHandler.stopStreaming(connectionContext.getJobId(),
ctx.channel().id());
@@ -109,6 +111,7 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
public void channelRead(final ChannelHandlerContext ctx, final Object msg)
{
CDCConnectionContext connectionContext =
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
CDCRequest request = (CDCRequest) msg;
+ log.info("channel read: {}, request type: {}, request id: {}",
ctx.channel().remoteAddress(), request.getType(), request.getRequestId());
if (null == connectionContext || request.hasLoginRequestBody()) {
processLogin(ctx, request);
return;
@@ -146,6 +149,7 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
() -> new CDCExceptionWrapper(request.getRequestId(), new
CDCLoginFailedException()));
ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(new
CDCConnectionContext(user));
ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
+ log.info("Process login success, request id: {}",
request.getRequestId());
}
private void checkPrivileges(final String requestId, final Grantee
grantee, final String currentDatabase) {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index e8840ad589a..ee280ce29d9 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -174,7 +174,7 @@ class CDCE2EIT {
private CDCClient buildCDCClientAndStart(final PipelineDataSource
dataSource, final PipelineContainerComposer containerComposer) {
DataSourceRecordConsumer recordConsumer = new
DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
- CDCClient result = new CDCClient(new
CDCClientConfiguration("localhost",
containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
+ CDCClient result = new CDCClient(new
CDCClientConfiguration("localhost",
containerComposer.getContainerComposer().getProxyCDCPort(), 10000));
result.connect(recordConsumer, new
RetryStreamingExceptionHandler(result, 5, 5000), (ctx, serverErrorResult) ->
log.error("Server error: {}", serverErrorResult.getErrorMessage()));
result.login(new CDCLoginParameter(ProxyContainerConstants.USER,
ProxyContainerConstants.PASSWORD));
// TODO add full=false test case later
diff --git
a/test/e2e/operation/pipeline/src/test/resources/env/scenario/create-table-generator/opengauss/create-table-sql-generator.xml
b/test/e2e/operation/pipeline/src/test/resources/env/scenario/create-table-generator/opengauss/create-table-sql-generator.xml
index 4259a9fea1b..44e5826baa1 100644
---
a/test/e2e/operation/pipeline/src/test/resources/env/scenario/create-table-generator/opengauss/create-table-sql-generator.xml
+++
b/test/e2e/operation/pipeline/src/test/resources/env/scenario/create-table-generator/opengauss/create-table-sql-generator.xml
@@ -35,7 +35,7 @@
WITH (orientation=row, compression=no)
</sql>
<sql>
- ALTER TABLE public.t_order ADD CONSTRAINT t_order_pkey PRIMARY
KEY (order_id)
+ ALTER TABLE t_order ADD CONSTRAINT t_order_pkey PRIMARY KEY
(order_id)
</sql>
</output>
</create-table-generator-assertion>
@@ -61,7 +61,7 @@
WITH (orientation=row, compression=no)
</sql>
<sql>
- ALTER TABLE public.t_order_item ADD CONSTRAINT
t_order_item_pkey PRIMARY KEY (item_id)
+ ALTER TABLE t_order_item ADD CONSTRAINT t_order_item_pkey
PRIMARY KEY (item_id)
</sql>
</output>
</create-table-generator-assertion>