diff --git 
new file mode 100644
index 0000000..b5b5f77
--- /dev/null
@@ -0,0 +1,440 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.client.api.impl;
+import java.lang.reflect.UndeclaredThrowableException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientRequest;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.client.filter.ClientFilter;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+ * Utility Connector class which is used by timeline clients to securely get
+ * connected to the timeline server.
+ *
+ */
+public class TimelineConnector extends AbstractService {
+  private static final Joiner JOINER = Joiner.on("");
+  private static final Log LOG = LogFactory.getLog(TimelineConnector.class);
+  public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
+  private SSLFactory sslFactory;
+  private Client client;
+  private ConnectionConfigurator connConfigurator;
+  private DelegationTokenAuthenticator authenticator;
+  private DelegationTokenAuthenticatedURL.Token token;
+  private UserGroupInformation authUgi;
+  private String doAsUser;
+  @VisibleForTesting
+  TimelineClientConnectionRetry connectionRetry;
+  private boolean requireConnectionRetry;
+  public TimelineConnector(boolean requireConnectionRetry,
+      UserGroupInformation authUgi, String doAsUser,
+      DelegationTokenAuthenticatedURL.Token token) {
+    super("TimelineConnector");
+    this.requireConnectionRetry = requireConnectionRetry;
+    this.authUgi = authUgi;
+    this.doAsUser = doAsUser;
+    this.token = token;
+  }
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    ClientConfig cc = new DefaultClientConfig();
+    cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
+    sslFactory = getSSLFactory(conf);
+    connConfigurator = getConnConfigurator(sslFactory);
+    if (UserGroupInformation.isSecurityEnabled()) {
+      authenticator = new KerberosDelegationTokenAuthenticator();
+    } else {
+      authenticator = new PseudoDelegationTokenAuthenticator();
+    }
+    authenticator.setConnectionConfigurator(connConfigurator);
+    connectionRetry = new TimelineClientConnectionRetry(conf);
+    client =
+        new Client(
+            new URLConnectionClientHandler(new TimelineURLConnectionFactory(
+                authUgi, authenticator, connConfigurator, token, doAsUser)),
+            cc);
+    if (requireConnectionRetry) {
+      TimelineJerseyRetryFilter retryFilter =
+          new TimelineJerseyRetryFilter(connectionRetry);
+      client.addFilter(retryFilter);
+    }
+  }
+  private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR
+    = new ConnectionConfigurator() {
+        @Override
+        public HttpURLConnection configure(HttpURLConnection conn)
+            throws IOException {
+          setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
+          return conn;
+        }
+      };
+  private ConnectionConfigurator getConnConfigurator(SSLFactory sslFactoryObj) 
+    try {
+      return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, sslFactoryObj);
+    } catch (Exception e) {
+      LOG.debug("Cannot load customized ssl related configuration. "
+          + "Fallback to system-generic settings.", e);
+    }
+  }
+  private static ConnectionConfigurator initSslConnConfigurator(
+      final int timeout, SSLFactory sslFactory)
+      throws IOException, GeneralSecurityException {
+    final SSLSocketFactory sf;
+    final HostnameVerifier hv;
+    sf = sslFactory.createSSLSocketFactory();
+    hv = sslFactory.getHostnameVerifier();
+    return new ConnectionConfigurator() {
+      @Override
+      public HttpURLConnection configure(HttpURLConnection conn)
+          throws IOException {
+        if (conn instanceof HttpsURLConnection) {
+          HttpsURLConnection c = (HttpsURLConnection) conn;
+          c.setSSLSocketFactory(sf);
+          c.setHostnameVerifier(hv);
+        }
+        setTimeouts(conn, timeout);
+        return conn;
+      }
+    };
+  }
+  protected SSLFactory getSSLFactory(Configuration conf)
+      throws GeneralSecurityException, IOException {
+    SSLFactory newSSLFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
+    newSSLFactory.init();
+    return newSSLFactory;
+  }
+  private static void setTimeouts(URLConnection connection, int socketTimeout) 
+    connection.setConnectTimeout(socketTimeout);
+    connection.setReadTimeout(socketTimeout);
+  }
+  public static URI constructResURI(Configuration conf, String address,
+      String uri) {
+    return URI.create(
+        JOINER.join(YarnConfiguration.useHttps(conf) ? "https://"; : "http://";,
+            address, uri));
+  }
+  DelegationTokenAuthenticatedURL getDelegationTokenAuthenticatedURL() {
+    return new DelegationTokenAuthenticatedURL(authenticator, 
+  }
+  protected void serviceStop() {
+    if (this.sslFactory != null) {
+      this.sslFactory.destroy();
+    }
+  }
+  public Client getClient() {
+    return client;
+  }
+  public Object operateDelegationToken(
+      final PrivilegedExceptionAction<?> action)
+      throws IOException, YarnException {
+    // Set up the retry operation
+    TimelineClientRetryOp tokenRetryOp =
+        createRetryOpForOperateDelegationToken(action);
+    return connectionRetry.retryOn(tokenRetryOp);
+  }
+  @Private
+  @VisibleForTesting
+  TimelineClientRetryOp createRetryOpForOperateDelegationToken(
+      final PrivilegedExceptionAction<?> action) throws IOException {
+    return new TimelineClientRetryOpForOperateDelegationToken(this.authUgi,
+        action);
+  }
+  /**
+   * Abstract class for an operation that should be retried by timeline client.
+   */
+  @Private
+  @VisibleForTesting
+  public static abstract class TimelineClientRetryOp {
+    // The operation that should be retried
+    public abstract Object run() throws IOException;
+    // The method to indicate if we should retry given the incoming exception
+    public abstract boolean shouldRetryOn(Exception e);
+  }
+  private static class TimelineURLConnectionFactory
+      implements HttpURLConnectionFactory {
+    private DelegationTokenAuthenticator authenticator;
+    private UserGroupInformation authUgi;
+    private ConnectionConfigurator connConfigurator;
+    private Token token;
+    private String doAsUser;
+    public TimelineURLConnectionFactory(UserGroupInformation authUgi,
+        DelegationTokenAuthenticator authenticator,
+        ConnectionConfigurator connConfigurator,
+        DelegationTokenAuthenticatedURL.Token token, String doAsUser) {
+      this.authUgi = authUgi;
+      this.authenticator = authenticator;
+      this.connConfigurator = connConfigurator;
+      this.token = token;
+      this.doAsUser = doAsUser;
+    }
+    @Override
+    public HttpURLConnection getHttpURLConnection(final URL url)
+        throws IOException {
+      authUgi.checkTGTAndReloginFromKeytab();
+      try {
+        return new DelegationTokenAuthenticatedURL(authenticator,
+            connConfigurator).openConnection(url, token, doAsUser);
+      } catch (UndeclaredThrowableException e) {
+        throw new IOException(e.getCause());
+      } catch (AuthenticationException ae) {
+        throw new IOException(ae);
+      }
+    }
+  }
+  // Class to handle retry
+  // Outside this class, only visible to tests
+  @Private
+  @VisibleForTesting
+  static class TimelineClientConnectionRetry {
+    // maxRetries < 0 means keep trying
+    @Private
+    @VisibleForTesting
+    public int maxRetries;
+    @Private
+    @VisibleForTesting
+    public long retryInterval;
+    // Indicates if retries happened last time. Only tests should read it.
+    // In unit tests, retryOn() calls should _not_ be concurrent.
+    private boolean retried = false;
+    @Private
+    @VisibleForTesting
+    boolean getRetired() {
+      return retried;
+    }
+    // Constructor with default retry settings
+    public TimelineClientConnectionRetry(Configuration conf) {
+      Preconditions.checkArgument(
+          conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+              >= -1,
+          "%s property value should be greater than or equal to -1",
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+      Preconditions.checkArgument(
+          conf.getLong(
+              YarnConfiguration.
+          "%s property value should be greater than zero",
+      maxRetries =
+          conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+      retryInterval = conf.getLong(
+    }
+    public Object retryOn(TimelineClientRetryOp op)
+        throws RuntimeException, IOException {
+      int leftRetries = maxRetries;
+      retried = false;
+      // keep trying
+      while (true) {
+        try {
+          // try perform the op, if fail, keep retrying
+          return;
+        } catch (IOException | RuntimeException e) {
+          // break if there's no retries left
+          if (leftRetries == 0) {
+            break;
+          }
+          if (op.shouldRetryOn(e)) {
+            logException(e, leftRetries);
+          } else {
+            throw e;
+          }
+        }
+        if (leftRetries > 0) {
+          leftRetries--;
+        }
+        retried = true;
+        try {
+          // sleep for the given time interval
+          Thread.sleep(retryInterval);
+        } catch (InterruptedException ie) {
+          LOG.warn("Client retry sleep interrupted! ");
+        }
+      }
+      throw new RuntimeException("Failed to connect to timeline server. "
+          + "Connection retries limit exceeded. "
+          + "The posted timeline event may be missing");
+    };
+    private void logException(Exception e, int leftRetries) {
+      if (leftRetries > 0) {
+            "Exception caught by TimelineClientConnectionRetry," + " will try "
+                + leftRetries + " more time(s).\nMessage: " + e.getMessage());
+      } else {
+        // note that maxRetries may be -1 at the very beginning
+"ConnectionException caught by TimelineClientConnectionRetry,"
+            + " will keep retrying.\nMessage: " + e.getMessage());
+      }
+    }
+  }
+  private static class TimelineJerseyRetryFilter extends ClientFilter {
+    private TimelineClientConnectionRetry connectionRetry;
+    public TimelineJerseyRetryFilter(
+        TimelineClientConnectionRetry connectionRetry) {
+      this.connectionRetry = connectionRetry;
+    }
+    @Override
+    public ClientResponse handle(final ClientRequest cr)
+        throws ClientHandlerException {
+      // Set up the retry operation
+      TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
+        @Override
+        public Object run() {
+          // Try pass the request, if fail, keep retrying
+          return getNext().handle(cr);
+        }
+        @Override
+        public boolean shouldRetryOn(Exception e) {
+          // Only retry on connection exceptions
+          return (e instanceof ClientHandlerException)
+              && (e.getCause() instanceof ConnectException
+                  || e.getCause() instanceof SocketTimeoutException);
+        }
+      };
+      try {
+        return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
+      } catch (IOException e) {
+        throw new ClientHandlerException(
+            "Jersey retry failed!\nMessage: " + e.getMessage());
+      }
+    }
+  }
+  @Private
+  @VisibleForTesting
+  public static class TimelineClientRetryOpForOperateDelegationToken
+      extends TimelineClientRetryOp {
+    private final UserGroupInformation authUgi;
+    private final PrivilegedExceptionAction<?> action;
+    public TimelineClientRetryOpForOperateDelegationToken(
+        UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
+      this.authUgi = authUgi;
+      this.action = action;
+    }
+    @Override
+    public Object run() throws IOException {
+      // Try pass the request, if fail, keep retrying
+      authUgi.checkTGTAndReloginFromKeytab();
+      try {
+        return authUgi.doAs(action);
+      } catch (UndeclaredThrowableException e) {
+        throw new IOException(e.getCause());
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+    @Override
+    public boolean shouldRetryOn(Exception e) {
+      // retry on connection exceptions
+      // and SocketTimeoutException
+      return (e instanceof ConnectException
+          || e instanceof SocketTimeoutException);
+    }
+  }
diff --git 
new file mode 100644
index 0000000..848e238
--- /dev/null
@@ -0,0 +1,459 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.client.api.impl;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+ * Implementation of timeline v2 client interface.
+ *
+ */
+public class TimelineV2ClientImpl extends TimelineV2Client {
+  private static final Log LOG = LogFactory.getLog(TimelineV2ClientImpl.class);
+  private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
+  private TimelineEntityDispatcher entityDispatcher;
+  private volatile String timelineServiceAddress;
+  // Retry parameters for identifying new timeline service
+  // TODO consider to merge with connection retry
+  private int maxServiceRetries;
+  private long serviceRetryInterval;
+  private TimelineConnector connector;
+  private ApplicationId contextAppId;
+  public TimelineV2ClientImpl(ApplicationId appId) {
+    super(TimelineV2ClientImpl.class.getName());
+    this.contextAppId = appId;
+  }
+  public ApplicationId getContextAppId() {
+    return contextAppId;
+  }
+  protected void serviceInit(Configuration conf) throws Exception {
+    if (!YarnConfiguration.timelineServiceEnabled(conf)
+        || (int) YarnConfiguration.getTimelineServiceVersion(conf) != 2) {
+      throw new IOException("Timeline V2 client is not properly configured. "
+          + "Either timeline service is not enabled or version is not set to"
+          + " 2");
+    }
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    UserGroupInformation realUgi = ugi.getRealUser();
+    String doAsUser = null;
+    UserGroupInformation authUgi = null;
+    if (realUgi != null) {
+      authUgi = realUgi;
+      doAsUser = ugi.getShortUserName();
+    } else {
+      authUgi = ugi;
+      doAsUser = null;
+    }
+    // TODO need to add/cleanup filter retry later for ATSV2. similar to V1
+    DelegationTokenAuthenticatedURL.Token token =
+        new DelegationTokenAuthenticatedURL.Token();
+    connector = new TimelineConnector(false, authUgi, doAsUser, token);
+    addIfService(connector);
+    // new version need to auto discovery (with retry till ATS v2 address is
+    // got).
+    maxServiceRetries =
+        conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+    serviceRetryInterval = conf.getLong(
+    entityDispatcher = new TimelineEntityDispatcher(conf);
+    super.serviceInit(conf);
+  }
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    entityDispatcher.start();
+  }
+  @Override
+  protected void serviceStop() throws Exception {
+    entityDispatcher.stop();
+    super.serviceStop();
+  }
+  @Override
+  public void putEntities(TimelineEntity... entities)
+      throws IOException, YarnException {
+    entityDispatcher.dispatchEntities(true, entities);
+  }
+  @Override
+  public void putEntitiesAsync(TimelineEntity... entities)
+      throws IOException, YarnException {
+    entityDispatcher.dispatchEntities(false, entities);
+  }
+  @Override
+  public void setTimelineServiceAddress(String address) {
+    this.timelineServiceAddress = address;
+  }
+  @Private
+  protected void putObjects(String path, MultivaluedMap<String, String> params,
+      Object obj) throws IOException, YarnException {
+    int retries = verifyRestEndPointAvailable();
+    // timelineServiceAddress could be stale, add retry logic here.
+    boolean needRetry = true;
+    while (needRetry) {
+      try {
+        URI uri = TimelineConnector.constructResURI(getConfig(),
+            timelineServiceAddress, RESOURCE_URI_STR_V2);
+        putObjects(uri, path, params, obj);
+        needRetry = false;
+      } catch (IOException e) {
+        // handle exception for timelineServiceAddress being updated.
+        checkRetryWithSleep(retries, e);
+        retries--;
+      }
+    }
+  }
+  /**
+   * Check if reaching to maximum of retries.
+   *
+   * @param retries
+   * @param e
+   */
+  private void checkRetryWithSleep(int retries, IOException e)
+      throws YarnException, IOException {
+    if (retries > 0) {
+      try {
+        Thread.sleep(this.serviceRetryInterval);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw new YarnException("Interrupted while retrying to connect to 
+      }
+    } else {
+      StringBuilder msg =
+          new StringBuilder("TimelineClient has reached to max retry times : 
+      msg.append(this.maxServiceRetries);
+      msg.append(" for service address: ");
+      msg.append(timelineServiceAddress);
+      LOG.error(msg.toString());
+      throw new IOException(msg.toString(), e);
+    }
+  }
+  protected void putObjects(URI base, String path,
+      MultivaluedMap<String, String> params, Object obj)
+      throws IOException, YarnException {
+    ClientResponse resp;
+    try {
+      resp = 
+          .accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON)
+          .put(ClientResponse.class, obj);
+    } catch (RuntimeException re) {
+      // runtime exception is expected if the client cannot connect the server
+      String msg = "Failed to get the response from the timeline server.";
+      LOG.error(msg, re);
+      throw new IOException(re);
+    }
+    if (resp == null ||
+        resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+      String msg =
+          "Response from the timeline server is " + ((resp == null) ? "null"
+              : "not successful," + " HTTP error code: " + resp.getStatus()
+                  + ", Server response:\n" + resp.getEntity(String.class));
+      LOG.error(msg);
+      throw new YarnException(msg);
+    }
+  }
+  private int verifyRestEndPointAvailable() throws YarnException {
+    // timelineServiceAddress could haven't be initialized yet
+    // or stale (only for new timeline service)
+    int retries = pollTimelineServiceAddress(this.maxServiceRetries);
+    if (timelineServiceAddress == null) {
+      String errMessage = "TimelineClient has reached to max retry times : "
+          + this.maxServiceRetries
+          + ", but failed to fetch timeline service address. Please verify"
+          + " Timeline Auxiliary Service is configured in all the NMs";
+      LOG.error(errMessage);
+      throw new YarnException(errMessage);
+    }
+    return retries;
+  }
+  /**
+   * Poll TimelineServiceAddress for maximum of retries times if it is null.
+   *
+   * @param retries
+   * @return the left retry times
+   * @throws IOException
+   */
+  private int pollTimelineServiceAddress(int retries) throws YarnException {
+    while (timelineServiceAddress == null && retries > 0) {
+      try {
+        Thread.sleep(this.serviceRetryInterval);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new YarnException("Interrupted while trying to connect ATS");
+      }
+      retries--;
+    }
+    return retries;
+  }
+  private final class EntitiesHolder extends FutureTask<Void> {
+    private final TimelineEntities entities;
+    private final boolean isSync;
+    EntitiesHolder(final TimelineEntities entities, final boolean isSync) {
+      super(new Callable<Void>() {
+        // publishEntities()
+        public Void call() throws Exception {
+          MultivaluedMap<String, String> params = new MultivaluedMapImpl();
+          params.add("appid", getContextAppId().toString());
+          params.add("async", Boolean.toString(!isSync));
+          putObjects("entities", params, entities);
+          return null;
+        }
+      });
+      this.entities = entities;
+      this.isSync = isSync;
+    }
+    public boolean isSync() {
+      return isSync;
+    }
+    public TimelineEntities getEntities() {
+      return entities;
+    }
+  }
+  /**
+   * This class is responsible for collecting the timeline entities and
+   * publishing them in async.
+   */
+  private class TimelineEntityDispatcher {
+    /**
+     * Time period for which the timelineclient will wait for draining after
+     * stop.
+     */
+    private static final long DRAIN_TIME_PERIOD = 2000L;
+    private int numberOfAsyncsToMerge;
+    private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
+    private ExecutorService executor;
+    TimelineEntityDispatcher(Configuration conf) {
+      timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
+      numberOfAsyncsToMerge =
+          conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
+              YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
+    }
+    Runnable createRunnable() {
+      return new Runnable() {
+        @Override
+        public void run() {
+          try {
+            EntitiesHolder entitiesHolder;
+            while (!Thread.currentThread().isInterrupted()) {
+              // Merge all the async calls and make one push, but if its sync
+              // call push immediately
+              try {
+                entitiesHolder = timelineEntityQueue.take();
+              } catch (InterruptedException ie) {
+      "Timeline dispatcher thread was interrupted ");
+                Thread.currentThread().interrupt();
+                return;
+              }
+              if (entitiesHolder != null) {
+                publishWithoutBlockingOnQueue(entitiesHolder);
+              }
+            }
+          } finally {
+            if (!timelineEntityQueue.isEmpty()) {
+    "Yet to publish " + timelineEntityQueue.size()
+                  + " timelineEntities, draining them now. ");
+            }
+            // Try to drain the remaining entities to be published @ the max 
+            // 2 seconds
+            long timeTillweDrain =
+                System.currentTimeMillis() + DRAIN_TIME_PERIOD;
+            while (!timelineEntityQueue.isEmpty()) {
+              publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
+              if (System.currentTimeMillis() > timeTillweDrain) {
+                // time elapsed stop publishing further....
+                if (!timelineEntityQueue.isEmpty()) {
+                  LOG.warn("Time to drain elapsed! Remaining "
+                      + timelineEntityQueue.size() + "timelineEntities will 
+                      + " be published");
+                  // if some entities were not drained then we need interrupt
+                  // the threads which had put sync EntityHolders to the queue.
+                  EntitiesHolder nextEntityInTheQueue = null;
+                  while ((nextEntityInTheQueue =
+                      timelineEntityQueue.poll()) != null) {
+                    nextEntityInTheQueue.cancel(true);
+                  }
+                }
+                break;
+              }
+            }
+          }
+        }
+        /**
+         * Publishes the given EntitiesHolder and return immediately if sync
+         * call, else tries to fetch the EntitiesHolder from the queue in non
+         * blocking fashion and collate the Entities if possible before
+         * publishing through REST.
+         *
+         * @param entitiesHolder
+         */
+        private void publishWithoutBlockingOnQueue(
+            EntitiesHolder entitiesHolder) {
+          if (entitiesHolder.isSync()) {
+  ;
+            return;
+          }
+          int count = 1;
+          while (true) {
+            // loop till we find a sync put Entities or there is nothing
+            // to take
+            EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
+            if (nextEntityInTheQueue == null) {
+              // Nothing in the queue just publish and get back to the
+              // blocked wait state
+    ;
+              break;
+            } else if (nextEntityInTheQueue.isSync()) {
+              // flush all the prev async entities first
+    ;
+              // and then flush the sync entity
+    ;
+              break;
+            } else {
+              // append all async entities together and then flush
+              entitiesHolder.getEntities().addEntities(
+                  nextEntityInTheQueue.getEntities().getEntities());
+              count++;
+              if (count == numberOfAsyncsToMerge) {
+                // Flush the entities if the number of the async
+                // putEntites merged reaches the desired limit. To avoid
+                // collecting multiple entities and delaying for a long
+                // time.
+      ;
+                break;
+              }
+            }
+          }
+        }
+      };
+    }
+    public void dispatchEntities(boolean sync,
+        TimelineEntity[] entitiesTobePublished) throws YarnException {
+      if (executor.isShutdown()) {
+        throw new YarnException("Timeline client is in the process of 
+            + " not accepting any more TimelineEntities");
+      }
+      // wrap all TimelineEntity into TimelineEntities object
+      TimelineEntities entities = new TimelineEntities();
+      for (TimelineEntity entity : entitiesTobePublished) {
+        entities.addEntity(entity);
+      }
+      // created a holder and place it in queue
+      EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
+      try {
+        timelineEntityQueue.put(entitiesHolder);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new YarnException(
+            "Failed while adding entity to the queue for publishing", e);
+      }
+      if (sync) {
+        // In sync call we need to wait till its published and if any error 
+        // throw it back
+        try {
+          entitiesHolder.get();
+        } catch (ExecutionException e) {
+          throw new YarnException("Failed while publishing entity",
+              e.getCause());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new YarnException("Interrupted while publishing entity", e);
+        }
+      }
+    }
+    public void start() {
+      executor = Executors.newSingleThreadExecutor();
+      executor.execute(createRunnable());
+    }
+    public void stop() {
+"Stopping TimelineClient.");
+      executor.shutdownNow();
+      try {
+        executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        e.printStackTrace();
+      }
+    }
+  }
diff --git 
index a1d4449..a44a8ae 100644
@@ -215,11 +215,11 @@ public class TestTimelineClient {
           + "Timeline server should be off to run this test. ");
     } catch (RuntimeException ce) {
-        "Handler exception for reason other than retry: " + ce.getMessage(),
-        ce.getMessage().contains("Connection retries limit exceeded"));
+          "Handler exception for reason other than retry: " + ce.getMessage(),
+          ce.getMessage().contains("Connection retries limit exceeded"));
       // we would expect this exception here, check if the client has retried
-      Assert.assertTrue("Retry filter didn't perform any retries! ", client
-        .connectionRetry.getRetired());
+      Assert.assertTrue("Retry filter didn't perform any retries! ",
+          client.connector.connectionRetry.getRetired());
@@ -318,7 +318,7 @@ public class TestTimelineClient {
             .getMessage().contains("Connection retries limit exceeded"));
     // we would expect this exception here, check if the client has retried
     Assert.assertTrue("Retry filter didn't perform any retries! ",
-        client.connectionRetry.getRetired());
+        client.connector.connectionRetry.getRetired());
   public static ClientResponse mockEntityClientResponse(
@@ -419,17 +419,26 @@ public class TestTimelineClient {
   private TimelineClientImpl createTimelineClientFakeTimelineClientRetryOp(
       YarnConfiguration conf) {
     TimelineClientImpl client = new TimelineClientImpl() {
-      public TimelineClientRetryOp
-          createTimelineClientRetryOpForOperateDelegationToken(
-              final PrivilegedExceptionAction<?> action) throws IOException {
-        TimelineClientRetryOpForOperateDelegationToken op =
-            spy(new TimelineClientRetryOpForOperateDelegationToken(
-            UserGroupInformation.getCurrentUser(), action));
-        doThrow(new SocketTimeoutException("Test socketTimeoutException"))
-            .when(op).run();
-        return op;
+      protected TimelineConnector createTimelineConnector() {
+        TimelineConnector connector =
+            new TimelineConnector(true, authUgi, doAsUser, token) {
+              @Override
+              public TimelineClientRetryOp
+                createRetryOpForOperateDelegationToken(
+                  final PrivilegedExceptionAction<?> action)
+                  throws IOException {
+                TimelineClientRetryOpForOperateDelegationToken op =
+                    spy(new TimelineClientRetryOpForOperateDelegationToken(
+                        UserGroupInformation.getCurrentUser(), action));
+                doThrow(
+                    new SocketTimeoutException("Test socketTimeoutException"))
+                        .when(op).run();
+                return op;
+              }
+            };
+        addIfService(connector);
+        return connector;
diff --git 
index 5813340..c5b02fd 100644
@@ -50,7 +50,7 @@ public class TestTimelineClientV2Impl {
   public void setup() {
     conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
-    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
     conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3);
     if (!currTestName.getMethodName()
         .contains("testRetryOnConnectionFailure")) {
@@ -71,7 +71,7 @@ public class TestTimelineClientV2Impl {
   private class TestV2TimelineClientForExceptionHandling
-      extends TimelineClientImpl {
+      extends TimelineV2ClientImpl {
     public TestV2TimelineClientForExceptionHandling(ApplicationId id) {
diff --git 
index 45b9213..851ba53 100644
@@ -40,7 +40,7 @@ import 
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -77,7 +77,7 @@ public class NMTimelinePublisher extends CompositeService {
   private String httpAddress;
-  private final Map<ApplicationId, TimelineClient> appToClientMap;
+  private final Map<ApplicationId, TimelineV2Client> appToClientMap;
   public NMTimelinePublisher(Context context) {
@@ -103,7 +103,7 @@ public class NMTimelinePublisher extends CompositeService {
-  Map<ApplicationId, TimelineClient> getAppToClientMap() {
+  Map<ApplicationId, TimelineV2Client> getAppToClientMap() {
     return appToClientMap;
@@ -148,7 +148,7 @@ public class NMTimelinePublisher extends CompositeService {
       try {
         // no need to put it as part of publisher as timeline client already 
         // Queuing concept
-        TimelineClient timelineClient = getTimelineClient(appId);
+        TimelineV2Client timelineClient = getTimelineClient(appId);
         if (timelineClient != null) {
         } else {
@@ -242,7 +242,7 @@ public class NMTimelinePublisher extends CompositeService {
     try {
       // no need to put it as part of publisher as timeline client already has
       // Queuing concept
-      TimelineClient timelineClient = getTimelineClient(appId);
+      TimelineV2Client timelineClient = getTimelineClient(appId);
       if (timelineClient != null) {
       } else {
@@ -273,7 +273,7 @@ public class NMTimelinePublisher extends CompositeService {
         LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
             + TimelineUtils.dumpTimelineRecordtoJSON(entity));
-      TimelineClient timelineClient = getTimelineClient(appId);
+      TimelineV2Client timelineClient = getTimelineClient(appId);
       if (timelineClient != null) {
       } else {
@@ -390,8 +390,8 @@ public class NMTimelinePublisher extends CompositeService {
   public void createTimelineClient(ApplicationId appId) {
     if (!appToClientMap.containsKey(appId)) {
-      TimelineClient timelineClient =
-          TimelineClient.createTimelineClient(appId);
+      TimelineV2Client timelineClient =
+          TimelineV2Client.createTimelineClient(appId);
       appToClientMap.put(appId, timelineClient);
@@ -399,7 +399,7 @@ public class NMTimelinePublisher extends CompositeService {
   public void stopTimelineClient(ApplicationId appId) {
-    TimelineClient client = appToClientMap.remove(appId);
+    TimelineV2Client client = appToClientMap.remove(appId);
     if (client != null) {
@@ -407,13 +407,13 @@ public class NMTimelinePublisher extends CompositeService 
   public void setTimelineServiceAddress(ApplicationId appId,
       String collectorAddr) {
-    TimelineClient client = appToClientMap.get(appId);
+    TimelineV2Client client = appToClientMap.get(appId);
     if (client != null) {
-  private TimelineClient getTimelineClient(ApplicationId appId) {
+  private TimelineV2Client getTimelineClient(ApplicationId appId) {
     return appToClientMap.get(appId);
diff --git 
index ae9397a..e116122 100644
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -50,7 +50,7 @@ public class TestNMTimelinePublisher {
   public void testContainerResourceUsage() {
     Context context = mock(Context.class);
-    final DummyTimelineClient timelineClient = new DummyTimelineClient();
+    final DummyTimelineClient timelineClient = new DummyTimelineClient(null);
     when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
     NMTimelinePublisher publisher = new NMTimelinePublisher(context) {
@@ -137,7 +137,11 @@ public class TestNMTimelinePublisher {
-  protected static class DummyTimelineClient extends TimelineClientImpl {
+  protected static class DummyTimelineClient extends TimelineV2ClientImpl {
+    public DummyTimelineClient(ApplicationId appId) {
+      super(appId);
+    }
     private TimelineEntity[] lastPublishedEntities;
diff --git 
index 3ec222f..07058f6 100644
@@ -43,7 +43,7 @@ import 
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.UserEntity;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
@@ -96,8 +96,8 @@ public class TestTimelineServiceClientIntegration {
   public void testPutEntities() throws Exception {
-    TimelineClient client =
-        TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1));
+    TimelineV2Client client =
+        TimelineV2Client.createTimelineClient(ApplicationId.newInstance(0, 1));
     try {
       // set the timeline service address manually
@@ -123,8 +123,8 @@ public class TestTimelineServiceClientIntegration {
   public void testPutExtendedEntities() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(0, 1);
-    TimelineClient client =
-        TimelineClient.createTimelineClient(appId);
+    TimelineV2Client client =
+        TimelineV2Client.createTimelineClient(appId);
     try {
       // set the timeline service address manually

To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to