This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ec75ec4 Move heartbeat job initialization to DatabaseDiscoveryRule
(#14303)
ec75ec4 is described below
commit ec75ec41dfc70db6696b02282471ef0934a6f98c
Author: Haoran Meng <[email protected]>
AuthorDate: Sat Dec 25 12:29:28 2021 +0800
Move heartbeat job initialization to DatabaseDiscoveryRule (#14303)
---
.../dbdiscovery/rule/DatabaseDiscoveryRule.java | 40 ++++++------
.../route/DatabaseDiscoverySQLRouterTest.java | 2 +-
.../rule/DatabaseDiscoveryRuleTest.java | 2 +-
...hmProvidedDatabaseDiscoveryRuleBuilderTest.java | 2 +-
.../builder/DatabaseDiscoveryRuleBuilderTest.java | 2 +-
.../shardingsphere/infra}/instance/Instance.java | 4 +-
.../infra}/instance/utils/IpUtils.java | 2 +-
.../infra/rule/identifier/type/SchedulerRule.java | 40 ------------
.../infra}/instance/InstanceTest.java | 4 +-
.../infra/instance/util}/IpUtilsTest.java | 3 +-
.../core/api/ModeScheduleContextFactory.java | 71 ++++++++++++++++++++++
.../cluster/ClusterContextManagerBuilder.java | 16 +----
.../cluster/coordinator/RegistryCenter.java | 2 +-
.../lock/service/LockRegistryService.java | 2 +-
.../compute/service/ComputeNodeStatusService.java | 2 +-
.../subscriber/ComputeNodeStatusSubscriber.java | 2 +-
.../watcher/ComputeNodeStateChangedWatcher.java | 2 +-
.../lock/service/LockRegistryServiceTest.java | 2 +-
.../service/ComputeNodeStatusServiceTest.java | 2 +-
.../ComputeNodeStateChangedWatcherTest.java | 2 +-
.../common/show/executor/ShowInstanceExecutor.java | 2 +-
21 files changed, 117 insertions(+), 89 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
index f1d62cb..203ea2e 100644
---
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
+++
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.dbdiscovery.rule;
+import com.google.common.base.Strings;
import lombok.Getter;
import
org.apache.shardingsphere.dbdiscovery.algorithm.config.AlgorithmProvidedDatabaseDiscoveryRuleConfiguration;
import
org.apache.shardingsphere.dbdiscovery.api.config.DatabaseDiscoveryRuleConfiguration;
@@ -36,26 +37,27 @@ import
org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceChangedE
import org.apache.shardingsphere.infra.rule.identifier.scope.SchemaRule;
import
org.apache.shardingsphere.infra.rule.identifier.type.DataSourceContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.ExportableRule;
-import org.apache.shardingsphere.infra.rule.identifier.type.SchedulerRule;
import
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
import org.apache.shardingsphere.infra.schedule.CronJob;
+import org.apache.shardingsphere.schedule.core.api.ModeScheduleContext;
+import org.apache.shardingsphere.schedule.core.api.ModeScheduleContextFactory;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
import javax.sql.DataSource;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
/**
* Database discovery rule.
*/
-public final class DatabaseDiscoveryRule implements SchemaRule,
DataSourceContainedRule, StatusContainedRule, ExportableRule, SchedulerRule {
+public final class DatabaseDiscoveryRule implements SchemaRule,
DataSourceContainedRule, StatusContainedRule, ExportableRule {
static {
ShardingSphereServiceLoader.register(DatabaseDiscoveryType.class);
@@ -81,6 +83,7 @@ public final class DatabaseDiscoveryRule implements
SchemaRule, DataSourceContai
dataSourceRules = getDataSourceRules(dataSourceRuleConfigs,
heartBeatConfig);
findMasterSlaveRelation(schemaName, dataSourceMap);
initAware();
+ initHeartBeatJobs(schemaName, dataSourceMap);
}
private static Map<String, DatabaseDiscoveryType> getDiscoveryTypes(final
Map<String, ShardingSphereAlgorithmConfiguration> discoveryTypesConfig) {
@@ -95,7 +98,8 @@ public final class DatabaseDiscoveryRule implements
SchemaRule, DataSourceContai
final Map<String, DatabaseDiscoveryHeartBeatConfiguration> heartbeatConfig) {
Map<String, DatabaseDiscoveryDataSourceRule> result = new
HashMap<>(dataSources.size(), 1);
for (DatabaseDiscoveryDataSourceRuleConfiguration each : dataSources) {
- result.put(each.getName(), new
DatabaseDiscoveryDataSourceRule(each,
heartbeatConfig.get(each.getDiscoveryHeartbeatName()).getProps(),
discoveryTypes.get(each.getDiscoveryTypeName())));
+ result.put(each.getName(), new
DatabaseDiscoveryDataSourceRule(each,
Strings.isNullOrEmpty(each.getDiscoveryHeartbeatName()) ? new Properties()
+ :
heartbeatConfig.get(each.getDiscoveryHeartbeatName()).getProps(),
discoveryTypes.get(each.getDiscoveryTypeName())));
}
return result;
}
@@ -182,20 +186,22 @@ public final class DatabaseDiscoveryRule implements
SchemaRule, DataSourceContai
return result;
}
- @Override
- public String getType() {
- return DatabaseDiscoveryRule.class.getSimpleName();
+ private void initHeartBeatJobs(final String schemaName, final Map<String,
DataSource> dataSourceMap) {
+ Optional<ModeScheduleContext> modeScheduleContext =
ModeScheduleContextFactory.getInstance().get();
+ if (modeScheduleContext.isPresent()) {
+ for (Entry<String, DatabaseDiscoveryDataSourceRule> entry :
dataSourceRules.entrySet()) {
+ Map<String, DataSource> dataSources =
dataSourceMap.entrySet().stream().filter(dataSource ->
entry.getValue().getDisabledDataSourceNames().contains(dataSource.getKey()))
+ .collect(Collectors.toMap(Entry::getKey,
Entry::getValue));
+ CronJob job = new
CronJob(entry.getValue().getDatabaseDiscoveryType().getType() + "-" +
entry.getValue().getName(),
+ each -> new HeartbeatJob(schemaName, dataSources,
entry.getValue().getName(), entry.getValue().getDatabaseDiscoveryType(),
entry.getValue().getDisabledDataSourceNames())
+ .execute(null),
entry.getValue().getHeartbeatProps().getProperty("keep-alive-cron"));
+ modeScheduleContext.get().startCronJob(job);
+ }
+ }
}
@Override
- public List<CronJob> getCronJobs(final String schemaName, final
Map<String, DataSource> dataSources) {
- List<CronJob> result = new ArrayList<>(dataSourceRules.size());
- for (Entry<String, DatabaseDiscoveryDataSourceRule> entry :
dataSourceRules.entrySet()) {
- result.add(new
CronJob(entry.getValue().getDatabaseDiscoveryType().getType() + "-" +
entry.getValue().getName(),
- each -> new HeartbeatJob(schemaName, dataSources,
entry.getValue().getName(), entry.getValue().getDatabaseDiscoveryType(),
-
entry.getValue().getDisabledDataSourceNames()).execute(null),
-
entry.getValue().getHeartbeatProps().getProperty("keep-alive-cron")));
- }
- return result;
+ public String getType() {
+ return DatabaseDiscoveryRule.class.getSimpleName();
}
}
diff --git
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/route/DatabaseDiscoverySQLRouterTest.java
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/route/DatabaseDiscoverySQLRouterTest.java
index 6d95577..8ebe6bb 100644
---
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/route/DatabaseDiscoverySQLRouterTest.java
+++
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/route/DatabaseDiscoverySQLRouterTest.java
@@ -75,7 +75,7 @@ public final class DatabaseDiscoverySQLRouterTest {
@Before
public void setUp() {
DatabaseDiscoveryDataSourceRuleConfiguration dataSourceConfig
- = new
DatabaseDiscoveryDataSourceRuleConfiguration(DATA_SOURCE_NAME,
Collections.singletonList(PRIMARY_DATA_SOURCE), "ha_heartbeat", "TEST");
+ = new
DatabaseDiscoveryDataSourceRuleConfiguration(DATA_SOURCE_NAME,
Collections.singletonList(PRIMARY_DATA_SOURCE), "", "TEST");
ShardingSphereAlgorithmConfiguration algorithmConfig = new
ShardingSphereAlgorithmConfiguration("TEST", new Properties());
DatabaseDiscoveryRuleConfiguration config = new
DatabaseDiscoveryRuleConfiguration(Collections.singleton(dataSourceConfig),
Collections.singletonMap("ha_heartbeat", new
DatabaseDiscoveryHeartBeatConfiguration(new Properties())),
diff --git
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRuleTest.java
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRuleTest.java
index 942797d..da866ca 100644
---
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRuleTest.java
+++
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRuleTest.java
@@ -102,7 +102,7 @@ public final class DatabaseDiscoveryRuleTest {
}
private DatabaseDiscoveryRule createRule() {
- DatabaseDiscoveryDataSourceRuleConfiguration config = new
DatabaseDiscoveryDataSourceRuleConfiguration("test_pr", Arrays.asList("ds_0",
"ds_1"), "ha_heartbeat", "TEST");
+ DatabaseDiscoveryDataSourceRuleConfiguration config = new
DatabaseDiscoveryDataSourceRuleConfiguration("test_pr", Arrays.asList("ds_0",
"ds_1"), "", "TEST");
return new DatabaseDiscoveryRule("ha_db", dataSourceMap, new
DatabaseDiscoveryRuleConfiguration(
Collections.singleton(config),
Collections.singletonMap("ha_heartbeat", new
DatabaseDiscoveryHeartBeatConfiguration(new Properties())),
ImmutableMap.of("TEST", new
ShardingSphereAlgorithmConfiguration("TEST", new Properties()))));
diff --git
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/builder/AlgorithmProvidedDatabaseDiscoveryRuleBuilderTest.java
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/builder/AlgorithmProvidedDatabaseDiscoveryRuleBuilderTest.java
index f8dd2f4..fada0d1 100644
---
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/builder/AlgorithmProvidedDatabaseDiscoveryRuleBuilderTest.java
+++
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/builder/AlgorithmProvidedDatabaseDiscoveryRuleBuilderTest.java
@@ -48,7 +48,7 @@ public final class
AlgorithmProvidedDatabaseDiscoveryRuleBuilderTest {
@Test
public void assertBuild() {
AlgorithmProvidedDatabaseDiscoveryRuleConfiguration
algorithmProvidedRuleConfig =
mock(AlgorithmProvidedDatabaseDiscoveryRuleConfiguration.class);
- DatabaseDiscoveryDataSourceRuleConfiguration ruleConfig = new
DatabaseDiscoveryDataSourceRuleConfiguration("name",
Collections.singletonList("name"), "ha_heartbeat", "discoveryTypeName");
+ DatabaseDiscoveryDataSourceRuleConfiguration ruleConfig = new
DatabaseDiscoveryDataSourceRuleConfiguration("name",
Collections.singletonList("name"), "", "discoveryTypeName");
when(algorithmProvidedRuleConfig.getDataSources()).thenReturn(Collections.singletonList(ruleConfig));
when(algorithmProvidedRuleConfig.getDiscoveryHeartbeats()).thenReturn(Collections.singletonMap("ha_heartbeat",
new DatabaseDiscoveryHeartBeatConfiguration(new Properties())));
when(algorithmProvidedRuleConfig.getDiscoveryTypes()).thenReturn(Collections.singletonMap("discoveryTypeName",
new TestDatabaseDiscoveryType()));
diff --git
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/builder/DatabaseDiscoveryRuleBuilderTest.java
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/builder/DatabaseDiscoveryRuleBuilderTest.java
index 117d12a..098c565 100644
---
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/builder/DatabaseDiscoveryRuleBuilderTest.java
+++
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/rule/builder/DatabaseDiscoveryRuleBuilderTest.java
@@ -50,7 +50,7 @@ public final class DatabaseDiscoveryRuleBuilderTest {
@Test
public void assertBuild() {
DatabaseDiscoveryDataSourceRuleConfiguration dataSourceConfig =
- new DatabaseDiscoveryDataSourceRuleConfiguration("name",
Collections.singletonList("name"), "ha_heartbeat", "TEST");
+ new DatabaseDiscoveryDataSourceRuleConfiguration("name",
Collections.singletonList("name"), "", "TEST");
DatabaseDiscoveryRuleConfiguration config = new
DatabaseDiscoveryRuleConfiguration(
Collections.singleton(dataSourceConfig),
Collections.singletonMap("ha_heartbeat", new
DatabaseDiscoveryHeartBeatConfiguration(new Properties())),
Collections.singletonMap("TEST", new
ShardingSphereAlgorithmConfiguration("TEST", new Properties())));
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/instance/Instance.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/Instance.java
similarity index 94%
rename from
shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/instance/Instance.java
rename to
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/Instance.java
index 69e024d..7e3d141 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/instance/Instance.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/Instance.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.instance;
+package org.apache.shardingsphere.infra.instance;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.mode.instance.utils.IpUtils;
+import org.apache.shardingsphere.infra.instance.utils.IpUtils;
import java.lang.management.ManagementFactory;
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/instance/utils/IpUtils.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/utils/IpUtils.java
similarity index 98%
rename from
shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/instance/utils/IpUtils.java
rename to
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/utils/IpUtils.java
index b517f85..7cccd5d 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/instance/utils/IpUtils.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/utils/IpUtils.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.instance.utils;
+package org.apache.shardingsphere.infra.instance.utils;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/SchedulerRule.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/SchedulerRule.java
deleted file mode 100644
index 969ac7b..0000000
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/rule/identifier/type/SchedulerRule.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.rule.identifier.type;
-
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.schedule.CronJob;
-
-import javax.sql.DataSource;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Scheduler rule.
- */
-public interface SchedulerRule extends ShardingSphereRule {
-
- /**
- * Get cron jobs.
- *
- * @param schemaName schema name
- * @param dataSources datasource
- * @return cron jobs
- */
- List<CronJob> getCronJobs(String schemaName, Map<String, DataSource>
dataSources);
-}
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/instance/InstanceTest.java
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceTest.java
similarity index 94%
rename from
shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/instance/InstanceTest.java
rename to
shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceTest.java
index cf592b4..e0a6d9b 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/instance/InstanceTest.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceTest.java
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.instance;
+package org.apache.shardingsphere.infra.instance;
import com.google.common.base.Joiner;
-import org.apache.shardingsphere.mode.instance.utils.IpUtils;
+import org.apache.shardingsphere.infra.instance.utils.IpUtils;
import org.junit.Test;
import java.lang.management.ManagementFactory;
diff --git
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/instance/utils/IpUtilsTest.java
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/util/IpUtilsTest.java
similarity index 89%
rename from
shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/instance/utils/IpUtilsTest.java
rename to
shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/util/IpUtilsTest.java
index f4ae686..3293b12 100644
---
a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/instance/utils/IpUtilsTest.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/util/IpUtilsTest.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.instance.utils;
+package org.apache.shardingsphere.infra.instance.util;
+import org.apache.shardingsphere.infra.instance.utils.IpUtils;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
diff --git
a/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/api/ModeScheduleContextFactory.java
b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/api/ModeScheduleContextFactory.java
new file mode 100644
index 0000000..de52172
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-schedule/shardingsphere-schedule-core/src/main/java/org/apache/shardingsphere/schedule/core/api/ModeScheduleContextFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.schedule.core.api;
+
+import com.google.common.base.Strings;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.instance.Instance;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Mode schedule context factory.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
+public final class ModeScheduleContextFactory {
+
+ private static final ModeScheduleContextFactory INSTANCE = new
ModeScheduleContextFactory();
+
+ private final Map<String, ModeScheduleContext> modeScheduleContexts = new
ConcurrentHashMap<>();
+
+ /**
+ * Init mode schedule context.
+ *
+ * @param modeConfig mode configuration
+ */
+ public void init(final ModeConfiguration modeConfig) {
+ modeScheduleContexts.put(Instance.getInstance().getId(), new
ModeScheduleContext(modeConfig));
+ }
+
+ /**
+ * Get mode schedule context of current instance.
+ *
+ * @return mode schedule context
+ */
+ public Optional<ModeScheduleContext> get() {
+ if (Strings.isNullOrEmpty(Instance.getInstance().getId())) {
+ return Optional.empty();
+ }
+ return
Optional.ofNullable(modeScheduleContexts.get(Instance.getInstance().getId()));
+ }
+
+ /**
+ * Get instance.
+ *
+ * @return singleton instance
+ */
+ public static ModeScheduleContextFactory getInstance() {
+ return INSTANCE;
+ }
+}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 8bd3c68..01e42939 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -29,7 +29,6 @@ import
org.apache.shardingsphere.infra.metadata.schema.loader.SchemaLoader;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.builder.schema.SchemaRulesBuilder;
import
org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEvent;
-import org.apache.shardingsphere.infra.rule.identifier.type.SchedulerRule;
import
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
@@ -41,7 +40,7 @@ import
org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
-import org.apache.shardingsphere.schedule.core.api.ModeScheduleContext;
+import org.apache.shardingsphere.schedule.core.api.ModeScheduleContextFactory;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.spi.typed.TypedSPIRegistry;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
@@ -83,9 +82,9 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
final Properties props, final boolean
isOverwrite, final Integer port, final String schemaName) throws SQLException {
beforeBuildContextManager(modeConfig, dataSourcesMap,
schemaRuleConfigs, globalRuleConfigs, props, isOverwrite, port, schemaName);
contextManager = new ContextManager();
- contextManager.init(metaDataContexts, transactionContexts, new
ModeScheduleContext(modeConfig));
+ contextManager.init(metaDataContexts, transactionContexts, null);
+ ModeScheduleContextFactory.getInstance().init(modeConfig);
afterBuildContextManager();
- startMonitor();
return contextManager;
}
@@ -230,15 +229,6 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
schemas.forEach((key, value) ->
metaDataPersistService.getSchemaMetaDataService().persist(key, value));
}
- private void startMonitor() {
- metaDataContexts.getAllSchemaNames().forEach(each ->
metaDataContexts.getMetaData(each).getRuleMetaData().getRules().stream().filter(rule
-> rule instanceof SchedulerRule)
- .forEach(rule -> startSchedules(each,
metaDataContexts.getMetaData(each).getResource().getDataSources(),
(SchedulerRule) rule)));
- }
-
- private void startSchedules(final String schemaName, final Map<String,
DataSource> dataSources, final SchedulerRule schedulerRule) {
- schedulerRule.getCronJobs(schemaName, dataSources).forEach(each ->
contextManager.getModeScheduleContext().startCronJob(each));
- }
-
@Override
public String getType() {
return "Cluster";
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index 871d22b..e19472e 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator;
import lombok.Getter;
-import org.apache.shardingsphere.mode.instance.Instance;
+import org.apache.shardingsphere.infra.instance.Instance;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.subscriber.ScalingRegistrySubscriber;
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
index 7b1c269..05bc201 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryService.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
-import org.apache.shardingsphere.mode.instance.Instance;
+import org.apache.shardingsphere.infra.instance.Instance;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.node.ComputeStatusNode;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index eefb6f2..26c283c 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -18,7 +18,7 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.instance.Instance;
+import org.apache.shardingsphere.infra.instance.Instance;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.node.ComputeStatusNode;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatus
[...]
index 68b2aa9..006fad3 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.mode.instance.Instance;
+import org.apache.shardingsphere.infra.instance.Instance;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.node.ComputeStatusNode;
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChan
[...]
index db9969d..ed4c954 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
import org.apache.shardingsphere.infra.state.StateEvent;
import org.apache.shardingsphere.infra.state.StateType;
-import org.apache.shardingsphere.mode.instance.Instance;
+import org.apache.shardingsphere.infra.instance.Instance;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.node.ComputeStatusNode;
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryServiceTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryServiceTest.java
index ee3cbf8..c0f231f 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryServiceTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/lock/service/LockRegistryServiceTest.java
@@ -17,7 +17,7 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service;
-import org.apache.shardingsphere.mode.instance.Instance;
+import org.apache.shardingsphere.infra.instance.Instance;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.junit.Before;
import org.junit.Test;
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServi
[...]
index 9302999..3736c9f 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusServiceTest.java
@@ -17,7 +17,7 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service;
-import org.apache.shardingsphere.mode.instance.Instance;
+import org.apache.shardingsphere.infra.instance.Instance;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.junit.Before;
import org.junit.Test;
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeState
[...]
index ae1ff63..ee8a54d 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -18,7 +18,7 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher;
import org.apache.shardingsphere.infra.state.StateEvent;
-import org.apache.shardingsphere.mode.instance.Instance;
+import org.apache.shardingsphere.infra.instance.Instance;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import org.junit.After;
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/show/executor/ShowInstanceExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/show/executor/ShowInstanceExecutor.java
index 170ec19..abede32 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/show/executor/ShowInstanceExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/show/executor/ShowInstanceExecutor.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.show.exe
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.node.ComputeStatusNode;
-import org.apache.shardingsphere.mode.instance.utils.IpUtils;
+import org.apache.shardingsphere.infra.instance.utils.IpUtils;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.backend.response.header.query.impl.QueryHeader;