Repository: lens
Updated Branches:
  refs/heads/master d1a463552 -> 4556773e3


LENS-1300 : Query submission to be rejected on sessions marked for close


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/4556773e
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/4556773e
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/4556773e

Branch: refs/heads/master
Commit: 4556773e3e4e2d80c1c68afd87338a4fe2de0e6d
Parents: d1a4635
Author: Rajat Khandelwal <pro...@apache.org>
Authored: Sat Sep 17 11:43:29 2016 +0530
Committer: Amareshwari Sriramadasu <amareshw...@apache.org>
Committed: Sat Sep 17 11:43:29 2016 +0530

----------------------------------------------------------------------
 .../java/org/apache/lens/api/APIResult.java     |   3 +
 .../lens/server/api/session/SessionService.java |   7 ++
 .../org/apache/lens/server/BaseLensService.java |   8 +-
 .../lens/server/session/HiveSessionService.java |  15 ++-
 .../lens/server/session/LensSessionImpl.java    |   4 +-
 .../lens/server/session/SessionResource.java    |  20 +++-
 .../TestQueryIndependenceFromSessionClose.java  | 105 +++++++++++++------
 7 files changed, 121 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/4556773e/lens-api/src/main/java/org/apache/lens/api/APIResult.java
----------------------------------------------------------------------
diff --git a/lens-api/src/main/java/org/apache/lens/api/APIResult.java 
b/lens-api/src/main/java/org/apache/lens/api/APIResult.java
index 26e9926..e147fec 100644
--- a/lens-api/src/main/java/org/apache/lens/api/APIResult.java
+++ b/lens-api/src/main/java/org/apache/lens/api/APIResult.java
@@ -98,6 +98,9 @@ public class APIResult extends ToYAMLString {
   public static APIResult success() {
     return SUCCESS;
   }
+  public static APIResult success(String message) { // builds a new SUCCEEDED result carrying the given message
+    return new APIResult(Status.SUCCEEDED, message);
+  }
 
   public static APIResult failure(Exception e) {
     String cause = extractCause(e);

http://git-wip-us.apache.org/repos/asf/lens/blob/4556773e/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java
index 80ce030..20ec686 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java
@@ -67,6 +67,13 @@ public interface SessionService {
   void closeSession(LensSessionHandle sessionHandle) throws LensException;
 
   /**
+   * Closes idle sessions.
+   *
+   * @throws LensException if the cleanup fails
+   */
+  void cleanupIdleSessions() throws LensException;
+
+  /**
    * Adds the resource.
    *
    * @param sessionHandle the session handle

http://git-wip-us.apache.org/repos/asf/lens/blob/4556773e/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java 
b/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java
index e0e4bf8..d7ce434 100644
--- a/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java
+++ b/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java
@@ -547,7 +547,13 @@ public abstract class BaseLensService extends 
CompositeService implements Extern
     if (handle == null) {
       throw new LensException(SESSION_ID_NOT_PROVIDED.getLensErrorInfo());
     }
-    if (!getSession(handle).isActive()) {
+    LensSessionImpl session;
+    try {
+      session = getSession(handle);
+    } catch (ClientErrorException e) {
+      throw new LensException(SESSION_CLOSED.getLensErrorInfo(), handle, e);
+    }
+    if (!session.isActive() || session.isMarkedForClose()) {
       throw new LensException(SESSION_CLOSED.getLensErrorInfo(), handle);
     }
   }

http://git-wip-us.apache.org/repos/asf/lens/blob/4556773e/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
 
b/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
index 8909098..21e2a62 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java
@@ -22,9 +22,7 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 import javax.ws.rs.BadRequestException;
 import javax.ws.rs.ClientErrorException;
@@ -472,6 +470,17 @@ public class HiveSessionService extends BaseLensService 
implements SessionServic
     notifyEvent(new SessionClosed(System.currentTimeMillis(), sessionHandle));
   }
 
+  @Override
+  public void cleanupIdleSessions() throws LensException {
+    try {
+      // Queue the expiry task with zero delay and block until it has finished running.
+      sessionExpiryThread.schedule(sessionExpiryRunnable, 0, TimeUnit.MILLISECONDS).get();
+    } catch (InterruptedException | ExecutionException e) {
+      if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } // restore interrupt flag
+      throw new LensException(e);
+    }
+  }
+
   /**
    * Close a Lens server session
    * @param sessionHandle session handle

http://git-wip-us.apache.org/repos/asf/lens/blob/4556773e/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java 
b/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
index a6013e7..34c901c 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
@@ -283,7 +283,9 @@ public class LensSessionImpl extends HiveSessionImpl 
implements AutoCloseable {
     return System.currentTimeMillis() - lastAccessTime < sessionTimeout
       && (!persistInfo.markedForClose|| activeOperationsPresent());
   }
-
+  public boolean isMarkedForClose() { // exposes the marked-for-close flag held by persistInfo
+    return persistInfo.isMarkedForClose();
+  }
   public synchronized void setActive() {
     setLastAccessTime(System.currentTimeMillis());
   }

http://git-wip-us.apache.org/repos/asf/lens/blob/4556773e/lens-server/src/main/java/org/apache/lens/server/session/SessionResource.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/session/SessionResource.java 
b/lens-server/src/main/java/org/apache/lens/server/session/SessionResource.java
index a4e61c4..63eea63 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/session/SessionResource.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/session/SessionResource.java
@@ -263,13 +263,25 @@ public class SessionResource {
 
   /**
    * Returns a list of all sessions
-   * @return
    */
   @GET
   @Path("sessions")
   @Produces({ MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, 
MediaType.TEXT_PLAIN })
-  public List<UserSessionInfo> getSession() {
-    List<UserSessionInfo> l = sessionService.getSessionInfo();
-    return l;
+  public List<UserSessionInfo> getSessionInfo() {
+    return sessionService.getSessionInfo();
+  }
+
+  /**
+   * Clears idle sessions. The response message reports how many sessions were cleared.
+   * @throws LensException if the session service fails to run the cleanup
+   */
+  @DELETE
+  @Path("sessions")
+  @Produces({ MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN })
+  public APIResult clearIdleSessions() throws LensException {
+    int before = getSessionInfo().size();
+    sessionService.cleanupIdleSessions();
+    int cleared = before - getSessionInfo().size(); // sessions listed before minus those still listed
+    return APIResult.success("cleared " + cleared + " idle sessions");
   }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/4556773e/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java
 
b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java
index 202e4be..bf4577c 100644
--- 
a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java
+++ 
b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java
@@ -18,8 +18,13 @@
  */
 package org.apache.lens.server.query;
 
+import static org.apache.lens.server.api.LensConfConstants.*;
+
 import static org.testng.Assert.*;
 
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import javax.ws.rs.core.Application;
@@ -31,18 +36,21 @@ import org.apache.lens.api.LensSessionHandle;
 import org.apache.lens.api.query.QueryHandle;
 import org.apache.lens.api.query.QueryStatus;
 import org.apache.lens.api.result.LensAPIResult;
+import org.apache.lens.api.session.UserSessionInfo;
 import org.apache.lens.driver.hive.HiveDriver;
 import org.apache.lens.server.LensJerseyTest;
 import org.apache.lens.server.LensServerTestUtil;
 import org.apache.lens.server.LensServices;
-import org.apache.lens.server.api.LensConfConstants;
+import org.apache.lens.server.api.LensServerAPITestUtil;
 import org.apache.lens.server.api.driver.LensDriver;
 import org.apache.lens.server.api.error.LensException;
 import org.apache.lens.server.api.query.QueryExecutionService;
+import org.apache.lens.server.api.session.SessionService;
 import org.apache.lens.server.api.util.LensUtil;
 import org.apache.lens.server.common.RestAPITestUtil;
 import org.apache.lens.server.common.TestResourceFile;
 import org.apache.lens.server.error.LensServerErrorCode;
+import org.apache.lens.server.session.HiveSessionService;
 
 import org.glassfish.jersey.test.TestProperties;
 import org.testng.annotations.*;
@@ -58,6 +66,7 @@ import lombok.extern.slf4j.Slf4j;
 public class TestQueryIndependenceFromSessionClose extends LensJerseyTest {
   /** The query service. */
   QueryExecutionServiceImpl queryService;
+  HiveSessionService sessionService;
 
   /** The lens session id. */
   LensSessionHandle lensSessionId;
@@ -77,6 +86,15 @@ public class TestQueryIndependenceFromSessionClose extends 
LensJerseyTest {
   public void tearDown() throws Exception {
     super.tearDown();
   }
+
+  private QueryExecutionServiceImpl getQueryService() { // looks the service up on each call and stores it in the field
+    return queryService = 
LensServices.get().getService(QueryExecutionService.NAME);
+  }
+
+  private SessionService getSessionService() { // looks the service up on each call and stores it in sessionService
+    return sessionService = LensServices.get().getService(SessionService.NAME);
+  }
+
   /*
    * (non-Javadoc)
    *
@@ -84,31 +102,27 @@ public class TestQueryIndependenceFromSessionClose extends 
LensJerseyTest {
    */
   @BeforeClass
   public void setUpClass() throws Exception {
-    queryService = LensServices.get().getService(QueryExecutionService.NAME);
     lensSessionId = getSession();
     createTable(TEST_TABLE);
     loadData(TEST_TABLE, TestResourceFile.TEST_DATA2_FILE.getValue());
-    conf = new LensConf();
-    conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, "true");
-    conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, 
"true");
-    conf.addProperty(LensConfConstants.QUERY_OUTPUT_FORMATTER,
-      TestQueryService.DeferredPersistentResultFormatter.class.getName());
-    conf.addProperty("deferPersistenceByMillis", 5000); // defer persistence 
for 5 secs
+    conf = LensServerAPITestUtil.getLensConf("deferPersistenceByMillis", 5000,
+      QUERY_PERSISTENT_RESULT_SET, true,
+      QUERY_PERSISTENT_RESULT_INDRIVER, true,
+      QUERY_OUTPUT_FORMATTER, 
TestQueryService.DeferredPersistentResultFormatter.class.getName());
   }
 
   @Override
   public Map<String, String> getServerConfOverWrites() {
     return 
LensUtil.getHashMap("lens.server.total.query.cost.ceiling.per.user", "1", 
"lens.server.drivers",
-      "hive:org.apache.lens.driver.hive.HiveDriver");
+      "hive:org.apache.lens.driver.hive.HiveDriver", MAX_SESSIONS_PER_USER, 
"1");
   }
 
   private LensSessionHandle getSession() throws LensException {
-    queryService = LensServices.get().getService(QueryExecutionService.NAME);
-    return queryService.openSession("foo", "bar", null);
+    return getSessionService().openSession("foo", "bar", null, null);
   }
 
   private void closeSession(LensSessionHandle session) throws LensException {
-    queryService.closeSession(session);
+    getSessionService().closeSession(session);
   }
 
   /*
@@ -119,8 +133,8 @@ public class TestQueryIndependenceFromSessionClose extends 
LensJerseyTest {
   @AfterClass
   public void tearDownClass() throws Exception {
     dropTable(TEST_TABLE);
-    queryService.closeSession(lensSessionId);
-    for (LensDriver driver : queryService.getDrivers()) {
+    getSessionService().closeSession(lensSessionId);
+    for (LensDriver driver : getQueryService().getDrivers()) {
       if (driver instanceof HiveDriver) {
         assertFalse(((HiveDriver) driver).hasLensSession(lensSessionId));
       }
@@ -132,7 +146,7 @@ public class TestQueryIndependenceFromSessionClose extends 
LensJerseyTest {
   private void customRestartLensServer() {
     queryService = null;
     super.restartLensServer(getServerConf(), false);
-    queryService = LensServices.get().getService(QueryExecutionService.NAME);
+    getQueryService();
   }
 
   /*
@@ -194,45 +208,72 @@ public class TestQueryIndependenceFromSessionClose 
extends LensJerseyTest {
   @Test(dataProvider = "restartDataProvider")
   public void testQueryAliveOnSessionClose(boolean restartBeforeFinish, 
boolean restartAfterFinish)
     throws LensException, InterruptedException {
+    int numSessions = getSessionsOfFoo().size();
     MediaType mt = MediaType.APPLICATION_XML_TYPE;
-    LensSessionHandle sesssionHandle = getSession();
+    LensSessionHandle sessionHandle = getSession();
     QueryHandle queryHandle1 = RestAPITestUtil.executeAndGetHandle(target(),
-      Optional.of(sesssionHandle), Optional.of("select * from " + TEST_TABLE), 
Optional.of(conf), mt);
+      Optional.of(sessionHandle), Optional.of("select * from " + TEST_TABLE), 
Optional.of(conf), mt);
     QueryHandle queryHandle2 = RestAPITestUtil.executeAndGetHandle(target(),
-      Optional.of(sesssionHandle), Optional.of("select *  from " + 
TEST_TABLE), Optional.of(conf), mt);
+      Optional.of(sessionHandle), Optional.of("select *  from " + TEST_TABLE), 
Optional.of(conf), mt);
     assertNotEquals(queryHandle1, queryHandle2);
     // Second query should be queued
-    
assertEquals(queryService.getQueryContext(queryHandle2).getStatus().getStatus(),
 QueryStatus.Status.QUEUED);
-    closeSession(sesssionHandle);
+    
assertEquals(getQueryService().getQueryContext(queryHandle2).getStatus().getStatus(),
 QueryStatus.Status.QUEUED);
+    closeSession(sessionHandle);
     // Session not 'truly' closed
-    assertNotNull(queryService.getSession(sesssionHandle));
+    assertNotNull(getQueryService().getSession(sessionHandle));
     // Just 'marked' for closing
-    
assertTrue(queryService.getSession(sesssionHandle).getLensSessionPersistInfo().isMarkedForClose());
+    
assertTrue(getQueryService().getSession(sessionHandle).getLensSessionPersistInfo().isMarkedForClose());
+    // Try submitting another query in this so called "inactive" session
+    Response response = RestAPITestUtil.postQuery(target(),
+      Optional.of(sessionHandle), Optional.of("select * from " + TEST_TABLE), 
Optional.of("execute"), mt);
+    assertEquals(response.getStatus(), 410);
+    LensAPIResult apiResult = response.readEntity(LensAPIResult.class);
+    assertEquals(apiResult.getErrorCode(), 2005);
+    // Should be able to open another session, since max session per user is 1 
and this session is closed
+    LensSessionHandle sessionHandle1 = getSession();
+    assertNotNull(sessionHandle1);
     if (restartBeforeFinish) {
       customRestartLensServer();
     }
-    
assertTrue(queryService.getSession(sesssionHandle).getLensSessionPersistInfo().isMarkedForClose());
-    assertTrue(queryService.getSession(sesssionHandle).isActive());
-    RestAPITestUtil.waitForQueryToFinish(target(), lensSessionId, 
queryHandle2, QueryStatus.Status.SUCCESSFUL, mt);
-    RestAPITestUtil.waitForQueryToFinish(target(), lensSessionId, 
queryHandle1, QueryStatus.Status.SUCCESSFUL, mt);
+    
assertTrue(getQueryService().getSession(sessionHandle).getLensSessionPersistInfo().isMarkedForClose());
+    assertTrue(getQueryService().getSession(sessionHandle).isActive());
+    for (QueryHandle handle : Arrays.asList(queryHandle2, queryHandle1)) {
+      RestAPITestUtil.waitForQueryToFinish(target(), lensSessionId, handle, 
QueryStatus.Status.SUCCESSFUL, mt);
+    }
     // Session should not be active
-    assertFalse(queryService.getSession(sesssionHandle).isActive());
+    assertFalse(getQueryService().getSession(sessionHandle).isActive());
     if (restartAfterFinish) {
       customRestartLensServer();
     }
-    
assertTrue(queryService.getSession(sesssionHandle).getLensSessionPersistInfo().isMarkedForClose());
+    
assertTrue(getQueryService().getSession(sessionHandle).getLensSessionPersistInfo().isMarkedForClose());
     // Now, session is not active anymore
-    assertFalse(queryService.getSession(sesssionHandle).isActive());
+    assertFalse(getQueryService().getSession(sessionHandle).isActive());
     // It should not be possible to submit queries now
-    Response response = RestAPITestUtil.postQuery(target(), 
Optional.of(sesssionHandle),
+    response = RestAPITestUtil.postQuery(target(), Optional.of(sessionHandle),
       Optional.of("select * from " + TEST_TABLE), Optional.of("execute"), 
Optional.of(conf), mt);
     assertEquals(response.getStatus(), 410);
-    LensAPIResult apiResult = response.readEntity(LensAPIResult.class);
+    apiResult = response.readEntity(LensAPIResult.class);
     assertEquals(apiResult.getErrorCode(), 
LensServerErrorCode.SESSION_CLOSED.getLensErrorInfo().getErrorCode());
+    getSessionService().cleanupIdleSessions();
+
+    assertTrue(getSessionsOfFoo().size() - numSessions <= 2);
+  }
+  private List<UserSessionInfo> getSessionsOfFoo() {
+    List<UserSessionInfo> fooSessions = getSessionService().getSessionInfo();
+    // Every listed session is checked against lensSessionId; those not owned by "foo" are dropped.
+    for (Iterator<UserSessionInfo> it = fooSessions.iterator(); it.hasNext();) {
+      UserSessionInfo info = it.next();
+      assertNotEquals(info.getHandle(), lensSessionId.getPublicId(),
+        "session not cleaned up even after queries finished");
+      if (!info.getUserName().equals("foo")) {
+        it.remove();
+      }
+    }
+    return fooSessions;
   }
 
   @AfterMethod
   private void waitForPurge() throws InterruptedException {
-    waitForPurge(0, queryService.finishedQueries);
+    waitForPurge(0, getQueryService().finishedQueries);
   }
 }

Reply via email to