This is an automated email from the ASF dual-hosted git repository. zhangliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob-lite.git
The following commit(s) were added to refs/heads/master by this push: new faa7791 Add SetUpFacade (#874) faa7791 is described below commit faa779183e2aba70c362748f15369bcaab52af76 Author: Liang Zhang <terrym...@163.com> AuthorDate: Wed Jul 1 00:31:57 2020 +0800 Add SetUpFacade (#874) * For checkstyle * Refactor JobScheduler * Refactor ElasticJobExecutor * refactor ConfigurationService.setUpJobConfiguration * Add SetUpFacade * refactor JobScheduler --- .../src/main/resources/bin/start.sh | 2 +- .../elasticjob/lite/api/JobScheduler.java | 27 ++++++---- .../lite/executor/ElasticJobExecutor.java | 2 +- .../lite/internal/config/ConfigurationService.java | 9 ++-- .../internal/schedule/JobShutdownHookPlugin.java | 6 +-- .../lite/internal/schedule/SchedulerFacade.java | 61 ---------------------- .../SetUpFacade.java} | 61 ++++------------------ .../elasticjob/lite/api/JobSchedulerTest.java | 8 ++- .../internal/config/ConfigurationServiceTest.java | 23 +++++--- .../lite/internal/listener/JobListenerTest.java | 4 +- .../internal/schedule/SchedulerFacadeTest.java | 49 +---------------- .../SetUpFacadeTest.java} | 61 +++++++--------------- .../statistics/ServerStatisticsAPIImpl.java | 4 +- 13 files changed, 84 insertions(+), 233 deletions(-) diff --git a/elastic-job-lite-console/src/main/resources/bin/start.sh b/elastic-job-lite-console/src/main/resources/bin/start.sh index 49ee8bf..d36f879 100644 --- a/elastic-job-lite-console/src/main/resources/bin/start.sh +++ b/elastic-job-lite-console/src/main/resources/bin/start.sh @@ -31,4 +31,4 @@ DEPLOY_DIR=`pwd` CLASS_PATH=.:${DEPLOY_DIR}/conf:${DEPLOY_DIR}/lib/*:${DEPLOY_DIR}/ext-lib/* CONSOLE_MAIN=org.apache.shardingsphere.elasticjob.lite.console.ConsoleBootstrap -java -classpath ${CLASS_PATH}:. ${CONSOLE_MAIN} $port +java -classpath ${CLASS_PATH}:. ${CONSOLE_MAIN} ${port} diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/JobScheduler.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/JobScheduler.java index 3da4391..519cd63 100644 --- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/JobScheduler.java +++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/api/JobScheduler.java @@ -29,6 +29,7 @@ import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleCo import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobShutdownHookPlugin; import org.apache.shardingsphere.elasticjob.lite.internal.schedule.LiteJob; import org.apache.shardingsphere.elasticjob.lite.internal.schedule.SchedulerFacade; +import org.apache.shardingsphere.elasticjob.lite.internal.setup.SetUpFacade; import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.lite.tracing.api.TracingConfiguration; import org.quartz.JobBuilder; @@ -67,6 +68,8 @@ public final class JobScheduler { private final TracingConfiguration tracingConfig; + private final SetUpFacade setUpFacade; + private final SchedulerFacade schedulerFacade; public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) { @@ -77,12 +80,13 @@ public final class JobScheduler { final ElasticJobListener... elasticJobListeners) { this.regCenter = regCenter; this.elasticJob = elasticJob; - this.jobConfig = jobConfig; - JobRegistry.getInstance().addJobInstance(jobConfig.getJobName(), new JobInstance()); this.elasticJobListeners = Arrays.asList(elasticJobListeners); this.tracingConfig = tracingConfig; + JobRegistry.getInstance().addJobInstance(jobConfig.getJobName(), new JobInstance()); + setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners); + schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName()); + this.jobConfig = setUpFacade.setUpJobConfiguration(null == elasticJob ? ScriptJob.class.getName() : elasticJob.getClass().getName(), jobConfig); setGuaranteeServiceForElasticJobListeners(regCenter, this.elasticJobListeners); - schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName(), this.elasticJobListeners); } private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final List<ElasticJobListener> elasticJobListeners) { @@ -98,12 +102,15 @@ public final class JobScheduler { * Initialize job. */ public void init() { - JobConfiguration jobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(null == elasticJob ? ScriptJob.class.getName() : elasticJob.getClass().getName(), jobConfig); - JobRegistry.getInstance().setCurrentShardingTotalCount(jobConfigFromRegCenter.getJobName(), jobConfigFromRegCenter.getShardingTotalCount()); - JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(jobConfigFromRegCenter), jobConfigFromRegCenter.getJobName()); - JobRegistry.getInstance().registerJob(jobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter); - schedulerFacade.registerStartUpInfo(!jobConfigFromRegCenter.isDisabled()); - jobScheduleController.scheduleJob(jobConfigFromRegCenter.getCron()); + JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(), jobConfig.getJobName()); + JobRegistry.getInstance().registerJob(jobConfig.getJobName(), jobScheduleController, regCenter); + registerStartUpInfo(); + jobScheduleController.scheduleJob(jobConfig.getCron()); + } + + private void registerStartUpInfo() { + JobRegistry.getInstance().setCurrentShardingTotalCount(jobConfig.getJobName(), jobConfig.getShardingTotalCount()); + setUpFacade.registerStartUpInfo(!jobConfig.isDisabled()); } private Scheduler createScheduler() { @@ -130,7 +137,7 @@ public final class JobScheduler { return result; } - private JobDetail createJobDetail(final JobConfiguration jobConfig) { + private JobDetail createJobDetail() { JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(jobConfig.getJobName()).build(); result.getJobDataMap().put(REG_CENTER_DATA_MAP_KEY, regCenter); result.getJobDataMap().put(JOB_CONFIG_DATA_MAP_KEY, jobConfig); diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/ElasticJobExecutor.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/ElasticJobExecutor.java index f872fb2..41ec11e 100644 --- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/ElasticJobExecutor.java +++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/executor/ElasticJobExecutor.java @@ -170,7 +170,7 @@ public final class ElasticJobExecutor { process(shardingContexts, item, jobExecutionEvent); return; } - final CountDownLatch latch = new CountDownLatch(items.size()); + CountDownLatch latch = new CountDownLatch(items.size()); for (int each : items) { JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each); if (executorService.isShutdown()) { diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/config/ConfigurationService.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/config/ConfigurationService.java index 6afbc29..a616819 100644 --- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/config/ConfigurationService.java +++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/config/ConfigurationService.java @@ -61,17 +61,20 @@ public final class ConfigurationService { } /** - * Persist job configuration. + * Set up job configuration. * * @param jobClassName job class name - * @param jobConfig job configuration + * @param jobConfig job configuration to be updated + * @return accepted job configuration */ - public void persist(final String jobClassName, final JobConfiguration jobConfig) { + public JobConfiguration setUpJobConfiguration(final String jobClassName, final JobConfiguration jobConfig) { checkConflictJob(jobClassName, jobConfig); if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || jobConfig.isOverwrite()) { jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, YamlEngine.marshal(YamlJobConfiguration.fromJobConfiguration(jobConfig))); jobNodeStorage.replaceJobRootNode(jobClassName); + return jobConfig; } + return load(false); } private void checkConflictJob(final String newJobClassName, final JobConfiguration jobConfig) { diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobShutdownHookPlugin.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobShutdownHookPlugin.java index 3c59674..15adf56 100644 --- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobShutdownHookPlugin.java +++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobShutdownHookPlugin.java @@ -39,13 +39,13 @@ public final class JobShutdownHookPlugin implements SchedulerPlugin { @Setter private boolean cleanShutdown = true; - + @Override public void initialize(final String name, final Scheduler scheduler, final ClassLoadHelper classLoadHelper) throws SchedulerException { jobName = scheduler.getSchedulerName(); registerShutdownHook(); } - + /** * <p> * Called when the associated <code>Scheduler</code> is started, in order @@ -70,7 +70,7 @@ public final class JobShutdownHookPlugin implements SchedulerPlugin { } new InstanceService(regCenter, jobName).removeInstance(); } - + private void registerShutdownHook() { log.info("Registering Quartz shutdown hook. {}", jobName); Thread t = new Thread("Quartz Shutdown-Hook " + jobName) { diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/SchedulerFacade.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/SchedulerFacade.java index 2c31a6f..c297e18 100644 --- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/SchedulerFacade.java +++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/SchedulerFacade.java @@ -17,21 +17,13 @@ package org.apache.shardingsphere.elasticjob.lite.internal.schedule; -import org.apache.shardingsphere.elasticjob.lite.api.listener.ElasticJobListener; -import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration; -import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService; import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService; -import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceService; -import org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerManager; import org.apache.shardingsphere.elasticjob.lite.internal.monitor.MonitorService; import org.apache.shardingsphere.elasticjob.lite.internal.reconcile.ReconcileService; -import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerService; import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionService; import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService; import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter; -import java.util.List; - /** * Scheduler facade. */ @@ -39,14 +31,8 @@ public final class SchedulerFacade { private final String jobName; - private final ConfigurationService configService; - private final LeaderService leaderService; - private final ServerService serverService; - - private final InstanceService instanceService; - private final ShardingService shardingService; private final ExecutionService executionService; @@ -55,33 +41,15 @@ public final class SchedulerFacade { private final ReconcileService reconcileService; - private ListenerManager listenerManager; - public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String jobName) { this.jobName = jobName; - configService = new ConfigurationService(regCenter, jobName); leaderService = new LeaderService(regCenter, jobName); - serverService = new ServerService(regCenter, jobName); - instanceService = new InstanceService(regCenter, jobName); shardingService = new ShardingService(regCenter, jobName); executionService = new ExecutionService(regCenter, jobName); monitorService = new MonitorService(regCenter, jobName); reconcileService = new ReconcileService(regCenter, jobName); } - public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners) { - this.jobName = jobName; - configService = new ConfigurationService(regCenter, jobName); - leaderService = new LeaderService(regCenter, jobName); - serverService = new ServerService(regCenter, jobName); - instanceService = new InstanceService(regCenter, jobName); - shardingService = new ShardingService(regCenter, jobName); - executionService = new ExecutionService(regCenter, jobName); - monitorService = new MonitorService(regCenter, jobName); - reconcileService = new ReconcileService(regCenter, jobName); - listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners); - } - /** * Create job trigger listener. * @@ -92,35 +60,6 @@ public final class SchedulerFacade { } /** - * Update job configuration. - * - * @param jobClassName job class name - * @param jobConfig job configuration to be updated - * @return updated job configuration - */ - public JobConfiguration updateJobConfiguration(final String jobClassName, final JobConfiguration jobConfig) { - configService.persist(jobClassName, jobConfig); - return configService.load(false); - } - - /** - * Register start up info. - * - * @param enabled enable job on startup - */ - public void registerStartUpInfo(final boolean enabled) { - listenerManager.startAllListeners(); - leaderService.electLeader(); - serverService.persistOnline(enabled); - instanceService.persistOnline(); - shardingService.setReshardingFlag(); - monitorService.listen(); - if (!reconcileService.isRunning()) { - reconcileService.startAsync(); - } - } - - /** * Shutdown instance. */ public void shutdownInstance() { diff --git a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/SchedulerFacade.java b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java similarity index 61% copy from elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/SchedulerFacade.java copy to elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java index 2c31a6f..401a515 100644 --- a/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/SchedulerFacade.java +++ b/elastic-job-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java @@ -7,7 +7,7 @@ * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.elasticjob.lite.internal.schedule; +package org.apache.shardingsphere.elasticjob.lite.internal.setup; import org.apache.shardingsphere.elasticjob.lite.api.listener.ElasticJobListener; import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration; @@ -26,18 +26,15 @@ import org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerManag import org.apache.shardingsphere.elasticjob.lite.internal.monitor.MonitorService; import org.apache.shardingsphere.elasticjob.lite.internal.reconcile.ReconcileService; import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerService; -import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ExecutionService; import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService; import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter; import java.util.List; /** - * Scheduler facade. + * Set up facade. */ -public final class SchedulerFacade { - - private final String jobName; +public final class SetUpFacade { private final ConfigurationService configService; @@ -49,58 +46,32 @@ public final class SchedulerFacade { private final ShardingService shardingService; - private final ExecutionService executionService; - private final MonitorService monitorService; private final ReconcileService reconcileService; private ListenerManager listenerManager; - public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String jobName) { - this.jobName = jobName; - configService = new ConfigurationService(regCenter, jobName); - leaderService = new LeaderService(regCenter, jobName); - serverService = new ServerService(regCenter, jobName); - instanceService = new InstanceService(regCenter, jobName); - shardingService = new ShardingService(regCenter, jobName); - executionService = new ExecutionService(regCenter, jobName); - monitorService = new MonitorService(regCenter, jobName); - reconcileService = new ReconcileService(regCenter, jobName); - } - - public SchedulerFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners) { - this.jobName = jobName; + public SetUpFacade(final CoordinatorRegistryCenter regCenter, final String jobName, final List<ElasticJobListener> elasticJobListeners) { configService = new ConfigurationService(regCenter, jobName); leaderService = new LeaderService(regCenter, jobName); serverService = new ServerService(regCenter, jobName); instanceService = new InstanceService(regCenter, jobName); shardingService = new ShardingService(regCenter, jobName); - executionService = new ExecutionService(regCenter, jobName); monitorService = new MonitorService(regCenter, jobName); reconcileService = new ReconcileService(regCenter, jobName); listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners); } /** - * Create job trigger listener. - * - * @return job trigger listener - */ - public JobTriggerListener newJobTriggerListener() { - return new JobTriggerListener(executionService, shardingService); - } - - /** - * Update job configuration. + * Set up job configuration. * * @param jobClassName job class name * @param jobConfig job configuration to be updated - * @return updated job configuration + * @return accepted job configuration */ - public JobConfiguration updateJobConfiguration(final String jobClassName, final JobConfiguration jobConfig) { - configService.persist(jobClassName, jobConfig); - return configService.load(false); + public JobConfiguration setUpJobConfiguration(final String jobClassName, final JobConfiguration jobConfig) { + return configService.setUpJobConfiguration(jobClassName, jobConfig); } /** @@ -119,18 +90,4 @@ public final class SchedulerFacade { reconcileService.startAsync(); } } - - /** - * Shutdown instance. - */ - public void shutdownInstance() { - if (leaderService.isLeader()) { - leaderService.removeLeader(); - } - monitorService.close(); - if (reconcileService.isRunning()) { - reconcileService.stopAsync(); - } - JobRegistry.getInstance().shutdown(jobName); - } } diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/JobSchedulerTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/JobSchedulerTest.java index 2f3790f..48bfb7e 100644 --- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/JobSchedulerTest.java +++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/api/JobSchedulerTest.java @@ -23,6 +23,7 @@ import org.apache.shardingsphere.elasticjob.lite.handler.sharding.JobInstance; import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry; import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobTriggerListener; import org.apache.shardingsphere.elasticjob.lite.internal.schedule.SchedulerFacade; +import org.apache.shardingsphere.elasticjob.lite.internal.setup.SetUpFacade; import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils; import org.junit.After; @@ -47,6 +48,9 @@ public final class JobSchedulerTest { private CoordinatorRegistryCenter regCenter; @Mock + private SetUpFacade setUpFacade; + + @Mock private SchedulerFacade schedulerFacade; private JobConfiguration jobConfig; @@ -59,15 +63,15 @@ public final class JobSchedulerTest { jobConfig = JobConfiguration.newBuilder("test_job", JobType.SIMPLE, "* * 0/10 * * ? 2050", 3).build(); jobScheduler = new JobScheduler(regCenter, new TestSimpleJob(), jobConfig); ReflectionUtils.setFieldValue(jobScheduler, "regCenter", regCenter); + ReflectionUtils.setFieldValue(jobScheduler, "setUpFacade", setUpFacade); ReflectionUtils.setFieldValue(jobScheduler, "schedulerFacade", schedulerFacade); } @Test public void assertInit() throws SchedulerException { - when(schedulerFacade.updateJobConfiguration(TestSimpleJob.class.getName(), jobConfig)).thenReturn(jobConfig); when(schedulerFacade.newJobTriggerListener()).thenReturn(new JobTriggerListener(null, null)); jobScheduler.init(); - verify(schedulerFacade).registerStartUpInfo(true); + verify(setUpFacade).registerStartUpInfo(true); Scheduler scheduler = (Scheduler) ReflectionUtils.getFieldValue(JobRegistry.getInstance().getJobScheduleController("test_job"), "scheduler"); assertThat(scheduler.getListenerManager().getTriggerListeners().get(0), instanceOf(JobTriggerListener.class)); assertTrue(scheduler.isStarted()); diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/config/ConfigurationServiceTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/config/ConfigurationServiceTest.java index 443de8c..ad5743a 100644 --- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/config/ConfigurationServiceTest.java +++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/config/ConfigurationServiceTest.java @@ -34,6 +34,7 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -80,11 +81,11 @@ public final class ConfigurationServiceTest { } @Test(expected = JobConfigurationException.class) - public void assertPersistJobConfigurationForJobConflict() { + public void assertSetUpJobConfigurationJobConfigurationForJobConflict() { when(jobNodeStorage.isJobRootNodeExisted()).thenReturn(true); when(jobNodeStorage.getJobRootNodeData()).thenReturn("org.apache.shardingsphere.elasticjob.lite.api.script.api.ScriptJob"); try { - configService.persist(null, JobConfigurationUtil.createSimpleJobConfiguration()); + configService.setUpJobConfiguration(null, JobConfigurationUtil.createSimpleJobConfiguration()); } finally { verify(jobNodeStorage).isJobRootNodeExisted(); verify(jobNodeStorage).getJobRootNodeData(); @@ -92,21 +93,31 @@ public final class ConfigurationServiceTest { } @Test - public void assertPersistNewJobConfiguration() { + public void assertSetUpJobConfigurationNewJobConfiguration() { JobConfiguration jobConfig = JobConfigurationUtil.createSimpleJobConfiguration(); - configService.persist(TestSimpleJob.class.getName(), jobConfig); + assertThat(configService.setUpJobConfiguration(TestSimpleJob.class.getName(), jobConfig), is(jobConfig)); verify(jobNodeStorage).replaceJobNode("config", YamlEngine.marshal(YamlJobConfiguration.fromJobConfiguration(jobConfig))); } @Test - public void assertPersistExistedJobConfiguration() { + public void assertSetUpJobConfigurationExistedJobConfigurationAndOverwrite() { when(jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT)).thenReturn(true); JobConfiguration jobConfig = JobConfigurationUtil.createSimpleJobConfiguration(true); - configService.persist(TestSimpleJob.class.getName(), jobConfig); + assertThat(configService.setUpJobConfiguration(TestSimpleJob.class.getName(), jobConfig), is(jobConfig)); verify(jobNodeStorage).replaceJobNode("config", YamlEngine.marshal(YamlJobConfiguration.fromJobConfiguration(jobConfig))); } @Test + public void assertSetUpJobConfigurationExistedJobConfigurationAndNotOverwrite() { + when(jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT)).thenReturn(true); + when(jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT)).thenReturn( + YamlEngine.marshal(YamlJobConfiguration.fromJobConfiguration(JobConfigurationUtil.createSimpleJobConfiguration()))); + JobConfiguration jobConfig = JobConfigurationUtil.createSimpleJobConfiguration(false); + JobConfiguration actual = configService.setUpJobConfiguration(TestSimpleJob.class.getName(), jobConfig); + assertThat(actual, not(jobConfig)); + } + + @Test public void assertIsMaxTimeDiffSecondsTolerableWithDefaultValue() throws JobExecutionEnvironmentException { when(jobNodeStorage.getJobNodeData(ConfigurationNode.ROOT)).thenReturn(LiteYamlConstants.getJobYaml(-1)); configService.checkMaxTimeDiffSecondsTolerable(); diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/JobListenerTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/JobListenerTest.java index 23d8d16..3987b42 100644 --- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/JobListenerTest.java +++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/listener/JobListenerTest.java @@ -49,14 +49,14 @@ public final class JobListenerTest { } @Test - public void assertChildEventWhenEventDataIsEmpty() throws Exception { + public void assertChildEventWhenEventDataIsEmpty() { when(event.getData()).thenReturn(null); fooJobListener.childEvent(null, event); verify(list, times(0)).clear(); } @Test - public void assertChildEventSuccess() throws Exception { + public void assertChildEventSuccess() { when(event.getData()).thenReturn(new ChildData("/test_job", null, null)); fooJobListener.childEvent(null, event); verify(list).clear(); diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/SchedulerFacadeTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/SchedulerFacadeTest.java index d182768..efb6adf 100644 --- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/SchedulerFacadeTest.java +++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/SchedulerFacadeTest.java @@ -17,18 +17,10 @@ package org.apache.shardingsphere.elasticjob.lite.internal.schedule; -import org.apache.shardingsphere.elasticjob.lite.api.JobType; -import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration; -import org.apache.shardingsphere.elasticjob.lite.executor.type.impl.DataflowJobExecutor; -import org.apache.shardingsphere.elasticjob.lite.fixture.TestDataflowJob; import org.apache.shardingsphere.elasticjob.lite.handler.sharding.JobInstance; -import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService; import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService; -import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceService; -import org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerManager; import org.apache.shardingsphere.elasticjob.lite.internal.monitor.MonitorService; import org.apache.shardingsphere.elasticjob.lite.internal.reconcile.ReconcileService; -import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerService; import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService; import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.lite.util.ReflectionUtils; @@ -38,10 +30,6 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import java.util.Collections; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -56,18 +44,9 @@ public final class SchedulerFacadeTest { private JobScheduleController jobScheduleController; @Mock - private ConfigurationService configService; - - @Mock private LeaderService leaderService; @Mock - private ServerService serverService; - - @Mock - private InstanceService instanceService; - - @Mock private ShardingService shardingService; @Mock @@ -76,42 +55,16 @@ public final class SchedulerFacadeTest { @Mock private ReconcileService reconcileService; - @Mock - private ListenerManager listenerManager; - private SchedulerFacade schedulerFacade; @Before public void setUp() { JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0")); - schedulerFacade = new SchedulerFacade(null, "test_job", Collections.emptyList()); - ReflectionUtils.setFieldValue(schedulerFacade, "configService", configService); + schedulerFacade = new SchedulerFacade(null, "test_job"); ReflectionUtils.setFieldValue(schedulerFacade, "leaderService", leaderService); - ReflectionUtils.setFieldValue(schedulerFacade, "serverService", serverService); - ReflectionUtils.setFieldValue(schedulerFacade, "instanceService", instanceService); ReflectionUtils.setFieldValue(schedulerFacade, "shardingService", shardingService); ReflectionUtils.setFieldValue(schedulerFacade, "monitorService", monitorService); ReflectionUtils.setFieldValue(schedulerFacade, "reconcileService", reconcileService); - ReflectionUtils.setFieldValue(schedulerFacade, "listenerManager", listenerManager); - } - - @Test - public void assertUpdateJobConfiguration() { - JobConfiguration jobConfig = JobConfiguration.newBuilder( - "test_job", JobType.DATAFLOW, "0/1 * * * * ?", 3).setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).build(); - when(configService.load(false)).thenReturn(jobConfig); - assertThat(schedulerFacade.updateJobConfiguration(TestDataflowJob.class.getName(), jobConfig), is(jobConfig)); - verify(configService).persist(TestDataflowJob.class.getName(), jobConfig); - } - - @Test - public void assertRegisterStartUpInfo() { - schedulerFacade.registerStartUpInfo(true); - verify(listenerManager).startAllListeners(); - verify(leaderService).electLeader(); - verify(serverService).persistOnline(true); - verify(shardingService).setReshardingFlag(); - verify(monitorService).listen(); } @Test diff --git a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/SchedulerFacadeTest.java b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java similarity index 61% copy from elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/SchedulerFacadeTest.java copy to elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java index d182768..56cdba6 100644 --- a/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/SchedulerFacadeTest.java +++ b/elastic-job-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java @@ -7,7 +7,7 @@ * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.elasticjob.lite.internal.schedule; +package org.apache.shardingsphere.elasticjob.lite.internal.setup; import org.apache.shardingsphere.elasticjob.lite.api.JobType; import org.apache.shardingsphere.elasticjob.lite.config.JobConfiguration; @@ -28,6 +28,8 @@ import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceServi import org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerManager; import org.apache.shardingsphere.elasticjob.lite.internal.monitor.MonitorService; import org.apache.shardingsphere.elasticjob.lite.internal.reconcile.ReconcileService; +import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry; +import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobScheduleController; import org.apache.shardingsphere.elasticjob.lite.internal.server.ServerService; import org.apache.shardingsphere.elasticjob.lite.internal.sharding.ShardingService; import org.apache.shardingsphere.elasticjob.lite.reg.base.CoordinatorRegistryCenter; @@ -42,12 +44,11 @@ import java.util.Collections; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) -public final class SchedulerFacadeTest { +public final class SetUpFacadeTest { @Mock private CoordinatorRegistryCenter regCenter; @@ -79,60 +80,38 @@ public final class SchedulerFacadeTest { @Mock private ListenerManager listenerManager; - private SchedulerFacade schedulerFacade; + private SetUpFacade setUpFacade; @Before public void setUp() { JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0")); - schedulerFacade = new SchedulerFacade(null, "test_job", Collections.emptyList()); - ReflectionUtils.setFieldValue(schedulerFacade, "configService", configService); - ReflectionUtils.setFieldValue(schedulerFacade, "leaderService", leaderService); - ReflectionUtils.setFieldValue(schedulerFacade, "serverService", serverService); - ReflectionUtils.setFieldValue(schedulerFacade, "instanceService", instanceService); - ReflectionUtils.setFieldValue(schedulerFacade, "shardingService", shardingService); - ReflectionUtils.setFieldValue(schedulerFacade, "monitorService", monitorService); - ReflectionUtils.setFieldValue(schedulerFacade, "reconcileService", reconcileService); - ReflectionUtils.setFieldValue(schedulerFacade, "listenerManager", listenerManager); + setUpFacade = new SetUpFacade(null, "test_job", Collections.emptyList()); + ReflectionUtils.setFieldValue(setUpFacade, "configService", configService); + ReflectionUtils.setFieldValue(setUpFacade, "leaderService", leaderService); + ReflectionUtils.setFieldValue(setUpFacade, "serverService", serverService); + ReflectionUtils.setFieldValue(setUpFacade, "instanceService", instanceService); + ReflectionUtils.setFieldValue(setUpFacade, "shardingService", shardingService); + ReflectionUtils.setFieldValue(setUpFacade, "monitorService", monitorService); + ReflectionUtils.setFieldValue(setUpFacade, "reconcileService", reconcileService); + ReflectionUtils.setFieldValue(setUpFacade, "listenerManager", listenerManager); } @Test - public void assertUpdateJobConfiguration() { + public void assertSetUpJobConfiguration() { JobConfiguration jobConfig = JobConfiguration.newBuilder( "test_job", JobType.DATAFLOW, "0/1 * * * * ?", 3).setProperty(DataflowJobExecutor.STREAM_PROCESS_KEY, Boolean.TRUE.toString()).build(); - when(configService.load(false)).thenReturn(jobConfig); - assertThat(schedulerFacade.updateJobConfiguration(TestDataflowJob.class.getName(), jobConfig), is(jobConfig)); - verify(configService).persist(TestDataflowJob.class.getName(), jobConfig); + when(configService.setUpJobConfiguration(TestDataflowJob.class.getName(), jobConfig)).thenReturn(jobConfig); + assertThat(setUpFacade.setUpJobConfiguration(TestDataflowJob.class.getName(), jobConfig), is(jobConfig)); + verify(configService).setUpJobConfiguration(TestDataflowJob.class.getName(), jobConfig); } @Test public void assertRegisterStartUpInfo() { - schedulerFacade.registerStartUpInfo(true); + setUpFacade.registerStartUpInfo(true); verify(listenerManager).startAllListeners(); verify(leaderService).electLeader(); verify(serverService).persistOnline(true); verify(shardingService).setReshardingFlag(); verify(monitorService).listen(); } - - @Test - public void assertShutdownInstanceIfNotLeaderAndReconcileServiceIsNotRunning() { - JobRegistry.getInstance().registerJob("test_job", jobScheduleController, regCenter); - schedulerFacade.shutdownInstance(); - verify(leaderService, times(0)).removeLeader(); - verify(monitorService).close(); - verify(reconcileService, times(0)).stopAsync(); - verify(jobScheduleController).shutdown(); - } - - @Test - public void assertShutdownInstanceIfLeaderAndReconcileServiceIsRunning() { - when(leaderService.isLeader()).thenReturn(true); - when(reconcileService.isRunning()).thenReturn(true); - JobRegistry.getInstance().registerJob("test_job", jobScheduleController, regCenter); - schedulerFacade.shutdownInstance(); - verify(leaderService).removeLeader(); - verify(monitorService).close(); - verify(reconcileService).stopAsync(); - verify(jobScheduleController).shutdown(); - } } diff --git a/elastic-job-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ServerStatisticsAPIImpl.java b/elastic-job-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ServerStatisticsAPIImpl.java index 4965246..881a8b2 100644 --- a/elastic-job-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ServerStatisticsAPIImpl.java +++ b/elastic-job-lite-lifecycle/src/main/java/org/apache/shardingsphere/elasticjob/lite/lifecycle/internal/statistics/ServerStatisticsAPIImpl.java @@ -44,9 +44,7 @@ public final class ServerStatisticsAPIImpl implements ServerStatisticsAPI { Set<String> servers = new HashSet<>(); for (String jobName : regCenter.getChildrenKeys("/")) { JobNodePath jobNodePath = new JobNodePath(jobName); - for (String each : regCenter.getChildrenKeys(jobNodePath.getServerNodePath())) { - servers.add(each); - } + servers.addAll(regCenter.getChildrenKeys(jobNodePath.getServerNodePath())); } return servers.size(); }