This is an automated email from the ASF dual-hosted git repository.

asnaik pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new c763113  [AMBARI-25362] Hive View throws TimeoutException deadline 
passed for few queries randomly.( Sreenath Somarajapuram) (#3072)
c763113 is described below

commit c763113e134d238f3f159dabd8de359cb40bff13
Author: Asnaik HWX <asn...@hortonworks.com>
AuthorDate: Tue Oct 1 12:12:22 2019 +0530

    [AMBARI-25362] Hive View throws TimeoutException deadline passed for few 
queries randomly.( Sreenath Somarajapuram) (#3072)
---
 .../org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java  | 9 +++++----
 .../org/apache/ambari/view/hive2/client/NonPersistentCursor.java | 5 ++---
 .../org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java | 9 +++++----
 .../apache/ambari/view/hive20/client/NonPersistentCursor.java    | 5 ++---
 4 files changed, 14 insertions(+), 14 deletions(-)

diff --git 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java
 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java
index 82e3df5..e19415e 100644
--- 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java
+++ 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java
@@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
 public class AsyncJobRunnerImpl implements AsyncJobRunner {
 
   private final Logger LOG = LoggerFactory.getLogger(getClass());
+  private static scala.concurrent.duration.FiniteDuration WAIT_TIME = 
Duration.create(12, TimeUnit.HOURS);
 
   private final ActorRef controller;
   private final ActorSystem system;
@@ -76,7 +77,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner {
   public Optional<NonPersistentCursor> getCursor(String jobId, String 
username) {
     Inbox inbox = Inbox.create(system);
     inbox.send(controller, new FetchResult(jobId, username));
-    Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES));
+    Object receive = inbox.receive(WAIT_TIME);
     if(receive instanceof ResultNotReady) {
       String errorString = "Result not ready for job: " + jobId + ", username: 
" + username + ". Try after sometime.";
       LOG.info(errorString);
@@ -98,7 +99,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner {
   public Optional<NonPersistentCursor> resetAndGetCursor(String jobId, String 
username) {
     Inbox inbox = Inbox.create(system);
     inbox.send(controller, new FetchResult(jobId, username));
-    Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES));
+    Object receive = inbox.receive(WAIT_TIME);
     if(receive instanceof ResultNotReady) {
       String errorString = "Result not ready for job: " + jobId + ", username: 
" + username + ". Try after sometime.";
       LOG.info(errorString);
@@ -110,7 +111,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner {
       Optional<ActorRef> iterator = (Optional<ActorRef>) receive;
       if(iterator.isPresent()) {
         inbox.send(iterator.get(), new ResetCursor());
-        Object resetResult = inbox.receive(Duration.create(1, 
TimeUnit.MINUTES));
+        Object resetResult = inbox.receive(WAIT_TIME);
         if (resetResult instanceof CursorReset) {
           return Optional.of(new NonPersistentCursor(context, system, 
iterator.get()));
         } else {
@@ -126,7 +127,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner {
   public Optional<Failure> getError(String jobId, String username) {
     Inbox inbox = Inbox.create(system);
     inbox.send(controller, new FetchError(jobId, username));
-    Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES));
+    Object receive = inbox.receive(WAIT_TIME);
     if(receive instanceof FetchFailed){
       FetchFailed fetchFailed = (FetchFailed) receive;
       return Optional.of(new Failure(fetchFailed.getMessage(), 
getExceptionForRetry()));
diff --git 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/NonPersistentCursor.java
 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/NonPersistentCursor.java
index 13cab33..1e43c07 100644
--- 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/NonPersistentCursor.java
+++ 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/NonPersistentCursor.java
@@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class NonPersistentCursor implements Cursor<Row, ColumnDescription> {
   private final Logger LOG = LoggerFactory.getLogger(getClass());
-  private static long DEFAULT_WAIT_TIMEOUT = 60 * 1000L;
+  private static scala.concurrent.duration.FiniteDuration WAIT_TIME = 
Duration.create(12, TimeUnit.HOURS);
 
   private final ActorSystem system;
   private final ActorRef actorRef;
@@ -124,8 +124,7 @@ public class NonPersistentCursor implements Cursor<Row, 
ColumnDescription> {
     inbox.send(actorRef, new Next());
     Object receive;
     try {
-      receive = 
inbox.receive(Duration.create(actorConfiguration.getResultFetchTimeout(DEFAULT_WAIT_TIMEOUT),
-        TimeUnit.MILLISECONDS));
+      receive = inbox.receive(WAIT_TIME);
     } catch (Throwable ex) {
       String errorMessage = "Result fetch timed out";
       LOG.error(errorMessage, ex);
diff --git 
a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java
 
b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java
index 7013f8a..7804103 100644
--- 
a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java
+++ 
b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java
@@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
 public class AsyncJobRunnerImpl implements AsyncJobRunner {
 
   private final Logger LOG = LoggerFactory.getLogger(getClass());
+  private static scala.concurrent.duration.FiniteDuration WAIT_TIME = 
Duration.create(12, TimeUnit.HOURS);
 
   private final ActorRef controller;
   private final ActorSystem system;
@@ -76,7 +77,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner {
   public Optional<NonPersistentCursor> getCursor(String jobId, String 
username) {
     Inbox inbox = Inbox.create(system);
     inbox.send(controller, new FetchResult(jobId, username));
-    Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES));
+    Object receive = inbox.receive(WAIT_TIME);
     if(receive instanceof ResultNotReady) {
       String errorString = "Result not ready for job: " + jobId + ", username: 
" + username + ". Try after sometime.";
       LOG.info(errorString);
@@ -98,7 +99,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner {
   public Optional<NonPersistentCursor> resetAndGetCursor(String jobId, String 
username) {
     Inbox inbox = Inbox.create(system);
     inbox.send(controller, new FetchResult(jobId, username));
-    Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES));
+    Object receive = inbox.receive(WAIT_TIME);
     if(receive instanceof ResultNotReady) {
       String errorString = "Result not ready for job: " + jobId + ", username: 
" + username + ". Try after sometime.";
       LOG.info(errorString);
@@ -110,7 +111,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner {
       Optional<ActorRef> iterator = (Optional<ActorRef>) receive;
       if(iterator.isPresent()) {
         inbox.send(iterator.get(), new ResetCursor());
-        Object resetResult = inbox.receive(Duration.create(1, 
TimeUnit.MINUTES));
+        Object resetResult = inbox.receive(WAIT_TIME);
         if (resetResult instanceof CursorReset) {
           return Optional.of(new NonPersistentCursor(context, system, 
iterator.get()));
         } else {
@@ -126,7 +127,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner {
   public Optional<Failure> getError(String jobId, String username) {
     Inbox inbox = Inbox.create(system);
     inbox.send(controller, new FetchError(jobId, username));
-    Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES));
+    Object receive = inbox.receive(WAIT_TIME);
     if(receive instanceof FetchFailed){
       FetchFailed fetchFailed = (FetchFailed) receive;
       return Optional.of(new Failure(fetchFailed.getMessage(), 
getExceptionForRetry()));
diff --git 
a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/NonPersistentCursor.java
 
b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/NonPersistentCursor.java
index 80ffe79..c316579 100644
--- 
a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/NonPersistentCursor.java
+++ 
b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/NonPersistentCursor.java
@@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class NonPersistentCursor implements Cursor<Row, ColumnDescription> {
   private final Logger LOG = LoggerFactory.getLogger(getClass());
-  private static long DEFAULT_WAIT_TIMEOUT = 60 * 1000L;
+  private static scala.concurrent.duration.FiniteDuration WAIT_TIME = 
Duration.create(12, TimeUnit.HOURS);
 
   private final ActorSystem system;
   private final ActorRef actorRef;
@@ -124,8 +124,7 @@ public class NonPersistentCursor implements Cursor<Row, 
ColumnDescription> {
     inbox.send(actorRef, new Next());
     Object receive;
     try {
-      receive = 
inbox.receive(Duration.create(actorConfiguration.getResultFetchTimeout(DEFAULT_WAIT_TIMEOUT),
-        TimeUnit.MILLISECONDS));
+      receive = inbox.receive(WAIT_TIME);
     } catch (Throwable ex) {
       String errorMessage = "Result fetch timed out";
       LOG.error(errorMessage, ex);

Reply via email to