This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f7f5c15d50b Improve inventory task dumper at pipeline (#25055)
f7f5c15d50b is described below
commit f7f5c15d50b5aa9a5b95cffd751ee319e7f19f40
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Apr 10 19:42:57 2023 +0800
Improve inventory task dumper at pipeline (#25055)
* Improve inventory task dumper at pipeline
* Verify insert column names
---
.../config/ingest/InventoryDumperConfiguration.java | 4 ++++
.../pipeline/core/ingest/dumper/InventoryDumper.java | 19 +++++++++++++------
.../core/task/InventoryIncrementalTasksRunner.java | 2 +-
.../migration/prepare/MigrationJobPreparer.java | 5 +++++
4 files changed, 23 insertions(+), 7 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index 68a44376295..3be5b12f5ea 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -40,6 +40,10 @@ public final class InventoryDumperConfiguration extends DumperConfiguration {
private List<PipelineColumnMetaData> uniqueKeyColumns;
+ private List<String> insertColumnNames;
+
+ private String querySQL;
+
private Integer shardingItem;
private int batchSize = 1000;
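The two new fields let callers drive the dumper with a custom query and an explicit column mapping. A minimal wiring sketch, assuming the Lombok-generated setters and the copy constructor taking a DumperConfiguration; the query and column names are illustrative only:

    // Sketch only: the query and column names are made up for illustration.
    InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(dumperConfig);
    inventoryDumperConfig.setQuerySQL("SELECT order_id, user_id FROM t_order");
    inventoryDumperConfig.setInsertColumnNames(Arrays.asList("order_id", "user_id"));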
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 40d6196622a..1de123faa15 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -17,11 +17,11 @@
package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
@@ -38,6 +38,7 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.util.JDBCStreamQueryUtils;
@@ -49,7 +50,6 @@ import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -60,6 +60,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
/**
* Inventory dumper.
@@ -83,8 +84,6 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
private volatile Statement dumpStatement;
public InventoryDumper(final InventoryDumperConfiguration dumperConfig, final PipelineChannel channel, final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
- ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
- () -> new UnsupportedSQLOperationException("AbstractInventoryDumper only support StandardPipelineDataSourceConfiguration"));
this.dumperConfig = dumperConfig;
this.channel = channel;
this.dataSource = dataSource;
@@ -143,6 +142,9 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
}
private String buildInventoryDumpSQL() {
+ if (!Strings.isNullOrEmpty(dumperConfig.getQuerySQL())) {
+ return dumperConfig.getQuerySQL();
+ }
LogicTableName logicTableName = new LogicTableName(dumperConfig.getLogicTableName());
String schemaName = dumperConfig.getSchemaName(logicTableName);
if (!dumperConfig.hasUniqueKey()) {
@@ -188,9 +190,14 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
DataRecord result = new DataRecord(newPosition(resultSet), columnCount);
result.setType(IngestDataChangeType.INSERT);
result.setTableName(dumperConfig.getLogicTableName());
+ List<String> insertColumnNames = Optional.ofNullable(dumperConfig.getInsertColumnNames()).orElse(Collections.emptyList());
+ ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == resultSetMetaData.getColumnCount(),
+ () -> new PipelineInvalidParameterException("Insert column names count not equals ResultSet column count"));
for (int i = 1; i <= columnCount; i++) {
- String columnName = resultSetMetaData.getColumnName(i);
- result.addColumn(new Column(columnName, columnValueReader.readValue(resultSet, resultSetMetaData, i), true, tableMetaData.getColumnMetaData(columnName).isUniqueKey()));
+ String columnName = insertColumnNames.isEmpty() ? resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
+ ShardingSpherePreconditions.checkNotNull(tableMetaData.getColumnMetaData(columnName), () -> new PipelineInvalidParameterException(String.format("Column name is %s", columnName)));
+ boolean isUniqueKey = tableMetaData.getColumnMetaData(columnName).isUniqueKey();
+ result.addColumn(new Column(columnName, columnValueReader.readValue(resultSet, resultSetMetaData, i), true, isUniqueKey));
}
return result;
}
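With these changes, buildInventoryDumpSQL short-circuits to the configured querySQL when one is set, and each fetched row is validated: a non-empty insertColumnNames list must match the ResultSet width, and every mapped name must resolve in the table metadata. A condensed restatement of that validation, assuming resultSetMetaData, tableMetaData, and insertColumnNames are in scope:

    // Condensed sketch of the added per-row validation, not a drop-in method.
    int columnCount = resultSetMetaData.getColumnCount();
    ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == columnCount,
            () -> new PipelineInvalidParameterException("Insert column names count not equals ResultSet column count"));
    for (int i = 1; i <= columnCount; i++) {
        String columnName = insertColumnNames.isEmpty() ? resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
        ShardingSpherePreconditions.checkNotNull(tableMetaData.getColumnMetaData(columnName),
                () -> new PipelineInvalidParameterException(String.format("Column name is %s", columnName)));
    }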
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 0834db969a9..13432217b60 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -97,7 +97,7 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
ExecuteEngine.trigger(futures, new InventoryTaskExecuteCallback());
}
- private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
+ protected void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
jobItemContext.setStatus(jobStatus);
jobAPI.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
}
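Widening updateLocalAndRemoteJobItemStatus from private to protected lets scenario-specific runners intercept status transitions. A hypothetical subclass, assuming the single-argument constructor; none of this is part of the commit:

    // Hypothetical override; CustomTasksRunner does not exist in the codebase.
    public final class CustomTasksRunner extends InventoryIncrementalTasksRunner {
        
        public CustomTasksRunner(final InventoryIncrementalJobItemContext jobItemContext) {
            super(jobItemContext);
        }
        
        @Override
        protected void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
            // Scenario-specific bookkeeping could run here before delegating.
            super.updateLocalAndRemoteJobItemStatus(jobStatus);
        }
    }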
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index e1f7ac6fa02..91cf01f2cc5 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
@@ -50,6 +51,8 @@ import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
import java.sql.SQLException;
@@ -71,6 +74,8 @@ public final class MigrationJobPreparer {
* @throws SQLException SQL exception
*/
public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException {
+ ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig().getClass()),
+ () -> new UnsupportedSQLOperationException("Migration inventory dumper only supports StandardPipelineDataSourceConfiguration"));
PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(), Collections.singleton(jobItemContext.getSourceDataSource()));
if (jobItemContext.isStopping()) {
PipelineJobCenter.stop(jobItemContext.getJobId());
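With the constructor precondition removed, InventoryDumper itself now accepts any PipelineDataSourceConfiguration; the StandardPipelineDataSourceConfiguration restriction survives only here in the migration preparer. A minimal construction sketch, assuming channel, dataSource, and metaDataLoader are already built:

    // The dumper no longer rejects non-standard data source configurations;
    // for migration, prepare(...) above enforces that instead.
    InventoryDumper dumper = new InventoryDumper(inventoryDumperConfig, channel, dataSource, metaDataLoader);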