SAMZA-1359; Handle phantom container notifications cleanly during an RM fail-over
1. Improved our container handling logic to be resilient to phantom notifications.
2. Added a new metric to Samza's ContainerProcessManager module that tracks the number of such invalid notifications.
3. Added a couple of tests that simulate the exact scenario we encountered during the cluster upgrade (container starts -> container fails -> legitimate notification for the failure -> container restart -> RM fail-over -> phantom notification with a different exit code).
4. As an aside, a number of tests for ContainerProcessManager relied on Thread.sleep to ensure that threads ran in a certain order. Removed this non-determinism and made them predictable.

Author: Jagadish Venkatraman <jvenk...@jvenkatr-mn2.linkedin.biz>

Reviewers: Jake Maes <jm...@linkedin.com>

Closes #243 from vjagadish1989/am-bug

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/35143b67
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/35143b67
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/35143b67

Branch: refs/heads/0.14.0
Commit: 35143b676f23f69324b442fd1061318a663538f0
Parents: 91b22fd
Author: Jagadish Venkatraman <jvenk...@jvenkatr-mn2.linkedin.biz>
Authored: Tue Jul 25 11:18:14 2017 -0700
Committer: Jagadish <jagad...@apache.org>
Committed: Tue Jul 25 11:18:14 2017 -0700

----------------------------------------------------------------------
 .../clustermanager/ContainerProcessManager.java | 145 +++++++------
 .../clustermanager/SamzaApplicationState.java   |   8 +
 .../ContainerProcessManagerMetrics.scala        |   1 +
 .../clustermanager/MockContainerAllocator.java  |  24 +++
 .../TestContainerProcessManager.java            | 207 ++++++++++++++++---
 5 files changed, 283 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/35143b67/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 9b5e871..2861e9e 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -239,9 +239,10 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     }
     if (containerId == null) {
       log.info("No matching container id found for " + containerStatus.toString());
-    } else {
-      state.runningContainers.remove(containerId);
+      state.redundantNotifications.incrementAndGet();
+      return;
     }
+    state.runningContainers.remove(containerId);

     int exitStatus = containerStatus.getExitCode();
     switch (exitStatus) {
@@ -250,10 +251,8 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback

         state.completedContainers.incrementAndGet();

-        if (containerId != null) {
-          state.finishedContainers.incrementAndGet();
-          containerFailures.remove(containerId);
-        }
+        state.finishedContainers.incrementAndGet();
+        containerFailures.remove(containerId);

         if (state.completedContainers.get() == state.containerCount.get()) {
           log.info("Setting job status to SUCCEEDED, since all containers have been marked as completed.");
@@ -273,18 +272,16 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
state.releasedContainers.incrementAndGet(); // If this container was assigned some partitions (a containerId), then - // clean up, and request a refactor container for the tasks. This only + // clean up, and request a new container for the tasks. This only // should happen if the container was 'lost' due to node failure, not // if the AM released the container. - if (containerId != null) { - log.info("Released container {} was assigned task group ID {}. Requesting a refactor container for the task group.", containerIdStr, containerId); + log.info("Released container {} was assigned task group ID {}. Requesting a new container for the task group.", containerIdStr, containerId); - state.neededContainers.incrementAndGet(); - state.jobHealthy.set(false); + state.neededContainers.incrementAndGet(); + state.jobHealthy.set(false); - // request a container on refactor host - containerAllocator.requestResource(containerId, ResourceRequestState.ANY_HOST); - } + // request a container on new host + containerAllocator.requestResource(containerId, ResourceRequestState.ANY_HOST); break; default: @@ -296,72 +293,70 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback state.failedContainersStatus.put(containerIdStr, containerStatus); state.jobHealthy.set(false); - if (containerId != null) { - state.neededContainers.incrementAndGet(); - // Find out previously running container location - String lastSeenOn = state.jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY); - if (!hostAffinityEnabled || lastSeenOn == null) { - lastSeenOn = ResourceRequestState.ANY_HOST; + state.neededContainers.incrementAndGet(); + // Find out previously running container location + String lastSeenOn = state.jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY); + if (!hostAffinityEnabled || lastSeenOn == null) { + lastSeenOn = ResourceRequestState.ANY_HOST; + } + log.info("Container was last seen on " + lastSeenOn); + // A container failed for an unknown reason. Let's check to see if + // we need to shutdown the whole app master if too many container + // failures have happened. The rules for failing are that the + // failure count for a task group id must be > the configured retry + // count, and the last failure (the one prior to this one) must have + // happened less than retry window ms ago. If retry count is set to + // 0, the app master will fail on any container failure. If the + // retry count is set to a number < 0, a container failure will + // never trigger an app master failure. + int retryCount = clusterManagerConfig.getContainerRetryCount(); + int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs(); + + if (retryCount == 0) { + log.error("Container ID {} ({}) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed.", containerId, containerIdStr); + + tooManyFailedContainers = true; + } else if (retryCount > 0) { + int currentFailCount; + long lastFailureTime; + if (containerFailures.containsKey(containerId)) { + ResourceFailure failure = containerFailures.get(containerId); + currentFailCount = failure.getCount() + 1; + lastFailureTime = failure.getLastFailure(); + } else { + currentFailCount = 1; + lastFailureTime = 0L; } - log.info("Container was last seen on " + lastSeenOn); - // A container failed for an unknown reason. 
Let's check to see if - // we need to shutdown the whole app master if too many container - // failures have happened. The rules for failing are that the - // failure count for a task group id must be > the configured retry - // count, and the last failure (the one prior to this one) must have - // happened less than retry window ms ago. If retry count is set to - // 0, the app master will fail on any container failure. If the - // retry count is set to a number < 0, a container failure will - // never trigger an app master failure. - int retryCount = clusterManagerConfig.getContainerRetryCount(); - int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs(); - - if (retryCount == 0) { - log.error("Container ID {} ({}) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed.", containerId, containerIdStr); - - tooManyFailedContainers = true; - } else if (retryCount > 0) { - int currentFailCount; - long lastFailureTime; - if (containerFailures.containsKey(containerId)) { - ResourceFailure failure = containerFailures.get(containerId); - currentFailCount = failure.getCount() + 1; - lastFailureTime = failure.getLastFailure(); + if (currentFailCount >= retryCount) { + long lastFailureMsDiff = System.currentTimeMillis() - lastFailureTime; + + if (lastFailureMsDiff < retryWindowMs) { + log.error("Container ID " + containerId + "(" + containerIdStr + ") has failed " + currentFailCount + + " times, with last failure " + lastFailureMsDiff + "ms ago. This is greater than retry count of " + + retryCount + " and window of " + retryWindowMs + "ms , so shutting down the application master, and marking the job as failed."); + + // We have too many failures, and we're within the window + // boundary, so reset shut down the app master. + tooManyFailedContainers = true; + state.status = SamzaApplicationState.SamzaAppStatus.FAILED; } else { - currentFailCount = 1; - lastFailureTime = 0L; - } - if (currentFailCount >= retryCount) { - long lastFailureMsDiff = System.currentTimeMillis() - lastFailureTime; - - if (lastFailureMsDiff < retryWindowMs) { - log.error("Container ID " + containerId + "(" + containerIdStr + ") has failed " + currentFailCount + - " times, with last failure " + lastFailureMsDiff + "ms ago. This is greater than retry count of " + - retryCount + " and window of " + retryWindowMs + "ms , so shutting down the application master, and marking the job as failed."); - - // We have too many failures, and we're within the window - // boundary, so reset shut down the app master. - tooManyFailedContainers = true; - state.status = SamzaApplicationState.SamzaAppStatus.FAILED; - } else { - log.info("Resetting fail count for container ID {} back to 1, since last container failure ({}) for " + - "this container ID was outside the bounds of the retry window.", containerId, containerIdStr); - - // Reset counter back to 1, since the last failure for this - // container happened outside the window boundary. 
- containerFailures.put(containerId, new ResourceFailure(1, System.currentTimeMillis())); - } - } else { - log.info("Current fail count for container ID {} is {}.", containerId, currentFailCount); - containerFailures.put(containerId, new ResourceFailure(currentFailCount, System.currentTimeMillis())); + log.info("Resetting fail count for container ID {} back to 1, since last container failure ({}) for " + + "this container ID was outside the bounds of the retry window.", containerId, containerIdStr); + + // Reset counter back to 1, since the last failure for this + // container happened outside the window boundary. + containerFailures.put(containerId, new ResourceFailure(1, System.currentTimeMillis())); } + } else { + log.info("Current fail count for container ID {} is {}.", containerId, currentFailCount); + containerFailures.put(containerId, new ResourceFailure(currentFailCount, System.currentTimeMillis())); } + } - if (!tooManyFailedContainers) { - log.info("Requesting a refactor container "); - // Request a refactor container - containerAllocator.requestResource(containerId, lastSeenOn); - } + if (!tooManyFailedContainers) { + log.info("Requesting a new container "); + // Request a new container + containerAllocator.requestResource(containerId, lastSeenOn); } } http://git-wip-us.apache.org/repos/asf/samza/blob/35143b67/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java index bde3fac..653fb4e 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java @@ -116,6 +116,14 @@ public class SamzaApplicationState { public final AtomicInteger matchedResourceRequests = new AtomicInteger(0); + /** + * Number of invalid container notifications. 
+ * + * A notification is "invalid" if the corresponding container is not currently managed by the + * {@link ContainerProcessManager} + */ + public final AtomicInteger redundantNotifications = new AtomicInteger(0); + public SamzaApplicationState(JobModelManager jobModelManager) { this.jobModelManager = jobModelManager; } http://git-wip-us.apache.org/repos/asf/samza/blob/35143b67/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala index 6c3081b..c396ed6 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala @@ -52,6 +52,7 @@ class ContainerProcessManagerMetrics( val mFailedContainers = newGauge("failed-containers", () => state.failedContainers.get()) val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get()) val mContainers = newGauge("container-count", () => state.containerCount) + val mRedundantNotifications = newGauge("redundant-notifications", () => state.redundantNotifications.get()) val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy.get()) 1 else 0) val mLocalityMatchedRequests = newGauge( http://git-wip-us.apache.org/repos/asf/samza/blob/35143b67/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java index 109ed47..449b484 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java @@ -23,9 +23,13 @@ import org.apache.samza.config.Config; import java.lang.reflect.Field; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; public class MockContainerAllocator extends ContainerAllocator { public int requestedContainers = 0; + private Semaphore semaphore = new Semaphore(0); public MockContainerAllocator(ClusterResourceManager manager, Config config, @@ -33,6 +37,20 @@ public class MockContainerAllocator extends ContainerAllocator { super(manager, config, state); } + /** + * Causes the current thread to block until the expected number of containers have started. + * + * @param numExpectedContainers the number of containers expected to start + * @param timeout the maximum time to wait + * @param unit the time unit of the {@code timeout} argument + * + * @return a boolean that specifies whether containers started within the timeout. 
+ * @throws InterruptedException if the current thread is interrupted while waiting + */ + boolean awaitContainersStart(int numExpectedContainers, long timeout, TimeUnit unit) throws InterruptedException { + return semaphore.tryAcquire(numExpectedContainers, timeout, unit); + } + @Override public void requestResources(Map<String, String> containerToHostMappings) { requestedContainers += containerToHostMappings.size(); @@ -45,4 +63,10 @@ public class MockContainerAllocator extends ContainerAllocator { return (ResourceRequestState) field.get(this); } + + @Override + protected void runStreamProcessor(SamzaResourceRequest request, String preferredHost) { + super.runStreamProcessor(request, preferredHost); + semaphore.release(); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/35143b67/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java index 8199559..6978341 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java @@ -32,16 +32,17 @@ import org.apache.samza.testUtils.MockHttpServer; import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.ServletHolder; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -173,15 +174,19 @@ public class TestContainerProcessManager { state); getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator); - + CountDownLatch latch = new CountDownLatch(1); getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, new Thread() { public void run() { isRunning = true; + latch.countDown(); } }); taskManager.start(); - Thread.sleep(1000); + + if (!latch.await(2, TimeUnit.SECONDS)) { + Assert.fail("timed out waiting for the latch to expire"); + } // Verify Allocator thread has started running assertTrue(isRunning); @@ -206,40 +211,56 @@ public class TestContainerProcessManager { ); taskManager.start(); - Thread.sleep(100); - Thread allocatorThread = (Thread) getPrivateFieldFromTaskManager("allocatorThread", taskManager).get(taskManager); assertTrue(allocatorThread.isAlive()); taskManager.stop(); - Thread.sleep(100); assertFalse(allocatorThread.isAlive()); - } /** * Test Task Manager should stop when all containers finish */ @Test - public void testTaskManagerShouldStopWhenContainersFinish() { + public void testTaskManagerShouldStopWhenContainersFinish() throws Exception { Config conf = getConfig(); state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); - ContainerProcessManager taskManager = new ContainerProcessManager( - new MapConfig(conf), - state, - new MetricsRegistryMap(), - manager + ContainerProcessManager taskManager = new ContainerProcessManager( + new 
MapConfig(conf), + state, + new MetricsRegistryMap(), + manager ); + MockContainerAllocator allocator = new MockContainerAllocator( + manager, + conf, + state); + + getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator); + + Thread thread = new Thread(allocator); + getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread); + + // start triggers a request taskManager.start(); assertFalse(taskManager.shouldShutdown()); + assertEquals(1, allocator.getContainerRequestState().numPendingRequests()); - taskManager.onResourceCompleted(new SamzaResourceStatus("123", "diagnostics", SamzaResourceStatus.SUCCESS)); + SamzaResource container = new SamzaResource(1, 1024, "abc", "id0"); + taskManager.onResourceAllocated(container); + // Allow container to run and update state + if (!allocator.awaitContainersStart(1,2, TimeUnit.SECONDS)) { + fail("timed out waiting for the containers to start"); + } + assertFalse(taskManager.shouldShutdown()); + + taskManager.onResourceCompleted(new SamzaResourceStatus("id0", "diagnostics", SamzaResourceStatus.SUCCESS)); assertTrue(taskManager.shouldShutdown()); } @@ -281,7 +302,9 @@ public class TestContainerProcessManager { taskManager.onResourceAllocated(container); // Allow container to run and update state - Thread.sleep(300); + if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) { + fail("timed out waiting for the containers to start"); + } // Create first container failure taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "diagnostics", 1)); @@ -299,7 +322,9 @@ public class TestContainerProcessManager { taskManager.onResourceAllocated(container); // Allow container to run and update state - Thread.sleep(1000); + if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) { + fail("timed out waiting for the containers to start"); + } assertTrue(state.jobHealthy.get()); @@ -318,6 +343,117 @@ public class TestContainerProcessManager { taskManager.stop(); } + @Test + public void testInvalidNotificationsAreIgnored() throws Exception { + Config conf = getConfig(); + + Map<String, String> config = new HashMap<>(); + config.putAll(getConfig()); + state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + + ContainerProcessManager taskManager = new ContainerProcessManager( + new MapConfig(conf), + state, + new MetricsRegistryMap(), + manager + ); + + MockContainerAllocator allocator = new MockContainerAllocator( + manager, + conf, + state); + getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator); + + Thread thread = new Thread(allocator); + getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread); + + // Start the task manager + taskManager.start(); + + SamzaResource container = new SamzaResource(1, 1000, "abc", "id1"); + taskManager.onResourceAllocated(container); + + // Allow container to run and update state + if (!allocator.awaitContainersStart(1,2, TimeUnit.SECONDS)) { + fail("timed out waiting for the containers to start"); + } + + // Create container failure - with ContainerExitStatus.DISKS_FAILED + taskManager.onResourceCompleted(new SamzaResourceStatus("invalidContainerID", "Disk failure", SamzaResourceStatus.DISK_FAIL)); + + // The above failure should not trigger any container requests, since it is for an invalid container ID + assertEquals(0, allocator.getContainerRequestState().numPendingRequests()); + assertFalse(taskManager.shouldShutdown()); + 
assertTrue(state.jobHealthy.get()); + assertEquals(state.redundantNotifications.get(), 1); + } + + @Test + public void testDuplicateNotificationsDoNotAffectJobHealth() throws Exception { + Config conf = getConfig(); + + Map<String, String> config = new HashMap<>(); + config.putAll(getConfig()); + state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + + ContainerProcessManager taskManager = new ContainerProcessManager( + new MapConfig(conf), + state, + new MetricsRegistryMap(), + manager + ); + + MockContainerAllocator allocator = new MockContainerAllocator( + manager, + conf, + state); + getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator); + + Thread thread = new Thread(allocator); + getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread); + + // Start the task manager + taskManager.start(); + assertFalse(taskManager.shouldShutdown()); + assertEquals(1, allocator.getContainerRequestState().numPendingRequests()); + + SamzaResource container1 = new SamzaResource(1, 1000, "abc", "id1"); + taskManager.onResourceAllocated(container1); + + // Allow container to run and update state + if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) { + fail("timed out waiting for the containers to start"); + } + assertEquals(0, allocator.getContainerRequestState().numPendingRequests()); + + // Create container failure - with ContainerExitStatus.DISKS_FAILED + taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "Disk failure", SamzaResourceStatus.DISK_FAIL)); + + // The above failure should trigger a container request + assertEquals(1, allocator.getContainerRequestState().numPendingRequests()); + assertFalse(taskManager.shouldShutdown()); + assertFalse(state.jobHealthy.get()); + assertEquals(2, manager.resourceRequests.size()); + assertEquals(0, manager.releasedResources.size()); + assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost()); + + SamzaResource container2 = new SamzaResource(1, 1000, "abc", "id2"); + taskManager.onResourceAllocated(container2); + + // Allow container to run and update state + if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) { + fail("timed out waiting for the containers to start"); + } + assertTrue(state.jobHealthy.get()); + + // Simulate a duplicate notification for container 1 with a different exit code + taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "Disk failure", SamzaResourceStatus.PREEMPTED)); + // assert that a duplicate notification does not change metrics (including job health) + assertEquals(state.redundantNotifications.get(), 1); + assertEquals(2, manager.resourceRequests.size()); + assertEquals(0, manager.releasedResources.size()); + assertTrue(state.jobHealthy.get()); + } /** * Test AM requests a new container when a task fails @@ -329,8 +465,6 @@ public class TestContainerProcessManager { Map<String, String> config = new HashMap<>(); config.putAll(getConfig()); - config.remove("yarn.container.retry.count"); - state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); ContainerProcessManager taskManager = new ContainerProcessManager( @@ -354,14 +488,17 @@ public class TestContainerProcessManager { assertFalse(taskManager.shouldShutdown()); assertEquals(1, allocator.getContainerRequestState().numPendingRequests()); - SamzaResource container = new SamzaResource(1, 1000, "abc", "id1"); - 
taskManager.onResourceAllocated(container); + SamzaResource container1 = new SamzaResource(1, 1000, "abc", "id1"); + taskManager.onResourceAllocated(container1); // Allow container to run and update state - Thread.sleep(300); + if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) { + fail("timed out waiting for the containers to start"); + } + assertEquals(0, allocator.getContainerRequestState().numPendingRequests()); // Create container failure - with ContainerExitStatus.DISKS_FAILED - taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "Disk failure", SamzaResourceStatus.DISK_FAIL)); + taskManager.onResourceCompleted(new SamzaResourceStatus(container1.getResourceID(), "Disk failure", SamzaResourceStatus.DISK_FAIL)); // The above failure should trigger a container request assertEquals(1, allocator.getContainerRequestState().numPendingRequests()); @@ -371,21 +508,37 @@ public class TestContainerProcessManager { assertEquals(0, manager.releasedResources.size()); assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost()); + SamzaResource container2 = new SamzaResource(1, 1000, "abc", "id2"); + taskManager.onResourceAllocated(container2); + + // Allow container to run and update state + if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) { + fail("timed out waiting for the containers to start"); + } + // Create container failure - with ContainerExitStatus.PREEMPTED - taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "Preemption", SamzaResourceStatus.PREEMPTED)); + taskManager.onResourceCompleted(new SamzaResourceStatus(container2.getResourceID(), "Preemption", SamzaResourceStatus.PREEMPTED)); + assertEquals(3, manager.resourceRequests.size()); // The above failure should trigger a container request assertEquals(1, allocator.getContainerRequestState().numPendingRequests()); assertFalse(taskManager.shouldShutdown()); assertFalse(state.jobHealthy.get()); assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost()); + SamzaResource container3 = new SamzaResource(1, 1000, "abc", "id3"); + taskManager.onResourceAllocated(container3); + + // Allow container to run and update state + if (!allocator.awaitContainersStart(1, 2, TimeUnit.SECONDS)) { + fail("timed out waiting for the containers to start"); + } // Create container failure - with ContainerExitStatus.ABORTED - taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "Aborted", SamzaResourceStatus.ABORTED)); + taskManager.onResourceCompleted(new SamzaResourceStatus(container3.getResourceID(), "Aborted", SamzaResourceStatus.ABORTED)); // The above failure should trigger a container request assertEquals(1, allocator.getContainerRequestState().numPendingRequests()); - assertEquals(2, manager.resourceRequests.size()); + assertEquals(4, manager.resourceRequests.size()); assertEquals(0, manager.releasedResources.size()); assertFalse(taskManager.shouldShutdown()); assertFalse(state.jobHealthy.get());
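
The core of the change above is the early-return guard in ContainerProcessManager#onResourceCompleted: a completion notification whose resource ID cannot be mapped back to a currently running Samza container is counted in the new redundant-notifications metric and otherwise ignored, so a notification replayed during an RM fail-over can no longer mark the job unhealthy or trigger an extra container request. Below is a minimal, self-contained sketch of that pattern; PhantomNotificationGuard and its members are hypothetical simplifications for illustration only, not the actual ContainerProcessManager API.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sketch of the phantom-notification guard described above. The class and
 * member names are hypothetical stand-ins; the real logic lives in
 * ContainerProcessManager#onResourceCompleted.
 */
public class PhantomNotificationGuard {

  /** Samza container ID -> cluster resource ID, mirroring state.runningContainers. */
  private final Map<String, String> runningContainers = new ConcurrentHashMap<>();

  /** Counter behind the new "redundant-notifications" gauge. */
  private final AtomicInteger redundantNotifications = new AtomicInteger(0);

  public void containerStarted(String containerId, String resourceId) {
    runningContainers.put(containerId, resourceId);
  }

  /**
   * Returns true if the completion notification refers to a container that is
   * currently managed; phantom or duplicate notifications are counted and
   * otherwise ignored so they cannot affect job health.
   */
  public boolean onResourceCompleted(String completedResourceId) {
    String containerId = null;
    for (Map.Entry<String, String> entry : runningContainers.entrySet()) {
      if (entry.getValue().equals(completedResourceId)) {
        containerId = entry.getKey();
        break;
      }
    }
    if (containerId == null) {
      // e.g. a notification replayed by the RM after a fail-over for a
      // container that has already been restarted under a new resource ID
      redundantNotifications.incrementAndGet();
      return false;
    }
    runningContainers.remove(containerId);
    return true;
  }

  public int redundantNotificationCount() {
    return redundantNotifications.get();
  }

  public static void main(String[] args) {
    PhantomNotificationGuard guard = new PhantomNotificationGuard();
    guard.containerStarted("0", "resource-1");      // container starts
    guard.onResourceCompleted("resource-1");        // legitimate failure notification
    guard.containerStarted("0", "resource-2");      // container restarted on a new resource
    guard.onResourceCompleted("resource-1");        // phantom notification after RM fail-over
    System.out.println(guard.redundantNotificationCount());  // prints 1
  }
}

Counting the dropped notifications, rather than discarding them silently, is what makes a fail-over observable through the redundant-notifications gauge without changing job health or issuing extra container requests.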