This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 274ef7bfbca Review and improve pipeline code (#21089)
274ef7bfbca is described below
commit 274ef7bfbca73125bf9a61c56408c1f2fffc9133
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Sep 20 16:50:55 2022 +0800
Review and improve pipeline code (#21089)
* Improve PipelineTableMetaDataUtil
* Improve job preparer stopping
* Improve SchemaTableName
* Improve AbstractDataSourceChecker
* Move PipelineTableMetaDataUtil
* Rename and move PipelineSchemaTableUtil
* Rename and move PipelineProcessConfigurationUtils
---
.../pipeline/api/metadata/SchemaTableName.java | 19 ++++
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 8 +-
.../datasource/AbstractDataSourceChecker.java | 25 ++--
.../process/PipelineProcessConfigurationUtil.java} | 6 +-
...AbstractInventoryIncrementalProcessContext.java | 4 +-
.../loader/PipelineSchemaUtil.java} | 13 ++-
.../metadata/loader/PipelineTableMetaDataUtil.java | 67 +++++++++++
.../core/util/PipelineTableMetaDataUtil.java | 126 ---------------------
.../migration/MigrationDataConsistencyChecker.java | 5 +-
.../scenario/migration/MigrationJobAPIImpl.java | 18 ++-
.../scenario/migration/MigrationJobPreparer.java | 6 +
.../PipelineProcessConfigurationUtilTest.java} | 8 +-
.../core/prepare/InventoryTaskSplitterTest.java | 5 +-
.../pipeline/core/util/PipelineContextUtil.java | 3 +-
14 files changed, 145 insertions(+), 168 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
index f3f7dc7fbd4..e66e4670f98 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/SchemaTableName.java
@@ -22,6 +22,8 @@ import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
+import java.util.Objects;
+
/**
* Schema name and table name.
*/
@@ -35,4 +37,21 @@ public class SchemaTableName {
@NonNull
private final TableName tableName;
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final SchemaTableName that = (SchemaTableName) o;
+ return schemaName.equals(that.schemaName) &&
tableName.equals(that.tableName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(schemaName, tableName);
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index a2e51f2fa05..3d5ce269185 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
@@ -41,7 +42,6 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExi
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
-import
org.apache.shardingsphere.data.pipeline.core.util.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
@@ -98,16 +98,16 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
@Override
public void dropProcessConfiguration(final String confPath) {
String finalConfPath = confPath.trim();
- PipelineProcessConfigurationUtils.verifyConfPath(confPath);
+ PipelineProcessConfigurationUtil.verifyConfPath(confPath);
YamlPipelineProcessConfiguration targetYamlProcessConfig =
getTargetYamlProcessConfiguration();
-
PipelineProcessConfigurationUtils.setFieldsNullByConfPath(targetYamlProcessConfig,
finalConfPath);
+
PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig,
finalConfPath);
processConfigPersistService.persist(getJobType(),
PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
}
@Override
public PipelineProcessConfiguration showProcessConfiguration() {
PipelineProcessConfiguration result =
processConfigPersistService.load(getJobType());
- result =
PipelineProcessConfigurationUtils.convertWithDefaultValue(result);
+ result =
PipelineProcessConfigurationUtil.convertWithDefaultValue(result);
return result;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
index 847cae0ac4e..a06c465aa99 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
@@ -24,7 +24,6 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -54,23 +53,25 @@ public abstract class AbstractDataSourceChecker implements
DataSourceChecker {
public final void checkTargetTable(final Collection<? extends DataSource>
dataSources, final TableNameSchemaNameMapping tableNameSchemaNameMapping, final
Collection<String> logicTableNames) {
try {
for (DataSource each : dataSources) {
- checkEmpty(each, tableNameSchemaNameMapping, logicTableNames);
+ for (String tableName : logicTableNames) {
+ if (!checkEmpty(each,
tableNameSchemaNameMapping.getSchemaName(tableName), tableName)) {
+ throw new
PrepareJobWithTargetTableNotEmptyException(tableName);
+ }
+ }
}
} catch (final SQLException ex) {
throw new PrepareJobWithInvalidConnectionException(ex);
}
}
- private void checkEmpty(final DataSource dataSource, final
TableNameSchemaNameMapping tableNameSchemaNameMapping, final Collection<String>
logicTableNames) throws SQLException {
- for (String each : logicTableNames) {
- String sql =
getSQLBuilder().buildCheckEmptySQL(tableNameSchemaNameMapping.getSchemaName(each),
each);
- log.info("Check whether table is empty, SQL: {}", sql);
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement =
connection.prepareStatement(sql);
- ResultSet resultSet = preparedStatement.executeQuery()) {
- ShardingSpherePreconditions.checkState(!resultSet.next(), ()
-> new PrepareJobWithTargetTableNotEmptyException(each));
- }
+ private boolean checkEmpty(final DataSource dataSource, final String
schemaName, final String tableName) throws SQLException {
+ String sql = getSQLBuilder().buildCheckEmptySQL(schemaName, tableName);
+ log.info("checkEmpty, sql={}", sql);
+ try (
+ Connection connection = dataSource.getConnection();
+ PreparedStatement preparedStatement =
connection.prepareStatement(sql);
+ ResultSet resultSet = preparedStatement.executeQuery()) {
+ return !resultSet.next();
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/config/process/PipelineProcessConfigurationUtil.java
similarity index 97%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/config/process/PipelineProcessConfigurationUtil.java
index b9369180c14..07b67b000cb 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/config/process/PipelineProcessConfigurationUtil.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.util;
+package org.apache.shardingsphere.data.pipeline.core.config.process;
import com.google.common.base.Splitter;
import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
@@ -31,9 +31,9 @@ import java.util.Properties;
import java.util.regex.Pattern;
/**
- * Pipeline process configuration utils.
+ * Pipeline process configuration util.
*/
-public final class PipelineProcessConfigurationUtils {
+public final class PipelineProcessConfigurationUtil {
private static final YamlPipelineProcessConfigurationSwapper SWAPPER = new
YamlPipelineProcessConfigurationSwapper();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
index d04fb51346d..26a731fc75c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractInventoryIncrementalProcessContext.java
@@ -25,8 +25,8 @@ import org.apache.commons.lang3.concurrent.LazyInitializer;
import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineReadConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineWriteConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import
org.apache.shardingsphere.data.pipeline.core.util.PipelineProcessConfigurationUtils;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreatorFactory;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -55,7 +55,7 @@ public abstract class
AbstractInventoryIncrementalProcessContext implements Inve
private final LazyInitializer<ExecuteEngine>
importerExecuteEngineLazyInitializer;
public AbstractInventoryIncrementalProcessContext(final String jobId,
final PipelineProcessConfiguration originalProcessConfig) {
- PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
+ PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtil.convertWithDefaultValue(originalProcessConfig);
this.pipelineProcessConfig = processConfig;
PipelineReadConfiguration readConfig = processConfig.getRead();
AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtil.java
similarity index 83%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtil.java
index 08618150c19..4eff51a7d02 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineSchemaUtil.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.util;
+package org.apache.shardingsphere.data.pipeline.core.metadata.loader;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -29,24 +29,25 @@ import java.sql.Connection;
import java.sql.SQLException;
/**
- * Pipeline schema table util.
+ * Pipeline schema util.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
-public final class PipelineSchemaTableUtil {
+public final class PipelineSchemaUtil {
/**
* Get default schema by connection.getSchema().
*
- * @param dataSourceConfig pipeline data source config
+ * @param dataSourceConfig pipeline data source configuration
* @return schema
*/
@SneakyThrows(SQLException.class)
public static String getDefaultSchema(final
PipelineDataSourceConfiguration dataSourceConfig) {
try (PipelineDataSourceWrapper dataSource =
PipelineDataSourceFactory.newInstance(dataSourceConfig)) {
try (Connection connection = dataSource.getConnection()) {
- log.info("get default schema {}", connection.getSchema());
- return connection.getSchema();
+ String result = connection.getSchema();
+ log.info("get default schema {}", result);
+ return result;
}
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataUtil.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataUtil.java
new file mode 100644
index 00000000000..2935845ac78
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataUtil.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.metadata.loader;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
+import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByRangeException;
+import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Pipeline table meta data util.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class PipelineTableMetaDataUtil {
+
+ /**
+ * Get unique key column.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @param metaDataLoader meta data loader
+ * @return pipeline column meta data
+ */
+ public static PipelineColumnMetaData getUniqueKeyColumn(final String
schemaName, final String tableName, final StandardPipelineTableMetaDataLoader
metaDataLoader) {
+ PipelineTableMetaData pipelineTableMetaData =
metaDataLoader.getTableMetaData(schemaName, tableName);
+ return mustGetAnAppropriateUniqueKeyColumn(pipelineTableMetaData,
tableName);
+ }
+
+ private static PipelineColumnMetaData
mustGetAnAppropriateUniqueKeyColumn(final PipelineTableMetaData tableMetaData,
final String tableName) {
+ ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new
SplitPipelineJobByRangeException(tableName, "can not get table metadata"));
+ List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
+ if (1 == primaryKeys.size()) {
+ return
tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
+ }
+ ShardingSpherePreconditions.checkState(primaryKeys.isEmpty(), () ->
new SplitPipelineJobByRangeException(tableName, "primary key is union
primary"));
+ Collection<PipelineIndexMetaData> uniqueIndexes =
tableMetaData.getUniqueIndexes();
+ ShardingSpherePreconditions.checkState(!uniqueIndexes.isEmpty(), () ->
new SplitPipelineJobByRangeException(tableName, "no primary key or unique
index"));
+ if (1 == uniqueIndexes.size() && 1 ==
uniqueIndexes.iterator().next().getColumns().size()) {
+ PipelineColumnMetaData column =
uniqueIndexes.iterator().next().getColumns().get(0);
+ if (!column.isNullable()) {
+ return column;
+ }
+ }
+ throw new SplitPipelineJobByRangeException(tableName, "table contains
multiple unique index or unique index contains nullable/multiple column(s)");
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
deleted file mode 100644
index abdb15dfff4..00000000000
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.util;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.SneakyThrows;
-import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
-import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
-import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
-import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
-import
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByRangeException;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-
-import java.sql.SQLException;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Pipeline table meta data util.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class PipelineTableMetaDataUtil {
-
- /**
- * Get pipeline table meta data.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @param dataSourceConfig source configuration
- * @param loader pipeline table meta data loader
- * @return pipeline table meta data
- */
- @SneakyThrows(SQLException.class)
- public static PipelineTableMetaData getPipelineTableMetaData(final String
schemaName, final String tableName, final
StandardPipelineDataSourceConfiguration dataSourceConfig,
- final
PipelineTableMetaDataLoader loader) {
- try (PipelineDataSourceWrapper dataSource =
PipelineDataSourceFactory.newInstance(dataSourceConfig)) {
- return getPipelineTableMetaData(schemaName, tableName, dataSource,
loader);
- }
- }
-
- /**
- * Get pipeline table meta data.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @param dataSource data source
- * @param loader pipeline table meta data loader
- * @return pipeline table meta data.
- */
- public static PipelineTableMetaData getPipelineTableMetaData(final String
schemaName, final String tableName, final PipelineDataSourceWrapper dataSource,
- final
PipelineTableMetaDataLoader loader) {
- if (null == loader) {
- return new
StandardPipelineTableMetaDataLoader(dataSource).getTableMetaData(schemaName,
tableName);
- } else {
- return loader.getTableMetaData(schemaName, tableName);
- }
- }
-
- /**
- * Get unique key column.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @param dataSourceConfig data source config
- * @param loader pipeline table meta data loader
- * @return pipeline column meta data.
- */
- public static PipelineColumnMetaData getUniqueKeyColumn(final String
schemaName, final String tableName, final
StandardPipelineDataSourceConfiguration dataSourceConfig,
- final
StandardPipelineTableMetaDataLoader loader) {
- PipelineTableMetaData pipelineTableMetaData =
getPipelineTableMetaData(schemaName, tableName, dataSourceConfig, loader);
- return mustGetAnAppropriateUniqueKeyColumn(pipelineTableMetaData,
tableName);
- }
-
- /**
- * Get unique key column.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @param dataSource data source
- * @param loader pipeline table meta data loader
- * @return pipeline column meta data.
- */
- public static PipelineColumnMetaData getUniqueKeyColumn(final String
schemaName, final String tableName, final PipelineDataSourceWrapper dataSource,
- final
StandardPipelineTableMetaDataLoader loader) {
- PipelineTableMetaData pipelineTableMetaData =
getPipelineTableMetaData(schemaName, tableName, dataSource, loader);
- return mustGetAnAppropriateUniqueKeyColumn(pipelineTableMetaData,
tableName);
- }
-
- private static PipelineColumnMetaData
mustGetAnAppropriateUniqueKeyColumn(final PipelineTableMetaData tableMetaData,
final String tableName) {
- ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new
SplitPipelineJobByRangeException(tableName, "can not get table metadata"));
- List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
- if (1 == primaryKeys.size()) {
- return
tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
- }
- ShardingSpherePreconditions.checkState(primaryKeys.isEmpty(), () ->
new SplitPipelineJobByRangeException(tableName, "primary key is union
primary"));
- Collection<PipelineIndexMetaData> uniqueIndexes =
tableMetaData.getUniqueIndexes();
- ShardingSpherePreconditions.checkState(!uniqueIndexes.isEmpty(), () ->
new SplitPipelineJobByRangeException(tableName, "no primary key or unique
index"));
- if (1 == uniqueIndexes.size() && 1 ==
uniqueIndexes.iterator().next().getColumns().size()) {
- PipelineColumnMetaData column =
uniqueIndexes.iterator().next().getColumns().get(0);
- if (!column.isNullable()) {
- return column;
- }
- }
- throw new SplitPipelineJobByRangeException(tableName, "table contains
multiple unique index or unique index contains nullable/multiple column(s)");
- }
-}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index c6061c1eaf4..fbbdcca1389 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -32,8 +32,8 @@ import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineDataConsistencyCheckFailedException;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import
org.apache.shardingsphere.data.pipeline.core.util.PipelineTableMetaDataUtil;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -173,8 +173,9 @@ public final class MigrationDataConsistencyChecker {
PipelineDataSourceWrapper targetDataSource =
PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
String sourceDatabaseType =
sourceDataSourceConfig.getDatabaseType().getType();
String targetDatabaseType =
targetDataSourceConfig.getDatabaseType().getType();
+ StandardPipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(sourceDataSource);
for (String each : Collections.singletonList(sourceTableName)) {
- PipelineTableMetaData tableMetaData =
PipelineTableMetaDataUtil.getPipelineTableMetaData(tableNameSchemaNameMapping.getSchemaName(each),
each, sourceDataSource, null);
+ PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each),
each);
if (null == tableMetaData) {
throw new PipelineDataConsistencyCheckFailedException("Can
not get metadata for table " + each);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index bd4f0e67664..e2ad1584cb0 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -68,9 +68,10 @@ import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.connection.AddMigrationSourceResourceException;
import
org.apache.shardingsphere.data.pipeline.core.exception.connection.DropMigrationSourceResourceException;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtil;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import
org.apache.shardingsphere.data.pipeline.core.util.PipelineSchemaTableUtil;
-import
org.apache.shardingsphere.data.pipeline.core.util.PipelineTableMetaDataUtil;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
@@ -452,7 +453,7 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
         DatabaseType sourceDatabaseType = sourceDataSourceConfig.getDatabaseType();
         result.setSourceDatabaseType(sourceDatabaseType.getType());
         String sourceSchemaName = null == parameter.getSourceSchemaName() && sourceDatabaseType.isSchemaAvailable()
-                ? PipelineSchemaTableUtil.getDefaultSchema(sourceDataSourceConfig)
+                ? PipelineSchemaUtil.getDefaultSchema(sourceDataSourceConfig)
                 : parameter.getSourceSchemaName();
         result.setSourceSchemaName(sourceSchemaName);
         result.setSourceTableName(parameter.getSourceTableName());
@@ -469,9 +470,14 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
         result.setTargetDatabaseType(targetPipelineDataSource.getDatabaseType().getType());
         result.setTargetDatabaseName(targetDatabaseName);
         result.setTargetTableName(parameter.getTargetTableName());
-        YamlPipelineColumnMetaData uniqueKeyColumn = new YamlPipelineColumnMetaDataSwapper().swapToYamlConfiguration(PipelineTableMetaDataUtil.getUniqueKeyColumn(sourceSchemaName,
-                parameter.getSourceTableName(), sourceDataSourceConfig, null));
-        result.setUniqueKeyColumn(uniqueKeyColumn);
+        try (PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig)) {
+            StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
+            YamlPipelineColumnMetaData uniqueKeyColumn = new YamlPipelineColumnMetaDataSwapper().swapToYamlConfiguration(
+                    PipelineTableMetaDataUtil.getUniqueKeyColumn(sourceSchemaName, parameter.getSourceTableName(), metaDataLoader));
+            result.setUniqueKeyColumn(uniqueKeyColumn);
+        } catch (final SQLException ex) {
+            throw new RuntimeException(ex);
+        }
         extendYamlJobConfiguration(result);
         MigrationJobConfiguration jobConfiguration = new YamlMigrationJobConfigurationSwapper().swapToObject(result);
         start(jobConfiguration);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index 93776d3ae2e..1c913a4140d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -64,17 +64,23 @@ public final class MigrationJobPreparer {
     public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException {
         PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(), Collections.singleton(jobItemContext.getSourceDataSource()));
         if (jobItemContext.isStopping()) {
+            log.info("prepare, job is stopping, jobId={}", jobItemContext.getJobId());
             PipelineJobCenter.stop(jobItemContext.getJobId());
+            return;
         }
         prepareAndCheckTargetWithLock(jobItemContext);
         if (jobItemContext.isStopping()) {
+            log.info("prepare, job is stopping, jobId={}", jobItemContext.getJobId());
             PipelineJobCenter.stop(jobItemContext.getJobId());
+            return;
         }
         // TODO check metadata
         if (PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType())) {
             initIncrementalTasks(jobItemContext);
             if (jobItemContext.isStopping()) {
+                log.info("prepare, job is stopping, jobId={}", jobItemContext.getJobId());
                 PipelineJobCenter.stop(jobItemContext.getJobId());
+                return;
             }
         }
         initInventoryTasks(jobItemContext);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtilsTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/config/process/PipelineProcessConfigurationUtilTest.java
similarity index 85%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtilsTest.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/config/process/PipelineProcessConfigurationUtilTest.java
index c51f6f223af..0bbbd652e25 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtilsTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/config/process/PipelineProcessConfigurationUtilTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.util;
+package org.apache.shardingsphere.data.pipeline.core.config.process;
 
 import org.junit.Test;
 
@@ -25,12 +25,12 @@ import java.util.Collection;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-public final class PipelineProcessConfigurationUtilsTest {
+public final class PipelineProcessConfigurationUtilTest {
     
     @Test
     public void assertVerifyConfPathSuccess() {
         for (String each : Arrays.asList("/", "/READ", "/READ/RATE_LIMITER")) {
-            PipelineProcessConfigurationUtils.verifyConfPath(each);
+            PipelineProcessConfigurationUtil.verifyConfPath(each);
         }
     }
@@ -40,7 +40,7 @@ public final class PipelineProcessConfigurationUtilsTest {
         int failCount = 0;
         for (String each : confPaths) {
             try {
-                PipelineProcessConfigurationUtils.verifyConfPath(each);
+                PipelineProcessConfigurationUtil.verifyConfPath(each);
             } catch (final IllegalArgumentException ex) {
                 ++failCount;
             }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 9e0d6934cf7..7e4237c1c7e 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -25,10 +25,11 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByRangeException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.PipelineTableMetaDataUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
@@ -134,7 +135,7 @@ public final class InventoryTaskSplitterTest {
     public void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws SQLException, NoSuchFieldException, IllegalAccessException {
         initNoPrimaryEnvironment(taskConfig.getDumperConfig());
         try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig())) {
-            PipelineColumnMetaData uniqueKeyColumn = PipelineTableMetaDataUtil.getUniqueKeyColumn(null, "t_order", dataSource, null);
+            PipelineColumnMetaData uniqueKeyColumn = PipelineTableMetaDataUtil.getUniqueKeyColumn(null, "t_order", new StandardPipelineTableMetaDataLoader(dataSource));
             ReflectionUtil.setFieldValue(jobItemContext.getJobConfig(), "uniqueKeyColumn", uniqueKeyColumn);
         }
         inventoryTaskSplitter.splitInventoryData(jobItemContext);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index 2feab94d054..5d2c56534aa 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipel
 import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineReadConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
@@ -168,7 +169,7 @@ public final class PipelineContextUtil {
         yamlReadConfig.setShardingSize(10);
         YamlPipelineProcessConfiguration yamlProcessConfig = new YamlPipelineProcessConfiguration();
         yamlProcessConfig.setRead(yamlReadConfig);
-        PipelineProcessConfigurationUtils.fillInDefaultValue(yamlProcessConfig);
+        PipelineProcessConfigurationUtil.fillInDefaultValue(yamlProcessConfig);
         return new YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
     }
 }