Repository: lens Updated Branches: refs/heads/master d1a463552 -> 4556773e3
LENS-1300 : Query submission to be rejected on sessions marked for close Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/4556773e Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/4556773e Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/4556773e Branch: refs/heads/master Commit: 4556773e3e4e2d80c1c68afd87338a4fe2de0e6d Parents: d1a4635 Author: Rajat Khandelwal <pro...@apache.org> Authored: Sat Sep 17 11:43:29 2016 +0530 Committer: Amareshwari Sriramadasu <amareshw...@apache.org> Committed: Sat Sep 17 11:43:29 2016 +0530 ---------------------------------------------------------------------- .../java/org/apache/lens/api/APIResult.java | 3 + .../lens/server/api/session/SessionService.java | 7 ++ .../org/apache/lens/server/BaseLensService.java | 8 +- .../lens/server/session/HiveSessionService.java | 15 ++- .../lens/server/session/LensSessionImpl.java | 4 +- .../lens/server/session/SessionResource.java | 20 +++- .../TestQueryIndependenceFromSessionClose.java | 105 +++++++++++++------ 7 files changed, 121 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/4556773e/lens-api/src/main/java/org/apache/lens/api/APIResult.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/APIResult.java b/lens-api/src/main/java/org/apache/lens/api/APIResult.java index 26e9926..e147fec 100644 --- a/lens-api/src/main/java/org/apache/lens/api/APIResult.java +++ b/lens-api/src/main/java/org/apache/lens/api/APIResult.java @@ -98,6 +98,9 @@ public class APIResult extends ToYAMLString { public static APIResult success() { return SUCCESS; } + public static APIResult success(String message) { + return new APIResult(Status.SUCCEEDED, message); + } public static APIResult failure(Exception e) { String cause = extractCause(e); http://git-wip-us.apache.org/repos/asf/lens/blob/4556773e/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java b/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java index 80ce030..20ec686 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/session/SessionService.java @@ -67,6 +67,13 @@ public interface SessionService { void closeSession(LensSessionHandle sessionHandle) throws LensException; /** + * Close idle sessions. + * + * @throws LensException the lens exception + */ + + void cleanupIdleSessions() throws LensException; + /** * Adds the resource. * * @param sessionHandle the session handle http://git-wip-us.apache.org/repos/asf/lens/blob/4556773e/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java b/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java index e0e4bf8..d7ce434 100644 --- a/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java +++ b/lens-server/src/main/java/org/apache/lens/server/BaseLensService.java @@ -547,7 +547,13 @@ public abstract class BaseLensService extends CompositeService implements Extern if (handle == null) { throw new LensException(SESSION_ID_NOT_PROVIDED.getLensErrorInfo()); } - if (!getSession(handle).isActive()) { + LensSessionImpl session; + try { + session = getSession(handle); + } catch (ClientErrorException e) { + throw new LensException(SESSION_CLOSED.getLensErrorInfo(), handle, e); + } + if (!session.isActive() || session.isMarkedForClose()) { throw new LensException(SESSION_CLOSED.getLensErrorInfo(), handle); } } http://git-wip-us.apache.org/repos/asf/lens/blob/4556773e/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java b/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java index 8909098..21e2a62 100644 --- a/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java +++ b/lens-server/src/main/java/org/apache/lens/server/session/HiveSessionService.java @@ -22,9 +22,7 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.*; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import javax.ws.rs.BadRequestException; import javax.ws.rs.ClientErrorException; @@ -472,6 +470,17 @@ public class HiveSessionService extends BaseLensService implements SessionServic notifyEvent(new SessionClosed(System.currentTimeMillis(), sessionHandle)); } + @Override + public void cleanupIdleSessions() throws LensException { + ScheduledFuture<?> schedule = sessionExpiryThread.schedule(sessionExpiryRunnable, 0, TimeUnit.MILLISECONDS); + // wait till completion + try { + schedule.get(); + } catch (InterruptedException | ExecutionException e) { + throw new LensException(e); + } + } + /** * Close a Lens server session * @param sessionHandle session handle http://git-wip-us.apache.org/repos/asf/lens/blob/4556773e/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java b/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java index a6013e7..34c901c 100644 --- a/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java @@ -283,7 +283,9 @@ public class LensSessionImpl extends HiveSessionImpl implements AutoCloseable { return System.currentTimeMillis() - lastAccessTime < sessionTimeout && (!persistInfo.markedForClose|| activeOperationsPresent()); } - + public boolean isMarkedForClose() { + return persistInfo.isMarkedForClose(); + } public synchronized void setActive() { setLastAccessTime(System.currentTimeMillis()); } http://git-wip-us.apache.org/repos/asf/lens/blob/4556773e/lens-server/src/main/java/org/apache/lens/server/session/SessionResource.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/session/SessionResource.java b/lens-server/src/main/java/org/apache/lens/server/session/SessionResource.java index a4e61c4..63eea63 100644 --- a/lens-server/src/main/java/org/apache/lens/server/session/SessionResource.java +++ b/lens-server/src/main/java/org/apache/lens/server/session/SessionResource.java @@ -263,13 +263,25 @@ public class SessionResource { /** * Returns a list of all sessions - * @return */ @GET @Path("sessions") @Produces({ MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN }) - public List<UserSessionInfo> getSession() { - List<UserSessionInfo> l = sessionService.getSessionInfo(); - return l; + public List<UserSessionInfo> getSessionInfo() { + return sessionService.getSessionInfo(); + } + + /** + * Clears idle sessions. response will contain how many sessions were cleared. + * @throws LensException + */ + @DELETE + @Path("sessions") + @Produces({ MediaType.APPLICATION_XML, MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN }) + public APIResult clearIdleSessions() throws LensException { + int before = getSessionInfo().size(); + sessionService.cleanupIdleSessions(); + int after = getSessionInfo().size(); + return APIResult.success("cleared " + (after - before) + " idle sessions"); } } http://git-wip-us.apache.org/repos/asf/lens/blob/4556773e/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java index 202e4be..bf4577c 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryIndependenceFromSessionClose.java @@ -18,8 +18,13 @@ */ package org.apache.lens.server.query; +import static org.apache.lens.server.api.LensConfConstants.*; + import static org.testng.Assert.*; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import java.util.Map; import javax.ws.rs.core.Application; @@ -31,18 +36,21 @@ import org.apache.lens.api.LensSessionHandle; import org.apache.lens.api.query.QueryHandle; import org.apache.lens.api.query.QueryStatus; import org.apache.lens.api.result.LensAPIResult; +import org.apache.lens.api.session.UserSessionInfo; import org.apache.lens.driver.hive.HiveDriver; import org.apache.lens.server.LensJerseyTest; import org.apache.lens.server.LensServerTestUtil; import org.apache.lens.server.LensServices; -import org.apache.lens.server.api.LensConfConstants; +import org.apache.lens.server.api.LensServerAPITestUtil; import org.apache.lens.server.api.driver.LensDriver; import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.query.QueryExecutionService; +import org.apache.lens.server.api.session.SessionService; import org.apache.lens.server.api.util.LensUtil; import org.apache.lens.server.common.RestAPITestUtil; import org.apache.lens.server.common.TestResourceFile; import org.apache.lens.server.error.LensServerErrorCode; +import org.apache.lens.server.session.HiveSessionService; import org.glassfish.jersey.test.TestProperties; import org.testng.annotations.*; @@ -58,6 +66,7 @@ import lombok.extern.slf4j.Slf4j; public class TestQueryIndependenceFromSessionClose extends LensJerseyTest { /** The query service. */ QueryExecutionServiceImpl queryService; + HiveSessionService sessionService; /** The lens session id. */ LensSessionHandle lensSessionId; @@ -77,6 +86,15 @@ public class TestQueryIndependenceFromSessionClose extends LensJerseyTest { public void tearDown() throws Exception { super.tearDown(); } + + private QueryExecutionServiceImpl getQueryService() { + return queryService = LensServices.get().getService(QueryExecutionService.NAME); + } + + private SessionService getSessionService() { + return sessionService = LensServices.get().getService(SessionService.NAME); + } + /* * (non-Javadoc) * @@ -84,31 +102,27 @@ public class TestQueryIndependenceFromSessionClose extends LensJerseyTest { */ @BeforeClass public void setUpClass() throws Exception { - queryService = LensServices.get().getService(QueryExecutionService.NAME); lensSessionId = getSession(); createTable(TEST_TABLE); loadData(TEST_TABLE, TestResourceFile.TEST_DATA2_FILE.getValue()); - conf = new LensConf(); - conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, "true"); - conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "true"); - conf.addProperty(LensConfConstants.QUERY_OUTPUT_FORMATTER, - TestQueryService.DeferredPersistentResultFormatter.class.getName()); - conf.addProperty("deferPersistenceByMillis", 5000); // defer persistence for 5 secs + conf = LensServerAPITestUtil.getLensConf("deferPersistenceByMillis", 5000, + QUERY_PERSISTENT_RESULT_SET, true, + QUERY_PERSISTENT_RESULT_INDRIVER, true, + QUERY_OUTPUT_FORMATTER, TestQueryService.DeferredPersistentResultFormatter.class.getName()); } @Override public Map<String, String> getServerConfOverWrites() { return LensUtil.getHashMap("lens.server.total.query.cost.ceiling.per.user", "1", "lens.server.drivers", - "hive:org.apache.lens.driver.hive.HiveDriver"); + "hive:org.apache.lens.driver.hive.HiveDriver", MAX_SESSIONS_PER_USER, "1"); } private LensSessionHandle getSession() throws LensException { - queryService = LensServices.get().getService(QueryExecutionService.NAME); - return queryService.openSession("foo", "bar", null); + return getSessionService().openSession("foo", "bar", null, null); } private void closeSession(LensSessionHandle session) throws LensException { - queryService.closeSession(session); + getSessionService().closeSession(session); } /* @@ -119,8 +133,8 @@ public class TestQueryIndependenceFromSessionClose extends LensJerseyTest { @AfterClass public void tearDownClass() throws Exception { dropTable(TEST_TABLE); - queryService.closeSession(lensSessionId); - for (LensDriver driver : queryService.getDrivers()) { + getSessionService().closeSession(lensSessionId); + for (LensDriver driver : getQueryService().getDrivers()) { if (driver instanceof HiveDriver) { assertFalse(((HiveDriver) driver).hasLensSession(lensSessionId)); } @@ -132,7 +146,7 @@ public class TestQueryIndependenceFromSessionClose extends LensJerseyTest { private void customRestartLensServer() { queryService = null; super.restartLensServer(getServerConf(), false); - queryService = LensServices.get().getService(QueryExecutionService.NAME); + getQueryService(); } /* @@ -194,45 +208,72 @@ public class TestQueryIndependenceFromSessionClose extends LensJerseyTest { @Test(dataProvider = "restartDataProvider") public void testQueryAliveOnSessionClose(boolean restartBeforeFinish, boolean restartAfterFinish) throws LensException, InterruptedException { + int numSessions = getSessionsOfFoo().size(); MediaType mt = MediaType.APPLICATION_XML_TYPE; - LensSessionHandle sesssionHandle = getSession(); + LensSessionHandle sessionHandle = getSession(); QueryHandle queryHandle1 = RestAPITestUtil.executeAndGetHandle(target(), - Optional.of(sesssionHandle), Optional.of("select * from " + TEST_TABLE), Optional.of(conf), mt); + Optional.of(sessionHandle), Optional.of("select * from " + TEST_TABLE), Optional.of(conf), mt); QueryHandle queryHandle2 = RestAPITestUtil.executeAndGetHandle(target(), - Optional.of(sesssionHandle), Optional.of("select * from " + TEST_TABLE), Optional.of(conf), mt); + Optional.of(sessionHandle), Optional.of("select * from " + TEST_TABLE), Optional.of(conf), mt); assertNotEquals(queryHandle1, queryHandle2); // Second query should be queued - assertEquals(queryService.getQueryContext(queryHandle2).getStatus().getStatus(), QueryStatus.Status.QUEUED); - closeSession(sesssionHandle); + assertEquals(getQueryService().getQueryContext(queryHandle2).getStatus().getStatus(), QueryStatus.Status.QUEUED); + closeSession(sessionHandle); // Session not 'truly' closed - assertNotNull(queryService.getSession(sesssionHandle)); + assertNotNull(getQueryService().getSession(sessionHandle)); // Just 'marked' for closing - assertTrue(queryService.getSession(sesssionHandle).getLensSessionPersistInfo().isMarkedForClose()); + assertTrue(getQueryService().getSession(sessionHandle).getLensSessionPersistInfo().isMarkedForClose()); + // Try submitting another query in this so called "inactive" session + Response response = RestAPITestUtil.postQuery(target(), + Optional.of(sessionHandle), Optional.of("select * from " + TEST_TABLE), Optional.of("execute"), mt); + assertEquals(response.getStatus(), 410); + LensAPIResult apiResult = response.readEntity(LensAPIResult.class); + assertEquals(apiResult.getErrorCode(), 2005); + // Should be able to open another session, since max session per user is 1 and this session is closed + LensSessionHandle sessionHandle1 = getSession(); + assertNotNull(sessionHandle1); if (restartBeforeFinish) { customRestartLensServer(); } - assertTrue(queryService.getSession(sesssionHandle).getLensSessionPersistInfo().isMarkedForClose()); - assertTrue(queryService.getSession(sesssionHandle).isActive()); - RestAPITestUtil.waitForQueryToFinish(target(), lensSessionId, queryHandle2, QueryStatus.Status.SUCCESSFUL, mt); - RestAPITestUtil.waitForQueryToFinish(target(), lensSessionId, queryHandle1, QueryStatus.Status.SUCCESSFUL, mt); + assertTrue(getQueryService().getSession(sessionHandle).getLensSessionPersistInfo().isMarkedForClose()); + assertTrue(getQueryService().getSession(sessionHandle).isActive()); + for (QueryHandle handle : Arrays.asList(queryHandle2, queryHandle1)) { + RestAPITestUtil.waitForQueryToFinish(target(), lensSessionId, handle, QueryStatus.Status.SUCCESSFUL, mt); + } // Session should not be active - assertFalse(queryService.getSession(sesssionHandle).isActive()); + assertFalse(getQueryService().getSession(sessionHandle).isActive()); if (restartAfterFinish) { customRestartLensServer(); } - assertTrue(queryService.getSession(sesssionHandle).getLensSessionPersistInfo().isMarkedForClose()); + assertTrue(getQueryService().getSession(sessionHandle).getLensSessionPersistInfo().isMarkedForClose()); // Now, session is not active anymore - assertFalse(queryService.getSession(sesssionHandle).isActive()); + assertFalse(getQueryService().getSession(sessionHandle).isActive()); // It should not be possible to submit queries now - Response response = RestAPITestUtil.postQuery(target(), Optional.of(sesssionHandle), + response = RestAPITestUtil.postQuery(target(), Optional.of(sessionHandle), Optional.of("select * from " + TEST_TABLE), Optional.of("execute"), Optional.of(conf), mt); assertEquals(response.getStatus(), 410); - LensAPIResult apiResult = response.readEntity(LensAPIResult.class); + apiResult = response.readEntity(LensAPIResult.class); assertEquals(apiResult.getErrorCode(), LensServerErrorCode.SESSION_CLOSED.getLensErrorInfo().getErrorCode()); + getSessionService().cleanupIdleSessions(); + + assertTrue(getSessionsOfFoo().size() - numSessions <= 2); + } + private List<UserSessionInfo> getSessionsOfFoo() { + List<UserSessionInfo> sessions = getSessionService().getSessionInfo(); + Iterator<UserSessionInfo> iter = sessions.iterator(); + while (iter.hasNext()) { + UserSessionInfo session = iter.next(); + assertNotEquals(session.getHandle(), lensSessionId.getPublicId(), + "session not cleaned up even after queries finished"); + if (!session.getUserName().equals("foo")) { + iter.remove(); + } + } + return sessions; } @AfterMethod private void waitForPurge() throws InterruptedException { - waitForPurge(0, queryService.finishedQueries); + waitForPurge(0, getQueryService().finishedQueries); } }