This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 9228d184d16 Add record exist check at no unique key migration E2E  
(#26653)
9228d184d16 is described below

commit 9228d184d16312dc9a48914671db6b16ba3a998b
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Jun 28 19:14:18 2023 +0800

    Add record exist check at no unique key migration E2E  (#26653)
    
    * Incr max wait time at register storage unit at E2E
    
    * Add record exist check at no unique key migration
    
    * Remove retry at PipelineContainerComposer.queryForListWithLog
    
    * Use AwaitTimeoutUtil
---
 .../pipeline/cases/PipelineContainerComposer.java  | 21 ++++---------
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  8 +++--
 .../cases/migration/AbstractMigrationE2EIT.java    |  9 ++++--
 .../primarykey/IndexesMigrationE2EIT.java          |  9 +++---
 .../e2e/data/pipeline/util/AwaitTimeoutUtil.java   | 34 ++++++++++++++++++++++
 5 files changed, 55 insertions(+), 26 deletions(-)

diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index c1c5349bddf..dbfee4598a0 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -404,20 +404,12 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
      */
     public List<Map<String, Object>> queryForListWithLog(final DataSource 
dataSource, final String sql) {
         log.info("Query SQL: {}", sql);
-        int retryNumber = 0;
-        while (retryNumber <= 3) {
-            try (Connection connection = dataSource.getConnection()) {
-                ResultSet resultSet = 
connection.createStatement().executeQuery(sql);
-                return transformResultSetToList(resultSet);
-                // CHECKSTYLE:OFF
-            } catch (final SQLException | RuntimeException ex) {
-                // CHECKSTYLE:ON
-                log.error("Data access error, sql: {}.", sql, ex);
-            }
-            Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() -> 
true);
-            retryNumber++;
+        try (Connection connection = dataSource.getConnection()) {
+            ResultSet resultSet = 
connection.createStatement().executeQuery(sql);
+            return transformResultSetToList(resultSet);
+        } catch (final SQLException ex) {
+            throw new RuntimeException(ex);
         }
-        throw new RuntimeException("Can not get result from proxy.");
     }
     
     /**
@@ -547,10 +539,9 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
      * Generate ShardingSphere data source from proxy.
      *
      * @return ShardingSphere data source
-     * @throws SQLException SQL exception
      */
     // TODO proxy support for some fields still needs to be optimized, such as 
binary of MySQL, after these problems are optimized, Proxy dataSource can be 
used.
-    public DataSource generateShardingSphereDataSourceFromProxy() throws 
SQLException {
+    public DataSource generateShardingSphereDataSourceFromProxy() {
         Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> null != getYamlRootConfig().getRules());
         YamlRootConfiguration rootConfig = getYamlRootConfig();
         ShardingSpherePreconditions.checkNotNull(rootConfig.getDataSources(), 
() -> new IllegalStateException("dataSources is null"));
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 4aeacfff4e6..cd47ac95c63 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -49,6 +49,7 @@ import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.Pipeline
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
+import org.apache.shardingsphere.test.e2e.data.pipeline.util.AwaitTimeoutUtil;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtils;
 import 
org.apache.shardingsphere.test.e2e.env.container.atomic.constants.ProxyContainerConstants;
 import org.junit.jupiter.api.condition.EnabledIf;
@@ -105,8 +106,8 @@ class CDCE2EIT {
             for (String each : Arrays.asList(PipelineContainerComposer.DS_0, 
PipelineContainerComposer.DS_1)) {
                 containerComposer.registerStorageUnit(each);
             }
-            Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS).until(() -> containerComposer.showStorageUnitsName()
-                    .containsAll(Arrays.asList(PipelineContainerComposer.DS_0, 
PipelineContainerComposer.DS_1)));
+            
Awaitility.await().ignoreExceptions().atMost(AwaitTimeoutUtil.getTimeout(containerComposer.getDatabaseType()),
 TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
+                    .until(() -> 
containerComposer.showStorageUnitsName().containsAll(Arrays.asList(PipelineContainerComposer.DS_0,
 PipelineContainerComposer.DS_1)));
             createOrderTableRule(containerComposer);
             try (Connection connection = 
containerComposer.getProxyDataSource().getConnection()) {
                 initSchemaAndTable(containerComposer, connection, 3);
@@ -150,7 +151,8 @@ class CDCE2EIT {
     }
     
     private void createOrderTableRule(final PipelineContainerComposer 
containerComposer) throws SQLException {
-        containerComposer.proxyExecuteWithLog(CREATE_SHARDING_RULE_SQL, 2);
+        containerComposer.proxyExecuteWithLog(CREATE_SHARDING_RULE_SQL, 0);
+        Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(2L, 
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW 
SHARDING TABLE RULE t_order").isEmpty());
     }
     
     private void initSchemaAndTable(final PipelineContainerComposer 
containerComposer, final Connection connection, final int sleepSeconds) throws 
SQLException {
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 2750dc6850f..c6baade542a 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerC
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.command.MigrationDistSQLCommand;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
+import org.apache.shardingsphere.test.e2e.data.pipeline.util.AwaitTimeoutUtil;
 import 
org.apache.shardingsphere.test.e2e.env.container.atomic.util.DatabaseTypeUtils;
 import org.awaitility.Awaitility;
 import org.opengauss.util.PSQLException;
@@ -74,7 +75,8 @@ public abstract class AbstractMigrationE2EIT {
                 .replace("${ds3}", 
containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3, 
true))
                 .replace("${ds4}", 
containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, 
true));
         containerComposer.proxyExecuteWithLog(addTargetResource, 0);
-        Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(500L, 
TimeUnit.MILLISECONDS).until(() -> 3 == 
containerComposer.showStorageUnitsName().size());
+        
Awaitility.await().ignoreExceptions().atMost(AwaitTimeoutUtil.getTimeout(containerComposer.getDatabaseType()),
 TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS)
+                .until(() -> 3 == 
containerComposer.showStorageUnitsName().size());
     }
     
     protected void createSourceSchema(final PipelineContainerComposer 
containerComposer, final String schemaName) throws SQLException {
@@ -102,7 +104,8 @@ public abstract class AbstractMigrationE2EIT {
     
     protected void createTargetOrderTableRule(final PipelineContainerComposer 
containerComposer) throws SQLException {
         
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableRule(),
 0);
-        Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(500L, 
TimeUnit.MILLISECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW 
SHARDING TABLE RULE t_order").isEmpty());
+        
Awaitility.await().atMost(AwaitTimeoutUtil.getTimeout(containerComposer.getDatabaseType()),
 TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS)
+                .until(() -> !containerComposer.queryForListWithLog("SHOW 
SHARDING TABLE RULE t_order").isEmpty());
     }
     
     protected void createTargetOrderTableEncryptRule(final 
PipelineContainerComposer containerComposer) throws SQLException {
@@ -111,7 +114,7 @@ public abstract class AbstractMigrationE2EIT {
     
     protected void createTargetOrderItemTableRule(final 
PipelineContainerComposer containerComposer) throws SQLException {
         
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderItemTableRule(),
 0);
-        Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(500L, 
TimeUnit.MILLISECONDS)
+        
Awaitility.await().atMost(AwaitTimeoutUtil.getTimeout(containerComposer.getDatabaseType()),
 TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS)
                 .until(() -> !containerComposer.queryForListWithLog("SHOW 
SHARDING TABLE RULE t_order_item").isEmpty());
     }
     
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 5a61df80f40..d91a6e22541 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -91,13 +91,12 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             KeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
             // TODO PostgreSQL update delete events not support if table 
without unique keys at increment task.
             final Consumer<DataSource> incrementalTaskFn = dataSource -> {
-                Object orderId = keyGenerateAlgorithm.generateKey();
-                insertOneOrder(containerComposer, orderId);
                 if (containerComposer.getDatabaseType() instanceof 
MySQLDatabaseType) {
-                    updateOneOrder(containerComposer, orderId, "updated");
-                    deleteOneOrder(containerComposer, orderId, "updated");
-                    insertOneOrder(containerComposer, 
keyGenerateAlgorithm.generateKey());
+                    doCreateUpdateDelete(containerComposer, 
keyGenerateAlgorithm.generateKey());
                 }
+                Object orderId = keyGenerateAlgorithm.generateKey();
+                insertOneOrder(containerComposer, orderId);
+                containerComposer.assertOrderRecordExist(dataSource, 
"t_order", orderId);
             };
             assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, incrementalTaskFn);
         }
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/util/AwaitTimeoutUtil.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/util/AwaitTimeoutUtil.java
new file mode 100644
index 00000000000..37f89ac7f8a
--- /dev/null
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/util/AwaitTimeoutUtil.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.util;
+
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import 
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+
+public final class AwaitTimeoutUtil {
+    
+    /**
+     * Get timeout, openGauss metadata reload slowly need special handling.
+     *
+     * @param databaseType database type.
+     * @return timeout
+     */
+    public static long getTimeout(final DatabaseType databaseType) {
+        return databaseType instanceof OpenGaussDatabaseType ? 60 : 10;
+    }
+}

Reply via email to