This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f7f5c15d50b Improve inventory task dumper at pipeline (#25055)
f7f5c15d50b is described below

commit f7f5c15d50b5aa9a5b95cffd751ee319e7f19f40
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Apr 10 19:42:57 2023 +0800

    Improve inventory task dumper at pipeline (#25055)
    
    * Improve inventory task dumper at pipeline
    
    * Verify insert column names
---
 .../config/ingest/InventoryDumperConfiguration.java   |  4 ++++
 .../pipeline/core/ingest/dumper/InventoryDumper.java  | 19 +++++++++++++------
 .../core/task/InventoryIncrementalTasksRunner.java    |  2 +-
 .../migration/prepare/MigrationJobPreparer.java       |  5 +++++
 4 files changed, 23 insertions(+), 7 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
index 68a44376295..3be5b12f5ea 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/InventoryDumperConfiguration.java
@@ -40,6 +40,10 @@ public final class InventoryDumperConfiguration extends 
DumperConfiguration {
     
     private List<PipelineColumnMetaData> uniqueKeyColumns;
     
+    private List<String> insertColumnNames;
+    
+    private String querySQL;
+    
     private Integer shardingItem;
     
     private int batchSize = 1000;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 40d6196622a..1de123faa15 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -17,11 +17,11 @@
 
 package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
 
+import com.google.common.base.Strings;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
@@ -38,6 +38,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.core.util.JDBCStreamQueryUtils;
@@ -49,7 +50,6 @@ import 
org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-import 
org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -60,6 +60,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * Inventory dumper.
@@ -83,8 +84,6 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
     private volatile Statement dumpStatement;
     
     public InventoryDumper(final InventoryDumperConfiguration dumperConfig, 
final PipelineChannel channel, final DataSource dataSource, final 
PipelineTableMetaDataLoader metaDataLoader) {
-        
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
-                () -> new 
UnsupportedSQLOperationException("AbstractInventoryDumper only support 
StandardPipelineDataSourceConfiguration"));
         this.dumperConfig = dumperConfig;
         this.channel = channel;
         this.dataSource = dataSource;
@@ -143,6 +142,9 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
     }
     
     private String buildInventoryDumpSQL() {
+        if (!Strings.isNullOrEmpty(dumperConfig.getQuerySQL())) {
+            return dumperConfig.getQuerySQL();
+        }
         LogicTableName logicTableName = new 
LogicTableName(dumperConfig.getLogicTableName());
         String schemaName = dumperConfig.getSchemaName(logicTableName);
         if (!dumperConfig.hasUniqueKey()) {
@@ -188,9 +190,14 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
         DataRecord result = new DataRecord(newPosition(resultSet), 
columnCount);
         result.setType(IngestDataChangeType.INSERT);
         result.setTableName(dumperConfig.getLogicTableName());
+        List<String> insertColumnNames = 
Optional.ofNullable(dumperConfig.getInsertColumnNames()).orElse(Collections.emptyList());
+        ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || 
insertColumnNames.size() == resultSetMetaData.getColumnCount(),
+                () -> new PipelineInvalidParameterException("Insert colum 
names count not equals ResultSet column count"));
         for (int i = 1; i <= columnCount; i++) {
-            String columnName = resultSetMetaData.getColumnName(i);
-            result.addColumn(new Column(columnName, 
columnValueReader.readValue(resultSet, resultSetMetaData, i), true, 
tableMetaData.getColumnMetaData(columnName).isUniqueKey()));
+            String columnName = insertColumnNames.isEmpty() ? 
resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
+            
ShardingSpherePreconditions.checkNotNull(tableMetaData.getColumnMetaData(columnName),
 () -> new PipelineInvalidParameterException(String.format("Column name is %s", 
columnName)));
+            boolean isUniqueKey = 
tableMetaData.getColumnMetaData(columnName).isUniqueKey();
+            result.addColumn(new Column(columnName, 
columnValueReader.readValue(resultSet, resultSetMetaData, i), true, 
isUniqueKey));
         }
         return result;
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 0834db969a9..13432217b60 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -97,7 +97,7 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
         ExecuteEngine.trigger(futures, new InventoryTaskExecuteCallback());
     }
     
-    private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
+    protected void updateLocalAndRemoteJobItemStatus(final JobStatus 
jobStatus) {
         jobItemContext.setStatus(jobStatus);
         jobAPI.updateJobItemStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
     }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index e1f7ac6fa02..91cf01f2cc5 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
@@ -50,6 +51,8 @@ import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.lock.LockDefinition;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
+import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+import 
org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
 
 import java.sql.SQLException;
@@ -71,6 +74,8 @@ public final class MigrationJobPreparer {
      * @throws SQLException SQL exception
      */
     public void prepare(final MigrationJobItemContext jobItemContext) throws 
SQLException {
+        
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig().getClass()),
+                () -> new UnsupportedSQLOperationException("Migration 
inventory dumper only support StandardPipelineDataSourceConfiguration"));
         
PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(),
 Collections.singleton(jobItemContext.getSourceDataSource()));
         if (jobItemContext.isStopping()) {
             PipelineJobCenter.stop(jobItemContext.getJobId());

Reply via email to