This is an automated email from the ASF dual-hosted git repository.

tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 274ef7bfbca Review and improve pipeline code (#21089)
274ef7bfbca is described below

commit 274ef7bfbca73125bf9a61c56408c1f2fffc9133
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Sep 20 16:50:55 2022 +0800

    Review and improve pipeline code (#21089)
    
    * Improve PipelineTableMetaDataUtil
    
    * Improve job preparer stopping
    
    * Improve SchemaTableName
    
    * Improve AbstractDataSourceChecker
    
    * Move PipelineTableMetaDataUtil
    
    * Rename and move PipelineSchemaTableUtil
    
    * Rename and move PipelineProcessConfigurationUtils
---
 .../pipeline/api/metadata/SchemaTableName.java     |  19 ++++
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  |   8 +-
 .../datasource/AbstractDataSourceChecker.java      |  25 ++--
 .../process/PipelineProcessConfigurationUtil.java} |   6 +-
 ...AbstractInventoryIncrementalProcessContext.java |   4 +-
 .../loader/PipelineSchemaUtil.java}                |  13 ++-
 .../metadata/loader/PipelineTableMetaDataUtil.java |  67 +++++++++++
 .../core/util/PipelineTableMetaDataUtil.java       | 126 ---------------------
 .../migration/MigrationDataConsistencyChecker.java |   5 +-
 .../scenario/migration/MigrationJobAPIImpl.java    |  18 ++-
 .../scenario/migration/MigrationJobPreparer.java   |   6 +
 .../PipelineProcessConfigurationUtilTest.java}     |   8 +-
 .../core/prepare/InventoryTaskSplitterTest.java    |   5 +-
 .../pipeline/core/util/PipelineContextUtil.java    |   3 +-
 14 files changed, 145 insertions(+), 168 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
index f3f7dc7fbd4..e66e4670f98 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
@@ -22,6 +22,8 @@ import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 
+import java.util.Objects;
+
 /**
  * Schema name and table name.
  */
