This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7b367f689f8 Refactor DATA_MATCH consistency check by one query (#24075)
7b367f689f8 is described below
commit 7b367f689f8aeaa2767eaa73c83a5d64adf2d48b
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Feb 9 17:32:23 2023 +0800
Refactor DATA_MATCH consistency check by one query (#24075)
* Remove unused fields
* Refactor DATA_MATCH consistency check by one query
---
.../DataConsistencyCalculateParameter.java | 16 +--
.../spi/sqlbuilder/PipelineSQLBuilder.java | 4 +-
...SingleTableInventoryDataConsistencyChecker.java | 26 +++-
...StreamingDataConsistencyCalculateAlgorithm.java | 1 -
...DataMatchDataConsistencyCalculateAlgorithm.java | 144 ++++++++++++++-------
.../sqlbuilder/AbstractPipelineSQLBuilder.java | 7 +-
.../data/pipeline/core/util/CloseUtil.java | 45 +++++++
.../fixture/FixturePipelineSQLBuilder.java | 2 +-
...MatchDataConsistencyCalculateAlgorithmTest.java | 32 ++---
9 files changed, 189 insertions(+), 88 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
index 04d9c757adf..9fb97eb3f3a 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.api.check.consistency;
-import com.google.common.collect.Range;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
@@ -59,20 +58,9 @@ public final class DataConsistencyCalculateParameter {
private final PipelineColumnMetaData uniqueKey;
/**
- * Used for range query.
+ * Calculation context.
*/
- private volatile Range<? extends Comparable<?>> uniqueKeyValueRange;
-
- /**
- * Used for multiple records query.
- * If it's configured, then it could be translated to SQL like "uniqueKey
IN (value1,value2,value3)".
- */
- private volatile Collection<Object> uniqueKeyValues;
-
- /**
- * Previous calculated result will be transferred to next call.
- */
- private volatile Object previousCalculatedResult;
+ private volatile AutoCloseable calculationContext;
private final Object tableCheckPosition;
}
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 8195e5eea11..5406f5cbc8f 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -130,7 +130,7 @@ public interface PipelineSQLBuilder extends TypedSPI {
String buildCountSQL(String schemaName, String tableName);
/**
- * Build query SQL.
+ * Build query all ordering SQL.
*
* @param schemaName schema name
* @param tableName table name
@@ -138,7 +138,7 @@ public interface PipelineSQLBuilder extends TypedSPI {
* @param firstQuery first query
* @return query SQL
*/
- String buildChunkedQuerySQL(String schemaName, String tableName, String
uniqueKey, boolean firstQuery);
+ String buildQueryAllOrderingSQL(String schemaName, String tableName,
String uniqueKey, boolean firstQuery);
/**
* Build check empty SQL.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index 060421536fd..7f4461394ec 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -33,6 +33,7 @@ import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import org.apache.shardingsphere.data.pipeline.core.util.CloseUtil;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -109,6 +110,23 @@ public final class
SingleTableInventoryDataConsistencyChecker {
columnNames, targetDatabaseType, sourceDatabaseType,
uniqueKey, tableCheckPositions.get(targetTableName));
Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults =
calculateAlgorithm.calculate(sourceParam).iterator();
Iterator<DataConsistencyCalculatedResult> targetCalculatedResults =
calculateAlgorithm.calculate(targetParam).iterator();
+ try {
+ return check0(sourceCalculatedResults, targetCalculatedResults,
executor);
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ex) {
+ // CHECKSTYLE:ON
+ if (null != sourceParam.getCalculationContext()) {
+ CloseUtil.closeQuietly(sourceParam.getCalculationContext());
+ }
+ if (null != targetParam.getCalculationContext()) {
+ CloseUtil.closeQuietly(targetParam.getCalculationContext());
+ }
+ throw ex;
+ }
+ }
+
+ private DataConsistencyCheckResult check0(final
Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults, final
Iterator<DataConsistencyCalculatedResult> targetCalculatedResults,
+ final ThreadPoolExecutor
executor) {
long sourceRecordsCount = 0;
long targetRecordsCount = 0;
boolean contentMatched = true;
@@ -128,10 +146,10 @@ public final class
SingleTableInventoryDataConsistencyChecker {
break;
}
if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
- tableCheckPositions.put(sourceTableName,
sourceCalculatedResult.getMaxUniqueKeyValue().get());
+
progressContext.getTableCheckPositions().put(sourceTable.getTableName().getOriginal(),
sourceCalculatedResult.getMaxUniqueKeyValue().get());
}
if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
- tableCheckPositions.put(targetTableName,
targetCalculatedResult.getMaxUniqueKeyValue().get());
+
progressContext.getTableCheckPositions().put(targetTable.getTableName().getOriginal(),
targetCalculatedResult.getMaxUniqueKeyValue().get());
}
progressContext.onProgressUpdated(new
PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
}
@@ -146,8 +164,8 @@ public final class
SingleTableInventoryDataConsistencyChecker {
private DataConsistencyCalculateParameter buildParameter(final
PipelineDataSourceWrapper sourceDataSource,
final String
schemaName, final String tableName, final Collection<String> columnNames,
final String
sourceDatabaseType, final String targetDatabaseType, final
PipelineColumnMetaData uniqueKey,
- final Object
tableCheckPositionValue) {
- return new DataConsistencyCalculateParameter(sourceDataSource,
schemaName, tableName, columnNames, sourceDatabaseType, targetDatabaseType,
uniqueKey, tableCheckPositionValue);
+ final Object
tableCheckPosition) {
+ return new DataConsistencyCalculateParameter(sourceDataSource,
schemaName, tableName, columnNames, sourceDatabaseType, targetDatabaseType,
uniqueKey, tableCheckPosition);
}
private <T> T waitFuture(final Future<T> future) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
index e0600b895bb..37797fc8a57 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
@@ -78,7 +78,6 @@ public abstract class
AbstractStreamingDataConsistencyCalculateAlgorithm extends
public DataConsistencyCalculatedResult next() {
calculateIfNecessary();
Optional<DataConsistencyCalculatedResult> nextResult =
this.nextResult;
- param.setPreviousCalculatedResult(nextResult.orElse(null));
this.nextResult = null;
return nextResult.orElse(null);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index f1e9c0f100b..7a1ee87528f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.builder.EqualsBuilder;
@@ -27,12 +28,14 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCheckUtils;
+import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import org.apache.shardingsphere.data.pipeline.core.util.CloseUtil;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import java.math.BigDecimal;
@@ -45,11 +48,9 @@ import java.sql.SQLXML;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
-import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
@@ -68,10 +69,6 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
private int chunkSize;
- private final Map<String, String> firstSQLCache = new
ConcurrentHashMap<>();
-
- private final Map<String, String> laterSQLCache = new
ConcurrentHashMap<>();
-
@Override
public void init(final Properties props) {
chunkSize = getChunkSize(props);
@@ -88,46 +85,84 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
@Override
public Optional<DataConsistencyCalculatedResult> calculateChunk(final
DataConsistencyCalculateParameter param) {
- CalculatedResult previousCalculatedResult = (CalculatedResult)
param.getPreviousCalculatedResult();
- String sql = getQuerySQL(param);
- try (
- Connection connection = param.getDataSource().getConnection();
- PreparedStatement preparedStatement =
setCurrentStatement(connection.prepareStatement(sql))) {
- preparedStatement.setFetchSize(chunkSize);
- Object tableCheckPosition = param.getTableCheckPosition();
- if (null == previousCalculatedResult) {
- if (null == tableCheckPosition) {
- preparedStatement.setInt(1, chunkSize);
- } else {
- preparedStatement.setObject(1, tableCheckPosition);
- preparedStatement.setInt(2, chunkSize);
- }
- } else {
- preparedStatement.setObject(1,
previousCalculatedResult.getMaxUniqueKeyValue().orElse(null));
- preparedStatement.setInt(2, chunkSize);
- }
+ CalculationContext calculationContext =
getOrCreateCalculationContext(param);
+ try {
Collection<Collection<Object>> records = new LinkedList<>();
Object maxUniqueKeyValue = null;
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- ColumnValueReader columnValueReader =
TypedSPILoader.getService(ColumnValueReader.class, param.getDatabaseType());
- while (resultSet.next()) {
- if (isCanceling()) {
- throw new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(),
param.getLogicTableName());
- }
- ResultSetMetaData resultSetMetaData =
resultSet.getMetaData();
- int columnCount = resultSetMetaData.getColumnCount();
- Collection<Object> record = new LinkedList<>();
- for (int columnIndex = 1; columnIndex <= columnCount;
columnIndex++) {
- record.add(columnValueReader.readValue(resultSet,
resultSetMetaData, columnIndex));
- }
- records.add(record);
- maxUniqueKeyValue = columnValueReader.readValue(resultSet,
resultSetMetaData, param.getUniqueKey().getOrdinalPosition());
+ ColumnValueReader columnValueReader =
TypedSPILoader.getService(ColumnValueReader.class, param.getDatabaseType());
+ ResultSet resultSet = calculationContext.getResultSet();
+ while (resultSet.next()) {
+ if (isCanceling()) {
+ throw new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(),
param.getLogicTableName());
+ }
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ int columnCount = resultSetMetaData.getColumnCount();
+ Collection<Object> record = new LinkedList<>();
+ for (int columnIndex = 1; columnIndex <= columnCount;
columnIndex++) {
+ record.add(columnValueReader.readValue(resultSet,
resultSetMetaData, columnIndex));
}
+ records.add(record);
+ maxUniqueKeyValue = columnValueReader.readValue(resultSet,
resultSetMetaData, param.getUniqueKey().getOrdinalPosition());
+ if (records.size() == chunkSize) {
+ break;
+ }
+ }
+ if (records.isEmpty()) {
+ calculationContext.close();
}
return records.isEmpty() ? Optional.empty() : Optional.of(new
CalculatedResult(maxUniqueKeyValue, records.size(), records));
- } catch (final SQLException ex) {
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ calculationContext.close();
+ if (ex instanceof PipelineSQLException) {
+ throw (PipelineSQLException) ex;
+ }
+ throw new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(),
param.getLogicTableName(), ex);
+ }
+ }
+
+ private CalculationContext getOrCreateCalculationContext(final
DataConsistencyCalculateParameter param) {
+ CalculationContext result = (CalculationContext)
param.getCalculationContext();
+ if (null != result) {
+ return result;
+ }
+ try {
+ result = createCalculationContext(param);
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ throw new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(),
param.getLogicTableName(), ex);
+ }
+ try {
+ fulfillCalculationContext(result, param);
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ result.close();
throw new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(),
param.getLogicTableName(), ex);
}
+ return result;
+ }
+
+ private CalculationContext createCalculationContext(final
DataConsistencyCalculateParameter param) throws SQLException {
+ Connection connection = param.getDataSource().getConnection();
+ CalculationContext result = new CalculationContext(connection);
+ param.setCalculationContext(result);
+ return result;
+ }
+
+ private void fulfillCalculationContext(final CalculationContext
calculationContext, final DataConsistencyCalculateParameter param) throws
SQLException {
+ String sql = getQuerySQL(param);
+ PreparedStatement preparedStatement =
setCurrentStatement(calculationContext.getConnection().prepareStatement(sql));
+ calculationContext.setPreparedStatement(preparedStatement);
+ preparedStatement.setFetchSize(chunkSize);
+ Object tableCheckPosition = param.getTableCheckPosition();
+ if (null != tableCheckPosition) {
+ preparedStatement.setObject(1, tableCheckPosition);
+ }
+ ResultSet resultSet = preparedStatement.executeQuery();
+ calculationContext.setResultSet(resultSet);
}
private String getQuerySQL(final DataConsistencyCalculateParameter param) {
@@ -135,13 +170,10 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
String logicTableName = param.getLogicTableName();
String schemaName = param.getSchemaName();
String uniqueKey = param.getUniqueKey().getName();
- String cacheKey = param.getDatabaseType() + "-" + (null != schemaName
&& TypedSPILoader.getService(DatabaseType.class,
param.getDatabaseType()).isSchemaAvailable()
- ? schemaName + "." + logicTableName
- : logicTableName);
- if (null == param.getPreviousCalculatedResult() && null ==
param.getTableCheckPosition()) {
- return firstSQLCache.computeIfAbsent(cacheKey, s ->
sqlBuilder.buildChunkedQuerySQL(schemaName, logicTableName, uniqueKey, true));
+ if (null != param.getTableCheckPosition()) {
+ return sqlBuilder.buildQueryAllOrderingSQL(schemaName,
logicTableName, uniqueKey, false);
}
- return laterSQLCache.computeIfAbsent(cacheKey, s ->
sqlBuilder.buildChunkedQuerySQL(schemaName, logicTableName, uniqueKey, false));
+ return sqlBuilder.buildQueryAllOrderingSQL(schemaName, logicTableName,
uniqueKey, true);
}
@Override
@@ -154,6 +186,26 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
return SUPPORTED_DATABASE_TYPES;
}
+ @RequiredArgsConstructor
+ @Getter
+ private static final class CalculationContext implements AutoCloseable {
+
+ private final Connection connection;
+
+ @Setter
+ private volatile PreparedStatement preparedStatement;
+
+ @Setter
+ private volatile ResultSet resultSet;
+
+ @Override
+ public void close() {
+ CloseUtil.closeQuietly(resultSet);
+ CloseUtil.closeQuietly(preparedStatement);
+ CloseUtil.closeQuietly(connection);
+ }
+ }
+
@RequiredArgsConstructor
@Getter
static final class CalculatedResult implements
DataConsistencyCalculatedResult {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 991ac270e50..8928e401973 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
import com.google.common.base.Strings;
-import lombok.NonNull;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
@@ -171,12 +170,12 @@ public abstract class AbstractPipelineSQLBuilder
implements PipelineSQLBuilder {
}
@Override
- public String buildChunkedQuerySQL(final String schemaName, final @NonNull
String tableName, final @NonNull String uniqueKey, final boolean firstQuery) {
+ public String buildQueryAllOrderingSQL(final String schemaName, final
String tableName, final String uniqueKey, final boolean firstQuery) {
String qualifiedTableName = getQualifiedTableName(schemaName,
tableName);
String quotedUniqueKey = quote(uniqueKey);
return firstQuery
- ? String.format("SELECT * FROM %s ORDER BY %s ASC LIMIT ?",
qualifiedTableName, quotedUniqueKey)
- : String.format("SELECT * FROM %s WHERE %s>? ORDER BY %s ASC
LIMIT ?", qualifiedTableName, quotedUniqueKey, quotedUniqueKey);
+ ? String.format("SELECT * FROM %s ORDER BY %s ASC",
qualifiedTableName, quotedUniqueKey)
+ : String.format("SELECT * FROM %s WHERE %s>? ORDER BY %s ASC",
qualifiedTableName, quotedUniqueKey, quotedUniqueKey);
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/CloseUtil.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/CloseUtil.java
new file mode 100644
index 00000000000..1d4fe97f5be
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/CloseUtil.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+/**
+ * Close util.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class CloseUtil {
+
+ /**
+ * Close quietly.
+ *
+ * @param closeable closeable
+ */
+ public static void closeQuietly(final AutoCloseable closeable) {
+ if (null == closeable) {
+ return;
+ }
+ try {
+ closeable.close();
+ // CHECKSTYLE:OFF
+ } catch (final Exception ignored) {
+ // CHECKSTYLE:ON
+ }
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
index b33a83d70ac..3db2baad3f5 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
@@ -69,7 +69,7 @@ public final class FixturePipelineSQLBuilder implements
PipelineSQLBuilder {
}
@Override
- public String buildChunkedQuerySQL(final String schemaName, final String
tableName, final String uniqueKey, final boolean firstQuery) {
+ public String buildQueryAllOrderingSQL(final String schemaName, final
String tableName, final String uniqueKey, final boolean firstQuery) {
return "";
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithmTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithmTest.java
index d41de94a18d..de667ecb34a 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithmTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithmTest.java
@@ -24,8 +24,8 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.DataMatchDataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.internal.configuration.plugins.Plugins;
@@ -42,19 +42,25 @@ import static org.junit.Assert.assertTrue;
public final class DataMatchDataConsistencyCalculateAlgorithmTest {
- private PipelineDataSourceWrapper source;
+ private static PipelineDataSourceWrapper source;
- private PipelineDataSourceWrapper target;
+ private static PipelineDataSourceWrapper target;
- @Before
- public void setUp() throws Exception {
+ @BeforeClass
+ public static void setUp() throws Exception {
source = new
PipelineDataSourceWrapper(createHikariDataSource("source_ds"), new
H2DatabaseType());
createTableAndInitData(source, "t_order_copy");
target = new
PipelineDataSourceWrapper(createHikariDataSource("target_ds"), new
H2DatabaseType());
createTableAndInitData(target, "t_order");
}
- private HikariDataSource createHikariDataSource(final String databaseName)
{
+ @AfterClass
+ public static void tearDown() throws Exception {
+ source.close();
+ target.close();
+ }
+
+ private static HikariDataSource createHikariDataSource(final String
databaseName) {
HikariDataSource result = new HikariDataSource();
result.setJdbcUrl(String.format("jdbc:h2:mem:%s;DATABASE_TO_UPPER=false;MODE=MySQL",
databaseName));
result.setUsername("root");
@@ -66,7 +72,7 @@ public final class
DataMatchDataConsistencyCalculateAlgorithmTest {
return result;
}
- private void createTableAndInitData(final PipelineDataSourceWrapper
dataSource, final String tableName) throws SQLException {
+ private static void createTableAndInitData(final PipelineDataSourceWrapper
dataSource, final String tableName) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
String sql = String.format("CREATE TABLE %s (order_id INT NOT
NULL, user_id INT NOT NULL, status VARCHAR(45) NULL, PRIMARY KEY (order_id))",
tableName);
connection.createStatement().execute(sql);
@@ -114,15 +120,9 @@ public final class
DataMatchDataConsistencyCalculateAlgorithmTest {
assertThat(sourceCalculateResult.get(),
is(targetCalculateResult.get()));
}
- private DataConsistencyCalculateParameter generateParameter(final
PipelineDataSourceWrapper dataSource, final String logicTableName, final Object
dataCheckPositionValue) {
+ private DataConsistencyCalculateParameter generateParameter(final
PipelineDataSourceWrapper dataSource, final String logicTableName, final Object
dataCheckPosition) {
PipelineColumnMetaData uniqueKey = new PipelineColumnMetaData(1,
"order_id", Types.INTEGER, "integer", false, true, true);
return new DataConsistencyCalculateParameter(dataSource, null,
logicTableName, Collections.emptyList(),
- "MySQL", "MySQL", uniqueKey, dataCheckPositionValue);
- }
-
- @After
- public void tearDown() throws Exception {
- source.close();
- target.close();
+ "MySQL", "MySQL", uniqueKey, dataCheckPosition);
}
}