Repository: lens
Updated Branches:
  refs/heads/master 92456effb -> 1cff38768


LENS-1164: Query getting stuck in queue for inmemory queries


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/1cff3876
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/1cff3876
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/1cff3876

Branch: refs/heads/master
Commit: 1cff38768f7999610b4d8ee3b636f9b4a337b076
Parents: 92456ef
Author: Sushil Mohanty <sushil.k.moha...@gmail.com>
Authored: Tue Jun 14 17:50:39 2016 +0530
Committer: Rajat Khandelwal <rajatgupt...@gmail.com>
Committed: Tue Jun 14 17:50:39 2016 +0530

----------------------------------------------------------------------
 .../lens/server/api/query/QueryContext.java     |  4 ++
 .../server/query/QueryExecutionServiceImpl.java | 74 ++++++++++++--------
 .../lens/server/query/TestQueryConstraints.java | 28 ++++++++
 3 files changed, 75 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/1cff3876/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
index aebb395..8ba0689 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
@@ -160,6 +160,10 @@ public class QueryContext extends AbstractQueryContext {
   @Setter
   private boolean finishedQueryPersisted = false;
 
+  @Getter
+  @Setter
+  private boolean isQueryClosedOnDriver = false;
+
   /**
    * The query name.
    */

http://git-wip-us.apache.org/repos/asf/lens/blob/1cff3876/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
index 39984d4..1f6ec13 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
@@ -26,6 +26,7 @@ import static 
org.apache.lens.server.session.LensSessionImpl.ResourceEntry;
 import java.io.*;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.sql.SQLException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1040,43 +1041,26 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
             finished = iter.next();
             if (finished.canBePurged()) {
               try {
-                FinishedLensQuery finishedQuery = new 
FinishedLensQuery(finished.getCtx());
-                if (finished.ctx.getStatus().getStatus() == SUCCESSFUL) {
-                  if (finished.ctx.getStatus().isResultSetAvailable()) {
-                    try {
-                      LensResultSet set = finished.getResultset();
-                      if (set != null && 
PersistentResultSet.class.isAssignableFrom(set.getClass())) {
-                        LensResultSetMetadata metadata = set.getMetadata();
-                        String outputPath = set.getOutputPath();
-                        Long fileSize = ((PersistentResultSet) 
set).getFileSize();
-                        Integer rows = set.size();
-                        finishedQuery.setResult(outputPath);
-                        finishedQuery.setMetadata(metadata.toJson());
-                        finishedQuery.setRows(rows);
-                        finishedQuery.setFileSize(fileSize);
-                      }
-                    } catch (Exception e) {
-                      log.error("Couldn't obtain result set info for the 
query: {}. Going ahead with purge",
-                        finished.getQueryHandle(), e);
-                    }
-                  }
-                }
-                lensServerDao.insertFinishedQuery(finishedQuery);
-                log.info("Saved query {} to DB", finishedQuery.getHandle());
+                persistQuery(finished);
                 iter.remove();
               } catch (Exception e) {
                 log.warn("Exception while purging query {}", 
finished.getQueryHandle(), e);
                 continue;
+              } finally {
+                if (!finished.getCtx().isQueryClosedOnDriver()) {
+                  try {
+                    if (finished.getCtx().getSelectedDriver() != null) {
+                      
finished.getCtx().getSelectedDriver().closeQuery(finished.getQueryHandle());
+                    }
+                    finished.getCtx().setQueryClosedOnDriver(true);
+                  } catch (Exception e) {
+                    log.warn("Exception while closing query with selected 
driver.", e);
+                  }
+                  processWaitingQueriesAsync(finished.ctx);
+                }
               }
               synchronized (finished.ctx) {
                 finished.ctx.setFinishedQueryPersisted(true);
-                try {
-                  if (finished.getCtx().getSelectedDriver() != null) {
-                    
finished.getCtx().getSelectedDriver().closeQuery(finished.getQueryHandle());
-                  }
-                } catch (Exception e) {
-                  log.warn("Exception while closing query with selected 
driver.", e);
-                }
                 log.info("Purging: {}", finished.getQueryHandle());
                 allQueries.remove(finished.getQueryHandle());
                 resultSets.remove(finished.getQueryHandle());
@@ -1097,6 +1081,32 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       }
       log.info("QueryPurger exited");
     }
+
+    private void persistQuery(FinishedQuery finished) throws SQLException {
+      FinishedLensQuery finishedQuery = new 
FinishedLensQuery(finished.getCtx());
+      if (finished.ctx.getStatus().getStatus() == SUCCESSFUL) {
+        if (finished.ctx.getStatus().isResultSetAvailable()) {
+          try {
+            LensResultSet set = finished.getResultset();
+            if (set != null && 
PersistentResultSet.class.isAssignableFrom(set.getClass())) {
+              LensResultSetMetadata metadata = set.getMetadata();
+              String outputPath = set.getOutputPath();
+              Long fileSize = ((PersistentResultSet) set).getFileSize();
+              Integer rows = set.size();
+              finishedQuery.setResult(outputPath);
+              finishedQuery.setMetadata(metadata.toJson());
+              finishedQuery.setRows(rows);
+              finishedQuery.setFileSize(fileSize);
+            }
+          } catch (Exception e) {
+            log.error("Couldn't obtain result set info for the query: {}. 
Going ahead with perstsiting the query",
+              finished.getQueryHandle(), e);
+          }
+        }
+      }
+      lensServerDao.insertFinishedQuery(finishedQuery);
+      log.info("Saved query {} to DB", finishedQuery.getHandle());
+    }
   }
 
   /**
@@ -1684,6 +1694,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
    */
   LensResultSet getResultset(QueryHandle queryHandle) throws LensException {
     QueryContext ctx = allQueries.get(queryHandle);
+
     if (ctx == null) {
       return getResultsetFromDAO(queryHandle);
     } else {
@@ -1696,7 +1707,8 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
           if (resultSet == null) {
             if (ctx.isPersistent() && ctx.getQueryOutputFormatter() != null) {
               resultSets.put(queryHandle, new LensPersistentResult(ctx, conf));
-            } else if 
(allQueries.get(queryHandle).isResultAvailableInDriver()) {
+            } else if (ctx.isResultAvailableInDriver() && 
!ctx.isQueryClosedOnDriver()) {
+              //InMemory result can not be returned for a closed query
               resultSet = getDriverResultset(queryHandle);
               resultSets.put(queryHandle, resultSet);
             }

http://git-wip-us.apache.org/repos/asf/lens/blob/1cff3876/lens-server/src/test/java/org/apache/lens/server/query/TestQueryConstraints.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryConstraints.java
 
b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryConstraints.java
index 6ed3770..0df436a 100644
--- 
a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryConstraints.java
+++ 
b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryConstraints.java
@@ -27,6 +27,7 @@ import java.util.*;
 import javax.ws.rs.core.Application;
 import javax.ws.rs.core.MediaType;
 
+import org.apache.lens.api.LensConf;
 import org.apache.lens.api.LensSessionHandle;
 import org.apache.lens.api.query.QueryHandle;
 import org.apache.lens.driver.hive.HiveDriver;
@@ -37,6 +38,7 @@ import org.apache.lens.server.api.LensConfConstants;
 import org.apache.lens.server.api.LensServerAPITestUtil;
 import org.apache.lens.server.api.driver.DriverSelector;
 import org.apache.lens.server.api.driver.LensDriver;
+import org.apache.lens.server.api.error.LensException;
 import org.apache.lens.server.api.metrics.MetricsService;
 import org.apache.lens.server.api.query.AbstractQueryContext;
 import org.apache.lens.server.api.query.QueryExecutionService;
@@ -195,6 +197,22 @@ public class TestQueryConstraints extends LensJerseyTest {
     }
   }
 
+  @Test(dataProvider = "mediaTypeData")
+  public void testInmemoryQueryThrottling(MediaType mt) throws 
InterruptedException, LensException {
+    List<QueryHandle> handles = Lists.newArrayList();
+    for (int j = 0; j < 5; j++) {
+      for (int i = 0; i < 10; i++) {
+        handles.add(launchInmemoryQuery(mt));
+        assertValidity();
+      }
+    }
+    for (QueryHandle handle : handles) {
+      RestAPITestUtil.waitForQueryToFinish(target(), lensSessionId, handle, 
mt);
+      queryService.closeResultSet(lensSessionId, handle);
+      assertValidity();
+    }
+  }
+
   private void assertValidity() {
     QueryExecutionServiceImpl.QueryCount count = 
queryService.getQueryCountSnapshot();
     assertTrue(count.running <= 4, System.currentTimeMillis() + " " + 
count.running + " running queries: "
@@ -207,6 +225,16 @@ public class TestQueryConstraints extends LensJerseyTest {
       
Optional.of(LensServerAPITestUtil.getLensConf(QUERY_METRIC_UNIQUE_ID_CONF_KEY, 
UUID.randomUUID())), mt);
   }
 
+  private QueryHandle launchInmemoryQuery(MediaType mt) {
+    LensConf conf = 
LensServerAPITestUtil.getLensConf(QUERY_METRIC_UNIQUE_ID_CONF_KEY, 
UUID.randomUUID());
+    conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, "false");
+    conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, 
"false");
+    conf.addProperty(LensConfConstants.INMEMORY_RESULT_SET_TTL_SECS, 600);
+    return RestAPITestUtil.executeAndGetHandle(target(), 
Optional.of(lensSessionId),
+        Optional.of("select ID from " + TEST_TABLE),
+        Optional.of(conf), mt);
+  }
+
   @AfterMethod
   private void waitForPurge() throws InterruptedException {
     waitForPurge(0, queryService.finishedQueries);

Reply via email to