sandynz commented on code in PR #23808:
URL: https://github.com/apache/shardingsphere/pull/23808#discussion_r1090159326


##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckIgnoredType.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.check.consistency;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Data consistency check ignored type.
+ */
+@RequiredArgsConstructor
+@Getter
+public enum DataConsistencyCheckIgnoredType {
+    
+    UNKNOWN("Unknown data consistency check ignored type"),

Review Comment:
   Could we remove `UNKNOWN` if it's not necessary



##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckIgnoredType.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.check.consistency;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Data consistency check ignored type.
+ */
+@RequiredArgsConstructor
+@Getter
+public enum DataConsistencyCheckIgnoredType {
+    
+    UNKNOWN("Unknown data consistency check ignored type"),
+    
+    NONE_PRIMARY_KEY("Data consistency check are not supported for tables 
without primary keys");
+    

Review Comment:
   `NO_UNIQUE_KEY` might be better than `NONE_PRIMARY_KEY`



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataUtil.java:
##########
@@ -43,9 +44,9 @@ public final class PipelineTableMetaDataUtil {
      * @param metaDataLoader meta data loader
      * @return pipeline column meta data
      */
-    public static PipelineColumnMetaData getUniqueKeyColumn(final String 
schemaName, final String tableName, final PipelineTableMetaDataLoader 
metaDataLoader) {
+    public static Optional<PipelineColumnMetaData> getUniqueKeyColumn(final 
String schemaName, final String tableName, final PipelineTableMetaDataLoader 
metaDataLoader) {
         PipelineTableMetaData pipelineTableMetaData = 
metaDataLoader.getTableMetaData(schemaName, tableName);
-        return mustGetAnAppropriateUniqueKeyColumn(pipelineTableMetaData, 
tableName);
+        return 
Optional.ofNullable(mustGetAnAppropriateUniqueKeyColumn(pipelineTableMetaData, 
tableName));
     }
     
     private static PipelineColumnMetaData 
mustGetAnAppropriateUniqueKeyColumn(final PipelineTableMetaData tableMetaData, 
final String tableName) {

Review Comment:
   Since it might return `null`, method name 
`mustGetAnAppropriateUniqueKeyColumn` should be updated



##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java:
##########
@@ -150,6 +150,14 @@ default Optional<String> buildCreateSchemaSQL(String 
schemaName) {
      */
     String buildSplitByPrimaryKeyRangeSQL(String schemaName, String tableName, 
String primaryKey);
     
+    /**
+     * Build select offset SQL.
+     * @param schemaName schema name

Review Comment:
   1, It's better to add newline between javadoc description and params.
   
   2, Consider to rename method. It's better not use offset when there's no 
unique key.
   



##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java:
##########
@@ -33,12 +31,33 @@ public final class DataConsistencyCheckResult {
     
     private final DataConsistencyContentCheckResult contentCheckResult;
     
+    private final DataConsistencyCheckIgnoredType checkIgnoredType;
+    
+    private final boolean ignored;
+    
+    public DataConsistencyCheckResult(final DataConsistencyCountCheckResult 
countCheckResult, final DataConsistencyContentCheckResult contentCheckResult) {
+        this.countCheckResult = countCheckResult;
+        this.contentCheckResult = contentCheckResult;
+        checkIgnoredType = null;
+        ignored = false;
+    }
+    
+    public DataConsistencyCheckResult(final DataConsistencyCheckIgnoredType 
checkIgnoredType) {
+        this.checkIgnoredType = checkIgnoredType;
+        ignored = true;
+        countCheckResult = null;
+        contentCheckResult = null;
+    }
+    
     /**
      * Is count and content matched.
      *
      * @return matched or not
      */
     public boolean isMatched() {
+        if (ignored || null == countCheckResult || null == contentCheckResult) 
{

Review Comment:
   Looks it's not necessary to add ` || null == countCheckResult || null == 
contentCheckResult`, they're required when ignored is false



##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NonePrimaryKeyPosition.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.ingest.position;
+
+import lombok.RequiredArgsConstructor;
+
+/**
+ * None primary key position.
+ */
+@RequiredArgsConstructor
+public final class NonePrimaryKeyPosition extends PrimaryKeyPosition<Integer> 
implements IngestPosition<NonePrimaryKeyPosition> {

Review Comment:
   1, Class name could be changed, similar as enum name;
   
   2, Seems generic type `PrimaryKeyPosition<Integer>` is not suitable, could 
we use `Void` etc to replace `Integer`? And then `offset` field could be 
removed too. And also it's related to InventoryDumper and SQLBuilder
   



##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java:
##########
@@ -33,12 +31,33 @@ public final class DataConsistencyCheckResult {
     
     private final DataConsistencyContentCheckResult contentCheckResult;
     
+    private final DataConsistencyCheckIgnoredType checkIgnoredType;
+    
+    private final boolean ignored;

Review Comment:
   1, It's better to put ignored related fields before check result fields;
   
   2, `ignored` field may be not necessary, could we just add `isIgnored()` 
method;
   



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java:
##########
@@ -190,4 +190,10 @@ public String buildSplitByPrimaryKeyRangeSQL(final String 
schemaName, final Stri
         return String.format("SELECT MAX(%s),COUNT(*) FROM (SELECT %s FROM %s 
WHERE %s>=? ORDER BY %s LIMIT ?) t",
                 quotedUniqueKey, quotedUniqueKey, 
getQualifiedTableName(schemaName, tableName), quotedUniqueKey, quotedUniqueKey);
     }
+    
+    @Override
+    public String buildSelectOffsetSQL(final String schemaName, final String 
tableName) {
+        String qualifiedTableName = getQualifiedTableName(schemaName, 
tableName);
+        return String.format("SELECT * FROM %s LIMIT ? OFFSET ?", 
qualifiedTableName);

Review Comment:
   It's better not use LIMIT and OFFSET. Since records ordering are not 
guaranteed, it might get duplicated records or miss some records?



##########
kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusResultSet.java:
##########
@@ -42,13 +43,17 @@ public final class ShowMigrationCheckStatusResultSet 
implements DatabaseDistSQLR
     @Override
     public void init(final ShardingSphereDatabase database, final SQLStatement 
sqlStatement) {
         ShowMigrationCheckStatusStatement checkMigrationStatement = 
(ShowMigrationCheckStatusStatement) sqlStatement;
-        ConsistencyCheckJobItemInfo info = 
jobAPI.getJobItemInfo(checkMigrationStatement.getJobId());
-        String checkResult = null == info.getCheckSuccess() ? "" : 
info.getCheckSuccess().toString();
-        Collection<Object> result = 
Arrays.asList(Optional.ofNullable(info.getTableNames()).orElse(""), checkResult,
-                String.valueOf(info.getFinishedPercentage()), 
info.getRemainingSeconds(),
-                Optional.ofNullable(info.getCheckBeginTime()).orElse(""), 
Optional.ofNullable(info.getCheckEndTime()).orElse(""),
-                info.getDurationSeconds(), 
Optional.ofNullable(info.getErrorMessage()).orElse(""));
-        data = Collections.singletonList(result).iterator();

Review Comment:
   This code block could be extracted as a new method, and then reference it in 
for each loop



##########
test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/NonePrimaryKeyMigrationE2EIT.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import 
org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import 
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
+import 
org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
+import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
+import 
org.apache.shardingsphere.test.e2e.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+
+@RunWith(Parameterized.class)
+@Slf4j
+public final class NonePrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT 
{
+    
+    private final PipelineTestParameter testParam;
+    
+    public NonePrimaryKeyMigrationE2EIT(final PipelineTestParameter testParam) 
{
+        super(testParam);
+        this.testParam = testParam;
+    }
+    
+    @Parameters(name = "{0}")
+    public static Collection<PipelineTestParameter> getTestParameters() {
+        Collection<PipelineTestParameter> result = new LinkedList<>();
+        if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+            return result;
+        }
+        for (String version : 
PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType())) {
+            result.add(new PipelineTestParameter(new MySQLDatabaseType(), 
version, "env/scenario/primary_key/none_primary_key/mysql.xml"));
+        }
+        return result;
+    }
+    
+    @Override
+    protected String getSourceTableOrderName() {
+        return "t_order";
+    }
+    
+    @Test
+    public void assertTextPrimaryMigrationSuccess() throws SQLException, 
InterruptedException {
+        log.info("assertTextPrimaryMigrationSuccess testParam:{}", testParam);
+        createSourceOrderTable();
+        try (Connection connection = getSourceDataSource().getConnection()) {
+            AutoIncrementKeyGenerateAlgorithm generateAlgorithm = new 
AutoIncrementKeyGenerateAlgorithm();
+            
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
generateAlgorithm, getSourceTableOrderName(), 
PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
+        }
+        addMigrationProcessConfig();
+        addMigrationSourceResource();
+        addMigrationTargetResource();
+        createTargetOrderTableRule();
+        startMigration(getSourceTableOrderName(), getTargetTableOrderName());
+        String jobId = listJobId().get(0);
+        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", 
jobId));
+        proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
+        assertTargetAndSourceCountAreSame();
+        if (PipelineBaseE2EIT.ENV.getItEnvType() == 
PipelineEnvTypeEnum.DOCKER) {
+            commitMigrationByJobId(jobId);
+            List<String> lastJobIds = listJobId();
+            assertThat(lastJobIds.size(), is(0));
+        }

Review Comment:
   We could just commit migration job, no matter env type. And also in 
TextPrimaryKeyMigrationE2EIT. Keep consistent with other migration E2E IT cases.



##########
kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java:
##########
@@ -213,56 +220,85 @@ public void dropByParentJobId(final String parentJobId) {
     }
     
     /**
-     * Get consistency job item info.
+     * Get consistency job item infos.
      *
      * @param parentJobId parent job id
-     * @return consistency job item info
+     * @return consistency job item infos
      */
-    public ConsistencyCheckJobItemInfo getJobItemInfo(final String 
parentJobId) {
+    public List<ConsistencyCheckJobItemInfo> getJobItemInfos(final String 
parentJobId) {

Review Comment:
   It's better to extract or keep `getJobItemInfo` method (private), and then 
add `getJobItemInfos` method to reference it



##########
kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java:
##########
@@ -77,12 +78,17 @@ public Map<String, DataConsistencyCheckResult> check(final 
DataConsistencyCalcul
         verifyPipelineDatabaseType(calculateAlgorithm, jobConfig.getTarget());
         SchemaTableName sourceTable = new SchemaTableName(new 
SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getSourceTableName())),
 new TableName(jobConfig.getSourceTableName()));
         SchemaTableName targetTable = new SchemaTableName(new 
SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getTargetTableName())),
 new TableName(jobConfig.getTargetTableName()));
+        progressContext.getTableNames().add(jobConfig.getSourceTableName());

Review Comment:
   Why `progressContext.getTableNames().add(jobConfig.getSourceTableName());` 
is moved to here, it's separated with 
`progressContext.setRecordsCount(getRecordsCount());`, is it used in new added 
code block?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to