This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8a0beb4 Add ModeScheduleContext and integrate with ContextManager
(#14001)
8a0beb4 is described below
commit 8a0beb4198839785045a4e0b4cb323ef982f1d42
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Dec 8 20:19:34 2021 +0800
Add ModeScheduleContext and integrate with ContextManager (#14001)
* Add ModeScheduleContext and integrate with ContextManager
* Fix typo
* Fix unit test, modeConfig may be null
* Fix unit test, mode type could be any value
---
.../shardingsphere-schedule-core/pom.xml | 29 +++++
.../schedule/core/api/JobParameter.java | 24 ++++
.../schedule/core/api/ModeScheduleContext.java | 134 +++++++++++++++++++++
.../shardingsphere-mode-core/pom.xml | 5 +
.../mode/manager/ContextManager.java | 7 +-
.../mode/manager/ContextManagerTest.java | 6 +-
.../cluster/ClusterContextManagerBuilder.java | 3 +-
.../memory/MemoryContextManagerBuilder.java | 3 +-
.../StandaloneContextManagerBuilder.java | 3 +-
9 files changed, 209 insertions(+), 5 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/pom.xml
b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/pom.xml
index d77cf94..065e185 100644
---
a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/pom.xml
+++
b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/pom.xml
@@ -28,4 +28,33 @@
<artifactId>shardingsphere-schedule-core</artifactId>
<name>${project.artifactId}</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-infra-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-lite-core</artifactId>
+ <version>${elasticjob.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP-java7</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere.elasticjob</groupId>
+ <artifactId>elasticjob-lite-lifecycle</artifactId>
+ <version>${elasticjob.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP-java7</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
</project>
diff --git
a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/api/JobParameter.java
b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/api/JobParameter.java
new file mode 100644
index 0000000..36931eb
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/api/JobParameter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.schedule.core.api;
+
+/**
+ * Schedule job parameter.
+ *
+ * <p>Currently a placeholder type with no fields or methods.</p>
+ */
+public final class JobParameter {
+}
diff --git
a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/api/ModeScheduleContext.java
b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/api/ModeScheduleContext.java
new file mode 100644
index 0000000..2c56845
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/api/ModeScheduleContext.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.schedule.core.api;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
+import
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
+import
org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.settings.JobConfigurationAPIImpl;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
+import
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+
+import java.util.Properties;
+import java.util.function.Consumer;
+
+/**
+ * Mode schedule context, used for proxy and jdbc.
+ */
+@Slf4j
+public final class ModeScheduleContext {
+
+ private final CoordinatorRegistryCenter registryCenter;
+
+ private final JobConfigurationAPI jobConfigAPI;
+
+ /**
+  * Create the schedule context for the given mode configuration.
+  *
+  * <p>The job configuration API is only created when a registry center could be initialized;
+  * otherwise both the registry center and the job configuration API are {@code null}.</p>
+  *
+  * @param modeConfig mode configuration, may be {@code null}
+  */
+ public ModeScheduleContext(final ModeConfiguration modeConfig) {
+     registryCenter = initRegistryCenter(modeConfig);
+     if (null == registryCenter) {
+         jobConfigAPI = null;
+     } else {
+         jobConfigAPI = new JobConfigurationAPIImpl(registryCenter);
+     }
+ }
+
+ /**
+  * Initialize the registry center that matches the configured mode type.
+  *
+  * <p>Only the {@code Cluster} mode type is backed by a registry center. {@code Standalone}
+  * and {@code Memory} yield {@code null} silently; any other type, including a {@code null}
+  * type, is logged as unknown and also yields {@code null}.</p>
+  *
+  * @param modeConfig mode configuration, may be {@code null}
+  * @return initialized registry center, or {@code null} if none is available for the mode
+  */
+ private CoordinatorRegistryCenter initRegistryCenter(final ModeConfiguration modeConfig) {
+     if (null == modeConfig) {
+         return null;
+     }
+     String modeType = modeConfig.getType();
+     // TODO do not hard-code mode type, refactor later
+     // Constant-first equals keeps this null-safe; switching on a null String would throw NullPointerException.
+     if ("Cluster".equals(modeType)) {
+         return initRegistryCenterForClusterMode(modeConfig);
+     }
+     if ("Standalone".equals(modeType) || "Memory".equals(modeType)) {
+         return null;
+     }
+     // TODO ModeConfiguration.type is not limited, it could be any value
+     log.warn("Unknown mode type '{}'", modeType);
+     return null;
+ }
+
+ /**
+  * Build and initialize a registry center from the cluster repository configuration.
+  *
+  * @param modeConfig cluster mode configuration whose repository type and properties are read
+  * @return initialized ZooKeeper registry center, or {@code null} if the repository type is not {@code ZooKeeper}
+  */
+ private CoordinatorRegistryCenter initRegistryCenterForClusterMode(final ModeConfiguration modeConfig) {
+     String repositoryType = modeConfig.getRepository().getType();
+     Properties repositoryProps = modeConfig.getRepository().getProps();
+     // TODO do not hard-code cluster type and property key, refactor later
+     if (!"ZooKeeper".equals(repositoryType)) {
+         log.warn("Unsupported clusterType '{}'", repositoryType);
+         return null;
+     }
+     String serverLists = repositoryProps.getProperty("server-lists");
+     String namespace = repositoryProps.getProperty("namespace");
+     CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverLists, namespace));
+     result.init();
+     return result;
+ }
+
+ /**
+  * Schedule with cron.
+  *
+  * <p>The job is registered with a sharding total count of 1. If no registry center is
+  * available, the request is logged and ignored.</p>
+  *
+  * @param jobName job name
+  * @param job job implementation
+  * @param cron cron expression
+  */
+ public void scheduleWithCron(final String jobName, final Consumer<JobParameter> job, final String cron) {
+     if (null == registryCenter) {
+         // Log the job name, not the consumer instance, so the value matches the "jobName=" placeholder.
+         log.warn("registryCenter is null, ignore, jobName={}, cron={}", jobName, cron);
+         return;
+     }
+     JobConfiguration jobConfig = JobConfiguration.newBuilder(jobName, 1).cron(cron).build();
+     ScheduleJobBootstrap bootstrap = new ScheduleJobBootstrap(registryCenter, new ConsumerSimpleJob(job), jobConfig);
+     bootstrap.schedule();
+ }
+
+ /**
+  * Update job cron.
+  *
+  * <p>When no job configuration API is available, the request is only logged as a warning.</p>
+  *
+  * @param jobName job name
+  * @param cron cron expression
+  */
+ public void updateJobCron(final String jobName, final String cron) {
+     if (null != jobConfigAPI) {
+         JobConfigurationPOJO updatedConfig = new JobConfigurationPOJO();
+         updatedConfig.setJobName(jobName);
+         updatedConfig.setCron(cron);
+         updatedConfig.setShardingTotalCount(1);
+         jobConfigAPI.updateJobConfiguration(updatedConfig);
+         return;
+     }
+     log.warn("jobConfigAPI is null, ignore, jobName={}, cron={}", jobName, cron);
+ }
+
+ private static final class ConsumerSimpleJob implements SimpleJob {
+
+ private final Consumer<JobParameter> job;
+
+ ConsumerSimpleJob(final Consumer<JobParameter> job) {
+ this.job = job;
+ }
+
+ @Override
+ public void execute(final ShardingContext shardingContext) {
+ job.accept(new JobParameter());
+ }
+ }
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-core/pom.xml
b/shardingsphere-mode/shardingsphere-mode-core/pom.xml
index a0c7aa7..304925e 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/pom.xml
+++ b/shardingsphere-mode/shardingsphere-mode-core/pom.xml
@@ -34,6 +34,11 @@
<artifactId>shardingsphere-infra-context</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-schedule-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index a2cb0a5..0553def 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -42,6 +42,7 @@ import
org.apache.shardingsphere.infra.rule.identifier.type.MutableDataNodeRule;
import org.apache.shardingsphere.infra.state.StateContext;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
+import org.apache.shardingsphere.schedule.core.api.ModeScheduleContext;
import
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
import org.apache.shardingsphere.transaction.rule.TransactionRule;
@@ -70,6 +71,8 @@ public final class ContextManager implements AutoCloseable {
private volatile TransactionContexts transactionContexts = new
TransactionContexts();
+ private volatile ModeScheduleContext modeScheduleContext;
+
private final StateContext stateContext = new StateContext();
/**
@@ -77,10 +80,12 @@ public final class ContextManager implements AutoCloseable {
*
* @param metaDataContexts meta data contexts
* @param transactionContexts transaction contexts
+ * @param modeScheduleContext mode schedule context
*/
- public void init(final MetaDataContexts metaDataContexts, final
TransactionContexts transactionContexts) {
+ public void init(final MetaDataContexts metaDataContexts, final
TransactionContexts transactionContexts, final ModeScheduleContext
modeScheduleContext) {
this.metaDataContexts = metaDataContexts;
this.transactionContexts = transactionContexts;
+ this.modeScheduleContext = modeScheduleContext;
}
/**
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
index ebd7b36..354e892 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/ContextManagerTest.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.infra.metadata.resource.ShardingSphereResource;
import
org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.schedule.core.api.ModeScheduleContext;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
import org.junit.Before;
import org.junit.Test;
@@ -57,13 +58,16 @@ public final class ContextManagerTest {
@Mock
private TransactionContexts transactionContexts;
+
+ @Mock
+ private ModeScheduleContext modeScheduleContext;
private ContextManager contextManager;
@Before
public void setUp() throws SQLException {
contextManager = new ContextManager();
- contextManager.init(metaDataContexts, transactionContexts);
+ contextManager.init(metaDataContexts, transactionContexts,
modeScheduleContext);
dataSourceMap = new HashMap<>(2, 1);
DataSource primaryDataSource = mock(DataSource.class);
DataSource replicaDataSource = mock(DataSource.class);
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 7277cb6..ad4c41a 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -40,6 +40,7 @@ import
org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import org.apache.shardingsphere.schedule.core.api.ModeScheduleContext;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.spi.typed.TypedSPIRegistry;
import
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
@@ -83,7 +84,7 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
final Properties props, final boolean
isOverwrite, final Integer port) throws SQLException {
beforeBuildContextManager(modeConfig, dataSourcesMap,
schemaRuleConfigs, globalRuleConfigs, props, isOverwrite, port);
contextManager = new ContextManager();
- contextManager.init(metaDataContexts, transactionContexts);
+ contextManager.init(metaDataContexts, transactionContexts, new
ModeScheduleContext(modeConfig));
afterBuildContextManager();
return contextManager;
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/MemoryContextManagerBuilder.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/MemoryContextManagerBuilder.java
index f6eed3c..dab5db8 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/MemoryContextManagerBuilder.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/MemoryContextManagerBuilder.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
+import org.apache.shardingsphere.schedule.core.api.ModeScheduleContext;
import
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
import org.apache.shardingsphere.transaction.rule.TransactionRule;
@@ -55,7 +56,7 @@ public final class MemoryContextManagerBuilder implements
ContextManagerBuilder
MetaDataContexts metaDataContexts = new
MetaDataContextsBuilder(dataSourcesMap, schemaRuleConfigs, globalRuleConfigs,
schemas, rules, props).build(null);
TransactionContexts transactionContexts =
createTransactionContexts(metaDataContexts);
ContextManager result = new ContextManager();
- result.init(metaDataContexts, transactionContexts);
+ result.init(metaDataContexts, transactionContexts, new
ModeScheduleContext(modeConfig));
return result;
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
index a577c5b..4fc9f1b 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
@@ -34,6 +34,7 @@ import
org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import
org.apache.shardingsphere.mode.repository.standalone.StandalonePersistRepository;
import
org.apache.shardingsphere.mode.repository.standalone.StandalonePersistRepositoryConfiguration;
+import org.apache.shardingsphere.schedule.core.api.ModeScheduleContext;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.spi.typed.TypedSPIRegistry;
import
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
@@ -79,7 +80,7 @@ public final class StandaloneContextManagerBuilder implements
ContextManagerBuil
rules, standaloneProps).build(metaDataPersistService);
TransactionContexts transactionContexts =
createTransactionContexts(metaDataContexts);
ContextManager result = new ContextManager();
- result.init(metaDataContexts, transactionContexts);
+ result.init(metaDataContexts, transactionContexts, new
ModeScheduleContext(modeConfig));
return result;
}