This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 58a1e3c279c Add CDC document of build and usage (#28889)
58a1e3c279c is described below

commit 58a1e3c279c30608c971f37d617e98b0971cbeaf
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Dec 29 14:07:01 2023 +0800

    Add CDC document of build and usage (#28889)
    
    * Add CDC document of build and usage
    
    * Improve doc
    
    * Advise CDC doc
    
    * Add alter streaming rule doc
    
    * Update doc
    
    * Update doc
    
    * Update english doc
    
    * Improve CDC doc
    
    * Improve CDC doc
---
 .../shardingsphere-proxy/cdc/_index.cn.md          |   8 +
 .../shardingsphere-proxy/cdc/_index.en.md          |   8 +
 .../shardingsphere-proxy/cdc/build.cn.md           | 223 ++++++++++++++++
 .../shardingsphere-proxy/cdc/build.en.md           | 222 ++++++++++++++++
 .../shardingsphere-proxy/cdc/usage.cn.md           | 287 +++++++++++++++++++++
 .../shardingsphere-proxy/cdc/usage.en.md           | 287 +++++++++++++++++++++
 6 files changed, 1035 insertions(+)

diff --git a/docs/document/content/user-manual/shardingsphere-proxy/cdc/_index.cn.md b/docs/document/content/user-manual/shardingsphere-proxy/cdc/_index.cn.md
new file mode 100644
index 00000000000..d19f9e57988
--- /dev/null
+++ b/docs/document/content/user-manual/shardingsphere-proxy/cdc/_index.cn.md
@@ -0,0 +1,8 @@
++++
+title = "CDC"
+weight = 9
++++
+
+CDC (Change Data Capture) captures incremental data changes. CDC can monitor data changes in the storage nodes of ShardingSphere-Proxy, capture data operation events, filter and extract useful information, and finally send the changed data to a specified target.
+
+CDC can be used for data synchronization, data backup and recovery, and similar scenarios. It currently supports openGauss, MySQL, and PostgreSQL.
diff --git a/docs/document/content/user-manual/shardingsphere-proxy/cdc/_index.en.md b/docs/document/content/user-manual/shardingsphere-proxy/cdc/_index.en.md
new file mode 100644
index 00000000000..5052863886e
--- /dev/null
+++ b/docs/document/content/user-manual/shardingsphere-proxy/cdc/_index.en.md
@@ -0,0 +1,8 @@
++++
+title = "CDC"
+weight = 9
++++
+
+CDC (Change Data Capture) captures incremental data changes. CDC can monitor data changes in the storage nodes of ShardingSphere-Proxy, capture data operation events, filter and extract useful information, and finally send the changed data to a specified target.
+
+CDC can be used for data synchronization, data backup and recovery, and similar scenarios. It currently supports openGauss, MySQL, and PostgreSQL.
\ No newline at end of file
diff --git a/docs/document/content/user-manual/shardingsphere-proxy/cdc/build.cn.md b/docs/document/content/user-manual/shardingsphere-proxy/cdc/build.cn.md
new file mode 100644
index 00000000000..6ffeb3b4040
--- /dev/null
+++ b/docs/document/content/user-manual/shardingsphere-proxy/cdc/build.cn.md
@@ -0,0 +1,223 @@
++++
+title = "Build"
+weight = 1
++++
+
+## Background Information
+
+ShardingSphere CDC consists of two parts: the CDC Server and the CDC Client. The CDC Server is currently deployed together with ShardingSphere-Proxy.
+
+Users can include the CDC Client in their own projects to implement data consumption logic.
+
+## Constraints
+
+- Developed in pure Java; JDK 1.8 or later is recommended.
+- The CDC Server requires ShardingSphere-Proxy to run in cluster mode; ZooKeeper is currently the supported registry center.
+- CDC only synchronizes data; it does not synchronize table structures, and DDL statement synchronization is not yet supported.
+- In the incremental stage, CDC outputs data at the granularity of per-shard transactions. To enable XA transaction compatibility, both openGauss and ShardingSphere-Proxy need the GLT module.
+
+## CDC Server Deployment Steps
+
+Here, the openGauss database is used as an example to introduce the deployment steps of the CDC Server.
+
+Since the CDC Server is built into ShardingSphere-Proxy, you need to obtain ShardingSphere-Proxy first. For details, see the [proxy startup manual](/cn/user-manual/shardingsphere-proxy/startup/bin/).
+
+### Configure the GLT Module (Optional)
+
+The released binary package does not include the GLT module by default. If you are using an openGauss build with GLT support, you can additionally include the GLT module to guarantee the integrity of XA transactions.
+
+There are currently two ways to include the GLT module; in both cases the corresponding configuration must also be added to server.yaml.
+
+#### 1. Compile and install from source
+
+1.1 Prepare the code environment: download the [ShardingSphere](https://github.com/apache/shardingsphere.git) source code from GitHub in advance, or clone it with Git.
+
+1.2 Remove the `<scope>provided</scope>` tag of the shardingsphere-global-clock-tso-provider-redis dependency in kernel/global-clock/type/tso/core/pom.xml, and the `<scope>provided</scope>` tag of jedis in kernel/global-clock/type/tso/provider/redis/pom.xml.
+
+1.3 Compile ShardingSphere-Proxy. For the compilation steps, see the [ShardingSphere Compilation Manual](https://github.com/apache/shardingsphere/wiki#build-apache-shardingsphere).
+
+#### 2. Include the GLT dependencies directly
+
+They can be downloaded from the Maven repository:
+
+2.1. [shardingsphere-global-clock-tso-provider-redis](https://repo1.maven.org/maven2/org/apache/shardingsphere/shardingsphere-global-clock-tso-provider-redis); download the same version as ShardingSphere-Proxy
+
+2.2. [jedis-4.3.1](https://repo1.maven.org/maven2/redis/clients/jedis/4.3.1/jedis-4.3.1.jar)
+
+### CDC Server User Manual
+
+1. Modify the configuration file `conf/server.yaml` and enable the CDC feature. Currently, `mode` must be `Cluster`, and the corresponding registry center must be started in advance. If the GLT provider uses Redis, Redis must be started in advance.
+
+Configuration example:
+
+```yaml
+mode:
+  type: Cluster
+  repository:
+    type: ZooKeeper
+    props:
+      namespace: cdc_demo
+      server-lists: localhost:2181
+      retryIntervalMilliseconds: 500
+      timeToLiveSeconds: 60
+      maxRetries: 3
+      operationTimeoutMilliseconds: 500
+
+authority:
+  users:
+    - user: root@%
+      password: root
+  privilege:
+    type: ALL_PERMITTED
+
+# When using GLT, distributed transactions must also be enabled. GLT is currently only supported by the openGauss database.
+#transaction:
+#  defaultType: XA
+#  providerType: Atomikos
+#
+#globalClock:
+#  enabled: true
+#  type: TSO
+#  provider: redis
+#  props:
+#    host: 127.0.0.1
+#    port: 6379
+
+props:
+  system-log-level: INFO
+  check-table-metadata-enabled: false
+  proxy-default-port: 3307 # Proxy default port
+  cdc-server-port: 33071 # CDC Server port, required
+  proxy-frontend-database-protocol-type: openGauss # must match the backend database type
+```
+
+2. Include the JDBC driver.
+
+The proxy already includes the PostgreSQL and openGauss JDBC drivers.
+
+If the backend connects to one of the following databases, download the corresponding JDBC driver jar and place it in the `${shardingsphere-proxy}/ext-lib` directory.
+
+| Database | JDBC Driver                                                                                          |
+|----------|------------------------------------------------------------------------------------------------------|
+| MySQL    | [mysql-connector-java-8.0.31.jar](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.31/) |
+
+4. Start ShardingSphere-Proxy:
+
+```
+sh bin/start.sh
+```
+
+5. Check the proxy log `logs/stdout.log` for the following line:
+
+```
+[INFO ] [main] o.a.s.p.frontend.ShardingSphereProxy - ShardingSphere-Proxy Cluster mode started successfully
+```
+
+This confirms that the proxy started successfully.
+
+6. Configure CDC task synchronization as needed.
+
+6.1. Query the configuration.
+
+```sql
+SHOW STREAMING RULE;
+```
+
+The default configuration is as follows:
+
+```
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+| read                                                         | write                                | stream_channel                                        |
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+| {"workerThread":20,"batchSize":1000,"shardingSize":10000000} | {"workerThread":20,"batchSize":1000} | {"type":"MEMORY","props":{"block-queue-size":"2000"}} |
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+```
+
+6.2. Modify the configuration (optional).
+
+Because the streaming rule has default values, it does not need to be created; only the ALTER statement is provided.
+
+A complete DistSQL configuration example:
+
+```sql
+ALTER STREAMING RULE (
+READ(
+  WORKER_THREAD=20,
+  BATCH_SIZE=1000,
+  SHARDING_SIZE=10000000,
+  RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
+),
+WRITE(
+  WORKER_THREAD=20,
+  BATCH_SIZE=1000,
+  RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
+),
+STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
+);
+```
+
+Configuration item description:
+
+```sql
+ALTER STREAMING RULE (
+READ( -- Data reading configuration. If not configured, default values apply to some parameters.
+  WORKER_THREAD=20, -- Affects full and incremental tasks: the size of the thread pool that fetches data from the source. Uses the default value if not configured. Make sure this value is not lower than the number of database shards.
+  BATCH_SIZE=1000, -- Affects full and incremental tasks: the maximum number of records returned by one query. If a transaction contains more records than this value, the incremental stage may exceed it.
+  SHARDING_SIZE=10000000, -- Affects full tasks: the shard size of existing data. Uses the default value if not configured.
+  RATE_LIMITER ( -- Affects full and incremental tasks: the rate limiting algorithm. No rate limiting if not configured.
+  TYPE( -- Algorithm type. Options: QPS
+  NAME='QPS',
+  PROPERTIES( -- Algorithm properties
+  'qps'='500'
+  )))
+),
+WRITE( -- Data writing configuration. If not configured, default values apply to some parameters.
+  WORKER_THREAD=20, -- Affects full and incremental tasks: the size of the thread pool that writes data to the target. Uses the default value if not configured.
+  BATCH_SIZE=1000, -- Affects full and incremental tasks: the maximum number of records in one batch write. Uses the default value if not configured. If a transaction contains more records than this value, the incremental stage may exceed it.
+  RATE_LIMITER ( -- Rate limiting algorithm. No rate limiting if not configured.
+  TYPE( -- Algorithm type. Options: TPS
+  NAME='TPS',
+  PROPERTIES( -- Algorithm properties
+  'tps'='2000'
+  )))
+),
+STREAM_CHANNEL ( -- Data channel connecting producers and consumers, used by the read and write stages. Defaults to the MEMORY type if not configured.
+TYPE( -- Algorithm type. Options: MEMORY
+NAME='MEMORY',
+PROPERTIES( -- Algorithm properties
+'block-queue-size'='2000' -- Property: blocking queue size
+)))
+);
+```
+
+## CDC Client Manual
+
+The CDC Client does not need to be deployed separately; just include the CDC Client dependency via Maven to use it in your project. Users can interact with the server through the CDC Client.
+
+If necessary, users can also implement their own CDC Client to consume data and send ACKs.
+
+```xml
+<dependency>
+    <groupId>org.apache.shardingsphere</groupId>
+    <artifactId>shardingsphere-data-pipeline-cdc-client</artifactId>
+    <version>${version}</version>
+</dependency>
+```
+
+### CDC Client Introduction
+
+`org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient` is the entry class of the CDC Client; users interact with the CDC Server through it. The main methods are as follows.
+
+| Method                                                                                                                       | Return Value | Description                                                                                                                                                                             |
+|------------------------------------------------------------------------------------------------------------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| connect(Consumer<List<Record>> dataConsumer, ExceptionHandler exceptionHandler, ServerErrorResultHandler errorResultHandler) | void         | Connect to the server. When connecting, specify <br/>1. the data consumption logic <br/>2. the exception handling logic during consumption <br/>3. the handling logic for server errors |
+| login(CDCLoginParameter parameter)                                                                                           | void         | CDC login. Parameters: <br/>username: user name <br/>password: password                                                                                                                 |
+| startStreaming(StartStreamingParameter parameter)                                                                            | streamingId  | Start the CDC subscription. StartStreamingParameter parameters: <br/> database: logical database name <br/> schemaTables: subscribed table names <br/> full: whether to subscribe to full data |
+| restartStreaming(String streamingId)                                                                                         | void         | Restart the subscription                                                                                                                                                                |
+| stopStreaming(String streamingId)                                                                                            | void         | Stop the subscription                                                                                                                                                                   |
+| dropStreaming(String streamingId)                                                                                            | void         | Drop the subscription                                                                                                                                                                   |
+| await()                                                                                                                      | void         | Block the CDC thread and wait for the channel to close                                                                                                                                  |
+| close()                                                                                                                      | void         | Close the channel; the process ends                                                                                                                                                     |
diff --git a/docs/document/content/user-manual/shardingsphere-proxy/cdc/build.en.md b/docs/document/content/user-manual/shardingsphere-proxy/cdc/build.en.md
new file mode 100644
index 00000000000..7a6e1916791
--- /dev/null
+++ b/docs/document/content/user-manual/shardingsphere-proxy/cdc/build.en.md
@@ -0,0 +1,222 @@
++++
+title = "Build"
+weight = 1
++++
+
+## Background Information
+
+ShardingSphere CDC consists of two parts: the CDC Server and the CDC Client. The CDC Server is currently deployed together with ShardingSphere-Proxy.
+
+Users can include the CDC Client in their own projects to implement data consumption logic.
+
+## Constraints
+
+- Developed in pure Java; JDK 1.8 or later is recommended.
+- The CDC Server requires ShardingSphere-Proxy to run in cluster mode; ZooKeeper is currently the supported registry center.
+- CDC only synchronizes data; it does not synchronize table structures, and DDL statement synchronization is not yet supported.
+- The CDC incremental task will not split the transaction data of database shards. To enable XA transaction compatibility, both openGauss and ShardingSphere-Proxy need the GLT module.
+
+## CDC Server Deployment Steps
+
+Here, the openGauss database is used as an example to introduce the deployment steps of the CDC Server.
+
+Since the CDC Server is built into ShardingSphere-Proxy, you need to get ShardingSphere-Proxy. For details, please refer to the [proxy startup manual](/cn/user-manual/shardingsphere-proxy/startup/bin/).
+
+### Configure GLT Module (Optional)
+
+The released binary package does not include the GLT module by default. If you are using an openGauss build with GLT support, you can additionally include the GLT module to ensure the integrity of XA transactions.
+
+There are currently two ways to include the GLT module; in both cases the corresponding configuration must also be added to server.yaml.
+
+#### 1. Source code compilation and installation
+
+1.1 Prepare the code environment: download the [ShardingSphere](https://github.com/apache/shardingsphere.git) source code from GitHub in advance, or clone it with Git.
+
+1.2 Remove the `<scope>provided</scope>` tag of the shardingsphere-global-clock-tso-provider-redis dependency in kernel/global-clock/type/tso/core/pom.xml, and the `<scope>provided</scope>` tag of jedis in kernel/global-clock/type/tso/provider/redis/pom.xml.
+
+1.3 Compile ShardingSphere-Proxy. For the compilation steps, see the [ShardingSphere Compilation Manual](https://github.com/apache/shardingsphere/wiki#build-apache-shardingsphere).
+
+#### 2. Include the GLT dependencies directly
+
+They can be downloaded from the Maven repository:
+
+2.1. [shardingsphere-global-clock-tso-provider-redis](https://repo1.maven.org/maven2/org/apache/shardingsphere/shardingsphere-global-clock-tso-provider-redis); download the same version as ShardingSphere-Proxy
+
+2.2. [jedis-4.3.1](https://repo1.maven.org/maven2/redis/clients/jedis/4.3.1/jedis-4.3.1.jar)
+
+### CDC Server User Manual
+
+1. Modify the configuration file `conf/server.yaml` and enable the CDC feature. Currently, `mode` must be `Cluster`, and the corresponding registry center must be started in advance. If the GLT provider uses Redis, Redis must be started in advance.
+
+Configuration example:
+
+```yaml
+mode:
+  type: Cluster
+  repository:
+    type: ZooKeeper
+    props:
+      namespace: cdc_demo
+      server-lists: localhost:2181
+      retryIntervalMilliseconds: 500
+      timeToLiveSeconds: 60
+      maxRetries: 3
+      operationTimeoutMilliseconds: 500
+
+authority:
+  users:
+    - user: root@%
+      password: root
+  privilege:
+    type: ALL_PERMITTED
+
+# When using GLT, distributed transactions must also be enabled. GLT is currently only supported by the openGauss database.
+#transaction:
+#  defaultType: XA
+#  providerType: Atomikos
+#
+#globalClock:
+#  enabled: true
+#  type: TSO
+#  provider: redis
+#  props:
+#    host: 127.0.0.1
+#    port: 6379
+
+props:
+  system-log-level: INFO
+  check-table-metadata-enabled: false
+  proxy-default-port: 3307 # Proxy default port
+  cdc-server-port: 33071 # CDC Server port, required
+  proxy-frontend-database-protocol-type: openGauss # must match the backend database type
+```
+
+2. Include the JDBC driver.
+
+The proxy already includes the PostgreSQL and openGauss JDBC drivers.
+
+If the backend connects to one of the following databases, download the corresponding JDBC driver jar and place it in the `${shardingsphere-proxy}/ext-lib` directory.
+
+| Database | JDBC Driver                                                                                          |
+|----------|------------------------------------------------------------------------------------------------------|
+| MySQL    | [mysql-connector-java-8.0.31.jar](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.31/) |
+
+4. Start ShardingSphere-Proxy:
+
+```
+sh bin/start.sh
+```
+
+5. Check the proxy log `logs/stdout.log` for the following line:
+
+```
+[INFO ] [main] o.a.s.p.frontend.ShardingSphereProxy - ShardingSphere-Proxy Cluster mode started successfully
+```
+
+This confirms that the proxy started successfully.
+
+6. Configure CDC task synchronization as needed.
+
+6.1. Query the configuration.
+
+```sql
+SHOW STREAMING RULE;
+```
+
+The default configuration is as follows:
+
+```
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+| read                                                         | write                                | stream_channel                                        |
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+| {"workerThread":20,"batchSize":1000,"shardingSize":10000000} | {"workerThread":20,"batchSize":1000} | {"type":"MEMORY","props":{"block-queue-size":"2000"}} |
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+```
+
+6.2. Modify the configuration (optional).
+
+Because the streaming rule has default values, it does not need to be created; only the ALTER statement is provided.
+
+A complete DistSQL configuration example:
+
+```sql
+ALTER STREAMING RULE (
+READ(
+  WORKER_THREAD=20,
+  BATCH_SIZE=1000,
+  SHARDING_SIZE=10000000,
+  RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
+),
+WRITE(
+  WORKER_THREAD=20,
+  BATCH_SIZE=1000,
+  RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
+),
+STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
+);
+```
+
+Configuration item description:
+
+```sql
+ALTER STREAMING RULE (
+READ( -- Data reading configuration. If not configured, default values apply to some parameters.
+  WORKER_THREAD=20, -- Affects full and incremental tasks: the size of the thread pool that fetches data from the source. Uses the default value if not configured. Make sure this value is not lower than the number of database shards.
+  BATCH_SIZE=1000, -- Affects full and incremental tasks: the maximum number of records returned by one query. If a transaction contains more records than this value, the incremental stage may exceed it.
+  SHARDING_SIZE=10000000, -- Affects full tasks: the shard size of existing data. Uses the default value if not configured.
+  RATE_LIMITER ( -- Affects full and incremental tasks: the rate limiting algorithm. No rate limiting if not configured.
+  TYPE( -- Algorithm type. Options: QPS
+  NAME='QPS',
+  PROPERTIES( -- Algorithm properties
+  'qps'='500'
+  )))
+),
+WRITE( -- Data writing configuration. If not configured, default values apply to some parameters.
+  WORKER_THREAD=20, -- Affects full and incremental tasks: the size of the thread pool that writes data to the target. Uses the default value if not configured.
+  BATCH_SIZE=1000, -- Affects full and incremental tasks: the maximum number of records in one batch write. Uses the default value if not configured. If a transaction contains more records than this value, the incremental stage may exceed it.
+  RATE_LIMITER ( -- Rate limiting algorithm. No rate limiting if not configured.
+  TYPE( -- Algorithm type. Options: TPS
+  NAME='TPS',
+  PROPERTIES( -- Algorithm properties
+  'tps'='2000'
+  )))
+),
+STREAM_CHANNEL ( -- Data channel connecting producers and consumers, used by the read and write stages. Defaults to the MEMORY type if not configured.
+TYPE( -- Algorithm type. Options: MEMORY
+NAME='MEMORY',
+PROPERTIES( -- Algorithm properties
+'block-queue-size'='2000' -- Property: blocking queue size
+)))
+);
+```
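The `STREAM_CHANNEL` of type MEMORY described above is, conceptually, a bounded blocking queue sitting between the read (producer) and write (consumer) stages. A minimal stdlib sketch of that behavior, not the actual ShardingSphere implementation:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MemoryChannelSketch {
    public static void main(String[] args) throws InterruptedException {
        // 'block-queue-size'='2000': at most 2000 records may be pending in the channel;
        // the producer blocks when it is full, the consumer blocks when it is empty.
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(2000);
        channel.put("record-1"); // read stage pushes captured records
        channel.put("record-2");
        String next = channel.take(); // write stage pulls the next record
        System.out.println(next + ", pending=" + channel.size()); // record-1, pending=1
    }
}
```

Setting `block-queue-size` therefore trades memory for the amount of buffering between the two stages.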
+
+## CDC Client Manual
+
+The CDC Client does not need to be deployed separately; just include the CDC Client dependency via Maven to use it in your project. Users can interact with the server through the CDC Client.
+
+If necessary, users can also implement their own CDC Client to consume data and send ACKs.
+
+```xml
+<dependency>
+    <groupId>org.apache.shardingsphere</groupId>
+    <artifactId>shardingsphere-data-pipeline-cdc-client</artifactId>
+    <version>${version}</version>
+</dependency>
+```
+
+### CDC Client Introduction
+
+`org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient` is the entry class of the CDC Client; users interact with the CDC Server through it. The main methods are as follows.
+
+| Method                                                                                                                       | Return Value | Description                                                                                                                                                                             |
+|------------------------------------------------------------------------------------------------------------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| connect(Consumer<List<Record>> dataConsumer, ExceptionHandler exceptionHandler, ServerErrorResultHandler errorResultHandler) | void         | Connect to the server. When connecting, specify <br/>1. the data consumption logic <br/>2. the exception handling logic during consumption <br/>3. the handling logic for server errors |
+| login(CDCLoginParameter parameter)                                                                                           | void         | CDC login. Parameters: <br/>username: user name <br/>password: password                                                                                                                 |
+| startStreaming(StartStreamingParameter parameter)                                                                            | streamingId  | Start the CDC subscription. StartStreamingParameter parameters: <br/> database: logical database name <br/> schemaTables: subscribed table names <br/> full: whether to subscribe to full data |
+| restartStreaming(String streamingId)                                                                                         | void         | Restart the subscription                                                                                                                                                                |
+| stopStreaming(String streamingId)                                                                                            | void         | Stop the subscription                                                                                                                                                                   |
+| dropStreaming(String streamingId)                                                                                            | void         | Drop the subscription                                                                                                                                                                   |
+| await()                                                                                                                      | void         | Block the CDC thread and wait for the channel to close                                                                                                                                  |
+| close()                                                                                                                      | void         | Close the channel; the process ends                                                                                                                                                     |
diff --git a/docs/document/content/user-manual/shardingsphere-proxy/cdc/usage.cn.md b/docs/document/content/user-manual/shardingsphere-proxy/cdc/usage.cn.md
new file mode 100644
index 00000000000..fdb66786951
--- /dev/null
+++ b/docs/document/content/user-manual/shardingsphere-proxy/cdc/usage.cn.md
@@ -0,0 +1,287 @@
++++
+title = "Usage"
+weight = 2
++++
+
+## CDC Feature Introduction
+
+CDC only synchronizes data; it does not synchronize table structures, and DDL statement synchronization is not yet supported.
+
+### CDC Protocol Introduction
+
+The CDC protocol uses Protobuf; the Protobuf types are mapped from the corresponding Java types.
+
+Taking openGauss as an example, the mapping between CDC protocol data types and database types is as follows.
+
+| openGauss Type                              | Java Data Type     | Protobuf Type in CDC | Remarks                                                                                          |
+|---------------------------------------------|--------------------|----------------------|--------------------------------------------------------------------------------------------------|
+| tinyint, smallint, integer                  | Integer            | int32                |                                                                                                  |
+| bigint                                      | Long               | int64                |                                                                                                  |
+| numeric                                     | BigDecimal         | string               |                                                                                                  |
+| real, float4                                | Float              | float                |                                                                                                  |
+| binary_double, double precision             | Double             | double               |                                                                                                  |
+| boolean                                     | Boolean            | bool                 |                                                                                                  |
+| char, varchar, text, clob                   | String             | string               |                                                                                                  |
+| blob, bytea, raw                            | byte[]             | bytes                |                                                                                                  |
+| date, timestamp, timestamptz, smalldatetime | java.sql.Timestamp | Timestamp            | The protobuf Timestamp type only contains seconds and nanoseconds, so it is timezone-independent |
+| time, timetz                                | java.sql.Time      | int64                | Nanoseconds of the day, timezone-independent                                                     |
+| interval, reltime, abstime                  | String             | string               |                                                                                                  |
+| point, lseg, box, path, polygon, circle     | String             | string               |                                                                                                  |
+| cidr, inet, macaddr                         | String             | string               |                                                                                                  |
+| tsvector                                    | String             | string               |                                                                                                  |
+| tsquery                                     | String             | string               |                                                                                                  |
+| uuid                                        | String             | string               |                                                                                                  |
+| json, jsonb                                 | String             | string               |                                                                                                  |
+| hll                                         | String             | string               |                                                                                                  |
+| int4range, daterange, tsrange, tstzrange    | String             | string               |                                                                                                  |
+| hash16, hash32                              | String             | string               |                                                                                                  |
+| bit, bit varying                            | String             | string               | bit(1) returns a Boolean type                                                                    |
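As the table's remarks note, temporal values arrive as plain numbers: epoch seconds plus nanoseconds for Timestamp, and nanoseconds of the day for time. A consumer can convert them back to Java time types with the standard library; the numeric values below are hypothetical stand-ins for fields of a received record:

```java
import java.time.Instant;
import java.time.LocalTime;

public class CdcTimeDecoding {
    public static void main(String[] args) {
        // date/timestamp arrives as protobuf Timestamp: epoch seconds + nanos, timezone-independent.
        long seconds = 1700000000L; // hypothetical value from a record
        int nanos = 123456789;
        java.sql.Timestamp ts = java.sql.Timestamp.from(Instant.ofEpochSecond(seconds, nanos));

        // time/timetz arrives as int64: nanoseconds of the day, timezone-independent.
        long nanoOfDay = 45_296_000_000_000L;
        LocalTime time = LocalTime.ofNanoOfDay(nanoOfDay); // 12:34:56
        System.out.println(ts.toInstant() + " / " + time);
    }
}
```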
+
+## openGauss User Manual
+
+### Environment Requirements
+
+Supported openGauss versions: 2.x ~ 3.x.
+
+### Permission Requirements
+
+1. Adjust the WAL configuration on the source.
+
+Example `postgresql.conf` configuration:
+```
+wal_level = logical
+max_wal_senders = 10
+max_replication_slots = 10
+wal_sender_timeout = 0
+max_connections = 600
+```
+
+For details, see [Write Ahead Log](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/settings.html) and [Replication](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/sending-server.html).
+
+2. Grant the replication privilege to the source openGauss account.
+
+Example `pg_hba.conf` configuration:
+
+```
+host replication repl_acct 0.0.0.0/0 md5
+# 0.0.0.0/0 means any IP address is allowed; it can be narrowed to the IP address of the CDC Server as needed
+```
+
+For details, see [Configuring Client Access Authentication](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/configuring-client-access-authentication.html) and [Example: Logic Replication Code](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/example-logic-replication-code.html).
+
+3. Grant DDL and DML privileges to the openGauss account.
+
+If a non-superuser account is used, it must have the CREATE and CONNECT privileges on the databases involved.
+
+Example:
+```sql
+GRANT CREATE, CONNECT ON DATABASE source_ds TO cdc_user;
+```
+
+The account also needs access to the subscribed tables and schemas. Take the t_order table under the test schema as an example.
+
+```sql
+\c source_ds
+
+GRANT USAGE ON SCHEMA test TO GROUP cdc_user;
+GRANT SELECT ON TABLE test.t_order TO cdc_user;
+```
+
+openGauss has the concept of OWNER: if the account is the OWNER of the database, schema, or table, the corresponding grant step can be omitted.
+
+openGauss does not allow ordinary accounts to operate in the public schema, so if the migrated tables are in the public schema, additional grants are required.
+
+```sql
+GRANT ALL PRIVILEGES TO cdc_user;
+```
+
+For details, see [openGauss GRANT](https://docs.opengauss.org/zh/docs/2.0.1/docs/Developerguide/GRANT.html).
+
+### Complete Procedure Example
+
+#### Prerequisites
+
+1. Prepare the source databases, tables, and data for CDC.
+
+```sql
+DROP DATABASE IF EXISTS ds_0;
+CREATE DATABASE ds_0;
+
+DROP DATABASE IF EXISTS ds_1;
+CREATE DATABASE ds_1;
+```
+
+#### Configure the CDC Server
+
+1. Create a logical database.
+
+```sql
+CREATE DATABASE sharding_db;
+
+\c sharding_db
+```
+2. Register the storage units.
+
+```sql
+REGISTER STORAGE UNIT ds_0 (
+    URL="jdbc:opengauss://127.0.0.1:5432/ds_0",
+    USER="gaussdb",
+    PASSWORD="Root@123",
+    PROPERTIES("minPoolSize"="1","maxPoolSize"="20","idleTimeout"="60000")
+), ds_1 (
+    URL="jdbc:opengauss://127.0.0.1:5432/ds_1",
+    USER="gaussdb",
+    PASSWORD="Root@123",
+    PROPERTIES("minPoolSize"="1","maxPoolSize"="20","idleTimeout"="60000")
+);
+```
+
+3. Create the sharding rule.
+
+```sql
+CREATE SHARDING TABLE RULE t_order(
+STORAGE_UNITS(ds_0,ds_1),
+SHARDING_COLUMN=order_id,
+TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="2")),
+KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
+);
+```
+
+4. Create the table.
+
+Execute the CREATE TABLE statement through the proxy.
+
+```sql
+CREATE TABLE t_order (order_id INT NOT NULL, user_id INT NOT NULL, status VARCHAR(45) NULL, PRIMARY KEY (order_id));
+```
+
+#### Start the CDC Client
+
+Currently the CDC Client only provides a Java API; users need to implement the data consumption logic themselves.
+
+Below is a simple example of starting the CDC Client.
+
+```java
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
+import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreamingExceptionHandler;
+import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter;
+import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
+
+import java.util.Collections;
+
+@Slf4j
+public final class Bootstrap {
+    
+    @SneakyThrows(InterruptedException.class)
+    public static void main(final String[] args) {
+        String address = "127.0.0.1";
+        // Build the CDCClient with a CDCClientConfiguration, which contains the CDC Server address, port, and timeout
+        try (CDCClient cdcClient = new CDCClient(new CDCClientConfiguration(address, 33071, 10000))) {
+            // Call connect to the CDC Server first, supplying 1. the data consumption logic, 2. the exception handling logic during consumption, 3. the handling logic for server errors
+            cdcClient.connect(records -> log.info("records: {}", records), new RetryStreamingExceptionHandler(cdcClient, 5, 5000),
+                    (ctx, result) -> log.error("Server error: {}", result.getErrorMessage()));
+            cdcClient.login(new CDCLoginParameter("root", "root"));
+            // Start CDC data streaming; the returned streamingId uniquely identifies this CDC task. The CDC Server derives it from the subscribed database name + subscribed tables + whether full synchronization is enabled
+            String streamingId = cdcClient.startStreaming(new StartStreamingParameter("sharding_db", Collections.singleton(SchemaTable.newBuilder().setTable("t_order").build()), true));
+            log.info("Streaming id={}", streamingId);
+            // Prevent the main thread from exiting
+            cdcClient.await();
+        }
+    }
+}
+```
+
+主要有 4 个步骤:
+1. 构造 CDCClient,传入 CDCClientConfiguration
+2. 调用 CDCClient.connect,这一步是和 CDC Server 建立连接
+3. 调用 CDCClient.login,使用 server.yaml 中配置好的用户名和密码登录
+4. 调用 CDCClient.startStreaming,开启订阅,需要保证订阅的库和表在 ShardingSphere-Proxy 存在,否则会报错。
+
+> CDCClient.await 是阻塞主线程,非必需的步骤,用其他方式也可以,只要保证 CDC 线程一直在工作就行。
+
+如果需要更复杂的数据消费实现,例如写入到数据库,可以参考 [DataSourceRecordConsumer](https://github.com/apache/shardingsphere/blob/master/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java)。
+
+#### 写入数据
+
+通过 proxy 写入数据,此时 CDC Client 会收到数据变更的通知。
+
+```
+INSERT INTO t_order (order_id, user_id, status) VALUES (1,1,'ok1'),(2,2,'ok2'),(3,3,'ok3');
+UPDATE t_order SET status='updated' WHERE order_id = 1;
+DELETE FROM t_order WHERE order_id = 2;
+```
+
+Bootstrap 会输出类似的日志
+
+```
+  records: [before {
+  name: "order_id"
+  value {
+    type_url: "type.googleapis.com/google.protobuf.Empty"
+  }
+  ......
+```
+
+#### 查看 CDC 任务运行情况
+
+CDC 任务的启动和停止目前只能通过 CDC Client 控制,可以通过在 proxy 中执行 DistSQL 查看 CDC 任务状态。
+
+1. 查看 CDC 任务列表
+
+```sql
+SHOW STREAMING LIST;
+```
+
+运行结果
+
+```
+sharding_db=> SHOW STREAMING LIST;
+                     id                     |  database   | tables  | job_item_count | active |     create_time     | stop_time 
+--------------------------------------------+-------------+---------+----------------+--------+---------------------+-----------
+ j0302p0000702a83116fcee83f70419ca5e2993791 | sharding_db | t_order | 1              | true   | 2023-10-27 22:01:27 | 
+(1 row)
+```
+
+2. 查看 CDC 任务详情
+
+```sql
+SHOW STREAMING STATUS j0302p0000702a83116fcee83f70419ca5e2993791;
+```
+
+运行结果
+
+```
+sharding_db=> SHOW STREAMING STATUS j0302p0000702a83116fcee83f70419ca5e2993791;
+ item | data_source |          status          | active | processed_records_count | inventory_finished_percentage | incremental_idle_seconds | confirmed_position | current_position | error_message
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+--------------------+------------------+---------------
+ 0    | ds_0        | EXECUTE_INCREMENTAL_TASK | false  | 2                       | 100                           | 115                      | 5/597E43D0         | 5/597E4810       |
+ 1    | ds_1        | EXECUTE_INCREMENTAL_TASK | false  | 3                       | 100                           | 115                      | 5/597E4450         | 5/597E4810       |
+(2 rows)
+```
+
+3. 删除 CDC 任务
+
+```sql
+DROP STREAMING j0302p0000702a83116fcee83f70419ca5e2993791;
+```
+
+只有当 CDC 任务没有订阅的时候才可以删除,删除时也会一并删除 openGauss 物理库上的 replication slots。
+
+```
+sharding_db=> DROP STREAMING j0302p0000702a83116fcee83f70419ca5e2993791;
+SUCCESS
+```
+
+## 注意事项
+
+### 增量数据推送的说明
+
+1. CDC 增量推送目前是按照事务维度的,物理库的事务不会被拆分,所以如果一个事务中有多个表的数据变更,那么这些数据变更会被一起推送。
+如果要支持 XA 事务(目前只支持 openGauss),则 openGauss 和 Proxy 都需要 GLT 模块。
+2. 满足推送的条件是达到了一定大小的数据量或者到了一定的时间间隔(目前是 300ms)。在处理 XA 事务时,如果收到的多个分库增量事件超过了 300ms,可能会导致 XA 事务被拆开推送。
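上面第 2 点描述的推送条件可以用如下草图理解:一个缓冲区,达到批量大小或时间间隔(文中为 300ms)任一阈值即冲刷。仅为示意,类名、方法名与阈值均为演示用途,并非 CDC Server 的实际实现:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// 仅为示意:"达到批量大小 或 超过时间间隔 即推送" 的最小模型,
// 并非 CDC Server 的实际代码,阈值仅供演示
public final class BatchFlushSketch {

    private final List<String> buffer = new ArrayList<>();
    private final int maxBatchSize;
    private final long flushIntervalMillis;
    private long lastFlushMillis;

    BatchFlushSketch(final int maxBatchSize, final long flushIntervalMillis, final long nowMillis) {
        this.maxBatchSize = maxBatchSize;
        this.flushIntervalMillis = flushIntervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    // 返回被冲刷出去的批次;两个条件都未满足时返回空列表
    List<String> offer(final String event, final long nowMillis) {
        buffer.add(event);
        if (buffer.size() >= maxBatchSize || nowMillis - lastFlushMillis >= flushIntervalMillis) {
            List<String> batch = new ArrayList<>(buffer);
            buffer.clear();
            lastFlushMillis = nowMillis;
            return batch;
        }
        return Collections.emptyList();
    }

    public static void main(final String[] args) {
        BatchFlushSketch sketch = new BatchFlushSketch(3, 300, 0);
        System.out.println(sketch.offer("e1", 10).size());  // 0:两个阈值都未达到
        System.out.println(sketch.offer("e2", 320).size()); // 2:超过 300ms 间隔,整批推送
    }
}
```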
+
+### 超大事务的处理
+
+目前是将大事务完整解析,这样可能会导致 CDC Server 进程 OOM,后续可能会考虑强制截断。
+
+### 建议的配置
+
+CDC 的性能目前没有一个固定的值,可以关注配置中读/写的 batchSize,以及内存队列的大小,根据实际情况进行调优。
diff --git a/docs/document/content/user-manual/shardingsphere-proxy/cdc/usage.en.md b/docs/document/content/user-manual/shardingsphere-proxy/cdc/usage.en.md
new file mode 100644
index 00000000000..1cb1bc89618
--- /dev/null
+++ b/docs/document/content/user-manual/shardingsphere-proxy/cdc/usage.en.md
@@ -0,0 +1,287 @@
++++
+title = "User Manual"
+weight = 2
++++
+
+## Introduction to CDC Function
+
+CDC only synchronizes data; it does not synchronize table structures, and it currently does not support synchronizing DDL statements.
+
+### Introduction to CDC Protocol
+
+The CDC protocol uses Protobuf, and the corresponding Protobuf types are mapped based on the types in Java.
+
+Here, taking openGauss as an example, the mapping relationship between the data types of the CDC protocol and the database types is as follows.
+
+| openGauss type                              | Java data type     | CDC corresponding protobuf type | Remarks                                                                                        |
+|---------------------------------------------|--------------------|---------------------------------|------------------------------------------------------------------------------------------------|
+| tinyint, smallint, integer                  | Integer            | int32                           |                                                                                                |
+| bigint                                      | Long               | int64                           |                                                                                                |
+| numeric                                     | BigDecimal         | string                          |                                                                                                |
+| real, float4                                | Float              | float                           |                                                                                                |
+| binary_double, double precision             | Double             | double                          |                                                                                                |
+| boolean                                     | Boolean            | bool                            |                                                                                                |
+| char, varchar, text, clob                   | String             | string                          |                                                                                                |
+| blob, bytea, raw                            | byte[]             | bytes                           |                                                                                                |
+| date, timestamp, timestamptz, smalldatetime | java.sql.Timestamp | Timestamp                       | The protobuf Timestamp type only contains seconds and nanoseconds, so it is independent of the time zone |
+| time, timetz                                | java.sql.Time      | int64                           | Represents the number of nanoseconds of the day, independent of the time zone                  |
+| interval, reltime, abstime                  | String             | string                          |                                                                                                |
+| point, lseg, box, path, polygon, circle     | String             | string                          |                                                                                                |
+| cidr, inet, macaddr                         | String             | string                          |                                                                                                |
+| tsvector                                    | String             | string                          |                                                                                                |
+| tsquery                                     | String             | string                          |                                                                                                |
+| uuid                                        | String             | string                          |                                                                                                |
+| json, jsonb                                 | String             | string                          |                                                                                                |
+| hll                                         | String             | string                          |                                                                                                |
+| int4range, daterange, tsrange, tstzrange    | String             | string                          |                                                                                                |
+| hash16, hash32                              | String             | string                          |                                                                                                |
+| bit, bit varying                            | String             | string                          | Returns Boolean type when bit(1)                                                               |
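The two temporal rows of the table above can be illustrated with a short sketch. This is a demo under stated assumptions (the class and method names are made up and are not ShardingSphere APIs): it shows how a `java.sql.Time` can be reduced to time-zone-independent nanoseconds of the day, and how a `java.sql.Timestamp` splits into the seconds/nanos pair that a protobuf `Timestamp` carries:

```java
import java.sql.Time;
import java.sql.Timestamp;

// Illustrative only (not ShardingSphere code): how the temporal rows of the
// mapping table above can be interpreted.
public final class TemporalMappingSketch {

    // time / timetz -> int64: nanoseconds elapsed since midnight, independent of time zone
    static long toNanosOfDay(final Time time) {
        return time.toLocalTime().toNanoOfDay();
    }

    // timestamp -> protobuf-style Timestamp: epoch seconds plus a nanosecond part
    static long[] toSecondsAndNanos(final Timestamp timestamp) {
        long seconds = Math.floorDiv(timestamp.getTime(), 1000L);
        return new long[]{seconds, timestamp.getNanos()};
    }

    public static void main(final String[] args) {
        System.out.println(toNanosOfDay(Time.valueOf("01:00:00"))); // 3600000000000
        long[] ts = toSecondsAndNanos(new Timestamp(1500L)); // 1.5 s after the epoch
        System.out.println(ts[0] + "s " + ts[1] + "ns"); // 1s 500000000ns
    }
}
```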
+
+## openGauss User Manual
+
+### Environmental Requirements
+
+Supported openGauss versions: 2.x ~ 3.x.
+
+### Permission Requirements
+
+1. Adjust the source end WAL configuration.
+
+Example configuration for `postgresql.conf`:
+```
+wal_level = logical
+max_wal_senders = 10
+max_replication_slots = 10
+wal_sender_timeout = 0
+max_connections = 600
+```
+
+For details, please refer to [Write Ahead Log](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/settings.html) and [Replication](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/sending-server.html).
+
+2. Grant replication permission to the source end openGauss account.
+
+Example configuration for `pg_hba.conf`:
+
+```
+host replication repl_acct 0.0.0.0/0 md5
+# 0.0.0.0/0 means allowing access from any IP address; it can be narrowed to the IP address of the CDC Server according to the actual situation
+```
+
+For details, please refer to [Configuring Client Access Authentication](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/configuring-client-access-authentication.html) and [Example: Logic Replication Code](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/example-logic-replication-code.html).
+
+3. Grant DDL and DML permissions to the openGauss account.
+
+If a non-super administrator account is used, this account must have CREATE and CONNECT permissions on the database to be used.
+
+Example:
+```sql
+GRANT CREATE, CONNECT ON DATABASE source_ds TO cdc_user;
+```
+
+The account also needs access permissions to the subscribed table and its schema, taking the t_order table under the test schema as an example.
+
+```sql
+\c source_ds
+
+GRANT USAGE ON SCHEMA test TO GROUP cdc_user;
+GRANT SELECT ON TABLE test.t_order TO cdc_user;
+```
+
+openGauss has the concept of OWNER. If the account is the OWNER of the database, SCHEMA, or table, the corresponding authorization steps can be omitted.
+
+openGauss does not allow ordinary accounts to operate under the public schema, so if the subscribed table is under the public schema, additional authorization is needed.
+
+```sql
+GRANT ALL PRIVILEGES TO cdc_user;
+```
+
+For details, please refer to [openGauss GRANT](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/GRANT.html).
+
+### Complete Process Example
+
+#### Prerequisites
+
+1. Prepare the databases, tables, and data at the CDC source end.
+
+```sql
+DROP DATABASE IF EXISTS ds_0;
+CREATE DATABASE ds_0;
+
+DROP DATABASE IF EXISTS ds_1;
+CREATE DATABASE ds_1;
+```
+
+#### Configure CDC Server
+
+1. Create a logical database.
+
+```sql
+CREATE DATABASE sharding_db;
+
+\c sharding_db
+```
+
+2. Register the storage units.
+
+```sql
+REGISTER STORAGE UNIT ds_0 (
+    URL="jdbc:opengauss://127.0.0.1:5432/ds_0",
+    USER="gaussdb",
+    PASSWORD="Root@123",
+    PROPERTIES("minPoolSize"="1","maxPoolSize"="20","idleTimeout"="60000")
+), ds_1 (
+    URL="jdbc:opengauss://127.0.0.1:5432/ds_1",
+    USER="gaussdb",
+    PASSWORD="Root@123",
+    PROPERTIES("minPoolSize"="1","maxPoolSize"="20","idleTimeout"="60000")
+);
+```
+
+3. Create sharding rules.
+
+```sql
+CREATE SHARDING TABLE RULE t_order(
+STORAGE_UNITS(ds_0,ds_1),
+SHARDING_COLUMN=order_id,
+TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="2")),
+KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
+);
+```
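The routing effect of the sharding rule above can be understood with a short sketch. This is for illustration only: the class name `HashModRoutingSketch` and the `route` method are invented for the demo and are not ShardingSphere's actual hash_mod implementation:

```java
// Illustrative only (not ShardingSphere's implementation): with sharding-count=2,
// hash_mod takes the hashCode of the sharding column order_id modulo the shard
// count to decide whether a row lands in ds_0 or ds_1.
public final class HashModRoutingSketch {

    static String route(final long orderId, final int shardingCount) {
        // floorMod keeps the result non-negative
        int index = Math.floorMod(Long.hashCode(orderId), shardingCount);
        return "ds_" + index;
    }

    public static void main(final String[] args) {
        System.out.println(route(1L, 2)); // ds_1
        System.out.println(route(2L, 2)); // ds_0
    }
}
```

Refer to the ShardingSphere source code for the authoritative algorithm behavior.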
+
+4. Create the table.
+
+Execute the table creation statement in the proxy.
+
+```sql
+CREATE TABLE t_order (order_id INT NOT NULL, user_id INT NOT NULL, status VARCHAR(45) NULL, PRIMARY KEY (order_id));
+```
+
+#### Start CDC Client
+
+Currently, the CDC Client only provides a Java API, and users need to implement the data consumption logic themselves.
+
+Below is a simple example of starting the CDC Client.
+
+```java
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
+import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreamingExceptionHandler;
+import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter;
+import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
+
+import java.util.Collections;
+
+@Slf4j
+public final class Bootstrap {
+
+    @SneakyThrows(InterruptedException.class)
+    public static void main(final String[] args) {
+        String address = "127.0.0.1";
+        // Construct CDCClient with a CDCClientConfiguration, which contains the address and port of the CDC Server, as well as the timeout
+        try (CDCClient cdcClient = new CDCClient(new CDCClientConfiguration(address, 33071, 10000))) {
+            // First call connect to the CDC Server; it requires 1. the data consumption logic, 2. the exception handling logic during consumption, 3. the server error handling logic
+            cdcClient.connect(records -> log.info("records: {}", records), new RetryStreamingExceptionHandler(cdcClient, 5, 5000),
+                    (ctx, result) -> log.error("Server error: {}", result.getErrorMessage()));
+            cdcClient.login(new CDCLoginParameter("root", "root"));
+            // Start CDC data synchronization. The returned streamingId uniquely identifies this CDC task; the CDC Server generates it from the subscribed database name + the subscribed tables + whether it is full synchronization
+            String streamingId = cdcClient.startStreaming(new StartStreamingParameter("sharding_db", Collections.singleton(SchemaTable.newBuilder().setTable("t_order").build()), true));
+            log.info("Streaming id={}", streamingId);
+            // Prevent the main thread from exiting
+            cdcClient.await();
+        }
+    }
+}
+```
+
+There are four main steps:
+1. Construct CDCClient and pass in a CDCClientConfiguration.
+2. Call CDCClient.connect() to establish a connection with the CDC Server.
+3. Call CDCClient.login() to log in with the username and password configured in server.yaml.
+4. Call CDCClient.startStreaming() to start the subscription. The subscribed database and tables must exist in ShardingSphere-Proxy, otherwise an error will be reported.
+
+> CDCClient.await blocks the main thread. It is not a required step; other approaches also work, as long as the CDC thread keeps running.
+
+If you need a more complex data consumption implementation, such as writing to a database, you can refer to [DataSourceRecordConsumer](https://github.com/apache/shardingsphere/blob/master/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java).
+
+#### Write Data
+
+When data is written through the proxy, the CDC Client is notified of the data changes.
+
+```
+INSERT INTO t_order (order_id, user_id, status) VALUES (1,1,'ok1'),(2,2,'ok2'),(3,3,'ok3');
+UPDATE t_order SET status='updated' WHERE order_id = 1;
+DELETE FROM t_order WHERE order_id = 2;
+```
+
+Bootstrap will output a log similar to the following.
+
+```
+  records: [before {
+  name: "order_id"
+  value {
+    type_url: "type.googleapis.com/google.protobuf.Empty"
+  }
+  ......
+```
+
+#### View the Running Status of the CDC Task
+
+The start and stop of the CDC task can only be controlled by the CDC Client. You can view the status of the CDC task by executing DistSQL in the proxy.
+
+1. View the CDC task list
+
+```sql
+SHOW STREAMING LIST;
+```
+
+Running result
+
+```
+sharding_db=> SHOW STREAMING LIST;
+                     id                     |  database   | tables  | job_item_count | active |     create_time     | stop_time
+--------------------------------------------+-------------+---------+----------------+--------+---------------------+-----------
+ j0302p0000702a83116fcee83f70419ca5e2993791 | sharding_db | t_order | 1              | true   | 2023-10-27 22:01:27 |
+(1 row)
+```
+
+2. View the details of the CDC task
+
+```sql
+SHOW STREAMING STATUS j0302p0000702a83116fcee83f70419ca5e2993791;
+```
+
+Running result
+
+```
+sharding_db=> SHOW STREAMING STATUS j0302p0000702a83116fcee83f70419ca5e2993791;
+ item | data_source |          status          | active | processed_records_count | inventory_finished_percentage | incremental_idle_seconds | confirmed_position | current_position | error_message
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+--------------------+------------------+---------------
+ 0    | ds_0        | EXECUTE_INCREMENTAL_TASK | false  | 2                       | 100                           | 115                      | 5/597E43D0         | 5/597E4810       |
+ 1    | ds_1        | EXECUTE_INCREMENTAL_TASK | false  | 3                       | 100                           | 115                      | 5/597E4450         | 5/597E4810       |
+(2 rows)
+```
+
+3. Drop CDC task
+
+```sql
+DROP STREAMING j0302p0000702a83116fcee83f70419ca5e2993791;
+```
+
+The CDC task can only be dropped when it has no subscription; dropping it also removes the replication slots on the openGauss physical databases.
+
+```
+sharding_db=> DROP STREAMING j0302p0000702a83116fcee83f70419ca5e2993791;
+SUCCESS
+```
+
+## Precautions
+
+### Explanation of incremental data push
+
+1. CDC incremental push is currently at the transaction level: transactions of the physical database will not be split, so if a transaction changes data in multiple tables, those changes are pushed together.
+To support XA transactions (currently only supported for openGauss), both openGauss and the Proxy need the GLT module.
+2. A push is triggered when a certain amount of data has accumulated or a certain time interval (currently 300ms) has elapsed. When processing XA transactions, if the incremental events received from multiple physical databases span more than 300ms, the XA transaction may be split and pushed separately.
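The push condition in point 2 can be sketched as a small buffer that flushes on whichever threshold is hit first. This is a minimal illustration; the names and thresholds are invented for the demo and are not the CDC Server's actual implementation:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Minimal sketch of "flush when batch size OR time interval is reached".
// Not the actual CDC Server code; thresholds are illustrative.
public final class BatchFlushSketch {

    private final List<String> buffer = new ArrayList<>();
    private final int maxBatchSize;
    private final long flushIntervalMillis;
    private long lastFlushMillis;

    BatchFlushSketch(final int maxBatchSize, final long flushIntervalMillis, final long nowMillis) {
        this.maxBatchSize = maxBatchSize;
        this.flushIntervalMillis = flushIntervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    // Returns the flushed batch, or an empty list when neither condition is met yet.
    List<String> offer(final String event, final long nowMillis) {
        buffer.add(event);
        if (buffer.size() >= maxBatchSize || nowMillis - lastFlushMillis >= flushIntervalMillis) {
            List<String> batch = new ArrayList<>(buffer);
            buffer.clear();
            lastFlushMillis = nowMillis;
            return batch;
        }
        return Collections.emptyList();
    }

    public static void main(final String[] args) {
        BatchFlushSketch sketch = new BatchFlushSketch(3, 300, 0);
        System.out.println(sketch.offer("e1", 10).size());  // 0: neither threshold reached
        System.out.println(sketch.offer("e2", 320).size()); // 2: 300 ms interval elapsed, whole batch pushed
    }
}
```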
+
+### Handling of large transactions
+
+Currently, large transactions are parsed in full, which may cause the CDC Server process to run out of memory (OOM). Forced truncation may be considered in the future.
+
+### Recommended configuration
+
+There is no fixed benchmark for CDC performance. Pay attention to the read/write batchSize and the size of the memory queue in the configuration, and tune them according to the actual situation.
