http://git-wip-us.apache.org/repos/asf/hadoop/blob/42b69405/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java new file mode 100644 index 0000000..b5b5f77 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineConnector.java @@ -0,0 +1,440 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.ConnectException; +import java.net.HttpURLConnection; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.net.URL; +import java.net.URLConnection; +import java.security.GeneralSecurityException; +import java.security.PrivilegedExceptionAction; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLSocketFactory; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; +import org.apache.hadoop.security.ssl.SSLFactory; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL.Token; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator; +import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator; +import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientHandlerException; +import 
com.sun.jersey.api.client.ClientRequest;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.client.filter.ClientFilter;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+
+/**
+ * Utility Connector class which is used by timeline clients to securely get
+ * connected to the timeline server.
+ *
+ * <p>During {@link #serviceInit(Configuration)} it builds a Jersey
+ * {@link Client} whose connections are opened through a
+ * {@link DelegationTokenAuthenticatedURL}, and, when requested through the
+ * constructor, installs a filter that retries requests on connection
+ * failures.
+ */
+public class TimelineConnector extends AbstractService {
+
+  private static final Joiner JOINER = Joiner.on("");
+  private static final Log LOG = LogFactory.getLog(TimelineConnector.class);
+  public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
+
+  private SSLFactory sslFactory;
+  private Client client;
+  private ConnectionConfigurator connConfigurator;
+  private DelegationTokenAuthenticator authenticator;
+  private DelegationTokenAuthenticatedURL.Token token;
+  private UserGroupInformation authUgi;
+  private String doAsUser;
+  @VisibleForTesting
+  TimelineClientConnectionRetry connectionRetry;
+  private boolean requireConnectionRetry;
+
+  /**
+   * Creates a connector; no connection related object is built until the
+   * service is initialized.
+   *
+   * @param requireConnectionRetry whether the retry filter should be added to
+   *          the Jersey client
+   * @param authUgi user whose credentials are used to open connections
+   * @param doAsUser user passed as the doAs user when opening connections,
+   *          may be null
+   * @param token delegation token holder passed to every opened connection
+   */
+  public TimelineConnector(boolean requireConnectionRetry,
+      UserGroupInformation authUgi, String doAsUser,
+      DelegationTokenAuthenticatedURL.Token token) {
+    super("TimelineConnector");
+    this.requireConnectionRetry = requireConnectionRetry;
+    this.authUgi = authUgi;
+    this.doAsUser = doAsUser;
+    this.token = token;
+  }
+
+  /**
+   * Builds the SSL factory, the connection configurator, the authenticator,
+   * the retry helper and finally the Jersey client.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    ClientConfig cc = new DefaultClientConfig();
+    cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
+
+    sslFactory = getSSLFactory(conf);
+    connConfigurator = getConnConfigurator(sslFactory);
+
+    // Kerberos authentication is only used when security is switched on
+    if (UserGroupInformation.isSecurityEnabled()) {
+      authenticator = new KerberosDelegationTokenAuthenticator();
+    } else {
+      authenticator = new PseudoDelegationTokenAuthenticator();
+    }
+    authenticator.setConnectionConfigurator(connConfigurator);
+
+    connectionRetry = new TimelineClientConnectionRetry(conf);
+    client =
+        new Client(
+            new URLConnectionClientHandler(new TimelineURLConnectionFactory(
+                authUgi, authenticator, connConfigurator, token, doAsUser)),
+            cc);
+    if (requireConnectionRetry) {
+      TimelineJerseyRetryFilter retryFilter =
+          new TimelineJerseyRetryFilter(connectionRetry);
+      client.addFilter(retryFilter);
+    }
+  }
+
+  // Fallback configurator which only applies the default timeouts
+  private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR
+      = new ConnectionConfigurator() {
+        @Override
+        public HttpURLConnection configure(HttpURLConnection conn)
+            throws IOException {
+          setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
+          return conn;
+        }
+      };
+
+  /**
+   * Returns an SSL aware configurator, or the timeout-only fallback when the
+   * SSL related objects cannot be created from the given factory.
+   */
+  private ConnectionConfigurator getConnConfigurator(SSLFactory sslFactoryObj) {
+    try {
+      return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, sslFactoryObj);
+    } catch (Exception e) {
+      LOG.debug("Cannot load customized ssl related configuration. "
+          + "Fallback to system-generic settings.", e);
+      return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
+    }
+  }
+
+  /**
+   * Creates a configurator which sets the socket factory and hostname
+   * verifier on HTTPS connections and the timeouts on every connection.
+   */
+  private static ConnectionConfigurator initSslConnConfigurator(
+      final int timeout, SSLFactory sslFactory)
+      throws IOException, GeneralSecurityException {
+    final SSLSocketFactory sf;
+    final HostnameVerifier hv;
+
+    sf = sslFactory.createSSLSocketFactory();
+    hv = sslFactory.getHostnameVerifier();
+
+    return new ConnectionConfigurator() {
+      @Override
+      public HttpURLConnection configure(HttpURLConnection conn)
+          throws IOException {
+        if (conn instanceof HttpsURLConnection) {
+          HttpsURLConnection c = (HttpsURLConnection) conn;
+          c.setSSLSocketFactory(sf);
+          c.setHostnameVerifier(hv);
+        }
+        setTimeouts(conn, timeout);
+        return conn;
+      }
+    };
+  }
+
+  /**
+   * Creates and initializes a client mode SSL factory. Protected so that
+   * subclasses can supply a different factory.
+   */
+  protected SSLFactory getSSLFactory(Configuration conf)
+      throws GeneralSecurityException, IOException {
+    SSLFactory newSSLFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
+    newSSLFactory.init();
+    return newSSLFactory;
+  }
+
+  // The same value is used for both the connect and the read timeout
+  private static void setTimeouts(URLConnection connection, int socketTimeout) {
+    connection.setConnectTimeout(socketTimeout);
+    connection.setReadTimeout(socketTimeout);
+  }
+
+  /**
+   * Concatenates scheme, address and resource path into a URI.
+   *
+   * @param conf configuration deciding between http and https
+   * @param address host and port of the service
+   * @param uri resource path appended to the address
+   * @return the resulting URI
+   */
+  public static URI constructResURI(Configuration conf, String address,
+      String uri) {
+    return URI.create(
+        JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
+            address, uri));
+  }
+
+  DelegationTokenAuthenticatedURL getDelegationTokenAuthenticatedURL() {
+    return new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
+  }
+
+  /**
+   * Releases the SSL factory if one was created during initialization.
+   */
+  @Override
+  protected void serviceStop() {
+    if (this.sslFactory != null) {
+      this.sslFactory.destroy();
+    }
+  }
+
+  public Client getClient() {
+    return client;
+  }
+
+  /**
+   * Runs the given delegation token action through the connection retry
+   * helper.
+   *
+   * @param action the action to execute
+   * @return whatever the action returns
+   */
+  public Object operateDelegationToken(
+      final PrivilegedExceptionAction<?> action)
+      throws IOException, YarnException {
+    // Set up the retry operation
+    TimelineClientRetryOp tokenRetryOp =
+        createRetryOpForOperateDelegationToken(action);
+
+    return connectionRetry.retryOn(tokenRetryOp);
+  }
+
+  @Private
+  @VisibleForTesting
+  TimelineClientRetryOp createRetryOpForOperateDelegationToken(
+      final PrivilegedExceptionAction<?> action) throws IOException {
+    return new TimelineClientRetryOpForOperateDelegationToken(this.authUgi,
+        action);
+  }
+
+  /**
+   * Abstract class for an operation that should be retried by timeline client.
+   */
+  @Private
+  @VisibleForTesting
+  public static abstract class TimelineClientRetryOp {
+    // The operation that should be retried
+    public abstract Object run() throws IOException;
+
+    // The method to indicate if we should retry given the incoming exception
+    public abstract boolean shouldRetryOn(Exception e);
+  }
+
+  /**
+   * Connection factory handed to Jersey; every connection is opened through
+   * a {@link DelegationTokenAuthenticatedURL}.
+   */
+  private static class TimelineURLConnectionFactory
+      implements HttpURLConnectionFactory {
+    private DelegationTokenAuthenticator authenticator;
+    private UserGroupInformation authUgi;
+    private ConnectionConfigurator connConfigurator;
+    private Token token;
+    private String doAsUser;
+
+    public TimelineURLConnectionFactory(UserGroupInformation authUgi,
+        DelegationTokenAuthenticator authenticator,
+        ConnectionConfigurator connConfigurator,
+        DelegationTokenAuthenticatedURL.Token token, String doAsUser) {
+      this.authUgi = authUgi;
+      this.authenticator = authenticator;
+      this.connConfigurator = connConfigurator;
+      this.token = token;
+      this.doAsUser = doAsUser;
+    }
+
+    @Override
+    public HttpURLConnection getHttpURLConnection(final URL url)
+        throws IOException {
+      authUgi.checkTGTAndReloginFromKeytab();
+      try {
+        return new DelegationTokenAuthenticatedURL(authenticator,
+            connConfigurator).openConnection(url, token, doAsUser);
+      } catch (UndeclaredThrowableException e) {
+        throw new IOException(e.getCause());
+      } catch (AuthenticationException ae) {
+        throw new IOException(ae);
+      }
+    }
+
+  }
+
+  // Class to handle retry
+  // Outside this class, only visible to tests
+  @Private
+  @VisibleForTesting
+  static class TimelineClientConnectionRetry {
+
+    // maxRetries < 0 means keep trying
+    @Private
+    @VisibleForTesting
+    public int maxRetries;
+
+    @Private
+    @VisibleForTesting
+    public long retryInterval;
+
+    // Indicates if retries happened last time. Only tests should read it.
+    // In unit tests, retryOn() calls should _not_ be concurrent.
+    private boolean retried = false;
+
+    @Private
+    @VisibleForTesting
+    boolean getRetired() {
+      return retried;
+    }
+
+    /**
+     * Reads the retry settings from the configuration.
+     *
+     * @param conf configuration holding the max retries and retry interval
+     * @throws IllegalArgumentException if max retries is below -1 or the
+     *           retry interval is not positive
+     */
+    public TimelineClientConnectionRetry(Configuration conf) {
+      // read every property once so the validated value is the stored value
+      int configuredMaxRetries =
+          conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+      Preconditions.checkArgument(configuredMaxRetries >= -1,
+          "%s property value should be greater than or equal to -1",
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+      long configuredRetryInterval = conf.getLong(
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
+      Preconditions.checkArgument(configuredRetryInterval > 0,
+          "%s property value should be greater than zero",
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
+      maxRetries = configuredMaxRetries;
+      retryInterval = configuredRetryInterval;
+    }
+
+    /**
+     * Runs the operation until it succeeds, fails with an exception the
+     * operation does not want to retry on, or no retries are left.
+     *
+     * @param op the operation to run
+     * @return the result of the first successful run
+     * @throws IOException if the operation fails with a non retriable
+     *           IOException
+     * @throws RuntimeException if the operation fails with a non retriable
+     *           RuntimeException, or if no retries are left; in the latter
+     *           case the last failure is attached as the cause
+     */
+    public Object retryOn(TimelineClientRetryOp op)
+        throws RuntimeException, IOException {
+      int leftRetries = maxRetries;
+      retried = false;
+      Exception lastFailure = null;
+      boolean interrupted = false;
+
+      try {
+        // keep trying
+        while (true) {
+          try {
+            // try perform the op, if fail, keep retrying
+            return op.run();
+          } catch (IOException | RuntimeException e) {
+            lastFailure = e;
+            // break if there's no retries left
+            if (leftRetries == 0) {
+              break;
+            }
+            if (op.shouldRetryOn(e)) {
+              logException(e, leftRetries);
+            } else {
+              throw e;
+            }
+          }
+          // a negative value means "retry forever" and is never decremented
+          if (leftRetries > 0) {
+            leftRetries--;
+          }
+          retried = true;
+          try {
+            // sleep for the given time interval
+            Thread.sleep(retryInterval);
+          } catch (InterruptedException ie) {
+            LOG.warn("Client retry sleep interrupted! ");
+            // keep retrying as before, but remember the interruption so
+            // that it can be handed back to the caller on exit
+            interrupted = true;
+          }
+        }
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      throw new RuntimeException("Failed to connect to timeline server. "
+          + "Connection retries limit exceeded. "
+          + "The posted timeline event may be missing", lastFailure);
+    }
+
+    private void logException(Exception e, int leftRetries) {
+      if (leftRetries > 0) {
+        LOG.info(
+            "Exception caught by TimelineClientConnectionRetry," + " will try "
+                + leftRetries + " more time(s).\nMessage: " + e.getMessage());
+      } else {
+        // note that maxRetries may be -1 at the very beginning
+        LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
+            + " will keep retrying.\nMessage: " + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Jersey filter which passes every request to the next handler through
+   * the connection retry helper.
+   */
+  private static class TimelineJerseyRetryFilter extends ClientFilter {
+    private TimelineClientConnectionRetry connectionRetry;
+
+    public TimelineJerseyRetryFilter(
+        TimelineClientConnectionRetry connectionRetry) {
+      this.connectionRetry = connectionRetry;
+    }
+
+    @Override
+    public ClientResponse handle(final ClientRequest cr)
+        throws ClientHandlerException {
+      // Set up the retry operation
+      TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
+        @Override
+        public Object run() {
+          // Try pass the request, if fail, keep retrying
+          return getNext().handle(cr);
+        }
+
+        @Override
+        public boolean shouldRetryOn(Exception e) {
+          // Only retry on connection exceptions
+          return (e instanceof ClientHandlerException)
+              && (e.getCause() instanceof ConnectException
+                  || e.getCause() instanceof SocketTimeoutException);
+        }
+      };
+      try {
+        return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
+      } catch (IOException e) {
+        // keep the original exception as the cause for diagnosis
+        throw new ClientHandlerException(
+            "Jersey retry failed!\nMessage: " + e.getMessage(), e);
+      }
+    }
+  }
+
+  /**
+   * Retry operation which runs a privileged action as the authenticating
+   * user.
+   */
+  @Private
+  @VisibleForTesting
+  public static class TimelineClientRetryOpForOperateDelegationToken
+      extends TimelineClientRetryOp {
+
+    private final UserGroupInformation authUgi;
+    private final PrivilegedExceptionAction<?> action;
+
+    public TimelineClientRetryOpForOperateDelegationToken(
+        UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
+      this.authUgi = authUgi;
+      this.action = action;
+    }
+
+    @Override
+    public Object run() throws IOException {
+      // Try pass the request, if fail, keep retrying
+      authUgi.checkTGTAndReloginFromKeytab();
+      try {
+        return authUgi.doAs(action);
+      } catch (UndeclaredThrowableException e) {
+        throw new IOException(e.getCause());
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public boolean shouldRetryOn(Exception e) {
+      // retry on connection exceptions
+      // and SocketTimeoutException
+      return (e instanceof ConnectException
+          || e instanceof SocketTimeoutException);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42b69405/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java new file mode 100644 index 0000000..848e238 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineV2ClientImpl.java @@ -0,0 +1,459 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.core.util.MultivaluedMapImpl; + +/** + * Implementation of timeline v2 client interface. 
+ *
+ * <p>Entities handed to {@link #putEntities} and {@link #putEntitiesAsync}
+ * are placed on a queue and published by a single dispatcher thread.
+ * Consecutive asynchronous requests may be merged into one REST call, and
+ * synchronous callers block until their request has been published.
+ */
+public class TimelineV2ClientImpl extends TimelineV2Client {
+  private static final Log LOG = LogFactory.getLog(TimelineV2ClientImpl.class);
+
+  private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
+
+  private TimelineEntityDispatcher entityDispatcher;
+  private volatile String timelineServiceAddress;
+
+  // Retry parameters for identifying new timeline service
+  // TODO consider to merge with connection retry
+  private int maxServiceRetries;
+  private long serviceRetryInterval;
+
+  private TimelineConnector connector;
+
+  private ApplicationId contextAppId;
+
+  public TimelineV2ClientImpl(ApplicationId appId) {
+    super(TimelineV2ClientImpl.class.getName());
+    this.contextAppId = appId;
+  }
+
+  public ApplicationId getContextAppId() {
+    return contextAppId;
+  }
+
+  /**
+   * Verifies that timeline service v2 is enabled, creates the connector for
+   * the current user and reads the retry settings.
+   *
+   * @throws IOException if the timeline service is disabled or its version
+   *           is not 2
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    if (!YarnConfiguration.timelineServiceEnabled(conf)
+        || (int) YarnConfiguration.getTimelineServiceVersion(conf) != 2) {
+      throw new IOException("Timeline V2 client is not properly configured. "
+          + "Either timeline service is not enabled or version is not set to"
+          + " 2");
+    }
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    UserGroupInformation realUgi = ugi.getRealUser();
+    String doAsUser = null;
+    UserGroupInformation authUgi = null;
+    // when a real user is present, authenticate as the real user and
+    // pass the current user as the doAs user
+    if (realUgi != null) {
+      authUgi = realUgi;
+      doAsUser = ugi.getShortUserName();
+    } else {
+      authUgi = ugi;
+      doAsUser = null;
+    }
+
+    // TODO need to add/cleanup filter retry later for ATSV2. similar to V1
+    DelegationTokenAuthenticatedURL.Token token =
+        new DelegationTokenAuthenticatedURL.Token();
+    connector = new TimelineConnector(false, authUgi, doAsUser, token);
+    addIfService(connector);
+
+    // new version need to auto discovery (with retry till ATS v2 address is
+    // got).
+    maxServiceRetries =
+        conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+    serviceRetryInterval = conf.getLong(
+        YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
+    entityDispatcher = new TimelineEntityDispatcher(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    entityDispatcher.start();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    // the dispatcher does not exist if serviceInit failed before creating it
+    if (entityDispatcher != null) {
+      entityDispatcher.stop();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public void putEntities(TimelineEntity... entities)
+      throws IOException, YarnException {
+    entityDispatcher.dispatchEntities(true, entities);
+  }
+
+  @Override
+  public void putEntitiesAsync(TimelineEntity... entities)
+      throws IOException, YarnException {
+    entityDispatcher.dispatchEntities(false, entities);
+  }
+
+  @Override
+  public void setTimelineServiceAddress(String address) {
+    this.timelineServiceAddress = address;
+  }
+
+  /**
+   * Puts the object to the current timeline service address, retrying on
+   * IOException until the retries left over from the address polling are
+   * used up.
+   *
+   * @param path resource path below the v2 timeline root
+   * @param params query parameters of the request
+   * @param obj the object to put
+   */
+  @Private
+  protected void putObjects(String path, MultivaluedMap<String, String> params,
+      Object obj) throws IOException, YarnException {
+
+    int retries = verifyRestEndPointAvailable();
+
+    // timelineServiceAddress could be stale, add retry logic here.
+    boolean needRetry = true;
+    while (needRetry) {
+      try {
+        URI uri = TimelineConnector.constructResURI(getConfig(),
+            timelineServiceAddress, RESOURCE_URI_STR_V2);
+        putObjects(uri, path, params, obj);
+        needRetry = false;
+      } catch (IOException e) {
+        // handle exception for timelineServiceAddress being updated.
+        checkRetryWithSleep(retries, e);
+        retries--;
+      }
+    }
+  }
+
+  /**
+   * Sleeps for the retry interval if retries are left, otherwise gives up.
+   *
+   * @param retries number of retries left
+   * @param e the failure which triggered the retry
+   * @throws IOException wrapping {@code e} if no retries are left
+   * @throws YarnException if the sleep is interrupted
+   */
+  private void checkRetryWithSleep(int retries, IOException e)
+      throws YarnException, IOException {
+    if (retries > 0) {
+      try {
+        Thread.sleep(this.serviceRetryInterval);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw new YarnException("Interrupted while retrying to connect to ATS",
+            ex);
+      }
+    } else {
+      StringBuilder msg =
+          new StringBuilder("TimelineClient has reached to max retry times : ");
+      msg.append(this.maxServiceRetries);
+      msg.append(" for service address: ");
+      msg.append(timelineServiceAddress);
+      LOG.error(msg.toString());
+      throw new IOException(msg.toString(), e);
+    }
+  }
+
+  /**
+   * Sends one PUT request carrying the object as JSON.
+   *
+   * @throws IOException if the request fails with a RuntimeException
+   * @throws YarnException if there is no response or its status is not OK
+   */
+  protected void putObjects(URI base, String path,
+      MultivaluedMap<String, String> params, Object obj)
+      throws IOException, YarnException {
+    ClientResponse resp;
+    try {
+      resp = connector.getClient().resource(base).path(path).queryParams(params)
+          .accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON)
+          .put(ClientResponse.class, obj);
+    } catch (RuntimeException re) {
+      // runtime exception is expected if the client cannot connect the server
+      String msg = "Failed to get the response from the timeline server.";
+      LOG.error(msg, re);
+      throw new IOException(re);
+    }
+    if (resp == null ||
+        resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+      String msg =
+          "Response from the timeline server is " + ((resp == null) ? "null"
+              : "not successful," + " HTTP error code: " + resp.getStatus()
+                  + ", Server response:\n" + resp.getEntity(String.class));
+      LOG.error(msg);
+      throw new YarnException(msg);
+    }
+  }
+
+  /**
+   * Waits for the timeline service address to be set.
+   *
+   * @return the number of retries left after polling
+   * @throws YarnException if the address is still null after polling
+   */
+  private int verifyRestEndPointAvailable() throws YarnException {
+    // timelineServiceAddress could haven't be initialized yet
+    // or stale (only for new timeline service)
+    int retries = pollTimelineServiceAddress(this.maxServiceRetries);
+    if (timelineServiceAddress == null) {
+      String errMessage = "TimelineClient has reached to max retry times : "
+          + this.maxServiceRetries
+          + ", but failed to fetch timeline service address. Please verify"
+          + " Timeline Auxiliary Service is configured in all the NMs";
+      LOG.error(errMessage);
+      throw new YarnException(errMessage);
+    }
+    return retries;
+  }
+
+  /**
+   * Poll TimelineServiceAddress for maximum of retries times if it is null.
+   *
+   * @param retries maximum number of polls
+   * @return the left retry times
+   * @throws YarnException if interrupted while waiting between polls
+   */
+  private int pollTimelineServiceAddress(int retries) throws YarnException {
+    while (timelineServiceAddress == null && retries > 0) {
+      try {
+        Thread.sleep(this.serviceRetryInterval);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new YarnException("Interrupted while trying to connect ATS");
+      }
+      retries--;
+    }
+    return retries;
+  }
+
+  /**
+   * A queued publish request; running it puts the held entities to the
+   * timeline service.
+   */
+  private final class EntitiesHolder extends FutureTask<Void> {
+    private final TimelineEntities entities;
+    private final boolean isSync;
+
+    EntitiesHolder(final TimelineEntities entities, final boolean isSync) {
+      super(new Callable<Void>() {
+        // publishEntities()
+        public Void call() throws Exception {
+          MultivaluedMap<String, String> params = new MultivaluedMapImpl();
+          params.add("appid", getContextAppId().toString());
+          params.add("async", Boolean.toString(!isSync));
+          putObjects("entities", params, entities);
+          return null;
+        }
+      });
+      this.entities = entities;
+      this.isSync = isSync;
+    }
+
+    public boolean isSync() {
+      return isSync;
+    }
+
+    public TimelineEntities getEntities() {
+      return entities;
+    }
+  }
+
+  /**
+   * This class is responsible for collecting the timeline entities and
+   * publishing them in async.
+   */
+  private class TimelineEntityDispatcher {
+    /**
+     * Time period for which the timelineclient will wait for draining after
+     * stop.
+     */
+    private static final long DRAIN_TIME_PERIOD = 2000L;
+
+    private int numberOfAsyncsToMerge;
+    private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
+    private ExecutorService executor;
+
+    TimelineEntityDispatcher(Configuration conf) {
+      timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
+      numberOfAsyncsToMerge =
+          conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
+              YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
+    }
+
+    /**
+     * Creates the dispatcher loop: it publishes queued holders until the
+     * thread is interrupted and then drains the queue for a limited time.
+     */
+    Runnable createRunnable() {
+      return new Runnable() {
+        @Override
+        public void run() {
+          try {
+            EntitiesHolder entitiesHolder;
+            while (!Thread.currentThread().isInterrupted()) {
+              // Merge all the async calls and make one push, but if its sync
+              // call push immediately
+              try {
+                entitiesHolder = timelineEntityQueue.take();
+              } catch (InterruptedException ie) {
+                LOG.info("Timeline dispatcher thread was interrupted ");
+                Thread.currentThread().interrupt();
+                return;
+              }
+              if (entitiesHolder != null) {
+                publishWithoutBlockingOnQueue(entitiesHolder);
+              }
+            }
+          } finally {
+            if (!timelineEntityQueue.isEmpty()) {
+              LOG.info("Yet to publish " + timelineEntityQueue.size()
+                  + " timelineEntities, draining them now. ");
+            }
+            // Try to drain the remaining entities to be published @ the max for
+            // 2 seconds
+            long timeTillweDrain =
+                System.currentTimeMillis() + DRAIN_TIME_PERIOD;
+            while (!timelineEntityQueue.isEmpty()) {
+              publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
+              if (System.currentTimeMillis() > timeTillweDrain) {
+                // time elapsed stop publishing further....
+                if (!timelineEntityQueue.isEmpty()) {
+                  LOG.warn("Time to drain elapsed! Remaining "
+                      + timelineEntityQueue.size() + " timelineEntities will not"
+                      + " be published");
+                  // if some entities were not drained then we need interrupt
+                  // the threads which had put sync EntityHolders to the queue.
+                  EntitiesHolder nextEntityInTheQueue = null;
+                  while ((nextEntityInTheQueue =
+                      timelineEntityQueue.poll()) != null) {
+                    nextEntityInTheQueue.cancel(true);
+                  }
+                }
+                break;
+              }
+            }
+          }
+        }
+
+        /**
+         * Publishes the given EntitiesHolder and return immediately if sync
+         * call, else tries to fetch the EntitiesHolder from the queue in non
+         * blocking fashion and collate the Entities if possible before
+         * publishing through REST.
+         *
+         * @param entitiesHolder the holder taken from the queue
+         */
+        private void publishWithoutBlockingOnQueue(
+            EntitiesHolder entitiesHolder) {
+          if (entitiesHolder.isSync()) {
+            entitiesHolder.run();
+            return;
+          }
+          int count = 1;
+          while (true) {
+            // loop till we find a sync put Entities or there is nothing
+            // to take
+            EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
+            if (nextEntityInTheQueue == null) {
+              // Nothing in the queue just publish and get back to the
+              // blocked wait state
+              entitiesHolder.run();
+              break;
+            } else if (nextEntityInTheQueue.isSync()) {
+              // flush all the prev async entities first
+              entitiesHolder.run();
+              // and then flush the sync entity
+              nextEntityInTheQueue.run();
+              break;
+            } else {
+              // append all async entities together and then flush
+              entitiesHolder.getEntities().addEntities(
+                  nextEntityInTheQueue.getEntities().getEntities());
+              count++;
+              if (count == numberOfAsyncsToMerge) {
+                // Flush the entities if the number of the async
+                // putEntites merged reaches the desired limit. To avoid
+                // collecting multiple entities and delaying for a long
+                // time.
+                entitiesHolder.run();
+                break;
+              }
+            }
+          }
+        }
+      };
+    }
+
+    /**
+     * Queues the entities for publishing; a sync call additionally waits
+     * until they have been published.
+     *
+     * @param sync whether to wait for the publish to complete
+     * @param entitiesTobePublished the entities to publish
+     * @throws YarnException if the client is stopping, the calling thread is
+     *           interrupted, or a sync publish fails
+     */
+    public void dispatchEntities(boolean sync,
+        TimelineEntity[] entitiesTobePublished) throws YarnException {
+      if (executor.isShutdown()) {
+        throw new YarnException("Timeline client is in the process of stopping,"
+            + " not accepting any more TimelineEntities");
+      }
+
+      // wrap all TimelineEntity into TimelineEntities object
+      TimelineEntities entities = new TimelineEntities();
+      for (TimelineEntity entity : entitiesTobePublished) {
+        entities.addEntity(entity);
+      }
+
+      // created a holder and place it in queue
+      EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
+      try {
+        timelineEntityQueue.put(entitiesHolder);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new YarnException(
+            "Failed while adding entity to the queue for publishing", e);
+      }
+
+      if (sync) {
+        // In sync call we need to wait till its published and if any error then
+        // throw it back
+        try {
+          entitiesHolder.get();
+        } catch (ExecutionException e) {
+          throw new YarnException("Failed while publishing entity",
+              e.getCause());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new YarnException("Interrupted while publishing entity", e);
+        }
+      }
+    }
+
+    public void start() {
+      executor = Executors.newSingleThreadExecutor();
+      executor.execute(createRunnable());
+    }
+
+    /**
+     * Interrupts the dispatcher thread and waits up to the drain period for
+     * it to finish.
+     */
+    public void stop() {
+      LOG.info("Stopping TimelineClient.");
+      if (executor == null) {
+        // start() was never called, so there is no dispatcher to stop
+        return;
+      }
+      executor.shutdownNow();
+      try {
+        executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while waiting for the timeline entity"
+            + " dispatcher to terminate", e);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42b69405/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
----------------------------------------------------------------------
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java index a1d4449..a44a8ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java @@ -215,11 +215,11 @@ public class TestTimelineClient { + "Timeline server should be off to run this test. "); } catch (RuntimeException ce) { Assert.assertTrue( - "Handler exception for reason other than retry: " + ce.getMessage(), - ce.getMessage().contains("Connection retries limit exceeded")); + "Handler exception for reason other than retry: " + ce.getMessage(), + ce.getMessage().contains("Connection retries limit exceeded")); // we would expect this exception here, check if the client has retried - Assert.assertTrue("Retry filter didn't perform any retries! ", client - .connectionRetry.getRetired()); + Assert.assertTrue("Retry filter didn't perform any retries! ", + client.connector.connectionRetry.getRetired()); } } @@ -318,7 +318,7 @@ public class TestTimelineClient { .getMessage().contains("Connection retries limit exceeded")); // we would expect this exception here, check if the client has retried Assert.assertTrue("Retry filter didn't perform any retries! 
", - client.connectionRetry.getRetired()); + client.connector.connectionRetry.getRetired()); } public static ClientResponse mockEntityClientResponse( @@ -419,17 +419,26 @@ public class TestTimelineClient { private TimelineClientImpl createTimelineClientFakeTimelineClientRetryOp( YarnConfiguration conf) { TimelineClientImpl client = new TimelineClientImpl() { - @Override - public TimelineClientRetryOp - createTimelineClientRetryOpForOperateDelegationToken( - final PrivilegedExceptionAction<?> action) throws IOException { - TimelineClientRetryOpForOperateDelegationToken op = - spy(new TimelineClientRetryOpForOperateDelegationToken( - UserGroupInformation.getCurrentUser(), action)); - doThrow(new SocketTimeoutException("Test socketTimeoutException")) - .when(op).run(); - return op; + protected TimelineConnector createTimelineConnector() { + TimelineConnector connector = + new TimelineConnector(true, authUgi, doAsUser, token) { + @Override + public TimelineClientRetryOp + createRetryOpForOperateDelegationToken( + final PrivilegedExceptionAction<?> action) + throws IOException { + TimelineClientRetryOpForOperateDelegationToken op = + spy(new TimelineClientRetryOpForOperateDelegationToken( + UserGroupInformation.getCurrentUser(), action)); + doThrow( + new SocketTimeoutException("Test socketTimeoutException")) + .when(op).run(); + return op; + } + }; + addIfService(connector); + return connector; } }; client.init(conf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/42b69405/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java index 
5813340..c5b02fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java @@ -50,7 +50,7 @@ public class TestTimelineClientV2Impl { public void setup() { conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3); if (!currTestName.getMethodName() .contains("testRetryOnConnectionFailure")) { @@ -71,7 +71,7 @@ public class TestTimelineClientV2Impl { } private class TestV2TimelineClientForExceptionHandling - extends TimelineClientImpl { + extends TimelineV2ClientImpl { public TestV2TimelineClientForExceptionHandling(ApplicationId id) { super(id); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/42b69405/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index 45b9213..851ba53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; -import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -77,7 +77,7 @@ public class NMTimelinePublisher extends CompositeService { private String httpAddress; - private final Map<ApplicationId, TimelineClient> appToClientMap; + private final Map<ApplicationId, TimelineV2Client> appToClientMap; public NMTimelinePublisher(Context context) { super(NMTimelinePublisher.class.getName()); @@ -103,7 +103,7 @@ public class NMTimelinePublisher extends CompositeService { } @VisibleForTesting - Map<ApplicationId, TimelineClient> getAppToClientMap() { + Map<ApplicationId, TimelineV2Client> getAppToClientMap() { return appToClientMap; } @@ -148,7 +148,7 @@ public class NMTimelinePublisher extends CompositeService { try { // no need to put it as part of publisher as timeline client already has // Queuing concept - TimelineClient timelineClient = getTimelineClient(appId); + TimelineV2Client timelineClient = getTimelineClient(appId); if (timelineClient != null) { timelineClient.putEntitiesAsync(entity); } else { @@ -242,7 +242,7 @@ public class NMTimelinePublisher extends CompositeService { try { // no need to put it as part of publisher as timeline client already has // Queuing concept - TimelineClient timelineClient = getTimelineClient(appId); + TimelineV2Client 
timelineClient = getTimelineClient(appId); if (timelineClient != null) { timelineClient.putEntitiesAsync(entity); } else { @@ -273,7 +273,7 @@ public class NMTimelinePublisher extends CompositeService { LOG.debug("Publishing the entity " + entity + ", JSON-style content: " + TimelineUtils.dumpTimelineRecordtoJSON(entity)); } - TimelineClient timelineClient = getTimelineClient(appId); + TimelineV2Client timelineClient = getTimelineClient(appId); if (timelineClient != null) { timelineClient.putEntities(entity); } else { @@ -390,8 +390,8 @@ public class NMTimelinePublisher extends CompositeService { public void createTimelineClient(ApplicationId appId) { if (!appToClientMap.containsKey(appId)) { - TimelineClient timelineClient = - TimelineClient.createTimelineClient(appId); + TimelineV2Client timelineClient = + TimelineV2Client.createTimelineClient(appId); timelineClient.init(getConfig()); timelineClient.start(); appToClientMap.put(appId, timelineClient); @@ -399,7 +399,7 @@ public class NMTimelinePublisher extends CompositeService { } public void stopTimelineClient(ApplicationId appId) { - TimelineClient client = appToClientMap.remove(appId); + TimelineV2Client client = appToClientMap.remove(appId); if (client != null) { client.stop(); } @@ -407,13 +407,13 @@ public class NMTimelinePublisher extends CompositeService { public void setTimelineServiceAddress(ApplicationId appId, String collectorAddr) { - TimelineClient client = appToClientMap.get(appId); + TimelineV2Client client = appToClientMap.get(appId); if (client != null) { client.setTimelineServiceAddress(collectorAddr); } } - private TimelineClient getTimelineClient(ApplicationId appId) { + private TimelineV2Client getTimelineClient(ApplicationId appId) { return appToClientMap.get(appId); } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42b69405/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java index ae9397a..e116122 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java @@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; +import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -50,7 +50,7 @@ public class TestNMTimelinePublisher { public void testContainerResourceUsage() { Context context = mock(Context.class); @SuppressWarnings("unchecked") - final DummyTimelineClient timelineClient = new DummyTimelineClient(); + final DummyTimelineClient timelineClient = 
new DummyTimelineClient(null); when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0)); when(context.getHttpPort()).thenReturn(0); NMTimelinePublisher publisher = new NMTimelinePublisher(context) { @@ -137,7 +137,11 @@ public class TestNMTimelinePublisher { } } - protected static class DummyTimelineClient extends TimelineClientImpl { + protected static class DummyTimelineClient extends TimelineV2ClientImpl { + public DummyTimelineClient(ApplicationId appId) { + super(appId); + } + private TimelineEntity[] lastPublishedEntities; @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/42b69405/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index 3ec222f..07058f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.UserEntity; -import 
org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; @@ -96,8 +96,8 @@ public class TestTimelineServiceClientIntegration { @Test public void testPutEntities() throws Exception { - TimelineClient client = - TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1)); + TimelineV2Client client = + TimelineV2Client.createTimelineClient(ApplicationId.newInstance(0, 1)); try { // set the timeline service address manually client.setTimelineServiceAddress( @@ -123,8 +123,8 @@ public class TestTimelineServiceClientIntegration { @Test public void testPutExtendedEntities() throws Exception { ApplicationId appId = ApplicationId.newInstance(0, 1); - TimelineClient client = - TimelineClient.createTimelineClient(appId); + TimelineV2Client client = + TimelineV2Client.createTimelineClient(appId); try { // set the timeline service address manually client.setTimelineServiceAddress( --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org