This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fa653d5da23 Extract InventoryIncrementalJobPublicAPI for common usage
(#21111)
fa653d5da23 is described below
commit fa653d5da237768c2972da0c4078c2524bc0f27e
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Sep 21 17:13:53 2022 +0800
Extract InventoryIncrementalJobPublicAPI for common usage (#21111)
* Move commit and rollback from PipelineJobPublicAPI to
MigrationJobPublicAPI
* Add InventoryIncrementalJobPublicAPI, move processConfig APIs from
PipelineJobPublicAPI into it
* Move commit and rollback to InventoryIncrementalJobPublicAPI
* Throw SQLException for commit
* Fix compile error
---
.../handler/update/CommitMigrationUpdater.java | 8 ++-
....java => InventoryIncrementalJobPublicAPI.java} | 30 ++-------
.../data/pipeline/api/MigrationJobPublicAPI.java | 2 +-
.../data/pipeline/api/PipelineJobPublicAPI.java | 48 ++------------
.../pipeline/api/PipelineJobPublicAPIFactory.java | 8 +--
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 71 +--------------------
.../impl/InventoryIncrementalJobPublicAPIImpl.java | 74 ++++++++++++++++++++++
.../scenario/migration/MigrationJobAPIImpl.java | 23 +++++--
....pipeline.api.InventoryIncrementalJobPublicAPI} | 0
...toryIncrementalProcessConfigurationUpdater.java | 4 +-
...toryIncrementalProcessConfigurationUpdater.java | 4 +-
.../DropPipelineProcessConfigurationUpdater.java | 4 +-
.../api/PipelineJobPublicAPIFactoryTest.java | 8 +--
.../core/api/impl/MigrationJobAPIImplTest.java | 2 +-
14 files changed, 126 insertions(+), 160 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java
index 7f1544e10fd..5d1370a629c 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java
@@ -22,6 +22,8 @@ import
org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
import
org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement;
+import java.sql.SQLException;
+
/**
* Commit migration updater.
*/
@@ -31,7 +33,11 @@ public final class CommitMigrationUpdater implements
RALUpdater<CommitMigrationS
@Override
public void executeUpdate(final String databaseName, final
CommitMigrationStatement sqlStatement) {
- JOB_API.commit(sqlStatement.getJobId());
+ try {
+ JOB_API.commit(sqlStatement.getJobId());
+ } catch (final SQLException ex) {
+ throw new RuntimeException(ex);
+ }
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
similarity index 78%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
index 2898ade8cc1..53b89e72f35 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
@@ -18,16 +18,14 @@
package org.apache.shardingsphere.data.pipeline.api;
import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
import java.sql.SQLException;
-import java.util.List;
/**
- * Pipeline job public API.
+ * Inventory incremental job API.
*/
-public interface PipelineJobPublicAPI extends TypedSPI {
+public interface InventoryIncrementalJobPublicAPI extends
PipelineJobPublicAPI, TypedSPI {
/**
* Create process configuration.
@@ -57,20 +55,6 @@ public interface PipelineJobPublicAPI extends TypedSPI {
*/
PipelineProcessConfiguration showProcessConfiguration();
- /**
- * Start disabled job.
- *
- * @param jobId job id
- */
- void startDisabledJob(String jobId);
-
- /**
- * Stop pipeline job.
- *
- * @param jobId job id
- */
- void stop(String jobId);
-
/**
* Rollback pipeline job.
*
@@ -83,13 +67,7 @@ public interface PipelineJobPublicAPI extends TypedSPI {
* Commit pipeline job.
*
* @param jobId job id
+ * @throws SQLException when commit underlying database data
*/
- void commit(String jobId);
-
- /**
- * Get pipeline job info.
- *
- * @return jobInfos
- */
- List<? extends PipelineJobInfo> list();
+ void commit(String jobId) throws SQLException;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
index 2ebc30b0921..e60b48bfe92 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
@@ -35,7 +35,7 @@ import java.util.Properties;
* Migration job public API.
*/
@SingletonSPI
-public interface MigrationJobPublicAPI extends PipelineJobPublicAPI,
RequiredSPI {
+public interface MigrationJobPublicAPI extends
InventoryIncrementalJobPublicAPI, RequiredSPI {
/**
* List all jobs.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
index 2898ade8cc1..1b635825c7e 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
@@ -17,45 +17,22 @@
package org.apache.shardingsphere.data.pipeline.api;
-import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
-import java.sql.SQLException;
import java.util.List;
/**
* Pipeline job public API.
*/
-public interface PipelineJobPublicAPI extends TypedSPI {
+public interface PipelineJobPublicAPI {
/**
- * Create process configuration.
+ * Get job type.
*
- * @param processConfig process configuration
+ * @return job type
*/
- void createProcessConfiguration(PipelineProcessConfiguration
processConfig);
-
- /**
- * Alter process configuration.
- *
- * @param processConfig process configuration
- */
- void alterProcessConfiguration(PipelineProcessConfiguration processConfig);
-
- /**
- * Drop process configuration.
- *
- * @param confPath configuration path. e.g. <code>/</code>,
<code>/READ</code>, <code>/READ/RATE_LIMITER</code>
- */
- void dropProcessConfiguration(String confPath);
-
- /**
- * Show process configuration.
- *
- * @return process configuration, non-null
- */
- PipelineProcessConfiguration showProcessConfiguration();
+ JobType getJobType();
/**
* Start disabled job.
@@ -71,21 +48,6 @@ public interface PipelineJobPublicAPI extends TypedSPI {
*/
void stop(String jobId);
- /**
- * Rollback pipeline job.
- *
- * @param jobId job id
- * @throws SQLException when rollback underlying database data
- */
- void rollback(String jobId) throws SQLException;
-
- /**
- * Commit pipeline job.
- *
- * @param jobId job id
- */
- void commit(String jobId);
-
/**
* Get pipeline job info.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
index 4443bbc6db1..a6ab527ac32 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
@@ -28,18 +28,18 @@ import
org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
public final class PipelineJobPublicAPIFactory {
static {
- ShardingSphereServiceLoader.register(PipelineJobPublicAPI.class);
+
ShardingSphereServiceLoader.register(InventoryIncrementalJobPublicAPI.class);
ShardingSphereServiceLoader.register(MigrationJobPublicAPI.class);
}
/**
- * Get instance of pipeline job public API.
+ * Get instance of inventory incremental job public API.
*
* @param jobTypeName job type name
* @return got instance
*/
- public static PipelineJobPublicAPI getPipelineJobPublicAPI(@NonNull final
String jobTypeName) {
- return
TypedSPIRegistry.getRegisteredService(PipelineJobPublicAPI.class, jobTypeName);
+ public static InventoryIncrementalJobPublicAPI
getInventoryIncrementalJobPublicAPI(@NonNull final String jobTypeName) {
+ return
TypedSPIRegistry.getRegisteredService(InventoryIncrementalJobPublicAPI.class,
jobTypeName);
}
/**
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 3d5ce269185..235ff348eac 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -23,22 +23,15 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.ObjectUtils;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
-import
org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
-import
org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
-import
org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
@@ -49,7 +42,6 @@ import
org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
@@ -66,51 +58,8 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
protected static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- private static final YamlPipelineProcessConfigurationSwapper
PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
-
- private final PipelineProcessConfigurationPersistService
processConfigPersistService = new PipelineProcessConfigurationPersistService();
-
private final PipelineDistributedBarrier pipelineDistributedBarrier =
PipelineDistributedBarrier.getInstance();
- protected abstract JobType getJobType();
-
- @Override
- public void createProcessConfiguration(final PipelineProcessConfiguration
processConfig) {
- PipelineProcessConfiguration existingProcessConfig =
processConfigPersistService.load(getJobType());
- ShardingSpherePreconditions.checkState(null == existingProcessConfig,
CreateExistsProcessConfigurationException::new);
- processConfigPersistService.persist(getJobType(), processConfig);
- }
-
- @Override
- public void alterProcessConfiguration(final PipelineProcessConfiguration
processConfig) {
- // TODO check rateLimiter type match or not
- YamlPipelineProcessConfiguration targetYamlProcessConfig =
getTargetYamlProcessConfiguration();
-
targetYamlProcessConfig.copyNonNullFields(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig));
- processConfigPersistService.persist(getJobType(),
PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
- }
-
- private YamlPipelineProcessConfiguration
getTargetYamlProcessConfiguration() {
- PipelineProcessConfiguration existingProcessConfig =
processConfigPersistService.load(getJobType());
- ShardingSpherePreconditions.checkNotNull(existingProcessConfig,
AlterNotExistProcessConfigurationException::new);
- return
PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(existingProcessConfig);
- }
-
- @Override
- public void dropProcessConfiguration(final String confPath) {
- String finalConfPath = confPath.trim();
- PipelineProcessConfigurationUtil.verifyConfPath(confPath);
- YamlPipelineProcessConfiguration targetYamlProcessConfig =
getTargetYamlProcessConfiguration();
-
PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig,
finalConfPath);
- processConfigPersistService.persist(getJobType(),
PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
- }
-
- @Override
- public PipelineProcessConfiguration showProcessConfiguration() {
- PipelineProcessConfiguration result =
processConfigPersistService.load(getJobType());
- result =
PipelineProcessConfigurationUtil.convertWithDefaultValue(result);
- return result;
- }
-
@Override
public final String marshalJobId(final PipelineJobId pipelineJobId) {
return PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) +
marshalJobIdLeftPart(pipelineJobId);
@@ -200,29 +149,11 @@ public abstract class AbstractPipelineJobAPIImpl
implements PipelineJobAPI {
pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
}
- @Override
- public void rollback(final String jobId) throws SQLException {
- log.info("Rollback job {}", jobId);
- stop(jobId);
- cleanTempTableOnRollback(jobId);
- dropJob(jobId);
- }
-
- protected abstract void cleanTempTableOnRollback(String jobId) throws
SQLException;
-
- private void dropJob(final String jobId) {
+ protected void dropJob(final String jobId) {
PipelineAPIFactory.getJobOperateAPI().remove(String.valueOf(jobId),
null);
PipelineAPIFactory.getGovernanceRepositoryAPI().deleteJob(jobId);
}
- @Override
- public void commit(final String jobId) {
- checkModeConfig();
- log.info("Commit job {}", jobId);
- stop(jobId);
- dropJob(jobId);
- }
-
protected final JobConfigurationPOJO getElasticJobConfigPOJO(final String
jobId) {
JobConfigurationPOJO result =
PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
Preconditions.checkNotNull(result, new
PipelineJobNotFoundException(jobId));
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java
new file mode 100644
index 00000000000..44f48883545
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api.impl;
+
+import
org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
+import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
+import
org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
+import
org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
+import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+
+/**
+ * Inventory incremental job API implementation.
+ */
+public abstract class InventoryIncrementalJobPublicAPIImpl extends
AbstractPipelineJobAPIImpl implements InventoryIncrementalJobPublicAPI {
+
+ private static final YamlPipelineProcessConfigurationSwapper
PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
+
+ private final PipelineProcessConfigurationPersistService
processConfigPersistService = new PipelineProcessConfigurationPersistService();
+
+ @Override
+ public void createProcessConfiguration(final PipelineProcessConfiguration
processConfig) {
+ PipelineProcessConfiguration existingProcessConfig =
processConfigPersistService.load(getJobType());
+ ShardingSpherePreconditions.checkState(null == existingProcessConfig,
CreateExistsProcessConfigurationException::new);
+ processConfigPersistService.persist(getJobType(), processConfig);
+ }
+
+ @Override
+ public void alterProcessConfiguration(final PipelineProcessConfiguration
processConfig) {
+ // TODO check rateLimiter type match or not
+ YamlPipelineProcessConfiguration targetYamlProcessConfig =
getTargetYamlProcessConfiguration();
+
targetYamlProcessConfig.copyNonNullFields(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig));
+ processConfigPersistService.persist(getJobType(),
PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
+ }
+
+ private YamlPipelineProcessConfiguration
getTargetYamlProcessConfiguration() {
+ PipelineProcessConfiguration existingProcessConfig =
processConfigPersistService.load(getJobType());
+ ShardingSpherePreconditions.checkNotNull(existingProcessConfig,
AlterNotExistProcessConfigurationException::new);
+ return
PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(existingProcessConfig);
+ }
+
+ @Override
+ public void dropProcessConfiguration(final String confPath) {
+ String finalConfPath = confPath.trim();
+ PipelineProcessConfigurationUtil.verifyConfPath(confPath);
+ YamlPipelineProcessConfiguration targetYamlProcessConfig =
getTargetYamlProcessConfiguration();
+
PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig,
finalConfPath);
+ processConfigPersistService.persist(getJobType(),
PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
+ }
+
+ @Override
+ public PipelineProcessConfiguration showProcessConfiguration() {
+ PipelineProcessConfiguration result =
processConfigPersistService.load(getJobType());
+ result =
PipelineProcessConfigurationUtil.convertWithDefaultValue(result);
+ return result;
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 560379526fa..27ef7c8f539 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -59,8 +59,8 @@ import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgo
import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
-import
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
import
org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
+import
org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobPublicAPIImpl;
import
org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
@@ -116,7 +116,7 @@ import java.util.stream.IntStream;
* Migration job API impl.
*/
@Slf4j
-public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl
implements MigrationJobAPI {
+public final class MigrationJobAPIImpl extends
InventoryIncrementalJobPublicAPIImpl implements MigrationJobAPI {
private static final YamlRuleConfigurationSwapperEngine
RULE_CONFIG_SWAPPER_ENGINE = new YamlRuleConfigurationSwapperEngine();
@@ -127,7 +127,7 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
private final PipelineDataSourcePersistService dataSourcePersistService =
new PipelineDataSourcePersistService();
@Override
- protected JobType getJobType() {
+ public JobType getJobType() {
return JobType.MIGRATION;
}
@@ -356,7 +356,14 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
}
@Override
- protected void cleanTempTableOnRollback(final String jobId) throws
SQLException {
+ public void rollback(final String jobId) throws SQLException {
+ log.info("Rollback job {}", jobId);
+ stop(jobId);
+ cleanTempTableOnRollback(jobId);
+ dropJob(jobId);
+ }
+
+ private void cleanTempTableOnRollback(final String jobId) throws
SQLException {
MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
String targetTableName = jobConfig.getTargetTableName();
// TODO use jobConfig.targetSchemaName
@@ -373,6 +380,14 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
}
}
+ @Override
+ public void commit(final String jobId) throws SQLException {
+ checkModeConfig();
+ log.info("Commit job {}", jobId);
+ stop(jobId);
+ dropJob(jobId);
+ }
+
@Override
public void addMigrationSourceResources(final Map<String,
DataSourceProperties> dataSourcePropsMap) {
log.info("Add migration source resources {}",
dataSourcePropsMap.keySet());
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI
similarity index 100%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalProcessConfigurationUpdater.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalProcessConfigurationUpdater.java
index 8c8e405a871..cdbe647657c 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalProcessConfigurationUpdater.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalProcessConfigurationUpdater.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
+import
org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.distsql.parser.statement.ral.updatable.AlterInventoryIncrementalProcessConfigurationStatement;
@@ -31,7 +31,7 @@ public final class
AlterInventoryIncrementalProcessConfigurationUpdater implemen
@Override
public void executeUpdate(final String databaseName, final
AlterInventoryIncrementalProcessConfigurationStatement sqlStatement) {
- PipelineJobPublicAPI jobAPI =
PipelineJobPublicAPIFactory.getPipelineJobPublicAPI(sqlStatement.getJobTypeName());
+ InventoryIncrementalJobPublicAPI jobAPI =
PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(sqlStatement.getJobTypeName());
PipelineProcessConfiguration processConfig =
InventoryIncrementalProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
jobAPI.alterProcessConfiguration(processConfig);
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/CreateInventoryIncrementalProcessConfigurationUpdater.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/CreateInventoryIncrementalProcessConfigurationUpdater.java
index 97454efb0ba..2fab9c8c6c3 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/CreateInventoryIncrementalProcessConfigurationUpdater.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/CreateInventoryIncrementalProcessConfigurationUpdater.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
+import
org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.distsql.parser.statement.ral.updatable.CreateInventoryIncrementalProcessConfigurationStatement;
@@ -31,7 +31,7 @@ public final class
CreateInventoryIncrementalProcessConfigurationUpdater impleme
@Override
public void executeUpdate(final String databaseName, final
CreateInventoryIncrementalProcessConfigurationStatement sqlStatement) {
- PipelineJobPublicAPI jobAPI =
PipelineJobPublicAPIFactory.getPipelineJobPublicAPI(sqlStatement.getJobTypeName());
+ InventoryIncrementalJobPublicAPI jobAPI =
PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(sqlStatement.getJobTypeName());
PipelineProcessConfiguration processConfig =
InventoryIncrementalProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
jobAPI.createProcessConfiguration(processConfig);
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/DropPipelineProcessConfigurationUpdater.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/DropPipelineProcessConfigurationUpdater.java
index 6d67482be2a..d550b456268 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/DropPipelineProcessConfigurationUpdater.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/DropPipelineProcessConfigurationUpdater.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
+import
org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import
org.apache.shardingsphere.distsql.parser.statement.ral.updatable.DropPipelineProcessConfigurationStatement;
import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
@@ -29,7 +29,7 @@ public final class DropPipelineProcessConfigurationUpdater
implements RALUpdater
@Override
public void executeUpdate(final String databaseName, final
DropPipelineProcessConfigurationStatement sqlStatement) {
- PipelineJobPublicAPI jobAPI =
PipelineJobPublicAPIFactory.getPipelineJobPublicAPI(sqlStatement.getJobTypeName());
+ InventoryIncrementalJobPublicAPI jobAPI =
PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(sqlStatement.getJobTypeName());
jobAPI.dropProcessConfiguration(sqlStatement.getConfPath());
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
index fb2eca869b8..912a9ebf2c2 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
@@ -31,11 +31,11 @@ import static org.hamcrest.MatcherAssert.assertThat;
public final class PipelineJobPublicAPIFactoryTest {
@Test
- public void assertGetPipelineJobPublicAPI() {
- Collection<Pair<JobType, Class<? extends PipelineJobPublicAPI>>>
paramResult = new LinkedList<>();
+ public void assertGetInventoryIncrementalJobPublicAPI() {
+ Collection<Pair<JobType, Class<? extends
InventoryIncrementalJobPublicAPI>>> paramResult = new LinkedList<>();
paramResult.add(Pair.of(JobType.MIGRATION, MigrationJobAPIImpl.class));
- for (Pair<JobType, Class<? extends PipelineJobPublicAPI>> each :
paramResult) {
-
assertThat(PipelineJobPublicAPIFactory.getPipelineJobPublicAPI(each.getKey().getTypeName()),
instanceOf(each.getValue()));
+ for (Pair<JobType, Class<? extends InventoryIncrementalJobPublicAPI>>
each : paramResult) {
+
assertThat(PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(each.getKey().getTypeName()),
instanceOf(each.getValue()));
}
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 41d85c69570..9770d830787 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -135,7 +135,7 @@ public final class MigrationJobAPIImplTest {
}
@Test
- public void assertCommit() {
+ public void assertCommit() throws SQLException {
Optional<String> jobId =
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
MigrationJobConfiguration jobConfig =
jobAPI.getJobConfiguration(jobId.get());