This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 61ee704c19e Refactor PipelineJobCenter (#29331)
61ee704c19e is described below
commit 61ee704c19e27ae95cb720b630ace79c8082c5e4
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Dec 8 23:24:24 2023 +0800
Refactor PipelineJobCenter (#29331)
* Refactor PipelineJobCenter
* Refactor PipelineJobCenter
* Refactor PipelineJobCenter
* Refactor PipelineJobCenter
* Refactor PipelineJobCenter
---
.../data/pipeline/core/job/PipelineJobCenter.java | 64 ++++++++++------------
.../persist/PipelineJobProgressPersistService.java | 2 +-
.../AbstractJobConfigurationChangedProcessor.java | 4 +-
.../pipeline/core/job/PipelineJobCenterTest.java | 20 +++----
.../data/pipeline/cdc/api/CDCJobAPI.java | 2 +-
.../pipeline/cdc/handler/CDCBackendHandler.java | 4 +-
6 files changed, 44 insertions(+), 52 deletions(-)
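For readers skimming the diff, here is a minimal caller-side sketch of the renamed PipelineJobCenter API (method names are taken from the diff below; the job id, sharding item, and wrapper class are hypothetical placeholders, and the sketch sits in the same package as PipelineJobCenter to avoid guessing the PipelineJob import):

package org.apache.shardingsphere.data.pipeline.core.job;

import java.util.Collection;
import java.util.Optional;

import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;

public final class PipelineJobCenterUsageSketch {

    // Register a job under its id; addJob(...) is now add(...).
    public static void register(final String jobId, final PipelineJob job) {
        PipelineJobCenter.add(jobId, job);
    }

    // Query and tear down a job; isJobExisting/getJob/getJobItemContext are now isExisting/get/getItemContext.
    public static void inspectAndStop(final String jobId) {
        if (PipelineJobCenter.isExisting(jobId)) {
            PipelineJob job = PipelineJobCenter.get(jobId);
            Optional<PipelineJobItemContext> itemContext = PipelineJobCenter.getItemContext(jobId, 0);
            Collection<Integer> shardingItems = PipelineJobCenter.getShardingItems(jobId);
        }
        PipelineJobCenter.stop(jobId);
    }
}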
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
index 9dc82da6881..2fdffdaaa06 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenter.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -33,79 +32,72 @@ import java.util.concurrent.ConcurrentHashMap;
* Pipeline job center.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-@Slf4j
public final class PipelineJobCenter {
- private static final Map<String, PipelineJob> JOB_MAP = new ConcurrentHashMap<>();
+ private static final Map<String, PipelineJob> JOBS = new ConcurrentHashMap<>();
/**
- * Add job.
+ * Add pipeline job.
*
- * @param jobId job id
- * @param job job
+ * @param jobId pipeline job id
+ * @param job pipeline job
*/
- public static void addJob(final String jobId, final PipelineJob job) {
- JOB_MAP.put(jobId, job);
+ public static void add(final String jobId, final PipelineJob job) {
+ JOBS.put(jobId, job);
}
/**
- * Is job existing.
+ * Judge whether pipeline job existing.
*
- * @param jobId job id
- * @return true when job exists, else false
+ * @param jobId pipeline job id
+ * @return pipeline job exists or not
*/
- public static boolean isJobExisting(final String jobId) {
- return JOB_MAP.containsKey(jobId);
+ public static boolean isExisting(final String jobId) {
+ return JOBS.containsKey(jobId);
}
/**
- * Get job.
+ * Get pipeline job.
*
- * @param jobId job id
- * @return job
+ * @param jobId pipeline job id
+ * @return pipeline job
*/
- public static PipelineJob getJob(final String jobId) {
- return JOB_MAP.get(jobId);
+ public static PipelineJob get(final String jobId) {
+ return JOBS.get(jobId);
}
/**
- * Stop job.
+ * Stop pipeline job.
*
- * @param jobId job id
+ * @param jobId pipeline job id
*/
public static void stop(final String jobId) {
- PipelineJob job = JOB_MAP.get(jobId);
+ PipelineJob job = JOBS.get(jobId);
if (null == job) {
return;
}
job.stop();
- JOB_MAP.remove(jobId);
+ JOBS.remove(jobId);
}
/**
- * Get job item context.
+ * Get pipeline job item context.
*
- * @param jobId job id
+ * @param jobId pipeline job id
* @param shardingItem sharding item
- * @return job item context
+ * @return pipeline job item context
*/
- public static Optional<PipelineJobItemContext> getJobItemContext(final String jobId, final int shardingItem) {
- PipelineJob job = JOB_MAP.get(jobId);
- if (null == job) {
- return Optional.empty();
- }
- Optional<PipelineTasksRunner> tasksRunner = job.getTasksRunner(shardingItem);
- return tasksRunner.map(PipelineTasksRunner::getJobItemContext);
+ public static Optional<PipelineJobItemContext> getItemContext(final String jobId, final int shardingItem) {
+ return JOBS.containsKey(jobId) ? JOBS.get(jobId).getTasksRunner(shardingItem).map(PipelineTasksRunner::getJobItemContext) : Optional.empty();
}
/**
* Get sharding items.
*
- * @param jobId job id
- * @return sharding items.
+ * @param jobId pipeline job id
+ * @return sharding items
*/
public static Collection<Integer> getShardingItems(final String jobId) {
- PipelineJob pipelineJob = JOB_MAP.get(jobId);
- return null == pipelineJob ? Collections.emptyList() : pipelineJob.getShardingItems();
+ return JOBS.containsKey(jobId) ? JOBS.get(jobId).getShardingItems() : Collections.emptyList();
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index a2a1b9db263..64268bc27d9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -121,7 +121,7 @@ public final class PipelineJobProgressPersistService {
&& !persistContext.getHasNewEvents().get()) {
return;
}
- Optional<PipelineJobItemContext> jobItemContext = PipelineJobCenter.getJobItemContext(jobId, shardingItem);
+ Optional<PipelineJobItemContext> jobItemContext = PipelineJobCenter.getItemContext(jobId, shardingItem);
if (!jobItemContext.isPresent()) {
return;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
index ccb92aa53f5..d7d5882c6cf 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
@@ -57,7 +57,7 @@ public abstract class AbstractJobConfigurationChangedProcessor implements JobCon
switch (eventType) {
case ADDED:
case UPDATED:
- if (PipelineJobCenter.isJobExisting(jobId)) {
+ if (PipelineJobCenter.isExisting(jobId)) {
log.info("{} added to executing jobs failed since it
already exists", jobId);
} else {
executeJob(jobConfig);
@@ -81,7 +81,7 @@ public abstract class AbstractJobConfigurationChangedProcessor implements JobCon
protected void executeJob(final JobConfiguration jobConfig) {
String jobId = jobConfig.getJobName();
AbstractPipelineJob job = buildPipelineJob(jobId);
- PipelineJobCenter.addJob(jobId, job);
+ PipelineJobCenter.add(jobId, job);
OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfig);
job.setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenterTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenterTest.java
index ee2b40be51e..01957a103af 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenterTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobCenterTest.java
@@ -41,12 +41,12 @@ class PipelineJobCenterTest {
@Test
void assertPipelineJobCenter() {
PipelineJob pipelineJob = mock(PipelineJob.class);
- PipelineJobCenter.addJob("Job1", pipelineJob);
- assertTrue(PipelineJobCenter.isJobExisting("Job1"));
- assertFalse(PipelineJobCenter.isJobExisting("Job2"));
- assertNotNull(PipelineJobCenter.getJob("Job1"));
- assertEquals(pipelineJob, PipelineJobCenter.getJob("Job1"));
- assertNull(PipelineJobCenter.getJob("Job2"));
+ PipelineJobCenter.add("Job1", pipelineJob);
+ assertTrue(PipelineJobCenter.isExisting("Job1"));
+ assertFalse(PipelineJobCenter.isExisting("Job2"));
+ assertNotNull(PipelineJobCenter.get("Job1"));
+ assertEquals(pipelineJob, PipelineJobCenter.get("Job1"));
+ assertNull(PipelineJobCenter.get("Job2"));
PipelineJobCenter.stop("Job1");
}
@@ -57,11 +57,11 @@ class PipelineJobCenterTest {
PipelineJobItemContext pipelineJobItemContext = mock(PipelineJobItemContext.class);
when(pipelineJob.getTasksRunner(anyInt())).thenReturn(Optional.of(pipelineTasksRunner));
when(pipelineTasksRunner.getJobItemContext()).thenReturn(pipelineJobItemContext);
- PipelineJobCenter.addJob("Job1", pipelineJob);
- Optional<PipelineJobItemContext> result = PipelineJobCenter.getJobItemContext("Job1", 1);
+ PipelineJobCenter.add("Job1", pipelineJob);
+ Optional<PipelineJobItemContext> result = PipelineJobCenter.getItemContext("Job1", 1);
Optional<PipelineJobItemContext> optionalPipelineJobItemContext = Optional.ofNullable(pipelineJobItemContext);
assertTrue(result.isPresent());
- assertEquals(Optional.empty(), PipelineJobCenter.getJobItemContext("Job2", 1));
+ assertEquals(Optional.empty(), PipelineJobCenter.getItemContext("Job2", 1));
assertEquals(optionalPipelineJobItemContext, result);
PipelineJobCenter.stop("Job1");
}
@@ -69,7 +69,7 @@ class PipelineJobCenterTest {
@Test
void assertGetShardingItems() {
PipelineJob pipelineJob = mock(PipelineJob.class);
- PipelineJobCenter.addJob("Job1", pipelineJob);
+ PipelineJobCenter.add("Job1", pipelineJob);
when(pipelineJob.getShardingItems()).thenReturn(Arrays.asList(1, 2, 3));
Collection<Integer> shardingItems = pipelineJob.getShardingItems();
Assertions.assertFalse(shardingItems.isEmpty());
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 1a57c52c09c..4f26e82eca2 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -213,7 +213,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
*/
public void start(final String jobId, final PipelineSink sink) {
CDCJob job = new CDCJob(jobId, sink);
- PipelineJobCenter.addJob(jobId, job);
+ PipelineJobCenter.add(jobId, job);
enable(jobId);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfigPOJO.toJobConfiguration());
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index e518026af1c..dc372106477 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -134,7 +134,7 @@ public final class CDCBackendHandler {
public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
CDCJobConfiguration cdcJobConfig = jobConfigManager.getJobConfiguration(jobId);
ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
- if (PipelineJobCenter.isJobExisting(jobId)) {
+ if (PipelineJobCenter.isExisting(jobId)) {
PipelineJobCenter.stop(jobId);
}
ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
@@ -153,7 +153,7 @@ public final class CDCBackendHandler {
log.warn("job id is null or empty, ignored");
return;
}
- CDCJob job = (CDCJob) PipelineJobCenter.getJob(jobId);
+ CDCJob job = (CDCJob) PipelineJobCenter.get(jobId);
if (null == job) {
return;
}