This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 33ec1efb017 Improve pipeline importer job write exception (#25207)
33ec1efb017 is described below
commit 33ec1efb017f66bada6f97fc14e8ca4b9bc46ca8
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Apr 18 16:08:47 2023 +0800
Improve pipeline importer job write exception (#25207)
* Improve pipeline importer job write exception
* Add transaction isolation at inventory
* Improve unit test
---
.../api/config/ingest/InventoryDumperConfiguration.java | 2 ++
.../exception/job/PipelineImporterJobWriteException.java | 4 ++--
.../data/pipeline/core/importer/DataSourceImporter.java | 16 ++++++++++------
.../pipeline/core/ingest/dumper/InventoryDumper.java | 3 +++
.../consistency/MigrationDataConsistencyCheckerTest.java | 2 ++
5 files changed, 19 insertions(+), 8 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index 3be5b12f5ea..8e8f88d5a49 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -44,6 +44,8 @@ public final class InventoryDumperConfiguration extends
DumperConfiguration {
private String querySQL;
+ private Integer transactionIsolation;
+
private Integer shardingItem;
private int batchSize = 1000;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
index 762d9ace2b4..b0f832722f7 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
@@ -27,7 +27,7 @@ public final class PipelineImporterJobWriteException extends
PipelineSQLExceptio
private static final long serialVersionUID = -7924663094479253130L;
-    public PipelineImporterJobWriteException() {
-        super(XOpenSQLState.GENERAL_ERROR, 91, "Importer job write data failed.");
+    public PipelineImporterJobWriteException(final Exception cause) {
+        super(XOpenSQLState.GENERAL_ERROR, 91, "Importer job write data failed.", cause);
     }
 }
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 09f83d9eb24..a6a891d6e1b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -42,7 +42,6 @@ import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterCo
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
-import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -130,22 +129,27 @@ public final class DataSourceImporter extends
AbstractLifecycleExecutor implemen
if (null == buffer || buffer.isEmpty()) {
return;
}
-        boolean success = tryFlush(dataSource, buffer);
-        ShardingSpherePreconditions.checkState(!isRunning() || success, PipelineImporterJobWriteException::new);
+        try {
+            tryFlush(dataSource, buffer);
+        } catch (final SQLException ex) {
+            throw new PipelineImporterJobWriteException(ex);
+        }
}
     @SneakyThrows(InterruptedException.class)
-    private boolean tryFlush(final DataSource dataSource, final List<DataRecord> buffer) {
+    private void tryFlush(final DataSource dataSource, final List<DataRecord> buffer) throws SQLException {
         for (int i = 0; isRunning() && i <= importerConfig.getRetryTimes(); i++) {
             try {
                 doFlush(dataSource, buffer);
-                return true;
+                return;
             } catch (final SQLException ex) {
                 log.error("flush failed {}/{} times.", i, importerConfig.getRetryTimes(), ex);
+                if (i == importerConfig.getRetryTimes()) {
+                    throw ex;
+                }
                 Thread.sleep(Math.min(5 * 60 * 1000L, 1000L << i));
             }
         }
-        return false;
     }
private void doFlush(final DataSource dataSource, final List<DataRecord>
buffer) throws SQLException {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 1de123faa15..548eac94a80 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -114,6 +114,9 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
     private void dump(final PipelineTableMetaData tableMetaData, final Connection connection) throws SQLException {
         int batchSize = dumperConfig.getBatchSize();
         DatabaseType databaseType = dumperConfig.getDataSourceConfig().getDatabaseType();
+        if (null != dumperConfig.getTransactionIsolation()) {
+            connection.setTransactionIsolation(dumperConfig.getTransactionIsolation());
+        }
         try (PreparedStatement preparedStatement = JDBCStreamQueryUtils.generateStreamQueryPreparedStatement(databaseType, connection, buildInventoryDumpSQL())) {
             dumpStatement = preparedStatement;
             if (!(databaseType instanceof MySQLDatabaseType)) {
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 93d0986119f..e05dd7b08c7 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -57,6 +57,8 @@ class MigrationDataConsistencyCheckerTest {
         MigrationJobConfiguration jobConfig = createJobConfiguration();
         JobConfigurationPOJO jobConfigurationPOJO = new JobConfigurationPOJO();
         jobConfigurationPOJO.setJobParameter(YamlEngine.marshal(new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
+        jobConfigurationPOJO.setJobName(jobConfig.getJobId());
+        jobConfigurationPOJO.setShardingTotalCount(1);
         GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
         governanceRepositoryAPI.persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
         governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(), 0, "");