sandynz commented on code in PR #21526:
URL: https://github.com/apache/shardingsphere/pull/21526#discussion_r993327266
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java:
##########
@@ -110,7 +111,12 @@ private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm
checkJobItemContext.setTableNames(Collections.singletonList(sourceTableName));
InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(PipelineJobIdUtils.parseJobType(jobId).getTypeName());
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobPublicAPI.getJobProgress(jobId);
- long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
+ long recordsCount;
+ if (calculateAlgorithm instanceof CRC32MatchDataConsistencyCalculateAlgorithm) {
+ recordsCount = tableMetaData.getColumnNames().size();
Review Comment:
1. This class is for common usage, so it should not hard-code
`CRC32MatchDataConsistencyCalculateAlgorithm` here.
2. `DataConsistencyCalculatedResult` has a `getRecordsCount()` method, so we
can get the records count for every algorithm.
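A minimal sketch of point 2, assuming each calculated result exposes its own records count. `CalculatedResultLike`, `RecordsCountSketch` and `sumRecordsCount` are hypothetical names used only for illustration; only `getRecordsCount()` comes from `DataConsistencyCalculatedResult`, and its `int` return type here is an assumption.

```java
import java.util.Arrays;
import java.util.List;

public final class RecordsCountSketch {
    
    /** Hypothetical stand-in for DataConsistencyCalculatedResult. */
    interface CalculatedResultLike {
        
        int getRecordsCount();
    }
    
    /** Sums the records count of every result, without checking the algorithm type. */
    static long sumRecordsCount(final Iterable<? extends CalculatedResultLike> results) {
        long result = 0;
        for (CalculatedResultLike each : results) {
            result += each.getRecordsCount();
        }
        return result;
    }
    
    public static void main(final String[] args) {
        // Three chunks as any algorithm might return them.
        List<CalculatedResultLike> results = Arrays.asList(() -> 1000, () -> 1000, () -> 250);
        System.out.println(sumRecordsCount(results));
    }
}
```

With this shape the checker needs no `instanceof` branch; every algorithm reports its own count per result.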
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -35,21 +35,23 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
-import java.util.stream.Collectors;
/**
* CRC32 match data consistency calculate algorithm.
*/
@Slf4j
-public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractStreamingDataConsistencyCalculateAlgorithm {
private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
+ private final PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(new MySQLDatabaseType().getType());
Review Comment:
We should not hard-code MySQL here.
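A rough sketch of the alternative, assuming the SQL builder is resolved from the database type carried by the calculate parameter (as the removed `PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType())` call did). `SqlBuilderLookupSketch`, `Crc32SqlBuilder` and the registry map are hypothetical stand-ins, not the real `PipelineSQLBuilder` API, and the SQL strings are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public final class SqlBuilderLookupSketch {
    
    /** Hypothetical, simplified stand-in for PipelineSQLBuilder. */
    interface Crc32SqlBuilder {
        
        Optional<String> buildCRC32SQL(String tableName, String columnName);
    }
    
    private static final Map<String, Crc32SqlBuilder> BUILDERS = new HashMap<>();
    
    static {
        // Illustrative SQL only; a real builder may generate a different statement.
        BUILDERS.put("MySQL", (tableName, columnName) -> Optional.of(String.format("SELECT CRC32(%s) FROM %s", columnName, tableName)));
        // A dialect without CRC32 support reports that it cannot build the SQL.
        BUILDERS.put("PostgreSQL", (tableName, columnName) -> Optional.empty());
    }
    
    /** Looks the builder up per call from the given database type instead of a fixed field. */
    static String buildCRC32SQL(final String databaseType, final String tableName, final String columnName) {
        Crc32SqlBuilder builder = BUILDERS.get(databaseType);
        if (null == builder) {
            throw new UnsupportedOperationException("No SQL builder for database type: " + databaseType);
        }
        return builder.buildCRC32SQL(tableName, columnName)
                .orElseThrow(() -> new UnsupportedOperationException("CRC32 is not supported for database type: " + databaseType));
    }
    
    public static void main(final String[] args) {
        System.out.println(buildCRC32SQL("MySQL", "foo_tbl", "foo_col"));
    }
}
```

Resolving the builder per call keeps the algorithm class free of any single dialect; unsupported dialects fail explicitly at calculation time.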
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java:
##########
@@ -59,13 +61,18 @@ public void init(final Properties props) {
}
@Override
- public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
- PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
- List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList());
- return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
+ protected Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter parameter) {
+ CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
+ int columIndex = null == previousCalculatedResult ? 0 : (previousCalculatedResult.getColumnIndex() + 1);
+ if (columIndex >= parameter.getColumnNames().size()) {
+ return Optional.empty();
+ }
+ List<String> columnNames = new ArrayList<>(parameter.getColumnNames());
+ CalculatedItem calculatedItem = calculateCRC32(parameter, columnNames.get(columIndex));
+ return Optional.of(new CalculatedResult(1, calculatedItem.getCrc32(), columIndex));
Review Comment:
1. We can get the real records count from `DataConsistencyCalculatedResult`
instead of hard-coding `1`.
2. If there are many records, calculating CRC32 over a whole column takes a
long time and the progress is not updated in the meantime, so it is better to
calculate CRC32 block by block.
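To illustrate both points, here is a minimal in-memory sketch of block-by-block calculation, assuming the rows are read in unique-key order and split into fixed-size blocks. It uses `java.util.zip.CRC32` rather than a database-side `CRC32()` query, and `ChunkedCrc32Sketch`, `ChunkResult` and `calculateByChunk` are hypothetical names, not part of the PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.CRC32;

public final class ChunkedCrc32Sketch {
    
    /** Hypothetical per-block result: the real records count plus the block checksum. */
    static final class ChunkResult {
        
        final int recordsCount;
        
        final long crc32;
        
        ChunkResult(final int recordsCount, final long crc32) {
            this.recordsCount = recordsCount;
            this.crc32 = crc32;
        }
    }
    
    /** Splits the column values into blocks and checksums each block separately. */
    static List<ChunkResult> calculateByChunk(final List<String> columnValues, final int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<ChunkResult> result = new ArrayList<>();
        for (int from = 0; from < columnValues.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, columnValues.size());
            CRC32 crc32 = new CRC32();
            for (String each : columnValues.subList(from, to)) {
                crc32.update(each.getBytes(StandardCharsets.UTF_8));
            }
            // The records count is the actual block size, not a hard-coded 1.
            result.add(new ChunkResult(to - from, crc32.getValue()));
        }
        return result;
    }
    
    public static void main(final String[] args) {
        for (ChunkResult each : calculateByChunk(Arrays.asList("a", "b", "c", "d", "e"), 2)) {
            // Progress could be updated here after every block.
            System.out.println(each.recordsCount + " records, crc32=" + each.crc32);
        }
    }
}
```

Each block yields its own result, so the caller can update progress after every block instead of waiting for a whole column to finish.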
##########
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java:
##########
@@ -53,40 +45,17 @@ public final class CRC32MatchDataConsistencyCalculateAlgorithmTest {
@Mock
private PipelineDataSourceWrapper pipelineDataSource;
- @Mock(answer = Answers.RETURNS_DEEP_STUBS)
- private Connection connection;
-
@Before
public void setUp() throws SQLException {
PipelineColumnMetaData uniqueKey = new PipelineColumnMetaData(1, "id", Types.INTEGER, "integer", false, true, true);
parameter = new DataConsistencyCalculateParameter(pipelineDataSource, null,
"foo_tbl", Arrays.asList("foo_col", "bar_col"), "FIXTURE", "FIXTURE", uniqueKey);
- when(pipelineDataSource.getConnection()).thenReturn(connection);
}
- @Test
- public void assertCalculateSuccess() throws SQLException {
- PreparedStatement preparedStatement0 = mockPreparedStatement(123L, 10);
- when(connection.prepareStatement("SELECT CRC32(foo_col) FROM foo_tbl")).thenReturn(preparedStatement0);
- PreparedStatement preparedStatement1 = mockPreparedStatement(456L, 10);
- when(connection.prepareStatement("SELECT CRC32(bar_col) FROM foo_tbl")).thenReturn(preparedStatement1);
+ @Test(expected = UnsupportedCRC32DataConsistencyCalculateAlgorithmException.class)
+ public void assertCalculateFailed() {
Review Comment:
Why is it unsupported?