This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5acd45ee2eb Refactor CRC32MatchSingleTableDataCalculator (#16872)
5acd45ee2eb is described below
commit 5acd45ee2ebdcb55cadcc0b9d65d71c3ca57d04f
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Apr 16 07:54:22 2022 +0800
Refactor CRC32MatchSingleTableDataCalculator (#16872)
* Refactor CRC32MatchSingleTableDataCalculator
* make SingleTableDataCalculator extends StatefulTypedSPI
* Remve SingleTableDataCalculator.AlgorithmProps
* Remve SingleTableDataCalculator.AlgorithmProps
* Revise java doc
* Rename FixtureSingleTableDataCalculator
* Refactor DataMatchSingleTableDataCalculator
* Refactor DataMatchSingleTableDataCalculator
* Refactor DataMatchSingleTableDataCalculator
* Remove AbstractSingleTableDataCalculator
---
.../consistency/DataConsistencyCheckerImpl.java | 2 +-
.../SingleTableDataCalculatorFactory.java | 42 ++---------
.../datasource/AbstractDataSourceChecker.java | 2 +-
.../AbstractDataConsistencyCheckAlgorithm.java | 4 +-
.../AbstractSingleTableDataCalculator.java | 38 ----------
...AbstractStreamingSingleTableDataCalculator.java | 3 +-
.../CRC32MatchSingleTableDataCalculator.java} | 56 ++++++--------
.../DataMatchSingleTableDataCalculator.java | 45 ++++++------
.../core/sqlbuilder/PipelineSQLBuilderFactory.java | 6 +-
.../rulealtered/prepare/InventoryTaskSplitter.java | 2 +-
.../job/environment/ScalingEnvironmentManager.java | 2 +-
...spi.check.consistency.SingleTableDataCalculator | 1 +
.../CRC32MatchSingleTableDataCalculatorTest.java} | 59 ++++++---------
.../fixture/FixturePipelineSQLBuilder.java | 85 ++++++++++++++++++++++
...spi.check.consistency.SingleTableDataCalculator | 3 +-
...ata.pipeline.spi.sqlbuilder.PipelineSQLBuilder} | 2 +-
.../datasource/MySQLDataSourcePreparer.java | 2 +-
.../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java | 13 +---
.../sqlbuilder/MySQLPipelineSQLBuilderTest.java | 7 +-
...32MatchPostgreSQLSingleTableDataCalculator.java | 50 -------------
.../PostgreSQLPipelineSQLBuilderTest.java | 2 +-
.../consistency/SingleTableDataCalculator.java | 42 ++---------
.../spi/sqlbuilder/PipelineSQLBuilder.java | 12 +++
.../SingleTableDataCalculatorFactoryTest.java | 17 +----
.../FixtureDataConsistencyCheckAlgorithm.java | 2 +-
....java => FixtureSingleTableDataCalculator.java} | 18 ++---
...RC32MatchDataConsistencyCheckAlgorithmTest.java | 2 +-
...spi.check.consistency.SingleTableDataCalculator | 2 +-
28 files changed, 213 insertions(+), 308 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index 0eb046f7025..5fb325b6cc7 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -131,7 +131,7 @@ public final class DataConsistencyCheckerImpl implements
DataConsistencyChecker
private long count(final DataSource dataSource, final String table, final
DatabaseType databaseType) {
try (Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement =
connection.prepareStatement(PipelineSQLBuilderFactory.getSQLBuilder(databaseType.getName()).buildCountSQL(table));
+ PreparedStatement preparedStatement =
connection.prepareStatement(PipelineSQLBuilderFactory.newInstance(databaseType.getName()).buildCountSQL(table));
ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
return resultSet.getLong(1);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableDataCalculatorFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableDataCalculatorFactory.java
index c7f2b238db3..b0425ba4d4c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableDataCalculatorFactory.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableDataCalculatorFactory.java
@@ -17,58 +17,28 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency;
-import com.google.common.base.Preconditions;
-import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
-import
org.apache.shardingsphere.spi.exception.ServiceLoaderInstantiationException;
+import org.apache.shardingsphere.spi.type.typed.TypedSPIRegistry;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Properties;
/**
* Single table data calculator factory.
*/
-@Slf4j
public final class SingleTableDataCalculatorFactory {
- private static final Map<String, Map<String, SingleTableDataCalculator>>
ALGORITHM_DATABASE_CALCULATOR_MAP = new HashMap<>();
-
static {
ShardingSphereServiceLoader.register(SingleTableDataCalculator.class);
- for (SingleTableDataCalculator each :
ShardingSphereServiceLoader.getSingletonServiceInstances(SingleTableDataCalculator.class))
{
- Map<String, SingleTableDataCalculator> dataCalculatorMap =
ALGORITHM_DATABASE_CALCULATOR_MAP.computeIfAbsent(each.getAlgorithmType(),
algorithmType -> new HashMap<>());
- for (String databaseType : each.getDatabaseTypes()) {
- SingleTableDataCalculator replaced =
dataCalculatorMap.put(databaseType, each);
- if (null != replaced) {
- log.warn("element replaced, algorithmType={},
databaseTypes={}, current={}, replaced={}",
- each.getAlgorithmType(), each.getDatabaseTypes(),
each.getClass().getName(), replaced.getClass().getName());
- }
- }
- }
}
/**
- * New service instance.
+ * Create new instance of single table data calculator.
*
* @param algorithmType algorithm type
- * @param databaseType database type
- * @return single table data calculator
- * @throws NullPointerException if calculator not found
- * @throws ServiceLoaderInstantiationException if new instance by
reflection failed
+ * @return new instance of single table data calculator
*/
- public static SingleTableDataCalculator newServiceInstance(final String
algorithmType, final String databaseType) {
-
-
-
- Map<String, SingleTableDataCalculator> calculatorMap =
ALGORITHM_DATABASE_CALCULATOR_MAP.get(algorithmType);
- Preconditions.checkNotNull(calculatorMap, String.format("calculator
not found for algorithmType '%s'", algorithmType));
- SingleTableDataCalculator calculator = calculatorMap.get(databaseType);
- Preconditions.checkNotNull(calculator, String.format("calculator not
found for algorithmType '%s' databaseType '%s'", algorithmType, databaseType));
- try {
- return
calculator.getClass().getDeclaredConstructor().newInstance();
- } catch (final ReflectiveOperationException ex) {
- throw new
ServiceLoaderInstantiationException(calculator.getClass(), ex);
- }
+ public static SingleTableDataCalculator newInstance(final String
algorithmType) {
+ return
TypedSPIRegistry.getRegisteredService(SingleTableDataCalculator.class,
algorithmType, new Properties());
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
index 9c387f1165e..5fc4191846d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
@@ -70,7 +70,7 @@ public abstract class AbstractDataSourceChecker implements
DataSourceChecker {
}
private PipelineSQLBuilder getSQLBuilder() {
- return PipelineSQLBuilderFactory.getSQLBuilder(getDatabaseType());
+ return PipelineSQLBuilderFactory.newInstance(getDatabaseType());
}
protected abstract String getDatabaseType();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractDataConsistencyCheckAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractDataConsistencyCheckAlgorithm.java
index 1750cfdd0c5..151ce31787c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractDataConsistencyCheckAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractDataConsistencyCheckAlgorithm.java
@@ -40,8 +40,8 @@ public abstract class AbstractDataConsistencyCheckAlgorithm
implements DataConsi
@Override
public final SingleTableDataCalculator getSingleTableDataCalculator(final
String supportedDatabaseType) {
- SingleTableDataCalculator result =
SingleTableDataCalculatorFactory.newServiceInstance(getType(),
supportedDatabaseType);
- result.setAlgorithmProps(props);
+ SingleTableDataCalculator result =
SingleTableDataCalculatorFactory.newInstance(getType());
+ result.setProps(props);
result.init();
return result;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java
deleted file mode 100644
index 972f9f6779f..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractSingleTableDataCalculator.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
-
-import lombok.Getter;
-import lombok.Setter;
-import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
-
-import java.util.Properties;
-
-/**
- * Abstract single table data calculator.
- */
-@Getter
-@Setter
-public abstract class AbstractSingleTableDataCalculator implements
SingleTableDataCalculator {
-
- private Properties algorithmProps;
-
- @Override
- public void init() {
- }
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractStreamingSingleTableDataCalculator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractStreamingSingleTableDataCalculator.java
index a9730208789..373a80fb46f 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractStreamingSingleTableDataCalculator.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractStreamingSingleTableDataCalculator.java
@@ -21,6 +21,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
+import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
import java.util.Iterator;
import java.util.Optional;
@@ -32,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
@RequiredArgsConstructor
@Getter
@Slf4j
-public abstract class AbstractStreamingSingleTableDataCalculator extends
AbstractSingleTableDataCalculator {
+public abstract class AbstractStreamingSingleTableDataCalculator implements
SingleTableDataCalculator {
@Override
public final Iterable<Object> calculate(final DataCalculateParameter
dataCalculateParameter) {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/consistency/CRC32MatchMySQLSingleTableDataCalculator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchSingleTableDataCalculator.java
similarity index 51%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/consistency/CRC32MatchMySQLSingleTableDataCalculator.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchSingleTableDataCalculator.java
index 565bac7975e..f111d7240df 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/consistency/CRC32MatchMySQLSingleTableDataCalculator.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchSingleTableDataCalculator.java
@@ -15,57 +15,44 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.mysql.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
-import
org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.AbstractSingleTableDataCalculator;
-import
org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.CRC32MatchDataConsistencyCheckAlgorithm;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import
org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
+import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.Collection;
import java.util.Collections;
-import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
/**
- * CRC32 match MySQL implementation of single table data calculator.
+ * CRC32 match single table data calculator.
*/
-public final class CRC32MatchMySQLSingleTableDataCalculator extends
AbstractSingleTableDataCalculator {
-
- private static final Collection<String> DATABASE_TYPES =
Collections.singletonList(new MySQLDatabaseType().getName());
-
- private static final MySQLPipelineSQLBuilder SQL_BUILDER =
(MySQLPipelineSQLBuilder) PipelineSQLBuilderFactory.getSQLBuilder("MySQL");
+public final class CRC32MatchSingleTableDataCalculator implements
SingleTableDataCalculator {
@Override
- public String getAlgorithmType() {
- return CRC32MatchDataConsistencyCheckAlgorithm.TYPE;
- }
-
- @Override
- public Collection<String> getDatabaseTypes() {
- return DATABASE_TYPES;
+ public Iterable<Object> calculate(final DataCalculateParameter
dataCalculateParameter) {
+ PipelineSQLBuilder sqlBuilder =
PipelineSQLBuilderFactory.newInstance(dataCalculateParameter.getDatabaseType());
+ return
Collections.unmodifiableList(dataCalculateParameter.getColumnNames().stream().map(each
-> calculateCRC32(sqlBuilder, dataCalculateParameter,
each)).collect(Collectors.toList()));
}
- @Override
- public Iterable<Object> calculate(final DataCalculateParameter
dataCalculateParameter) {
- String logicTableName = dataCalculateParameter.getLogicTableName();
- List<Long> result =
dataCalculateParameter.getColumnNames().stream().map(each -> {
- String sql = SQL_BUILDER.buildCRC32SQL(logicTableName, each);
- try {
- return calculateCRC32(dataCalculateParameter.getDataSource(),
sql);
- } catch (final SQLException ex) {
- throw new
PipelineDataConsistencyCheckFailedException(String.format("table %s data check
failed.", logicTableName), ex);
- }
- }).collect(Collectors.toList());
- return Collections.unmodifiableList(result);
+ private long calculateCRC32(final PipelineSQLBuilder sqlBuilder, final
DataCalculateParameter dataCalculateParameter, final String columnName) {
+ Optional<String> sql =
sqlBuilder.buildCRC32SQL(dataCalculateParameter.getLogicTableName(),
columnName);
+ if (!sql.isPresent()) {
+ throw new
PipelineDataConsistencyCheckFailedException(String.format("Unsupported
CRC32MatchSingleTableDataCalculator with database type `%s`",
dataCalculateParameter.getDatabaseType()));
+ }
+ try {
+ return calculateCRC32(dataCalculateParameter.getDataSource(),
sql.get());
+ } catch (final SQLException ex) {
+ throw new
PipelineDataConsistencyCheckFailedException(String.format("Table `%s` data
check failed.", dataCalculateParameter.getLogicTableName()), ex);
+ }
}
private long calculateCRC32(final DataSource dataSource, final String sql)
throws SQLException {
@@ -76,4 +63,9 @@ public final class CRC32MatchMySQLSingleTableDataCalculator
extends AbstractSing
return resultSet.getLong(1);
}
}
+
+ @Override
+ public String getType() {
+ return "CRC32_MATCH";
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
index dffeb2bbd46..46ebf5e630e 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchSingleTableDataCalculator.java
@@ -17,10 +17,10 @@
package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
-import com.google.common.base.Strings;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -29,7 +29,6 @@ import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalcula
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -45,44 +44,39 @@ import java.util.Optional;
import java.util.Properties;
/**
- * Data match implementation of single table data calculator.
+ * Data match single table data calculator.
*/
+@Getter
+@Setter
@Slf4j
public final class DataMatchSingleTableDataCalculator extends
AbstractStreamingSingleTableDataCalculator {
- private static final Collection<String> DATABASE_TYPES =
DatabaseTypeRegistry.getDatabaseTypeNames();
-
private static final String CHUNK_SIZE_KEY = "chunk-size";
- private volatile int chunkSize = 1000;
+ private static final int DEFAULT_CHUNK_SIZE = 1000;
- @Override
- public String getAlgorithmType() {
- return DataMatchDataConsistencyCheckAlgorithm.TYPE;
- }
+ private int chunkSize;
- @Override
- public Collection<String> getDatabaseTypes() {
- return DATABASE_TYPES;
- }
+ private Properties props;
@Override
public void init() {
- Properties algorithmProps = getAlgorithmProps();
- String chunkSizeValue = algorithmProps.getProperty(CHUNK_SIZE_KEY);
- if (!Strings.isNullOrEmpty(chunkSizeValue)) {
- int chunkSize = Integer.parseInt(chunkSizeValue);
- if (chunkSize <= 0) {
- log.warn("invalid chunkSize={}, use default value", chunkSize);
- }
- this.chunkSize = chunkSize;
+ chunkSize = getChunkSize();
+ }
+
+ private int getChunkSize() {
+ int result = Integer.parseInt(props.getProperty(CHUNK_SIZE_KEY,
DEFAULT_CHUNK_SIZE + ""));
+ if (result <= 0) {
+ log.warn("Invalid result={}, use default value", result);
+ return DEFAULT_CHUNK_SIZE;
}
+ return result;
}
@Override
protected Optional<Object> calculateChunk(final DataCalculateParameter
dataCalculateParameter) {
String logicTableName = dataCalculateParameter.getLogicTableName();
- PipelineSQLBuilder sqlBuilder =
PipelineSQLBuilderFactory.getSQLBuilder(dataCalculateParameter.getDatabaseType());
+ PipelineSQLBuilder sqlBuilder =
PipelineSQLBuilderFactory.newInstance(dataCalculateParameter.getDatabaseType());
String uniqueKey = dataCalculateParameter.getUniqueKey();
CalculatedResult previousCalculatedResult = (CalculatedResult)
dataCalculateParameter.getPreviousCalculatedResult();
Number startUniqueKeyValue = null != previousCalculatedResult ?
previousCalculatedResult.getMaxUniqueKeyValue() : -1;
@@ -117,6 +111,11 @@ public final class DataMatchSingleTableDataCalculator
extends AbstractStreamingS
}
}
+ @Override
+ public String getType() {
+ return "DATA_MATCH";
+ }
+
@RequiredArgsConstructor
@Getter
private static final class CalculatedResult {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
index b1f43becdb6..cbf686ac87a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
@@ -34,12 +34,12 @@ public final class PipelineSQLBuilderFactory {
}
/**
- * Get SQL builder instance.
+ * Create new instance of pipeline SQL builder.
*
* @param databaseType database type
- * @return SQL builder
+ * @return new instance of pipeline SQL builder
*/
- public static PipelineSQLBuilder getSQLBuilder(final String databaseType) {
+ public static PipelineSQLBuilder newInstance(final String databaseType) {
return TypedSPIRegistry.getRegisteredService(PipelineSQLBuilder.class,
databaseType);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index b9016636c94..e383ef7278d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -178,7 +178,7 @@ public final class InventoryTaskSplitter {
private Collection<IngestPosition<?>> getPositionByPrimaryKeyRange(final
RuleAlteredJobContext jobContext, final DataSource dataSource, final
InventoryDumperConfiguration dumperConfig) {
Collection<IngestPosition<?>> result = new ArrayList<>();
JobConfiguration jobConfig = jobContext.getJobConfig();
- String sql =
PipelineSQLBuilderFactory.getSQLBuilder(jobConfig.getHandleConfig().getSourceDatabaseType())
+ String sql =
PipelineSQLBuilderFactory.newInstance(jobConfig.getHandleConfig().getSourceDatabaseType())
.buildSplitByPrimaryKeyRangeSQL(dumperConfig.getTableName(),
dumperConfig.getPrimaryKey());
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
index fc74134abbb..bcb6b3c3b7e 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
@@ -52,7 +52,7 @@ public final class ScalingEnvironmentManager {
try (PipelineDataSourceWrapper dataSource =
dataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(target.getType(),
target.getParameter()));
Connection connection = dataSource.getConnection()) {
for (String each : tables) {
- String sql =
PipelineSQLBuilderFactory.getSQLBuilder(jobConfig.getHandleConfig().getTargetDatabaseType()).buildTruncateSQL(each);
+ String sql =
PipelineSQLBuilderFactory.newInstance(jobConfig.getHandleConfig().getTargetDatabaseType()).buildTruncateSQL(each);
try (PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
preparedStatement.execute();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
index 57490568b15..ff4be3fe311 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
@@ -15,4 +15,5 @@
# limitations under the License.
#
+org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.CRC32MatchSingleTableDataCalculator
org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.DataMatchSingleTableDataCalculator
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/check/consistency/CRC32MatchMySQLSingleTableDataCalculatorTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchSingleTableDataCalculatorTest.java
similarity index 68%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/check/consistency/CRC32MatchMySQLSingleTableDataCalculatorTest.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchSingleTableDataCalculatorTest.java
index 1a4b0b00f1b..6c4de0c103e 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/check/consistency/CRC32MatchMySQLSingleTableDataCalculatorTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchSingleTableDataCalculatorTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.mysql.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -25,6 +25,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -35,27 +36,27 @@ import java.util.stream.StreamSupport;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@RunWith(MockitoJUnitRunner.class)
-public final class CRC32MatchMySQLSingleTableDataCalculatorTest {
-
+public final class CRC32MatchSingleTableDataCalculatorTest {
+
@Mock
private DataCalculateParameter dataCalculateParameter;
-
+
private PipelineDataSourceWrapper pipelineDataSource;
-
+
private Connection connection;
-
+
@Mock
private PreparedStatement preparedStatement;
-
+
@Mock
private ResultSet resultSet;
-
+
@Before
public void setUp() throws SQLException {
pipelineDataSource = mock(PipelineDataSourceWrapper.class,
RETURNS_DEEP_STUBS);
@@ -64,55 +65,37 @@ public final class
CRC32MatchMySQLSingleTableDataCalculatorTest {
when(dataCalculateParameter.getLogicTableName()).thenReturn("tableName");
when(dataCalculateParameter.getColumnNames()).thenReturn(columnNames);
when(dataCalculateParameter.getDataSource()).thenReturn(pipelineDataSource);
+ when(dataCalculateParameter.getDatabaseType()).thenReturn("FIXTURE");
}
-
- @Test
- public void assertCRC32MatchMySQLSingleTableDataCalculatorSuccess() {
- String actualAlgorithmType = new
CRC32MatchMySQLSingleTableDataCalculator().getAlgorithmType();
- String expectedAlgorithmType = "CRC32_MATCH";
- assertThat(actualAlgorithmType, is(expectedAlgorithmType));
- }
-
- @Test
- public void assertGetDatabaseTypesSuccess() {
- Collection<String> actualDatabaseTypes = new
CRC32MatchMySQLSingleTableDataCalculator().getDatabaseTypes();
- long actualDatabaseTypesSize = actualDatabaseTypes.size();
- long expectedDatabaseTypesSize = new Long(1);
- String actualDatabaseTypesFirstElement =
actualDatabaseTypes.stream().findFirst().get();
- String expectedDatabaseTypesFirstElement = "MySQL";
- assertThat(actualDatabaseTypesSize, is(expectedDatabaseTypesSize));
- assertThat(actualDatabaseTypesFirstElement,
is(expectedDatabaseTypesFirstElement));
- }
-
+
@Test
public void assertCalculateSuccess() {
- Iterable<Object> calculate = new
CRC32MatchMySQLSingleTableDataCalculator().calculate(dataCalculateParameter);
+ Iterable<Object> calculate = new
CRC32MatchSingleTableDataCalculator().calculate(dataCalculateParameter);
long actualDatabaseTypesSize =
StreamSupport.stream(calculate.spliterator(), false).count();
long expectedDatabaseTypesSize =
dataCalculateParameter.getColumnNames().size();
assertThat(actualDatabaseTypesSize, is(expectedDatabaseTypesSize));
}
-
+
@Test
public void assertCalculateWithQuerySuccess() throws SQLException {
- String sqlCommandForFieldOne = "SELECT BIT_XOR(CAST(CRC32(`fieldOne`)
AS UNSIGNED)) AS checksum FROM `tableName`";
- String sqlCommandForFieldTwo = "SELECT BIT_XOR(CAST(CRC32(`fieldTwo`)
AS UNSIGNED)) AS checksum FROM `tableName`";
- String sqlCommandForFieldThree = "SELECT
BIT_XOR(CAST(CRC32(`fieldThree`) AS UNSIGNED)) AS checksum FROM `tableName`";
+ String sqlCommandForFieldOne = "SELECT CRC32(fieldOne) FROM tableName";
+ String sqlCommandForFieldTwo = "SELECT CRC32(fieldTwo) FROM tableName";
+ String sqlCommandForFieldThree = "SELECT CRC32(fieldThree) FROM
tableName";
when(pipelineDataSource.getConnection()).thenReturn(connection);
when(connection.prepareStatement(sqlCommandForFieldOne)).thenReturn(preparedStatement);
when(connection.prepareStatement(sqlCommandForFieldTwo)).thenReturn(preparedStatement);
when(connection.prepareStatement(sqlCommandForFieldThree)).thenReturn(preparedStatement);
when(preparedStatement.executeQuery()).thenReturn(resultSet);
-
- Iterable<Object> calculate = new
CRC32MatchMySQLSingleTableDataCalculator().calculate(dataCalculateParameter);
+ Iterable<Object> calculate = new
CRC32MatchSingleTableDataCalculator().calculate(dataCalculateParameter);
long actualDatabaseTypesSize =
StreamSupport.stream(calculate.spliterator(), false).count();
long expectedDatabaseTypesSize =
dataCalculateParameter.getColumnNames().size();
assertThat(actualDatabaseTypesSize, is(expectedDatabaseTypesSize));
}
-
+
@Test(expected = PipelineDataConsistencyCheckFailedException.class)
public void assertCalculateFailed() throws SQLException {
when(pipelineDataSource.getConnection()).thenReturn(connection);
when(connection.prepareStatement(anyString())).thenThrow(new
SQLException());
- new
CRC32MatchMySQLSingleTableDataCalculator().calculate(dataCalculateParameter);
+ new
CRC32MatchSingleTableDataCalculator().calculate(dataCalculateParameter);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
new file mode 100644
index 00000000000..82b127e7baa
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.fixture;
+
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
+
+ @Override
+ public String buildInsertSQL(final DataRecord dataRecord) {
+ return "";
+ }
+
+ @Override
+ public String buildUpdateSQL(final DataRecord dataRecord, final
Collection<Column> conditionColumns) {
+ return "";
+ }
+
+ @Override
+ public List<Column> extractUpdatedColumns(final Collection<Column>
columns, final DataRecord record) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String buildDeleteSQL(final DataRecord dataRecord, final
Collection<Column> conditionColumns) {
+ return "";
+ }
+
+ @Override
+ public String buildTruncateSQL(final String tableName) {
+ return "";
+ }
+
+ @Override
+ public String buildCountSQL(final String tableName) {
+ return "";
+ }
+
+ @Override
+ public String buildChunkedQuerySQL(final String tableName, final String
uniqueKey, final Number startUniqueValue) {
+ return "";
+ }
+
+ @Override
+ public String buildCheckEmptySQL(final String tableName) {
+ return null;
+ }
+
+ @Override
+ public String buildSplitByPrimaryKeyRangeSQL(final String tableName, final
String primaryKey) {
+ return "";
+ }
+
+ @Override
+ public Optional<String> buildCRC32SQL(final String tableName, final String
column) {
+ return Optional.of(String.format("SELECT CRC32(%s) FROM %s", column,
tableName));
+ }
+
+ @Override
+ public String getType() {
+ return "FIXTURE";
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
similarity index 79%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
index 305b7bf9ff9..ff4be3fe311 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
@@ -15,4 +15,5 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.postgresql.check.consistency.CRC32MatchPostgreSQLSingleTableDataCalculator
+org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.CRC32MatchSingleTableDataCalculator
+org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.DataMatchSingleTableDataCalculator
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
similarity index 88%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
index 6ff55515394..df902547e48 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.mysql.check.consistency.CRC32MatchMySQLSingleTableDataCalculator
+org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.fixture.FixturePipelineSQLBuilder
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 5677ec434fe..75d9ddcae36 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -40,7 +40,7 @@ import java.util.stream.Collectors;
@Slf4j
public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
- private static final MySQLPipelineSQLBuilder SQL_BUILDER =
(MySQLPipelineSQLBuilder) PipelineSQLBuilderFactory.getSQLBuilder("MySQL");
+ private static final MySQLPipelineSQLBuilder SQL_BUILDER =
(MySQLPipelineSQLBuilder) PipelineSQLBuilderFactory.newInstance("MySQL");
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter
parameter) {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 4bd09360922..22e7e444ba5 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.AbstractPipelineSQLBuilder;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
/**
@@ -63,15 +64,9 @@ public final class MySQLPipelineSQLBuilder extends
AbstractPipelineSQLBuilder {
return result.toString();
}
- /**
- * Build CRC32 SQL.
- *
- * @param tableName table Name
- * @param column column
- * @return select CRC32 SQL
- */
- public String buildCRC32SQL(final String tableName, final String column) {
- return String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS UNSIGNED)) AS
checksum FROM %s", quote(column), quote(tableName));
+ @Override
+ public Optional<String> buildCRC32SQL(final String tableName, final String
column) {
+ return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS
UNSIGNED)) AS checksum FROM %s", quote(column), quote(tableName)));
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index d111f4b7708..a8594a30119 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -24,10 +24,12 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.junit.Test;
import java.util.Collections;
+import java.util.Optional;
import java.util.Set;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
public final class MySQLPipelineSQLBuilderTest {
@@ -47,8 +49,9 @@ public final class MySQLPipelineSQLBuilderTest {
@Test
public void assertBuildSumCrc32SQL() {
- String actual = sqlBuilder.buildCRC32SQL("t2", "id");
- assertThat(actual, is("SELECT BIT_XOR(CAST(CRC32(`id`) AS UNSIGNED))
AS checksum FROM `t2`"));
+ Optional<String> actual = sqlBuilder.buildCRC32SQL("t2", "id");
+ assertTrue(actual.isPresent());
+ assertThat(actual.get(), is("SELECT BIT_XOR(CAST(CRC32(`id`) AS
UNSIGNED)) AS checksum FROM `t2`"));
}
private DataRecord mockDataRecord(final String tableName) {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/consistency/CRC32MatchPostgreSQLSingleTableDataCalculator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/consistency/CRC32
[...]
deleted file mode 100644
index 2a116f2c506..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/consistency/CRC32MatchPostgreSQLSingleTableDataCalculator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.postgresql.check.consistency;
-
-import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
-import
org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.AbstractSingleTableDataCalculator;
-import
org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.CRC32MatchDataConsistencyCheckAlgorithm;
-import
org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * CRC32 match PostgreSQL implementation of single table data calculator.
- */
-public final class CRC32MatchPostgreSQLSingleTableDataCalculator extends
AbstractSingleTableDataCalculator {
-
- private static final Collection<String> DATABASE_TYPES =
Collections.singletonList(new PostgreSQLDatabaseType().getName());
-
- @Override
- public String getAlgorithmType() {
- return CRC32MatchDataConsistencyCheckAlgorithm.TYPE;
- }
-
- @Override
- public Collection<String> getDatabaseTypes() {
- return DATABASE_TYPES;
- }
-
- @Override
- public Iterable<Object> calculate(final DataCalculateParameter
dataCalculateParameter) {
- //TODO PostgreSQL calculate
- return Collections.emptyList();
- }
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index c7f2d5cb902..e87046ab9f8 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -32,7 +32,7 @@ public final class PostgreSQLPipelineSQLBuilderTest {
@Test
public void assertBuildInsertSQL() {
- String actual =
PipelineSQLBuilderFactory.getSQLBuilder("PostgreSQL").buildInsertSQL(mockDataRecord());
+ String actual =
PipelineSQLBuilderFactory.newInstance("PostgreSQL").buildInsertSQL(mockDataRecord());
assertThat(actual, is("INSERT INTO
\"t_order\"(\"order_id\",\"user_id\",\"status\") VALUES(?,?,?) ON CONFLICT
(order_id)"
+ " DO UPDATE SET
\"user_id\"=EXCLUDED.\"user_id\",\"status\"=EXCLUDED.\"status\""));
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/SingleTableDataCalculator.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/SingleTableDataCalculator.java
index 85524498d0f..26e12bb2811 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/SingleTableDataCalculator.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/SingleTableDataCalculator.java
@@ -18,54 +18,24 @@
package org.apache.shardingsphere.data.pipeline.spi.check.consistency;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
-
-import java.util.Collection;
-import java.util.Properties;
+import org.apache.shardingsphere.spi.type.typed.StatefulTypedSPI;
/**
* Single table data calculator.
*/
-public interface SingleTableDataCalculator {
+public interface SingleTableDataCalculator extends StatefulTypedSPI {
/**
* Initialize create data calculator.
*/
- void init();
-
- /**
- * Get algorithm properties.
- *
- * @return properties
- */
- Properties getAlgorithmProps();
-
- /**
- * Set algorithm properties.
- * Used by data consistency check algorithm.
- *
- * @param algorithmProps algorithm properties
- */
- void setAlgorithmProps(Properties algorithmProps);
+ default void init() {
+ }
/**
- * Calculate table data content, return checksum typically.
+ * Calculate table data content.
*
* @param dataCalculateParameter data calculate parameter
- * @return calculated result, it will be used to check equality
+ * @return calculated result for checking equality
*/
Iterable<Object> calculate(DataCalculateParameter dataCalculateParameter);
-
- /**
- * Get algorithm type.
- *
- * @return algorithm type
- */
- String getAlgorithmType();
-
- /**
- * Get database types.
- *
- * @return database types
- */
- Collection<String> getDatabaseTypes();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index ae9627c0d42..015ee23813b 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.spi.type.typed.StatelessTypedSPI;
import java.util.Collection;
import java.util.List;
+import java.util.Optional;
/**
* Pipeline SQL builder.
@@ -110,4 +111,15 @@ public interface PipelineSQLBuilder extends
StatelessTypedSPI {
* @return split SQL
*/
String buildSplitByPrimaryKeyRangeSQL(String tableName, String primaryKey);
+
+ /**
+ * Build CRC32 SQL.
+ *
+ * @param tableName table Name
+ * @param column column
+ * @return CRC32 SQL
+ */
+ default Optional<String> buildCRC32SQL(final String tableName, final
String column) {
+ return Optional.empty();
+ }
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableDataCalculatorFactoryTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableDataCalculatorFactoryTest.java
index e84c15880b7..1fabc28e2c4 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableDataCalculatorFactoryTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableDataCalculatorFactoryTest.java
@@ -17,25 +17,12 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency;
-import
org.apache.shardingsphere.data.pipeline.core.fixture.FixtureDataConsistencyCheckAlgorithm;
-import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.junit.Test;
public final class SingleTableDataCalculatorFactoryTest {
@Test
- public void assertNewServiceInstanceSuccess() {
-
SingleTableDataCalculatorFactory.newServiceInstance(FixtureDataConsistencyCheckAlgorithm.TYPE,
new H2DatabaseType().getName());
- }
-
- @Test(expected = NullPointerException.class)
- public void assertNewServiceInstanceFailedNoAlgorithmType() {
- SingleTableDataCalculatorFactory.newServiceInstance("not-exists-alg",
new H2DatabaseType().getName());
- }
-
- @Test(expected = NullPointerException.class)
- public void assertNewServiceInstanceFailedNoDatabaseType() {
-
SingleTableDataCalculatorFactory.newServiceInstance(FixtureDataConsistencyCheckAlgorithm.TYPE,
new MySQLDatabaseType().getName());
+ public void assertNewInstanceSuccess() {
+ SingleTableDataCalculatorFactory.newInstance("FIXTURE");
}
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCheckAlgorithm.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCheckAlgorithm.java
index c95ef730af1..b715d48c152 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCheckAlgorithm.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCheckAlgorithm.java
@@ -41,7 +41,7 @@ public final class FixtureDataConsistencyCheckAlgorithm
implements DataConsisten
@Override
public SingleTableDataCalculator getSingleTableDataCalculator(final String
supportedDatabaseType) {
- return SingleTableDataCalculatorFactory.newServiceInstance(TYPE,
supportedDatabaseType);
+ return SingleTableDataCalculatorFactory.newInstance(TYPE);
}
@Override
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureSingleTableDataCalculator.java
similarity index 72%
rename from
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
rename to
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureSingleTableDataCalculator.java
index 61a7a142eab..8657b5663ad 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureH2SingleTableDataCalculator.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureSingleTableDataCalculator.java
@@ -18,25 +18,19 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataCalculateParameter;
-import
org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.AbstractSingleTableDataCalculator;
+import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
-import java.util.Collection;
import java.util.Collections;
-public final class FixtureH2SingleTableDataCalculator extends
AbstractSingleTableDataCalculator {
+public final class FixtureSingleTableDataCalculator implements
SingleTableDataCalculator {
@Override
- public String getAlgorithmType() {
- return FixtureDataConsistencyCheckAlgorithm.TYPE;
- }
-
- @Override
- public Collection<String> getDatabaseTypes() {
- return Collections.singletonList("H2");
+ public Iterable<Object> calculate(final DataCalculateParameter
dataCalculateParameter) {
+ return Collections.singletonList(true);
}
@Override
- public Iterable<Object> calculate(final DataCalculateParameter
dataCalculateParameter) {
- return Collections.singletonList(true);
+ public String getType() {
+ return "FIXTURE";
}
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCheckAlgorithmTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCheckAlgorithmTest.java
index caa937e3825..927c44cf9ea 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCheckAlgorithmTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCheckAlgorithmTest.java
@@ -36,7 +36,7 @@ public final class
CRC32MatchDataConsistencyCheckAlgorithmTest {
assertThat(checkAlgorithm.getSupportedDatabaseTypes(),
is(Collections.singletonList("MySQL")));
}
- @Test(expected = NullPointerException.class)
+ @Test
public void assertGetSingleTableDataCalculator() {
CRC32MatchDataConsistencyCheckAlgorithm checkAlgorithm = new
CRC32MatchDataConsistencyCheckAlgorithm();
checkAlgorithm.getSupportedDatabaseTypes().forEach(checkAlgorithm::getSingleTableDataCalculator);
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
index bd682f90c4f..d128ebdefec 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.fixture.FixtureH2SingleTableDataCalculator
+org.apache.shardingsphere.data.pipeline.core.fixture.FixtureSingleTableDataCalculator