This is an automated email from the ASF dual-hosted git repository. jagadish pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 0a0d081 SAMZA-2117: Handle race condition in container launch due to incorrect AM accounting 0a0d081 is described below commit 0a0d0814a5f324503631ea41fe4987c856f8fab3 Author: Jagadish <jvenkatra...@linkedin.com> AuthorDate: Tue Mar 19 10:50:58 2019 -0700 SAMZA-2117: Handle race condition in container launch due to incorrect AM accounting Steps involved when starting a Samza container: 1. Issue a request to YARN to launch a container 2. Record that container as "pending" launch. 3. Launch callback succeeds on a different thread: The callback looks at the "pending" container and marks it as "running". There is a race condition in the above: If the main thread gets pre-empted between (1) and (2), the callback thread wouldn't see the container state as "pending" - hence, it wouldn't transition it to a "running" state. This PR fixes it by flipping (1) and (2) - i.e., record the intent prior to issuing the launch request. Added a unit test and refactored existing tests. Author: Jagadish <jvenkatra...@linkedin.com> Reviewers: Prateek M<pmahe...@linkedin.com> Closes #958 from vjagadish1989/samza-2117 --- .../clustermanager/AbstractContainerAllocator.java | 9 ++-- .../clustermanager/MockClusterResourceManager.java | 9 +++- .../MockClusterResourceManagerFactory.java | 2 +- .../clustermanager/TestContainerAllocator.java | 4 +- .../TestContainerProcessManager.java | 51 +++++++++++++++------- .../clustermanager/TestContainerRequestState.java | 2 +- .../TestHostAwareContainerAllocator.java | 11 ++--- 7 files changed, 61 insertions(+), 27 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java index 7adb1cc..adc09fe 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java +++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java @@ -152,10 +152,13 @@ public abstract class AbstractContainerAllocator implements Runnable { + "timestamp {} to resource {}", new Object[]{preferredHost, String.valueOf(containerID), request.getRequestTimestampMs(), resource.getResourceID()}); - //Submit a request to launch a StreamProcessor on the provided resource. To match with the response returned later - //in the callback, we should also store state about the container whose launch is pending. - clusterResourceManager.launchStreamProcessor(resource, builder); + // Update container state as "pending" and then issue a request to launch it. It's important to perform the state-update + // prior to issuing the request. Otherwise, there's a race where the response callback may arrive sooner and not see + // the container as "pending" (SAMZA-2117) + state.pendingContainers.put(containerID, resource); + + clusterResourceManager.launchStreamProcessor(resource, builder); } /** diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java index 4545a75..7d9f13a 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java @@ -21,6 +21,7 @@ package org.apache.samza.clustermanager; import com.google.common.collect.ImmutableList; import org.apache.samza.job.CommandBuilder; +import org.junit.Assert; import java.util.ArrayList; import java.util.Collections; @@ -40,11 +41,13 @@ public class MockClusterResourceManager extends ClusterResourceManager { private final Semaphore requestCountSemaphore = new Semaphore(0); private final Semaphore launchCountSemaphore = new Semaphore(0); + private final SamzaApplicationState state; Throwable nextException = null; - 
MockClusterResourceManager(ClusterResourceManager.Callback callback) { + MockClusterResourceManager(ClusterResourceManager.Callback callback, SamzaApplicationState state) { super(callback); + this.state = state; } @Override @@ -81,12 +84,16 @@ public class MockClusterResourceManager extends ClusterResourceManager { @Override public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) { + // assert that the resource is in "pending" state prior to invoking this method + Assert.assertTrue(state.pendingContainers.values().contains(resource)); + if (nextException != null) { clusterManagerCallback.onStreamProcessorLaunchFailure(resource, new SamzaContainerLaunchException(nextException)); } else { launchedResources.add(resource); clusterManagerCallback.onStreamProcessorLaunchSuccess(resource); } + for (MockContainerListener listener : mockContainerListeners) { listener.postRunContainer(launchedResources.size()); } diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java index 3a464c2..0701e35 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerFactory.java @@ -27,6 +27,6 @@ public class MockClusterResourceManagerFactory implements ResourceManagerFactory @Override public ClusterResourceManager getClusterResourceManager(ClusterResourceManager.Callback callback, SamzaApplicationState state) { - return new MockClusterResourceManager(callback); + return new MockClusterResourceManager(callback, state); } } diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java index dd40e7c..a20c37f 100644 --- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java @@ -41,11 +41,13 @@ import static org.junit.Assert.assertTrue; public class TestContainerAllocator { private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); - private final MockClusterResourceManager manager = new MockClusterResourceManager(callback); private final Config config = getConfig(); private final JobModelManager jobModelManager = JobModelManagerTestUtil.getJobModelManager(config, 1, new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class))); + private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager); + private final MockClusterResourceManager manager = new MockClusterResourceManager(callback, state); + private ContainerAllocator containerAllocator; private MockContainerRequestState requestState; private Thread allocatorThread; diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java index 841e5ba..324d17d 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java @@ -48,8 +48,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TestContainerProcessManager { - private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); - private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback); private static volatile boolean isRunning = false; @@ -85,7 +83,6 @@ public class TestContainerProcessManager { private HttpServer server = null; - private SamzaApplicationState state = null; private 
JobModelManager getJobModelManagerWithHostAffinity(Map<String, String> containerIdToHost) { Map<String, Map<String, String>> localityMap = new HashMap<>(); @@ -122,7 +119,10 @@ public class TestContainerProcessManager { conf.put("cluster-manager.container.memory.mb", "500"); conf.put("cluster-manager.container.cpu.cores", "5"); - state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); + MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); + ContainerProcessManager taskManager = new ContainerProcessManager( new MapConfig(conf), state, @@ -143,6 +143,8 @@ public class TestContainerProcessManager { conf.put("cluster-manager.container.cpu.cores", "5"); state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1"))); + callback = new MockClusterResourceManagerCallback(); + clusterResourceManager = new MockClusterResourceManager(callback, state); taskManager = new ContainerProcessManager( new MapConfig(conf), state, @@ -161,8 +163,9 @@ public class TestContainerProcessManager { @Test public void testOnInit() throws Exception { Config conf = getConfig(); - state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); - + SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); + ClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ContainerProcessManager taskManager = new ContainerProcessManager( new MapConfig(conf), state, @@ -203,7 +206,9 @@ public class TestContainerProcessManager { @Test public void testOnShutdown() throws Exception { Config conf = getConfig(); - state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); + MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ContainerProcessManager taskManager = new ContainerProcessManager( new MapConfig(conf), @@ -227,7 +232,9 @@ public class TestContainerProcessManager { @Test public void testTaskManagerShouldStopWhenContainersFinish() throws Exception { Config conf = getConfig(); - state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); + MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ContainerProcessManager taskManager = new ContainerProcessManager( new MapConfig(conf), @@ -275,7 +282,9 @@ public class TestContainerProcessManager { @Test public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception { Config conf = getConfig(); - state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); + MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ContainerProcessManager taskManager = new ContainerProcessManager( new MapConfig(conf), @@ -352,7 +361,9 @@ public class TestContainerProcessManager { Map<String, String> config = new HashMap<>(); config.putAll(getConfig()); - state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); + MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ContainerProcessManager taskManager = new ContainerProcessManager( new MapConfig(conf), @@ -393,9 +404,11 @@ public class TestContainerProcessManager { @Test public void testRerequestOnAnyHostIfContainerStartFails() throws Exception { - state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("1", "host1"))); + SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("1", "host1"))); Map<String, String> configMap = new HashMap<>(); configMap.putAll(getConfig()); + MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); + MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); MockContainerAllocator allocator = new MockContainerAllocator( clusterResourceManager, @@ -421,8 +434,10 @@ public class TestContainerProcessManager { config.put("job.container.count", "2"); Config cfg = new MapConfig(config); // 1. 
Request two containers on hosts - host1 and host2 - state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1", + SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host1", "1", "host2"))); + MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); + MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ContainerProcessManager taskManager = new ContainerProcessManager( cfg, @@ -487,7 +502,9 @@ public class TestContainerProcessManager { Map<String, String> config = new HashMap<>(); config.putAll(getConfig()); - state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); + MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ContainerProcessManager taskManager = new ContainerProcessManager( new MapConfig(conf), @@ -561,7 +578,9 @@ public class TestContainerProcessManager { Map<String, String> config = new HashMap<>(); config.putAll(getConfig()); - state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + SamzaApplicationState state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); + MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ContainerProcessManager taskManager = new ContainerProcessManager( new MapConfig(conf), @@ -648,7 +667,9 @@ public class TestContainerProcessManager { @Test public void testAppMasterWithFwk() { Config conf = getConfig(); - state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1)); + MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); + MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ContainerProcessManager taskManager = new ContainerProcessManager( new MapConfig(conf), diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java index 3d52510..86dea85 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java @@ -29,7 +29,7 @@ import static org.junit.Assert.assertTrue; public class TestContainerRequestState { private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); - private final MockClusterResourceManager manager = new MockClusterResourceManager(callback); + private final MockClusterResourceManager manager = new MockClusterResourceManager(callback, new SamzaApplicationState(null)); private static final String ANY_HOST = ResourceRequestState.ANY_HOST; diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java index fd3b452..c99be9b 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java @@ -49,10 +49,12 @@ import static org.mockito.Mockito.when; public class TestHostAwareContainerAllocator { - private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); - private final MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback); private final Config config = getConfig(); - private final JobModelManager reader = initializeJobModelManager(config, 1); + private final JobModelManager jobModelManager = initializeJobModelManager(config, 1); + private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); + private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager); + + private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); private JobModelManager initializeJobModelManager(Config config, int containerCount) { Map<String, Map<String, String>> localityMap = new HashMap<>(); @@ -66,7 +68,6 @@ public class TestHostAwareContainerAllocator { new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class))); } - private final SamzaApplicationState state = new SamzaApplicationState(reader); private HostAwareContainerAllocator containerAllocator; private final int timeoutMillis = 1000; private MockContainerRequestState requestState; @@ -414,7 +415,7 @@ public class TestHostAwareContainerAllocator { @After public void teardown() throws Exception { - reader.stop(); + jobModelManager.stop(); containerAllocator.stop(); }