This is an automated email from the ASF dual-hosted git repository. asnaik pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.6 by this push: new c763113 [AMBARI-25362] Hive View throws TimeoutException deadline passed for few queries randomly.( Sreenath Somarajapuram) (#3072) c763113 is described below commit c763113e134d238f3f159dabd8de359cb40bff13 Author: Asnaik HWX <asn...@hortonworks.com> AuthorDate: Tue Oct 1 12:12:22 2019 +0530 [AMBARI-25362] Hive View throws TimeoutException deadline passed for few queries randomly.( Sreenath Somarajapuram) (#3072) --- .../org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java | 9 +++++---- .../org/apache/ambari/view/hive2/client/NonPersistentCursor.java | 5 ++--- .../org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java | 9 +++++---- .../apache/ambari/view/hive20/client/NonPersistentCursor.java | 5 ++--- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java index 82e3df5..e19415e 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java @@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit; public class AsyncJobRunnerImpl implements AsyncJobRunner { private final Logger LOG = LoggerFactory.getLogger(getClass()); + private static scala.concurrent.duration.FiniteDuration WAIT_TIME = Duration.create(12, TimeUnit.HOURS); private final ActorRef controller; private final ActorSystem system; @@ -76,7 +77,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { public Optional<NonPersistentCursor> getCursor(String jobId, String username) { Inbox inbox = Inbox.create(system); inbox.send(controller, new FetchResult(jobId, username)); - Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Object receive = inbox.receive(WAIT_TIME); if(receive instanceof ResultNotReady) { String errorString = "Result not ready for job: " + jobId + ", username: " + username + ". Try after sometime."; LOG.info(errorString); @@ -98,7 +99,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { public Optional<NonPersistentCursor> resetAndGetCursor(String jobId, String username) { Inbox inbox = Inbox.create(system); inbox.send(controller, new FetchResult(jobId, username)); - Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Object receive = inbox.receive(WAIT_TIME); if(receive instanceof ResultNotReady) { String errorString = "Result not ready for job: " + jobId + ", username: " + username + ". Try after sometime."; LOG.info(errorString); @@ -110,7 +111,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { Optional<ActorRef> iterator = (Optional<ActorRef>) receive; if(iterator.isPresent()) { inbox.send(iterator.get(), new ResetCursor()); - Object resetResult = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Object resetResult = inbox.receive(WAIT_TIME); if (resetResult instanceof CursorReset) { return Optional.of(new NonPersistentCursor(context, system, iterator.get())); } else { @@ -126,7 +127,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { public Optional<Failure> getError(String jobId, String username) { Inbox inbox = Inbox.create(system); inbox.send(controller, new FetchError(jobId, username)); - Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Object receive = inbox.receive(WAIT_TIME); if(receive instanceof FetchFailed){ FetchFailed fetchFailed = (FetchFailed) receive; return Optional.of(new Failure(fetchFailed.getMessage(), getExceptionForRetry())); diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/NonPersistentCursor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/NonPersistentCursor.java index 13cab33..1e43c07 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/NonPersistentCursor.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/NonPersistentCursor.java @@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit; */ public class NonPersistentCursor implements Cursor<Row, ColumnDescription> { private final Logger LOG = LoggerFactory.getLogger(getClass()); - private static long DEFAULT_WAIT_TIMEOUT = 60 * 1000L; + private static scala.concurrent.duration.FiniteDuration WAIT_TIME = Duration.create(12, TimeUnit.HOURS); private final ActorSystem system; private final ActorRef actorRef; @@ -124,8 +124,7 @@ public class NonPersistentCursor implements Cursor<Row, ColumnDescription> { inbox.send(actorRef, new Next()); Object receive; try { - receive = inbox.receive(Duration.create(actorConfiguration.getResultFetchTimeout(DEFAULT_WAIT_TIMEOUT), - TimeUnit.MILLISECONDS)); + receive = inbox.receive(WAIT_TIME); } catch (Throwable ex) { String errorMessage = "Result fetch timed out"; LOG.error(errorMessage, ex); diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java index 7013f8a..7804103 100644 --- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java @@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit; public class AsyncJobRunnerImpl implements AsyncJobRunner { private final Logger LOG = LoggerFactory.getLogger(getClass()); + private static scala.concurrent.duration.FiniteDuration WAIT_TIME = Duration.create(12, TimeUnit.HOURS); private final ActorRef controller; private final ActorSystem system; @@ -76,7 +77,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { public Optional<NonPersistentCursor> getCursor(String jobId, String username) { Inbox inbox = Inbox.create(system); inbox.send(controller, new FetchResult(jobId, username)); - Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Object receive = inbox.receive(WAIT_TIME); if(receive instanceof ResultNotReady) { String errorString = "Result not ready for job: " + jobId + ", username: " + username + ". Try after sometime."; LOG.info(errorString); @@ -98,7 +99,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { public Optional<NonPersistentCursor> resetAndGetCursor(String jobId, String username) { Inbox inbox = Inbox.create(system); inbox.send(controller, new FetchResult(jobId, username)); - Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Object receive = inbox.receive(WAIT_TIME); if(receive instanceof ResultNotReady) { String errorString = "Result not ready for job: " + jobId + ", username: " + username + ". Try after sometime."; LOG.info(errorString); @@ -110,7 +111,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { Optional<ActorRef> iterator = (Optional<ActorRef>) receive; if(iterator.isPresent()) { inbox.send(iterator.get(), new ResetCursor()); - Object resetResult = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Object resetResult = inbox.receive(WAIT_TIME); if (resetResult instanceof CursorReset) { return Optional.of(new NonPersistentCursor(context, system, iterator.get())); } else { @@ -126,7 +127,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { public Optional<Failure> getError(String jobId, String username) { Inbox inbox = Inbox.create(system); inbox.send(controller, new FetchError(jobId, username)); - Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Object receive = inbox.receive(WAIT_TIME); if(receive instanceof FetchFailed){ FetchFailed fetchFailed = (FetchFailed) receive; return Optional.of(new Failure(fetchFailed.getMessage(), getExceptionForRetry())); diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/NonPersistentCursor.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/NonPersistentCursor.java index 80ffe79..c316579 100644 --- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/NonPersistentCursor.java +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/NonPersistentCursor.java @@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit; */ public class NonPersistentCursor implements Cursor<Row, ColumnDescription> { private final Logger LOG = LoggerFactory.getLogger(getClass()); - private static long DEFAULT_WAIT_TIMEOUT = 60 * 1000L; + private static scala.concurrent.duration.FiniteDuration WAIT_TIME = Duration.create(12, TimeUnit.HOURS); private final ActorSystem system; private final ActorRef actorRef; @@ -124,8 +124,7 @@ public class NonPersistentCursor implements Cursor<Row, ColumnDescription> { inbox.send(actorRef, new Next()); Object receive; try { - receive = inbox.receive(Duration.create(actorConfiguration.getResultFetchTimeout(DEFAULT_WAIT_TIMEOUT), - TimeUnit.MILLISECONDS)); + receive = inbox.receive(WAIT_TIME); } catch (Throwable ex) { String errorMessage = "Result fetch timed out"; LOG.error(errorMessage, ex);