This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e1b24ce118 Improve MppdbDecodingPlugin, compatible with more types (#28985)
7e1b24ce118 is described below

commit 7e1b24ce1188b8cb5f7a3e30ab05df8e31e50f62
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Nov 9 17:35:19 2023 +0800

    Improve MppdbDecodingPlugin, compatible with more types (#28985)
    
    * Improve MppdbDecodingPlugin, compatible with more types
    
    * Add unit test
    
    * Improve E2E
    
    * Adjust timeout of openGauss
    
    * Fix columns
    
    * Revert incr timeout
    
    * Add special byte value
---
 .../ingest/wal/decode/MppdbDecodingPlugin.java     | 20 ++++-----
 .../ingest/wal/decode/MppdbDecodingPluginTest.java | 50 ++++++++++++++++++++--
 .../handler/RetryStreamingExceptionHandler.java    |  6 +--
 .../pipeline/cases/PipelineContainerComposer.java  |  5 ++-
 .../cases/migration/AbstractMigrationE2EIT.java    |  4 +-
 .../general/PostgreSQLMigrationGeneralE2EIT.java   |  2 +-
 .../pipeline/cases/task/E2EIncrementalTask.java    | 15 ++++---
 .../framework/helper/PipelineCaseHelper.java       | 12 +++---
 .../resources/env/scenario/general/opengauss.xml   | 13 ++++--
 9 files changed, 89 insertions(+), 38 deletions(-)
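
For illustration only (not part of the patch): a minimal sketch of the value shapes a consumer
of the decoded rows could expect after this change, assuming the mapping shown in
MppdbDecodingPlugin below: raw byte[] for bytea/blob, a typed PGobject for the new range types,
and a plain String (with doubled single quotes unescaped) for tsquery. The class and variable
names here are hypothetical.

    import org.postgresql.util.PGobject;

    import java.sql.SQLException;

    public final class DecodedValueShapesExample {

        public static void main(final String[] args) throws SQLException {
            // bytea/blob columns now decode to raw byte[] instead of a PGobject wrapper.
            byte[] byteaValue = {(byte) 0xff, 0x00, (byte) 0xab};
            // Range types (int4range, int8range, numrange, tsrange, tstzrange, daterange) decode to a typed PGobject.
            PGobject tsrangeValue = new PGobject();
            tsrangeValue.setType("tsrange");
            tsrangeValue.setValue("[\"2020-01-01 00:00:00\",\"2021-01-01 00:00:00\")");
            // tsquery falls through to the default branch and stays a plain String.
            String tsqueryValue = "'fff' | 'faa'";
            System.out.printf("bytea length=%d, tsrange=%s, tsquery=%s%n", byteaValue.length, tsrangeValue.getValue(), tsqueryValue);
        }
    }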

diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
index 19fca8ed5a9..be187823a36 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
@@ -193,9 +193,16 @@ public final class MppdbDecodingPlugin implements DecodingPlugin {
                     throw new DecodingException(ex);
                 }
             case "bytea":
+            case "blob":
                 return decodeBytea(data);
             case "raw":
             case "reltime":
+            case "int4range":
+            case "int8range":
+            case "numrange":
+            case "tsrange":
+            case "tstzrange":
+            case "daterange":
                 return decodePgObject(data, columnType);
             case "money":
                 return decodeMoney(data);
@@ -205,6 +212,7 @@ public final class MppdbDecodingPlugin implements DecodingPlugin {
             case "text":
             case "character":
             case "nvarchar2":
+            case "tsquery":
             default:
                 return decodeString(data).replace("''", "'");
         }
@@ -229,16 +237,8 @@ public final class MppdbDecodingPlugin implements DecodingPlugin {
         }
     }
     
-    private PGobject decodeBytea(final String data) {
-        try {
-            PGobject result = new PGobject();
-            result.setType("bytea");
-            byte[] decodeByte = decodeHex(decodeString(data).substring(2));
-            result.setValue(new String(decodeByte));
-            return result;
-        } catch (final SQLException ignored) {
-            return null;
-        }
+    private Object decodeBytea(final String data) {
+        return decodeHex(decodeString(data).substring(2));
     }
     
     private String decodeMoney(final String data) {
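
A side note on decodeBytea above, for illustration only: openGauss carries bytea in the mppdb
JSON as a quoted hex literal, so an input like '\xff00ab' would yield the byte[] asserted in
MppdbDecodingPluginTest below. The quote-stripping and hex-parsing steps in this sketch are
assumed stand-ins for the plugin's decodeString and decodeHex helpers, which are not shown in
this hunk.

    public final class ByteaDecodeSketch {

        public static void main(final String[] args) {
            String data = "'\\xff00ab'";
            // Strip the surrounding quotes, then the \x prefix, leaving "ff00ab".
            String hex = data.substring(1, data.length() - 1).substring(2);
            byte[] decoded = new byte[hex.length() / 2];
            for (int i = 0; i < decoded.length; i++) {
                decoded[i] = (byte) Integer.parseInt(hex.substring(i * 2, i * 2 + 2), 16);
            }
            // Prints "ff 00 ab", matching the expected bytes in the unit test below.
            for (byte each : decoded) {
                System.out.printf("%02x ", each);
            }
        }
    }
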
diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
index 3739a649d70..5b446bcfc7e 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java
@@ -125,7 +125,7 @@ class MppdbDecodingPluginTest {
         assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
         assertThat(actual.getTableName(), is("test"));
         Object byteaObj = actual.getAfterRow().get(0);
-        assertThat(byteaObj.toString(), is("1.08"));
+        assertThat(byteaObj, is("1.08"));
     }
     
     @Test
@@ -183,8 +183,8 @@ class MppdbDecodingPluginTest {
         assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
         assertThat(actual.getTableName(), is("test"));
         Object byteaObj = actual.getAfterRow().get(0);
-        assertThat(byteaObj, instanceOf(PGobject.class));
-        assertThat(byteaObj.toString(), is(new String(new byte[]{(byte) 0xff, (byte) 0, (byte) 0xab})));
+        assertThat(byteaObj, instanceOf(byte[].class));
+        assertThat(byteaObj, is(new byte[]{(byte) 0xff, (byte) 0, (byte) 0xab}));
     }
     
     @Test
@@ -260,4 +260,48 @@ class MppdbDecodingPluginTest {
         assertThat(((CommitTXEvent) actualLastEvent).getCsn(), is(3468L));
         assertThat(((CommitTXEvent) actualLastEvent).getXid(), is(1L));
     }
+    
+    @Test
+    void assertDecodeWithTsrange() {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("INSERT");
+        tableData.setColumnsName(new String[]{"data"});
+        tableData.setColumnsType(new String[]{"tsrange"});
+        tableData.setColumnsVal(new String[]{"'[\"2020-01-01 00:00:00\",\"2021-01-01 00:00:00\")'"});
+        ByteBuffer data = ByteBuffer.wrap(toJSON(tableData).getBytes());
+        WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        Object byteaObj = actual.getAfterRow().get(0);
+        assertThat(byteaObj, instanceOf(PGobject.class));
+        assertThat(byteaObj.toString(), is("[\"2020-01-01 00:00:00\",\"2021-01-01 00:00:00\")"));
+    }
+    
+    @Test
+    void assertDecodeWithDaterange() {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("INSERT");
+        tableData.setColumnsName(new String[]{"data"});
+        tableData.setColumnsType(new String[]{"daterange"});
+        tableData.setColumnsVal(new String[]{"'[2020-01-02,2021-01-02)'"});
+        ByteBuffer data = ByteBuffer.wrap(toJSON(tableData).getBytes());
+        WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        Object byteaObj = actual.getAfterRow().get(0);
+        assertThat(byteaObj, instanceOf(PGobject.class));
+        assertThat(byteaObj.toString(), is("[2020-01-02,2021-01-02)"));
+    }
+    
+    @Test
+    void assertDecodeWithTsquery() {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("INSERT");
+        tableData.setColumnsName(new String[]{"data"});
+        tableData.setColumnsType(new String[]{"tsquery"});
+        tableData.setColumnsVal(new String[]{"'''fff'' | ''faa'''"});
+        ByteBuffer data = ByteBuffer.wrap(toJSON(tableData).getBytes());
+        WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null).decode(data, logSequenceNumber);
+        Object byteaObj = actual.getAfterRow().get(0);
+        assertThat(byteaObj.toString(), is("'fff' | 'faa'"));
+    }
 }
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java
index 7efa76da195..2794705780a 100644
--- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java
@@ -35,7 +35,7 @@ public final class RetryStreamingExceptionHandler implements ExceptionHandler {
     
     private final CDCClient cdcClient;
     
-    private final AtomicInteger maxRetryTimes;
+    private final int maxRetryTimes;
     
     private final int retryIntervalMills;
     
@@ -43,7 +43,7 @@ public final class RetryStreamingExceptionHandler implements ExceptionHandler {
     
     public RetryStreamingExceptionHandler(final CDCClient cdcClient, final int maxRetryTimes, final int retryIntervalMills) {
         this.cdcClient = cdcClient;
-        this.maxRetryTimes = new AtomicInteger(maxRetryTimes);
+        this.maxRetryTimes = maxRetryTimes;
         this.retryIntervalMills = retryIntervalMills;
     }
     
@@ -57,7 +57,7 @@ public final class RetryStreamingExceptionHandler implements ExceptionHandler {
     private void reconnect(final ChannelHandlerContext ctx) {
         retryTimes.incrementAndGet();
         ClientConnectionContext connectionContext = ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
-        if (retryTimes.get() > maxRetryTimes.get()) {
+        if (retryTimes.get() > maxRetryTimes) {
             log.warn("Stop try to reconnect, stop streaming ids: {}", connectionContext.getStreamingIds());
             connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> cdcClient.stopStreaming(each)));
             return;
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index a3bd0073576..edd99c6e21f 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -229,13 +229,14 @@ public final class PipelineContainerComposer implements AutoCloseable {
      * @throws SQLException SQL exception
      */
     public void registerStorageUnit(final String storageUnitName) throws SQLException {
-        String username = getDatabaseType() instanceof OracleDatabaseType ? storageUnitName : getUsername();
+        String username = databaseType instanceof OracleDatabaseType ? storageUnitName : getUsername();
         String registerStorageUnitTemplate = "REGISTER STORAGE UNIT ${ds} ( URL='${url}', USER='${user}', PASSWORD='${password}')".replace("${ds}", storageUnitName)
                 .replace("${user}", username)
                 .replace("${password}", getPassword())
                 .replace("${url}", getActualJdbcUrlTemplate(storageUnitName, true));
         proxyExecuteWithLog(registerStorageUnitTemplate, 0);
-        Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> showStorageUnitsName().contains(storageUnitName));
+        int timeout = databaseType instanceof OpenGaussDatabaseType ? 60 : 10;
+        Awaitility.await().ignoreExceptions().atMost(timeout, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> showStorageUnitsName().contains(storageUnitName));
     }
     
     /**
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 470f46434a1..374e4eb13c1 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -76,7 +76,7 @@ public abstract class AbstractMigrationE2EIT {
                 .replace("${ds3}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3, true))
                 .replace("${ds4}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, true));
         containerComposer.proxyExecuteWithLog(addTargetResource, 0);
-        Awaitility.await().ignoreExceptions().atMost(5L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> 3 == containerComposer.showStorageUnitsName().size());
+        Awaitility.await().ignoreExceptions().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> 3 == containerComposer.showStorageUnitsName().size());
     }
     
     protected void createSourceSchema(final PipelineContainerComposer containerComposer, final String schemaName) throws SQLException {
@@ -104,7 +104,7 @@ public abstract class AbstractMigrationE2EIT {
     
     protected void createTargetOrderTableRule(final PipelineContainerComposer containerComposer) throws SQLException {
         containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableRule(), 0);
-        Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order").isEmpty());
+        Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order").isEmpty());
     }
     
     protected void createTargetOrderTableEncryptRule(final PipelineContainerComposer containerComposer) throws SQLException {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 877d72dd5f7..b56c1a933a9 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -77,7 +77,7 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             DataSourceExecuteUtils.execute(containerComposer.getSourceDataSource(), containerComposer.getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
             log.info("init data end: {}", LocalDateTime.now());
             startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME, "t_order");
-            Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> listJobId(containerComposer).size() > 0);
+            Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !listJobId(containerComposer).isEmpty());
             String jobId = getJobIdByTableName(containerComposer, "ds_0.test." + SOURCE_TABLE_NAME);
             containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
             String schemaTableName = String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index d03bb3b7a86..4e03016e24f 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -56,9 +56,10 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
     private static final List<String> POSTGRESQL_COLUMN_NAMES = Arrays.asList("order_id", "user_id", "status", "t_int2", "t_numeric", "t_bool", "t_bytea", "t_char", "t_varchar", "t_float",
             "t_double", "t_json", "t_jsonb", "t_text", "t_date", "t_time", "t_timestamp", "t_timestamptz");
     
-    private static final List<String> OPENGAUSS_COLUMN_NAMES = Arrays.asList("order_id", "user_id", "status", "c_int", "c_smallint", "c_float", "c_double", "c_numeric", "c_boolean", "c_char",
-            "c_text", "c_bytea", "c_date", "c_time", "c_smalldatetime", "c_timestamp", "c_timestamptz", "c_interval", "c_array", "c_json", "c_jsonb", "c_uuid", "c_hash32", "c_tsvector", "c_bit",
-            "c_int4range", "c_reltime", "c_abstime", "c_point", "c_lseg", "c_box", "c_circle", "c_bitvarying", "c_cidr", "c_inet", "c_macaddr", "c_hll");
+    private static final List<String> OPENGAUSS_COLUMN_NAMES = Arrays.asList("order_id", "user_id", "status", "c_int", "c_smallint", "c_float", "c_double", "c_numeric", "c_boolean",
+            "c_char", "c_text", "c_bytea", "c_raw", "c_date", "c_time", "c_smalldatetime", "c_timestamp", "c_timestamptz", "c_interval", "c_array", "c_json", "c_jsonb", "c_uuid", "c_hash32",
+            "c_tsvector", "c_tsquery", "c_bit", "c_int4range", "c_daterange", "c_tsrange", "c_reltime", "c_abstime", "c_point", "c_lseg", "c_box", "c_circle", "c_bitvarying", "c_cidr", "c_inet",
+            "c_macaddr", "c_hll", "c_money");
     
     private final DataSource dataSource;
     
@@ -132,11 +133,11 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
             LocalDateTime now = LocalDateTime.now();
             String sql = SQLBuilderUtils.buildUpdateSQL(ignoreShardingColumns(OPENGAUSS_COLUMN_NAMES), orderTableName, "?");
             Object[] parameters = {"中文测试", randomInt, random.nextInt(-999, 999), PipelineCaseHelper.generateFloat(), PipelineCaseHelper.generateDouble(), BigDecimal.valueOf(10000),
-                    random.nextBoolean(), "update-char", "update-text", "update-bytea".getBytes(), now.toLocalDate().plusDays(1), now.toLocalTime().plusHours(6), "2023-03-01", now,
+                    random.nextBoolean(), "update-char", "update-text", new byte[]{0, 0, 1}, new byte[]{1, 0}, now.toLocalDate().plusDays(1), now.toLocalTime().plusHours(6), "2023-03-01", now,
                     OffsetDateTime.now(), "1 years 1 mons 1 days 1 hours 1 mins 1 secs", "{4, 5, 6}", PipelineCaseHelper.generateJsonString(1, true), PipelineCaseHelper.generateJsonString(1, false),
-                    UUID.fromString("00000000-000-0000-0000-000000000001").toString(),
-                    DigestUtils.md5Hex(now.toString()), null, "1111", "[1,10000)", "2 years 2 mons 2 days 06:00:00", "2023-01-01 00:00:00+00", "(2.0,2.0)",
-                    "[(0.0,0.0),(3.0,3.0)]", "(1.0,1.0),(3.0,3.0)", "<(5.0,5.0),1.0>", "1010", "192.168.0.0/24", "192.168.1.1", "08:00:3b:01:02:03", null, orderId};
+                    UUID.fromString("00000000-000-0000-0000-000000000001").toString(), DigestUtils.md5Hex(now.toString()), "aaa", "bbb", "1111", "[1,10000)", "[2020-01-02,2021-01-01)",
+                    "[2020-01-01 00:00:00,2021-01-01 00:00:00)", "2 years 2 mons 2 days 06:00:00", "2023-01-01 00:00:00+00", "(2.0,2.0)", "[(0.0,0.0),(3.0,3.0)]", "(1.0,1.0),(3.0,3.0)",
+                    "<(5.0,5.0),1.0>", "1010", "192.168.0.0/24", "192.168.1.1", "08:00:3b:01:02:03", null, 1000, orderId};
             log.info("update sql: {}, params: {}", sql, parameters);
             DataSourceExecuteUtils.execute(dataSource, sql, parameters);
         }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
index 5538645f3de..f243f5c3fed 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
@@ -107,13 +107,13 @@ public final class PipelineCaseHelper {
         if (databaseType instanceof OpenGaussDatabaseType) {
             for (int i = 0; i < insertRows; i++) {
                 Object orderId = keyGenerateAlgorithm.generateKey();
-                // TODO openGauss mpp plugin parses single quotes incorrectly
-                result.add(new Object[]{orderId, generateInt(0, 1000), "status" + i, generateInt(-1000, 9999), generateInt(0, 100), generateFloat(), generateDouble(),
-                        BigDecimal.valueOf(generateDouble()), false, generateString(6), "texts", "bytea".getBytes(), LocalDate.now(), LocalTime.now(), "2001-10-01",
+                byte[] bytesValue = {Byte.MIN_VALUE, -1, 0, 1, Byte.MAX_VALUE};
+                result.add(new Object[]{orderId, generateInt(0, 1000), "'status'" + i, generateInt(-1000, 9999), generateInt(0, 100), generateFloat(), generateDouble(),
+                        BigDecimal.valueOf(generateDouble()), false, generateString(6), "texts", bytesValue, bytesValue, LocalDate.now(), LocalTime.now(), "2001-10-01",
                         Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now(), "0 years 0 mons 1 days 2 hours 3 mins 4 secs", "{1, 2, 3}", generateJsonString(8, false),
-                        generateJsonString(8, true), UUID.randomUUID().toString(), DigestUtils.md5Hex(orderId.toString()), null, "0000", "[1,1000)",
-                        "1 years 1 mons 10 days -06:00:00", "2000-01-02 00:00:00+00", "(1.0,1.0)", "[(0.0,0.0),(2.0,2.0)]", "(3.0,3.0),(1.0,1.0)", "<(5.0,5.0),5.0>", "1111",
-                        "192.168.0.0/16", "192.168.1.1", "08:00:2b:01:02:03", "\\x484c4c00000000002b05000000000000000000000000000000000000"});
+                        generateJsonString(8, true), UUID.randomUUID().toString(), DigestUtils.md5Hex(orderId.toString()), "'rat' 'sat'", "tsquery", "0000", "[1,1000)", "[2020-01-02,2021-01-01)",
+                        "[2020-01-01 00:00:00,2021-01-01 00:00:00)", "1 years 1 mons 10 days -06:00:00", "2000-01-02 00:00:00+00", "(1.0,1.0)", "[(0.0,0.0),(2.0,2.0)]", "(3.0,3.0),(1.0,1.0)",
+                        "<(5.0,5.0),5.0>", "1111", "192.168.0.0/16", "192.168.1.1", "08:00:2b:01:02:03", "\\x484c4c00000000002b05000000000000000000000000000000000000", 999});
             }
             return result;
         }
diff --git a/test/e2e/operation/pipeline/src/test/resources/env/scenario/general/opengauss.xml b/test/e2e/operation/pipeline/src/test/resources/env/scenario/general/opengauss.xml
index c7da6ef3ab4..87fdc0d33d3 100644
--- a/test/e2e/operation/pipeline/src/test/resources/env/scenario/general/opengauss.xml
+++ b/test/e2e/operation/pipeline/src/test/resources/env/scenario/general/opengauss.xml
@@ -29,6 +29,7 @@
         c_char character(32),
         c_text text,
         c_bytea bytea,
+        c_raw bytea,
         c_date date,
         c_time time without time zone,
         c_smalldatetime smalldatetime,
@@ -41,8 +42,11 @@
         c_uuid uuid,
         c_hash32 hash32,
         c_tsvector tsvector,
+        c_tsquery tsquery,
         c_bit bit(4),
         c_int4range int4range,
+        c_daterange daterange,
+        c_tsrange tsrange,
         c_reltime reltime,
         c_abstime abstime,
         c_point point,
@@ -54,16 +58,17 @@
         c_inet inet,
         c_macaddr macaddr,
         c_hll hll(14,10,12,0),
+        c_money money,
         PRIMARY KEY ( order_id )
         );
     </create-table-order>
 
     <full-insert-order>
         INSERT INTO test.t_order (
-        order_id, user_id, status, c_int, c_smallint, c_float, c_double, c_numeric, c_boolean, c_char, c_text, c_bytea, c_date, c_time,
-        c_smalldatetime, c_timestamp, c_timestamptz, c_interval, c_array, c_json, c_jsonb, c_uuid, c_hash32, c_tsvector, c_bit,
-        c_int4range, c_reltime, c_abstime, c_point, c_lseg, c_box, c_circle, c_bitvarying, c_cidr, c_inet, c_macaddr, c_hll
-        ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
+        order_id, user_id, status, c_int, c_smallint, c_float, c_double, c_numeric, c_boolean, c_char, c_text, c_bytea, c_raw, c_date, c_time,
+        c_smalldatetime, c_timestamp, c_timestamptz, c_interval, c_array, c_json, c_jsonb, c_uuid, c_hash32, c_tsvector, c_tsquery, c_bit,
+        c_int4range, c_daterange, c_tsrange, c_reltime, c_abstime, c_point, c_lseg, c_box, c_circle, c_bitvarying, c_cidr, c_inet, c_macaddr, c_hll, c_money
+        ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
     </full-insert-order>
 
     <create-table-order-item>
