This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 16185ff33c2 Manually dispatch pipeline ElasticJob events when 
ShardingSphere-Proxy start (#29810)
16185ff33c2 is described below

commit 16185ff33c2c9456a220303afc3e4814d2dda572
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Jan 23 19:31:21 2024 +0800

    Manually dispatch pipeline ElasticJob events when ShardingSphere-Proxy 
start (#29810)
    
    * Manually dispatch pipeline ElasticJob events
    
    * Add ex catch
    
    * Catch specified exceptions
    
    * ignored enable consistency job
    
    * Not thrown UnsupportedOperationException
---
 .../PipelineContextManagerLifecycleListener.java   | 35 ++++++++++++++++++++++
 .../consistencycheck/ConsistencyCheckJobType.java  |  2 +-
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index 0113fd036f9..a7c389d880b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -21,15 +21,25 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
+import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNodeWatcher;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
+import 
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import org.apache.shardingsphere.infra.database.core.DefaultDatabase;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 /**
  * Pipeline context manager lifecycle listener.
  */
@@ -51,6 +61,31 @@ public final class PipelineContextManagerLifecycleListener 
implements ContextMan
         PipelineContextManager.putContext(contextKey, new 
PipelineContext(modeConfig, contextManager));
         PipelineMetaDataNodeWatcher.getInstance(contextKey);
         ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
+        try {
+            dispatchEnablePipelineJobStartEvent(contextKey);
+            // CHECKSTYLE:OFF
+        } catch (final RuntimeException ex) {
+            // CHECKSTYLE:ON
+            log.error("Dispatch enable pipeline job start event failed", ex);
+        }
+    }
+    
+    private void dispatchEnablePipelineJobStartEvent(final PipelineContextKey 
contextKey) {
+        JobConfigurationAPI jobConfigAPI = 
PipelineAPIFactory.getJobConfigurationAPI(contextKey);
+        List<JobBriefInfo> allJobsBriefInfo = 
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo()
+                .stream().filter(each -> 
!each.getJobName().startsWith("_")).collect(Collectors.toList());
+        for (JobBriefInfo each : allJobsBriefInfo) {
+            PipelineJobType jobType = 
PipelineJobIdUtils.parseJobType(each.getJobName());
+            PipelineJobInfo jobInfo = jobType.getJobInfo(each.getJobName());
+            if (null == jobInfo || null == jobInfo.getJobMetaData()) {
+                continue;
+            }
+            if (!jobInfo.getJobMetaData().isActive()) {
+                return;
+            }
+            JobConfigurationPOJO jobConfig = 
jobConfigAPI.getJobConfiguration(each.getJobName());
+            jobConfigAPI.updateJobConfiguration(jobConfig);
+        }
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
index 4278948f218..c0502d99f38 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
@@ -60,7 +60,7 @@ public final class ConsistencyCheckJobType implements 
PipelineJobType {
     
     @Override
     public PipelineJobInfo getJobInfo(final String jobId) {
-        throw new UnsupportedOperationException();
+        return null;
     }
     
     @Override

Reply via email to