This is an automated email from the ASF dual-hosted git repository.
linghengqian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new a5cfb4a82 feature: Support New Sharding Strategy To Run Balanceful On
Multiple Servers For Single Sharding Scenario (#2462)
a5cfb4a82 is described below
commit a5cfb4a8230af2b79d61d58d2c8acfef4f0682c9
Author: baiyangzhuhong <[email protected]>
AuthorDate: Wed Dec 11 15:29:53 2024 +0800
feature: Support New Sharding Strategy To Run Balanceful On Multiple
Servers For Single Sharding Scenario (#2462)
* feature: https://github.com/apache/shardingsphere-elasticjob/issues/2461
* fix problems of code formatting validation
* remove some comments
---------
Co-authored-by: baiyangzhuhong <[email protected]>
---
.../{JobFacade.java => AbstractJobFacade.java} | 63 ++--
.../kernel/executor/facade/JobFacade.java | 150 ++--------
.../kernel/executor/facade/ShardingJobFacade.java | 93 ++++++
.../executor/facade/SingleShardingJobFacade.java | 158 ++++++++++
.../kernel/internal/schedule/JobScheduler.java | 20 +-
.../SingleShardingBalanceJobShardingStrategy.java | 61 ++++
....internal.sharding.strategy.JobShardingStrategy | 1 +
...bFacadeTest.java => ShardingJobFacadeTest.java} | 58 ++--
.../facade/SingleShardingJobFacadeTest.java | 329 +++++++++++++++++++++
...ngleShardingBalanceJobShardingStrategyTest.java | 42 +++
10 files changed, 789 insertions(+), 186 deletions(-)
diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacade.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/AbstractJobFacade.java
similarity index 86%
copy from kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacade.java
copy to kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/AbstractJobFacade.java
index 7c6eaf040..44a1c169c 100644
--- a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacade.java
+++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/AbstractJobFacade.java
@@ -18,35 +18,36 @@
package org.apache.shardingsphere.elasticjob.kernel.executor.facade;
import com.google.common.base.Strings;
-import lombok.extern.slf4j.Slf4j;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.stream.Collectors;
+
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import
org.apache.shardingsphere.elasticjob.kernel.infra.exception.JobExecutionEnvironmentException;
-import org.apache.shardingsphere.elasticjob.spi.listener.ElasticJobListener;
-import
org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts;
import
org.apache.shardingsphere.elasticjob.kernel.internal.config.ConfigurationService;
import
org.apache.shardingsphere.elasticjob.kernel.internal.context.TaskContext;
import
org.apache.shardingsphere.elasticjob.kernel.internal.failover.FailoverService;
import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionContextService;
import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionService;
import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ShardingService;
+import
org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingConfiguration;
+import
org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobTracingEventBus;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import
org.apache.shardingsphere.elasticjob.spi.executor.item.param.JobRuntimeService;
-import
org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobTracingEventBus;
-import
org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingConfiguration;
+import org.apache.shardingsphere.elasticjob.spi.listener.ElasticJobListener;
+import
org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts;
import
org.apache.shardingsphere.elasticjob.spi.tracing.event.JobExecutionEvent;
import
org.apache.shardingsphere.elasticjob.spi.tracing.event.JobStatusTraceEvent;
import
org.apache.shardingsphere.elasticjob.spi.tracing.event.JobStatusTraceEvent.State;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
/**
- * Job facade.
+ * Abstract Job facade.
*/
@Slf4j
-public final class JobFacade {
+abstract class AbstractJobFacade implements JobFacade {
private final ConfigurationService configService;
@@ -62,7 +63,7 @@ public final class JobFacade {
private final JobTracingEventBus jobTracingEventBus;
- public JobFacade(final CoordinatorRegistryCenter regCenter, final String
jobName, final Collection<ElasticJobListener> elasticJobListeners, final
TracingConfiguration<?> tracingConfig) {
+ AbstractJobFacade(final CoordinatorRegistryCenter regCenter, final String
jobName, final Collection<ElasticJobListener> elasticJobListeners, final
TracingConfiguration<?> tracingConfig) {
configService = new ConfigurationService(regCenter, jobName);
shardingService = new ShardingService(regCenter, jobName);
executionContextService = new ExecutionContextService(regCenter,
jobName);
@@ -78,6 +79,7 @@ public final class JobFacade {
* @param fromCache load from cache or not
* @return job configuration
*/
+ @Override
public JobConfiguration loadJobConfiguration(final boolean fromCache) {
return configService.load(fromCache);
}
@@ -85,8 +87,9 @@ public final class JobFacade {
/**
* Check job execution environment.
*
- * @throws JobExecutionEnvironmentException job execution environment
exception
+ * @throws
org.apache.shardingsphere.elasticjob.kernel.infra.exception.JobExecutionEnvironmentException
job execution environment exception
*/
+ @Override
public void checkJobExecutionEnvironment() throws
JobExecutionEnvironmentException {
configService.checkMaxTimeDiffSecondsTolerable();
}
@@ -94,6 +97,7 @@ public final class JobFacade {
/**
* Failover If necessary.
*/
+ @Override
public void failoverIfNecessary() {
if (configService.load(true).isFailover()) {
failoverService.failoverIfNecessary();
@@ -105,6 +109,7 @@ public final class JobFacade {
*
* @param shardingContexts sharding contexts
*/
+ @Override
public void registerJobBegin(final ShardingContexts shardingContexts) {
executionService.registerJobBegin(shardingContexts);
}
@@ -114,6 +119,7 @@ public final class JobFacade {
*
* @param shardingContexts sharding contexts
*/
+ @Override
public void registerJobCompleted(final ShardingContexts shardingContexts) {
executionService.registerJobCompleted(shardingContexts);
if (configService.load(true).isFailover()) {
@@ -121,27 +127,7 @@ public final class JobFacade {
}
}
- /**
- * Get sharding contexts.
- *
- * @return sharding contexts
- */
- public ShardingContexts getShardingContexts() {
- boolean isFailover = configService.load(true).isFailover();
- if (isFailover) {
- List<Integer> failoverShardingItems =
failoverService.getLocalFailoverItems();
- if (!failoverShardingItems.isEmpty()) {
- return
executionContextService.getJobShardingContext(failoverShardingItems);
- }
- }
- shardingService.shardingIfNecessary();
- List<Integer> shardingItems = shardingService.getLocalShardingItems();
- if (isFailover) {
- shardingItems.removeAll(failoverService.getLocalTakeOffItems());
- }
-
shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
- return executionContextService.getJobShardingContext(shardingItems);
- }
+ public abstract ShardingContexts getShardingContexts();
/**
* Set task misfire flag.
@@ -149,6 +135,7 @@ public final class JobFacade {
* @param shardingItems sharding items to be set misfire flag
* @return whether satisfy misfire condition
*/
+ @Override
public boolean misfireIfRunning(final Collection<Integer> shardingItems) {
return executionService.misfireIfHasRunningItems(shardingItems);
}
@@ -158,6 +145,7 @@ public final class JobFacade {
*
* @param shardingItems sharding items to be cleared misfire flag
*/
+ @Override
public void clearMisfire(final Collection<Integer> shardingItems) {
executionService.clearMisfire(shardingItems);
}
@@ -168,6 +156,7 @@ public final class JobFacade {
* @param shardingItems sharding items
* @return need to execute misfire tasks or not
*/
+ @Override
public boolean isExecuteMisfired(final Collection<Integer> shardingItems) {
return configService.load(true).isMisfire() && !isNeedSharding() &&
!executionService.getMisfiredJobItems(shardingItems).isEmpty();
}
@@ -177,6 +166,7 @@ public final class JobFacade {
*
* @return need resharding or not
*/
+ @Override
public boolean isNeedSharding() {
return shardingService.isNeedSharding();
}
@@ -186,6 +176,7 @@ public final class JobFacade {
*
* @param shardingContexts sharding contexts
*/
+ @Override
public void beforeJobExecuted(final ShardingContexts shardingContexts) {
for (ElasticJobListener each : elasticJobListeners) {
each.beforeJobExecuted(shardingContexts);
@@ -197,6 +188,7 @@ public final class JobFacade {
*
* @param shardingContexts sharding contexts
*/
+ @Override
public void afterJobExecuted(final ShardingContexts shardingContexts) {
for (ElasticJobListener each : elasticJobListeners) {
each.afterJobExecuted(shardingContexts);
@@ -208,6 +200,7 @@ public final class JobFacade {
*
* @param jobExecutionEvent job execution event
*/
+ @Override
public void postJobExecutionEvent(final JobExecutionEvent
jobExecutionEvent) {
jobTracingEventBus.post(jobExecutionEvent);
}
@@ -219,6 +212,7 @@ public final class JobFacade {
* @param state job state
* @param message job message
*/
+ @Override
public void postJobStatusTraceEvent(final String taskId, final State
state, final String message) {
TaskContext taskContext = TaskContext.from(taskId);
jobTracingEventBus.post(new
JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(),
@@ -233,6 +227,7 @@ public final class JobFacade {
*
* @return job runtime service
*/
+ @Override
public JobRuntimeService getJobRuntimeService() {
return new JobJobRuntimeServiceImpl(this);
}
diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacade.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacade.java
index 7c6eaf040..72b638b5d 100644
--- a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacade.java
+++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacade.java
@@ -17,131 +17,60 @@
package org.apache.shardingsphere.elasticjob.kernel.executor.facade;
-import com.google.common.base.Strings;
-import lombok.extern.slf4j.Slf4j;
+import java.util.Collection;
+
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import
org.apache.shardingsphere.elasticjob.kernel.infra.exception.JobExecutionEnvironmentException;
-import org.apache.shardingsphere.elasticjob.spi.listener.ElasticJobListener;
-import
org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts;
-import
org.apache.shardingsphere.elasticjob.kernel.internal.config.ConfigurationService;
-import
org.apache.shardingsphere.elasticjob.kernel.internal.context.TaskContext;
-import
org.apache.shardingsphere.elasticjob.kernel.internal.failover.FailoverService;
-import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionContextService;
-import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionService;
-import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ShardingService;
-import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import
org.apache.shardingsphere.elasticjob.spi.executor.item.param.JobRuntimeService;
-import
org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobTracingEventBus;
-import
org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingConfiguration;
+import
org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts;
import
org.apache.shardingsphere.elasticjob.spi.tracing.event.JobExecutionEvent;
import
org.apache.shardingsphere.elasticjob.spi.tracing.event.JobStatusTraceEvent;
-import
org.apache.shardingsphere.elasticjob.spi.tracing.event.JobStatusTraceEvent.State;
-
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-import java.util.stream.Collectors;
/**
* Job facade.
*/
-@Slf4j
-public final class JobFacade {
-
- private final ConfigurationService configService;
-
- private final ShardingService shardingService;
-
- private final ExecutionContextService executionContextService;
-
- private final ExecutionService executionService;
-
- private final FailoverService failoverService;
-
- private final Collection<ElasticJobListener> elasticJobListeners;
-
- private final JobTracingEventBus jobTracingEventBus;
-
- public JobFacade(final CoordinatorRegistryCenter regCenter, final String
jobName, final Collection<ElasticJobListener> elasticJobListeners, final
TracingConfiguration<?> tracingConfig) {
- configService = new ConfigurationService(regCenter, jobName);
- shardingService = new ShardingService(regCenter, jobName);
- executionContextService = new ExecutionContextService(regCenter,
jobName);
- executionService = new ExecutionService(regCenter, jobName);
- failoverService = new FailoverService(regCenter, jobName);
- this.elasticJobListeners =
elasticJobListeners.stream().sorted(Comparator.comparingInt(ElasticJobListener::order)).collect(Collectors.toList());
- this.jobTracingEventBus = null == tracingConfig ? new
JobTracingEventBus() : new JobTracingEventBus(tracingConfig);
- }
+public interface JobFacade {
/**
* Load job configuration.
- *
+ *
* @param fromCache load from cache or not
* @return job configuration
*/
- public JobConfiguration loadJobConfiguration(final boolean fromCache) {
- return configService.load(fromCache);
- }
+ JobConfiguration loadJobConfiguration(boolean fromCache);
/**
* Check job execution environment.
- *
+ *
* @throws JobExecutionEnvironmentException job execution environment
exception
*/
- public void checkJobExecutionEnvironment() throws
JobExecutionEnvironmentException {
- configService.checkMaxTimeDiffSecondsTolerable();
- }
+ void checkJobExecutionEnvironment() throws
JobExecutionEnvironmentException;
/**
* Failover If necessary.
*/
- public void failoverIfNecessary() {
- if (configService.load(true).isFailover()) {
- failoverService.failoverIfNecessary();
- }
- }
+ void failoverIfNecessary();
/**
* Register job begin.
*
* @param shardingContexts sharding contexts
*/
- public void registerJobBegin(final ShardingContexts shardingContexts) {
- executionService.registerJobBegin(shardingContexts);
- }
+ void registerJobBegin(ShardingContexts shardingContexts);
/**
* Register job completed.
*
* @param shardingContexts sharding contexts
*/
- public void registerJobCompleted(final ShardingContexts shardingContexts) {
- executionService.registerJobCompleted(shardingContexts);
- if (configService.load(true).isFailover()) {
-
failoverService.updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet());
- }
- }
+ void registerJobCompleted(ShardingContexts shardingContexts);
/**
* Get sharding contexts.
*
* @return sharding contexts
*/
- public ShardingContexts getShardingContexts() {
- boolean isFailover = configService.load(true).isFailover();
- if (isFailover) {
- List<Integer> failoverShardingItems =
failoverService.getLocalFailoverItems();
- if (!failoverShardingItems.isEmpty()) {
- return
executionContextService.getJobShardingContext(failoverShardingItems);
- }
- }
- shardingService.shardingIfNecessary();
- List<Integer> shardingItems = shardingService.getLocalShardingItems();
- if (isFailover) {
- shardingItems.removeAll(failoverService.getLocalTakeOffItems());
- }
-
shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
- return executionContextService.getJobShardingContext(shardingItems);
- }
+ ShardingContexts getShardingContexts();
/**
* Set task misfire flag.
@@ -149,68 +78,50 @@ public final class JobFacade {
* @param shardingItems sharding items to be set misfire flag
* @return whether satisfy misfire condition
*/
- public boolean misfireIfRunning(final Collection<Integer> shardingItems) {
- return executionService.misfireIfHasRunningItems(shardingItems);
- }
+ boolean misfireIfRunning(Collection<Integer> shardingItems);
/**
* Clear misfire flag.
*
* @param shardingItems sharding items to be cleared misfire flag
*/
- public void clearMisfire(final Collection<Integer> shardingItems) {
- executionService.clearMisfire(shardingItems);
- }
+ void clearMisfire(Collection<Integer> shardingItems);
/**
- * Judge job whether to need to execute misfire tasks.
- *
+ * Judge job whether need to execute misfire tasks.
+ *
* @param shardingItems sharding items
- * @return need to execute misfire tasks or not
+ * @return whether need to execute misfire tasks
*/
- public boolean isExecuteMisfired(final Collection<Integer> shardingItems) {
- return configService.load(true).isMisfire() && !isNeedSharding() &&
!executionService.getMisfiredJobItems(shardingItems).isEmpty();
- }
+ boolean isExecuteMisfired(Collection<Integer> shardingItems);
/**
- * Judge job whether to need resharding.
+ * Judge job whether need resharding.
*
- * @return need resharding or not
+ * @return whether need resharding
*/
- public boolean isNeedSharding() {
- return shardingService.isNeedSharding();
- }
+ boolean isNeedSharding();
/**
* Call before job executed.
*
* @param shardingContexts sharding contexts
*/
- public void beforeJobExecuted(final ShardingContexts shardingContexts) {
- for (ElasticJobListener each : elasticJobListeners) {
- each.beforeJobExecuted(shardingContexts);
- }
- }
+ void beforeJobExecuted(ShardingContexts shardingContexts);
/**
* Call after job executed.
*
* @param shardingContexts sharding contexts
*/
- public void afterJobExecuted(final ShardingContexts shardingContexts) {
- for (ElasticJobListener each : elasticJobListeners) {
- each.afterJobExecuted(shardingContexts);
- }
- }
+ void afterJobExecuted(ShardingContexts shardingContexts);
/**
* Post job execution event.
*
* @param jobExecutionEvent job execution event
*/
- public void postJobExecutionEvent(final JobExecutionEvent
jobExecutionEvent) {
- jobTracingEventBus.post(jobExecutionEvent);
- }
+ void postJobExecutionEvent(JobExecutionEvent jobExecutionEvent);
/**
* Post job status trace event.
@@ -219,21 +130,12 @@ public final class JobFacade {
* @param state job state
* @param message job message
*/
- public void postJobStatusTraceEvent(final String taskId, final State
state, final String message) {
- TaskContext taskContext = TaskContext.from(taskId);
- jobTracingEventBus.post(new
JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(),
- taskContext.getSlaveId(), taskContext.getType(),
taskContext.getMetaInfo().getShardingItems().toString(), state, message));
- if (!Strings.isNullOrEmpty(message)) {
- log.trace(message);
- }
- }
+ void postJobStatusTraceEvent(String taskId, JobStatusTraceEvent.State
state, String message);
/**
* Get job runtime service.
*
* @return job runtime service
*/
- public JobRuntimeService getJobRuntimeService() {
- return new JobJobRuntimeServiceImpl(this);
- }
+ JobRuntimeService getJobRuntimeService();
}
diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacade.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacade.java
new file mode 100644
index 000000000..3f4f3cc90
--- /dev/null
+++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacade.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.kernel.executor.facade;
+
+import lombok.extern.slf4j.Slf4j;
+
+import
org.apache.shardingsphere.elasticjob.kernel.internal.config.ConfigurationService;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.failover.FailoverService;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionContextService;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionService;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ShardingService;
+import
org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobTracingEventBus;
+import org.apache.shardingsphere.elasticjob.spi.listener.ElasticJobListener;
+import
org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import
org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingConfiguration;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Sharding Job facade.
+ */
+@Slf4j
+public final class ShardingJobFacade extends AbstractJobFacade {
+
+ private final ConfigurationService configService;
+
+ private final ShardingService shardingService;
+
+ private final ExecutionContextService executionContextService;
+
+ private final ExecutionService executionService;
+
+ private final FailoverService failoverService;
+
+ private final Collection<ElasticJobListener> elasticJobListeners;
+
+ private final JobTracingEventBus jobTracingEventBus;
+
+ public ShardingJobFacade(final CoordinatorRegistryCenter regCenter, final
String jobName, final Collection<ElasticJobListener> elasticJobListeners, final
TracingConfiguration<?> tracingConfig) {
+ super(regCenter, jobName, elasticJobListeners, tracingConfig);
+
+ configService = new ConfigurationService(regCenter, jobName);
+ shardingService = new ShardingService(regCenter, jobName);
+ executionContextService = new ExecutionContextService(regCenter,
jobName);
+ executionService = new ExecutionService(regCenter, jobName);
+ failoverService = new FailoverService(regCenter, jobName);
+ this.elasticJobListeners =
elasticJobListeners.stream().sorted(Comparator.comparingInt(ElasticJobListener::order)).collect(Collectors.toList());
+ this.jobTracingEventBus = null == tracingConfig ? new
JobTracingEventBus() : new JobTracingEventBus(tracingConfig);
+ }
+
+ /**
+ * Get sharding contexts.
+ *
+ * @return sharding contexts
+ */
+ @Override
+ public ShardingContexts getShardingContexts() {
+ boolean isFailover = configService.load(true).isFailover();
+ if (isFailover) {
+ List<Integer> failoverShardingItems =
failoverService.getLocalFailoverItems();
+ if (!failoverShardingItems.isEmpty()) {
+ return
executionContextService.getJobShardingContext(failoverShardingItems);
+ }
+ }
+ shardingService.shardingIfNecessary();
+ List<Integer> shardingItems = shardingService.getLocalShardingItems();
+ if (isFailover) {
+ shardingItems.removeAll(failoverService.getLocalTakeOffItems());
+ }
+
shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
+ return executionContextService.getJobShardingContext(shardingItems);
+ }
+
+}
diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacade.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacade.java
new file mode 100644
index 000000000..1ce6954ba
--- /dev/null
+++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacade.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.kernel.executor.facade;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.config.ConfigurationService;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.failover.FailoverService;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.instance.InstanceService;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.schedule.JobRegistry;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionContextService;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionService;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.JobInstance;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ShardingService;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.storage.JobNodeStorage;
+import
org.apache.shardingsphere.elasticjob.kernel.tracing.config.TracingConfiguration;
+import
org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobTracingEventBus;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+import org.apache.shardingsphere.elasticjob.spi.listener.ElasticJobListener;
+import
org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Single Sharding Job facade.
+ */
+@Slf4j
+public final class SingleShardingJobFacade extends AbstractJobFacade {
+
+ private final ConfigurationService configService;
+
+ private final ShardingService shardingService;
+
+ private final ExecutionContextService executionContextService;
+
+ private final ExecutionService executionService;
+
+ private final FailoverService failoverService;
+
+ private final Collection<ElasticJobListener> elasticJobListeners;
+
+ private final JobTracingEventBus jobTracingEventBus;
+
+ private final JobNodeStorage jobNodeStorage;
+
+ private final InstanceService instanceService;
+
+ public SingleShardingJobFacade(final CoordinatorRegistryCenter regCenter,
final String jobName, final Collection<ElasticJobListener> elasticJobListeners,
+ final TracingConfiguration<?>
tracingConfig) {
+ super(regCenter, jobName, elasticJobListeners, tracingConfig);
+
+ configService = new ConfigurationService(regCenter, jobName);
+ shardingService = new ShardingService(regCenter, jobName);
+ executionContextService = new ExecutionContextService(regCenter,
jobName);
+ executionService = new ExecutionService(regCenter, jobName);
+ failoverService = new FailoverService(regCenter, jobName);
+ this.elasticJobListeners =
elasticJobListeners.stream().sorted(Comparator.comparingInt(ElasticJobListener::order)).collect(Collectors.toList());
+ this.jobTracingEventBus = null == tracingConfig ? new
JobTracingEventBus() : new JobTracingEventBus(tracingConfig);
+ jobNodeStorage = new JobNodeStorage(regCenter, jobName);
+ instanceService = new InstanceService(regCenter, jobName);
+ }
+
+ @Override
+ public void registerJobCompleted(final ShardingContexts shardingContexts) {
+ super.registerJobCompleted(shardingContexts);
+
+ JobConfiguration jobConfig = configService.load(true);
+ JobInstance jobInst =
JobRegistry.getInstance().getJobInstance(jobConfig.getJobName());
+ if (null == jobInst) {
+ log.warn("Error! Can't find the job instance with name:{}",
jobConfig.getJobName());
+ return;
+ }
+ Integer nextIndex = null;
+ List<JobInstance> availJobInst =
instanceService.getAvailableJobInstances();
+ for (int i = 0; i < availJobInst.size(); i++) {
+ JobInstance temp = availJobInst.get(i);
+ if (temp.getServerIp().equals(jobInst.getServerIp())) {
+ nextIndex = i + 1;
+ break;
+ }
+ }
+ if (nextIndex != null) {
+ nextIndex = nextIndex >= availJobInst.size() ? 0 : nextIndex;
+ jobNodeStorage.fillEphemeralJobNode("next-job-instance-ip",
availJobInst.get(nextIndex).getServerIp());
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("job name: {}, next index: {}, sharding total count: {}",
+ jobConfig.getJobName(), nextIndex,
jobConfig.getShardingTotalCount());
+ }
+ }
+
+ /**
+ * Get sharding contexts.
+ *
+ * @return sharding contexts
+ */
+ @Override
+ public ShardingContexts getShardingContexts() {
+ JobConfiguration jobConfig = configService.load(true);
+ boolean isFailover = jobConfig.isFailover();
+ if (isFailover) {
+ List<Integer> failoverShardingItems =
failoverService.getLocalFailoverItems();
+ if (!failoverShardingItems.isEmpty()) {
+ return
executionContextService.getJobShardingContext(failoverShardingItems);
+ }
+ }
+
+ List<Integer> shardingItems;
+ String nextJobInstIP = null;
+ if (isNeedSharding()) {
+ shardingService.shardingIfNecessary();
+ shardingItems = shardingService.getLocalShardingItems();
+ } else {
+ nextJobInstIP =
jobNodeStorage.getJobNodeDataDirectly("next-job-instance-ip");
+ if (StringUtils.isBlank(nextJobInstIP)) {
+ shardingService.shardingIfNecessary();
+ shardingItems = shardingService.getLocalShardingItems();
+ } else {
+ JobInstance jobInst =
JobRegistry.getInstance().getJobInstance(jobConfig.getJobName());
+ shardingItems = nextJobInstIP.equals(jobInst.getServerIp()) ?
Collections.singletonList(0) : new ArrayList<>();
+ }
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("job name: {}, sharding items: {}, nextJobInstIP: {},
sharding total count: {}, isFailover: {}",
+ jobConfig.getJobName(), shardingItems, nextJobInstIP,
jobConfig.getShardingTotalCount(), isFailover);
+ }
+
+ if (isFailover) {
+ shardingItems.removeAll(failoverService.getLocalTakeOffItems());
+ }
+
shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
+ return executionContextService.getJobShardingContext(shardingItems);
+ }
+
+}
diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java
index 0311b6fc6..fbeab942e 100644
--- a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java
+++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java
@@ -22,9 +22,11 @@ import com.google.common.base.Strings;
import lombok.Getter;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import
org.apache.shardingsphere.elasticjob.kernel.executor.facade.SingleShardingJobFacade;
import
org.apache.shardingsphere.elasticjob.spi.executor.error.handler.JobErrorHandlerPropertiesValidator;
import org.apache.shardingsphere.elasticjob.kernel.executor.ElasticJobExecutor;
import
org.apache.shardingsphere.elasticjob.kernel.infra.exception.JobSystemException;
+import
org.apache.shardingsphere.elasticjob.kernel.executor.facade.ShardingJobFacade;
import org.apache.shardingsphere.elasticjob.kernel.executor.facade.JobFacade;
import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.spi.listener.ElasticJobListener;
@@ -80,7 +82,14 @@ public final class JobScheduler {
Collection<ElasticJobListener> jobListeners =
getElasticJobListeners(this.jobConfig);
setUpFacade = new SetUpFacade(regCenter, this.jobConfig.getJobName(),
jobListeners);
schedulerFacade = new SchedulerFacade(regCenter,
this.jobConfig.getJobName());
- jobFacade = new JobFacade(regCenter, this.jobConfig.getJobName(),
jobListeners, findTracingConfiguration().orElse(null));
+
+ if (1 == this.jobConfig.getShardingTotalCount()
+ &&
"SINGLE_SHARDING_BALANCE".equals(this.jobConfig.getJobShardingStrategyType())) {
+ jobFacade = new SingleShardingJobFacade(regCenter,
this.jobConfig.getJobName(), jobListeners,
findTracingConfiguration().orElse(null));
+ } else {
+ jobFacade = new ShardingJobFacade(regCenter,
this.jobConfig.getJobName(), jobListeners,
findTracingConfiguration().orElse(null));
+ }
+
validateJobProperties();
jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig,
jobFacade);
setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
@@ -94,7 +103,14 @@ public final class JobScheduler {
Collection<ElasticJobListener> jobListeners =
getElasticJobListeners(this.jobConfig);
setUpFacade = new SetUpFacade(regCenter, this.jobConfig.getJobName(),
jobListeners);
schedulerFacade = new SchedulerFacade(regCenter,
this.jobConfig.getJobName());
- jobFacade = new JobFacade(regCenter, this.jobConfig.getJobName(),
jobListeners, findTracingConfiguration().orElse(null));
+
+ if (1 == this.jobConfig.getShardingTotalCount()
+ &&
"SINGLE_SHARDING_BALANCE".equals(this.jobConfig.getJobShardingStrategyType())) {
+ jobFacade = new SingleShardingJobFacade(regCenter,
this.jobConfig.getJobName(), jobListeners,
findTracingConfiguration().orElse(null));
+ } else {
+ jobFacade = new ShardingJobFacade(regCenter,
this.jobConfig.getJobName(), jobListeners,
findTracingConfiguration().orElse(null));
+ }
+
validateJobProperties();
jobExecutor = new ElasticJobExecutor(elasticJobType, this.jobConfig,
jobFacade);
setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
diff --git
a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategy.java
b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategy.java
new file mode 100644
index 000000000..268ba4fb9
--- /dev/null
+++
b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategy.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.type;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.JobInstance;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.JobShardingStrategy;
+
+/**
+ * Job sharding strategy for single-sharding jobs that spreads executions across the job instances.
+ *
+ * <p>
+ * ROUND_ROBIN rotates the job instances by the hash code of the job name only. That value is constant,
+ * so a job with a single sharding item always sticks to the same job instance.
+ * SINGLE_SHARDING_BALANCE additionally mixes the current time into the rotation offset, so each sharding
+ * may start from a different job instance. The rotated list is then passed to
+ * {@link AverageAllocationJobShardingStrategy}, which performs the actual allocation.
+ * </p>
+ */
+public class SingleShardingBalanceJobShardingStrategy implements JobShardingStrategy {
+
+    private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
+
+    /**
+     * Rotate the job instances by a time-dependent offset and allocate the sharding items on the rotated list.
+     *
+     * @param jobInstances job instances to rotate and to allocate sharding items to
+     * @param jobName job name, its hash code is part of the rotation offset
+     * @param shardingTotalCount sharding total count
+     * @return result of the average allocation strategy applied to the rotated job instances
+     */
+    @Override
+    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
+        int shardingUnitsSize = jobInstances.size();
+        if (0 == shardingUnitsSize) {
+            // Nothing to rotate. Delegating as-is avoids the division by zero of a modulo by an empty size.
+            return averageAllocationJobShardingStrategy.sharding(jobInstances, jobName, shardingTotalCount);
+        }
+        // floorMod always yields a value in [0, shardingUnitsSize). Math.abs(x) % size does not,
+        // because Math.abs(Integer.MIN_VALUE) stays negative and would produce a negative list index.
+        int offset = Math.floorMod(jobName.hashCode() + (int) System.currentTimeMillis(), shardingUnitsSize);
+        List<JobInstance> result = new ArrayList<>(shardingUnitsSize);
+        for (int i = 0; i < shardingUnitsSize; i++) {
+            result.add(jobInstances.get((i + offset) % shardingUnitsSize));
+        }
+        return averageAllocationJobShardingStrategy.sharding(result, jobName, shardingTotalCount);
+    }
+
+    @Override
+    public String getType() {
+        return "SINGLE_SHARDING_BALANCE";
+    }
+}
diff --git
a/kernel/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.JobShardingStrategy
b/kernel/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.JobShardingStrategy
index 16310868c..f1fa156d0 100644
---
a/kernel/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.JobShardingStrategy
+++
b/kernel/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.JobShardingStrategy
@@ -18,3 +18,4 @@
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.type.AverageAllocationJobShardingStrategy
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.type.OdevitySortByNameJobShardingStrategy
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.type.RoundRobinByNameJobShardingStrategy
+org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.type.SingleShardingBalanceJobShardingStrategy
diff --git
a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacadeTest.java
b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacadeTest.java
similarity index 77%
rename from
kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacadeTest.java
rename to
kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacadeTest.java
index 8806af610..31afd41ee 100644
---
a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/JobFacadeTest.java
+++
b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/ShardingJobFacadeTest.java
@@ -46,7 +46,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
-class JobFacadeTest {
+class ShardingJobFacadeTest {
@Mock
private ConfigurationService configService;
@@ -69,54 +69,60 @@ class JobFacadeTest {
@Mock
private ElasticJobListenerCaller caller;
- private JobFacade jobFacade;
+ private ShardingJobFacade shardingJobFacade;
private StringBuilder orderResult;
@BeforeEach
void setUp() {
orderResult = new StringBuilder();
- jobFacade = new JobFacade(null, "test_job",
+ shardingJobFacade = new ShardingJobFacade(null, "test_job",
Arrays.asList(new TestElasticJobListener(caller, "l1", 2,
orderResult), new TestElasticJobListener(caller, "l2", 1, orderResult)), null);
- ReflectionUtils.setFieldValue(jobFacade, "configService",
configService);
- ReflectionUtils.setFieldValue(jobFacade, "shardingService",
shardingService);
- ReflectionUtils.setFieldValue(jobFacade, "executionContextService",
executionContextService);
- ReflectionUtils.setFieldValue(jobFacade, "executionService",
executionService);
- ReflectionUtils.setFieldValue(jobFacade, "failoverService",
failoverService);
- ReflectionUtils.setFieldValue(jobFacade, "jobTracingEventBus",
jobTracingEventBus);
+ ReflectionUtils.setSuperclassFieldValue(shardingJobFacade,
"configService", configService);
+ ReflectionUtils.setSuperclassFieldValue(shardingJobFacade,
"shardingService", shardingService);
+ ReflectionUtils.setSuperclassFieldValue(shardingJobFacade,
"executionContextService", executionContextService);
+ ReflectionUtils.setSuperclassFieldValue(shardingJobFacade,
"executionService", executionService);
+ ReflectionUtils.setSuperclassFieldValue(shardingJobFacade,
"failoverService", failoverService);
+ ReflectionUtils.setSuperclassFieldValue(shardingJobFacade,
"jobTracingEventBus", jobTracingEventBus);
+ ReflectionUtils.setFieldValue(shardingJobFacade, "configService",
configService);
+ ReflectionUtils.setFieldValue(shardingJobFacade, "shardingService",
shardingService);
+ ReflectionUtils.setFieldValue(shardingJobFacade,
"executionContextService", executionContextService);
+ ReflectionUtils.setFieldValue(shardingJobFacade, "executionService",
executionService);
+ ReflectionUtils.setFieldValue(shardingJobFacade, "failoverService",
failoverService);
+ ReflectionUtils.setFieldValue(shardingJobFacade, "jobTracingEventBus",
jobTracingEventBus);
}
@Test
void assertLoad() {
JobConfiguration expected = JobConfiguration.newBuilder("test_job",
3).cron("0/1 * * * * ?").build();
when(configService.load(true)).thenReturn(expected);
- assertThat(jobFacade.loadJobConfiguration(true), is(expected));
+ assertThat(shardingJobFacade.loadJobConfiguration(true), is(expected));
}
@Test
void assertCheckMaxTimeDiffSecondsTolerable() throws
JobExecutionEnvironmentException {
- jobFacade.checkJobExecutionEnvironment();
+ shardingJobFacade.checkJobExecutionEnvironment();
verify(configService).checkMaxTimeDiffSecondsTolerable();
}
@Test
void assertFailoverIfUnnecessary() {
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job",
3).cron("0/1 * * * * ?").failover(false).build());
- jobFacade.failoverIfNecessary();
+ shardingJobFacade.failoverIfNecessary();
verify(failoverService, times(0)).failoverIfNecessary();
}
@Test
void assertFailoverIfNecessary() {
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job",
3).cron("0/1 * * * * ?").failover(true).monitorExecution(true).build());
- jobFacade.failoverIfNecessary();
+ shardingJobFacade.failoverIfNecessary();
verify(failoverService).failoverIfNecessary();
}
@Test
void assertRegisterJobBegin() {
ShardingContexts shardingContexts = new
ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap());
- jobFacade.registerJobBegin(shardingContexts);
+ shardingJobFacade.registerJobBegin(shardingContexts);
verify(executionService).registerJobBegin(shardingContexts);
}
@@ -124,7 +130,7 @@ class JobFacadeTest {
void assertRegisterJobCompletedWhenFailoverDisabled() {
ShardingContexts shardingContexts = new
ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap());
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job",
3).cron("0/1 * * * * ?").failover(false).build());
- jobFacade.registerJobCompleted(shardingContexts);
+ shardingJobFacade.registerJobCompleted(shardingContexts);
verify(executionService).registerJobCompleted(shardingContexts);
verify(failoverService,
times(0)).updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet());
}
@@ -133,7 +139,7 @@ class JobFacadeTest {
void assertRegisterJobCompletedWhenFailoverEnabled() {
ShardingContexts shardingContexts = new
ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap());
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job",
3).cron("0/1 * * * * ?").failover(true).monitorExecution(true).build());
- jobFacade.registerJobCompleted(shardingContexts);
+ shardingJobFacade.registerJobCompleted(shardingContexts);
verify(executionService).registerJobCompleted(shardingContexts);
verify(failoverService).updateFailoverComplete(shardingContexts.getShardingItemParameters().keySet());
}
@@ -144,7 +150,7 @@ class JobFacadeTest {
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job",
3).cron("0/1 * * * * ?").failover(true).monitorExecution(true).build());
when(failoverService.getLocalFailoverItems()).thenReturn(Collections.singletonList(1));
when(executionContextService.getJobShardingContext(Collections.singletonList(1))).thenReturn(shardingContexts);
- assertThat(jobFacade.getShardingContexts(), is(shardingContexts));
+ assertThat(shardingJobFacade.getShardingContexts(),
is(shardingContexts));
verify(shardingService, times(0)).shardingIfNecessary();
}
@@ -156,7 +162,7 @@ class JobFacadeTest {
when(shardingService.getLocalShardingItems()).thenReturn(Lists.newArrayList(0,
1));
when(failoverService.getLocalTakeOffItems()).thenReturn(Collections.singletonList(0));
when(executionContextService.getJobShardingContext(Collections.singletonList(1))).thenReturn(shardingContexts);
- assertThat(jobFacade.getShardingContexts(), is(shardingContexts));
+ assertThat(shardingJobFacade.getShardingContexts(),
is(shardingContexts));
verify(shardingService).shardingIfNecessary();
}
@@ -166,7 +172,7 @@ class JobFacadeTest {
when(configService.load(true)).thenReturn(JobConfiguration.newBuilder("test_job",
3).cron("0/1 * * * * ?").failover(false).build());
when(shardingService.getLocalShardingItems()).thenReturn(Arrays.asList(0, 1));
when(executionContextService.getJobShardingContext(Arrays.asList(0,
1))).thenReturn(shardingContexts);
- assertThat(jobFacade.getShardingContexts(), is(shardingContexts));
+ assertThat(shardingJobFacade.getShardingContexts(),
is(shardingContexts));
verify(shardingService).shardingIfNecessary();
}
@@ -177,45 +183,45 @@ class JobFacadeTest {
when(shardingService.getLocalShardingItems()).thenReturn(Lists.newArrayList(0,
1));
when(executionService.getDisabledItems(Arrays.asList(0,
1))).thenReturn(Collections.singletonList(1));
when(executionContextService.getJobShardingContext(Collections.singletonList(0))).thenReturn(shardingContexts);
- assertThat(jobFacade.getShardingContexts(), is(shardingContexts));
+ assertThat(shardingJobFacade.getShardingContexts(),
is(shardingContexts));
verify(shardingService).shardingIfNecessary();
}
@Test
void assertMisfireIfRunning() {
when(executionService.misfireIfHasRunningItems(Arrays.asList(0,
1))).thenReturn(true);
- assertThat(jobFacade.misfireIfRunning(Arrays.asList(0, 1)), is(true));
+ assertThat(shardingJobFacade.misfireIfRunning(Arrays.asList(0, 1)),
is(true));
}
@Test
void assertClearMisfire() {
- jobFacade.clearMisfire(Arrays.asList(0, 1));
+ shardingJobFacade.clearMisfire(Arrays.asList(0, 1));
verify(executionService).clearMisfire(Arrays.asList(0, 1));
}
@Test
void assertIsNeedSharding() {
when(shardingService.isNeedSharding()).thenReturn(true);
- assertThat(jobFacade.isNeedSharding(), is(true));
+ assertThat(shardingJobFacade.isNeedSharding(), is(true));
}
@Test
void assertBeforeJobExecuted() {
- jobFacade.beforeJobExecuted(new ShardingContexts("fake_task_id",
"test_job", 10, "", Collections.emptyMap()));
+ shardingJobFacade.beforeJobExecuted(new
ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap()));
verify(caller, times(2)).before();
assertThat(orderResult.toString(), is("l2l1"));
}
@Test
void assertAfterJobExecuted() {
- jobFacade.afterJobExecuted(new ShardingContexts("fake_task_id",
"test_job", 10, "", Collections.emptyMap()));
+ shardingJobFacade.afterJobExecuted(new
ShardingContexts("fake_task_id", "test_job", 10, "", Collections.emptyMap()));
verify(caller, times(2)).after();
assertThat(orderResult.toString(), is("l2l1"));
}
@Test
void assertPostJobExecutionEvent() {
- jobFacade.postJobExecutionEvent(null);
+ shardingJobFacade.postJobExecutionEvent(null);
verify(jobTracingEventBus).post(null);
}
}
diff --git
a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacadeTest.java
b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacadeTest.java
new file mode 100644
index 000000000..2f5027a94
--- /dev/null
+++
b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/facade/SingleShardingJobFacadeTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.kernel.executor.facade;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import
org.apache.shardingsphere.elasticjob.kernel.infra.exception.JobExecutionEnvironmentException;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.config.ConfigurationService;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.failover.FailoverService;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.instance.InstanceService;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.schedule.JobRegistry;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionContextService;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ExecutionService;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.JobInstance;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.ShardingService;
+import
org.apache.shardingsphere.elasticjob.kernel.internal.storage.JobNodeStorage;
+import
org.apache.shardingsphere.elasticjob.kernel.listener.fixture.ElasticJobListenerCaller;
+import
org.apache.shardingsphere.elasticjob.kernel.listener.fixture.TestElasticJobListener;
+import
org.apache.shardingsphere.elasticjob.kernel.tracing.event.JobTracingEventBus;
+import
org.apache.shardingsphere.elasticjob.spi.listener.param.ShardingContexts;
+import org.apache.shardingsphere.elasticjob.test.util.ReflectionUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Unit tests for {@link SingleShardingJobFacade}.
+ *
+ * <p>
+ * The facade is created without a registry center and its collaborators are replaced by Mockito mocks
+ * through reflection in {@link #setUp()}.
+ * </p>
+ */
+@ExtendWith(MockitoExtension.class)
+class SingleShardingJobFacadeTest {
+
+    @Mock
+    private ConfigurationService configService;
+
+    @Mock
+    private ShardingService shardingService;
+
+    @Mock
+    private ExecutionContextService executionContextService;
+
+    @Mock
+    private ExecutionService executionService;
+
+    @Mock
+    private FailoverService failoverService;
+
+    @Mock
+    private JobTracingEventBus jobTracingEventBus;
+
+    @Mock
+    private ElasticJobListenerCaller caller;
+
+    @Mock
+    private JobNodeStorage jobNodeStorage;
+
+    @Mock
+    private InstanceService instanceService;
+
+    private SingleShardingJobFacade jobFacade;
+
+    private StringBuilder orderResult;
+
+    /**
+     * Create the facade under test with two ordered listeners and inject every mock into it.
+     */
+    @BeforeEach
+    void setUp() {
+        orderResult = new StringBuilder();
+        jobFacade = new SingleShardingJobFacade(null, "test_job",
+                Arrays.asList(new TestElasticJobListener(caller, "l1", 2, orderResult), new TestElasticJobListener(caller, "l2", 1, orderResult)), null);
+        // Fields injected through the superclass accessor.
+        ReflectionUtils.setSuperclassFieldValue(jobFacade, "configService", configService);
+        ReflectionUtils.setSuperclassFieldValue(jobFacade, "shardingService", shardingService);
+        ReflectionUtils.setSuperclassFieldValue(jobFacade, "executionContextService", executionContextService);
+        ReflectionUtils.setSuperclassFieldValue(jobFacade, "executionService", executionService);
+        ReflectionUtils.setSuperclassFieldValue(jobFacade, "failoverService", failoverService);
+        ReflectionUtils.setSuperclassFieldValue(jobFacade, "jobTracingEventBus", jobTracingEventBus);
+        // Fields injected through the regular accessor.
+        ReflectionUtils.setFieldValue(jobFacade, "configService", configService);
+        ReflectionUtils.setFieldValue(jobFacade, "shardingService", shardingService);
+        ReflectionUtils.setFieldValue(jobFacade, "executionContextService", executionContextService);
+        ReflectionUtils.setFieldValue(jobFacade, "executionService", executionService);
+        ReflectionUtils.setFieldValue(jobFacade, "failoverService", failoverService);
+        ReflectionUtils.setFieldValue(jobFacade, "jobTracingEventBus", jobTracingEventBus);
+        ReflectionUtils.setFieldValue(jobFacade, "jobNodeStorage", jobNodeStorage);
+        ReflectionUtils.setFieldValue(jobFacade, "instanceService", instanceService);
+    }
+
+    /**
+     * Loading the job configuration returns what the configuration service provides.
+     */
+    @Test
+    void assertLoad() {
+        JobConfiguration jobConfig = JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").build();
+        when(configService.load(true)).thenReturn(jobConfig);
+        assertThat(jobFacade.loadJobConfiguration(true), is(jobConfig));
+    }
+
+    /**
+     * The environment check is forwarded to the configuration service.
+     */
+    @Test
+    void assertCheckMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
+        jobFacade.checkJobExecutionEnvironment();
+        verify(configService).checkMaxTimeDiffSecondsTolerable();
+    }
+
+    /**
+     * With failover disabled the failover service is never asked to fail over.
+     */
+    @Test
+    void assertFailoverIfUnnecessary() {
+        stubJobConfiguration(false);
+        jobFacade.failoverIfNecessary();
+        verify(failoverService, times(0)).failoverIfNecessary();
+    }
+
+    /**
+     * With failover and execution monitoring enabled the failover service is asked to fail over.
+     */
+    @Test
+    void assertFailoverIfNecessary() {
+        stubJobConfiguration(true);
+        jobFacade.failoverIfNecessary();
+        verify(failoverService).failoverIfNecessary();
+    }
+
+    /**
+     * Registering the job begin is forwarded to the execution service.
+     */
+    @Test
+    void assertRegisterJobBegin() {
+        ShardingContexts contexts = newShardingContexts();
+        jobFacade.registerJobBegin(contexts);
+        verify(executionService).registerJobBegin(contexts);
+    }
+
+    /**
+     * With failover disabled job completion is registered but failover completion is not updated.
+     */
+    @Test
+    void assertRegisterJobCompletedWhenFailoverDisabled() {
+        ShardingContexts contexts = newShardingContexts();
+        stubJobConfiguration(false);
+        jobFacade.registerJobCompleted(contexts);
+        verify(executionService).registerJobCompleted(contexts);
+        verify(failoverService, times(0)).updateFailoverComplete(contexts.getShardingItemParameters().keySet());
+    }
+
+    /**
+     * With failover enabled job completion is registered and failover completion is updated.
+     */
+    @Test
+    void assertRegisterJobCompletedWhenFailoverEnabled() {
+        ShardingContexts contexts = newShardingContexts();
+        stubJobConfiguration(true);
+        jobFacade.registerJobCompleted(contexts);
+        verify(executionService).registerJobCompleted(contexts);
+        verify(failoverService).updateFailoverComplete(contexts.getShardingItemParameters().keySet());
+    }
+
+    /**
+     * When the local instance is among the available instances, the IP of the following instance is stored as next job instance IP.
+     */
+    @Test
+    void assertRegisterJobCompletedWhenRunningOnCurrentHost() {
+        final ShardingContexts contexts = newShardingContexts();
+        stubJobConfiguration(true);
+        JobInstance localInstance = registerLocalJobInstance("192.168.1.2");
+        JobInstance followingInstance = newJobInstance("192.168.1.3");
+        List<JobInstance> availableInstances = new ArrayList<>();
+        availableInstances.add(localInstance);
+        availableInstances.add(followingInstance);
+        when(instanceService.getAvailableJobInstances()).thenReturn(availableInstances);
+        jobFacade.registerJobCompleted(contexts);
+        verify(executionService).registerJobCompleted(contexts);
+        verify(failoverService).updateFailoverComplete(contexts.getShardingItemParameters().keySet());
+        verify(jobNodeStorage).fillEphemeralJobNode("next-job-instance-ip", followingInstance.getServerIp());
+    }
+
+    /**
+     * When the local instance is not among the available instances, the only available instance is not stored as next job instance IP.
+     */
+    @Test
+    void assertRegisterJobCompletedWhenRunningOnOtherHost() {
+        final ShardingContexts contexts = newShardingContexts();
+        stubJobConfiguration(true);
+        registerLocalJobInstance("192.168.1.2");
+        JobInstance otherInstance = newJobInstance("192.168.1.3");
+        List<JobInstance> availableInstances = new ArrayList<>();
+        availableInstances.add(otherInstance);
+        when(instanceService.getAvailableJobInstances()).thenReturn(availableInstances);
+        jobFacade.registerJobCompleted(contexts);
+        verify(executionService).registerJobCompleted(contexts);
+        verify(failoverService).updateFailoverComplete(contexts.getShardingItemParameters().keySet());
+        verify(jobNodeStorage, times(0)).fillEphemeralJobNode("next-job-instance-ip", otherInstance.getServerIp());
+    }
+
+    /**
+     * Local failover items are used directly and no sharding is triggered.
+     */
+    @Test
+    void assertGetShardingContextWhenIsFailoverEnableAndFailover() {
+        ShardingContexts contexts = newShardingContexts();
+        stubJobConfiguration(true);
+        when(failoverService.getLocalFailoverItems()).thenReturn(Collections.singletonList(0));
+        stubShardingContexts(Collections.singletonList(0), contexts);
+        assertThat(jobFacade.getShardingContexts(), is(contexts));
+        verify(shardingService, times(0)).shardingIfNecessary();
+    }
+
+    /**
+     * Without local failover items the facade shards and uses the local sharding items.
+     */
+    @Test
+    void assertGetShardingContextWhenIsFailoverEnableAndNotFailover() {
+        ShardingContexts contexts = newShardingContexts();
+        stubJobConfiguration(true);
+        when(failoverService.getLocalFailoverItems()).thenReturn(Collections.emptyList());
+        when(shardingService.getLocalShardingItems()).thenReturn(Lists.newArrayList(0));
+        when(failoverService.getLocalTakeOffItems()).thenReturn(Collections.emptyList());
+        stubShardingContexts(Collections.singletonList(0), contexts);
+        assertThat(jobFacade.getShardingContexts(), is(contexts));
+        verify(shardingService).shardingIfNecessary();
+    }
+
+    /**
+     * With failover disabled the facade shards and uses the local sharding items.
+     */
+    @Test
+    void assertGetShardingContextWhenIsFailoverDisable() {
+        ShardingContexts contexts = newShardingContexts();
+        stubJobConfiguration(false);
+        when(shardingService.getLocalShardingItems()).thenReturn(Collections.singletonList(0));
+        stubShardingContexts(Collections.singletonList(0), contexts);
+        assertThat(jobFacade.getShardingContexts(), is(contexts));
+        verify(shardingService).shardingIfNecessary();
+    }
+
+    /**
+     * Disabled items are removed from the local sharding items before the context is built.
+     */
+    @Test
+    void assertGetShardingContextWhenHasDisabledItems() {
+        ShardingContexts contexts = newShardingContexts();
+        stubJobConfiguration(false);
+        when(shardingService.getLocalShardingItems()).thenReturn(Lists.newArrayList(0));
+        when(executionService.getDisabledItems(Collections.singletonList(0))).thenReturn(Collections.singletonList(0));
+        stubShardingContexts(Collections.emptyList(), contexts);
+        assertThat(jobFacade.getShardingContexts(), is(contexts));
+        verify(shardingService).shardingIfNecessary();
+    }
+
+    /**
+     * When no re-sharding is needed and no next job instance IP is stored, the facade still shards once.
+     */
+    @Test
+    void assertGetShardingContextWhenIsFailoverDisableAndNoNeedShardingWithoutNextIP() {
+        ShardingContexts contexts = newShardingContexts();
+        stubJobConfiguration(false);
+        when(shardingService.isNeedSharding()).thenReturn(false);
+        when(jobNodeStorage.getJobNodeDataDirectly("next-job-instance-ip")).thenReturn(null);
+        when(shardingService.getLocalShardingItems()).thenReturn(Collections.singletonList(0));
+        stubShardingContexts(Collections.singletonList(0), contexts);
+        assertThat(jobFacade.getShardingContexts(), is(contexts));
+        verify(shardingService, times(1)).shardingIfNecessary();
+    }
+
+    /**
+     * When the stored next job instance IP equals the local server IP, item 0 is used without sharding.
+     */
+    @Test
+    void assertGetShardingContextWhenIsFailoverDisableAndNoNeedShardingWithNextIP() {
+        final ShardingContexts contexts = newShardingContexts();
+        stubJobConfiguration(false);
+        when(shardingService.isNeedSharding()).thenReturn(false);
+        when(jobNodeStorage.getJobNodeDataDirectly("next-job-instance-ip")).thenReturn("192.168.1.2");
+        registerLocalJobInstance("192.168.1.2");
+        stubShardingContexts(Collections.singletonList(0), contexts);
+        assertThat(jobFacade.getShardingContexts(), is(contexts));
+        verify(shardingService, times(0)).shardingIfNecessary();
+    }
+
+    /**
+     * When re-sharding is needed the facade shards and uses the local sharding items.
+     */
+    @Test
+    void assertGetShardingContextWhenIsFailoverDisableAndNeedSharding() {
+        ShardingContexts contexts = newShardingContexts();
+        stubJobConfiguration(false);
+        when(shardingService.getLocalShardingItems()).thenReturn(Collections.singletonList(0));
+        when(shardingService.isNeedSharding()).thenReturn(true);
+        stubShardingContexts(Collections.singletonList(0), contexts);
+        assertThat(jobFacade.getShardingContexts(), is(contexts));
+        verify(shardingService).shardingIfNecessary();
+    }
+
+    /**
+     * The misfire check returns the answer of the execution service.
+     */
+    @Test
+    void assertMisfireIfRunning() {
+        when(executionService.misfireIfHasRunningItems(Arrays.asList(0, 1))).thenReturn(true);
+        assertThat(jobFacade.misfireIfRunning(Arrays.asList(0, 1)), is(true));
+    }
+
+    /**
+     * Clearing misfire flags is forwarded to the execution service.
+     */
+    @Test
+    void assertClearMisfire() {
+        jobFacade.clearMisfire(Arrays.asList(0, 1));
+        verify(executionService).clearMisfire(Arrays.asList(0, 1));
+    }
+
+    /**
+     * The need-sharding flag returns the answer of the sharding service.
+     */
+    @Test
+    void assertIsNeedSharding() {
+        when(shardingService.isNeedSharding()).thenReturn(true);
+        assertThat(jobFacade.isNeedSharding(), is(true));
+    }
+
+    /**
+     * Both listeners are called before execution, "l2" ahead of "l1".
+     */
+    @Test
+    void assertBeforeJobExecuted() {
+        jobFacade.beforeJobExecuted(newShardingContexts());
+        verify(caller, times(2)).before();
+        assertThat(orderResult.toString(), is("l2l1"));
+    }
+
+    /**
+     * Both listeners are called after execution, "l2" ahead of "l1".
+     */
+    @Test
+    void assertAfterJobExecuted() {
+        jobFacade.afterJobExecuted(newShardingContexts());
+        verify(caller, times(2)).after();
+        assertThat(orderResult.toString(), is("l2l1"));
+    }
+
+    /**
+     * Job execution events are posted to the tracing event bus.
+     */
+    @Test
+    void assertPostJobExecutionEvent() {
+        jobFacade.postJobExecutionEvent(null);
+        verify(jobTracingEventBus).post(null);
+    }
+
+    /**
+     * Create the sharding contexts fixture shared by the tests.
+     */
+    private static ShardingContexts newShardingContexts() {
+        return new ShardingContexts("fake_task_id", "test_job", 1, "", Collections.emptyMap());
+    }
+
+    /**
+     * Create a job instance with the given server IP.
+     */
+    private static JobInstance newJobInstance(final String serverIp) {
+        JobInstance result = new JobInstance();
+        result.setServerIp(serverIp);
+        return result;
+    }
+
+    /**
+     * Create a job instance with the given server IP and add it to the job registry for "test_job".
+     */
+    private static JobInstance registerLocalJobInstance(final String serverIp) {
+        JobInstance result = newJobInstance(serverIp);
+        JobRegistry.getInstance().addJobInstance("test_job", result);
+        return result;
+    }
+
+    /**
+     * Stub the configuration service with a single-sharding job; enabling failover also enables execution monitoring.
+     */
+    private void stubJobConfiguration(final boolean failoverEnabled) {
+        JobConfiguration jobConfig = failoverEnabled
+                ? JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(true).monitorExecution(true).build()
+                : JobConfiguration.newBuilder("test_job", 1).cron("0/1 * * * * ?").failover(false).build();
+        when(configService.load(true)).thenReturn(jobConfig);
+    }
+
+    /**
+     * Stub the execution context service to return the contexts for the given sharding items.
+     */
+    private void stubShardingContexts(final List<Integer> shardingItems, final ShardingContexts contexts) {
+        when(executionContextService.getJobShardingContext(shardingItems)).thenReturn(contexts);
+    }
+}
diff --git
a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategyTest.java
b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategyTest.java
new file mode 100644
index 000000000..6f016ce72
--- /dev/null
+++
b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/internal/sharding/strategy/type/SingleShardingBalanceJobShardingStrategyTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.strategy.type;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import
org.apache.shardingsphere.elasticjob.kernel.internal.sharding.JobInstance;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit test for {@link SingleShardingBalanceJobShardingStrategy}.
+ */
+class SingleShardingBalanceJobShardingStrategyTest {
+
+    private final SingleShardingBalanceJobShardingStrategy strategy = new SingleShardingBalanceJobShardingStrategy();
+
+    /**
+     * With three job instances and one sharding item, exactly one item is assigned in total.
+     */
+    @Test
+    void assertSharding() {
+        List<JobInstance> jobInstances = Arrays.asList(new JobInstance("host0@-@0"), new JobInstance("host1@-@0"), new JobInstance("host2@-@0"));
+        Map<JobInstance, List<Integer>> actual = strategy.sharding(jobInstances, "JobName", 1);
+        int assignedItemCount = 0;
+        for (List<Integer> each : actual.values()) {
+            assignedItemCount += each.size();
+        }
+        assertThat(assignedItemCount, is(1));
+    }
+}