sandynz commented on code in PR #23808: URL: https://github.com/apache/shardingsphere/pull/23808#discussion_r1090159326
##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckIgnoredType.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.check.consistency;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Data consistency check ignored type.
+ */
+@RequiredArgsConstructor
+@Getter
+public enum DataConsistencyCheckIgnoredType {
+
+    UNKNOWN("Unknown data consistency check ignored type"),

Review Comment:
   Could we remove `UNKNOWN` if it's not necessary?

##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckIgnoredType.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.check.consistency;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Data consistency check ignored type.
+ */
+@RequiredArgsConstructor
+@Getter
+public enum DataConsistencyCheckIgnoredType {
+
+    UNKNOWN("Unknown data consistency check ignored type"),
+
+    NONE_PRIMARY_KEY("Data consistency check are not supported for tables without primary keys");
+

Review Comment:
   `NO_UNIQUE_KEY` might be better than `NONE_PRIMARY_KEY`.

##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataUtil.java:
##########
@@ -43,9 +44,9 @@ public final class PipelineTableMetaDataUtil {
      * @param metaDataLoader meta data loader
      * @return pipeline column meta data
      */
-    public static PipelineColumnMetaData getUniqueKeyColumn(final String schemaName, final String tableName, final PipelineTableMetaDataLoader metaDataLoader) {
+    public static Optional<PipelineColumnMetaData> getUniqueKeyColumn(final String schemaName, final String tableName, final PipelineTableMetaDataLoader metaDataLoader) {
         PipelineTableMetaData pipelineTableMetaData = metaDataLoader.getTableMetaData(schemaName, tableName);
-        return mustGetAnAppropriateUniqueKeyColumn(pipelineTableMetaData, tableName);
+        return Optional.ofNullable(mustGetAnAppropriateUniqueKeyColumn(pipelineTableMetaData, tableName));
     }

     private static PipelineColumnMetaData mustGetAnAppropriateUniqueKeyColumn(final PipelineTableMetaData tableMetaData, final String tableName) {

Review Comment:
   Since it might return `null`, the method name `mustGetAnAppropriateUniqueKeyColumn` should be updated.

##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java:
##########
@@ -150,6 +150,14 @@ default Optional<String> buildCreateSchemaSQL(String schemaName) {
      */
     String buildSplitByPrimaryKeyRangeSQL(String schemaName, String tableName, String primaryKey);

+    /**
+     * Build select offset SQL.
+     * @param schemaName schema name

Review Comment:
   1, It's better to add a blank line between the javadoc description and the params.
   2, Consider renaming the method. It's better not to use offset when there's no unique key.

##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java:
##########
@@ -33,12 +31,33 @@ public final class DataConsistencyCheckResult {

     private final DataConsistencyContentCheckResult contentCheckResult;

+    private final DataConsistencyCheckIgnoredType checkIgnoredType;
+
+    private final boolean ignored;
+
+    public DataConsistencyCheckResult(final DataConsistencyCountCheckResult countCheckResult, final DataConsistencyContentCheckResult contentCheckResult) {
+        this.countCheckResult = countCheckResult;
+        this.contentCheckResult = contentCheckResult;
+        checkIgnoredType = null;
+        ignored = false;
+    }
+
+    public DataConsistencyCheckResult(final DataConsistencyCheckIgnoredType checkIgnoredType) {
+        this.checkIgnoredType = checkIgnoredType;
+        ignored = true;
+        countCheckResult = null;
+        contentCheckResult = null;
+    }
+
     /**
      * Is count and content matched.
      *
      * @return matched or not
      */
     public boolean isMatched() {
+        if (ignored || null == countCheckResult || null == contentCheckResult) {

Review Comment:
   It doesn't look necessary to add ` || null == countCheckResult || null == contentCheckResult`; both are required when `ignored` is false.

##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/position/NonePrimaryKeyPosition.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.ingest.position;
+
+import lombok.RequiredArgsConstructor;
+
+/**
+ * None primary key position.
+ */
+@RequiredArgsConstructor
+public final class NonePrimaryKeyPosition extends PrimaryKeyPosition<Integer> implements IngestPosition<NonePrimaryKeyPosition> {

Review Comment:
   1, The class name could be changed, similar to the enum name;
   2, The generic type `PrimaryKeyPosition<Integer>` doesn't seem suitable; could we use `Void` etc. to replace `Integer`? Then the `offset` field could be removed too. It's also related to InventoryDumper and SQLBuilder.

##########
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java:
##########
@@ -33,12 +31,33 @@ public final class DataConsistencyCheckResult {

     private final DataConsistencyContentCheckResult contentCheckResult;

+    private final DataConsistencyCheckIgnoredType checkIgnoredType;
+
+    private final boolean ignored;

Review Comment:
   1, It's better to put the ignored-related fields before the check result fields;
   2, The `ignored` field may not be necessary; could we just add an `isIgnored()` method?

##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java:
##########
@@ -190,4 +190,10 @@ public String buildSplitByPrimaryKeyRangeSQL(final String schemaName, final Stri
         return String.format("SELECT MAX(%s),COUNT(*) FROM (SELECT %s FROM %s WHERE %s>=? ORDER BY %s LIMIT ?) t", quotedUniqueKey, quotedUniqueKey, getQualifiedTableName(schemaName, tableName), quotedUniqueKey, quotedUniqueKey);
     }
+
+    @Override
+    public String buildSelectOffsetSQL(final String schemaName, final String tableName) {
+        String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
+        return String.format("SELECT * FROM %s LIMIT ? OFFSET ?", qualifiedTableName);

Review Comment:
   It's better not to use LIMIT and OFFSET. Since record ordering is not guaranteed, it might return duplicate records or miss some records.
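
To make the LIMIT/OFFSET concern above concrete, here is a minimal, self-contained sketch. It does not use any ShardingSphere code: `UnorderedPagingSketch`, `page` and `missed` are hypothetical names, and the two scan orders are made-up inputs that stand in for a database returning rows in a different order on each execution of `SELECT * FROM t LIMIT ? OFFSET ?` when there is no `ORDER BY`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class UnorderedPagingSketch {

    // Mimics "SELECT * FROM t LIMIT ? OFFSET ?" evaluated against one particular scan order.
    static List<Integer> page(final List<Integer> scanOrder, final int limit, final int offset) {
        int from = Math.min(offset, scanOrder.size());
        int to = Math.min(offset + limit, scanOrder.size());
        return new ArrayList<>(scanOrder.subList(from, to));
    }

    // Rows of the table that none of the fetched pages returned.
    static Set<Integer> missed(final List<Integer> table, final List<Integer> fetched) {
        Set<Integer> result = new TreeSet<>(table);
        result.removeAll(fetched);
        return result;
    }

    public static void main(final String[] args) {
        List<Integer> table = Arrays.asList(1, 2, 3, 4);
        // Assumption for illustration: the second query scans the same rows in reverse order.
        List<Integer> firstScan = Arrays.asList(1, 2, 3, 4);
        List<Integer> secondScan = Arrays.asList(4, 3, 2, 1);
        List<Integer> fetched = new ArrayList<>(page(firstScan, 2, 0));
        fetched.addAll(page(secondScan, 2, 2));
        // Rows 1 and 2 are read twice, rows 3 and 4 are never read.
        System.out.println("fetched = " + fetched);
        System.out.println("missed = " + missed(table, fetched));
    }
}
```

With a stable ordering (for example by a unique key) both pages would be cut from the same sequence, so each row would be read exactly once.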
##########
kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusResultSet.java:
##########
@@ -42,13 +43,17 @@ public final class ShowMigrationCheckStatusResultSet implements DatabaseDistSQLR
     @Override
     public void init(final ShardingSphereDatabase database, final SQLStatement sqlStatement) {
         ShowMigrationCheckStatusStatement checkMigrationStatement = (ShowMigrationCheckStatusStatement) sqlStatement;
-        ConsistencyCheckJobItemInfo info = jobAPI.getJobItemInfo(checkMigrationStatement.getJobId());
-        String checkResult = null == info.getCheckSuccess() ? "" : info.getCheckSuccess().toString();
-        Collection<Object> result = Arrays.asList(Optional.ofNullable(info.getTableNames()).orElse(""), checkResult,
-                String.valueOf(info.getFinishedPercentage()), info.getRemainingSeconds(),
-                Optional.ofNullable(info.getCheckBeginTime()).orElse(""), Optional.ofNullable(info.getCheckEndTime()).orElse(""),
-                info.getDurationSeconds(), Optional.ofNullable(info.getErrorMessage()).orElse(""));
-        data = Collections.singletonList(result).iterator();

Review Comment:
   This code block could be extracted into a new method, which is then referenced in the for-each loop.

##########
test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/NonePrimaryKeyMigrationE2EIT.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
+import org.apache.shardingsphere.test.e2e.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+
+@RunWith(Parameterized.class)
+@Slf4j
+public final class NonePrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
+
+    private final PipelineTestParameter testParam;
+
+    public NonePrimaryKeyMigrationE2EIT(final PipelineTestParameter testParam) {
+        super(testParam);
+        this.testParam = testParam;
+    }
+
+    @Parameters(name = "{0}")
+    public static Collection<PipelineTestParameter> getTestParameters() {
+        Collection<PipelineTestParameter> result = new LinkedList<>();
+        if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+            return result;
+        }
+        for (String version : PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType())) {
+            result.add(new PipelineTestParameter(new MySQLDatabaseType(), version, "env/scenario/primary_key/none_primary_key/mysql.xml"));
+        }
+        return result;
+    }
+
+    @Override
+    protected String getSourceTableOrderName() {
+        return "t_order";
+    }
+
+    @Test
+    public void assertTextPrimaryMigrationSuccess() throws SQLException, InterruptedException {
+        log.info("assertTextPrimaryMigrationSuccess testParam:{}", testParam);
+        createSourceOrderTable();
+        try (Connection connection = getSourceDataSource().getConnection()) {
+            AutoIncrementKeyGenerateAlgorithm generateAlgorithm = new AutoIncrementKeyGenerateAlgorithm();
+            PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, generateAlgorithm, getSourceTableOrderName(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
+        }
+        addMigrationProcessConfig();
+        addMigrationSourceResource();
+        addMigrationTargetResource();
+        createTargetOrderTableRule();
+        startMigration(getSourceTableOrderName(), getTargetTableOrderName());
+        String jobId = listJobId().get(0);
+        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
+        assertTargetAndSourceCountAreSame();
+        if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.DOCKER) {
+            commitMigrationByJobId(jobId);
+            List<String> lastJobIds = listJobId();
+            assertThat(lastJobIds.size(), is(0));
+        }

Review Comment:
   We could just commit the migration job, regardless of env type. The same applies to TextPrimaryKeyMigrationE2EIT. Keep it consistent with other migration E2E IT cases.
##########
kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java:
##########
@@ -213,56 +220,85 @@ public void dropByParentJobId(final String parentJobId) {
     }

     /**
-     * Get consistency job item info.
+     * Get consistency job item infos.
      *
      * @param parentJobId parent job id
-     * @return consistency job item info
+     * @return consistency job item infos
      */
-    public ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) {
+    public List<ConsistencyCheckJobItemInfo> getJobItemInfos(final String parentJobId) {

Review Comment:
   It's better to extract or keep the `getJobItemInfo` method (private), and then add a `getJobItemInfos` method that references it.

##########
kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java:
##########
@@ -77,12 +78,17 @@ public Map<String, DataConsistencyCheckResult> check(final DataConsistencyCalcul
         verifyPipelineDatabaseType(calculateAlgorithm, jobConfig.getTarget());
         SchemaTableName sourceTable = new SchemaTableName(new SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getSourceTableName())), new TableName(jobConfig.getSourceTableName()));
         SchemaTableName targetTable = new SchemaTableName(new SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getTargetTableName())), new TableName(jobConfig.getTargetTableName()));
+        progressContext.getTableNames().add(jobConfig.getSourceTableName());

Review Comment:
   Why is `progressContext.getTableNames().add(jobConfig.getSourceTableName());` moved here? It's now separated from `progressContext.setRecordsCount(getRecordsCount());`. Is it used in the newly added code block?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
