HIVE-16013: Fragments without locality can stack up on nodes (Prasanth Jayachandran reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fc970f6f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fc970f6f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fc970f6f Branch: refs/heads/hive-14535 Commit: fc970f6f67a80f272d93f4e779d6792e4133a46f Parents: 7f1c29e Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Mon Feb 27 11:54:45 2017 -0800 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Mon Feb 27 11:54:45 2017 -0800 ---------------------------------------------------------------------- .../tezplugins/LlapTaskSchedulerService.java | 110 ++++++---- .../TestLlapTaskSchedulerService.java | 199 +++++++++++++++++-- 2 files changed, 260 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/fc970f6f/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 97191f8..fe73ff1 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -64,6 +64,8 @@ import org.apache.hadoop.hive.llap.metrics.MetricsUtils; import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import 
org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock; import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback; @@ -193,6 +195,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { private final LlapTaskSchedulerMetrics metrics; private final JvmPauseMonitor pauseMonitor; + private final Random random = new Random(); public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { this(taskSchedulerContext, new MonotonicClock(), true); @@ -330,6 +333,11 @@ public class LlapTaskSchedulerService extends TaskScheduler { } } + @VisibleForTesting + public void setServiceInstanceSet(ServiceInstanceSet serviceInstanceSet) { + this.activeInstances = serviceInstanceSet; + } + private class NodeStateChangeListener implements ServiceInstanceStateChangeListener { private final Logger LOG = LoggerFactory.getLogger(NodeStateChangeListener.class); @@ -804,54 +812,73 @@ public class LlapTaskSchedulerService extends TaskScheduler { /* fall through - miss in locality or no locality-requested */ Collection<ServiceInstance> instances = activeInstances.getAllInstancesOrdered(true); - ArrayList<NodeInfo> allNodes = new ArrayList<>(instances.size()); + List<NodeInfo> allNodes = new ArrayList<>(instances.size()); + List<NodeInfo> activeNodesWithFreeSlots = new ArrayList<>(); for (ServiceInstance inst : instances) { - NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity()); - if (nodeInfo != null) { - allNodes.add(nodeInfo); + if (inst instanceof InactiveServiceInstance) { + allNodes.add(null); + } else { + NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity()); + if (nodeInfo == null) { + allNodes.add(null); + } else { + allNodes.add(nodeInfo); + if (nodeInfo.canAcceptTask()) { + activeNodesWithFreeSlots.add(nodeInfo); + } + } } } + if (allNodes.isEmpty()) { + return SELECT_HOST_RESULT_DELAYED_RESOURCES; + } + + // no locality-requested, randomly pick a node containing free slots if (requestedHosts == null || 
requestedHosts.length == 0) { - // no locality-requested, iterate the available hosts in consistent order from the beginning - if (LOG.isDebugEnabled()) { - LOG.debug("No-locality requested. Attempting to allocate next available host for task={}", request.task); - } - for (NodeInfo nodeInfo : allNodes) { - if (nodeInfo.canAcceptTask()) { - LOG.info("Assigning {} when looking for any host, from #hosts={}, requestedHosts={}", - nodeInfo.toShortString(), allNodes.size(), ((requestedHosts == null || requestedHosts.length == 0) - ? "null" : requestedHostsDebugStr)); - return new SelectHostResult(nodeInfo); - } - } - } else { - // miss in locality request, try the next available host that can accept tasks (assume the consistent instances - // list as a ring) from the index of first requested host - final String firstRequestedHost = requestedHosts[0]; if (LOG.isDebugEnabled()) { - LOG.debug("Locality miss. Attempting to allocate next available host from first requested host({}) for " + - "task={}", firstRequestedHost, request.task); + LOG.debug("No-locality requested. 
Selecting a random host for task={}", request.task); } - int requestedHostIdx = -1; - for (int i = 0; i < allNodes.size(); i++) { - if (allNodes.get(i).getHost().equals(firstRequestedHost)) { + return randomSelection(activeNodesWithFreeSlots); + } + + // miss in locality request, try picking consistent location with fallback to random selection + final String firstRequestedHost = requestedHosts[0]; + int requestedHostIdx = -1; + for (int i = 0; i < allNodes.size(); i++) { + NodeInfo nodeInfo = allNodes.get(i); + if (nodeInfo != null) { + if (nodeInfo.getHost().equals(firstRequestedHost)){ requestedHostIdx = i; break; } } + } - for (int i = 0; i < allNodes.size(); i++) { - NodeInfo nodeInfo = allNodes.get((i + requestedHostIdx + 1) % allNodes.size()); - if (nodeInfo.canAcceptTask()) { - if (LOG.isInfoEnabled()) { - LOG.info("Assigning {} when looking for first requested host, from #hosts={}," - + " requestedHosts={}", nodeInfo.toShortString(), allNodes.size(), - ((requestedHosts == null || requestedHosts.length == 0) ? "null" : - requestedHostsDebugStr)); - } - return new SelectHostResult(nodeInfo); + // requested host died or unknown host requested, fallback to random selection. + // TODO: At this point we don't know the slot number of the requested host, so can't rollover to next available + if (requestedHostIdx == -1) { + if (LOG.isDebugEnabled()) { + LOG.debug("Requested node [{}] in consistent order does not exist. 
Falling back to random selection for " + + "request {}", firstRequestedHost, request); + } + return randomSelection(activeNodesWithFreeSlots); + } + + // requested host is still alive but cannot accept task, pick the next available host in consistent order + for (int i = 0; i < allNodes.size(); i++) { + NodeInfo nodeInfo = allNodes.get((i + requestedHostIdx + 1) % allNodes.size()); + // next node in consistent order died or does not have free slots, rollover to next + if (nodeInfo == null || !nodeInfo.canAcceptTask()) { + continue; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Assigning {} in consistent order when looking for first requested host, from #hosts={}," + + " requestedHosts={}", nodeInfo.toShortString(), allNodes.size(), + ((requestedHosts == null || requestedHosts.length == 0) ? "null" : + requestedHostsDebugStr)); } + return new SelectHostResult(nodeInfo); } } @@ -861,6 +888,19 @@ public class LlapTaskSchedulerService extends TaskScheduler { } } + private SelectHostResult randomSelection(final List<NodeInfo> nodesWithFreeSlots) { + if (nodesWithFreeSlots.isEmpty()) { + return SELECT_HOST_RESULT_DELAYED_RESOURCES; + } + + NodeInfo randomNode = nodesWithFreeSlots.get(random.nextInt(nodesWithFreeSlots.size())); + if (LOG.isInfoEnabled()) { + LOG.info("Assigning {} when looking for any host, from #hosts={}, requestedHosts=null", + randomNode.toShortString(), nodesWithFreeSlots.size()); + } + return new SelectHostResult(randomNode); + } + private void addNode(NodeInfo node, ServiceInstance serviceInstance) { // we have just added a new node. 
Signal timeout monitor to reset timer if (activeInstances.size() == 1) { http://git-wip-us.apache.org/repos/asf/hive/blob/fc970f6f/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java index d60635b..339f513 100644 --- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java +++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java @@ -28,6 +28,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -38,6 +40,9 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; +import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl; import org.apache.hadoop.hive.llap.testhelpers.ControlledClock; import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock; @@ -58,6 +63,8 @@ import org.mockito.ArgumentCaptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableSet; + public class TestLlapTaskSchedulerService { private static final Logger LOG = LoggerFactory.getLogger(TestLlapTaskSchedulerService.class); @@ -65,6 +72,7 @@ public class TestLlapTaskSchedulerService { private static final String HOST1 = 
"host1"; private static final String HOST2 = "host2"; private static final String HOST3 = "host3"; + private static final String HOST4 = "host4"; @Test(timeout = 10000) public void testSimpleLocalAllocation() throws IOException, InterruptedException { @@ -456,8 +464,8 @@ public class TestLlapTaskSchedulerService { } } - @Test(timeout = 10000000) - public void testFallbackAllocationOrderedNext() throws IOException, InterruptedException { + @Test(timeout = 10000) + public void testHostPreferenceUnknownAndNotSpecified() throws IOException, InterruptedException { Priority priority1 = Priority.newInstance(1); String[] hostsKnown = new String[]{HOST1, HOST2}; @@ -476,10 +484,72 @@ public class TestLlapTaskSchedulerService { Object task3 = "task3"; Object clientCookie3 = "cookie3"; + Object task4 = "task4"; + Object clientCookie4 = "cookie4"; + tsWrapper.controlScheduler(true); - tsWrapper.allocateTask(task1, hostsUnknown, priority1, clientCookie1); + tsWrapper.allocateTask(task1, hostsKnown, priority1, clientCookie1); tsWrapper.allocateTask(task2, hostsKnown, priority1, clientCookie2); - tsWrapper.allocateTask(task3, noHosts, priority1, clientCookie3); + tsWrapper.allocateTask(task3, hostsUnknown, priority1, clientCookie3); + tsWrapper.allocateTask(task4, noHosts, priority1, clientCookie4); + + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 4) { + break; + } + } + + ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class); + ArgumentCaptor<Container> argumentCaptor2 = ArgumentCaptor.forClass(Container.class); + verify(tsWrapper.mockAppCallback, times(4)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture()); + assertEquals(4, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + assertEquals(task3, 
argumentCaptor.getAllValues().get(2)); + assertEquals(task4, argumentCaptor.getAllValues().get(3)); + // 1st task requested host1, got host1 + assertEquals(HOST1, argumentCaptor2.getAllValues().get(0).getNodeId().getHost()); + // 2nd task requested host1, got host1 + assertEquals(HOST1, argumentCaptor2.getAllValues().get(1).getNodeId().getHost()); + // 3rd task requested unknown host, got host2 since host1 is full and only host2 is left in random pool + assertEquals(HOST2, argumentCaptor2.getAllValues().get(2).getNodeId().getHost()); + // 4th task provided no location preference, got host2 since host1 is full and only host2 is left in random pool + assertEquals(HOST2, argumentCaptor2.getAllValues().get(3).getNodeId().getHost()); + + assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations); + assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations); + } finally { + tsWrapper.shutdown(); + } + } + + @Test(timeout = 10000) + public void testHostPreferenceMissesConsistentRollover() throws IOException, InterruptedException { + Priority priority1 = Priority.newInstance(1); + + String[] hostsKnown = new String[]{HOST1, HOST2, HOST3}; + String[] hostsLive = new String[]{HOST1, HOST2, HOST3}; + String[] hostsH2 = new String[]{HOST2}; + TestTaskSchedulerServiceWrapper tsWrapper = + new TestTaskSchedulerServiceWrapper(2000, hostsKnown, 1, 0, 0l, false, hostsLive, true); + try { + Object task1 = "task1"; + Object clientCookie1 = "cookie1"; + + Object task2 = "task2"; + Object clientCookie2 = "cookie2"; + + Object task3 = "task3"; + Object clientCookie3 = "cookie3"; + + tsWrapper.controlScheduler(true); + tsWrapper.allocateTask(task1, hostsH2, priority1, clientCookie1); + tsWrapper.allocateTask(task2, hostsH2, priority1, clientCookie2); + tsWrapper.allocateTask(task3, hostsH2, priority1, clientCookie3); while (true) { tsWrapper.signalSchedulerRun(); @@ -495,18 +565,79 @@ public class 
TestLlapTaskSchedulerService { .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture()); assertEquals(3, argumentCaptor.getAllValues().size()); assertEquals(task1, argumentCaptor.getAllValues().get(0)); - // 1st task provided unknown host location, it should be assigned first host - assertEquals(hostsKnown[0], argumentCaptor2.getAllValues().get(0).getNodeId().getHost()); assertEquals(task2, argumentCaptor.getAllValues().get(1)); - // 2nd task provided host1 as location preference, it should be assigned host1 as it has capacity - assertEquals(hostsKnown[0], argumentCaptor2.getAllValues().get(1).getNodeId().getHost()); assertEquals(task3, argumentCaptor.getAllValues().get(2)); - // 3rd task provided no location preference, it is tried with host1 but it is full, so gets assigned host2 - assertEquals(hostsKnown[1], argumentCaptor2.getAllValues().get(2).getNodeId().getHost()); + // 1st task requested host2, got host2 + assertEquals(HOST2, argumentCaptor2.getAllValues().get(0).getNodeId().getHost()); + // 2nd task requested host2, got host3 as host2 is full + assertEquals(HOST3, argumentCaptor2.getAllValues().get(1).getNodeId().getHost()); + // 3rd task requested host2, got host1 as host2 and host3 are full + assertEquals(HOST1, argumentCaptor2.getAllValues().get(2).getNodeId().getHost()); - assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + verify(tsWrapper.mockServiceInstanceSet, times(2)).getAllInstancesOrdered(true); + + assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations); - assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations); + assertEquals(2, tsWrapper.ts.dagStats.numNonLocalAllocations); + } finally { + tsWrapper.shutdown(); + } + } + + @Test(timeout = 10000) + public void testHostPreferenceMissesConsistentPartialAlive() throws IOException, InterruptedException { + Priority priority1 = Priority.newInstance(1); + + String[] 
hostsKnown = new String[]{HOST1, HOST2, HOST3, HOST4}; + String[] hostsLive = new String[]{HOST1, HOST2, null, HOST4}; // host3 dead before scheduling + String[] hostsH2 = new String[]{HOST2}; + String[] hostsH3 = new String[]{HOST3}; + TestTaskSchedulerServiceWrapper tsWrapper = + new TestTaskSchedulerServiceWrapper(2000, hostsKnown, 1, 0, 0l, false, hostsLive, true); + try { + Object task1 = "task1"; + Object clientCookie1 = "cookie1"; + + Object task2 = "task2"; + Object clientCookie2 = "cookie2"; + + Object task3 = "task3"; + Object clientCookie3 = "cookie3"; + + tsWrapper.controlScheduler(true); + tsWrapper.allocateTask(task1, hostsH2, priority1, clientCookie1); + tsWrapper.allocateTask(task2, hostsH2, priority1, clientCookie2); + tsWrapper.allocateTask(task3, hostsH3, priority1, clientCookie3); + + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 3) { + break; + } + } + + ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class); + ArgumentCaptor<Container> argumentCaptor2 = ArgumentCaptor.forClass(Container.class); + verify(tsWrapper.mockAppCallback, times(3)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture()); + assertEquals(3, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + assertEquals(task3, argumentCaptor.getAllValues().get(2)); + + // 1st task requested host2, got host2 + assertEquals(HOST2, argumentCaptor2.getAllValues().get(0).getNodeId().getHost()); + // 2nd task requested host2, got host4 since host3 is dead and host2 is full + assertEquals(HOST4, argumentCaptor2.getAllValues().get(1).getNodeId().getHost()); + // 3rd task requested host3, got host1 since host3 is dead and host4 is full + assertEquals(HOST1, argumentCaptor2.getAllValues().get(2).getNodeId().getHost()); + + 
verify(tsWrapper.mockServiceInstanceSet, times(2)).getAllInstancesOrdered(true); + + assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations); + assertEquals(2, tsWrapper.ts.dagStats.numNonLocalAllocations); } finally { tsWrapper.shutdown(); } @@ -1316,6 +1447,7 @@ public class TestLlapTaskSchedulerService { static final Resource resource = Resource.newInstance(1024, 1); Configuration conf; TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class); + ServiceInstanceSet mockServiceInstanceSet = mock(ServiceInstanceSet.class); ControlledClock clock = new ControlledClock(new MonotonicClock()); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1); LlapTaskSchedulerServiceForTest ts; @@ -1344,12 +1476,20 @@ public class TestLlapTaskSchedulerService { TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize, long localityDelayMs, boolean controlledDelayedTaskQueue) throws IOException, InterruptedException { + this(nodeDisableTimeoutMillis, hosts, numExecutors, waitQueueSize, localityDelayMs, controlledDelayedTaskQueue, + hosts, false); + } + + TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors, + int waitQueueSize, long localityDelayMs, boolean controlledDelayedTaskQueue, String[] liveHosts, + boolean useMockRegistry) throws + IOException, InterruptedException { conf = new Configuration(); conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts); conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors); conf.setInt(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname, waitQueueSize); conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname, - nodeDisableTimeoutMillis + "ms"); + nodeDisableTimeoutMillis + "ms"); 
conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false); conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, localityDelayMs); @@ -1358,15 +1498,46 @@ public class TestLlapTaskSchedulerService { UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); doReturn(userPayload).when(mockAppCallback).getInitialUserPayload(); + if (useMockRegistry) { + List<ServiceInstance> liveInstances = new ArrayList<>(); + for (String host : liveHosts) { + if (host == null) { + ServiceInstance mockInactive = mock(InactiveServiceInstance.class); + doReturn(host).when(mockInactive).getHost(); + doReturn("inactive-host-" + host).when(mockInactive).getWorkerIdentity(); + doReturn(ImmutableSet.builder().add(mockInactive).build()).when(mockServiceInstanceSet).getByHost(host); + liveInstances.add(mockInactive); + } else { + ServiceInstance mockActive = mock(ServiceInstance.class); + doReturn(host).when(mockActive).getHost(); + doReturn("host-" + host).when(mockActive).getWorkerIdentity(); + doReturn(ImmutableSet.builder().add(mockActive).build()).when(mockServiceInstanceSet).getByHost(host); + liveInstances.add(mockActive); + } + } + doReturn(liveInstances).when(mockServiceInstanceSet).getAllInstancesOrdered(true); + + List<ServiceInstance> allInstances = new ArrayList<>(); + for (String host : hosts) { + ServiceInstance mockActive = mock(ServiceInstance.class); + doReturn(host).when(mockActive).getHost(); + doReturn(Resource.newInstance(100, 1)).when(mockActive).getResource(); + doReturn("host-" + host).when(mockActive).getWorkerIdentity(); + allInstances.add(mockActive); + } + doReturn(allInstances).when(mockServiceInstanceSet).getAll(); + } if (controlledDelayedTaskQueue) { ts = new LlapTaskSchedulerServiceForTestControlled(mockAppCallback, clock); } else { ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock); } - controlScheduler(true); ts.initialize(); ts.start(); + if (useMockRegistry) { + 
ts.setServiceInstanceSet(mockServiceInstanceSet); + } // One scheduler pass from the nodes that are added at startup signalSchedulerRun(); controlScheduler(false);