@@ -35,4 +37,21 @@ public class SchemaTableName {
     
     @NonNull
     private final TableName tableName;
+    
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final SchemaTableName that = (SchemaTableName) o;
+        return schemaName.equals(that.schemaName) && tableName.equals(that.tableName);
+    }
+    
+    @Override
+    public int hashCode() {
+        return Objects.hash(schemaName, tableName);
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index a2e51f2fa05..3d5ce269185 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -32,6 +32,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
@@ -41,7 +42,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExi
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
-import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineProcessConfigurationUtils;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
@@ -98,16 +98,16 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
     @Override
     public void dropProcessConfiguration(final String confPath) {
         String finalConfPath = confPath.trim();
-        PipelineProcessConfigurationUtils.verifyConfPath(confPath);
+        PipelineProcessConfigurationUtil.verifyConfPath(confPath);
         YamlPipelineProcessConfiguration targetYamlProcessConfig = 
getTargetYamlProcessConfiguration();
-        
PipelineProcessConfigurationUtils.setFieldsNullByConfPath(targetYamlProcessConfig,
 finalConfPath);
+        
PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig,
 finalConfPath);
         processConfigPersistService.persist(getJobType(), 
PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
     }
     
     @Override
     public PipelineProcessConfiguration showProcessConfiguration() {
         PipelineProcessConfiguration result = 
processConfigPersistService.load(getJobType());
-        result = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(result);
+        result = 
PipelineProcessConfigurationUtil.convertWithDefaultValue(result);
         return result;
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
index 847cae0ac4e..a06c465aa99 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
@@ -24,7 +24,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -54,23 +53,25 @@ public abstract class AbstractDataSourceChecker implements 
DataSourceChecker {
     public final void checkTargetTable(final Collection<? extends DataSource> 
dataSources, final TableNameSchemaNameMapping tableNameSchemaNameMapping, final 
Collection<String> logicTableNames) {
         try {
             for (DataSource each : dataSources) {
-                checkEmpty(each, tableNameSchemaNameMapping, logicTableNames);
+                for (String tableName : logicTableNames) {
+                    if (!checkEmpty(each, tableNameSchemaNameMapping.getSchemaName(tableName), tableName)) {
+                        throw new PrepareJobWithTargetTableNotEmptyException(tableName);
+                    }
+                }
             }
         } catch (final SQLException ex) {
             throw new PrepareJobWithInvalidConnectionException(ex);
         }
     }
     
-    private void checkEmpty(final DataSource dataSource, final 
TableNameSchemaNameMapping tableNameSchemaNameMapping, final Collection<String> 
logicTableNames) throws SQLException {
-        for (String each : logicTableNames) {
-            String sql = 
getSQLBuilder().buildCheckEmptySQL(tableNameSchemaNameMapping.getSchemaName(each),
 each);
-            log.info("Check whether table is empty, SQL: {}", sql);
-            try (
-                    Connection connection = dataSource.getConnection();
-                    PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
-                    ResultSet resultSet = preparedStatement.executeQuery()) {
-                ShardingSpherePreconditions.checkState(!resultSet.next(), () 
-> new PrepareJobWithTargetTableNotEmptyException(each));
-            }
+    private boolean checkEmpty(final DataSource dataSource, final String schemaName, final String tableName) throws SQLException {
+        String sql = getSQLBuilder().buildCheckEmptySQL(schemaName, tableName);
+        log.info("checkEmpty, sql={}", sql);
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(sql);
+                ResultSet resultSet = preparedStatement.executeQuery()) {
+            return !resultSet.next();
         }
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/config/process/PipelineProcessConfigurationUtil.java
similarity index 97%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/config/process/PipelineProcessConfigurationUtil.java
index b9369180c14..07b67b000cb 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/config/process/PipelineProcessConfigurationUtil.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.util;
+package org.apache.shardingsphere.data.pipeline.core.config.process;
 
 import com.google.common.base.Splitter;
 import 
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
@@ -31,9 +31,9 @@ import java.util.Properties;
 import java.util.regex.Pattern;
 
 /**
- * Pipeline process configuration utils.
+ * Pipeline process configuration util.
  */
-public final class PipelineProcessConfigurationUtils {
+public final class PipelineProcessConfigurationUtil {
     
     private static final YamlPipelineProcessConfigurationSwapper SWAPPER = new 
YamlPipelineProcessConfigurationSwapper();
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
index d04fb51346d..26a731fc75c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
@@ -25,8 +25,8 @@ import org.apache.commons.lang3.concurrent.LazyInitializer;
 import 
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineReadConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineWriteConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineProcessConfigurationUtils;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreatorFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -55,7 +55,7 @@ public abstract class 
AbstractInventoryIncrementalProcessContext implements Inve
     private final LazyInitializer<ExecuteEngine> 
importerExecuteEngineLazyInitializer;
     
     public AbstractInventoryIncrementalProcessContext(final String jobId, 
final PipelineProcessConfiguration originalProcessConfig) {
-        PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
+        PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtil.convertWithDefaultValue(originalProcessConfig);
         this.pipelineProcessConfig = processConfig;
         PipelineReadConfiguration readConfig = processConfig.getRead();
         AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtil.java
similarity index 83%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtil.java
index 08618150c19..4eff51a7d02 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtil.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.util;
+package org.apache.shardingsphere.data.pipeline.core.metadata.loader;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -29,24 +29,25 @@ import java.sql.Connection;
 import java.sql.SQLException;
 
 /**
- * Pipeline schema table util.
+ * Pipeline schema util.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 @Slf4j
-public final class PipelineSchemaTableUtil {
+public final class PipelineSchemaUtil {
     
     /**
      * Get default schema by connection.getSchema().
      *
-     * @param dataSourceConfig pipeline data source config
+     * @param dataSourceConfig pipeline data source configuration
      * @return schema
      */
     @SneakyThrows(SQLException.class)
     public static String getDefaultSchema(final 
PipelineDataSourceConfiguration dataSourceConfig) {
         try (PipelineDataSourceWrapper dataSource = 
PipelineDataSourceFactory.newInstance(dataSourceConfig)) {
             try (Connection connection = dataSource.getConnection()) {
-                log.info("get default schema {}", connection.getSchema());
-                return connection.getSchema();
+                String result = connection.getSchema();
+                log.info("get default schema {}", result);
+                return result;
             }
         }
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataUtil.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataUtil.java
new file mode 100644
index 00000000000..2935845ac78
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataUtil.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.metadata.loader;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByRangeException;
+import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Pipeline table meta data util.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class PipelineTableMetaDataUtil {
+    
+    /**
+     * Get unique key column.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param metaDataLoader meta data loader
+     * @return pipeline column meta data
+     */
+    public static PipelineColumnMetaData getUniqueKeyColumn(final String schemaName, final String tableName, final StandardPipelineTableMetaDataLoader metaDataLoader) {
+        PipelineTableMetaData pipelineTableMetaData = metaDataLoader.getTableMetaData(schemaName, tableName);
+        return mustGetAnAppropriateUniqueKeyColumn(pipelineTableMetaData, tableName);
+    }
+    
+    private static PipelineColumnMetaData mustGetAnAppropriateUniqueKeyColumn(final PipelineTableMetaData tableMetaData, final String tableName) {
+        ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new SplitPipelineJobByRangeException(tableName, "can not get table metadata"));
+        List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
+        if (1 == primaryKeys.size()) {
+            return tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
+        }
+        ShardingSpherePreconditions.checkState(primaryKeys.isEmpty(), () -> new SplitPipelineJobByRangeException(tableName, "primary key is union primary"));
+        Collection<PipelineIndexMetaData> uniqueIndexes = tableMetaData.getUniqueIndexes();
+        ShardingSpherePreconditions.checkState(!uniqueIndexes.isEmpty(), () -> new SplitPipelineJobByRangeException(tableName, "no primary key or unique index"));
+        if (1 == uniqueIndexes.size() && 1 == uniqueIndexes.iterator().next().getColumns().size()) {
+            PipelineColumnMetaData column = uniqueIndexes.iterator().next().getColumns().get(0);
+            if (!column.isNullable()) {
+                return column;
+            }
+        }
+        throw new SplitPipelineJobByRangeException(tableName, "table contains multiple unique index or unique index contains nullable/multiple column(s)");
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
deleted file mode 100644
index abdb15dfff4..00000000000
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.util;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.SneakyThrows;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByRangeException;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-
-import java.sql.SQLException;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Pipeline table meta data util.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class PipelineTableMetaDataUtil {
-    
-    /**
-     * Get pipeline table meta data.
-     *
-     * @param schemaName schema name
-     * @param tableName table name
-     * @param dataSourceConfig source configuration
-     * @param loader pipeline table meta data loader
-     * @return pipeline table meta data
-     */
-    @SneakyThrows(SQLException.class)
-    public static PipelineTableMetaData getPipelineTableMetaData(final String 
schemaName, final String tableName, final 
StandardPipelineDataSourceConfiguration dataSourceConfig,
-                                                                 final 
PipelineTableMetaDataLoader loader) {
-        try (PipelineDataSourceWrapper dataSource = 
PipelineDataSourceFactory.newInstance(dataSourceConfig)) {
-            return getPipelineTableMetaData(schemaName, tableName, dataSource, 
loader);
-        }
-    }
-    
-    /**
-     * Get pipeline table meta data.
-     *
-     * @param schemaName schema name
-     * @param tableName table name
-     * @param dataSource data source
-     * @param loader pipeline table meta data loader
-     * @return pipeline table meta data.
-     */
-    public static PipelineTableMetaData getPipelineTableMetaData(final String 
schemaName, final String tableName, final PipelineDataSourceWrapper dataSource,
-                                                                 final 
PipelineTableMetaDataLoader loader) {
-        if (null == loader) {
-            return new 
StandardPipelineTableMetaDataLoader(dataSource).getTableMetaData(schemaName, 
tableName);
-        } else {
-            return loader.getTableMetaData(schemaName, tableName);
-        }
-    }
-    
-    /**
-     * Get unique key column.
-     *
-     * @param schemaName schema name
-     * @param tableName table name
-     * @param dataSourceConfig data source config
-     * @param loader pipeline table meta data loader
-     * @return pipeline column meta data.
-     */
-    public static PipelineColumnMetaData getUniqueKeyColumn(final String 
schemaName, final String tableName, final 
StandardPipelineDataSourceConfiguration dataSourceConfig,
-                                                            final 
StandardPipelineTableMetaDataLoader loader) {
-        PipelineTableMetaData pipelineTableMetaData = 
getPipelineTableMetaData(schemaName, tableName, dataSourceConfig, loader);
-        return mustGetAnAppropriateUniqueKeyColumn(pipelineTableMetaData, 
tableName);
-    }
-    
-    /**
-     * Get unique key column.
-     *
-     * @param schemaName schema name
-     * @param tableName table name
-     * @param dataSource data source
-     * @param loader pipeline table meta data loader
-     * @return pipeline column meta data.
-     */
-    public static PipelineColumnMetaData getUniqueKeyColumn(final String 
schemaName, final String tableName, final PipelineDataSourceWrapper dataSource,
-                                                            final 
StandardPipelineTableMetaDataLoader loader) {
-        PipelineTableMetaData pipelineTableMetaData = 
getPipelineTableMetaData(schemaName, tableName, dataSource, loader);
-        return mustGetAnAppropriateUniqueKeyColumn(pipelineTableMetaData, 
tableName);
-    }
-    
-    private static PipelineColumnMetaData 
mustGetAnAppropriateUniqueKeyColumn(final PipelineTableMetaData tableMetaData, 
final String tableName) {
-        ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new 
SplitPipelineJobByRangeException(tableName, "can not get table metadata"));
-        List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
-        if (1 == primaryKeys.size()) {
-            return 
tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
-        }
-        ShardingSpherePreconditions.checkState(primaryKeys.isEmpty(), () -> 
new SplitPipelineJobByRangeException(tableName, "primary key is union 
primary"));
-        Collection<PipelineIndexMetaData> uniqueIndexes = 
tableMetaData.getUniqueIndexes();
-        ShardingSpherePreconditions.checkState(!uniqueIndexes.isEmpty(), () -> 
new SplitPipelineJobByRangeException(tableName, "no primary key or unique 
index"));
-        if (1 == uniqueIndexes.size() && 1 == 
uniqueIndexes.iterator().next().getColumns().size()) {
-            PipelineColumnMetaData column = 
uniqueIndexes.iterator().next().getColumns().get(0);
-            if (!column.isNullable()) {
-                return column;
-            }
-        }
-        throw new SplitPipelineJobByRangeException(tableName, "table contains 
multiple unique index or unique index contains nullable/multiple column(s)");
-    }
-}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index c6061c1eaf4..fbbdcca1389 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -32,8 +32,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineDataConsistencyCheckFailedException;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineTableMetaDataUtil;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -173,8 +173,9 @@ public final class MigrationDataConsistencyChecker {
                 PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
             String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getType();
             String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getType();
+            StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
             for (String each : Collections.singletonList(sourceTableName)) {
-                PipelineTableMetaData tableMetaData = PipelineTableMetaDataUtil.getPipelineTableMetaData(tableNameSchemaNameMapping.getSchemaName(each), each, sourceDataSource, null);
+                PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each), each);
                 if (null == tableMetaData) {
                     throw new PipelineDataConsistencyCheckFailedException("Can not get metadata for table " + each);
                 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index bd4f0e67664..e2ad1584cb0 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -68,9 +68,10 @@ import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.AddMigrationSourceResourceException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.DropMigrationSourceResourceException;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtil;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineSchemaTableUtil;
-import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineTableMetaDataUtil;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
@@ -452,7 +453,7 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
         DatabaseType sourceDatabaseType = sourceDataSourceConfig.getDatabaseType();
         result.setSourceDatabaseType(sourceDatabaseType.getType());
         String sourceSchemaName = null == parameter.getSourceSchemaName() && sourceDatabaseType.isSchemaAvailable()
-                ? PipelineSchemaTableUtil.getDefaultSchema(sourceDataSourceConfig)
+                ? PipelineSchemaUtil.getDefaultSchema(sourceDataSourceConfig)
                 : parameter.getSourceSchemaName();
         result.setSourceSchemaName(sourceSchemaName);
         result.setSourceTableName(parameter.getSourceTableName());
@@ -469,9 +470,14 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
         
result.setTargetDatabaseType(targetPipelineDataSource.getDatabaseType().getType());
         result.setTargetDatabaseName(targetDatabaseName);
         result.setTargetTableName(parameter.getTargetTableName());
-        YamlPipelineColumnMetaData uniqueKeyColumn = new YamlPipelineColumnMetaDataSwapper().swapToYamlConfiguration(PipelineTableMetaDataUtil.getUniqueKeyColumn(sourceSchemaName,
-                parameter.getSourceTableName(), sourceDataSourceConfig, null));
-        result.setUniqueKeyColumn(uniqueKeyColumn);
+        try (PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig)) {
+            StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
+            YamlPipelineColumnMetaData uniqueKeyColumn = new YamlPipelineColumnMetaDataSwapper().swapToYamlConfiguration(
+                    PipelineTableMetaDataUtil.getUniqueKeyColumn(sourceSchemaName, parameter.getSourceTableName(), metaDataLoader));
+            result.setUniqueKeyColumn(uniqueKeyColumn);
+        } catch (final SQLException ex) {
+            throw new RuntimeException(ex);
+        }
         extendYamlJobConfiguration(result);
         MigrationJobConfiguration jobConfiguration = new 
YamlMigrationJobConfigurationSwapper().swapToObject(result);
         start(jobConfiguration);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index 93776d3ae2e..1c913a4140d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -64,17 +64,23 @@ public final class MigrationJobPreparer {
     public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException {
         PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(), Collections.singleton(jobItemContext.getSourceDataSource()));
         if (jobItemContext.isStopping()) {
+            log.info("prepare, job is stopping, jobId={}", jobItemContext.getJobId());
             PipelineJobCenter.stop(jobItemContext.getJobId());
+            return;
         }
         prepareAndCheckTargetWithLock(jobItemContext);
         if (jobItemContext.isStopping()) {
+            log.info("prepare, job is stopping, jobId={}", jobItemContext.getJobId());
             PipelineJobCenter.stop(jobItemContext.getJobId());
+            return;
         }
         // TODO check metadata
         if (PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType())) {
             initIncrementalTasks(jobItemContext);
             if (jobItemContext.isStopping()) {
+                log.info("prepare, job is stopping, jobId={}", jobItemContext.getJobId());
                 PipelineJobCenter.stop(jobItemContext.getJobId());
+                return;
             }
         }
         initInventoryTasks(jobItemContext);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtilsTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/config/process/PipelineProcessConfigurationUtilTest.java
similarity index 85%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtilsTest.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/config/process/PipelineProcessConfigurationUtilTest.java
index c51f6f223af..0bbbd652e25 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtilsTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/config/process/PipelineProcessConfigurationUtilTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.util;
+package org.apache.shardingsphere.data.pipeline.core.config.process;
 
 import org.junit.Test;
 
@@ -25,12 +25,12 @@ import java.util.Collection;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-public final class PipelineProcessConfigurationUtilsTest {
+public final class PipelineProcessConfigurationUtilTest {
     
     @Test
     public void assertVerifyConfPathSuccess() {
         for (String each : Arrays.asList("/", "/READ", "/READ/RATE_LIMITER")) {
-            PipelineProcessConfigurationUtils.verifyConfPath(each);
+            PipelineProcessConfigurationUtil.verifyConfPath(each);
         }
     }
     
@@ -40,7 +40,7 @@ public final class PipelineProcessConfigurationUtilsTest {
         int failCount = 0;
         for (String each : confPaths) {
             try {
-                PipelineProcessConfigurationUtils.verifyConfPath(each);
+                PipelineProcessConfigurationUtil.verifyConfPath(each);
             } catch (final IllegalArgumentException ex) {
                 ++failCount;
             }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 9e0d6934cf7..7e4237c1c7e 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -25,10 +25,11 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByRangeException;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineTableMetaDataUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
@@ -134,7 +135,7 @@ public final class InventoryTaskSplitterTest {
     public void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws SQLException, NoSuchFieldException, IllegalAccessException {
         initNoPrimaryEnvironment(taskConfig.getDumperConfig());
         try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig())) {
-            PipelineColumnMetaData uniqueKeyColumn = PipelineTableMetaDataUtil.getUniqueKeyColumn(null, "t_order", dataSource, null);
+            PipelineColumnMetaData uniqueKeyColumn = PipelineTableMetaDataUtil.getUniqueKeyColumn(null, "t_order", new StandardPipelineTableMetaDataLoader(dataSource));
             ReflectionUtil.setFieldValue(jobItemContext.getJobConfig(), "uniqueKeyColumn", uniqueKeyColumn);
         }
         inventoryTaskSplitter.splitInventoryData(jobItemContext);
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index 2feab94d054..5d2c56534aa 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -26,6 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipel
 import 
org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineReadConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
@@ -168,7 +169,7 @@ public final class PipelineContextUtil {
         yamlReadConfig.setShardingSize(10);
         YamlPipelineProcessConfiguration yamlProcessConfig = new YamlPipelineProcessConfiguration();
         yamlProcessConfig.setRead(yamlReadConfig);
-        PipelineProcessConfigurationUtils.fillInDefaultValue(yamlProcessConfig);
+        PipelineProcessConfigurationUtil.fillInDefaultValue(yamlProcessConfig);
         return new YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
     }
 }


Reply via email to