http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/RpcBinder.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/RpcBinder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/RpcBinder.java new file mode 100644 index 0000000..dd4785d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/RpcBinder.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster.rpc; + +import com.google.common.base.Preconditions; +import com.google.protobuf.BlockingService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.ProtocolProxy; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RpcEngine; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.slider.api.SliderClusterProtocol; +import org.apache.slider.common.SliderExitCodes; +import org.apache.slider.common.tools.Duration; +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.core.exceptions.BadClusterStateException; +import org.apache.slider.core.exceptions.ErrorStrings; +import org.apache.slider.core.exceptions.ServiceNotReadyException; +import org.apache.slider.core.exceptions.SliderException; + +import static org.apache.slider.common.SliderXmlConfKeys.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; + +public class RpcBinder { + 
protected static final Logger log = + LoggerFactory.getLogger(RpcBinder.class); + + /** + * Create a protobuf server bonded to the specific socket address + * @param addr address to listen to; 0.0.0.0 as hostname acceptable + * @param conf config + * @param secretManager token secret handler + * @param numHandlers threads to service requests + * @param blockingService service to handle + * @param portRangeConfig range of ports + * @return the IPC server itself + * @throws IOException + */ + public static Server createProtobufServer(InetSocketAddress addr, + Configuration conf, + SecretManager<? extends TokenIdentifier> secretManager, + int numHandlers, + BlockingService blockingService, + String portRangeConfig) throws + IOException { + Class<SliderClusterProtocolPB> sliderClusterAPIClass = registerSliderAPI( + conf); + RPC.Server server = new RPC.Builder(conf).setProtocol(sliderClusterAPIClass) + .setInstance(blockingService) + .setBindAddress(addr.getAddress() + .getCanonicalHostName()) + .setPort(addr.getPort()) + .setNumHandlers(numHandlers) + .setVerbose(false) + .setSecretManager(secretManager) + .setPortRangeConfig( + portRangeConfig) + .build(); + log.debug( + "Adding protocol " + sliderClusterAPIClass.getCanonicalName() + " to the server"); + server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, sliderClusterAPIClass, + blockingService); + return server; + } + + /** + * Add the protobuf engine to the configuration. Harmless and inexpensive + * if repeated. 
+ * @param conf configuration to patch + * @return the protocol class + */ + public static Class<SliderClusterProtocolPB> registerSliderAPI( + Configuration conf) { + Class<SliderClusterProtocolPB> sliderClusterAPIClass = + SliderClusterProtocolPB.class; + RPC.setProtocolEngine(conf, sliderClusterAPIClass, ProtobufRpcEngine.class); + + //quick sanity check here + assert verifyBondedToProtobuf(conf, sliderClusterAPIClass); + + return sliderClusterAPIClass; + } + + /** + * Verify that the conf is set up for protobuf transport of Slider RPC + * @param conf configuration + * @param sliderClusterAPIClass class for the API + * @return true if the RPC engine is protocol buffers + */ + public static boolean verifyBondedToProtobuf(Configuration conf, + Class<SliderClusterProtocolPB> sliderClusterAPIClass) { + return conf.getClass("rpc.engine." + sliderClusterAPIClass.getName(), + RpcEngine.class) .equals(ProtobufRpcEngine.class); + } + + + /** + * Connect to a server. The proxy is created with the + * {@code TRY_ONCE_THEN_FAIL} retry policy. + * @param addr address of the Slider AM RPC endpoint + * @param currentUser user on whose behalf the RPC proxy is created + * @param conf configuration; patched to register the protobuf RPC engine + * @param rpcTimeout timeout value handed to the RPC proxy + * @return a {@link SliderClusterProtocolProxy} wrapping the protobuf endpoint + * @throws IOException on a failure to create the proxy + */ + public static SliderClusterProtocol connectToServer(InetSocketAddress addr, + UserGroupInformation currentUser, + Configuration conf, + int rpcTimeout) throws IOException { + Class<SliderClusterProtocolPB> sliderClusterAPIClass = + registerSliderAPI(conf); + + final RetryPolicy retryPolicy = RetryPolicies.TRY_ONCE_THEN_FAIL; + log.debug("Connecting to Slider AM at {}", addr); + ProtocolProxy<SliderClusterProtocolPB> protoProxy = + RPC.getProtocolProxy(sliderClusterAPIClass, + 1, + addr, + currentUser, + conf, + NetUtils.getDefaultSocketFactory(conf), + rpcTimeout, + retryPolicy); + SliderClusterProtocolPB endpoint = protoProxy.getProxy(); + return new SliderClusterProtocolProxy(endpoint, addr); + } + + + /** + * This loops for a limited period trying to get the Proxy - + * by doing so it handles AM failover + * @param conf configuration to patch and use + 
* @param rmClient client of the resource manager + * @param application application to work with + * @param connectTimeout timeout for the whole proxy operation to timeout + * (milliseconds). Use 0 to indicate "do not attempt to wait" -fail fast. + * @param rpcTimeout timeout for RPCs to block during communications + * @return the proxy + * @throws IOException IO problems + * @throws YarnException Slider-generated exceptions related to the binding + * failing. This can include the application finishing or timeouts + * @throws InterruptedException if a sleep operation waiting for + * the cluster to respond is interrupted. + */ + @SuppressWarnings("NestedAssignment") + public static SliderClusterProtocol getProxy(final Configuration conf, + final ApplicationClientProtocol rmClient, + ApplicationReport application, + final int connectTimeout, + final int rpcTimeout) + throws IOException, YarnException, InterruptedException { + ApplicationId appId; + appId = application.getApplicationId(); + Duration timeout = new Duration(connectTimeout); + timeout.start(); + Exception exception = null; + YarnApplicationState state = null; + try { + while (application != null && + (state = application.getYarnApplicationState()).equals( + YarnApplicationState.RUNNING)) { + + try { + return getProxy(conf, application, rpcTimeout); + } catch (IOException e) { + if (connectTimeout <= 0 || timeout.getLimitExceeded()) { + throw e; + } + exception = e; + } catch (YarnException e) { + if (connectTimeout <= 0 || timeout.getLimitExceeded()) { + throw e; + } + exception = e; + } + //at this point: app failed to work + log.debug("Could not connect to {}. Waiting for getting the latest AM address...", + appId); + Thread.sleep(1000); + //or get the app report + application = + rmClient.getApplicationReport( + GetApplicationReportRequest.newInstance(appId)).getApplicationReport(); + } + //get here if the app is no longer running. 
Raise a specific + //exception but init it with the previous failure + throw new BadClusterStateException( + exception, + ErrorStrings.E_FINISHED_APPLICATION, appId, state ); + } finally { + timeout.close(); + } + } + + /** + * Get a proxy from the application report + * @param conf config to use + * @param application app report + * @param rpcTimeout timeout in RPC operations + * @return the proxy + * @throws IOException + * @throws SliderException + * @throws InterruptedException + */ + public static SliderClusterProtocol getProxy(final Configuration conf, + final ApplicationReport application, + final int rpcTimeout) + throws IOException, SliderException, InterruptedException { + + String host = application.getHost(); + int port = application.getRpcPort(); + org.apache.hadoop.yarn.api.records.Token clientToAMToken = + application.getClientToAMToken(); + return createProxy(conf, host, port, clientToAMToken, rpcTimeout); + } + + /** + * Create a proxy to the Slider RPC endpoint at the given host and port. + * @param conf config to use + * @param host hostname + * @param port port + * @param clientToAMToken auth token: only used in a secure cluster. 
+ * converted via {@link ConverterUtils#convertFromYarn(org.apache.hadoop.yarn.api.records.Token, InetSocketAddress)} + * @param rpcTimeout timeout in RPC operations + * @return the proxy + * @throws SliderException + * @throws IOException + * @throws InterruptedException + */ + public static SliderClusterProtocol createProxy(final Configuration conf, + String host, + int port, + org.apache.hadoop.yarn.api.records.Token clientToAMToken, + final int rpcTimeout) throws + SliderException, + IOException, + InterruptedException { + String address = host + ":" + port; + if (SliderUtils.isUnset(host) || 0 == port) { + throw new SliderException(SliderExitCodes.EXIT_CONNECTIVITY_PROBLEM, + "Slider instance " + + " isn't providing a valid address for the" + + " Slider RPC protocol: " + address); + } + + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + final UserGroupInformation newUgi = UserGroupInformation.createRemoteUser( + currentUser.getUserName()); + final InetSocketAddress serviceAddr = + NetUtils.createSocketAddrForHost(host, port); + SliderClusterProtocol realProxy; + + log.debug("Connecting to {}", serviceAddr); + if (UserGroupInformation.isSecurityEnabled()) { + Preconditions.checkArgument(clientToAMToken != null, + "Null clientToAMToken"); + Token<ClientToAMTokenIdentifier> token = + ConverterUtils.convertFromYarn(clientToAMToken, serviceAddr); + newUgi.addToken(token); + realProxy = + newUgi.doAs(new PrivilegedExceptionAction<SliderClusterProtocol>() { + @Override + public SliderClusterProtocol run() throws IOException { + return connectToServer(serviceAddr, newUgi, conf, rpcTimeout); + } + }); + } else { + realProxy = connectToServer(serviceAddr, newUgi, conf, rpcTimeout); + } + return realProxy; + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderAMPolicyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderAMPolicyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderAMPolicyProvider.java new file mode 100644 index 0000000..a40078a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderAMPolicyProvider.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.slider.server.appmaster.rpc; + +import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.hadoop.security.authorize.Service; +import org.apache.slider.common.SliderXmlConfKeys; + +/** + * {@link PolicyProvider} for Slider protocols. + */ + +public class SliderAMPolicyProvider extends PolicyProvider { + + private static final Service[] services = + new Service[] { + new Service(SliderXmlConfKeys.KEY_PROTOCOL_ACL, SliderClusterProtocolPB.class) + }; + + @SuppressWarnings("ReturnOfCollectionOrArrayField") + @Override + public Service[] getServices() { + return services; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPB.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPB.java new file mode 100644 index 0000000..7d237de --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPB.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.rpc; + +import org.apache.slider.api.SliderClusterProtocol; +import org.apache.slider.api.proto.SliderClusterAPI; + +public interface SliderClusterProtocolPB extends SliderClusterAPI.SliderClusterProtocolPB.BlockingInterface{ + + long versionID = SliderClusterProtocol.versionID; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java new file mode 100644 index 0000000..f0d9063 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.rpc; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.slider.api.SliderClusterProtocol; +import org.apache.slider.api.proto.Messages; + +import java.io.IOException; + +/** + * Server-side Relay from Protobuf to internal RPC. + * + */ +public class SliderClusterProtocolPBImpl implements SliderClusterProtocolPB { + + private SliderClusterProtocol real; + + public SliderClusterProtocolPBImpl(SliderClusterProtocol real) { + this.real = real; + } + + private ServiceException wrap(Exception e) { + if (e instanceof ServiceException) { + return (ServiceException) e; + } + return new ServiceException(e); + } + + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return SliderClusterProtocol.versionID; + } + + @Override + public Messages.StopClusterResponseProto stopCluster(RpcController controller, + Messages.StopClusterRequestProto request) throws ServiceException { + try { + return real.stopCluster(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.UpgradeContainersResponseProto upgradeContainers(RpcController controller, + Messages.UpgradeContainersRequestProto request) throws ServiceException { + try { + return real.upgradeContainers(request); + } catch (Exception e) { + throw wrap(e); + } + } + + 
@Override + public Messages.FlexClusterResponseProto flexCluster(RpcController controller, + Messages.FlexClusterRequestProto request) throws ServiceException { + try { + return real.flexCluster(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.GetJSONClusterStatusResponseProto getJSONClusterStatus( + RpcController controller, + Messages.GetJSONClusterStatusRequestProto request) throws ServiceException { + try { + return real.getJSONClusterStatus(request); + } catch (Exception e) { + throw wrap(e); + } + } + + + @Override + public Messages.GetInstanceDefinitionResponseProto getInstanceDefinition( + RpcController controller, + Messages.GetInstanceDefinitionRequestProto request) + throws ServiceException { + try { + return real.getInstanceDefinition(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.ListNodeUUIDsByRoleResponseProto listNodeUUIDsByRole( + RpcController controller, + Messages.ListNodeUUIDsByRoleRequestProto request) throws ServiceException { + try { + return real.listNodeUUIDsByRole(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.GetNodeResponseProto getNode(RpcController controller, + Messages.GetNodeRequestProto request) throws ServiceException { + try { + return real.getNode(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.GetClusterNodesResponseProto getClusterNodes(RpcController controller, + Messages.GetClusterNodesRequestProto request) throws ServiceException { + try { + return real.getClusterNodes(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.EchoResponseProto echo(RpcController controller, + Messages.EchoRequestProto request) throws ServiceException { + try { + return real.echo(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.KillContainerResponseProto killContainer(RpcController 
controller, + Messages.KillContainerRequestProto request) throws ServiceException { + try { + return real.killContainer(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.AMSuicideResponseProto amSuicide(RpcController controller, + Messages.AMSuicideRequestProto request) throws ServiceException { + try { + return real.amSuicide(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.ApplicationLivenessInformationProto getLivenessInformation( + RpcController controller, + Messages.GetApplicationLivenessRequestProto request) throws ServiceException { + try { + return real.getLivenessInformation(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.GetLiveContainersResponseProto getLiveContainers(RpcController controller, + Messages.GetLiveContainersRequestProto request) throws ServiceException { + try { + return real.getLiveContainers(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.ContainerInformationProto getLiveContainer(RpcController controller, + Messages.GetLiveContainerRequestProto request) throws ServiceException { + try { + return real.getLiveContainer(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.GetLiveComponentsResponseProto getLiveComponents(RpcController controller, + Messages.GetLiveComponentsRequestProto request) throws ServiceException { + try { + return real.getLiveComponents(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.ComponentInformationProto getLiveComponent(RpcController controller, + Messages.GetLiveComponentRequestProto request) throws ServiceException { + try { + return real.getLiveComponent(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.GetLiveNodesResponseProto getLiveNodes(RpcController controller, + Messages.GetLiveNodesRequestProto 
request) throws ServiceException { + try { + return real.getLiveNodes(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.NodeInformationProto getLiveNode(RpcController controller, + Messages.GetLiveNodeRequestProto request) throws ServiceException { + try { + return real.getLiveNode(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.WrappedJsonProto getModelDesired(RpcController controller, + Messages.EmptyPayloadProto request) throws ServiceException { + try { + return real.getModelDesired(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.WrappedJsonProto getModelDesiredAppconf(RpcController controller, + Messages.EmptyPayloadProto request) throws ServiceException { + try { + return real.getModelDesiredAppconf(request); + } catch (Exception e) { + throw wrap(e); + } } + + @Override + public Messages.WrappedJsonProto getModelDesiredResources(RpcController controller, + Messages.EmptyPayloadProto request) throws ServiceException { + try { + return real.getModelDesiredResources(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.WrappedJsonProto getModelResolved(RpcController controller, + Messages.EmptyPayloadProto request) throws ServiceException { + try { + return real.getModelResolved(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.WrappedJsonProto getModelResolvedAppconf(RpcController controller, + Messages.EmptyPayloadProto request) throws ServiceException { + try { + return real.getModelResolvedAppconf(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.WrappedJsonProto getModelResolvedResources(RpcController controller, + Messages.EmptyPayloadProto request) throws ServiceException { + try { + return real.getModelResolvedResources(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + 
public Messages.WrappedJsonProto getLiveResources(RpcController controller, + Messages.EmptyPayloadProto request) throws ServiceException { + try { + return real.getLiveResources(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override + public Messages.GetCertificateStoreResponseProto getClientCertificateStore( + RpcController controller, + Messages.GetCertificateStoreRequestProto request) + throws ServiceException { + try { + return real.getClientCertificateStore(request); + } catch (Exception e) { + throw wrap(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java new file mode 100644 index 0000000..b230816 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.rpc; + +import com.google.common.base.Preconditions; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.slider.api.SliderClusterProtocol; +import org.apache.slider.api.proto.Messages; + +import java.io.IOException; +import java.net.InetSocketAddress; + +public class SliderClusterProtocolProxy implements SliderClusterProtocol { + + private static final RpcController NULL_CONTROLLER = null; + private final SliderClusterProtocolPB endpoint; + private final InetSocketAddress address; + + /** + * Build a client-side proxy that relays calls to the protobuf endpoint. + * @param endpoint protobuf endpoint to invoke; must not be null + * @param address address of the endpoint, reported by {@link #toString()}; + * must not be null + * @throws IllegalArgumentException if either argument is null + */ + public SliderClusterProtocolProxy(SliderClusterProtocolPB endpoint, + InetSocketAddress address) { + Preconditions.checkArgument(endpoint != null, "null endpoint"); + Preconditions.checkArgument(address != null, "null address"); + this.endpoint = endpoint; + this.address = address; + } + + @Override + public String toString() { + final StringBuilder sb = + new StringBuilder("SliderClusterProtocolProxy{"); + sb.append("address=").append(address); + sb.append('}'); + return sb.toString(); + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, + int clientMethodsHash) + throws IOException { + if (!protocol.equals(RPC.getProtocolName(SliderClusterProtocolPB.class))) { + throw new 
IOException("Serverside implements " + + RPC.getProtocolName(SliderClusterProtocolPB.class) + + ". The following requested protocol is unknown: " + + protocol); + } + + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + RPC.getProtocolVersion( + SliderClusterProtocol.class), + SliderClusterProtocol.class); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return SliderClusterProtocol.versionID; + } + + private IOException convert(ServiceException se) { + IOException ioe = ProtobufHelper.getRemoteException(se); + if (ioe instanceof RemoteException) { + RemoteException remoteException = (RemoteException) ioe; + return remoteException.unwrapRemoteException(); + } + return ioe; + } + + @Override + public Messages.StopClusterResponseProto stopCluster(Messages.StopClusterRequestProto request) throws + IOException, + YarnException { + try { + return endpoint.stopCluster(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.UpgradeContainersResponseProto upgradeContainers( + Messages.UpgradeContainersRequestProto request) throws IOException, + YarnException { + try { + return endpoint.upgradeContainers(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.FlexClusterResponseProto flexCluster(Messages.FlexClusterRequestProto request) + throws IOException { + try { + return endpoint.flexCluster(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.GetJSONClusterStatusResponseProto getJSONClusterStatus( + Messages.GetJSONClusterStatusRequestProto request) throws + IOException, + YarnException { + try { + return endpoint.getJSONClusterStatus(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + + @Override + public Messages.GetInstanceDefinitionResponseProto 
getInstanceDefinition( + Messages.GetInstanceDefinitionRequestProto request) throws + IOException, + YarnException { + try { + return endpoint.getInstanceDefinition(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.ListNodeUUIDsByRoleResponseProto listNodeUUIDsByRole(Messages.ListNodeUUIDsByRoleRequestProto request) throws + IOException, + YarnException { + try { + return endpoint.listNodeUUIDsByRole(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.GetNodeResponseProto getNode(Messages.GetNodeRequestProto request) throws + IOException, + YarnException { + try { + return endpoint.getNode(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.GetClusterNodesResponseProto getClusterNodes(Messages.GetClusterNodesRequestProto request) throws + IOException, + YarnException { + try { + return endpoint.getClusterNodes(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + + @Override + public Messages.EchoResponseProto echo(Messages.EchoRequestProto request) throws + IOException, + YarnException { + try { + return endpoint.echo(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + + @Override + public Messages.KillContainerResponseProto killContainer(Messages.KillContainerRequestProto request) throws + IOException, + YarnException { + try { + return endpoint.killContainer(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.AMSuicideResponseProto amSuicide(Messages.AMSuicideRequestProto request) throws + IOException { + try { + return endpoint.amSuicide(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.ApplicationLivenessInformationProto getLivenessInformation( + 
Messages.GetApplicationLivenessRequestProto request) throws IOException { + try { + return endpoint.getLivenessInformation(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.GetLiveContainersResponseProto getLiveContainers(Messages.GetLiveContainersRequestProto request) throws + IOException { + try { + return endpoint.getLiveContainers(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.ContainerInformationProto getLiveContainer(Messages.GetLiveContainerRequestProto request) throws + IOException { + try { + return endpoint.getLiveContainer(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.GetLiveComponentsResponseProto getLiveComponents(Messages.GetLiveComponentsRequestProto request) throws + IOException { + try { + return endpoint.getLiveComponents(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.ComponentInformationProto getLiveComponent(Messages.GetLiveComponentRequestProto request) throws + IOException { + try { + return endpoint.getLiveComponent(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.GetLiveNodesResponseProto getLiveNodes(Messages.GetLiveNodesRequestProto request) + throws IOException { + try { + return endpoint.getLiveNodes(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.NodeInformationProto getLiveNode(Messages.GetLiveNodeRequestProto request) + throws IOException { + try { + return endpoint.getLiveNode(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.WrappedJsonProto getModelDesired(Messages.EmptyPayloadProto request) throws IOException { + try { + return 
endpoint.getModelDesired(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.WrappedJsonProto getModelDesiredAppconf(Messages.EmptyPayloadProto request) throws IOException { + try { + return endpoint.getModelDesiredAppconf(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.WrappedJsonProto getModelDesiredResources(Messages.EmptyPayloadProto request) throws IOException { + try { + return endpoint.getModelDesiredResources(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.WrappedJsonProto getModelResolved(Messages.EmptyPayloadProto request) throws IOException { + try { + return endpoint.getModelResolved(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.WrappedJsonProto getModelResolvedAppconf(Messages.EmptyPayloadProto request) throws IOException { + try { + return endpoint.getModelResolvedAppconf(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.WrappedJsonProto getModelResolvedResources(Messages.EmptyPayloadProto request) throws IOException { + try { + return endpoint.getModelResolvedResources(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override + public Messages.WrappedJsonProto getLiveResources(Messages.EmptyPayloadProto request) throws IOException { + try { + return endpoint.getLiveResources(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + + } + + @Override + public Messages.GetCertificateStoreResponseProto getClientCertificateStore(Messages.GetCertificateStoreRequestProto request) throws + IOException { + try { + return endpoint.getClientCertificateStore(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } 
+ } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java new file mode 100644 index 0000000..fda23aa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java @@ -0,0 +1,551 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster.rpc; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.slider.api.ClusterDescription; +import org.apache.slider.api.SliderClusterProtocol; +import org.apache.slider.api.proto.Messages; +import org.apache.slider.api.types.ApplicationLivenessInformation; +import org.apache.slider.api.types.ComponentInformation; +import org.apache.slider.api.types.ContainerInformation; +import org.apache.slider.api.types.NodeInformation; +import org.apache.slider.api.types.NodeInformationList; +import org.apache.slider.core.conf.AggregateConf; +import org.apache.slider.core.conf.ConfTree; +import org.apache.slider.core.exceptions.ServiceNotReadyException; +import org.apache.slider.core.exceptions.SliderException; +import org.apache.slider.core.main.LauncherExitCodes; +import org.apache.slider.core.persist.AggregateConfSerDeser; +import org.apache.slider.core.persist.ConfTreeSerDeser; +import org.apache.slider.server.appmaster.AppMasterActionOperations; +import org.apache.slider.server.appmaster.actions.ActionFlexCluster; +import org.apache.slider.server.appmaster.actions.ActionHalt; +import org.apache.slider.server.appmaster.actions.ActionKillContainer; +import org.apache.slider.server.appmaster.actions.ActionStopSlider; +import org.apache.slider.server.appmaster.actions.ActionUpgradeContainers; +import org.apache.slider.server.appmaster.actions.AsyncAction; +import org.apache.slider.server.appmaster.actions.QueueAccess; +import org.apache.slider.server.appmaster.management.MetricsAndMonitoring; +import org.apache.slider.server.appmaster.state.RoleInstance; +import org.apache.slider.server.appmaster.state.StateAccessForProviders; 
+import org.apache.slider.server.appmaster.web.rest.application.resources.ContentCache; +import org.apache.slider.server.services.security.CertificateManager; +import org.apache.slider.server.services.security.SecurityStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.slider.api.proto.RestTypeMarshalling.marshall; +import static org.apache.slider.server.appmaster.web.rest.RestPaths.LIVE_COMPONENTS; +import static org.apache.slider.server.appmaster.web.rest.RestPaths.LIVE_CONTAINERS; +import static org.apache.slider.server.appmaster.web.rest.RestPaths.LIVE_NODES; +import static org.apache.slider.server.appmaster.web.rest.RestPaths.LIVE_RESOURCES; +import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_DESIRED; +import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_DESIRED_APPCONF; +import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_DESIRED_RESOURCES; +import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_RESOLVED; +import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_RESOLVED_APPCONF; +import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_RESOLVED_RESOURCES; + +/** + * Implement the {@link SliderClusterProtocol}. 
+ */ +@SuppressWarnings("unchecked") + +public class SliderIPCService extends AbstractService + implements SliderClusterProtocol { + + protected static final Logger log = + LoggerFactory.getLogger(SliderIPCService.class); + + private final QueueAccess actionQueues; + private final StateAccessForProviders state; + private final MetricsAndMonitoring metricsAndMonitoring; + private final AppMasterActionOperations amOperations; + private final ContentCache cache; + private final CertificateManager certificateManager; + + /** + * This is the prefix used for metrics + */ + public static final String METRICS_PREFIX = + "org.apache.slider.api.SliderIPCService."; + + /** + * Constructor + * @param amOperations access to any AM operations + * @param state state view + * @param actionQueues queues for actions + * @param metricsAndMonitoring metrics + * @param cache + */ + public SliderIPCService(AppMasterActionOperations amOperations, + CertificateManager certificateManager, + StateAccessForProviders state, + QueueAccess actionQueues, + MetricsAndMonitoring metricsAndMonitoring, + ContentCache cache) { + super("SliderIPCService"); + Preconditions.checkArgument(amOperations != null, "null amOperations"); + Preconditions.checkArgument(state != null, "null appState"); + Preconditions.checkArgument(actionQueues != null, "null actionQueues"); + Preconditions.checkArgument(metricsAndMonitoring != null, + "null metricsAndMonitoring"); + Preconditions.checkArgument(cache != null, "null cache"); + this.state = state; + this.actionQueues = actionQueues; + this.metricsAndMonitoring = metricsAndMonitoring; + this.amOperations = amOperations; + this.cache = cache; + this.certificateManager = certificateManager; + } + + @Override //SliderClusterProtocol + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, + int clientMethodsHash) throws IOException { + return ProtocolSignature.getProtocolSignature( + this, protocol, clientVersion, clientMethodsHash); + } + 
+ + @Override //SliderClusterProtocol + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return SliderClusterProtocol.versionID; + } + + /** + * General actions to perform on a slider RPC call coming in + * @param operation operation to log + * @throws IOException problems + * @throws ServiceNotReadyException if the RPC service is constructed + * but not fully initialized + */ + protected void onRpcCall(String operation) throws IOException { + log.debug("Received call to {}", operation); + metricsAndMonitoring.markMeterAndCounter(METRICS_PREFIX + operation); + } + + /** + * Schedule an action + * @param action for delayed execution + */ + public void schedule(AsyncAction action) { + actionQueues.schedule(action); + } + + /** + * Queue an action for immediate execution in the executor thread + * @param action action to execute + */ + public void queue(AsyncAction action) { + actionQueues.put(action); + } + + @Override //SliderClusterProtocol + public Messages.StopClusterResponseProto stopCluster(Messages.StopClusterRequestProto request) + throws IOException, YarnException { + onRpcCall("stop"); + String message = request.getMessage(); + if (message == null) { + message = "application stopped by client"; + } + ActionStopSlider stopSlider = + new ActionStopSlider(message, + 1000, TimeUnit.MILLISECONDS, + LauncherExitCodes.EXIT_SUCCESS, + FinalApplicationStatus.SUCCEEDED, + message); + log.info("SliderAppMasterApi.stopCluster: {}", stopSlider); + schedule(stopSlider); + return Messages.StopClusterResponseProto.getDefaultInstance(); + } + + @Override //SliderClusterProtocol + public Messages.UpgradeContainersResponseProto upgradeContainers( + Messages.UpgradeContainersRequestProto request) throws IOException, + YarnException { + onRpcCall("upgrade"); + String message = request.getMessage(); + if (message == null) { + message = "application containers upgraded by client"; + } + ActionUpgradeContainers upgradeContainers = + new 
ActionUpgradeContainers( + "Upgrade containers", + 1000, TimeUnit.MILLISECONDS, + LauncherExitCodes.EXIT_SUCCESS, + FinalApplicationStatus.SUCCEEDED, + request.getContainerList(), + request.getComponentList(), + message); + log.info("SliderAppMasterApi.upgradeContainers: {}", upgradeContainers); + schedule(upgradeContainers); + return Messages.UpgradeContainersResponseProto.getDefaultInstance(); + } + + @Override //SliderClusterProtocol + public Messages.FlexClusterResponseProto flexCluster(Messages.FlexClusterRequestProto request) + throws IOException { + onRpcCall("flex"); + String payload = request.getClusterSpec(); + ConfTreeSerDeser confTreeSerDeser = new ConfTreeSerDeser(); + ConfTree updatedResources = confTreeSerDeser.fromJson(payload); + schedule(new ActionFlexCluster("flex", 1, TimeUnit.MILLISECONDS, + updatedResources)); + return Messages.FlexClusterResponseProto.newBuilder().setResponse( + true).build(); + } + + @Override //SliderClusterProtocol + public Messages.GetJSONClusterStatusResponseProto getJSONClusterStatus( + Messages.GetJSONClusterStatusRequestProto request) + throws IOException, YarnException { + onRpcCall("getstatus"); + String result; + //quick update + //query and json-ify + ClusterDescription cd = state.refreshClusterStatus(); + result = cd.toJsonString(); + String stat = result; + return Messages.GetJSONClusterStatusResponseProto.newBuilder() + .setClusterSpec(stat) + .build(); + } + + @Override + public Messages.GetInstanceDefinitionResponseProto getInstanceDefinition( + Messages.GetInstanceDefinitionRequestProto request) + throws IOException, YarnException { + + onRpcCall("getinstancedefinition"); + String internal; + String resources; + String app; + AggregateConf instanceDefinition = + state.getInstanceDefinitionSnapshot(); + internal = instanceDefinition.getInternal().toJson(); + resources = instanceDefinition.getResources().toJson(); + app = instanceDefinition.getAppConf().toJson(); + assert internal != null; + assert resources 
!= null; + assert app != null; + log.debug("Generating getInstanceDefinition Response"); + Messages.GetInstanceDefinitionResponseProto.Builder builder = + Messages.GetInstanceDefinitionResponseProto.newBuilder(); + builder.setInternal(internal); + builder.setResources(resources); + builder.setApplication(app); + return builder.build(); + } + + @Override //SliderClusterProtocol + public Messages.ListNodeUUIDsByRoleResponseProto listNodeUUIDsByRole(Messages.ListNodeUUIDsByRoleRequestProto request) + throws IOException, YarnException { + onRpcCall("listnodes)"); + String role = request.getRole(); + Messages.ListNodeUUIDsByRoleResponseProto.Builder builder = + Messages.ListNodeUUIDsByRoleResponseProto.newBuilder(); + List<RoleInstance> nodes = state.enumLiveInstancesInRole(role); + for (RoleInstance node : nodes) { + builder.addUuid(node.id); + } + return builder.build(); + } + + @Override //SliderClusterProtocol + public Messages.GetNodeResponseProto getNode(Messages.GetNodeRequestProto request) + throws IOException, YarnException { + onRpcCall("getnode"); + RoleInstance instance = state.getLiveInstanceByContainerID( + request.getUuid()); + return Messages.GetNodeResponseProto.newBuilder() + .setClusterNode(instance.toProtobuf()) + .build(); + } + + @Override //SliderClusterProtocol + public Messages.GetClusterNodesResponseProto getClusterNodes( + Messages.GetClusterNodesRequestProto request) + throws IOException, YarnException { + onRpcCall("getclusternodes"); + List<RoleInstance> + clusterNodes = state.getLiveInstancesByContainerIDs( + request.getUuidList()); + + Messages.GetClusterNodesResponseProto.Builder builder = + Messages.GetClusterNodesResponseProto.newBuilder(); + for (RoleInstance node : clusterNodes) { + builder.addClusterNode(node.toProtobuf()); + } + //at this point: a possibly empty list of nodes + return builder.build(); + } + + @Override + public Messages.EchoResponseProto echo(Messages.EchoRequestProto request) + throws IOException, YarnException { 
+ onRpcCall("echo"); + Messages.EchoResponseProto.Builder builder = + Messages.EchoResponseProto.newBuilder(); + String text = request.getText(); + log.info("Echo request size ={}", text.length()); + log.info(text); + //now return it + builder.setText(text); + return builder.build(); + } + + @Override + public Messages.KillContainerResponseProto killContainer(Messages.KillContainerRequestProto request) + throws IOException, YarnException { + onRpcCall("killcontainer"); + String containerID = request.getId(); + log.info("Kill Container {}", containerID); + //throws NoSuchNodeException if it is missing + RoleInstance instance = + state.getLiveInstanceByContainerID(containerID); + queue(new ActionKillContainer(instance.getId(), 0, TimeUnit.MILLISECONDS, + amOperations)); + Messages.KillContainerResponseProto.Builder builder = + Messages.KillContainerResponseProto.newBuilder(); + builder.setSuccess(true); + return builder.build(); + } + + + @Override + public Messages.AMSuicideResponseProto amSuicide( + Messages.AMSuicideRequestProto request) + throws IOException { + onRpcCall("amsuicide"); + int signal = request.getSignal(); + String text = request.getText(); + if (text == null) { + text = ""; + } + int delay = request.getDelay(); + log.info("AM Suicide with signal {}, message {} delay = {}", signal, text, + delay); + ActionHalt action = new ActionHalt(signal, text, delay, + TimeUnit.MILLISECONDS); + schedule(action); + return Messages.AMSuicideResponseProto.getDefaultInstance(); + } + + @Override + public Messages.ApplicationLivenessInformationProto getLivenessInformation( + Messages.GetApplicationLivenessRequestProto request) throws IOException { + ApplicationLivenessInformation info = + state.getApplicationLivenessInformation(); + return marshall(info); + } + + @Override + public Messages.GetLiveContainersResponseProto getLiveContainers( + Messages.GetLiveContainersRequestProto request) + throws IOException { + Map<String, ContainerInformation> infoMap = + 
(Map<String, ContainerInformation>) cache.lookupWithIOE(LIVE_CONTAINERS); + Messages.GetLiveContainersResponseProto.Builder builder = + Messages.GetLiveContainersResponseProto.newBuilder(); + + for (Map.Entry<String, ContainerInformation> entry : infoMap.entrySet()) { + builder.addNames(entry.getKey()); + builder.addContainers(marshall(entry.getValue())); + } + return builder.build(); + } + + @Override + public Messages.ContainerInformationProto getLiveContainer(Messages.GetLiveContainerRequestProto request) + throws IOException { + String containerId = request.getContainerId(); + RoleInstance id = state.getLiveInstanceByContainerID(containerId); + ContainerInformation containerInformation = id.serialize(); + return marshall(containerInformation); + } + + @Override + public Messages.GetLiveComponentsResponseProto getLiveComponents(Messages.GetLiveComponentsRequestProto request) + throws IOException { + Map<String, ComponentInformation> infoMap = + (Map<String, ComponentInformation>) cache.lookupWithIOE(LIVE_COMPONENTS); + Messages.GetLiveComponentsResponseProto.Builder builder = + Messages.GetLiveComponentsResponseProto.newBuilder(); + + for (Map.Entry<String, ComponentInformation> entry : infoMap.entrySet()) { + builder.addNames(entry.getKey()); + builder.addComponents(marshall(entry.getValue())); + } + return builder.build(); + } + + + @Override + public Messages.ComponentInformationProto getLiveComponent(Messages.GetLiveComponentRequestProto request) + throws IOException { + String name = request.getName(); + try { + return marshall(state.getComponentInformation(name)); + } catch (YarnRuntimeException e) { + throw new FileNotFoundException("Unknown component: " + name); + } + } + + @Override + public Messages.GetLiveNodesResponseProto getLiveNodes(Messages.GetLiveNodesRequestProto request) + throws IOException { + NodeInformationList info = (NodeInformationList) cache.lookupWithIOE(LIVE_NODES); + Messages.GetLiveNodesResponseProto.Builder builder = + 
Messages.GetLiveNodesResponseProto.newBuilder(); + + for (NodeInformation nodeInformation : info) { + builder.addNodes(marshall(nodeInformation)); + } + return builder.build(); + } + + + @Override + public Messages.NodeInformationProto getLiveNode(Messages.GetLiveNodeRequestProto request) + throws IOException { + String name = request.getName(); + NodeInformation nodeInformation = state.getNodeInformation(name); + if (nodeInformation != null) { + return marshall(nodeInformation); + } else { + throw new FileNotFoundException("Unknown host: " + name); + } + } + + @Override + public Messages.WrappedJsonProto getModelDesired(Messages.EmptyPayloadProto request) throws IOException { + return lookupAggregateConf(MODEL_DESIRED); + } + + @Override + public Messages.WrappedJsonProto getModelDesiredAppconf(Messages.EmptyPayloadProto request) throws IOException { + return lookupConfTree(MODEL_DESIRED_APPCONF); + } + + @Override + public Messages.WrappedJsonProto getModelDesiredResources(Messages.EmptyPayloadProto request) throws IOException { + return lookupConfTree(MODEL_DESIRED_RESOURCES); + } + + @Override + public Messages.WrappedJsonProto getModelResolved(Messages.EmptyPayloadProto request) throws IOException { + return lookupAggregateConf(MODEL_RESOLVED); + } + + @Override + public Messages.WrappedJsonProto getModelResolvedAppconf(Messages.EmptyPayloadProto request) throws IOException { + return lookupConfTree(MODEL_RESOLVED_APPCONF); + } + + @Override + public Messages.WrappedJsonProto getModelResolvedResources(Messages.EmptyPayloadProto request) throws IOException { + return lookupConfTree(MODEL_RESOLVED_RESOURCES); + } + + @Override + public Messages.WrappedJsonProto getLiveResources(Messages.EmptyPayloadProto request) throws IOException { + return lookupConfTree(LIVE_RESOURCES); + } + + /** + * Helper method; look up an aggregate configuration in the cache from + * a key, or raise an exception + * @param key key to resolve + * @return the configuration + * @throws 
IOException on a failure + */ + + protected Messages.WrappedJsonProto lookupAggregateConf(String key) throws + IOException { + AggregateConf aggregateConf = (AggregateConf) cache.lookupWithIOE(key); + String json = AggregateConfSerDeser.toString(aggregateConf); + return wrap(json); + } + + /** + * Helper method; look up an conf tree in the cache from + * a key, or raise an exception + * @param key key to resolve + * @return the configuration + * @throws IOException on a failure + */ + protected Messages.WrappedJsonProto lookupConfTree(String key) throws + IOException { + ConfTree conf = (ConfTree) cache.lookupWithIOE(key); + String json = ConfTreeSerDeser.toString(conf); + return wrap(json); + } + + private Messages.WrappedJsonProto wrap(String json) { + Messages.WrappedJsonProto.Builder builder = + Messages.WrappedJsonProto.newBuilder(); + builder.setJson(json); + return builder.build(); + } + + @Override + public Messages.GetCertificateStoreResponseProto getClientCertificateStore(Messages.GetCertificateStoreRequestProto request) throws + IOException { + String hostname = request.getHostname(); + String clientId = request.getRequesterId(); + String password = request.getPassword(); + String type = request.getType(); + + SecurityStore store = null; + try { + if ( SecurityStore.StoreType.keystore.equals( + SecurityStore.StoreType.valueOf(type))) { + store = certificateManager.generateContainerKeystore(hostname, + clientId, + null, + password); + } else if (SecurityStore.StoreType.truststore.equals( + SecurityStore.StoreType.valueOf(type))) { + store = certificateManager.generateContainerTruststore(clientId, + null, + password); + + } else { + throw new IOException("Illegal store type"); + } + } catch (SliderException e) { + throw new IOException(e); + } + return marshall(store); + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderRPCSecurityInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderRPCSecurityInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderRPCSecurityInfo.java new file mode 100644 index 0000000..4fd4910 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderRPCSecurityInfo.java @@ -0,0 +1,87 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.slider.server.appmaster.rpc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.KerberosInfo; +import org.apache.hadoop.security.SecurityInfo; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.TokenInfo; +import org.apache.hadoop.security.token.TokenSelector; +import org.apache.hadoop.yarn.security.client.ClientToAMTokenSelector; +import org.apache.slider.common.SliderXmlConfKeys; + +import java.lang.annotation.Annotation; + +/** + * This is where security information goes. + * It is referred to in the <code>META-INF/services/org.apache.hadoop.security.SecurityInfo</code> + * resource of this JAR, which is used to find the binding info + */ +public class SliderRPCSecurityInfo extends SecurityInfo { + + @Override + public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) { + if (!protocol.equals(SliderClusterProtocolPB.class)) { + return null; + } + return new KerberosInfo() { + + @Override + public Class<? extends Annotation> annotationType() { + return null; + } + + @Override + public String serverPrincipal() { + return SliderXmlConfKeys.KEY_KERBEROS_PRINCIPAL; + } + + @Override + public String clientPrincipal() { + return null; + } + }; + } + + @Override + public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) { + if (!protocol.equals(SliderClusterProtocolPB.class)) { + return null; + } + return new TokenInfo() { + + @Override + public Class<? extends Annotation> annotationType() { + return null; + } + + @Override + public Class<? extends TokenSelector<? 
extends TokenIdentifier>> + value() { + return ClientToAMTokenSelector.class; + } + + @Override + public String toString() { + return "SliderClusterProtocolPB token info"; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java new file mode 100644 index 0000000..9a89c39 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.slider.server.appmaster.security; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import static org.apache.slider.core.main.LauncherExitCodes.EXIT_UNAUTHORIZED; +import org.apache.slider.common.SliderKeys; +import org.apache.slider.common.SliderXmlConfKeys; +import org.apache.slider.common.tools.SliderUtils; +import org.apache.slider.core.conf.AggregateConf; +import org.apache.slider.core.exceptions.SliderException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +/** + * Class keeping code security information + */ +public class SecurityConfiguration { + + protected static final Logger log = + LoggerFactory.getLogger(SecurityConfiguration.class); + private final Configuration configuration; + private final AggregateConf instanceDefinition; + private String clusterName; + + public SecurityConfiguration(Configuration configuration, + AggregateConf instanceDefinition, + String clusterName) throws SliderException { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(instanceDefinition); + Preconditions.checkNotNull(clusterName); + this.configuration = configuration; + this.instanceDefinition = instanceDefinition; + this.clusterName = clusterName; + validate(); + } + + private void validate() throws SliderException { + if (isSecurityEnabled()) { + String principal = instanceDefinition.getAppConfOperations() + .getComponent(SliderKeys.COMPONENT_AM).get(SliderXmlConfKeys.KEY_KEYTAB_PRINCIPAL); + if(SliderUtils.isUnset(principal)) { + // if no login identity is available, fail + UserGroupInformation loginUser = null; + try { + loginUser = getLoginUser(); + } catch (IOException e) { + throw new SliderException(EXIT_UNAUTHORIZED, e, + "No principal configured for the application and " + + "exception raised during retrieval of login user. 
" + + "Unable to proceed with application " + + "initialization. Please ensure a value " + + "for %s exists in the application " + + "configuration or the login issue is addressed", + SliderXmlConfKeys.KEY_KEYTAB_PRINCIPAL); + } + if (loginUser == null) { + throw new SliderException(EXIT_UNAUTHORIZED, + "No principal configured for the application " + + "and no login user found. " + + "Unable to proceed with application " + + "initialization. Please ensure a value " + + "for %s exists in the application " + + "configuration or the login issue is addressed", + SliderXmlConfKeys.KEY_KEYTAB_PRINCIPAL); + } + } + // ensure that either local or distributed keytab mechanism is enabled, + // but not both + String keytabFullPath = instanceDefinition.getAppConfOperations() + .getComponent(SliderKeys.COMPONENT_AM) + .get(SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH); + String keytabName = instanceDefinition.getAppConfOperations() + .getComponent(SliderKeys.COMPONENT_AM) + .get(SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME); + if (SliderUtils.isSet(keytabFullPath) && SliderUtils.isSet(keytabName)) { + throw new SliderException(EXIT_UNAUTHORIZED, + "Both a keytab on the cluster host (%s) and a" + + " keytab to be retrieved from HDFS (%s) are" + + " specified. 
Please configure only one keytab" + + " retrieval mechanism.", + SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH, + SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME); + + } + } + } + + protected UserGroupInformation getLoginUser() throws IOException { + return UserGroupInformation.getLoginUser(); + } + + public boolean isSecurityEnabled () { + return SliderUtils.isHadoopClusterSecure(configuration); + } + + public String getPrincipal () throws IOException { + String principal = instanceDefinition.getAppConfOperations() + .getComponent(SliderKeys.COMPONENT_AM).get(SliderXmlConfKeys.KEY_KEYTAB_PRINCIPAL); + if (SliderUtils.isUnset(principal)) { + principal = UserGroupInformation.getLoginUser().getShortUserName(); + log.info("No principal set in the slider configuration. Will use AM login" + + " identity {} to attempt keytab-based login", principal); + } + + return principal; + } + + public boolean isKeytabProvided() { + boolean keytabProvided = instanceDefinition.getAppConfOperations() + .getComponent(SliderKeys.COMPONENT_AM) + .get(SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH) != null || + instanceDefinition.getAppConfOperations() + .getComponent(SliderKeys.COMPONENT_AM). + get(SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME) != null; + return keytabProvided; + + } + + public File getKeytabFile(AggregateConf instanceDefinition) + throws SliderException, IOException { + String keytabFullPath = instanceDefinition.getAppConfOperations() + .getComponent(SliderKeys.COMPONENT_AM) + .get(SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH); + File localKeytabFile; + if (SliderUtils.isUnset(keytabFullPath)) { + // get the keytab + String keytabName = instanceDefinition.getAppConfOperations() + .getComponent(SliderKeys.COMPONENT_AM). + get(SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME); + log.info("No host keytab file path specified. 
Will attempt to retrieve" + + " keytab file {} as a local resource for the container", + keytabName); + // download keytab to local, protected directory + localKeytabFile = new File(SliderKeys.KEYTAB_DIR, keytabName); + } else { + log.info("Using host keytab file {} for login", keytabFullPath); + localKeytabFile = new File(keytabFullPath); + } + return localKeytabFile; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AbstractClusterServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AbstractClusterServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AbstractClusterServices.java new file mode 100644 index 0000000..54f384b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AbstractClusterServices.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.state; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; + +/** + * Cluster services offered by the YARN infrastructure. + */ +public abstract class AbstractClusterServices { + + private final DefaultResourceCalculator + defaultResourceCalculator = new DefaultResourceCalculator(); + + /** + * Create a resource for requests + * @return a resource which can be built up. + */ + public abstract Resource newResource(); + + public abstract Resource newResource(int memory, int cores); + + /** + * Normalise memory, CPU and other resources according to the YARN AM-supplied + * values and the resource calculator in use (currently hard-coded to the + * {@link DefaultResourceCalculator}. + * Those resources which aren't normalized (currently: CPU) are left + * as is. + * @param resource resource requirements of a role + * @param minR minimum values of this queue + * @param maxR max values of this queue + * @return a normalized value. 
+ */ + public Resource normalize(Resource resource, Resource minR, Resource maxR) { + Preconditions.checkArgument(resource != null, "null resource"); + Preconditions.checkArgument(minR != null, "null minR"); + Preconditions.checkArgument(maxR != null, "null maxR"); + + Resource normalize = defaultResourceCalculator.normalize(resource, minR, + maxR, minR); + return newResource(normalize.getMemory(), resource.getVirtualCores()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org