This is an automated email from the ASF dual-hosted git repository. asnaik pushed a commit to branch AMBARI-25362-2.6 in repository https://gitbox.apache.org/repos/asf/ambari.git
commit 72fca489b4c8bf6c206988d887ff268c7c007023 Author: Akhil Subhash Naik <asn...@hortonworks.com> AuthorDate: Tue Aug 27 10:59:59 2019 +0530 [AMBARI-25362] Hive View throws TimeoutException deadline passed for few queries randomly.( Sreenath Somarajapuram) --- .gitignore | 1 + .../org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java | 9 +++++---- .../org/apache/ambari/view/hive2/client/NonPersistentCursor.java | 5 ++--- .../org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java | 9 +++++---- .../apache/ambari/view/hive20/client/NonPersistentCursor.java | 5 ++--- 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index 77aadb7..dbdbcc9 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,4 @@ createDDL.jdbc /contrib/views/storm/src/main/resources/ui/node_modules/ /contrib/views/storm/src/main/resources/ui/public/ /contrib/views/storm/src/main/resources/ui/npm-debug.log +contrib/views/capacity-scheduler/src/main/resources/ui/wwwroot diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java index 82e3df5..e19415e 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java @@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit; public class AsyncJobRunnerImpl implements AsyncJobRunner { private final Logger LOG = LoggerFactory.getLogger(getClass()); + private static scala.concurrent.duration.FiniteDuration WAIT_TIME = Duration.create(12, TimeUnit.HOURS); private final ActorRef controller; private final ActorSystem system; @@ -76,7 +77,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { public Optional<NonPersistentCursor> getCursor(String jobId, String username) { Inbox inbox = Inbox.create(system); inbox.send(controller, new FetchResult(jobId, username)); - Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Object receive = inbox.receive(WAIT_TIME); if(receive instanceof ResultNotReady) { String errorString = "Result not ready for job: " + jobId + ", username: " + username + ". Try after sometime."; LOG.info(errorString); @@ -98,7 +99,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { public Optional<NonPersistentCursor> resetAndGetCursor(String jobId, String username) { Inbox inbox = Inbox.create(system); inbox.send(controller, new FetchResult(jobId, username)); - Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Object receive = inbox.receive(WAIT_TIME); if(receive instanceof ResultNotReady) { String errorString = "Result not ready for job: " + jobId + ", username: " + username + ". Try after sometime."; LOG.info(errorString); @@ -110,7 +111,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { Optional<ActorRef> iterator = (Optional<ActorRef>) receive; if(iterator.isPresent()) { inbox.send(iterator.get(), new ResetCursor()); - Object resetResult = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Object resetResult = inbox.receive(WAIT_TIME); if (resetResult instanceof CursorReset) { return Optional.of(new NonPersistentCursor(context, system, iterator.get())); } else { @@ -126,7 +127,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { public Optional<Failure> getError(String jobId, String username) { Inbox inbox = Inbox.create(system); inbox.send(controller, new FetchError(jobId, username)); - Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Object receive = inbox.receive(WAIT_TIME); if(receive instanceof FetchFailed){ FetchFailed fetchFailed = (FetchFailed) receive; return Optional.of(new Failure(fetchFailed.getMessage(), getExceptionForRetry())); diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/NonPersistentCursor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/NonPersistentCursor.java index 13cab33..1e43c07 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/NonPersistentCursor.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/NonPersistentCursor.java @@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit; */ public class NonPersistentCursor implements Cursor<Row, ColumnDescription> { private final Logger LOG = LoggerFactory.getLogger(getClass()); - private static long DEFAULT_WAIT_TIMEOUT = 60 * 1000L; + private static scala.concurrent.duration.FiniteDuration WAIT_TIME = Duration.create(12, TimeUnit.HOURS); private final ActorSystem system; private final ActorRef actorRef; @@ -124,8 +124,7 @@ public class NonPersistentCursor implements Cursor<Row, ColumnDescription> { inbox.send(actorRef, new Next()); Object receive; try { - receive = inbox.receive(Duration.create(actorConfiguration.getResultFetchTimeout(DEFAULT_WAIT_TIMEOUT), - TimeUnit.MILLISECONDS)); + receive = inbox.receive(WAIT_TIME); } catch (Throwable ex) { String errorMessage = "Result fetch timed out"; LOG.error(errorMessage, ex); diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java index 7013f8a..7804103 100644 --- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java @@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit; public class AsyncJobRunnerImpl implements AsyncJobRunner { private final Logger LOG = LoggerFactory.getLogger(getClass()); + private static scala.concurrent.duration.FiniteDuration WAIT_TIME = Duration.create(12, TimeUnit.HOURS); private final ActorRef controller; private final ActorSystem system; @@ -76,7 +77,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { public Optional<NonPersistentCursor> getCursor(String jobId, String username) { Inbox inbox = Inbox.create(system); inbox.send(controller, new FetchResult(jobId, username)); - Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Object receive = inbox.receive(WAIT_TIME); if(receive instanceof ResultNotReady) { String errorString = "Result not ready for job: " + jobId + ", username: " + username + ". Try after sometime."; LOG.info(errorString); @@ -98,7 +99,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { public Optional<NonPersistentCursor> resetAndGetCursor(String jobId, String username) { Inbox inbox = Inbox.create(system); inbox.send(controller, new FetchResult(jobId, username)); - Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Object receive = inbox.receive(WAIT_TIME); if(receive instanceof ResultNotReady) { String errorString = "Result not ready for job: " + jobId + ", username: " + username + ". Try after sometime."; LOG.info(errorString); @@ -110,7 +111,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { Optional<ActorRef> iterator = (Optional<ActorRef>) receive; if(iterator.isPresent()) { inbox.send(iterator.get(), new ResetCursor()); - Object resetResult = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Object resetResult = inbox.receive(WAIT_TIME); if (resetResult instanceof CursorReset) { return Optional.of(new NonPersistentCursor(context, system, iterator.get())); } else { @@ -126,7 +127,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { public Optional<Failure> getError(String jobId, String username) { Inbox inbox = Inbox.create(system); inbox.send(controller, new FetchError(jobId, username)); - Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Object receive = inbox.receive(WAIT_TIME); if(receive instanceof FetchFailed){ FetchFailed fetchFailed = (FetchFailed) receive; return Optional.of(new Failure(fetchFailed.getMessage(), getExceptionForRetry())); diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/NonPersistentCursor.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/NonPersistentCursor.java index 80ffe79..c316579 100644 --- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/NonPersistentCursor.java +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/NonPersistentCursor.java @@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit; */ public class NonPersistentCursor implements Cursor<Row, ColumnDescription> { private final Logger LOG = LoggerFactory.getLogger(getClass()); - private static long DEFAULT_WAIT_TIMEOUT = 60 * 1000L; + private static scala.concurrent.duration.FiniteDuration WAIT_TIME = Duration.create(12, TimeUnit.HOURS); private final ActorSystem system; private final ActorRef actorRef; @@ -124,8 +124,7 @@ public class NonPersistentCursor implements Cursor<Row, ColumnDescription> { inbox.send(actorRef, new Next()); Object receive; try { - receive = inbox.receive(Duration.create(actorConfiguration.getResultFetchTimeout(DEFAULT_WAIT_TIMEOUT), - TimeUnit.MILLISECONDS)); + receive = inbox.receive(WAIT_TIME); } catch (Throwable ex) { String errorMessage = "Result fetch timed out"; LOG.error(errorMessage, ex);