sandynz commented on code in PR #24485:
URL: https://github.com/apache/shardingsphere/pull/24485#discussion_r1130350307


##########
test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java:
##########
@@ -90,14 +90,15 @@ public void assertMigrationSuccess() throws SQLException, 
InterruptedException {
         createTargetOrderItemTableRule();
         Pair<List<Object[]>, List<Object[]>> dataPair = 
PipelineCaseHelper.generateFullInsertData(testParam.getDatabaseType(), 
PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
         log.info("init data begin: {}", LocalDateTime.now());
-        DataSourceExecuteUtil.execute(getSourceDataSource(), 
getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), 
dataPair.getLeft());
+        String insertOrderSql = 
getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName());

Review Comment:
   `insertOrderSql` could be renamed to `insertOrderSQL`, to keep the SQL acronym fully capitalized per the project's naming convention.



##########
test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java:
##########
@@ -125,38 +125,36 @@ public void assertCDCDataImportSuccess() throws 
SQLException, InterruptedExcepti
             registerStorageUnit(each);
         }
         createOrderTableRule();
-        try (Connection connection = getProxyDataSource().getConnection()) {
+        DataSource proxyDataSource = 
generateShardingSphereDataSourceFromProxy();
+        try (Connection connection = proxyDataSource.getConnection()) {
             initSchemaAndTable(connection);
         }
         Pair<List<Object[]>, List<Object[]>> dataPair = 
PipelineCaseHelper.generateFullInsertData(getDatabaseType(), 20);
         log.info("init data begin: {}", LocalDateTime.now());
-        DataSourceExecuteUtil.execute(getProxyDataSource(), 
getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), 
dataPair.getLeft());
+        String insertOrderTableSql = 
getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName());
+        DataSourceExecuteUtil.execute(proxyDataSource, insertOrderTableSql, 
dataPair.getLeft());
         log.info("init data end: {}", LocalDateTime.now());
         try (Connection connection = 
DriverManager.getConnection(getActualJdbcUrlTemplate(DS_4, false), 
getUsername(), getPassword())) {
             initSchemaAndTable(connection);
         }
         startCDCClient();
         Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS).until(() -> !queryForListWithLog("SHOW STREAMING 
LIST").isEmpty());
-        if (getDatabaseType() instanceof MySQLDatabaseType) {
-            startIncrementTask(new MySQLIncrementTask(getProxyDataSource(), 
getSourceTableOrderName(), new SnowflakeKeyGenerateAlgorithm(), 20));
-        } else {
-            startIncrementTask(new 
PostgreSQLIncrementTask(getProxyDataSource(), PipelineBaseE2EIT.SCHEMA_NAME, 
getSourceTableOrderName(), 20));
-        }
+        startIncrementTask(new E2EIncrementalTask(proxyDataSource, 
getSourceTableOrderName(), insertOrderTableSql, new 
SnowflakeKeyGenerateAlgorithm(), getDatabaseType(), 20));
         getIncreaseTaskThread().join(10000);
         List<Map<String, Object>> actualProxyList;
-        try (Connection connection = getProxyDataSource().getConnection()) {
+        try (Connection connection = proxyDataSource.getConnection()) {
             ResultSet resultSet = 
connection.createStatement().executeQuery(String.format("SELECT * FROM %s ORDER 
BY order_id ASC", getOrderTableNameWithSchema()));
             actualProxyList = transformResultSetToList(resultSet);
         }
-        Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until(() -> 
listOrderRecords(getOrderTableNameWithSchema()).size() == 
actualProxyList.size());
+        Awaitility.await().atMost(20, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until(() -> 
listOrderRecords(getOrderTableNameWithSchema()).size() == 
actualProxyList.size());

Review Comment:
   Could we remove the `actualProxyList` check? Since 
`SingleTableInventoryDataConsistencyChecker.check` is already performed, this 
verification appears redundant.



##########
db-protocol/mysql/src/main/java/org/apache/shardingsphere/db/protocol/mysql/packet/binlog/row/column/value/string/MySQLStringBinlogProtocolValue.java:
##########
@@ -45,7 +45,7 @@ public Serializable read(final MySQLBinlogColumnDef 
columnDef, final MySQLPacket
             case MYSQL_TYPE_SET:
                 return payload.getByteBuf().readByte();
             case MYSQL_TYPE_STRING:
-                return payload.readStringFix(readActualLength(length, 
payload));
+                return payload.readStringFixByBytes(readActualLength(length, 
payload));

Review Comment:
   It should return `new MySQLBinaryString(bytes)` rather than the raw byte array.



##########
test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.cases.task;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import 
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import 
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
+import 
org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
+import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import 
org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
+
+import javax.sql.DataSource;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.Year;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+@RequiredArgsConstructor
+@Slf4j
+public final class E2EIncrementalTask extends BaseIncrementTask {
+    
+    private static final List<String> MYSQL_COLUMN_NAMES = 
Arrays.asList("status", "t_mediumint", "t_smallint", "t_tinyint", 
"t_unsigned_int", "t_unsigned_mediumint", "t_unsigned_smallint",
+            "t_unsigned_tinyint", "t_float", "t_double", "t_decimal", 
"t_timestamp", "t_datetime", "t_date", "t_time", "t_year", "t_bit", "t_binary", 
"t_varbinary", "t_blob", "t_mediumblob",
+            "t_char", "t_text", "t_mediumtext", "t_enum", "t_set", "t_json");
+    
+    private static final List<String> POSTGRESQL_COLUMN_NAMES = 
Arrays.asList("status", "t_int2", "t_numeric", "t_bool", "t_bytea", "t_char", 
"t_float", "t_double", "t_json", "t_jsonb", "t_text",
+            "t_date", "t_time", "t_timestamp", "t_timestamptz");
+    
+    private final DataSource dataSource;
+    
+    private final String orderTableName;
+    
+    private final String insertTableSql;
+    
+    private final KeyGenerateAlgorithm primaryKeyGenerateAlgorithm;
+    
+    private final DatabaseType databaseType;
+    
+    private final int loopCount;
+    
+    @Override
+    public void run() {
+        List<Object[]> orderInsertData = 
PipelineCaseHelper.generateOrderInsertData(databaseType, 
primaryKeyGenerateAlgorithm, loopCount);
+        List<Object> primaryKeys = new LinkedList<>();
+        for (Object[] each : orderInsertData) {
+            primaryKeys.add(each[0]);
+            DataSourceExecuteUtil.execute(dataSource, insertTableSql, each);
+        }
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
+            // TODO 0000-00-00 00:00:00 now will cause consistency check 
failed of MySQL.
+            // DataSourceUtil.execute(dataSource, String.format("UPDATE %s SET 
t_datetime='0000-00-00 00:00:00' WHERE order_id = ?", orderTableName)
+            updateOrderById(primaryKeys.get(random.nextInt(0, 
primaryKeys.size())));
+        }
+        for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
+            setNullToAllFields(primaryKeys.get(random.nextInt(0, 
primaryKeys.size())));
+            deleteOrderById(primaryKeys.get(random.nextInt(0, 
primaryKeys.size())));
+        }
+        log.info("increment task runnable execute successfully.");
+    }
+    
+    private void updateOrderById(final Object orderId) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int randomInt = random.nextInt(-100, 100);
+        if (databaseType instanceof MySQLDatabaseType) {
+            String sql = String.format(buildUpdateSql(MYSQL_COLUMN_NAMES, 
"?"), orderTableName);
+            log.info("update sql: {}", sql);
+            int randomUnsignedInt = random.nextInt(10, 100);
+            LocalDateTime now = LocalDateTime.now();
+            DataSourceExecuteUtil.execute(dataSource, sql, new 
Object[]{"中文测试", randomInt, randomInt, randomInt, randomUnsignedInt, 
randomUnsignedInt, randomUnsignedInt,
+                    randomUnsignedInt, 1.0F, 1.0, new BigDecimal("999"), now, 
now, now.toLocalDate(), now.toLocalTime(), Year.now().getValue() + 1, new 
byte[]{}, new byte[]{1, 2, -1, -3},
+                    "D".getBytes(), "A".getBytes(), "T".getBytes(), "E", 
"text", "mediumText", "3", "3", PipelineCaseHelper.generateJsonString(32, 
true), orderId});
+            return;
+        }
+        if (databaseType instanceof PostgreSQLDatabaseType || databaseType 
instanceof OpenGaussDatabaseType) {
+            String sql = String.format(buildUpdateSql(POSTGRESQL_COLUMN_NAMES, 
"?"), orderTableName);
+            log.info("update sql: {}", sql);
+            DataSourceExecuteUtil.execute(dataSource, sql, new 
Object[]{"中文测试", randomInt, BigDecimal.valueOf(10000), true, new byte[]{}, 
"update", PipelineCaseHelper.generateFloat(),
+                    PipelineCaseHelper.generateDouble(), 
PipelineCaseHelper.generateJsonString(10, true), 
PipelineCaseHelper.generateJsonString(20, true), "text-update", LocalDate.now(),
+                    LocalTime.now(), Timestamp.valueOf(LocalDateTime.now()), 
OffsetDateTime.now(), orderId});
+        }
+    }
+    
+    private String buildUpdateSql(final List<String> columnNames, final String 
placeholder) {

Review Comment:
   `buildUpdateSql` could be renamed to `buildUpdateSQL`, to keep the SQL acronym fully capitalized per the project's naming convention.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to