This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new 78e05b7 Add cloud job disable listener (#1310)
78e05b7 is described below
commit 78e05b7ec7434881e9b71d96c5d7cd8643b71cac
Author: Haoran Meng <[email protected]>
AuthorDate: Fri Jul 31 17:36:24 2020 +0800
Add cloud job disable listener (#1310)
* Add cloud job disable listener
* Revise java doc
---
.../cloud/scheduler/mesos/SchedulerService.java | 6 ++
.../state/disable/job/CloudJobDisableListener.java | 85 +++++++++++++++
.../scheduler/mesos/SchedulerServiceTest.java | 6 +-
.../disable/job/CloudJobDisableListenerTest.java | 119 +++++++++++++++++++++
4 files changed, 215 insertions(+), 1 deletion(-)
diff --git
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerService.java
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerService.java
index b33c41a..2cf1092 100755
---
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerService.java
+++
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerService.java
@@ -31,6 +31,7 @@ import
org.apache.shardingsphere.elasticjob.cloud.scheduler.env.BootstrapEnviron
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.env.MesosConfiguration;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.ha.FrameworkIDService;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.job.CloudJobDisableListener;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.statistics.StatisticManager;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.tracing.JobEventBus;
@@ -63,6 +64,8 @@ public final class SchedulerService {
private final ReconcileService reconcileService;
+ private final CloudJobDisableListener cloudJobDisableListener;
+
public SchedulerService(final CoordinatorRegistryCenter regCenter) {
env = BootstrapEnvironment.getINSTANCE();
facadeService = new FacadeService(regCenter);
@@ -72,6 +75,7 @@ public final class SchedulerService {
schedulerDriver = getSchedulerDriver(taskScheduler, jobEventBus, new
FrameworkIDService(regCenter));
producerManager = new ProducerManager(schedulerDriver, regCenter);
cloudJobConfigurationListener = new
CloudJobConfigurationListener(regCenter, producerManager);
+ cloudJobDisableListener = new CloudJobDisableListener(regCenter,
producerManager);
taskLaunchScheduledService = new
TaskLaunchScheduledService(schedulerDriver, taskScheduler, facadeService,
jobEventBus);
reconcileService = new ReconcileService(schedulerDriver,
facadeService);
consoleBootstrap = new ConsoleBootstrap(regCenter,
env.getRestfulServerConfiguration(), producerManager, reconcileService);
@@ -116,6 +120,7 @@ public final class SchedulerService {
producerManager.startup();
statisticManager.startup();
cloudJobConfigurationListener.start();
+ cloudJobDisableListener.start();
taskLaunchScheduledService.startAsync();
consoleBootstrap.start();
schedulerDriver.start();
@@ -131,6 +136,7 @@ public final class SchedulerService {
consoleBootstrap.stop();
taskLaunchScheduledService.stopAsync();
cloudJobConfigurationListener.stop();
+ cloudJobDisableListener.stop();
statisticManager.shutdown();
producerManager.shutdown();
schedulerDriver.stop(true);
diff --git
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/job/CloudJobDisableListener.java
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/job/CloudJobDisableListener.java
new file mode 100644
index 0000000..c5800bc
--- /dev/null
+++
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/job/CloudJobDisableListener.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.job;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+
+import java.util.Objects;
+import java.util.concurrent.Executors;
+
+/**
+ * Cloud job disable listener.
+ *
+ * <p>Listens to cache events beneath {@code DisableJobNode.ROOT}: a created child node unschedules
+ * the job named by that child, a deleted child node reschedules it.</p>
+ */
+public final class CloudJobDisableListener implements CuratorCacheListener {
+    
+    private final CoordinatorRegistryCenter regCenter;
+    
+    private final ProducerManager producerManager;
+    
+    public CloudJobDisableListener(final CoordinatorRegistryCenter regCenter, final ProducerManager producerManager) {
+        this.regCenter = regCenter;
+        this.producerManager = producerManager;
+    }
+    
+    /**
+     * Handle a cache event for the disable-job subtree.
+     *
+     * @param type event type; only {@code NODE_CREATED} and {@code NODE_DELETED} trigger an action
+     * @param oldData previous node data, may be {@code null}
+     * @param data current node data, may be {@code null}
+     */
+    @Override
+    public void event(final Type type, final ChildData oldData, final ChildData data) {
+        // For NODE_DELETED Curator hands over the removed node as oldData and passes a null data,
+        // so dereferencing data unconditionally would throw and the job would never be rescheduled.
+        ChildData node = Objects.nonNull(data) ? data : oldData;
+        if (Objects.isNull(node)) {
+            return;
+        }
+        String path = node.getPath();
+        if (!isJobDisableNode(path)) {
+            return;
+        }
+        String jobName = path.substring(DisableJobNode.ROOT.length() + 1);
+        if (Type.NODE_CREATED == type) {
+            producerManager.unschedule(jobName);
+        } else if (Type.NODE_DELETED == type) {
+            producerManager.reschedule(jobName);
+        }
+    }
+    
+    /**
+     * Judge whether the path is a child of the disable-job root that carries a non-empty job name.
+     *
+     * @param path node path
+     * @return true only for paths of the form {@code ROOT + "/" + name} with a non-empty name
+     */
+    private boolean isJobDisableNode(final String path) {
+        if (Objects.isNull(path)) {
+            return false;
+        }
+        // Require the separator so that a sibling such as ROOT + "x" is not mistaken for a child with an empty job name.
+        String childPrefix = DisableJobNode.ROOT + "/";
+        return path.startsWith(childPrefix) && path.length() > childPrefix.length();
+    }
+    
+    /**
+     * Start the listener service of the cloud job service.
+     */
+    public void start() {
+        // NOTE(review): a new single-thread executor is created on every call and is never shut down by stop();
+        // consider holding it in a field and shutting it down when the listener is removed.
+        getCache().listenable().addListener(this, Executors.newSingleThreadExecutor());
+    }
+    
+    /**
+     * Stop the listener service of the cloud job service.
+     */
+    public void stop() {
+        getCache().listenable().removeListener(this);
+    }
+    
+    /**
+     * Get the cache registered for the disable-job root, registering it first when it is absent.
+     *
+     * @return cache for {@code DisableJobNode.ROOT}
+     */
+    private CuratorCache getCache() {
+        CuratorCache result = (CuratorCache) regCenter.getRawCache(DisableJobNode.ROOT);
+        if (null != result) {
+            return result;
+        }
+        regCenter.addCacheData(DisableJobNode.ROOT);
+        return (CuratorCache) regCenter.getRawCache(DisableJobNode.ROOT);
+    }
+}
diff --git
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerServiceTest.java
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerServiceTest.java
index edf2a82..345a578 100755
---
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerServiceTest.java
+++
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerServiceTest.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobC
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.env.BootstrapEnvironment;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.env.FrameworkConfiguration;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.job.CloudJobDisableListener;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.statistics.StatisticManager;
import org.junit.Before;
import org.junit.Test;
@@ -67,13 +68,16 @@ public class SchedulerServiceTest {
@Mock
private ReconcileService reconcileService;
+ @Mock
+ private CloudJobDisableListener cloudJobDisableListener;
+
private SchedulerService schedulerService;
@Before
public void setUp() {
schedulerService = new SchedulerService(env, facadeService,
schedulerDriver,
producerManager, statisticManager,
cloudJobConfigurationListener,
- taskLaunchScheduledService, consoleBootstrap,
reconcileService);
+ taskLaunchScheduledService, consoleBootstrap,
reconcileService, cloudJobDisableListener);
}
@Test
diff --git
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/job/CloudJobDisableListenerTest.java
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/job/CloudJobDisableListenerTest.java
new file mode 100644
index 0000000..5169a14
--- /dev/null
+++
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/job/CloudJobDisableListenerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.job;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.shardingsphere.elasticjob.cloud.ReflectionUtils;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationListenerTest;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture.EmbedTestingServer;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
+import
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link CloudJobDisableListener}: events are fed directly into the listener and the
+ * resulting calls on the mocked {@link ProducerManager} are verified.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public final class CloudJobDisableListenerTest {
+    
+    private static ZookeeperRegistryCenter regCenter;
+    
+    @Mock
+    private ProducerManager producerManager;
+    
+    @InjectMocks
+    private CloudJobDisableListener cloudJobDisableListener;
+    
+    @Before
+    public void setUp() {
+        ReflectionUtils.setFieldValue(cloudJobDisableListener, "producerManager", producerManager);
+        initRegistryCenter();
+        ReflectionUtils.setFieldValue(cloudJobDisableListener, "regCenter", regCenter);
+    }
+    
+    // Starts the embedded ZooKeeper server and connects a fresh registry center to it.
+    // NOTE(review): the namespace is taken from CloudJobConfigurationListenerTest, which looks like a copy-paste leftover - confirm.
+    private void initRegistryCenter() {
+        EmbedTestingServer.start();
+        String namespace = CloudJobConfigurationListenerTest.class.getName();
+        ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(EmbedTestingServer.getConnectionString(), namespace);
+        zkConfig.setDigest("digest:password");
+        zkConfig.setSessionTimeoutMilliseconds(5000);
+        zkConfig.setConnectionTimeoutMilliseconds(5000);
+        regCenter = new ZookeeperRegistryCenter(zkConfig);
+        regCenter.init();
+    }
+    
+    // Delivers an event of the given type whose current data is an empty node at the given path.
+    private void fire(final CuratorCacheListener.Type type, final String path) {
+        ChildData current = new ChildData(path, null, "".getBytes());
+        cloudJobDisableListener.event(type, null, current);
+    }
+    
+    // Verifies that the listener neither unscheduled nor rescheduled any job.
+    private void verifyNothingScheduled() {
+        verify(producerManager, times(0)).unschedule(ArgumentMatchers.any());
+        verify(producerManager, times(0)).reschedule(ArgumentMatchers.any());
+    }
+    
+    @Test
+    public void assertDisableWithInvalidPath() {
+        fire(CuratorCacheListener.Type.NODE_CREATED, "/other/test_job");
+        verifyNothingScheduled();
+    }
+    
+    @Test
+    public void assertDisableWithNoJobNamePath() {
+        fire(CuratorCacheListener.Type.NODE_CREATED, "/state/disable/job");
+        verifyNothingScheduled();
+    }
+    
+    @Test
+    public void assertDisable() {
+        fire(CuratorCacheListener.Type.NODE_CREATED, "/state/disable/job/job_test");
+        verify(producerManager).unschedule(eq("job_test"));
+    }
+    
+    @Test
+    public void assertEnableWithInvalidPath() {
+        fire(CuratorCacheListener.Type.NODE_DELETED, "/other/test_job");
+        verifyNothingScheduled();
+    }
+    
+    @Test
+    public void assertEnableWithNoJobNamePath() {
+        fire(CuratorCacheListener.Type.NODE_DELETED, "/state/disable/job");
+        verifyNothingScheduled();
+    }
+    
+    @Test
+    public void assertEnable() {
+        fire(CuratorCacheListener.Type.NODE_DELETED, "/state/disable/job/job_test");
+        verify(producerManager).reschedule(eq("job_test"));
+    }
+    
+    @Test
+    public void assertStart() {
+        cloudJobDisableListener.start();
+    }
+    
+    @Test
+    public void assertStop() {
+        regCenter.addCacheData("/state/disable/job");
+        ReflectionUtils.setFieldValue(cloudJobDisableListener, "regCenter", regCenter);
+        cloudJobDisableListener.stop();
+    }
+}