This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git


The following commit(s) were added to refs/heads/master by this push:
     new 78e05b7  Add cloud job disable listener (#1310)
78e05b7 is described below

commit 78e05b7ec7434881e9b71d96c5d7cd8643b71cac
Author: Haoran Meng <[email protected]>
AuthorDate: Fri Jul 31 17:36:24 2020 +0800

    Add cloud job disable listener (#1310)
    
    * Add cloud job disable listener
    
    * Revise java doc
---
 .../cloud/scheduler/mesos/SchedulerService.java    |   6 ++
 .../state/disable/job/CloudJobDisableListener.java |  85 +++++++++++++++
 .../scheduler/mesos/SchedulerServiceTest.java      |   6 +-
 .../disable/job/CloudJobDisableListenerTest.java   | 119 +++++++++++++++++++++
 4 files changed, 215 insertions(+), 1 deletion(-)

diff --git 
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerService.java
 
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerService.java
index b33c41a..2cf1092 100755
--- 
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerService.java
+++ 
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerService.java
@@ -31,6 +31,7 @@ import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.env.BootstrapEnviron
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.env.MesosConfiguration;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.ha.FrameworkIDService;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.job.CloudJobDisableListener;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.statistics.StatisticManager;
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
 import org.apache.shardingsphere.elasticjob.tracing.JobEventBus;
@@ -63,6 +64,8 @@ public final class SchedulerService {
     
     private final ReconcileService reconcileService;
     
+    private final CloudJobDisableListener cloudJobDisableListener;
+    
     public SchedulerService(final CoordinatorRegistryCenter regCenter) {
         env = BootstrapEnvironment.getINSTANCE();
         facadeService = new FacadeService(regCenter);
@@ -72,6 +75,7 @@ public final class SchedulerService {
         schedulerDriver = getSchedulerDriver(taskScheduler, jobEventBus, new 
FrameworkIDService(regCenter));
         producerManager = new ProducerManager(schedulerDriver, regCenter);
         cloudJobConfigurationListener = new 
CloudJobConfigurationListener(regCenter, producerManager);
+        cloudJobDisableListener = new CloudJobDisableListener(regCenter, 
producerManager);
         taskLaunchScheduledService = new 
TaskLaunchScheduledService(schedulerDriver, taskScheduler, facadeService, 
jobEventBus);
         reconcileService = new ReconcileService(schedulerDriver, 
facadeService);
         consoleBootstrap = new ConsoleBootstrap(regCenter, 
env.getRestfulServerConfiguration(), producerManager, reconcileService);
@@ -116,6 +120,7 @@ public final class SchedulerService {
         producerManager.startup();
         statisticManager.startup();
         cloudJobConfigurationListener.start();
+        cloudJobDisableListener.start();
         taskLaunchScheduledService.startAsync();
         consoleBootstrap.start();
         schedulerDriver.start();
@@ -131,6 +136,7 @@ public final class SchedulerService {
         consoleBootstrap.stop();
         taskLaunchScheduledService.stopAsync();
         cloudJobConfigurationListener.stop();
+        cloudJobDisableListener.stop();
         statisticManager.shutdown();
         producerManager.shutdown();
         schedulerDriver.stop(true);
diff --git 
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/job/CloudJobDisableListener.java
 
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/job/CloudJobDisableListener.java
new file mode 100644
index 0000000..c5800bc
--- /dev/null
+++ 
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/job/CloudJobDisableListener.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.job;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Cloud job disable listener.
+ *
+ * <p>Listens to the registry cache rooted at {@code DisableJobNode.ROOT}. When a child node is created,
+ * the job named by the path segment after the root is passed to {@link ProducerManager#unschedule(String)};
+ * when such a node is deleted, the job is passed to {@link ProducerManager#reschedule(String)}.</p>
+ */
+public final class CloudJobDisableListener implements CuratorCacheListener {
+    
+    private final CoordinatorRegistryCenter regCenter;
+    
+    private final ProducerManager producerManager;
+    
+    // Executor handed to the cache for listener callbacks; kept so that stop() can release its thread.
+    private ExecutorService listenerExecutor;
+    
+    public CloudJobDisableListener(final CoordinatorRegistryCenter regCenter, final ProducerManager producerManager) {
+        this.regCenter = regCenter;
+        this.producerManager = producerManager;
+    }
+    
+    /**
+     * Handle a cache event for a job disable node.
+     *
+     * @param type event type; only {@code NODE_CREATED} and {@code NODE_DELETED} are acted upon
+     * @param oldData previous node data, may be null
+     * @param data current node data, may be null
+     */
+    @Override
+    public void event(final Type type, final ChildData oldData, final ChildData data) {
+        if (Type.NODE_CREATED != type && Type.NODE_DELETED != type) {
+            return;
+        }
+        // Curator reports a deletion with the removed node in oldData and a null data,
+        // so fall back to oldData instead of dereferencing data unconditionally.
+        ChildData node = Objects.isNull(data) ? oldData : data;
+        if (Objects.isNull(node)) {
+            return;
+        }
+        String path = node.getPath();
+        if (!isJobDisableNode(path)) {
+            return;
+        }
+        String jobName = path.substring(DisableJobNode.ROOT.length() + 1);
+        if (Type.NODE_CREATED == type) {
+            producerManager.unschedule(jobName);
+        } else {
+            producerManager.reschedule(jobName);
+        }
+    }
+    
+    // A job disable node is the root followed by a path separator and a non-empty remainder.
+    private boolean isJobDisableNode(final String path) {
+        String prefix = DisableJobNode.ROOT + "/";
+        return path.startsWith(prefix) && path.length() > prefix.length();
+    }
+    
+    /**
+     * Start the listener service of the cloud job service.
+     *
+     * <p>Registers this listener on the disable-job cache with a dedicated single-thread executor.</p>
+     */
+    public synchronized void start() {
+        CuratorCache cache = getCache();
+        // Release the executor of a previous start() so that repeated starts do not leak threads.
+        shutdownListenerExecutor();
+        listenerExecutor = Executors.newSingleThreadExecutor();
+        cache.listenable().addListener(this, listenerExecutor);
+    }
+    
+    /**
+     * Stop the listener service of the cloud job service.
+     *
+     * <p>Removes this listener from the disable-job cache and shuts down the executor created by {@link #start()}, if any.</p>
+     */
+    public synchronized void stop() {
+        getCache().listenable().removeListener(this);
+        shutdownListenerExecutor();
+    }
+    
+    private void shutdownListenerExecutor() {
+        if (null != listenerExecutor) {
+            // shutdown() rather than shutdownNow(): callbacks already queued are allowed to finish.
+            listenerExecutor.shutdown();
+            listenerExecutor = null;
+        }
+    }
+    
+    // Returns the cache for the disable-job root, asking the registry center to create it on first use.
+    private CuratorCache getCache() {
+        CuratorCache result = (CuratorCache) regCenter.getRawCache(DisableJobNode.ROOT);
+        if (null != result) {
+            return result;
+        }
+        regCenter.addCacheData(DisableJobNode.ROOT);
+        return (CuratorCache) regCenter.getRawCache(DisableJobNode.ROOT);
+    }
+}
diff --git 
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerServiceTest.java
 
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerServiceTest.java
index edf2a82..345a578 100755
--- 
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerServiceTest.java
+++ 
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerServiceTest.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobC
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.env.BootstrapEnvironment;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.env.FrameworkConfiguration;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.job.CloudJobDisableListener;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.statistics.StatisticManager;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,13 +68,16 @@ public class SchedulerServiceTest {
     @Mock
     private ReconcileService reconcileService;
     
+    @Mock
+    private CloudJobDisableListener cloudJobDisableListener;
+    
     private SchedulerService schedulerService;
     
     @Before
     public void setUp() {
         schedulerService = new SchedulerService(env, facadeService, 
schedulerDriver,
                 producerManager, statisticManager, 
cloudJobConfigurationListener,
-                taskLaunchScheduledService, consoleBootstrap, 
reconcileService);
+                taskLaunchScheduledService, consoleBootstrap, 
reconcileService, cloudJobDisableListener);
     }
     
     @Test
diff --git 
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/job/CloudJobDisableListenerTest.java
 
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/job/CloudJobDisableListenerTest.java
new file mode 100644
index 0000000..5169a14
--- /dev/null
+++ 
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/job/CloudJobDisableListenerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.job;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.shardingsphere.elasticjob.cloud.ReflectionUtils;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationListenerTest;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture.EmbedTestingServer;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import 
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
+import 
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class CloudJobDisableListenerTest {
+    
+    private static ZookeeperRegistryCenter regCenter;
+    
+    @Mock
+    private ProducerManager producerManager;
+    
+    @InjectMocks
+    private CloudJobDisableListener cloudJobDisableListener;
+    
+    @Before
+    public void setUp() {
+        ReflectionUtils.setFieldValue(cloudJobDisableListener, 
"producerManager", producerManager);
+        initRegistryCenter();
+        ReflectionUtils.setFieldValue(cloudJobDisableListener, "regCenter", 
regCenter);
+    }
+    
+    private void initRegistryCenter() {
+        EmbedTestingServer.start();
+        ZookeeperConfiguration configuration = new 
ZookeeperConfiguration(EmbedTestingServer.getConnectionString(), 
CloudJobConfigurationListenerTest.class.getName());
+        configuration.setDigest("digest:password");
+        configuration.setSessionTimeoutMilliseconds(5000);
+        configuration.setConnectionTimeoutMilliseconds(5000);
+        regCenter = new ZookeeperRegistryCenter(configuration);
+        regCenter.init();
+    }
+    
+    @Test
+    public void assertDisableWithInvalidPath() {
+        cloudJobDisableListener.event(CuratorCacheListener.Type.NODE_CREATED, 
null, new ChildData("/other/test_job", null, "".getBytes()));
+        verify(producerManager, times(0)).unschedule(ArgumentMatchers.any());
+        verify(producerManager, times(0)).reschedule(ArgumentMatchers.any());
+    }
+    
+    @Test
+    public void assertDisableWithNoJobNamePath() {
+        cloudJobDisableListener.event(CuratorCacheListener.Type.NODE_CREATED, 
null, new ChildData("/state/disable/job", null, "".getBytes()));
+        verify(producerManager, times(0)).unschedule(ArgumentMatchers.any());
+        verify(producerManager, times(0)).reschedule(ArgumentMatchers.any());
+    }
+    
+    @Test
+    public void assertDisable() {
+        cloudJobDisableListener.event(CuratorCacheListener.Type.NODE_CREATED, 
null, new ChildData("/state/disable/job/job_test", null, "".getBytes()));
+        verify(producerManager).unschedule(eq("job_test"));
+    }
+    
+    @Test
+    public void assertEnableWithInvalidPath() {
+        cloudJobDisableListener.event(CuratorCacheListener.Type.NODE_DELETED, 
null, new ChildData("/other/test_job", null, "".getBytes()));
+        verify(producerManager, times(0)).unschedule(ArgumentMatchers.any());
+        verify(producerManager, times(0)).reschedule(ArgumentMatchers.any());
+    }
+    
+    @Test
+    public void assertEnableWithNoJobNamePath() {
+        cloudJobDisableListener.event(CuratorCacheListener.Type.NODE_DELETED, 
null, new ChildData("/state/disable/job", null, "".getBytes()));
+        verify(producerManager, times(0)).unschedule(ArgumentMatchers.any());
+        verify(producerManager, times(0)).reschedule(ArgumentMatchers.any());
+    }
+    
+    @Test
+    public void assertEnable() {
+        cloudJobDisableListener.event(CuratorCacheListener.Type.NODE_DELETED, 
null, new ChildData("/state/disable/job/job_test", null, "".getBytes()));
+        verify(producerManager).reschedule(eq("job_test"));
+    }
+    
+    @Test
+    public void assertStart() {
+        cloudJobDisableListener.start();
+    }
+    
+    @Test
+    public void assertStop() {
+        regCenter.addCacheData("/state/disable/job");
+        ReflectionUtils.setFieldValue(cloudJobDisableListener, "regCenter", 
regCenter);
+        cloudJobDisableListener.stop();
+    }
+}

Reply via email to