Repository: samza Updated Branches: refs/heads/master 46b1333c4 -> f6d0d6551
SAMZA-871; Heart-beat mechanism between JobCoordinator and all running containers Author: Abhishek Shivanna <ashiva...@linkedin.com> Reviewers: Navina Ramesh <nav...@apache.org>, Jagadish V <jagad...@apache.org> Closes #163 from abhishekshivanna/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f6d0d655 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f6d0d655 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f6d0d655 Branch: refs/heads/master Commit: f6d0d65516aed4f37216dc44ab312cdf1e7b124a Parents: 46b1333 Author: Abhishek Shivanna <ashiva...@linkedin.com> Authored: Wed May 10 16:58:14 2017 -0700 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Wed May 10 16:58:14 2017 -0700 ---------------------------------------------------------------------- .../versioned/container/metrics-table.html | 4 + .../container/ContainerHeartbeatClient.java | 112 +++++++++++++++++++ .../container/ContainerHeartbeatMonitor.java | 75 +++++++++++++ .../container/ContainerHeartbeatResponse.java | 42 +++++++ .../samza/runtime/LocalContainerRunner.java | 43 +++++-- .../samza/config/ShellCommandConfig.scala | 5 + .../samza/coordinator/JobModelManager.scala | 2 +- .../main/scala/org/apache/samza/util/Util.scala | 2 +- .../container/TestContainerHeartbeatClient.java | 81 ++++++++++++++ .../TestContainerHeartbeatMonitor.java | 63 +++++++++++ .../samza/job/yarn/YarnContainerRunner.java | 2 + .../webapp/YarnContainerHeartbeatServlet.java | 92 +++++++++++++++ .../job/yarn/SamzaYarnAppMasterService.scala | 3 +- .../TestYarnContainerHeartbeatServlet.java | 97 ++++++++++++++++ .../yarn/TestSamzaYarnAppMasterService.scala | 21 ++-- 15 files changed, 624 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/docs/learn/documentation/versioned/container/metrics-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/container/metrics-table.html b/docs/learn/documentation/versioned/container/metrics-table.html index 8d425a2..2eb46e3 100644 --- a/docs/learn/documentation/versioned/container/metrics-table.html +++ b/docs/learn/documentation/versioned/container/metrics-table.html @@ -483,6 +483,10 @@ <td>job-healthy</td> <td>State indicating whether the job is healthy or not</td> </tr> + <tr> + <td>heartbeats-expired</td> + <td>Number of heartbeat requests from containers that are invalid</td> + </tr> <tr> <th colspan="2" class="section" id="kafka-system-consumer-metrics">org.apache.samza.system.kafka.KafkaSystemConsumerMetrics</th> http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java new file mode 100644 index 0000000..cc14948 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.stream.Collectors; +import org.apache.samza.util.Util; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Issues a heartbeat to the coordinator and returns a + * {@link ContainerHeartbeatResponse}. + * Here's the description of the protocol between the + * container and the coordinator: + * + * The heartbeat request contains a <code> executionContainerId + * </code> that identifies the container from which the + * request is made. The coordinator validates the provided + * executionContainerId against its list of containers that should be + * running and returns a {@link ContainerHeartbeatResponse}. + * + * The returned {@link ContainerHeartbeatResponse#isAlive()} is + * <code> true </code> iff. the coordinator has determined + * that the container is valid and should continue running. + */ +public class ContainerHeartbeatClient { + private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatClient.class); + private static final int NUM_RETRIES = 3; + private static final int TIMEOUT_MS = 5000; + private static final int BACKOFF_MULTIPLIER = 2; + private final String heartbeatEndpoint; + + public ContainerHeartbeatClient(String coordinatorUrl, String executionEnvContainerId) { + this.heartbeatEndpoint = + String.format("%scontainerHeartbeat?executionContainerId=%s", coordinatorUrl, executionEnvContainerId); + } + + /** + * Issues a heartbeat request to the coordinator and + * returns the corresponding {@link ContainerHeartbeatResponse}. + */ + public ContainerHeartbeatResponse requestHeartbeat() { + ObjectMapper mapper = new ObjectMapper(); + ContainerHeartbeatResponse response; + String reply = ""; + try { + reply = httpGet(new URL(heartbeatEndpoint)); + LOG.debug("Container Heartbeat got response {}", reply); + response = mapper.readValue(reply, ContainerHeartbeatResponse.class); + return response; + } catch (IOException e) { + LOG.error("Error in container heart beat protocol. Query url: {} response: {}", heartbeatEndpoint, reply); + } + response = new ContainerHeartbeatResponse(false); + return response; + } + + String httpGet(URL url) throws IOException { + HttpURLConnection conn; + int delayMillis = 1000; + + for (int currentTry = 0; currentTry < NUM_RETRIES; currentTry++) { + conn = Util.getHttpConnection(url, TIMEOUT_MS); + try (BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream()))) { + if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) { + throw new IOException(String.format("HTTP error fetching url %s. Returned status code %d", url.toString(), + conn.getResponseCode())); + } else { + return br.lines().collect(Collectors.joining()); + } + } catch (Exception e) { + LOG.error("Error in heartbeat request", e); + sleepUninterruptibly(delayMillis); + delayMillis = delayMillis * BACKOFF_MULTIPLIER; + } + } + throw new IOException(String.format("Error fetching url: %s. Tried %d time(s).", url.toString(), NUM_RETRIES)); + } + + private void sleepUninterruptibly(int delayMillis) { + try { + Thread.sleep(delayMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java new file mode 100644 index 0000000..940e80f --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ContainerHeartbeatMonitor { + private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatMonitor.class); + private static final ThreadFactory THREAD_FACTORY = new HeartbeatThreadFactory(); + private static final int SCHEDULE_MS = 60000; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY); + private final Runnable onContainerExpired; + private final ContainerHeartbeatClient containerHeartbeatClient; + private boolean started = false; + + public ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient) { + this.onContainerExpired = onContainerExpired; + this.containerHeartbeatClient = containerHeartbeatClient; + } + + public void start() { + if (started) { + LOG.warn("Skipping attempt to start an already started ContainerHeartbeatMonitor."); + return; + } + LOG.info("Starting ContainerHeartbeatMonitor"); + scheduler.scheduleAtFixedRate(() -> { + ContainerHeartbeatResponse response = containerHeartbeatClient.requestHeartbeat(); + if (!response.isAlive()) { + onContainerExpired.run(); + } + }, 0, SCHEDULE_MS, TimeUnit.MILLISECONDS); + started = true; + } + + public void stop() { + if (started) { + LOG.info("Stopping ContainerHeartbeatMonitor"); + scheduler.shutdown(); + } + } + + private static class HeartbeatThreadFactory implements ThreadFactory { + private static final String PREFIX = "Samza-" + ContainerHeartbeatMonitor.class.getSimpleName() + "-"; + private static final AtomicInteger INSTANCE_NUM = new AtomicInteger(); + + @Override + public Thread newThread(Runnable runnable) { + return new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement()); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatResponse.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatResponse.java new file mode 100644 index 0000000..d402ef1 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatResponse.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container; + +import org.codehaus.jackson.annotate.JsonProperty; + + +/** + * Used to represent the heartbeat response between + * the JobCoordinator and the containers. + * {@link ContainerHeartbeatResponse#isAlive()} is set to <code>true</code> + * iff. the heartbeat is valid. + */ +public class ContainerHeartbeatResponse { + + private final boolean alive; + + public ContainerHeartbeatResponse(@JsonProperty("alive") boolean alive) { + this.alive = alive; + } + + public boolean isAlive() { + return alive; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java index e02ee23..d690c80 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java @@ -25,6 +25,8 @@ import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.ShellCommandConfig; +import org.apache.samza.container.ContainerHeartbeatClient; +import org.apache.samza.container.ContainerHeartbeatMonitor; import org.apache.samza.container.SamzaContainer; import org.apache.samza.container.SamzaContainer$; import org.apache.samza.container.SamzaContainerExceptionHandler; @@ -55,7 +57,9 @@ public class LocalContainerRunner extends AbstractApplicationRunner { private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class); private final JobModel jobModel; private final String containerId; - private volatile Throwable containerException = null; + private volatile Throwable containerRunnerException = null; + private ContainerHeartbeatMonitor containerHeartbeatMonitor; + private SamzaContainer container; public LocalContainerRunner(JobModel jobModel, String containerId) { super(jobModel.getConfig()); @@ -68,7 +72,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner { ContainerModel containerModel = jobModel.getContainers().get(containerId); Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this); - SamzaContainer container = SamzaContainer$.MODULE$.apply( + container = SamzaContainer$.MODULE$.apply( containerModel, config, jobModel.maxChangeLogStreamPartitions, @@ -89,14 +93,14 @@ public class LocalContainerRunner extends AbstractApplicationRunner { @Override public void onContainerFailed(Throwable t) { log.info("Container Failed"); - containerException = t; + containerRunnerException = t; } }); - + startContainerHeartbeatMonitor(); container.run(); - - if (containerException != null) { - log.error("Container stopped with Exception. Exiting process now.", containerException); + stopContainerHeartbeatMonitor(); + if (containerRunnerException != null) { + log.error("Container stopped with Exception. Exiting process now.", containerRunnerException); System.exit(1); } } @@ -137,6 +141,29 @@ public class LocalContainerRunner extends AbstractApplicationRunner { MDC.put("jobId", jobId); StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config); - new LocalContainerRunner(jobModel, containerId).run(streamApp); + LocalContainerRunner localContainerRunner = new LocalContainerRunner(jobModel, containerId); + localContainerRunner.run(streamApp); + } + + private void startContainerHeartbeatMonitor() { + String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL()); + String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID()); + if (executionEnvContainerId != null) { + log.info("Got execution environment container id: {}", executionEnvContainerId); + containerHeartbeatMonitor = new ContainerHeartbeatMonitor(() -> { + container.shutdown(); + containerRunnerException = new SamzaException("Container shutdown due to expired heartbeat"); + }, new ContainerHeartbeatClient(coordinatorUrl, executionEnvContainerId)); + containerHeartbeatMonitor.start(); + } else { + containerHeartbeatMonitor = null; + log.warn("executionEnvContainerId not set. Container heartbeat monitor will not be started"); + } + } + + private void stopContainerHeartbeatMonitor() { + if (containerHeartbeatMonitor != null) { + containerHeartbeatMonitor.stop(); + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala index 3c0f320..caad7fd 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala @@ -45,6 +45,11 @@ object ShellCommandConfig { */ val ENV_JAVA_HOME = "JAVA_HOME" + /** + * The ID assigned to the container by the execution environment (eg: YARN Container Id) + */ + val ENV_EXECUTION_ENV_CONTAINER_ID = "EXECUTION_ENV_CONTAINER_ID" + /* * The base directory for storing logged data stores used in Samza. This has to be set on all machine running Samza * containers. For example, when using YARN, it has to be set in all NMs and passed to the containers. http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index e39ea3b..dda0b6b 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -156,7 +156,7 @@ object JobModelManager extends Logging { jobModelRef.set(jobModel) val server = new HttpServer - server.addServlet("/*", new JobServlet(jobModelRef)) + server.addServlet("/", new JobServlet(jobModelRef)) currentJobModelManager = new JobModelManager(jobModel, server, streamPartitionCountMonitor) currentJobModelManager } http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index e7832a0..6c224e6 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -172,7 +172,7 @@ object Util extends Logging { readStream(httpConn.getInputStream) } - private def getHttpConnection(url: URL, timeout: Int): HttpURLConnection = { + def getHttpConnection(url: URL, timeout: Int): HttpURLConnection = { val conn = url.openConnection() conn.setConnectTimeout(timeout) conn.setReadTimeout(timeout) http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatClient.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatClient.java b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatClient.java new file mode 100644 index 0000000..6dc07f8 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatClient.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container; + +import java.io.IOException; +import java.net.URL; +import junit.framework.Assert; +import org.junit.Test; + +public class TestContainerHeartbeatClient { + private MockContainerHeartbeatClient client = + new MockContainerHeartbeatClient("http://fake-endpoint/", "FAKE_CONTAINER_ID"); + + @Test + public void testClientResponseForHeartbeatAlive() + throws IOException { + client.setHttpOutput("{\"alive\": true}"); + ContainerHeartbeatResponse response = client.requestHeartbeat(); + Assert.assertTrue(response.isAlive()); + } + + @Test + public void testClientResponseForHeartbeatDead() + throws IOException { + client.setHttpOutput("{\"alive\": false}"); + ContainerHeartbeatResponse response = client.requestHeartbeat(); + Assert.assertFalse(response.isAlive()); + } + + @Test + public void testClientResponseOnBadRequest() + throws IOException { + client.shouldThrowException(true); + ContainerHeartbeatResponse response = client.requestHeartbeat(); + Assert.assertFalse(response.isAlive()); + } + + private class MockContainerHeartbeatClient extends ContainerHeartbeatClient { + private String httpOutput; + private boolean throwException = false; + + public void shouldThrowException(boolean throwException) { + this.throwException = throwException; + } + + public void setHttpOutput(String httpOutput) { + this.httpOutput = httpOutput; + } + + MockContainerHeartbeatClient(String coordinatorUrl, String executionEnvContainerId) { + super(coordinatorUrl, executionEnvContainerId); + } + + @Override + String httpGet(URL url) + throws IOException { + if (!throwException) { + return httpOutput; + } else { + throw new IOException("Exception thrown"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java new file mode 100644 index 0000000..829a158 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import junit.framework.Assert; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +public class TestContainerHeartbeatMonitor { + + @Test + public void testCallbackWhenHeartbeatDead() + throws InterruptedException { + ContainerHeartbeatClient mockClient = mock(ContainerHeartbeatClient.class); + CountDownLatch countDownLatch = new CountDownLatch(1); + Runnable onExpired = () -> { + countDownLatch.countDown(); + }; + ContainerHeartbeatMonitor monitor = new ContainerHeartbeatMonitor(onExpired, mockClient); + ContainerHeartbeatResponse response = new ContainerHeartbeatResponse(false); + when(mockClient.requestHeartbeat()).thenReturn(response); + monitor.start(); + boolean success = countDownLatch.await(2, TimeUnit.SECONDS); + Assert.assertTrue(success); + } + + @Test + public void testDoesNotCallbackWhenHeartbeatAlive() + throws InterruptedException { + ContainerHeartbeatClient client = mock(ContainerHeartbeatClient.class); + CountDownLatch countDownLatch = new CountDownLatch(1); + Runnable onExpired = () -> { + countDownLatch.countDown(); + }; + ContainerHeartbeatMonitor monitor = new ContainerHeartbeatMonitor(onExpired, client); + ContainerHeartbeatResponse response = new ContainerHeartbeatResponse(true); + when(client.requestHeartbeat()).thenReturn(response); + monitor.start(); + boolean success = countDownLatch.await(2, TimeUnit.SECONDS); + Assert.assertFalse(success); + Assert.assertEquals(1, countDownLatch.getCount()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java index 84ded62..cdcf2d1 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.util.Records; import org.apache.samza.clustermanager.SamzaContainerLaunchException; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; +import org.apache.samza.config.ShellCommandConfig; import org.apache.samza.config.YarnConfig; import org.apache.samza.job.CommandBuilder; import org.apache.samza.util.Util; @@ -110,6 +111,7 @@ public class YarnContainerRunner { log.info("Container ID {} using command {}", samzaContainerId, command); Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder); + env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString())); printContainerEnvironmentVariables(samzaContainerId, env); log.info("Samza FWK path: " + command + "; env=" + env); http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java b/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java new file mode 100644 index 0000000..002f365 --- /dev/null +++ b/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.webapp; + +import java.io.IOException; +import java.io.PrintWriter; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.samza.container.ContainerHeartbeatResponse; +import org.apache.samza.job.yarn.SamzaAppMasterMetrics; +import org.apache.samza.job.yarn.YarnAppState; +import org.apache.samza.job.yarn.YarnContainer; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.ReadableMetricsRegistry; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Responds to heartbeat requests from the containers with a {@link ContainerHeartbeatResponse}. + * The heartbeat request contains the <code> executionContainerId </code> + * which in YARN's case is the YARN container Id. + * This servlet validates the container Id against the list + * of running containers maintained in the {@link YarnAppState}. + * The returned {@link ContainerHeartbeatResponse#isAlive()} is + * <code> true </code> iff. the container Id exists in {@link YarnAppState#runningYarnContainers}. + */ +public class YarnContainerHeartbeatServlet extends HttpServlet { + + private static final String YARN_CONTAINER_ID = "executionContainerId"; + private static final Logger LOG = LoggerFactory.getLogger(YarnContainerHeartbeatServlet.class); + private static final String APPLICATION_JSON = "application/json"; + private static final String GROUP = SamzaAppMasterMetrics.class.getName(); + private final Counter heartbeatsExpiredCount; + + private YarnAppState yarnAppState; + private ObjectMapper mapper; + + public YarnContainerHeartbeatServlet(YarnAppState yarnAppState, ReadableMetricsRegistry registry) { + this.yarnAppState = yarnAppState; + this.mapper = new ObjectMapper(); + this.heartbeatsExpiredCount = registry.newCounter(GROUP, "heartbeats-expired"); + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + ContainerId yarnContainerId; + PrintWriter printWriter = resp.getWriter(); + String containerIdParam = req.getParameter(YARN_CONTAINER_ID); + ContainerHeartbeatResponse response; + resp.setContentType(APPLICATION_JSON); + boolean alive = false; + try { + yarnContainerId = ContainerId.fromString(containerIdParam); + for (YarnContainer yarnContainer : yarnAppState.runningYarnContainers.values()) { + if (yarnContainer.id().compareTo(yarnContainerId) == 0) { + alive = true; + break; + } + } + if (!alive) { + heartbeatsExpiredCount.inc(); + } + response = new ContainerHeartbeatResponse(alive); + printWriter.write(mapper.writeValueAsString(response)); + } catch (IllegalArgumentException e) { + LOG.error("Container ID {} passed is invalid", containerIdParam); + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala index 5f2bfc5..f436f79 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala @@ -28,7 +28,7 @@ import org.apache.samza.coordinator.stream.CoordinatorStreamWriter import org.apache.samza.coordinator.stream.messages.SetConfig import org.apache.samza.metrics.ReadableMetricsRegistry import org.apache.samza.util.Logging -import org.apache.samza.webapp.{ApplicationMasterWebServlet, ApplicationMasterRestServlet} +import org.apache.samza.webapp.{ApplicationMasterRestServlet, ApplicationMasterWebServlet, YarnContainerHeartbeatServlet} /** * Samza's application master runs a very basic HTTP/JSON service to allow @@ -56,6 +56,7 @@ class SamzaYarnAppMasterService(config: Config, samzaAppState: SamzaApplicationS webApp.addServlet("/*", new ApplicationMasterWebServlet(config, samzaAppState, state)) webApp.start + samzaAppState.jobModelManager.server.addServlet("/containerHeartbeat", new YarnContainerHeartbeatServlet(state, registry)) samzaAppState.jobModelManager.start state.rpcUrl = rpcApp.getUrl state.trackingUrl = webApp.getUrl http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java new file mode 100644 index 0000000..d6fc254 --- /dev/null +++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.webapp; + +import java.io.IOException; +import java.net.URL; +import junit.framework.Assert; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.samza.container.ContainerHeartbeatClient; +import org.apache.samza.container.ContainerHeartbeatResponse; +import org.apache.samza.coordinator.server.HttpServer; +import org.apache.samza.job.yarn.YarnAppState; +import org.apache.samza.job.yarn.YarnContainer; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.metrics.ReadableMetricsRegistry; +import org.apache.samza.util.Util; +import org.codehaus.jackson.map.ObjectMapper; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestYarnContainerHeartbeatServlet { + + private YarnContainer container; + private YarnAppState yarnAppState; + private HttpServer webApp; + private ObjectMapper mapper; + + private ContainerHeartbeatResponse heartbeat; + + @Before + public void setup() + throws Exception { + container = mock(YarnContainer.class); + ReadableMetricsRegistry registry = new MetricsRegistryMap("test-registry"); + + yarnAppState = + new YarnAppState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "testHost", 1, 1); + webApp = new HttpServer("/", 0, "", new ServletHolder(new DefaultServlet())); + webApp.addServlet("/", new YarnContainerHeartbeatServlet(yarnAppState, registry)); + webApp.start(); + mapper = new ObjectMapper(); + } + + @After + public void cleanup() + throws Exception { + webApp.stop(); + } + + @Test + public void testContainerHeartbeatWhenValid() + throws IOException { + String VALID_CONTAINER_ID = "container_1350670447861_0003_01_000002"; + when(container.id()).thenReturn(ConverterUtils.toContainerId(VALID_CONTAINER_ID)); + yarnAppState.runningYarnContainers.put(VALID_CONTAINER_ID, container); + URL url = new URL(webApp.getUrl().toString() + "containerHeartbeat?executionContainerId=" + VALID_CONTAINER_ID); + String response = Util.read(url, 1000); + heartbeat = mapper.readValue(response, ContainerHeartbeatResponse.class); + Assert.assertTrue(heartbeat.isAlive()); + } + + @Test + public void testContainerHeartbeatWhenInvalid() + throws IOException { + String VALID_CONTAINER_ID = "container_1350670447861_0003_01_000003"; + String INVALID_CONTAINER_ID = "container_1350670447861_0003_01_000002"; + when(container.id()).thenReturn(ConverterUtils.toContainerId(VALID_CONTAINER_ID)); + yarnAppState.runningYarnContainers.put(VALID_CONTAINER_ID, container); + URL url = new URL(webApp.getUrl().toString() + "containerHeartbeat?executionContainerId=" + INVALID_CONTAINER_ID); + String response = Util.read(url, 1000); + heartbeat = mapper.readValue(response, ContainerHeartbeatResponse.class); + Assert.assertFalse(heartbeat.isAlive()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala index 65c03d1..73c7f49 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala @@ -19,19 +19,20 @@ package org.apache.samza.job.yarn -import java.io.BufferedReader +import java.io.{BufferedReader, InputStreamReader} import java.net.URL -import java.io.InputStreamReader + import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.samza.clustermanager.SamzaApplicationState -import org.apache.samza.config.MapConfig -import org.junit.Assert._ -import org.junit.Test -import scala.collection.JavaConverters._ -import org.apache.samza.config.Config +import org.apache.samza.config.{Config, MapConfig} import org.apache.samza.container.TaskName import org.apache.samza.coordinator.JobModelManager import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory +import org.apache.samza.metrics._ +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.JavaConverters._ class TestSamzaYarnAppMasterService { @@ -40,9 +41,10 @@ class TestSamzaYarnAppMasterService { val config = getDummyConfig val jobModelManager = JobModelManager(config) val samzaState = new SamzaApplicationState(jobModelManager) + val registry = new MetricsRegistryMap() val state = new YarnAppState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "testHost", 1, 1); - val service = new SamzaYarnAppMasterService(config, samzaState, state, null, null) + val service = new SamzaYarnAppMasterService(config, samzaState, state, registry, null) val taskName = new TaskName("test") // start the dashboard @@ -75,8 +77,9 @@ class TestSamzaYarnAppMasterService { val jobModelManager = JobModelManager(config) val samzaState = new SamzaApplicationState(jobModelManager) val state = new YarnAppState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "testHost", 1, 1); + val registry = new MetricsRegistryMap() - val service = new SamzaYarnAppMasterService(config, samzaState, state, null, null) + val service = new SamzaYarnAppMasterService(config, samzaState, state, registry, null) // start the dashboard service.onInit