This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fe5096b681a Improve pipeline job item error message persistence
(#28466)
fe5096b681a is described below
commit fe5096b681aa6be4d152090f2940164e1653602b
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Sep 19 19:09:24 2023 +0800
Improve pipeline job item error message persistence (#28466)
* Re-impl cleanJobItemErrorMessage
* Update persistJobItemErrorMessage impl to updateJobItemErrorMessage
* Remove assertStartWithGetEstimatedRowsFailure
* Update cleanJobItemErrorMessage impl
---
.../repository/GovernanceRepositoryAPI.java | 16 ++++++++--------
.../repository/GovernanceRepositoryAPIImpl.java | 10 +++++-----
.../data/pipeline/core/job/AbstractPipelineJob.java | 2 +-
.../data/pipeline/core/job/service/PipelineJobAPI.java | 4 ++--
.../job/service/impl/AbstractPipelineJobAPIImpl.java | 7 ++++---
.../task/runner/InventoryIncrementalTasksRunner.java | 4 ++--
.../data/pipeline/cdc/core/job/CDCJob.java | 4 ++--
.../task/ConsistencyCheckTasksRunner.java | 2 +-
.../it/data/pipeline/core/task/InventoryTaskTest.java | 18 ------------------
9 files changed, 25 insertions(+), 42 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
index dff2369cb60..ccc68d2a091 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
@@ -170,6 +170,14 @@ public interface GovernanceRepositoryAPI {
*/
void persist(String key, String value);
+ /**
+ * Update data.
+ *
+ * @param key key of data
+ * @param value value of data
+ */
+ void update(String key, String value);
+
/**
* Get sharding items of job.
*
@@ -218,12 +226,4 @@ public interface GovernanceRepositoryAPI {
* @return error msg
*/
String getJobItemErrorMessage(String jobId, int shardingItem);
-
- /**
- * Clean job item error message.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- */
- void cleanJobItemErrorMessage(String jobId, int shardingItem);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
index 1e7b9f50b00..b2bd85fa747 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
@@ -155,6 +155,11 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
repository.persist(key, value);
}
+ @Override
+ public void update(final String key, final String value) {
+ repository.update(key, value);
+ }
+
@Override
public List<Integer> getShardingItems(final String jobId) {
List<String> result =
getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
@@ -185,9 +190,4 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
public String getJobItemErrorMessage(final String jobId, final int
shardingItem) {
return
repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem));
}
-
- @Override
- public void cleanJobItemErrorMessage(final String jobId, final int
shardingItem) {
-
repository.delete(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem));
- }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index bb8a4a6e051..5fceae71fc5 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -107,7 +107,7 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
protected void processFailed(final PipelineJobItemContext jobItemContext,
final Exception ex) {
String jobId = jobItemContext.getJobId();
log.error("job prepare failed, {}-{}", jobId,
jobItemContext.getShardingItem(), ex);
- jobAPI.persistJobItemErrorMessage(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), ex);
+ jobAPI.updateJobItemErrorMessage(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), ex);
jobAPI.stop(jobId);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 32cae2477ca..1dc330457f0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -162,13 +162,13 @@ public interface PipelineJobAPI extends TypedSPI {
String getJobItemErrorMessage(String jobId, int shardingItem);
/**
- * Persist job item error message.
+ * Update job item error message.
*
* @param jobId job id
* @param shardingItem sharding item
* @param error error
*/
- void persistJobItemErrorMessage(String jobId, int shardingItem, Object
error);
+ void updateJobItemErrorMessage(String jobId, int shardingItem, Object
error);
/**
* Clean job item error message.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index 55a20a90f2b..bb32906c9d0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -171,17 +171,18 @@ public abstract class AbstractPipelineJobAPIImpl
implements PipelineJobAPI {
}
@Override
- public void persistJobItemErrorMessage(final String jobId, final int
shardingItem, final Object error) {
+ public void updateJobItemErrorMessage(final String jobId, final int
shardingItem, final Object error) {
String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem);
String value = "";
if (null != error) {
value = error instanceof Throwable ?
ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
}
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key,
value);
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key,
value);
}
@Override
public void cleanJobItemErrorMessage(final String jobId, final int
shardingItem) {
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).cleanJobItemErrorMessage(jobId,
shardingItem);
+ String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem);
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key,
"");
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index 49dd4ed586e..755a049cf12 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -141,7 +141,7 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
protected void inventoryFailureCallback(final Throwable throwable) {
log.error("onFailure, inventory task execute failed.", throwable);
String jobId = jobItemContext.getJobId();
- jobAPI.persistJobItemErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
+ jobAPI.updateJobItemErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
jobAPI.stop(jobId);
}
@@ -169,7 +169,7 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
public void onFailure(final Throwable throwable) {
log.error("onFailure, incremental task execute failed.",
throwable);
String jobId = jobItemContext.getJobId();
- jobAPI.persistJobItemErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
+ jobAPI.updateJobItemErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
jobAPI.stop(jobId);
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index a4f231bc8c5..697fc3b9787 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -124,7 +124,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
protected void processFailed(final PipelineJobItemContext jobItemContext,
final Exception ex) {
String jobId = jobItemContext.getJobId();
log.error("job prepare failed, {}-{}", jobId,
jobItemContext.getShardingItem(), ex);
- jobAPI.persistJobItemErrorMessage(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), ex);
+ jobAPI.updateJobItemErrorMessage(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), ex);
PipelineJobCenter.stop(jobId);
jobAPI.updateJobConfigurationDisabled(jobId, true);
}
@@ -197,7 +197,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
public void onFailure(final Throwable throwable) {
log.error("onFailure, {} task execute failed.", identifier,
throwable);
String jobId = jobItemContext.getJobId();
- jobAPI.persistJobItemErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
+ jobAPI.updateJobItemErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
PipelineJobCenter.stop(jobId);
jobAPI.updateJobConfigurationDisabled(jobId, true);
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index bfb36635f80..4a5a3fe622c 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -136,7 +136,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
return;
}
log.info("onFailure, check job id: {}, parent job id: {}",
checkJobId, parentJobId, throwable);
- checkJobAPI.persistJobItemErrorMessage(checkJobId, 0, throwable);
+ checkJobAPI.updateJobItemErrorMessage(checkJobId, 0, throwable);
checkJobAPI.stop(checkJobId);
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
index 54fa3ddb6c6..f56ea61452b 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
@@ -19,16 +19,12 @@ package
org.apache.shardingsphere.test.it.data.pipeline.core.task;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.dumper.InventoryDumper;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
@@ -52,7 +48,6 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
class InventoryTaskTest {
@@ -76,19 +71,6 @@ class InventoryTaskTest {
taskConfig =
PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
}
- @Test
- void assertStartWithGetEstimatedRowsFailure() {
- InventoryDumperConfiguration inventoryDumperConfig =
createInventoryDumperConfiguration("t_non_exist", "t_non_exist");
- AtomicReference<IngestPosition> position = new
AtomicReference<>(inventoryDumperConfig.getPosition());
- PipelineChannel channel =
PipelineTaskUtils.createInventoryChannel(PipelineContextUtils.getPipelineChannelCreator(),
100, position);
- PipelineDataSourceWrapper dataSource =
DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
- PipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(dataSource);
- InventoryTask inventoryTask = new
InventoryTask(PipelineTaskUtils.generateInventoryTaskId(inventoryDumperConfig),
- PipelineContextUtils.getExecuteEngine(),
PipelineContextUtils.getExecuteEngine(),
- new InventoryDumper(inventoryDumperConfig, channel,
dataSource, metaDataLoader), mock(Importer.class), position);
- assertThrows(ExecutionException.class, () ->
CompletableFuture.allOf(inventoryTask.start().toArray(new
CompletableFuture[0])).get(10L, TimeUnit.SECONDS));
- }
-
@Test
void assertGetProgress() throws SQLException, ExecutionException,
InterruptedException, TimeoutException {
initTableData(taskConfig.getDumperConfig());