This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 16185ff33c2 Manually dispatch pipeline ElasticJob events when
ShardingSphere-Proxy start (#29810)
16185ff33c2 is described below
commit 16185ff33c2c9456a220303afc3e4814d2dda572
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Jan 23 19:31:21 2024 +0800
Manually dispatch pipeline ElasticJob events when ShardingSphere-Proxy
start (#29810)
* Manually dispatch pipeline ElasticJob events
* Add exception catch
* Catch specified exceptions
* Skip consistency check jobs when dispatching enable events
* Do not throw UnsupportedOperationException
---
.../PipelineContextManagerLifecycleListener.java | 35 ++++++++++++++++++++++
.../consistencycheck/ConsistencyCheckJobType.java | 2 +-
2 files changed, 36 insertions(+), 1 deletion(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index 0113fd036f9..a7c389d880b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -21,15 +21,25 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
+import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNodeWatcher;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
+import
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.database.core.DefaultDatabase;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
+import java.util.List;
+import java.util.stream.Collectors;
+
/**
* Pipeline context manager lifecycle listener.
*/
@@ -51,6 +61,31 @@ public final class PipelineContextManagerLifecycleListener
implements ContextMan
PipelineContextManager.putContext(contextKey, new
PipelineContext(modeConfig, contextManager));
PipelineMetaDataNodeWatcher.getInstance(contextKey);
ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
+ try {
+ dispatchEnablePipelineJobStartEvent(contextKey);
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ex) {
+ // CHECKSTYLE:ON
+ log.error("Dispatch enable pipeline job start event failed", ex);
+ }
+ }
+
+    /**
+     * Re-submit the stored ElasticJob configuration of every active pipeline job.
+     *
+     * <p>For each job reported by the job statistics API, the configuration is read through
+     * {@link JobConfigurationAPI} and written back unchanged. Jobs are skipped when their name
+     * starts with {@code _}, when the job type returns no job info or no job meta data, or when
+     * the job meta data is not active.</p>
+     *
+     * @param contextKey pipeline context key used to obtain the job statistics and job configuration APIs
+     */
+    private void dispatchEnablePipelineJobStartEvent(final PipelineContextKey contextKey) {
+        JobConfigurationAPI jobConfigAPI = PipelineAPIFactory.getJobConfigurationAPI(contextKey);
+        List<JobBriefInfo> allJobsBriefInfo = PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo()
+                .stream().filter(each -> !each.getJobName().startsWith("_")).collect(Collectors.toList());
+        for (JobBriefInfo each : allJobsBriefInfo) {
+            PipelineJobType jobType = PipelineJobIdUtils.parseJobType(each.getJobName());
+            PipelineJobInfo jobInfo = jobType.getJobInfo(each.getJobName());
+            // A job type may return null job info (ConsistencyCheckJobType does); such jobs are skipped.
+            if (null == jobInfo || null == jobInfo.getJobMetaData()) {
+                continue;
+            }
+            if (!jobInfo.getJobMetaData().isActive()) {
+                // Skip only this job: returning here would leave every later active job without its update.
+                continue;
+            }
+            JobConfigurationPOJO jobConfig = jobConfigAPI.getJobConfiguration(each.getJobName());
+            // Writing the configuration back unchanged is the "manual dispatch".
+            // NOTE(review): presumably this makes the registry emit a config-changed event for the job - confirm.
+            jobConfigAPI.updateJobConfiguration(jobConfig);
+        }
     }
@Override
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
index 4278948f218..c0502d99f38 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
@@ -60,7 +60,7 @@ public final class ConsistencyCheckJobType implements
PipelineJobType {
@Override
public PipelineJobInfo getJobInfo(final String jobId) {
- throw new UnsupportedOperationException();
+ return null;
}
@Override