http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java new file mode 100644 index 0000000..6b61681 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionFlexCluster.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import org.apache.slider.core.conf.ConfTree; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; + +import java.util.concurrent.TimeUnit; + +public class ActionFlexCluster extends AsyncAction { + + public final ConfTree resources; + + public ActionFlexCluster(String name, + long delay, + TimeUnit timeUnit, ConfTree resources) { + super(name, delay, timeUnit, ATTR_CHANGES_APP_SIZE); + this.resources = resources; + } + + @Override + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { + appMaster.flexCluster(resources); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java new file mode 100644 index 0000000..e2ad559 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionHalt.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import org.apache.hadoop.util.ExitUtil; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; + +import java.util.concurrent.TimeUnit; + +/** + * Exit an emergency JVM halt. + * @see ExitUtil#halt(int, String) + */ +public class ActionHalt extends AsyncAction { + + private final int status; + private final String text; + + public ActionHalt( + int status, + String text, + long delay, TimeUnit timeUnit) { + + // do not declare that this action halts the cluster ... keep it a surprise + super("Halt", delay, timeUnit); + this.status = status; + this.text = text; + } + + @Override + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { + ExitUtil.halt(status, text); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionKillContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionKillContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionKillContainer.java new file mode 100644 index 0000000..7446e82 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionKillContainer.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.operations.AbstractRMOperation; +import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation; +import org.apache.slider.server.appmaster.operations.RMOperationHandler; +import org.apache.slider.server.appmaster.operations.RMOperationHandlerActions; +import org.apache.slider.server.appmaster.state.AppState; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Kill a specific container + */ +public class ActionKillContainer extends AsyncAction { + + /** + * container to kill + */ + private final ContainerId containerId; + + /** + * handler for the operation + */ + private final RMOperationHandlerActions operationHandler; + + /** + * Kill a container + * @param containerId container to kill + * @param delay + * @param timeUnit + * @param operationHandler + */ + public ActionKillContainer( + ContainerId containerId, + long delay, + TimeUnit timeUnit, + RMOperationHandlerActions operationHandler) { + super("kill container", delay, timeUnit, ATTR_CHANGES_APP_SIZE); + this.operationHandler = operationHandler; + Preconditions.checkArgument(containerId != null); + + this.containerId = containerId; + } + + /** + * Get the container ID to kill + * @return + */ + public ContainerId getContainerId() { + return containerId; + } + + @Override + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { + List<AbstractRMOperation> opsList = new LinkedList<>(); + ContainerReleaseOperation release = new ContainerReleaseOperation(containerId); + opsList.add(release); + //now apply the operations + operationHandler.execute(opsList); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionRegisterServiceInstance.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionRegisterServiceInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionRegisterServiceInstance.java new file mode 100644 index 0000000..ca330af --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionRegisterServiceInstance.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; + +import java.util.concurrent.TimeUnit; + +/** + * Asynchronous registration operation + */ +public class ActionRegisterServiceInstance extends AsyncAction { + + private final String instanceName; + private final ApplicationId appId; + + public ActionRegisterServiceInstance(String instanceName, + ApplicationId appId) { + super("ActionRegisterServiceInstance"); + this.instanceName = instanceName; + this.appId = appId; + } + + public ActionRegisterServiceInstance(String instanceName, + ApplicationId appId, + long delay, + TimeUnit timeUnit) { + super("ActionRegisterServiceInstance", delay, timeUnit); + this.instanceName = instanceName; + this.appId = appId; + } + + @Override + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { + + // YARN Registry do the registration + appMaster.registerServiceInstance(instanceName, appId); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java new file mode 100644 index 0000000..358c844 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStartContainer.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; +import org.apache.slider.server.appmaster.state.RoleInstance; + +import java.util.Locale; +import java.util.concurrent.TimeUnit; + +/** + * Start a container + * @see SliderAppMaster#startContainer(Container, ContainerLaunchContext, RoleInstance) + */ +public class ActionStartContainer extends AsyncAction { + + private final Container container; + private final ContainerLaunchContext ctx; + private final RoleInstance instance; + + public ActionStartContainer(String name, + Container container, + ContainerLaunchContext ctx, + RoleInstance instance, + long delay, TimeUnit timeUnit) { + super( + String.format(Locale.ENGLISH, + "%s %s: /", + name , container.getId().toString()), + delay, + timeUnit); + this.container = container; + this.ctx = ctx; + this.instance = instance; + } + + @Override + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { + appMaster.startContainer(container, ctx, instance); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java new file mode 100644 index 0000000..08e8086 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopQueue.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * Action to tell a queue executor to stop -after handing this on/executing it + */ +public class ActionStopQueue extends AsyncAction { + private static final Logger log = + LoggerFactory.getLogger(ActionStopQueue.class); + + public ActionStopQueue(long delay) { + super("stop queue", delay); + } + + public ActionStopQueue(long delay, + TimeUnit timeUnit) { + super("stop queue", delay, timeUnit); + } + + public ActionStopQueue(String name, + long delay, + TimeUnit timeUnit) { + super(name, delay, timeUnit); + } + + @Override + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { + log.warn("STOP"); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java new file mode 100644 index 0000000..055cea5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.slider.core.exceptions.ExceptionConverter; +import org.apache.slider.core.exceptions.TriggerClusterTeardownException; +import org.apache.slider.core.main.ExitCodeProvider; +import org.apache.slider.core.main.LauncherExitCodes; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; + +import java.util.concurrent.TimeUnit; + +/** + * Trigger an AM exit. This is used to build the exit status message for YARN + */ +public class ActionStopSlider extends AsyncAction { + + private int exitCode; + private FinalApplicationStatus finalApplicationStatus; + private String message; + private final Exception ex; + + /** + * Simple constructor + * @param name action name + */ + public ActionStopSlider(String name) { + super(name); + this.ex = null; + } + + /** + * Stop slider + * @param name action name + * @param delay execution delay + * @param timeUnit delay time unit + * @param exitCode process exit code + * @param finalApplicationStatus yarn status + * @param message message for AM + */ + public ActionStopSlider(String name, + long delay, + TimeUnit timeUnit, + int exitCode, + FinalApplicationStatus finalApplicationStatus, + String message) { + super(name, delay, timeUnit, ATTR_HALTS_APP); + this.exitCode = exitCode; + this.finalApplicationStatus = finalApplicationStatus; + this.message = message; + this.ex = null; + } + + /** + * Stop slider + * @param name action name + * @param exitCode process exit code + * @param finalApplicationStatus yarn status + * @param message message for AM + */ + public ActionStopSlider(String name, + int exitCode, + FinalApplicationStatus finalApplicationStatus, + String message) { + super(name); + this.exitCode = exitCode; + this.finalApplicationStatus = finalApplicationStatus; + this.message = message; + this.ex = null; + } + + /** + * Simple constructor + * @param ex teardown exception + */ + public ActionStopSlider(TriggerClusterTeardownException ex) { + this("stop", + ex.getExitCode(), + ex.getFinalApplicationStatus(), + ex.getMessage()); + } + + /** + * Build from an exception. + * <p> + * If the exception implements + * {@link ExitCodeProvider} then the exit code is extracted from that + * @param ex exception. + */ + public ActionStopSlider(Exception ex) { + super("stop"); + if (ex instanceof ExitCodeProvider) { + setExitCode(((ExitCodeProvider)ex).getExitCode()); + } else { + setExitCode(LauncherExitCodes.EXIT_EXCEPTION_THROWN); + } + setFinalApplicationStatus(FinalApplicationStatus.FAILED); + setMessage(ex.getMessage()); + this.ex = ex; + } + + @Override + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { + SliderAppMaster.getLog().info("SliderAppMasterApi.stopCluster: {}", + message); + appMaster.onAMStop(this); + } + + @Override + public String toString() { + return String.format("%s: exit code = %d, %s: %s;", + name, exitCode, finalApplicationStatus, message) ; + } + + public int getExitCode() { + return exitCode; + } + + public void setExitCode(int exitCode) { + this.exitCode = exitCode; + } + + public FinalApplicationStatus getFinalApplicationStatus() { + return finalApplicationStatus; + } + + public void setFinalApplicationStatus(FinalApplicationStatus finalApplicationStatus) { + this.finalApplicationStatus = finalApplicationStatus; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public Exception getEx() { + return ex; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionUpgradeContainers.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionUpgradeContainers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionUpgradeContainers.java new file mode 100644 index 0000000..05fcbcc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionUpgradeContainers.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; + +public class ActionUpgradeContainers extends AsyncAction { + private int exitCode; + private FinalApplicationStatus finalApplicationStatus; + private String message; + private Set<String> containers = new HashSet<>(); + private Set<String> components = new HashSet<>(); + + public ActionUpgradeContainers(String name, + long delay, + TimeUnit timeUnit, + int exitCode, + FinalApplicationStatus finalApplicationStatus, + List<String> containers, + List<String> components, + String message) { + super(name, delay, timeUnit); + this.exitCode = exitCode; + this.finalApplicationStatus = finalApplicationStatus; + this.containers.addAll(containers); + this.components.addAll(components); + this.message = message; + } + + @Override + public void execute(SliderAppMaster appMaster, QueueAccess queueService, + AppState appState) throws Exception { + if (CollectionUtils.isNotEmpty(this.containers) + || CollectionUtils.isNotEmpty(this.components)) { + SliderAppMaster.getLog().info("SliderAppMaster.upgradeContainers: {}", + message); + appMaster.onUpgradeContainers(this); + } + } + + public int getExitCode() { + return exitCode; + } + + public void setExitCode(int exitCode) { + this.exitCode = exitCode; + } + + public FinalApplicationStatus getFinalApplicationStatus() { + return finalApplicationStatus; + } + + public void setFinalApplicationStatus( + FinalApplicationStatus finalApplicationStatus) { + this.finalApplicationStatus = finalApplicationStatus; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public Set<String> getContainers() { + return containers; + } + + public void setContainers(Set<String> containers) { + this.containers = containers; + } + + public Set<String> getComponents() { + return components; + } + + public void setComponents(Set<String> components) { + this.components = components; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java new file mode 100644 index 0000000..f9a1fd5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/AsyncAction.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; + +import java.io.IOException; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public abstract class AsyncAction implements Delayed { + + private static final AtomicLong sequencer = new AtomicLong(0); + + public final String name; + private long nanos; + public final int attrs; + private final long sequenceNumber = sequencer.incrementAndGet(); + + + protected AsyncAction(String name) { + this(name, 0); + } + + protected AsyncAction(String name, + long delayMillis) { + this(name, delayMillis, TimeUnit.MILLISECONDS); + } + + protected AsyncAction(String name, + long delay, + TimeUnit timeUnit) { + this(name, delay, timeUnit, 0); + } + + protected AsyncAction(String name, + long delay, + TimeUnit timeUnit, + int attrs) { + this.name = name; + this.setNanos(convertAndOffset(delay, timeUnit)); + this.attrs = attrs; + } + + protected long convertAndOffset(long delay, TimeUnit timeUnit) { + return now() + TimeUnit.NANOSECONDS.convert(delay, timeUnit); + } + + /** + * The current time in nanos + * @return now + */ + protected long now() { + return System.nanoTime(); + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(getNanos() - now(), TimeUnit.NANOSECONDS); + } + + @Override + public int compareTo(Delayed that) { + if (this == that) { + return 0; + } + return SliderUtils.compareTo( + getDelay(TimeUnit.NANOSECONDS), + that.getDelay(TimeUnit.NANOSECONDS)); + } + + @Override + public String toString() { + final StringBuilder sb = + new StringBuilder(super.toString()); + sb.append(" name='").append(name).append('\''); + sb.append(", delay=").append(getDelay(TimeUnit.SECONDS)); + sb.append(", attrs=").append(attrs); + sb.append(", sequenceNumber=").append(sequenceNumber); + sb.append('}'); + return sb.toString(); + } + + protected int getAttrs() { + return attrs; + } + + /** + * Ask if an action has an of the specified bits set. + * This is not an equality test. + * @param attr attribute + * @return true iff the action has any of the bits in the attr arg set + */ + public boolean hasAttr(int attr) { + return (attrs & attr) != 0; + } + + /** + * Actual application + * @param appMaster + * @param queueService + * @param appState + * @throws IOException + */ + public abstract void execute(SliderAppMaster appMaster, + QueueAccess queueService, AppState appState) throws Exception; + + public long getNanos() { + return nanos; + } + + public void setNanos(long nanos) { + this.nanos = nanos; + } + + public static final int ATTR_CHANGES_APP_SIZE = 1; + public static final int ATTR_HALTS_APP = 2; + public static final int ATTR_REVIEWS_APP_SIZE = 4; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/EscalateOutstandingRequests.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/EscalateOutstandingRequests.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/EscalateOutstandingRequests.java new file mode 100644 index 0000000..2c545ea --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/EscalateOutstandingRequests.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; + +import java.util.concurrent.TimeUnit; + +/** + * Escalate outstanding requests by asking AM + */ +public class EscalateOutstandingRequests extends AsyncAction { + + public EscalateOutstandingRequests() { + super("EscalateOutstandingRequests"); + } + + public EscalateOutstandingRequests(long delay, + TimeUnit timeUnit) { + super("EscalateOutstandingRequests", delay, timeUnit, ATTR_REVIEWS_APP_SIZE); + } + + @Override + public void execute(SliderAppMaster appMaster, QueueAccess queueService, AppState appState) throws + Exception { + appMaster.escalateOutstandingRequests(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java new file mode 100644 index 0000000..41fe494 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderReportedContainerLoss.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; + +import java.util.concurrent.TimeUnit; + +/** + * Report container loss to the AM + * {@link SliderAppMaster#providerLostContainer(ContainerId)} + */ +public class ProviderReportedContainerLoss extends AsyncAction { + + private final ContainerId containerId; + + public ProviderReportedContainerLoss(ContainerId containerId) { + this("lost container", 0, TimeUnit.MILLISECONDS, containerId); + } + + public ProviderReportedContainerLoss(String name, + long delay, + TimeUnit timeUnit, + ContainerId containerId) { + super(name, delay, timeUnit); + this.containerId = containerId; + } + + @Override + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { + appMaster.providerLostContainer(containerId); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java new file mode 100644 index 0000000..957a35f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ProviderStartupCompleted.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; + +public class ProviderStartupCompleted extends AsyncAction { + + public ProviderStartupCompleted() { + super("ProviderStartupCompleted"); + } + + @Override + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { + appMaster.eventCallbackEvent(null); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java new file mode 100644 index 0000000..0396891 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueAccess.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +/** + * Access for queue operations + */ +public interface QueueAccess { + /** + * Put an action on the immediate queue -to be executed when the queue + * reaches it. + * @param action action to queue + */ + void put(AsyncAction action); + + /** + * Put a delayed action: this will only be added to the main queue + * after its action time has been reached + * @param action action to queue + */ + void schedule(AsyncAction action); + + /** + * Remove an action from the queues. + * @param action action to remove + * @return true if the action was removed + */ + boolean remove(AsyncAction action); + + /** + * Add a named renewing action + * @param name name + * @param renewingAction wrapped action + */ + void renewing(String name, + RenewingAction<? extends AsyncAction> renewingAction); + + /** + * Look up a renewing action + * @param name name of the action + * @return the action or null if none was found + */ + RenewingAction<? extends AsyncAction> lookupRenewingAction(String name); + + /** + * Remove a renewing action + * @param name action name name of the action + * @return true if the action was found and removed. + */ + boolean removeRenewingAction(String name); + + /** + * Look in the immediate queue for any actions of a specific attribute + */ + boolean hasQueuedActionWithAttribute(int attr); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java new file mode 100644 index 0000000..d0fc2cf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Executor for async actions - hands them off to the AM as + * appropriate + */ +public class QueueExecutor implements Runnable { + private static final Logger log = + LoggerFactory.getLogger(QueueExecutor.class); + + private final SliderAppMaster appMaster; + private final QueueService actionQueues; + private final AppState appState; + + + public QueueExecutor(SliderAppMaster appMaster, + QueueService actionQueues) { + Preconditions.checkNotNull(appMaster); + Preconditions.checkNotNull(actionQueues); + + this.appMaster = appMaster; + this.actionQueues = actionQueues; + this.appState = appMaster.getAppState(); + } + + @VisibleForTesting + public QueueExecutor(QueueService actionQueues) { + Preconditions.checkNotNull(actionQueues); + this.appMaster = null; + this.appState = null; + this.actionQueues = actionQueues; + } + + /** + * Run until the queue has been told to stop + */ + @Override + public void run() { + AsyncAction take = null; + try { + log.info("Queue Executor run() started"); + do { + take = actionQueues.actionQueue.take(); + log.debug("Executing {}", take); + + take.execute(appMaster, actionQueues, appState); + log.debug("Completed {}", take); + + } while (!(take instanceof ActionStopQueue)); + log.info("Queue Executor run() stopped"); + } catch (InterruptedException e) { + // interrupted: exit + } catch (Throwable e) { + log.error("Exception processing {}: {}", take, e, e); + if (appMaster != null) { + appMaster.onExceptionInThread(Thread.currentThread(), e); + } + } + // tag completed + actionQueues.complete(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueService.java new file mode 100644 index 0000000..34acade --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueService.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + + +import org.apache.slider.server.services.workflow.ServiceThreadFactory; +import org.apache.slider.server.services.workflow.WorkflowExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The Queue service provides immediate and scheduled queues, as well + * as an executor thread that moves queued actions from the scheduled + * queue to the immediate one. + * + * <p> + * This code to be revisited to see if all that was needed is the single scheduled + * queue, implicitly making actions immediate by giving them an execution + * time of "now". It would force having a sequence number to all actions, one + * which the queue would have to set from its (monotonic, thread-safe) counter + * on every submission, with a modified comparison operator. This would guarantee + * that earlier submissions were picked before later ones. + */ +public class QueueService extends WorkflowExecutorService<ExecutorService> +implements Runnable, QueueAccess { + private static final Logger log = + LoggerFactory.getLogger(QueueService.class); + public static final String NAME = "Action Queue"; + private final AtomicBoolean completed = new AtomicBoolean(false); + + /** + * Immediate actions. + */ + public final BlockingDeque<AsyncAction> actionQueue = + new LinkedBlockingDeque<>(); + + /** + * Actions to be scheduled in the future + */ + public final DelayQueue<AsyncAction> scheduledActions = new DelayQueue<>(); + + /** + * Map of renewing actions by name ... this is to allow them to + * be cancelled by name + */ + private final Map<String, RenewingAction<? extends AsyncAction>> renewingActions + = new ConcurrentHashMap<>(); + + /** + * Create a queue instance with a single thread executor + */ + public QueueService() { + super(NAME, + ServiceThreadFactory.singleThreadExecutor(NAME, true)); + } + + @Override + public void put(AsyncAction action) { + log.debug("Queueing {}", action); + actionQueue.add(action); + } + + @Override + public void schedule(AsyncAction action) { + log.debug("Scheduling {}", action); + scheduledActions.add(action); + } + + @Override + public boolean remove(AsyncAction action) { + boolean removedFromDelayQueue = scheduledActions.remove(action); + boolean removedFromActions = actionQueue.remove(action); + return removedFromActions || removedFromDelayQueue; + } + + @Override + public void renewing(String name, + RenewingAction<? extends AsyncAction> renewingAction) { + log.debug("Adding renewing Action \"{}\": {}", name, + renewingAction.getAction()); + if (removeRenewingAction(name)) { + log.debug("Removed predecessor action"); + } + renewingActions.put(name, renewingAction); + schedule(renewingAction); + } + + @Override + public RenewingAction<? extends AsyncAction> lookupRenewingAction(String name) { + return renewingActions.get(name); + } + + @Override + public boolean removeRenewingAction(String name) { + RenewingAction<? extends AsyncAction> action = renewingActions.remove(name); + return action != null && remove(action); + } + + /** + * Stop the service by scheduling an {@link ActionStopQueue} action + * ..if the processor thread is working this will propagate through + * and stop the queue handling after all other actions complete. + * @throws Exception + */ + @Override + protected void serviceStop() throws Exception { + ActionStopQueue stopQueue = new ActionStopQueue("serviceStop: "+ this, + 0, TimeUnit.MILLISECONDS); + schedule(stopQueue); + super.serviceStop(); + } + + /** + * Flush an action queue of all types of a specific action + * @param clazz + */ + protected void flushActionQueue(Class<? extends AsyncAction> clazz) { + Iterator<AsyncAction> iterator = + actionQueue.descendingIterator(); + while (iterator.hasNext()) { + AsyncAction next = iterator.next(); + if (next.getClass().equals(clazz)) { + iterator.remove(); + } + } + } + + @Override + public boolean hasQueuedActionWithAttribute(int attr) { + for (AsyncAction action : actionQueue) { + if (action.hasAttr(attr)) { + return true; + } + } + return false; + } + + /** + * Run until the queue has been told to stop + */ + @Override + public void run() { + try { + + log.info("QueueService processor started"); + + AsyncAction take; + do { + take = scheduledActions.take(); + log.debug("Propagating {}", take); + actionQueue.put(take); + } while (!(take instanceof ActionStopQueue)); + log.info("QueueService processor terminated"); + } catch (InterruptedException e) { + // interrupted during actions + } + // the thread exits, but does not tag the service as complete. That's expected + // to be done by the stop queue + } + + + /** + * Check to see if the queue executor has completed + * @return the status + */ + public boolean isCompleted() { + return completed.get(); + } + + /** + * Package scoped method to mark the queue service as finished + */ + void complete() { + completed.set(true); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RegisterComponentInstance.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RegisterComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RegisterComponentInstance.java new file mode 100644 index 0000000..4cf4981 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RegisterComponentInstance.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; + +import java.util.concurrent.TimeUnit; + +/** + * Notify the app master that it should register a component instance + * in the registry + * {@link SliderAppMaster#registerComponent(ContainerId)} + */ +public class RegisterComponentInstance extends AsyncAction { + + public final ContainerId containerId; + public final String description; + public final String type; + + public RegisterComponentInstance(ContainerId containerId, + String description, + String type, + long delay, + TimeUnit timeUnit) { + super("RegisterComponentInstance :" + containerId, + delay, timeUnit); + this.description = description; + this.type = type; + Preconditions.checkArgument(containerId != null); + this.containerId = containerId; + } + + @Override + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { + + appMaster.registerComponent(containerId, description, type); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java new file mode 100644 index 0000000..f3143ea --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/RenewingAction.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import com.google.common.base.Preconditions; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * This action executes then reschedules an inner action; a limit + * can specify the number of times to run + */ + +public class RenewingAction<A extends AsyncAction> extends AsyncAction { + private static final Logger log = + LoggerFactory.getLogger(RenewingAction.class); + private final A action; + private long interval; + private TimeUnit timeUnit; + public final AtomicInteger executionCount = new AtomicInteger(); + private final ReentrantReadWriteLock intervalLock = new ReentrantReadWriteLock(); + private final Lock intervalReadLock = intervalLock.readLock(); + private final Lock intervalWriteLock = intervalLock.writeLock(); + public final int limit; + + + /** + * Rescheduling action + * @param action action to execute + * @param initialDelay initial delay + * @param interval interval for later delays + * @param timeUnit time unit for all times + * @param limit limit on the no. of executions. If 0 or less: no limit + */ + public RenewingAction(A action, + long initialDelay, + long interval, + TimeUnit timeUnit, + int limit) { + super("renewing " + action.name, initialDelay, timeUnit, action.getAttrs()); + Preconditions.checkArgument(interval > 0, "invalid interval: " + interval); + this.action = action; + this.interval = interval; + this.timeUnit = timeUnit; + this.limit = limit; + } + + /** + * Execute the inner action then reschedule ourselves + * @param appMaster + * @param queueService + * @param appState + * @throws Exception + */ + @Override + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) + throws Exception { + long exCount = executionCount.incrementAndGet(); + log.debug("{}: Executing inner action count # {}", this, exCount); + action.execute(appMaster, queueService, appState); + boolean reschedule = true; + if (limit > 0) { + reschedule = limit > exCount; + } + if (reschedule) { + this.setNanos(convertAndOffset(getInterval(), getTimeUnit())); + log.debug("{}: rescheduling, new offset {} mS ", this, + getDelay(TimeUnit.MILLISECONDS)); + queueService.schedule(this); + } + } + + /** + * Get the action + * @return + */ + public A getAction() { + return action; + } + + public long getInterval() { + intervalReadLock.lock(); + try { + return interval; + } finally { + intervalReadLock.unlock(); + } + } + + public void updateInterval(long delay, TimeUnit timeUnit) { + intervalWriteLock.lock(); + try { + interval = delay; + this.timeUnit = timeUnit; + } finally { + intervalWriteLock.unlock(); + } + } + + public TimeUnit getTimeUnit() { + intervalReadLock.lock(); + try { + return timeUnit; + } finally { + intervalReadLock.unlock(); + } + } + + public int getExecutionCount() { + return executionCount.get(); + } + + public int getLimit() { + return limit; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ResetFailureWindow.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ResetFailureWindow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ResetFailureWindow.java new file mode 100644 index 0000000..28bcf55 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ResetFailureWindow.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; + +/** + * Requests the AM to reset the failure window + */ +public class ResetFailureWindow extends AsyncAction { + + public ResetFailureWindow() { + super("ResetFailureWindow"); + } + + @Override + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { + appState.resetFailureCounts(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ReviewAndFlexApplicationSize.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ReviewAndFlexApplicationSize.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ReviewAndFlexApplicationSize.java new file mode 100644 index 0000000..bf7edf9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ReviewAndFlexApplicationSize.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; + +import java.util.concurrent.TimeUnit; + +/** + * Tell the AM to execute the full flex review operation + */ +public class ReviewAndFlexApplicationSize extends AsyncAction { + + public ReviewAndFlexApplicationSize(String name, + long delay, + TimeUnit timeUnit) { + super(name, delay, timeUnit, ATTR_REVIEWS_APP_SIZE); + } + + @Override + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { + appMaster.handleReviewAndFlexApplicationSize(this); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/UnregisterComponentInstance.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/UnregisterComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/UnregisterComponentInstance.java new file mode 100644 index 0000000..575fe8f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/actions/UnregisterComponentInstance.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; + +import java.util.concurrent.TimeUnit; + +/** + * Tell AM to unregister this component instance + * {@link SliderAppMaster#unregisterComponent(ContainerId)} + */ +public class UnregisterComponentInstance extends AsyncAction { + + + public final ContainerId containerId; + + public UnregisterComponentInstance(ContainerId containerId, + long delay, + TimeUnit timeUnit) { + super("UnregisterComponentInstance :" + containerId.toString(), + delay, timeUnit); + this.containerId = containerId; + } + + @Override + public void execute(SliderAppMaster appMaster, + QueueAccess queueService, + AppState appState) throws Exception { + appMaster.unregisterComponent(containerId); + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/BoolMetric.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/BoolMetric.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/BoolMetric.java new file mode 100644 index 0000000..33f8d85 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/BoolMetric.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.management; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A bool metric, mapped to an integer. true maps to 1, false to zero, + */ +public class BoolMetric implements Metric, Gauge<Integer> { + + private final AtomicBoolean value; + + public BoolMetric(boolean b) { + value = new AtomicBoolean(b); + } + + public void set(boolean b) { + value.set(b); + } + + public boolean get() { + return value.get(); + } + + @Override + public Integer getValue() { + return value.get() ? 1 : 0; + } + + /** + * Evaluate from a string. Returns true if the string is considered to match 'true', + * false otherwise. + * @param s source + * @return true if the input parses to an integer other than 0. False if it doesn't parse + * or parses to 0. + */ + public static boolean fromString(String s) { + try { + return Integer.valueOf(s) != 0; + } catch (NumberFormatException e) { + return false; + } + } + + @Override + public String toString() { + return value.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + BoolMetric that = (BoolMetric) o; + return get() == that.get(); + } + + @Override + public int hashCode() { + return value.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/BoolMetricPredicate.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/BoolMetricPredicate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/BoolMetricPredicate.java new file mode 100644 index 0000000..82bcd3a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/BoolMetricPredicate.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.management; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; + +/** + * A metric which takes a predicate and returns 1 if the predicate evaluates + * to true. The predicate is evaluated whenever the metric is read. + */ +public class BoolMetricPredicate implements Metric, Gauge<Integer> { + + private final Eval predicate; + + public BoolMetricPredicate(Eval predicate) { + this.predicate = predicate; + } + + @Override + public Integer getValue() { + return predicate.eval() ? 1: 0; + } + + public interface Eval { + boolean eval(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongGauge.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongGauge.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongGauge.java new file mode 100644 index 0000000..c93467b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongGauge.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.management; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * This is a {@link AtomicLong} which acts as a metrics gauge: its state can be exposed as + * a metrics. + * It also exposes some of the same method names as the Codahale Counter class, so that + * it's easy to swap in. + * + */ +public class LongGauge extends AtomicLong implements Metric, Gauge<Long> { + + /** + * Instantiate + * @param val current value + */ + public LongGauge(long val) { + super(val); + } + + /** + * Instantiate with value 0 + */ + public LongGauge() { + this(0); + } + + /** + * Get the value as a metric + * @return current value + */ + @Override + public Long getValue() { + return get(); + } + + /** + * Method from {@Code counter}; used here for drop-in replacement + * without any recompile + * @return current value + */ + public Long getCount() { + return get(); + } + + /** + * {@code ++} + */ + public void inc() { + incrementAndGet(); + } + + /** + * {@code --} + */ + public void dec() { + decrementAndGet(); + } + + /** + * Decrement to the floor of 0. Operations in parallel may cause confusion here, + * but it will still never go below zero + * @param delta delta + * @return the current value + */ + public long decToFloor(long delta) { + long l = get(); + long r = l - delta; + if (r < 0) { + r = 0; + } + // if this fails, the decrement has been lost + compareAndSet(l, r); + return get(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongMetricFunction.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongMetricFunction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongMetricFunction.java new file mode 100644 index 0000000..1de7345 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/LongMetricFunction.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.management; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; + +/** + * A metric which takes a function to generate a long value. + * The function is evaluated whenever the metric is read. + */ +public class LongMetricFunction implements Metric, Gauge<Long> { + + private final Eval function; + + public LongMetricFunction(Eval function) { + this.function = function; + } + + @Override + public Long getValue() { + return function.eval(); + } + + public interface Eval { + long eval(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/MeterAndCounter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/MeterAndCounter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/MeterAndCounter.java new file mode 100644 index 0000000..02ab7bc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/management/MeterAndCounter.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.management; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; + +/** + * A combined meter and counter that can be used to measure load. + * Hash and equality are derived from the name + */ +public class MeterAndCounter { + + /** + * suffix for counters: {@value} + */ + public static final String COUNTER = ".counter"; + + /** + * suffix for meters: {@value} + */ + public static final String METER = ".meter"; + + final Meter meter; + final Counter counter; + final String name; + + /** + * Construct an instance + * @param metrics metrics to bond to + * @param name name before suffixes are appended + */ + public MeterAndCounter(MetricRegistry metrics, String name) { + this.name = name; + counter = metrics.counter(name + COUNTER); + meter = metrics.meter(name + METER); + } + + /** + * Construct an instance + * @param metrics metrics to bond to + * @param clazz class to use to derive name + * @param name name before suffixes are appended + */ + + public MeterAndCounter(MetricRegistry metrics, Class clazz, String name) { + this.name = name; + counter = metrics.counter(MetricRegistry.name(clazz, name + COUNTER)); + meter = metrics.meter(MetricRegistry.name(clazz, name + METER)); + } + + /** + * Increment the counter, mark the meter + */ + public void mark() { + counter.inc(); + meter.mark(); + } + + public void inc() { + mark(); + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MeterAndCounter that = (MeterAndCounter) o; + + return name.equals(that.name); + + } + + @Override + public int hashCode() { + return name.hashCode(); + } + + /** + * Get the count. + * @return the current count + */ + public long getCount() { + return counter.getCount(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org