sandynz commented on code in PR #28889:
URL: https://github.com/apache/shardingsphere/pull/28889#discussion_r1437605205


##########
docs/document/content/user-manual/shardingsphere-proxy/cdc/_index.cn.md:
##########
@@ -0,0 +1,8 @@
++++
+title = "CDC"
+weight = 9
++++
+
+CDC(Change Data Capture)增量数据捕捉。CDC 可以监控 ShardingSphere-Proxy 
的存储节点中的数据变化,捕捉到数据操作事件,过滤并提取有用信息,最终将这些变化数据发送到指定的目标上.
+
+CDC可以用于数据同步,数据备份和恢复等方面。通常情况下,目前支持 openGauss、MySQL 和 PostgreSQL.

Review Comment:
   1, The last full stop should be Chinese.
   
   2, All the English words could be surrounded with blank.



##########
docs/document/content/user-manual/shardingsphere-proxy/cdc/build.en.md:
##########
@@ -0,0 +1,225 @@
++++
+title = "Build"
+weight = 1
++++
+
+## Background Information
+
+ShardingSphere CDC is divided into two parts, one is the CDC Server, and the 
other is the CDC Client. The CDC Server and ShardingSphere-Proxy are currently 
deployed together.
+
+Users can introduce the CDC Client into their own projects to implement data 
consumption logic.
+
+## Constraints
+
+- Pure JAVA development, JDK recommended 1.8 or above.
+- CDC Server requires SharingSphere-Proxy to use cluster mode, currently 
supports ZooKeeper as the registry center.
+- CDC only synchronizes data, does not synchronize table structure, and 
currently does not support DDL statement synchronization.
+- CDC incremental stage will output data according to the dimension of the 
transaction. If you want to enable XA transaction compatibility, both openGauss 
and ShardingSphere-Proxy need the GLT module.
+
+## CDC Server Deployment Steps
+
+Here, the openGauss database is used as an example to introduce the deployment 
steps of the CDC Server.
+
+Since the CDC Server is built into ShardingSphere-Proxy, you need to get 
ShardingSphere-Proxy. For details, please refer to the [proxy startup 
manual](/cn/user-manual/shardingsphere-proxy/startup/bin/).
+
+### Configure GLT Module (Optional)
+
+The official website's released binary package does not include the GLT module 
by default and does not guarantee the integrity of cross-library transactions. 
If you are using the openGauss database with GLT functionality, you can 
additionally introduce the GLT module to ensure the integrity of cross-library 
transactions.
+
+There are currently two ways to introduce the GLT module, and corresponding 
configurations need to be made in server.yaml.
+
+#### 1. Source code compilation and installation
+
+1.1 Prepare the code environment, download in advance or use Git clone to 
download the [ShardingSphere](https://github.com/apache/shardingsphere.git) 
source code from Github.
+
+1.2 Delete the `<scope>provided</scope>` tag of the 
shardingsphere-global-clock-tso-provider-redis dependency in 
kernel/global-clock/type/tso/core/pom.xml and the `<scope>provided</scope>` tag 
of jedis in kernel/global-clock/type/tso/provider/redis/pom.xml
+
+1.3 Compile ShardingSphere-Proxy, for specific compilation steps, please refer 
to the [ShardingSphere Compilation 
Manual](https://github.com/apache/shardingsphere/wiki#build-apache-shardingsphere).
+
+#### 2. Directly introduce GLT dependencies
+
+Can be introduced from the maven repository
+
+2.1. 
[shardingsphere-global-clock-tso-provider-redis](https://repo1.maven.org/maven2/org/apache/shardingsphere/shardingsphere-global-clock-tso-provider-redis),
 download the same version as ShardingSphere-Proxy
+
+2.2. 
[jedis-4.3.1](https://repo1.maven.org/maven2/redis/clients/jedis/4.3.1/jedis-4.3.1.jar)
+
+### CDC Server User Manual
+
+1. Modify the configuration file `conf/server.yaml` and turn on the CDC 
function. Currently, `mode` must be `Cluster`, and the corresponding registry 
center needs to be started in advance. If the GLT provider uses Redis, Redis 
needs to be started in advance.
+
+Configuration example:
+
+1. Enable CDC function in `server.yaml`.
+
+```yaml
+mode:
+  type: Cluster
+  repository:
+    type: ZooKeeper
+    props:
+      namespace: cdc_demo
+      server-lists: localhost:2181
+      retryIntervalMilliseconds: 500
+      timeToLiveSeconds: 60
+      maxRetries: 3
+      operationTimeoutMilliseconds: 500
+
+authority:
+  users:
+    - user: root@%
+      password: root
+  privilege:
+    type: ALL_PERMITTED
+
+# When using GLT, you also need to enable distributed transactions
+#transaction:
+#  defaultType: XA
+#  providerType: Atomikos
+#
+#globalClock:
+#  enabled: true
+#  type: TSO
+#  provider: redis
+#  props:
+#    host: 127.0.0.1
+#    port: 6379
+
+props:
+  system-log-level: INFO
+  check-table-metadata-enabled: false
+  proxy-default-port: 3307 # Proxy default port.
+  cdc-server-port: 33071 # CDC Server port, must be configured
+  #proxy-frontend-database-protocol-type: openGauss # Consistent with the type 
of backend database
+```
+
+2. Introduce JDBC driver.
+
+Proxy already includes PostgreSQL JDBC driver.
+
+If the backend connects to the following databases, please download the 
corresponding JDBC driver jar package and put it in the 
`${shardingsphere-proxy}/ext-lib` directory.
+
+| Database   | JDBC Driver                                                     
                                                                    |
+|-----------|---------------------------------------------------------------------------------------------------------------------------------|
+| MySQL     | 
[mysql-connector-java-8.0.31.jar](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.31/)
                            |
+| openGauss | 
[opengauss-jdbc-3.1.1-og.jar](https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/3.1.1-og/opengauss-jdbc-3.1.1-og.jar)
 |
+
+4. Start ShardingSphere-Proxy:
+
+```
+sh bin/start.sh
+```
+
+5. View the proxy log `logs/stdout.log`, and see in the log:
+
+```
+[INFO ] [main] o.a.s.p.frontend.ShardingSphereProxy - ShardingSphere-Proxy 
Cluster mode started successfully
+```
+
+Confirm successful startup.
+
+6. Configure CDC task synchronization configuration as needed
+
+6.1. Query configuration.
+
+```sql
+SHOW STREAMING RULE;
+```
+
+The default configuration is as follows:
+
+```
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+| read                                                         | write         
                       | stream_channel                                        |
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+| {"workerThread":20,"batchSize":1000,"shardingSize":10000000} | 
{"workerThread":20,"batchSize":1000} | 
{"type":"MEMORY","props":{"block-queue-size":"2000"}} |
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+```
+
+6.2. Modify configuration (optional).
+
+Because the streaming rule has a default value, no creation is required, only 
the ALTER statement is provided.
+
+Complete configuration DistSQL example:
+
+```sql
+ALTER STREAMING RULE (
+READ(
+  WORKER_THREAD=20,
+  BATCH_SIZE=1000,
+  SHARDING_SIZE=10000000,
+  RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
+),
+WRITE(
+  WORKER_THREAD=20,
+  BATCH_SIZE=1000,
+  RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
+),
+STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
+);
+```
+
+Configuration item description:
+
+```sql
+ALTER STREAMING RULE (
+READ( -- Data reading configuration. If not configured, some parameters will 
take effect by default.
+  WORKER_THREAD=20, -- Affects full and incremental tasks, the size of the 
thread pool for fetching data from the source end. If not configured, the 
default value will be used. It needs to ensure that this value is not lower 
than the number of sub-libraries

Review Comment:
   Looks `sub-libraries` is not exact enough



##########
docs/document/content/user-manual/shardingsphere-proxy/cdc/usage.cn.md:
##########
@@ -0,0 +1,269 @@
++++
+title = "使用手册"
+weight = 2
++++
+
+## CDC 功能介绍
+
+CDC 只会同步数据,不会同步表结构,目前也不支持 DDL 的语句同步。
+
+### CDC 协议介绍
+
+CDC 协议使用 Protobuf,对应的 Protobuf 类型是根据 Java 中的类型来映射的。
+
+这里以 openGauss 为例,CDC 协议的数据类型和数据库类型的映射关系如下
+
+| openGauss 类型                             | Java 数据类型          | CDC 对应的 
protobuf 类型 | 备注                                     |
+|------------------------------------------|--------------------|---------------------|----------------------------------------|
+| tinyint、smallint、integer                 | Integer            | int32        
       |                                        |
+| bigint                                   | Long               | int64        
       |                                        |
+| numeric                                  | BigDecimal         | string       
       |                                        |
+| real、float4                              | Float              | float        
       |                                        |
+| binary_double、double precision           | Double             | double       
       |                                        |
+| boolean                                  | Boolean            | bool         
       |                                        |
+| char、varchar、text、clob                   | String             | string       
       |                                        |
+| blob、bytea、raw                           | byte[]             | bytes        
       |                                        |
+| date、timestamp,timestamptz、smalldatetime | java.sql.Timestamp | Timestamp    
       | protobuf 的 Timestamp 类型只包含秒和纳秒,所以和时区无关 |
+| time、timetz                              | java.sql.Time      | int64        
       | 代表当天的纳秒数,和时区无关                         |
+| interval、reltime、abstime                 | String             | string       
       |                                        |
+| point、lseg、box、path、polygon、circle       | String             | string       
       |                                        |
+| cidr、inet、macaddr                        | String             | string       
       |                                        |
+| tsvector                                 | String             | string       
       |                                        |
+| tsquery                                  | String             | String       
       |                                        |
+| uuid                                     | String             | string       
       |                                        |
+| json、jsonb                               | String             | string       
       |                                        |
+| hll                                      | String             | string       
       |                                        |
+| int4range、daterange、tsrange、tstzrange    | String             | string       
       |                                        |
+| hash16、hash32                            | String             | string       
       |                                        |
+| bit、bit varying                          | String             | string       
       | bit(1) 的时候返回 Boolean 类型                |
+
+## openGauss 使用手册
+
+### 环境要求
+
+支持的 openGauss 版本:2.x ~ 3.x。
+
+### 权限要求
+
+1. 调整源端 WAL 配置。
+
+`postgresql.conf` 示例配置:
+```
+wal_level = logical
+max_wal_senders = 10
+max_replication_slots = 10
+wal_sender_timeout = 0
+max_connections = 600
+```
+
+详情请参见 [Write Ahead 
Log](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/settings.html)
 和 
[Replication](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/sending-server.html)。
+
+2. 赋予源端 openGauss 账号 replication 权限。
+
+`pg_hba.conf` 示例配置:
+
+```
+host replication repl_acct 0.0.0.0/0 md5
+# 0.0.0.0/0 表示允许任意 IP 地址访问,可以根据实际情况调整成 CDC Server 的 IP 地址
+```
+
+详情请参见 [Configuring Client Access 
Authentication](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/configuring-client-access-authentication.html)
 和 [Example: Logic Replication 
Code](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/example-logic-replication-code.html)。
+
+3. 赋予 openGauss 账号 DDL DML 权限。
+
+如果使用非超级管理员账号,要求该账号在用到的数据库上,具备 CREATE 和 CONNECT 的权限。
+
+示例:
+```sql
+GRANT CREATE, CONNECT ON DATABASE source_ds TO cdc_user;
+```
+
+还需要账号对迁移的表和 schema 具备访问权限,以 test schema 下的 t_order 表为例。
+
+```sql
+\c source_ds
+
+GRANT USAGE ON SCHEMA test TO GROUP cdc_user;
+GRANT SELECT ON TABLE test.t_order TO cdc_user;
+```
+
+openGauss 有 OWNER 的概念,如果是数据库,SCHEMA,表的 OWNER,则可以省略对应的授权步骤。
+
+openGauss 不允许普通账户在 public schema 下操作。所以如果迁移的表在 public schema 下,需要额外授权。
+
+```sql
+GRANT ALL PRIVILEGES TO cdc_user;
+```
+
+详情请参见 [openGauss 
GRANT](https://docs.opengauss.org/zh/docs/2.0.1/docs/Developerguide/GRANT.html)
+
+### 完整流程示例
+
+#### 前提条件
+
+1. 准备好 CDC 源端的库、表、数据。
+
+```sql
+DROP DATABASE IF EXISTS ds_0;
+CREATE DATABASE ds_0;
+
+DROP DATABASE IF EXISTS ds_1;
+CREATE DATABASE ds_1;
+```
+
+#### 配置 CDC Server
+
+1. 创建逻辑库。
+
+```sql
+CREATE DATABASE sharding_db;
+
+\c sharding_db
+```
+2. 注册存储单元。
+
+```sql
+REGISTER STORAGE UNIT ds_0 (
+    URL="jdbc:opengauss://127.0.0.1:5432/ds_0",
+    USER="gaussdb",
+    PASSWORD="Root@123",
+    PROPERTIES("minPoolSize"="1","maxPoolSize"="20","idleTimeout"="60000")
+), ds_1 (
+    URL="jdbc:opengauss://127.0.0.1:5432/ds_1",
+    USER="gaussdb",
+    PASSWORD="Root@123",
+    PROPERTIES("minPoolSize"="1","maxPoolSize"="20","idleTimeout"="60000")
+);
+```
+
+3. 创建分片规则。
+
+```sql
+CREATE SHARDING TABLE RULE t_order(
+STORAGE_UNITS(ds_0,ds_1),
+SHARDING_COLUMN=order_id,
+TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="2")),
+KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
+);
+```
+
+4. 创建表和初始化数据

Review Comment:
   Is there incremental data step?



##########
docs/document/content/user-manual/shardingsphere-proxy/cdc/build.cn.md:
##########
@@ -0,0 +1,224 @@
++++
+title = "运行部署"
+weight = 1
++++
+
+## 背景信息
+
+ShardingSphere CDC 分为两个部分,一个是 CDC Server,另一个是 CDC Client。 CDC Server 和 
ShardingSphere-Proxy 目前是一同部署的。
+
+用户可以在自己的项目中引入 CDC Client,实现数据的消费逻辑。
+
+## 约束条件
+
+- 纯 JAVA 开发,JDK 建议 1.8 或以上版本。
+- CDC Server 要求 SharingSphere-Proxy 使用集群模式,目前支持 ZooKeeper 作为注册中心。
+- CDC 只同步数据,不会同步表结构,目前也不支持 DDL 的语句同步。
+- CDC 增量阶段会按照事务的维度输出数据, 如果要开启 XA 事务的兼容,则 openGauss 和 ShardingSphere-Proxy 都需要 
GLT 模块
+
+## CDC Server 部署步骤
+
+这里以 openGauss 数据库为例,介绍 CDC Server 的部署步骤。
+
+由于 CDC Server 内置于 ShardingSphere-Proxy,所以需要获取 ShardingSphere-Proxy。详情请参见 
[proxy 启动手册](/cn/user-manual/shardingsphere-proxy/startup/bin/)。
+
+### 配置 GLT 模块(可选)
+
+官网发布的二进制包默认不包含 GLT 模块,不保证跨库事务完整性,如果使用的是包含 GLT 功能的 openGauss 数据库,则可以额外引入 GLT 
模块,保证跨库事务的完整性。
+
+目前有两种方式引入 GLT 模块,并且需要在 server.yaml 中也进行相应的配置。
+
+#### 1. 源码编译安装
+
+1.1 准备代码环境,提前下载或者使用 Git clone,从 Github 下载 
[ShardingSphere](https://github.com/apache/shardingsphere.git) 源码。
+
+1.2 删除 kernel/global-clock/type/tso/core/pom.xml 中 
shardingsphere-global-clock-tso-provider-redis 依赖的 `<scope>provided</scope>` 
标签和 kernel/global-clock/type/tso/provider/redis/pom.xml 中 jedis
+   的 `<scope>provided</scope>` 标签
+
+1.3 编译 ShardingSphere-Proxy,具体编译步骤请参考 [ShardingSphere 
编译手册](https://github.com/apache/shardingsphere/wiki#build-apache-shardingsphere)。
+
+#### 2. 直接引入 GLT 依赖
+
+可以从 maven 仓库中引入
+
+2.1. 
[shardingsphere-global-clock-tso-provider-redis](https://repo1.maven.org/maven2/org/apache/shardingsphere/shardingsphere-global-clock-tso-provider-redis),下载和
 ShardingSphere-Proxy 同名版本
+
+2.2. 
[jedis-4.3.1](https://repo1.maven.org/maven2/redis/clients/jedis/4.3.1/jedis-4.3.1.jar)
+
+### CDC Server 使用手册
+
+1. 修改配置文件 `conf/server.yaml`,打开 CDC 功能。 目前 `mode` 必须是 
`Cluster`,需要提前启动对应的注册中心。如果 GLT provider 使用 Redis,需要提前启动 Redis。
+
+配置示例:
+
+1. 在 `server.yaml` 中开启 CDC 功能。
+
+```yaml
+mode:
+  type: Cluster
+  repository:
+    type: ZooKeeper
+    props:
+      namespace: cdc_demo
+      server-lists: localhost:2181
+      retryIntervalMilliseconds: 500
+      timeToLiveSeconds: 60
+      maxRetries: 3
+      operationTimeoutMilliseconds: 500
+
+authority:
+  users:
+    - user: root@%
+      password: root
+  privilege:
+    type: ALL_PERMITTED
+
+# 使用 GLT 的时候也需要开启分布式事务
+#transaction:
+#  defaultType: XA
+#  providerType: Atomikos
+#
+#globalClock:
+#  enabled: true
+#  type: TSO
+#  provider: redis
+#  props:
+#    host: 127.0.0.1
+#    port: 6379
+
+props:
+  system-log-level: INFO
+  check-table-metadata-enabled: false
+  proxy-default-port: 3307 # Proxy default port.
+  cdc-server-port: 33071 # CDC Server 端口,必须配置
+  #proxy-frontend-database-protocol-type: openGauss # 和后端数据库的类型一致

Review Comment:
   Could we uncomment `globalClock` and 
`proxy-frontend-database-protocol-type`, if they're required



##########
docs/document/content/user-manual/shardingsphere-proxy/cdc/build.cn.md:
##########
@@ -0,0 +1,224 @@
++++
+title = "运行部署"
+weight = 1
++++
+
+## 背景信息
+
+ShardingSphere CDC 分为两个部分,一个是 CDC Server,另一个是 CDC Client。 CDC Server 和 
ShardingSphere-Proxy 目前是一同部署的。
+
+用户可以在自己的项目中引入 CDC Client,实现数据的消费逻辑。
+
+## 约束条件
+
+- 纯 JAVA 开发,JDK 建议 1.8 或以上版本。
+- CDC Server 要求 SharingSphere-Proxy 使用集群模式,目前支持 ZooKeeper 作为注册中心。
+- CDC 只同步数据,不会同步表结构,目前也不支持 DDL 的语句同步。
+- CDC 增量阶段会按照事务的维度输出数据, 如果要开启 XA 事务的兼容,则 openGauss 和 ShardingSphere-Proxy 都需要 
GLT 模块
+
+## CDC Server 部署步骤
+
+这里以 openGauss 数据库为例,介绍 CDC Server 的部署步骤。
+
+由于 CDC Server 内置于 ShardingSphere-Proxy,所以需要获取 ShardingSphere-Proxy。详情请参见 
[proxy 启动手册](/cn/user-manual/shardingsphere-proxy/startup/bin/)。
+
+### 配置 GLT 模块(可选)
+
+官网发布的二进制包默认不包含 GLT 模块,不保证跨库事务完整性,如果使用的是包含 GLT 功能的 openGauss 数据库,则可以额外引入 GLT 
模块,保证跨库事务的完整性。
+
+目前有两种方式引入 GLT 模块,并且需要在 server.yaml 中也进行相应的配置。
+
+#### 1. 源码编译安装
+
+1.1 准备代码环境,提前下载或者使用 Git clone,从 Github 下载 
[ShardingSphere](https://github.com/apache/shardingsphere.git) 源码。
+
+1.2 删除 kernel/global-clock/type/tso/core/pom.xml 中 
shardingsphere-global-clock-tso-provider-redis 依赖的 `<scope>provided</scope>` 
标签和 kernel/global-clock/type/tso/provider/redis/pom.xml 中 jedis
+   的 `<scope>provided</scope>` 标签
+
+1.3 编译 ShardingSphere-Proxy,具体编译步骤请参考 [ShardingSphere 
编译手册](https://github.com/apache/shardingsphere/wiki#build-apache-shardingsphere)。
+
+#### 2. 直接引入 GLT 依赖
+
+可以从 maven 仓库中引入
+
+2.1. 
[shardingsphere-global-clock-tso-provider-redis](https://repo1.maven.org/maven2/org/apache/shardingsphere/shardingsphere-global-clock-tso-provider-redis),下载和
 ShardingSphere-Proxy 同名版本
+
+2.2. 
[jedis-4.3.1](https://repo1.maven.org/maven2/redis/clients/jedis/4.3.1/jedis-4.3.1.jar)
+
+### CDC Server 使用手册
+
+1. 修改配置文件 `conf/server.yaml`,打开 CDC 功能。 目前 `mode` 必须是 
`Cluster`,需要提前启动对应的注册中心。如果 GLT provider 使用 Redis,需要提前启动 Redis。
+
+配置示例:
+
+1. 在 `server.yaml` 中开启 CDC 功能。
+
+```yaml
+mode:
+  type: Cluster
+  repository:
+    type: ZooKeeper
+    props:
+      namespace: cdc_demo
+      server-lists: localhost:2181
+      retryIntervalMilliseconds: 500
+      timeToLiveSeconds: 60
+      maxRetries: 3
+      operationTimeoutMilliseconds: 500
+
+authority:
+  users:
+    - user: root@%
+      password: root
+  privilege:
+    type: ALL_PERMITTED
+
+# 使用 GLT 的时候也需要开启分布式事务
+#transaction:
+#  defaultType: XA
+#  providerType: Atomikos
+#
+#globalClock:
+#  enabled: true
+#  type: TSO
+#  provider: redis
+#  props:
+#    host: 127.0.0.1
+#    port: 6379
+
+props:
+  system-log-level: INFO
+  check-table-metadata-enabled: false
+  proxy-default-port: 3307 # Proxy default port.
+  cdc-server-port: 33071 # CDC Server 端口,必须配置
+  #proxy-frontend-database-protocol-type: openGauss # 和后端数据库的类型一致
+```
+
+2. 引入 JDBC 驱动。
+
+proxy 已包含 PostgreSQL JDBC 驱动。
+
+如果后端连接以下数据库,请下载相应 JDBC 驱动 jar 包,并将其放入 `${shardingsphere-proxy}/ext-lib` 目录。
+
+| 数据库       | JDBC 驱动                                                          
                                                               |
+|-----------|---------------------------------------------------------------------------------------------------------------------------------|
+| MySQL     | 
[mysql-connector-java-8.0.31.jar](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.31/)
                            |
+| openGauss | 
[opengauss-jdbc-3.1.1-og.jar](https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/3.1.1-og/opengauss-jdbc-3.1.1-og.jar)
 |
+
+4. 启动 ShardingSphere-Proxy:
+
+```
+sh bin/start.sh
+```
+
+5. 查看 proxy 日志 `logs/stdout.log`,看到日志中出现:
+
+```
+[INFO ] [main] o.a.s.p.frontend.ShardingSphereProxy - ShardingSphere-Proxy 
Cluster mode started successfully
+```
+
+确认启动成功。
+
+6. 按需配置 CDC 任务同步配置
+
+6.1. 查询配置。
+
+```sql
+SHOW STREAMING RULE;
+```
+
+默认配置如下:
+
+```
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+| read                                                         | write         
                       | stream_channel                                        |
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+| {"workerThread":20,"batchSize":1000,"shardingSize":10000000} | 
{"workerThread":20,"batchSize":1000} | 
{"type":"MEMORY","props":{"block-queue-size":"2000"}} |
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+```
+
+6.2. 修改配置(可选)。
+
+因 streaming rule 具有默认值,无需创建,仅提供 ALTER 语句。
+
+完整配置 DistSQL 示例:
+
+```sql
+ALTER STREAMING RULE (
+READ(
+  WORKER_THREAD=20,
+  BATCH_SIZE=1000,
+  SHARDING_SIZE=10000000,
+  RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
+),
+WRITE(
+  WORKER_THREAD=20,
+  BATCH_SIZE=1000,
+  RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
+),
+STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
+);
+```
+
+配置项说明:
+
+```sql
+ALTER STREAMING RULE (
+READ( -- 数据读取配置。如果不配置则部分参数默认生效。
+  WORKER_THREAD=20, -- 影响全量、增量任务,从源端摄取数据的线程池大小。不配置则使用默认值。需要确保该值不低于分库的数量
+  BATCH_SIZE=1000, -- 影响全量、增量任务,一次查询操作返回的最大记录数。如果一个事务中的数据量大于该值,增量情况下可能超过设定的值。
+  SHARDING_SIZE=10000000, -- 影响全量任务,存量数据分片大小。如果不配置则使用默认值。
+  RATE_LIMITER ( -- 影响全量、增量任务,限流算法。如果不配置则不限流。
+  TYPE( -- 算法类型。可选项:QPS
+  NAME='QPS',
+  PROPERTIES( -- 算法属性
+  'qps'='500'
+  )))
+),
+WRITE( -- 数据写入配置。如果不配置则部分参数默认生效。
+  WORKER_THREAD=20, -- 影响全量、增量任务,数据写入到目标端的线程池大小。如果不配置则使用默认值。
+  BATCH_SIZE=1000, -- 
影响全量、增量任务,存量任务一次批量写入操作的最大记录数。如果不配置则使用默认值。如果一个事务中的数据量大于该值,增量情况下可能超过设定的值。
+  RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
+  TYPE( -- 算法类型。可选项:TPS
+  NAME='TPS',
+  PROPERTIES( -- 算法属性
+  'tps'='2000'
+  )))
+),
+STREAM_CHANNEL ( -- 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。
+TYPE( -- 算法类型。可选项:MEMORY
+NAME='MEMORY',
+PROPERTIES( -- 算法属性
+'block-queue-size'='2000' -- 属性:阻塞队列大小
+)))
+);
+```
+
+## CDC Client 手册
+
+CDC Client 不需要额外部署,只需要通过 maven 引入 CDC Client 的依赖就可以在项目中使用。用户可以通过 CDC Client 
和服务端进行交互。
+
+如果有需要,用户也可以自行实现一个 CDC Client,进行数据的消费和 ACK。
+
+```xml
+<dependency>
+    <groupId>org.apache.shardingsphere</groupId>
+    <artifactId>shardingsphere-data-pipeline-cdc-client</artifactId>
+    <version>${version}</version>
+</dependency>
+```
+
+### CDC Client 介绍
+
+`org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient` 是 CDC Client 
的入口类,用户可以通过该类和 CDC Server 进行交互。主要的和新方法如下。
+
+| 方法名                                                                          
                                               | 返回值                            
    | 说明                                                                        
                              |
+|-----------------------------------------------------------------------------------------------------------------------------|------------------------------------|---------------------------------------------------------------------------------------------------------|
+| connect(Consumer<List<Record>> dataConsumer, ExceptionHandler 
exceptionHandler, ServerErrorResultHandler errorResultHandler | void            
                   | 和服务端进行连接,连接的时候需要指定 <br/>1. 数据的消费处理逻辑 <br/>2. 消费时候的异常处理逻辑 
<br/>3. 服务端错误的异常处理逻辑                           |
+| login(CDCLoginParameter parameter)                                           
                                               | void                           
    | CDC登陆,参数 <br/>username:用户名 <br/>password:密码                               
                              |
+| startStreaming(StartStreamingParameter parameter)                            
                                               | String (CDC 任务唯一标识,用于后续操作) | 
开启 CDC 订阅, StartStreamingParameter 参数 <br/> database:逻辑库名称 <br/> 
schemaTables:订阅的表名 <br/> full:是否订阅全量数据 |

Review Comment:
   `,` could be `,`



##########
docs/document/content/user-manual/shardingsphere-proxy/cdc/build.cn.md:
##########
@@ -0,0 +1,224 @@
++++
+title = "运行部署"
+weight = 1
++++
+
+## 背景信息
+
+ShardingSphere CDC 分为两个部分,一个是 CDC Server,另一个是 CDC Client。 CDC Server 和 
ShardingSphere-Proxy 目前是一同部署的。
+
+用户可以在自己的项目中引入 CDC Client,实现数据的消费逻辑。
+
+## 约束条件
+
+- 纯 JAVA 开发,JDK 建议 1.8 或以上版本。
+- CDC Server 要求 SharingSphere-Proxy 使用集群模式,目前支持 ZooKeeper 作为注册中心。
+- CDC 只同步数据,不会同步表结构,目前也不支持 DDL 的语句同步。
+- CDC 增量阶段会按照事务的维度输出数据, 如果要开启 XA 事务的兼容,则 openGauss 和 ShardingSphere-Proxy 都需要 
GLT 模块
+
+## CDC Server 部署步骤
+
+这里以 openGauss 数据库为例,介绍 CDC Server 的部署步骤。
+
+由于 CDC Server 内置于 ShardingSphere-Proxy,所以需要获取 ShardingSphere-Proxy。详情请参见 
[proxy 启动手册](/cn/user-manual/shardingsphere-proxy/startup/bin/)。
+
+### 配置 GLT 模块(可选)
+
+官网发布的二进制包默认不包含 GLT 模块,不保证跨库事务完整性,如果使用的是包含 GLT 功能的 openGauss 数据库,则可以额外引入 GLT 
模块,保证跨库事务的完整性。
+
+目前有两种方式引入 GLT 模块,并且需要在 server.yaml 中也进行相应的配置。
+
+#### 1. 源码编译安装
+
+1.1 准备代码环境,提前下载或者使用 Git clone,从 Github 下载 
[ShardingSphere](https://github.com/apache/shardingsphere.git) 源码。
+
+1.2 删除 kernel/global-clock/type/tso/core/pom.xml 中 
shardingsphere-global-clock-tso-provider-redis 依赖的 `<scope>provided</scope>` 
标签和 kernel/global-clock/type/tso/provider/redis/pom.xml 中 jedis
+   的 `<scope>provided</scope>` 标签
+
+1.3 编译 ShardingSphere-Proxy,具体编译步骤请参考 [ShardingSphere 
编译手册](https://github.com/apache/shardingsphere/wiki#build-apache-shardingsphere)。
+
+#### 2. 直接引入 GLT 依赖
+
+可以从 maven 仓库中引入
+
+2.1. 
[shardingsphere-global-clock-tso-provider-redis](https://repo1.maven.org/maven2/org/apache/shardingsphere/shardingsphere-global-clock-tso-provider-redis),下载和
 ShardingSphere-Proxy 同名版本
+
+2.2. 
[jedis-4.3.1](https://repo1.maven.org/maven2/redis/clients/jedis/4.3.1/jedis-4.3.1.jar)
+
+### CDC Server 使用手册
+
+1. 修改配置文件 `conf/server.yaml`,打开 CDC 功能。 目前 `mode` 必须是 
`Cluster`,需要提前启动对应的注册中心。如果 GLT provider 使用 Redis,需要提前启动 Redis。
+
+配置示例:
+
+1. 在 `server.yaml` 中开启 CDC 功能。
+
+```yaml
+mode:
+  type: Cluster
+  repository:
+    type: ZooKeeper
+    props:
+      namespace: cdc_demo
+      server-lists: localhost:2181
+      retryIntervalMilliseconds: 500
+      timeToLiveSeconds: 60
+      maxRetries: 3
+      operationTimeoutMilliseconds: 500
+
+authority:
+  users:
+    - user: root@%
+      password: root
+  privilege:
+    type: ALL_PERMITTED
+
+# 使用 GLT 的时候也需要开启分布式事务
+#transaction:
+#  defaultType: XA
+#  providerType: Atomikos
+#
+#globalClock:
+#  enabled: true
+#  type: TSO
+#  provider: redis
+#  props:
+#    host: 127.0.0.1
+#    port: 6379
+
+props:
+  system-log-level: INFO
+  check-table-metadata-enabled: false
+  proxy-default-port: 3307 # Proxy default port.
+  cdc-server-port: 33071 # CDC Server 端口,必须配置
+  #proxy-frontend-database-protocol-type: openGauss # 和后端数据库的类型一致
+```
+
+2. 引入 JDBC 驱动。
+
+proxy 已包含 PostgreSQL JDBC 驱动。
+
+如果后端连接以下数据库,请下载相应 JDBC 驱动 jar 包,并将其放入 `${shardingsphere-proxy}/ext-lib` 目录。
+
+| 数据库       | JDBC 驱动                                                          
                                                               |
+|-----------|---------------------------------------------------------------------------------------------------------------------------------|
+| MySQL     | 
[mysql-connector-java-8.0.31.jar](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.31/)
                            |
+| openGauss | 
[opengauss-jdbc-3.1.1-og.jar](https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/3.1.1-og/opengauss-jdbc-3.1.1-og.jar)
 |
+
+4. 启动 ShardingSphere-Proxy:
+
+```
+sh bin/start.sh
+```
+
+5. 查看 proxy 日志 `logs/stdout.log`,看到日志中出现:
+
+```
+[INFO ] [main] o.a.s.p.frontend.ShardingSphereProxy - ShardingSphere-Proxy 
Cluster mode started successfully
+```
+
+确认启动成功。
+
+6. 按需配置 CDC 任务同步配置
+
+6.1. 查询配置。
+
+```sql
+SHOW STREAMING RULE;
+```
+
+默认配置如下:
+
+```
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+| read                                                         | write         
                       | stream_channel                                        |
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+| {"workerThread":20,"batchSize":1000,"shardingSize":10000000} | 
{"workerThread":20,"batchSize":1000} | 
{"type":"MEMORY","props":{"block-queue-size":"2000"}} |
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+```
+
+6.2. 修改配置(可选)。
+
+因 streaming rule 具有默认值,无需创建,仅提供 ALTER 语句。
+
+完整配置 DistSQL 示例:
+
+```sql
+ALTER STREAMING RULE (
+READ(
+  WORKER_THREAD=20,
+  BATCH_SIZE=1000,
+  SHARDING_SIZE=10000000,
+  RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
+),
+WRITE(
+  WORKER_THREAD=20,
+  BATCH_SIZE=1000,
+  RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
+),
+STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
+);
+```
+
+配置项说明:
+
+```sql
+ALTER STREAMING RULE (
+READ( -- 数据读取配置。如果不配置则部分参数默认生效。
+  WORKER_THREAD=20, -- 影响全量、增量任务,从源端摄取数据的线程池大小。不配置则使用默认值。需要确保该值不低于分库的数量
+  BATCH_SIZE=1000, -- 影响全量、增量任务,一次查询操作返回的最大记录数。如果一个事务中的数据量大于该值,增量情况下可能超过设定的值。
+  SHARDING_SIZE=10000000, -- 影响全量任务,存量数据分片大小。如果不配置则使用默认值。
+  RATE_LIMITER ( -- 影响全量、增量任务,限流算法。如果不配置则不限流。
+  TYPE( -- 算法类型。可选项:QPS
+  NAME='QPS',
+  PROPERTIES( -- 算法属性
+  'qps'='500'
+  )))
+),
+WRITE( -- 数据写入配置。如果不配置则部分参数默认生效。
+  WORKER_THREAD=20, -- 影响全量、增量任务,数据写入到目标端的线程池大小。如果不配置则使用默认值。
+  BATCH_SIZE=1000, -- 
影响全量、增量任务,存量任务一次批量写入操作的最大记录数。如果不配置则使用默认值。如果一个事务中的数据量大于该值,增量情况下可能超过设定的值。
+  RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
+  TYPE( -- 算法类型。可选项:TPS
+  NAME='TPS',
+  PROPERTIES( -- 算法属性
+  'tps'='2000'
+  )))
+),
+STREAM_CHANNEL ( -- 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。
+TYPE( -- 算法类型。可选项:MEMORY
+NAME='MEMORY',
+PROPERTIES( -- 算法属性
+'block-queue-size'='2000' -- 属性:阻塞队列大小
+)))
+);
+```
+
+## CDC Client 手册
+
+CDC Client 不需要额外部署,只需要通过 maven 引入 CDC Client 的依赖就可以在项目中使用。用户可以通过 CDC Client 
和服务端进行交互。
+
+如果有需要,用户也可以自行实现一个 CDC Client,进行数据的消费和 ACK。
+
+```xml
+<dependency>
+    <groupId>org.apache.shardingsphere</groupId>
+    <artifactId>shardingsphere-data-pipeline-cdc-client</artifactId>
+    <version>${version}</version>
+</dependency>
+```
+
+### CDC Client 介绍
+
+`org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient` 是 CDC Client 
的入口类,用户可以通过该类和 CDC Server 进行交互。主要的和新方法如下。
+
+| 方法名                                                                          
                                               | 返回值                            
    | 说明                                                                        
                              |
+|-----------------------------------------------------------------------------------------------------------------------------|------------------------------------|---------------------------------------------------------------------------------------------------------|
+| connect(Consumer<List<Record>> dataConsumer, ExceptionHandler 
exceptionHandler, ServerErrorResultHandler errorResultHandler | void            
                   | 和服务端进行连接,连接的时候需要指定 <br/>1. 数据的消费处理逻辑 <br/>2. 消费时候的异常处理逻辑 
<br/>3. 服务端错误的异常处理逻辑                           |
+| login(CDCLoginParameter parameter)                                           
                                               | void                           
    | CDC登陆,参数 <br/>username:用户名 <br/>password:密码                               
                              |
+| startStreaming(StartStreamingParameter parameter)                            
                                               | String (CDC 任务唯一标识,用于后续操作) | 
开启 CDC 订阅, StartStreamingParameter 参数 <br/> database:逻辑库名称 <br/> 
schemaTables:订阅的表名 <br/> full:是否订阅全量数据 |
+| restartStreaming(String streamingId)                                         
                                               | void                           
    | 重启订阅                                                                      
                              |
+| stopStreaming(String streamingId)                                            
                                               | void                           
    | 停止订阅                                                                      
                              |
+| dropStreaming(String streamingId)                                            
                                               | void                           
    | 删除订阅                                                                      
                              |
+| await()                                                                      
                                               | void                           
    | 阻塞 CDC 线程,等待 channel 关闭                                                   
                              |
+| close()                                                                      
                                               | void                           
    | 关闭 channel,流程结束。                                                          
                              |

Review Comment:
   The full stop should be consistent in lines



##########
docs/document/content/user-manual/shardingsphere-proxy/cdc/usage.cn.md:
##########
@@ -0,0 +1,269 @@
++++
+title = "使用手册"
+weight = 2
++++
+
+## CDC 功能介绍
+
+CDC 只会同步数据,不会同步表结构,目前也不支持 DDL 的语句同步。
+
+### CDC 协议介绍
+
+CDC 协议使用 Protobuf,对应的 Protobuf 类型是根据 Java 中的类型来映射的。
+
+这里以 openGauss 为例,CDC 协议的数据类型和数据库类型的映射关系如下
+
+| openGauss 类型                             | Java 数据类型          | CDC 对应的 
protobuf 类型 | 备注                                     |
+|------------------------------------------|--------------------|---------------------|----------------------------------------|
+| tinyint、smallint、integer                 | Integer            | int32        
       |                                        |
+| bigint                                   | Long               | int64        
       |                                        |
+| numeric                                  | BigDecimal         | string       
       |                                        |
+| real、float4                              | Float              | float        
       |                                        |
+| binary_double、double precision           | Double             | double       
       |                                        |
+| boolean                                  | Boolean            | bool         
       |                                        |
+| char、varchar、text、clob                   | String             | string       
       |                                        |
+| blob、bytea、raw                           | byte[]             | bytes        
       |                                        |
+| date、timestamp,timestamptz、smalldatetime | java.sql.Timestamp | Timestamp    
       | protobuf 的 Timestamp 类型只包含秒和纳秒,所以和时区无关 |
+| time、timetz                              | java.sql.Time      | int64        
       | 代表当天的纳秒数,和时区无关                         |
+| interval、reltime、abstime                 | String             | string       
       |                                        |
+| point、lseg、box、path、polygon、circle       | String             | string       
       |                                        |
+| cidr、inet、macaddr                        | String             | string       
       |                                        |
+| tsvector                                 | String             | string       
       |                                        |
+| tsquery                                  | String             | String       
       |                                        |
+| uuid                                     | String             | string       
       |                                        |
+| json、jsonb                               | String             | string       
       |                                        |
+| hll                                      | String             | string       
       |                                        |
+| int4range、daterange、tsrange、tstzrange    | String             | string       
       |                                        |
+| hash16、hash32                            | String             | string       
       |                                        |
+| bit、bit varying                          | String             | string       
       | bit(1) 的时候返回 Boolean 类型                |
+
+## openGauss 使用手册
+
+### 环境要求
+
+支持的 openGauss 版本:2.x ~ 3.x。
+
+### 权限要求
+
+1. 调整源端 WAL 配置。
+
+`postgresql.conf` 示例配置:
+```
+wal_level = logical
+max_wal_senders = 10
+max_replication_slots = 10
+wal_sender_timeout = 0
+max_connections = 600
+```
+
+详情请参见 [Write Ahead 
Log](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/settings.html)
 和 
[Replication](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/sending-server.html)。
+
+2. 赋予源端 openGauss 账号 replication 权限。
+
+`pg_hba.conf` 示例配置:
+
+```
+host replication repl_acct 0.0.0.0/0 md5
+# 0.0.0.0/0 表示允许任意 IP 地址访问,可以根据实际情况调整成 CDC Server 的 IP 地址
+```
+
+详情请参见 [Configuring Client Access 
Authentication](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/configuring-client-access-authentication.html)
 和 [Example: Logic Replication 
Code](https://docs.opengauss.org/en/docs/2.0.1/docs/Developerguide/example-logic-replication-code.html)。
+
+3. 赋予 openGauss 账号 DDL DML 权限。
+
+如果使用非超级管理员账号,要求该账号在用到的数据库上,具备 CREATE 和 CONNECT 的权限。
+
+示例:
+```sql
+GRANT CREATE, CONNECT ON DATABASE source_ds TO cdc_user;
+```
+
+还需要账号对迁移的表和 schema 具备访问权限,以 test schema 下的 t_order 表为例。
+
+```sql
+\c source_ds
+
+GRANT USAGE ON SCHEMA test TO GROUP cdc_user;
+GRANT SELECT ON TABLE test.t_order TO cdc_user;
+```
+
+openGauss 有 OWNER 的概念,如果是数据库,SCHEMA,表的 OWNER,则可以省略对应的授权步骤。
+
+openGauss 不允许普通账户在 public schema 下操作。所以如果迁移的表在 public schema 下,需要额外授权。
+
+```sql
+GRANT ALL PRIVILEGES TO cdc_user;
+```
+
+详情请参见 [openGauss 
GRANT](https://docs.opengauss.org/zh/docs/2.0.1/docs/Developerguide/GRANT.html)
+
+### 完整流程示例
+
+#### 前提条件
+
+1. 准备好 CDC 源端的库、表、数据。
+
+```sql
+DROP DATABASE IF EXISTS ds_0;
+CREATE DATABASE ds_0;
+
+DROP DATABASE IF EXISTS ds_1;
+CREATE DATABASE ds_1;
+```
+
+#### 配置 CDC Server
+
+1. 创建逻辑库。
+
+```sql
+CREATE DATABASE sharding_db;
+
+\c sharding_db
+```
+2. 注册存储单元。
+
+```sql
+REGISTER STORAGE UNIT ds_0 (
+    URL="jdbc:opengauss://127.0.0.1:5432/ds_0",
+    USER="gaussdb",
+    PASSWORD="Root@123",
+    PROPERTIES("minPoolSize"="1","maxPoolSize"="20","idleTimeout"="60000")
+), ds_1 (
+    URL="jdbc:opengauss://127.0.0.1:5432/ds_1",
+    USER="gaussdb",
+    PASSWORD="Root@123",
+    PROPERTIES("minPoolSize"="1","maxPoolSize"="20","idleTimeout"="60000")
+);
+```
+
+3. 创建分片规则。
+
+```sql
+CREATE SHARDING TABLE RULE t_order(
+STORAGE_UNITS(ds_0,ds_1),
+SHARDING_COLUMN=order_id,
+TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="2")),
+KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
+);
+```
+
+4. 创建表和初始化数据
+
+在 proxy 执行建表语句。
+
+```sql
+CREATE TABLE t_order (id INT NOT NULL, user_id INT NOT NULL, status 
VARCHAR(45) NULL, PRIMARY KEY (id));
+
+INSERT INTO t_order (id, user_id, status) VALUES 
(1,1,'ok1'),(2,2,'ok2'),(3,3,'ok3');
+```
+
+#### 启动 CDC Client
+
+目前 CDC Client 只提供了 Java API,用户需要自行实现数据的消费逻辑。
+
+下面是一个简单的启动 CDC Client 的示例。
+
+```java
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.handler.RetryStreamingExceptionHandler;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
+
+import java.util.Collections;
+
+@Slf4j
+public final class Bootstrap {
+    
+    @SneakyThrows(InterruptedException.class)
+    public static void main(final String[] args) {
+        String address = "127.0.0.1";
+        // 构造 CDCClient,传入 CDCClientConfiguration,CDCClientConfiguration 中包含了 
CDC Server 的地址和端口,以及超时时间
+        try (CDCClient cdcClient = new CDCClient(new 
CDCClientConfiguration(address, 33071, 10000))) {
+            // 先调用 connect 连接到 CDC Server,需要传入 1. 数据的消费处理逻辑 2. 消费时候的异常处理逻辑 3. 
服务端错误的异常处理逻辑
+            cdcClient.connect(records -> log.info("records: {}", records), new 
RetryStreamingExceptionHandler(cdcClient, 5, 5000),
+                    (ctx, result) -> log.error("Server error: {}", 
result.getErrorMessage()));
+            cdcClient.login(new CDCLoginParameter("root", "root"));
+            // 开始 CDC 数据同步,返回的 streamingId 是这次 CDC 任务的唯一标识,CDC Server 
生成唯一标识的依据是 订阅的数据库名称 + 订阅的表 + 是否是全量同步
+            String streamingId = cdcClient.startStreaming(new 
StartStreamingParameter("sharding_db", 
Collections.singleton(SchemaTable.newBuilder().setTable("t_order").build()), 
true));
+            log.info("Streaming id={}", streamingId);
+            // 防止 main 主线程退出
+            cdcClient.await();
+        }
+    }
+}
+```
+
+主要有4个步骤
+1. 构造 CDCClient,传入 CDCClientConfiguration
+2. 调用 CDCClient.connect,这一步是和 CDC Server 建立连接
+3. 调用 CDCClient.login,使用 server.yaml 中配置好的用户名和密码登录
+4. 调用 CDCClient.startStreaming,开启订阅,需要保证订阅的库和表在 ShardingSphere-Proxy 存在,否则会报错。
+
+> CDCClient.await 是阻塞主线程,非必需的步骤,用其他方式也可以,只要保证 CDC 线程一直在工作就行。
+
+如果需要更复杂数据消费的实现,例如写入到数据库,可以参考 
[DataSourceRecordConsumer](https://github.com/apache/shardingsphere/blob/master/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java)
+
+#### 查看 CDC 任务运行情况
+
+CDC 任务的启动和停止目前只能通过 CDC Client 控制,可以通过在 proxy 中执行 DistSQL 查看 CDC 任务状态
+
+1. 查看 CDC 任务列表
+
+SHOW STREAMING LIST;
+
+运行结果
+
+```
+sharding_db=> SHOW STREAMING LIST;
+                     id                     |  database   | tables  | 
job_item_count | active |     create_time     | stop_time 
+--------------------------------------------+-------------+---------+----------------+--------+---------------------+-----------
+ j0302p0000702a83116fcee83f70419ca5e2993791 | sharding_db | t_order | 1        
      | true   | 2023-10-27 22:01:27 | 
+(1 row)
+```
+
+2. 查看 CDC 任务详情
+
+SHOW STREAMING STATUS j0302p0000702a83116fcee83f70419ca5e2993791;
+
+运行结果
+
+```
+sharding_db=> SHOW STREAMING STATUS j0302p0000702a83116fcee83f70419ca5e2993791;
+ item | data_source |          status          | active | 
processed_records_count | inventory_finished_percentage | 
incremental_idle_seconds | error_message
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+---------------
+ 0    | ds_0        | EXECUTE_INCREMENTAL_TASK | true   | 1                    
   | 100                           | 101                      |
+ 1    | ds_1        | EXECUTE_INCREMENTAL_TASK | true   | 2                    
   | 100                           | 100                      |
+(2 rows)
+```
+
+3. 删除 CDC 任务
+
+DROP STREAMING j0302p0000702a83116fcee83f70419ca5e2993791;
+
+只有当 CDC 任务没有订阅的时候才可以删除,此时也会删除 openGauss 物理库上的 replication slots
+
+```
+sharding_db=> DROP STREAMING j0302p0000702a83116fcee83f70419ca5e2993791;
+SUCCESS
+```
+
+# 注意事项
+
+## 增量数据推送的说明
+
+1. CDC 增量推送目前是按照事务维度的,物理库的事务不会被拆分,所以如果一个事务中有多个表的数据变更,那么这些数据变更会被一起推送。
+如果要支持 XA 事务(目前只支持 openGauss),则 openGauss 和 Proxy 都需要 GLT 模块。
+2. 满足推送的条件是满足了一定大小的数据量或者到了一定的时间间隔(目前是 300ms),在处理 XA 事务时,收到的多个分库增量事件超过了 
300ms,可能会导致 XA 事务被拆开推送。
+
+## 超大事务的处理
+
+目前是将大事务完整解析,这样可能会导致 CDC Server 进程 OOM,后续可能会考虑强制截断。
+
+## 建议的配置
+
+1. 如果有限流的要求,读写推荐只配置一侧,另一侧会自动受到限流的影响。

Review Comment:
   Looks this suggestion is not valuable enough, new coming users could not 
understand it easily. Could we remove it?



##########
docs/document/content/user-manual/shardingsphere-proxy/cdc/build.cn.md:
##########
@@ -0,0 +1,224 @@
++++
+title = "运行部署"
+weight = 1
++++
+
+## 背景信息
+
+ShardingSphere CDC 分为两个部分,一个是 CDC Server,另一个是 CDC Client。 CDC Server 和 
ShardingSphere-Proxy 目前是一同部署的。
+
+用户可以在自己的项目中引入 CDC Client,实现数据的消费逻辑。
+
+## 约束条件
+
+- 纯 JAVA 开发,JDK 建议 1.8 或以上版本。
+- CDC Server 要求 SharingSphere-Proxy 使用集群模式,目前支持 ZooKeeper 作为注册中心。
+- CDC 只同步数据,不会同步表结构,目前也不支持 DDL 的语句同步。
+- CDC 增量阶段会按照事务的维度输出数据, 如果要开启 XA 事务的兼容,则 openGauss 和 ShardingSphere-Proxy 都需要 
GLT 模块
+
+## CDC Server 部署步骤
+
+这里以 openGauss 数据库为例,介绍 CDC Server 的部署步骤。
+
+由于 CDC Server 内置于 ShardingSphere-Proxy,所以需要获取 ShardingSphere-Proxy。详情请参见 
[proxy 启动手册](/cn/user-manual/shardingsphere-proxy/startup/bin/)。
+
+### 配置 GLT 模块(可选)
+
+官网发布的二进制包默认不包含 GLT 模块,不保证跨库事务完整性,如果使用的是包含 GLT 功能的 openGauss 数据库,则可以额外引入 GLT 
模块,保证跨库事务的完整性。
+
+目前有两种方式引入 GLT 模块,并且需要在 server.yaml 中也进行相应的配置。
+
+#### 1. 源码编译安装
+
+1.1 准备代码环境,提前下载或者使用 Git clone,从 Github 下载 
[ShardingSphere](https://github.com/apache/shardingsphere.git) 源码。
+
+1.2 删除 kernel/global-clock/type/tso/core/pom.xml 中 
shardingsphere-global-clock-tso-provider-redis 依赖的 `<scope>provided</scope>` 
标签和 kernel/global-clock/type/tso/provider/redis/pom.xml 中 jedis
+   的 `<scope>provided</scope>` 标签
+
+1.3 编译 ShardingSphere-Proxy,具体编译步骤请参考 [ShardingSphere 
编译手册](https://github.com/apache/shardingsphere/wiki#build-apache-shardingsphere)。
+
+#### 2. 直接引入 GLT 依赖
+
+可以从 maven 仓库中引入
+
+2.1. 
[shardingsphere-global-clock-tso-provider-redis](https://repo1.maven.org/maven2/org/apache/shardingsphere/shardingsphere-global-clock-tso-provider-redis),下载和
 ShardingSphere-Proxy 同名版本
+
+2.2. 
[jedis-4.3.1](https://repo1.maven.org/maven2/redis/clients/jedis/4.3.1/jedis-4.3.1.jar)
+
+### CDC Server 使用手册
+
+1. 修改配置文件 `conf/server.yaml`,打开 CDC 功能。 目前 `mode` 必须是 
`Cluster`,需要提前启动对应的注册中心。如果 GLT provider 使用 Redis,需要提前启动 Redis。
+
+配置示例:
+
+1. 在 `server.yaml` 中开启 CDC 功能。
+
+```yaml
+mode:
+  type: Cluster
+  repository:
+    type: ZooKeeper
+    props:
+      namespace: cdc_demo
+      server-lists: localhost:2181
+      retryIntervalMilliseconds: 500
+      timeToLiveSeconds: 60
+      maxRetries: 3
+      operationTimeoutMilliseconds: 500
+
+authority:
+  users:
+    - user: root@%
+      password: root
+  privilege:
+    type: ALL_PERMITTED
+
+# 使用 GLT 的时候也需要开启分布式事务
+#transaction:
+#  defaultType: XA
+#  providerType: Atomikos
+#
+#globalClock:
+#  enabled: true
+#  type: TSO
+#  provider: redis
+#  props:
+#    host: 127.0.0.1
+#    port: 6379
+
+props:
+  system-log-level: INFO
+  check-table-metadata-enabled: false
+  proxy-default-port: 3307 # Proxy default port.
+  cdc-server-port: 33071 # CDC Server 端口,必须配置
+  #proxy-frontend-database-protocol-type: openGauss # 和后端数据库的类型一致
+```
+
+2. 引入 JDBC 驱动。
+
+proxy 已包含 PostgreSQL JDBC 驱动。
+
+如果后端连接以下数据库,请下载相应 JDBC 驱动 jar 包,并将其放入 `${shardingsphere-proxy}/ext-lib` 目录。
+
+| 数据库       | JDBC 驱动                                                          
                                                               |
+|-----------|---------------------------------------------------------------------------------------------------------------------------------|
+| MySQL     | 
[mysql-connector-java-8.0.31.jar](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.31/)
                            |
+| openGauss | 
[opengauss-jdbc-3.1.1-og.jar](https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/3.1.1-og/opengauss-jdbc-3.1.1-og.jar)
 |
+
+4. 启动 ShardingSphere-Proxy:
+
+```
+sh bin/start.sh
+```
+
+5. 查看 proxy 日志 `logs/stdout.log`,看到日志中出现:
+
+```
+[INFO ] [main] o.a.s.p.frontend.ShardingSphereProxy - ShardingSphere-Proxy 
Cluster mode started successfully
+```
+
+确认启动成功。
+
+6. 按需配置 CDC 任务同步配置
+
+6.1. 查询配置。
+
+```sql
+SHOW STREAMING RULE;
+```
+
+默认配置如下:
+
+```
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+| read                                                         | write         
                       | stream_channel                                        |
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+| {"workerThread":20,"batchSize":1000,"shardingSize":10000000} | 
{"workerThread":20,"batchSize":1000} | 
{"type":"MEMORY","props":{"block-queue-size":"2000"}} |
++--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
+```
+
+6.2. 修改配置(可选)。
+
+因 streaming rule 具有默认值,无需创建,仅提供 ALTER 语句。
+
+完整配置 DistSQL 示例:
+
+```sql
+ALTER STREAMING RULE (
+READ(
+  WORKER_THREAD=20,
+  BATCH_SIZE=1000,
+  SHARDING_SIZE=10000000,
+  RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
+),
+WRITE(
+  WORKER_THREAD=20,
+  BATCH_SIZE=1000,
+  RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
+),
+STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
+);
+```
+
+配置项说明:
+
+```sql
+ALTER STREAMING RULE (
+READ( -- 数据读取配置。如果不配置则部分参数默认生效。
+  WORKER_THREAD=20, -- 影响全量、增量任务,从源端摄取数据的线程池大小。不配置则使用默认值。需要确保该值不低于分库的数量
+  BATCH_SIZE=1000, -- 影响全量、增量任务,一次查询操作返回的最大记录数。如果一个事务中的数据量大于该值,增量情况下可能超过设定的值。
+  SHARDING_SIZE=10000000, -- 影响全量任务,存量数据分片大小。如果不配置则使用默认值。
+  RATE_LIMITER ( -- 影响全量、增量任务,限流算法。如果不配置则不限流。
+  TYPE( -- 算法类型。可选项:QPS
+  NAME='QPS',
+  PROPERTIES( -- 算法属性
+  'qps'='500'
+  )))
+),
+WRITE( -- 数据写入配置。如果不配置则部分参数默认生效。
+  WORKER_THREAD=20, -- 影响全量、增量任务,数据写入到目标端的线程池大小。如果不配置则使用默认值。
+  BATCH_SIZE=1000, -- 
影响全量、增量任务,存量任务一次批量写入操作的最大记录数。如果不配置则使用默认值。如果一个事务中的数据量大于该值,增量情况下可能超过设定的值。
+  RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
+  TYPE( -- 算法类型。可选项:TPS
+  NAME='TPS',
+  PROPERTIES( -- 算法属性
+  'tps'='2000'
+  )))
+),
+STREAM_CHANNEL ( -- 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。
+TYPE( -- 算法类型。可选项:MEMORY
+NAME='MEMORY',
+PROPERTIES( -- 算法属性
+'block-queue-size'='2000' -- 属性:阻塞队列大小
+)))
+);
+```
+
+## CDC Client 手册
+
+CDC Client 不需要额外部署,只需要通过 maven 引入 CDC Client 的依赖就可以在项目中使用。用户可以通过 CDC Client 
和服务端进行交互。
+
+如果有需要,用户也可以自行实现一个 CDC Client,进行数据的消费和 ACK。
+
+```xml
+<dependency>
+    <groupId>org.apache.shardingsphere</groupId>
+    <artifactId>shardingsphere-data-pipeline-cdc-client</artifactId>
+    <version>${version}</version>
+</dependency>
+```
+
+### CDC Client 介绍
+
+`org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient` 是 CDC Client 
的入口类,用户可以通过该类和 CDC Server 进行交互。主要的和新方法如下。
+
+| 方法名                                                                          
                                               | 返回值                            
    | 说明                                                                        
                              |
+|-----------------------------------------------------------------------------------------------------------------------------|------------------------------------|---------------------------------------------------------------------------------------------------------|
+| connect(Consumer<List<Record>> dataConsumer, ExceptionHandler 
exceptionHandler, ServerErrorResultHandler errorResultHandler | void            
                   | 和服务端进行连接,连接的时候需要指定 <br/>1. 数据的消费处理逻辑 <br/>2. 消费时候的异常处理逻辑 
<br/>3. 服务端错误的异常处理逻辑                           |
+| login(CDCLoginParameter parameter)                                           
                                               | void                           
    | CDC登陆,参数 <br/>username:用户名 <br/>password:密码                               
                              |
+| startStreaming(StartStreamingParameter parameter)                            
                                               | String (CDC 任务唯一标识,用于后续操作) | 
开启 CDC 订阅, StartStreamingParameter 参数 <br/> database:逻辑库名称 <br/> 
schemaTables:订阅的表名 <br/> full:是否订阅全量数据 |

Review Comment:
   `startStreaming` return value should be `streamingId`, the same as other 
methods' parameter



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to