This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a0beb4  Add ModeScheduleContext and integrate with ContextManager 
(#14001)
8a0beb4 is described below

commit 8a0beb4198839785045a4e0b4cb323ef982f1d42
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Dec 8 20:19:34 2021 +0800

    Add ModeScheduleContext and integrate with ContextManager (#14001)
    
    * Add ModeScheduleContext and integrate with ContextManager
    
    * Fix typo
    
    * Fix unit test, modeConfig may be null
    
    * Fix unit test, mode type could be any value
---
 .../shardingsphere-schedule-core/pom.xml           |  29 +++++
 .../schedule/core/api/JobParameter.java            |  24 ++++
 .../schedule/core/api/ModeScheduleContext.java     | 134 +++++++++++++++++++++
 .../shardingsphere-mode-core/pom.xml               |   5 +
 .../mode/manager/ContextManager.java               |   7 +-
 .../mode/manager/ContextManagerTest.java           |   6 +-
 .../cluster/ClusterContextManagerBuilder.java      |   3 +-
 .../memory/MemoryContextManagerBuilder.java        |   3 +-
 .../StandaloneContextManagerBuilder.java           |   3 +-
 9 files changed, 209 insertions(+), 5 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/pom.xml
 
b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/pom.xml
index d77cf94..065e185 100644
--- 
a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/pom.xml
+++ 
b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/pom.xml
@@ -28,4 +28,33 @@
     <artifactId>shardingsphere-schedule-core</artifactId>
     <name>${project.artifactId}</name>
     
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-infra-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere.elasticjob</groupId>
+            <artifactId>elasticjob-lite-core</artifactId>
+            <version>${elasticjob.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.zaxxer</groupId>
+                    <artifactId>HikariCP-java7</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere.elasticjob</groupId>
+            <artifactId>elasticjob-lite-lifecycle</artifactId>
+            <version>${elasticjob.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.zaxxer</groupId>
+                    <artifactId>HikariCP-java7</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
 </project>
diff --git 
a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/api/JobParameter.java
 
b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/api/JobParameter.java
new file mode 100644
index 0000000..36931eb
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/api/JobParameter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.schedule.core.api;
+
+/**
+ * Schedule job parameter, currently an empty placeholder type that declares no fields or methods.
+ */
+public final class JobParameter {
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/api/ModeScheduleContext.java
 
b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/api/ModeScheduleContext.java
new file mode 100644
index 0000000..2c56845
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/api/ModeScheduleContext.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.schedule.core.api;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
+import 
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
+import 
org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.settings.JobConfigurationAPIImpl;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import 
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
+import 
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+
+import java.util.Properties;
+import java.util.function.Consumer;
+
+/**
+ * Mode schedule context, used for proxy and jdbc.
+ * Jobs are only scheduled when a registry center is available (cluster mode with ZooKeeper repository), otherwise calls are logged and ignored.
+ */
+@Slf4j
+public final class ModeScheduleContext {
+    
+    private final CoordinatorRegistryCenter registryCenter;
+    
+    private final JobConfigurationAPI jobConfigAPI;
+    
+    /**
+     * Create mode schedule context.
+     *
+     * @param modeConfig mode configuration, may be null, in which case no registry center is created
+     */
+    public ModeScheduleContext(final ModeConfiguration modeConfig) {
+        registryCenter = initRegistryCenter(modeConfig);
+        jobConfigAPI = null == registryCenter ? null : new JobConfigurationAPIImpl(registryCenter);
+    }
+    
+    private CoordinatorRegistryCenter initRegistryCenter(final ModeConfiguration modeConfig) {
+        if (null == modeConfig) {
+            return null;
+        }
+        String modeType = modeConfig.getType();
+        // TODO do not hard-code mode type, refactor later
+        if ("Cluster".equals(modeType)) {
+            return initRegistryCenterForClusterMode(modeConfig);
+        }
+        if (!"Standalone".equals(modeType) && !"Memory".equals(modeType)) {
+            // TODO ModeConfiguration.type is not limited, it could be any value (constant-first equals lets a null type end up here too)
+            log.warn("Unknown mode type '{}'", modeType);
+        }
+        return null;
+    }
+    
+    private CoordinatorRegistryCenter initRegistryCenterForClusterMode(final ModeConfiguration modeConfig) {
+        String clusterType = modeConfig.getRepository().getType();
+        // TODO do not hard-code cluster type and property key, refactor later
+        if (!"ZooKeeper".equals(clusterType)) {
+            log.warn("Unsupported clusterType '{}'", clusterType);
+            return null;
+        }
+        Properties props = modeConfig.getRepository().getProps();
+        CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(new ZookeeperConfiguration(props.getProperty("server-lists"), props.getProperty("namespace")));
+        result.init();
+        return result;
+    }
+    
+    /**
+     * Schedule job with cron, ignored with a warning when no registry center is available.
+     *
+     * @param jobName job name
+     * @param job job implementation
+     * @param cron cron expression
+     */
+    public void scheduleWithCron(final String jobName, final Consumer<JobParameter> job, final String cron) {
+        if (null == registryCenter) {
+            log.warn("registryCenter is null, ignore, jobName={}, cron={}", jobName, cron);
+            return;
+        }
+        JobConfiguration jobConfig = JobConfiguration.newBuilder(jobName, 1).cron(cron).build();
+        new ScheduleJobBootstrap(registryCenter, new ConsumerSimpleJob(job), jobConfig).schedule();
+    }
+    
+    /**
+     * Update job cron, ignored with a warning when no job configuration API is available.
+     *
+     * @param jobName job name
+     * @param cron cron expression
+     */
+    public void updateJobCron(final String jobName, final String cron) {
+        if (null == jobConfigAPI) {
+            log.warn("jobConfigAPI is null, ignore, jobName={}, cron={}", jobName, cron);
+            return;
+        }
+        JobConfigurationPOJO jobConfig = new JobConfigurationPOJO();
+        jobConfig.setJobName(jobName);
+        jobConfig.setCron(cron);
+        jobConfig.setShardingTotalCount(1);
+        jobConfigAPI.updateJobConfiguration(jobConfig);
+    }
+    
+    /** Simple job that hands a new {@link JobParameter} to the wrapped consumer on every execution. */
+    private static final class ConsumerSimpleJob implements SimpleJob {
+        
+        private final Consumer<JobParameter> job;
+        
+        ConsumerSimpleJob(final Consumer<JobParameter> job) {
+            this.job = job;
+        }
+        
+        @Override
+        public void execute(final ShardingContext shardingContext) {
+            job.accept(new JobParameter());
+        }
+    }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-core/pom.xml 
b/shardingsphere-mode/shardingsphere-mode-core/pom.xml
index a0c7aa7..304925e 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/pom.xml
+++ b/shardingsphere-mode/shardingsphere-mode-core/pom.xml
@@ -34,6 +34,11 @@
             <artifactId>shardingsphere-infra-context</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-schedule-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         
         <dependency>
             <groupId>org.apache.shardingsphere</groupId>
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index a2cb0a5..0553def 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -42,6 +42,7 @@ import 
org.apache.shardingsphere.infra.rule.identifier.type.MutableDataNodeRule;
 import org.apache.shardingsphere.infra.state.StateContext;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
+import org.apache.shardingsphere.schedule.core.api.ModeScheduleContext;
 import 
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
 import org.apache.shardingsphere.transaction.context.TransactionContexts;
 import org.apache.shardingsphere.transaction.rule.TransactionRule;
@@ -70,6 +71,8 @@ public final class ContextManager implements AutoCloseable {
     
     private volatile TransactionContexts transactionContexts = new 
TransactionContexts();
     
+    private volatile ModeScheduleContext modeScheduleContext;
+    
     private final StateContext stateContext = new StateContext();
     
     /**
@@ -77,10 +80,12 @@ public final class ContextManager implements AutoCloseable {
      *
      * @param metaDataContexts meta data contexts
      * @param transactionContexts transaction contexts
+     * @param modeScheduleContext mode schedule context
      */
-    public void init(final MetaDataContexts metaDataContexts, final 
TransactionContexts transactionContexts) {
+    public void init(final MetaDataContexts metaDataContexts, final 
TransactionContexts transactionContexts, final ModeScheduleContext 
modeScheduleContext) {
         this.metaDataContexts = metaDataContexts;
         this.transactionContexts = transactionContexts;
+        this.modeScheduleContext = modeScheduleContext;
     }
     
     /**
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
index ebd7b36..354e892 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.infra.metadata.resource.ShardingSphereResource;
 import 
org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
 import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.schedule.core.api.ModeScheduleContext;
 import org.apache.shardingsphere.transaction.context.TransactionContexts;
 import org.junit.Before;
 import org.junit.Test;
@@ -57,13 +58,16 @@ public final class ContextManagerTest {
 
     @Mock
     private TransactionContexts transactionContexts;
+    
+    @Mock
+    private ModeScheduleContext modeScheduleContext;
 
     private ContextManager contextManager;
 
     @Before
     public void setUp() throws SQLException {
         contextManager = new ContextManager();
-        contextManager.init(metaDataContexts, transactionContexts);
+        contextManager.init(metaDataContexts, transactionContexts, 
modeScheduleContext);
         dataSourceMap = new HashMap<>(2, 1);
         DataSource primaryDataSource = mock(DataSource.class);
         DataSource replicaDataSource = mock(DataSource.class);
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 7277cb6..ad4c41a 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -40,6 +40,7 @@ import 
org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
 import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import org.apache.shardingsphere.schedule.core.api.ModeScheduleContext;
 import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.spi.typed.TypedSPIRegistry;
 import 
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
@@ -83,7 +84,7 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
                                 final Properties props, final boolean 
isOverwrite, final Integer port) throws SQLException {
         beforeBuildContextManager(modeConfig, dataSourcesMap, 
schemaRuleConfigs, globalRuleConfigs, props, isOverwrite, port);
         contextManager = new ContextManager();
-        contextManager.init(metaDataContexts, transactionContexts);
+        contextManager.init(metaDataContexts, transactionContexts, new 
ModeScheduleContext(modeConfig));
         afterBuildContextManager();
         return contextManager;
     }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/MemoryContextManagerBuilder.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/MemoryContextManagerBuilder.java
index f6eed3c..dab5db8 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/MemoryContextManagerBuilder.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/MemoryContextManagerBuilder.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
+import org.apache.shardingsphere.schedule.core.api.ModeScheduleContext;
 import 
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
 import org.apache.shardingsphere.transaction.context.TransactionContexts;
 import org.apache.shardingsphere.transaction.rule.TransactionRule;
@@ -55,7 +56,7 @@ public final class MemoryContextManagerBuilder implements 
ContextManagerBuilder
         MetaDataContexts metaDataContexts = new 
MetaDataContextsBuilder(dataSourcesMap, schemaRuleConfigs, globalRuleConfigs, 
schemas, rules, props).build(null);
         TransactionContexts transactionContexts = 
createTransactionContexts(metaDataContexts);
         ContextManager result = new ContextManager();
-        result.init(metaDataContexts, transactionContexts);
+        result.init(metaDataContexts, transactionContexts, new 
ModeScheduleContext(modeConfig));
         return result;
     }
     
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
index a577c5b..4fc9f1b 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
@@ -34,6 +34,7 @@ import 
org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
 import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
 import 
org.apache.shardingsphere.mode.repository.standalone.StandalonePersistRepository;
 import 
org.apache.shardingsphere.mode.repository.standalone.StandalonePersistRepositoryConfiguration;
+import org.apache.shardingsphere.schedule.core.api.ModeScheduleContext;
 import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.spi.typed.TypedSPIRegistry;
 import 
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
@@ -79,7 +80,7 @@ public final class StandaloneContextManagerBuilder implements 
ContextManagerBuil
                 rules, standaloneProps).build(metaDataPersistService);
         TransactionContexts transactionContexts = 
createTransactionContexts(metaDataContexts);
         ContextManager result = new ContextManager();
-        result.init(metaDataContexts, transactionContexts);
+        result.init(metaDataContexts, transactionContexts, new 
ModeScheduleContext(modeConfig));
         return result;
     }
     

Reply via email to