Repository: hive
Updated Branches:
  refs/heads/master 8158f8848 -> 52f1b2471
HIVE-17838: Make org.apache.hive.spark.client.rpc logging HoS specific and other logging cleanup (Sahil Takiar, reviewed by Aihua Xu)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/52f1b247
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/52f1b247
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/52f1b247

Branch: refs/heads/master
Commit: 52f1b2471545a797856e4b9b1ae0a36cb4233c18
Parents: 8158f88
Author: Sahil Takiar <takiar.sa...@gmail.com>
Authored: Fri May 4 14:32:50 2018 -0700
Committer: Sahil Takiar <stak...@cloudera.com>
Committed: Fri May 4 14:32:50 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |  8 +--
 .../ql/exec/spark/HiveSparkClientFactory.java   |  4 +-
 .../hadoop/hive/ql/exec/spark/SparkTask.java    |  6 +--
 .../ql/exec/spark/session/SparkSessionImpl.java | 28 ++++++-----
 .../spark/SetSparkReducerParallelism.java       |  4 +-
 .../apache/hive/spark/client/BaseProtocol.java  | 53 ++++++++++++++++++++
 .../apache/hive/spark/client/RemoteDriver.java  | 39 ++++++++------
 .../hive/spark/client/SparkClientFactory.java   |  4 +-
 .../hive/spark/client/SparkClientImpl.java      | 44 ++++++++++------
 .../hive/spark/client/metrics/InputMetrics.java |  6 +++
 .../hive/spark/client/metrics/Metrics.java      | 18 +++++++
 .../client/metrics/ShuffleReadMetrics.java      |  9 ++++
 .../client/metrics/ShuffleWriteMetrics.java     |  7 +++
 .../hive/spark/client/rpc/KryoMessageCodec.java |  4 +-
 .../org/apache/hive/spark/client/rpc/Rpc.java   | 23 ++++++---
 .../hive/spark/client/rpc/RpcConfiguration.java |  9 ++--
 .../hive/spark/client/rpc/RpcDispatcher.java    | 20 ++++----
 .../apache/hive/spark/client/rpc/RpcServer.java | 18 ++++---
 18 files changed, 216 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 99df967..8baf309 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -495,8 +495,8 @@ public enum ErrorMsg { FILE_NOT_FOUND(20012, "File not found: {0}", "64000", true), WRONG_FILE_FORMAT(20013, "Wrong file format. Please check the file's format.", "64000", true), - SPARK_CREATE_CLIENT_INVALID_QUEUE(20014, "Spark job is submitted to an invalid queue: {0}." - + " Please fix and try again.", true), + SPARK_CREATE_CLIENT_INVALID_QUEUE(20014, "Spark app for session {0} was submitted to an invalid" + + " queue: {1}. Please fix and try again.", true), SPARK_RUNTIME_OOM(20015, "Spark job failed because of out of memory."), // An exception from runtime that will show the full stack to client @@ -574,13 +574,13 @@ public enum ErrorMsg { SPARK_CREATE_CLIENT_TIMEOUT(30038, "Timed out while creating Spark client for session {0}.", true), SPARK_CREATE_CLIENT_QUEUE_FULL(30039, - "Failed to create Spark client because job queue is full: {0}.", true), + "Failed to create Spark client for session {0} because job queue is full: {1}.", true), SPARK_CREATE_CLIENT_INTERRUPTED(30040, "Interrupted while creating Spark client for session {0}", true), SPARK_CREATE_CLIENT_ERROR(30041, "Failed to create Spark client for Spark session {0}: {1}", true), SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST(30042, - "Failed to create Spark client due to invalid resource request: {0}", true), + "Failed to create Spark client for session {0} due to invalid resource request: {1}", true), SPARK_CREATE_CLIENT_CLOSED_SESSION(30043, "Cannot create Spark client on a closed session {0}", true), http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java index 565c43b..5ed5d42 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java @@ -104,7 +104,7 @@ public class HiveSparkClientFactory { inputStream = HiveSparkClientFactory.class.getClassLoader() .getResourceAsStream(SPARK_DEFAULT_CONF_FILE); if (inputStream != null) { - LOG.info("loading spark properties from: " + SPARK_DEFAULT_CONF_FILE); + LOG.info("Loading Spark properties from: " + SPARK_DEFAULT_CONF_FILE); Properties properties = new Properties(); properties.load(new InputStreamReader(inputStream, CharsetNames.UTF_8)); for (String propertyName : properties.stringPropertyNames()) { @@ -118,7 +118,7 @@ public class HiveSparkClientFactory { } } } catch (IOException e) { - LOG.info("Failed to open spark configuration file: " + LOG.info("Failed to open Spark configuration file: " + SPARK_DEFAULT_CONF_FILE, e); } finally { if (inputStream != null) { http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index bfa2da6..8038771 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -130,7 +130,7 @@ public class SparkTask extends Task<SparkWork> { if (driverContext.isShutdown()) { LOG.warn("Killing Spark job"); killJob(); - throw new HiveException("Operation is cancelled."); + throw new HiveException(String.format("Spark task %s cancelled for query 
%s", getId(), sparkWork.getQueryId())); } // Get the Job Handle id associated with the Spark job @@ -176,7 +176,7 @@ public class SparkTask extends Task<SparkWork> { ? "UNKNOWN" : jobID)); killJob(); } else if (rc == 4) { - LOG.info("The spark job or one stage of it has too many tasks" + + LOG.info("The Spark job or one stage of it has too many tasks" + ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID ); killJob(); } @@ -186,7 +186,7 @@ public class SparkTask extends Task<SparkWork> { } sparkJobStatus.cleanup(); } catch (Exception e) { - String msg = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + "'"; + String msg = "Failed to execute Spark task " + getId() + ", with exception '" + Utilities.getNameMessage(e) + "'"; // Has to use full name to make sure it does not conflict with // org.apache.commons.lang.StringUtils http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java index 189de19..c8cb1ac 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java @@ -79,7 +79,7 @@ public class SparkSessionImpl implements SparkSession { @Override public void open(HiveConf conf) throws HiveException { - LOG.info("Trying to open Spark session {}", sessionId); + LOG.info("Trying to open Hive on Spark session {}", sessionId); this.conf = conf; isOpen = true; try { @@ -94,12 +94,12 @@ public class SparkSessionImpl implements SparkSession { } throw he; } - LOG.info("Spark session {} is successfully opened", sessionId); + LOG.info("Hive on Spark session {} successfully opened", sessionId); } 
@Override public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception { - Preconditions.checkState(isOpen, "Session is not open. Can't submit jobs."); + Preconditions.checkState(isOpen, "Hive on Spark session is not open. Can't submit jobs."); return hiveSparkClient.execute(driverContext, sparkWork); } @@ -129,9 +129,9 @@ public class SparkSessionImpl implements SparkSession { totalCores = totalCores / sparkConf.getInt("spark.task.cpus", 1); long memoryPerTaskInBytes = totalMemory / totalCores; - LOG.info("Spark cluster current has executors: " + numExecutors + LOG.info("Hive on Spark application currently has number of executors: " + numExecutors + ", total cores: " + totalCores + ", memory per executor: " - + executorMemoryInMB + "M, memoryFraction: " + memoryFraction); + + executorMemoryInMB + " mb, memoryFraction: " + memoryFraction); return new ObjectPair<Long, Integer>(Long.valueOf(memoryPerTaskInBytes), Integer.valueOf(totalCores)); } @@ -153,15 +153,15 @@ public class SparkSessionImpl implements SparkSession { @Override public void close() { - LOG.info("Trying to close Spark session {}", sessionId); + LOG.info("Trying to close Hive on Spark session {}", sessionId); isOpen = false; if (hiveSparkClient != null) { try { hiveSparkClient.close(); - LOG.info("Spark session {} is successfully closed", sessionId); + LOG.info("Hive on Spark session {} successfully closed", sessionId); cleanScratchDir(); } catch (IOException e) { - LOG.error("Failed to close spark session (" + sessionId + ").", e); + LOG.error("Failed to close Hive on Spark session (" + sessionId + ")", e); } } hiveSparkClient = null; @@ -197,20 +197,22 @@ public class SparkSessionImpl implements SparkSession { StringBuilder matchedString = new StringBuilder(); while (e != null) { if (e instanceof TimeoutException) { - return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT); + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT, sessionId); } 
else if (e instanceof InterruptedException) { return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INTERRUPTED, sessionId); } else if (e instanceof RuntimeException) { String sts = Throwables.getStackTraceAsString(e); if (matches(sts, AM_TIMEOUT_ERR, matchedString)) { - return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT); + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_TIMEOUT, sessionId); } else if (matches(sts, UNKNOWN_QUEUE_ERR, matchedString) || matches(sts, STOPPED_QUEUE_ERR, matchedString)) { - return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE, matchedString.toString()); + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_QUEUE, sessionId, + matchedString.toString()); } else if (matches(sts, FULL_QUEUE_ERR, matchedString)) { - return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_QUEUE_FULL, matchedString.toString()); + return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_QUEUE_FULL, sessionId, + matchedString.toString()); } else if (matches(sts, INVALILD_MEM_ERR, matchedString) || matches(sts, INVALID_CORE_ERR, matchedString)) { return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST, - matchedString.toString()); + sessionId, matchedString.toString()); } else { return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, Throwables.getRootCause(e).getMessage()); } http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java index eecb103..ab87c79 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java @@ -264,9 +264,9 @@ public class SetSparkReducerParallelism implements NodeProcessor { context.getConf(), sparkSessionManager); sparkMemoryAndCores = sparkSession.getMemoryAndCores(); } catch (HiveException e) { - throw new SemanticException("Failed to get a spark session: " + e); + throw new SemanticException("Failed to get a Hive on Spark session", e); } catch (Exception e) { - LOG.warn("Failed to get spark memory/core info", e); + LOG.warn("Failed to get spark memory/core info, reducer parallelism may be inaccurate", e); } finally { if (sparkSession != null && sparkSessionManager != null) { try { http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java index 6a988a4..558ed80 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java @@ -38,10 +38,20 @@ public abstract class BaseProtocol extends RpcDispatcher { this(null); } + @Override + public String toString() { + return "CancelJob{" + + "id='" + id + '\'' + + '}'; + } } protected static class EndSession implements Serializable { + @Override + public String toString() { + return "EndSession"; + } } protected static class Error implements Serializable { @@ -56,6 +66,12 @@ public abstract class BaseProtocol extends RpcDispatcher { this(null); } + @Override + public String toString() { + return "Error{" + + "cause='" + cause + '\'' + + '}'; + } } protected static class JobMetrics implements Serializable { @@ -78,6 +94,16 @@ public abstract class BaseProtocol extends RpcDispatcher { this(null, -1, -1, -1, null); } + 
@Override + public String toString() { + return "JobMetrics{" + + "jobId='" + jobId + '\'' + + ", sparkJobId=" + sparkJobId + + ", stageId=" + stageId + + ", taskId=" + taskId + + ", metrics=" + metrics + + '}'; + } } protected static class JobRequest<T extends Serializable> implements Serializable { @@ -94,6 +120,13 @@ public abstract class BaseProtocol extends RpcDispatcher { this(null, null); } + @Override + public String toString() { + return "JobRequest{" + + "id='" + id + '\'' + + ", job=" + job + + '}'; + } } public static class JobResult<T extends Serializable> implements Serializable { @@ -137,6 +170,12 @@ public abstract class BaseProtocol extends RpcDispatcher { this(null); } + @Override + public String toString() { + return "JobStarted{" + + "id='" + id + '\'' + + '}'; + } } /** @@ -154,6 +193,14 @@ public abstract class BaseProtocol extends RpcDispatcher { JobSubmitted() { this(null, -1); } + + @Override + public String toString() { + return "JobSubmitted{" + + "clientJobId='" + clientJobId + '\'' + + ", sparkJobId=" + sparkJobId + + '}'; + } } protected static class SyncJobRequest<T extends Serializable> implements Serializable { @@ -168,5 +215,11 @@ public abstract class BaseProtocol extends RpcDispatcher { this(null); } + @Override + public String toString() { + return "SyncJobRequest{" + + "job=" + job + + '}'; + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index 6e546d4..caa850c 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -25,8 +25,6 @@ import io.netty.channel.nio.NioEventLoopGroup; import java.io.File; 
import java.io.IOException; import java.io.Serializable; -import java.net.InetAddress; -import java.net.URI; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -118,17 +116,18 @@ public class RemoteDriver { // as these are non-spark specific configs used by the remote driver mapConf.put(val[0], val[1]); } else { - throw new IllegalArgumentException("Invalid command line: " + Joiner.on(" ").join(args)); + throw new IllegalArgumentException("Invalid command line arguments: " + + Joiner.on(" ").join(args)); } } executor = Executors.newCachedThreadPool(); - LOG.info("Connecting to: {}:{}", serverAddress, serverPort); + LOG.info("Connecting to HiveServer2 address: {}:{}", serverAddress, serverPort); for (Tuple2<String, String> e : conf.getAll()) { mapConf.put(e._1(), e._2()); - LOG.debug("Remote Driver configured with: " + e._1() + "=" + e._2()); + LOG.debug("Remote Spark Driver configured with: " + e._1() + "=" + e._2()); } String clientId = mapConf.get(SparkClientFactory.CONF_CLIENT_ID); @@ -140,7 +139,7 @@ public class RemoteDriver { this.egroup = new NioEventLoopGroup( threadCount, new ThreadFactoryBuilder() - .setNameFormat("Driver-RPC-Handler-%d") + .setNameFormat("Spark-Driver-RPC-Handler-%d") .setDaemon(true) .build()); this.protocol = new DriverProtocol(); @@ -153,9 +152,14 @@ public class RemoteDriver { this.clientRpc.addListener(new Rpc.Listener() { @Override public void rpcClosed(Rpc rpc) { - LOG.warn("Shutting down driver because RPC channel was closed."); + LOG.warn("Shutting down driver because Remote Spark Driver to HiveServer2 connection was closed."); shutdown(null); } + + @Override + public String toString() { + return "Shutting Down Remote Spark Driver to HiveServer2 Connection"; + } }); try { @@ -211,7 +215,7 @@ public class RemoteDriver { if (jc != null) { job.submit(); } else { - LOG.info("SparkContext not yet up, queueing job request."); + LOG.info("SparkContext not yet up; adding Hive on Spark job request to the queue."); 
jobQueue.add(job); } } @@ -220,9 +224,9 @@ public class RemoteDriver { private synchronized void shutdown(Throwable error) { if (running) { if (error == null) { - LOG.info("Shutting down remote driver."); + LOG.info("Shutting down Spark Remote Driver."); } else { - LOG.error("Shutting down remote driver due to error: " + error, error); + LOG.error("Shutting down Spark Remote Driver due to error: " + error, error); } running = false; for (JobWrapper<?> job : activeJobs.values()) { @@ -253,7 +257,7 @@ public class RemoteDriver { private String getArg(String[] args, int keyIdx) { int valIdx = keyIdx + 1; if (args.length <= valIdx) { - throw new IllegalArgumentException("Invalid command line: " + throw new IllegalArgumentException("Invalid command line arguments: " + Joiner.on(" ").join(args)); } return args[valIdx]; @@ -294,7 +298,7 @@ public class RemoteDriver { private void handle(ChannelHandlerContext ctx, CancelJob msg) { JobWrapper<?> job = activeJobs.get(msg.id); if (job == null || !cancelJob(job)) { - LOG.info("Requested to cancel an already finished job."); + LOG.info("Requested to cancel an already finished client job."); } } @@ -304,7 +308,7 @@ public class RemoteDriver { } private void handle(ChannelHandlerContext ctx, JobRequest msg) { - LOG.info("Received job request {}", msg.id); + LOG.debug("Received client job request {}", msg.id); JobWrapper<?> wrapper = new JobWrapper<Serializable>(msg); activeJobs.put(msg.id, wrapper); submit(wrapper); @@ -318,7 +322,7 @@ public class RemoteDriver { while (jc == null) { jcLock.wait(); if (!running) { - throw new IllegalStateException("Remote context is shutting down."); + throw new IllegalStateException("Remote Spark context is shutting down."); } } } @@ -339,6 +343,10 @@ public class RemoteDriver { } } + @Override + public String name() { + return "Remote Spark Driver to HiveServer2 Connection"; + } } private class JobWrapper<T extends Serializable> implements Callable<Void> { @@ -404,12 +412,13 @@ public class 
RemoteDriver { if (sparkCounters != null) { counters = sparkCounters.snapshot(); } + protocol.jobFinished(req.id, result, null, counters); } catch (Throwable t) { // Catch throwables in a best-effort to report job status back to the client. It's // re-thrown so that the executor can destroy the affected thread (or the JVM can // die or whatever would happen if the throwable bubbled up). - LOG.error("Failed to run job " + req.id, t); + LOG.error("Failed to run client job " + req.id, t); protocol.jobFinished(req.id, null, t, sparkCounters != null ? sparkCounters.snapshot() : null); throw new ExecutionException(t); http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java index fd9b725..88b5c95 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java @@ -84,8 +84,8 @@ public final class SparkClientFactory { public static SparkClient createClient(Map<String, String> sparkConf, HiveConf hiveConf, String sessionId) throws IOException, SparkException { - Preconditions.checkState(server != null, "initialize() not called."); + Preconditions.checkState(server != null, + "Invalid state: Hive on Spark RPC Server has not been initialized"); return new SparkClientImpl(server, sparkConf, hiveConf, sessionId); } - } http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java 
b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index d450515..f8b5d19 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -43,6 +43,7 @@ import java.io.Writer; import java.net.URI; import java.net.URL; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -103,17 +104,16 @@ class SparkClientImpl implements SparkClient { // The RPC server will take care of timeouts here. this.driverRpc = rpcServer.registerClient(sessionid, secret, protocol).get(); } catch (Throwable e) { - String errorMsg = null; + String errorMsg; if (e.getCause() instanceof TimeoutException) { - errorMsg = "Timed out waiting for client to connect.\nPossible reasons include network " + - "issues, errors in remote driver or the cluster has no available resources, etc." + + errorMsg = "Timed out waiting for Remote Spark Driver to connect to HiveServer2.\nPossible reasons " + + "include network issues, errors in remote driver, cluster has no available resources, etc." + "\nPlease check YARN or Spark driver's logs for further information."; } else if (e.getCause() instanceof InterruptedException) { - errorMsg = "Interruption occurred while waiting for client to connect.\nPossibly the Spark session is closed " + - "such as in case of query cancellation." 
+ - "\nPlease refer to HiveServer2 logs for further information."; + errorMsg = "Interrupted while waiting for Remote Spark Driver to connect to HiveServer2.\nIt is possible " + + "that the query was cancelled which would cause the Spark Session to close."; } else { - errorMsg = "Error while waiting for client to connect."; + errorMsg = "Error while waiting for Remote Spark Driver to connect back to HiveServer2."; } LOG.error(errorMsg, e); driverThread.interrupt(); @@ -126,14 +126,21 @@ class SparkClientImpl implements SparkClient { throw Throwables.propagate(e); } + LOG.info("Successfully connected to Remote Spark Driver at: " + this.driverRpc.getRemoteAddress()); + driverRpc.addListener(new Rpc.Listener() { @Override public void rpcClosed(Rpc rpc) { if (isAlive) { - LOG.warn("Client RPC channel closed unexpectedly."); + LOG.warn("Connection to Remote Spark Driver {} closed unexpectedly", driverRpc.getRemoteAddress()); isAlive = false; } } + + @Override + public String toString() { + return "Connection to Remote Spark Driver Closed Unexpectedly"; + } }); isAlive = true; } @@ -256,7 +263,7 @@ class SparkClientImpl implements SparkClient { try { URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf"); if (sparkDefaultsUrl != null) { - LOG.info("Loading spark defaults: " + sparkDefaultsUrl); + LOG.info("Loading spark defaults configs from: " + sparkDefaultsUrl); allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl))); } } catch (Exception e) { @@ -574,7 +581,7 @@ class SparkClientImpl implements SparkClient { } private void handle(ChannelHandlerContext ctx, Error msg) { - LOG.warn("Error reported from remote driver: {}", msg.cause); + LOG.warn("Error reported from Remote Spark Driver: {}", msg.cause); } private void handle(ChannelHandlerContext ctx, JobMetrics msg) { @@ -582,14 +589,14 @@ class SparkClientImpl implements SparkClient { if (handle != null) { 
handle.getMetrics().addMetrics(msg.sparkJobId, msg.stageId, msg.taskId, msg.metrics); } else { - LOG.warn("Received metrics for unknown job {}", msg.jobId); + LOG.warn("Received metrics for unknown Spark job {}", msg.sparkJobId); } } private void handle(ChannelHandlerContext ctx, JobResult msg) { JobHandleImpl<?> handle = jobs.remove(msg.id); if (handle != null) { - LOG.info("Received result for {}", msg.id); + LOG.debug("Received result for client job {}", msg.id); handle.setSparkCounters(msg.sparkCounters); Throwable error = msg.error; if (error == null) { @@ -598,7 +605,7 @@ class SparkClientImpl implements SparkClient { handle.setFailure(error); } } else { - LOG.warn("Received result for unknown job {}", msg.id); + LOG.warn("Received result for unknown client job {}", msg.id); } } @@ -607,19 +614,24 @@ class SparkClientImpl implements SparkClient { if (handle != null) { handle.changeState(JobHandle.State.STARTED); } else { - LOG.warn("Received event for unknown job {}", msg.id); + LOG.warn("Received event for unknown client job {}", msg.id); } } private void handle(ChannelHandlerContext ctx, JobSubmitted msg) { JobHandleImpl<?> handle = jobs.get(msg.clientJobId); if (handle != null) { - LOG.info("Received spark job ID: {} for {}", msg.sparkJobId, msg.clientJobId); + LOG.info("Received Spark job ID: {} for client job {}", msg.sparkJobId, msg.clientJobId); handle.addSparkJobId(msg.sparkJobId); } else { - LOG.warn("Received spark job ID: {} for unknown job {}", msg.sparkJobId, msg.clientJobId); + LOG.warn("Received Spark job ID: {} for unknown client job {}", msg.sparkJobId, msg.clientJobId); } } + + @Override + protected String name() { + return "HiveServer2 to Remote Spark Driver Connection"; + } } private static class AddJarJob implements Job<Serializable> { http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java 
---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java index f137007..6a13071 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java @@ -44,4 +44,10 @@ public class InputMetrics implements Serializable { this(metrics.inputMetrics().bytesRead()); } + @Override + public String toString() { + return "InputMetrics{" + + "bytesRead=" + bytesRead + + '}'; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java index b718b3b..cf7a1f6 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java @@ -127,4 +127,22 @@ public class Metrics implements Serializable { return (metrics.shuffleWriteMetrics() != null) ? 
new ShuffleWriteMetrics(metrics) : null; } + @Override + public String toString() { + return "Metrics{" + + "executorDeserializeTime=" + executorDeserializeTime + + ", executorDeserializeCpuTime=" + executorDeserializeCpuTime + + ", executorRunTime=" + executorRunTime + + ", executorCpuTime=" + executorCpuTime + + ", resultSize=" + resultSize + + ", jvmGCTime=" + jvmGCTime + + ", resultSerializationTime=" + resultSerializationTime + + ", memoryBytesSpilled=" + memoryBytesSpilled + + ", diskBytesSpilled=" + diskBytesSpilled + + ", taskDurationTime=" + taskDurationTime + + ", inputMetrics=" + inputMetrics + + ", shuffleReadMetrics=" + shuffleReadMetrics + + ", shuffleWriteMetrics=" + shuffleWriteMetrics + + '}'; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java index 9ff4d0f..e3d564f 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java @@ -73,4 +73,13 @@ public class ShuffleReadMetrics implements Serializable { return remoteBlocksFetched + localBlocksFetched; } + @Override + public String toString() { + return "ShuffleReadMetrics{" + + "remoteBlocksFetched=" + remoteBlocksFetched + + ", localBlocksFetched=" + localBlocksFetched + + ", fetchWaitTime=" + fetchWaitTime + + ", remoteBytesRead=" + remoteBytesRead + + '}'; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java ---------------------------------------------------------------------- diff --git 
a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java index 64a4b86..e9cf6a1 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java @@ -51,4 +51,11 @@ public class ShuffleWriteMetrics implements Serializable { metrics.shuffleWriteMetrics().shuffleWriteTime()); } + @Override + public String toString() { + return "ShuffleWriteMetrics{" + + "shuffleBytesWritten=" + shuffleBytesWritten + + ", shuffleWriteTime=" + shuffleWriteTime + + '}'; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java index 5454ec2..d3a6812 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java @@ -101,7 +101,7 @@ class KryoMessageCodec extends ByteToMessageCodec<Object> { Input kryoIn = new Input(new ByteBufferInputStream(nioBuffer)); Object msg = kryos.get().readClassAndObject(kryoIn); - LOG.debug("Decoded message of type {} ({} bytes)", + LOG.trace("Decoded message of type {} ({} bytes)", msg != null ? 
msg.getClass().getName() : msg, msgSize); out.add(msg); } finally { @@ -118,7 +118,7 @@ class KryoMessageCodec extends ByteToMessageCodec<Object> { kryoOut.flush(); byte[] msgData = maybeEncrypt(bytes.toByteArray()); - LOG.debug("Encoded message of type {} ({} bytes)", msg.getClass().getName(), msgData.length); + LOG.trace("Encoded message of type {} ({} bytes)", msg.getClass().getName(), msgData.length); checkSize(msgData.length); buf.ensureWritable(msgData.length + 4); http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java index cbbfb1c..298a210 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java @@ -19,6 +19,7 @@ package org.apache.hive.spark.client.rpc; import java.io.Closeable; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -116,7 +117,7 @@ public class Rpc implements Closeable { final Runnable timeoutTask = new Runnable() { @Override public void run() { - promise.setFailure(new TimeoutException("Timed out waiting for RPC server connection.")); + promise.setFailure(new TimeoutException("Timed out waiting to connect to HiveServer2.")); } }; final ScheduledFuture<?> timeoutFuture = eloop.schedule(timeoutTask, @@ -272,7 +273,8 @@ public class Rpc implements Closeable { */ public <T> Future<T> call(final Object msg, Class<T> retType) { Preconditions.checkArgument(msg != null); - Preconditions.checkState(channel.isActive(), "RPC channel is closed."); + Preconditions.checkState(channel.isActive(), "Unable to send message %s because the " + + "Remote Spark Driver -
HiveServer2 connection has been closed.", msg); try { final long id = rpcId.getAndIncrement(); final Promise<T> promise = createPromise(); @@ -280,7 +282,8 @@ public class Rpc implements Closeable { @Override public void operationComplete(ChannelFuture cf) { if (!cf.isSuccess() && !promise.isDone()) { - LOG.warn("Failed to send RPC, closing connection.", cf.cause()); + LOG.warn("Failed to send message '{}', closing Remote Spark Driver - " + + "HiveServer2 connection.", msg, cf.cause()); promise.setFailure(cf.cause()); dispatcher.discardRpc(id); close(); @@ -314,6 +317,14 @@ public class Rpc implements Closeable { return channel; } + /** + * Returns the "hostname:port" that the RPC is connected to + */ + public String getRemoteAddress() { + InetSocketAddress remoteAddress = ((InetSocketAddress) this.channel.remoteAddress()); + return remoteAddress.getHostName() + ":" + remoteAddress.getPort(); + } + void setDispatcher(RpcDispatcher dispatcher) { Preconditions.checkNotNull(dispatcher); Preconditions.checkState(this.dispatcher == null); @@ -336,7 +347,7 @@ public class Rpc implements Closeable { try { l.rpcClosed(this); } catch (Exception e) { - LOG.warn("Error caught in Rpc.Listener invocation.", e); + LOG.warn("Error caught while running '{}' listener", l, e); } } } @@ -493,12 +504,10 @@ public class Rpc implements Closeable { client.evaluateChallenge(new byte[0]) : new byte[0]; c.writeAndFlush(new SaslMessage(clientId, hello)).addListener(future -> { if (!future.isSuccess()) { - LOG.error("Failed to send hello to server", future.cause()); + LOG.error("Failed to send test message to HiveServer2", future.cause()); onError(future.cause()); } }); } - } - } http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java ---------------------------------------------------------------------- diff --git
b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java index a535b8d..090c628 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java @@ -115,10 +115,9 @@ public final class RpcConfiguration { * Parses the port string like 49152-49222,49228 into the port list. A default 0 * is added for the empty port string. * @return a list of configured ports. - * @exception IOException is thrown if the property is not configured properly */ - List<Integer> getServerPorts() throws IOException { - String errMsg = "Incorrect RPC server port configuration for HiveServer2"; + List<Integer> getServerPorts() { + String errMsg = "Malformed configuration value for " + HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname; String portString = config.get(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname); ArrayList<Integer> ports = new ArrayList<Integer>(); try { @@ -127,7 +126,7 @@ public final class RpcConfiguration { String[] range = portRange.split("-"); if (range.length == 0 || range.length > 2 || (range.length == 2 && Integer.valueOf(range[0]) > Integer.valueOf(range[1]))) { - throw new IOException(errMsg); + throw new IllegalArgumentException(errMsg); } if (range.length == 1) { ports.add(Integer.valueOf(range[0])); @@ -143,7 +142,7 @@ public final class RpcConfiguration { return ports; } catch(NumberFormatException e) { - throw new IOException(errMsg); + throw new IllegalArgumentException(errMsg, e); } } http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java index 00f5a17..b588547 100644 --- 
a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java @@ -66,13 +66,12 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object> protected final void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (lastHeader == null) { if (!(msg instanceof Rpc.MessageHeader)) { - LOG.warn("[{}] Expected RPC header, got {} instead.", name(), - msg != null ? msg.getClass().getName() : null); - throw new IllegalArgumentException(); + throw new IllegalArgumentException(String.format("[%s] Expected RPC header, got %s instead.", name(), + msg != null ? msg.getClass().getName() : null)); } lastHeader = (Rpc.MessageHeader) msg; } else { - LOG.debug("[{}] Received RPC message: type={} id={} payload={}", name(), + LOG.trace("[{}] Received RPC message: type={} id={} payload={}", name(), lastHeader.type, lastHeader.id, msg != null ? msg.getClass().getName() : null); try { switch (lastHeader.type) { @@ -86,7 +85,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object> handleError(ctx, msg, findRpc(lastHeader.id)); break; default: - throw new IllegalArgumentException("Unknown RPC message type: " + lastHeader.type); + throw new IllegalArgumentException("[" + name() + "] Unknown RPC message type: " + lastHeader.type); } } finally { lastHeader = null; @@ -103,7 +102,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object> } } throw new IllegalArgumentException(String.format( - "Received RPC reply for unknown RPC (%d).", id)); + "[%s] Received RPC reply for unknown RPC (%d).", name(), id)); } private void handleCall(ChannelHandlerContext ctx, Object msg) throws Exception { @@ -124,7 +123,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object> } replyType = Rpc.MessageType.REPLY; } catch (InvocationTargetException ite) { - LOG.debug(String.format("[%s] 
Error in RPC handler.", name()), ite.getCause()); + LOG.error(String.format("[%s] Error in RPC handler.", name()), ite.getCause()); replyPayload = Throwables.getStackTraceAsString(ite.getCause()); replyType = Rpc.MessageType.ERROR; } @@ -140,12 +139,12 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object> private void handleError(ChannelHandlerContext ctx, Object msg, OutstandingRpc rpc) throws Exception { if (msg instanceof String) { - LOG.warn("Received error message:{}.", msg); + LOG.error("[{}] Received error message: {}.", name(), msg); rpc.future.setFailure(new RpcException((String) msg)); } else { String error = String.format("Received error with unexpected payload (%s).", msg != null ? msg.getClass().getName() : null); - LOG.warn(String.format("[%s] %s", name(), error)); + LOG.error(String.format("[%s] %s", name(), error)); rpc.future.setFailure(new IllegalArgumentException(error)); ctx.close(); } @@ -178,7 +177,7 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object> } void registerRpc(long id, Promise promise, String type) { - LOG.debug("[{}] Registered outstanding rpc {} ({}).", name(), id, type); + LOG.trace("[{}] Registered outstanding rpc {} ({}).", name(), id, type); rpcs.add(new OutstandingRpc(id, promise)); } @@ -196,5 +195,4 @@ public abstract class RpcDispatcher extends SimpleChannelInboundHandler<Object> this.future = future; } } - } http://git-wip-us.apache.org/repos/asf/hive/blob/52f1b247/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java index 6c6ab74..f1383d6 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java @@ 
-21,6 +21,7 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.security.SecureRandom; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Random; @@ -84,7 +85,7 @@ public class RpcServer implements Closeable { this.group = new NioEventLoopGroup( this.config.getRpcThreadCount(), new ThreadFactoryBuilder() - .setNameFormat("RPC-Handler-%d") + .setNameFormat("Spark-Driver-RPC-Handler-%d") .setDaemon(true) .build()); ServerBootstrap serverBootstrap = new ServerBootstrap() @@ -100,7 +101,7 @@ public class RpcServer implements Closeable { Runnable cancelTask = new Runnable() { @Override public void run() { - LOG.warn("Timed out waiting for hello from client."); + LOG.warn("Timed out waiting for test message from Remote Spark driver."); newRpc.close(); } }; @@ -117,6 +118,8 @@ public class RpcServer implements Closeable { this.port = ((InetSocketAddress) channel.localAddress()).getPort(); this.pendingClients = Maps.newConcurrentMap(); this.address = this.config.getServerAddress(); + + LOG.info("Successfully created Remote Spark Driver RPC Server with address {}:{}", this.address, this.port); } /** @@ -143,7 +146,8 @@ public class RpcServer implements Closeable { // Retry the next port } } - throw new IOException("No available ports from configured RPC Server ports for HiveServer2"); + throw new IOException("Remote Spark Driver RPC Server cannot bind to any of the configured ports: " + + Arrays.toString(config.getServerPorts().toArray())); } } @@ -169,7 +173,9 @@ public class RpcServer implements Closeable { Runnable timeout = new Runnable() { @Override public void run() { - promise.setFailure(new TimeoutException("Timed out waiting for client connection.")); + promise.setFailure(new TimeoutException( + String.format("Client '%s' timed out waiting for connection from the Remote Spark" + + " Driver", clientId))); } }; ScheduledFuture<?> timeoutFuture = group.schedule(timeout, @@ -179,7 
+185,7 @@ public class RpcServer implements Closeable { timeoutFuture); if (pendingClients.putIfAbsent(clientId, client) != null) { throw new IllegalStateException( - String.format("Client '%s' already registered.", clientId)); + String.format("Remote Spark Driver with client ID '%s' already registered", clientId)); } promise.addListener(new GenericFutureListener<Promise<Rpc>>() { @@ -208,7 +214,7 @@ public class RpcServer implements Closeable { cinfo.timeoutFuture.cancel(true); if (!cinfo.promise.isDone()) { cinfo.promise.setFailure(new RuntimeException( - String.format("Cancel client '%s'. Error: " + msg, clientId))); + String.format("Cancelling Remote Spark Driver client connection '%s' with error: %s", clientId, msg))); } }