Repository: curator Updated Branches: refs/heads/CURATOR-88 8f6edd706 -> ce456d2df
expiration should now be on a per-object basis Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ce456d2d Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ce456d2d Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ce456d2d Branch: refs/heads/CURATOR-88 Commit: ce456d2df24c9c64108d1c40e111b66a7e00e3ca Parents: 8f6edd7 Author: randgalt <randg...@apache.org> Authored: Sat Feb 15 19:39:38 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Sat Feb 15 19:39:38 2014 -0500 ---------------------------------------------------------------------- .../curator/x/rest/CuratorRestContext.java | 38 +------------- .../curator/x/rest/api/ClientResource.java | 24 ++++----- .../apache/curator/x/rest/api/Constants.java | 2 +- .../org/apache/curator/x/rest/api/Session.java | 55 +++++++++++++------- 4 files changed, 50 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/ce456d2d/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java index cca2e89..a07197e 100644 --- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/CuratorRestContext.java @@ -20,14 +20,10 @@ package org.apache.curator.x.rest; import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.x.rest.api.Session; import 
org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectWriter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.Closeable; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -35,7 +31,6 @@ import java.util.concurrent.atomic.AtomicReference; public class CuratorRestContext implements Closeable { - private final Logger log = LoggerFactory.getLogger(getClass()); private final Session session = new Session(); private final ObjectMapper mapper = new ObjectMapper(); private final ObjectWriter writer = mapper.writer(); @@ -43,17 +38,6 @@ public class CuratorRestContext implements Closeable private final int sessionLengthMs; private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT); private final ScheduledExecutorService executorService = ThreadUtils.newSingleThreadScheduledExecutor("CuratorRestContext"); - private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() - { - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) - { - if ( newState == ConnectionState.LOST ) - { - handleLostConnection(); - } - } - }; private enum State { @@ -77,7 +61,6 @@ public class CuratorRestContext implements Closeable public Session getSession() { Preconditions.checkState(state.get() == State.STARTED, "Not started"); - session.updateLastUse(); return session; } @@ -85,35 +68,22 @@ public class CuratorRestContext implements Closeable { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started"); - client.getConnectionStateListenable().addListener(connectionStateListener); - Runnable runner = new Runnable() { @Override public void run() { - checkSession(); + session.checkExpiredThings(sessionLengthMs); } }; executorService.scheduleAtFixedRate(runner, sessionLengthMs, sessionLengthMs, TimeUnit.MILLISECONDS); } - private void checkSession() - { - long elapsedSinceLastUse = 
System.currentTimeMillis() - session.getLastUseMs(); - if ( elapsedSinceLastUse > sessionLengthMs ) - { - log.warn("Session has expired. Closing all open recipes. Milliseconds since last ping: " + elapsedSinceLastUse); - session.closeThings(); - } - } - @Override public void close() { if ( state.compareAndSet(State.STARTED, State.CLOSED) ) { - client.getConnectionStateListenable().removeListener(connectionStateListener); executorService.shutdownNow(); session.close(); } @@ -128,10 +98,4 @@ public class CuratorRestContext implements Closeable { return writer; } - - private void handleLostConnection() - { - log.warn("Connection lost - closing all REST sessions"); - session.closeThings(); - } } http://git-wip-us.apache.org/repos/asf/curator/blob/ce456d2d/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java index 94009f0..26708cf 100644 --- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/ClientResource.java @@ -34,6 +34,7 @@ import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; @@ -64,14 +65,23 @@ public class ClientResource return Response.ok(context.getWriter().writeValueAsString(node)).build(); } + @GET + @Path("/touch/{id}") + public Response touchThing(@PathParam("id") String id) + { + if ( !context.getSession().updateThingLastUse(id) ) + { + return Response.status(Response.Status.NOT_FOUND).build(); + } + return Response.ok().build(); + } + @POST @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) 
@Path("/get-children") public Response getChildren(final GetChildrenSpec getChildrenSpec) throws Exception { - context.getSession(); // update last use - Object builder = context.getClient().getChildren(); if ( getChildrenSpec.isWatched() ) { @@ -110,8 +120,6 @@ public class ClientResource @Path("/delete") public Response delete(final DeleteSpec deleteSpec) throws Exception { - context.getSession(); // update last use - Object builder = context.getClient().delete(); if ( deleteSpec.isGuaranteed() ) { @@ -135,8 +143,6 @@ public class ClientResource @Path("/set-data") public Response setData(final SetDataSpec setDataSpec) throws Exception { - context.getSession(); // update last use - Object builder = context.getClient().setData(); if ( setDataSpec.isCompressed() ) { @@ -164,8 +170,6 @@ public class ClientResource @Path("/create") public Response create(final CreateSpec createSpec) throws Exception { - context.getSession(); // update last use - Object builder = context.getClient().create(); if ( createSpec.isCreatingParentsIfNeeded() ) { @@ -200,8 +204,6 @@ public class ClientResource @Path("/get-data") public Response getData(final GetDataSpec getDataSpec) throws Exception { - context.getSession(); // update last use - Object builder = context.getClient().getData(); if ( getDataSpec.isWatched() ) { @@ -243,8 +245,6 @@ public class ClientResource @Path("/exists") public Response exists(final ExistsSpec existsSpec) throws Exception { - context.getSession(); // update last use - Object builder = context.getClient().checkExists(); if ( existsSpec.isWatched() ) { http://git-wip-us.apache.org/repos/asf/curator/blob/ce456d2d/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Constants.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Constants.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Constants.java index b12d1b1..5ee1982 100644 --- 
a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Constants.java +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Constants.java @@ -38,7 +38,7 @@ class Constants static final String PATH_CACHE = "path-cache"; static final String NODE_CACHE = "node-cache"; static final String LEADER = "leader"; - static final String CLOSING = "closing"; + static final String EXPIRED = "expired"; static ObjectNode makeIdNode(CuratorRestContext context, String id) { http://git-wip-us.apache.org/repos/asf/curator/blob/ce456d2d/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java index 2d2cd8a..16478ce 100644 --- a/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/api/Session.java @@ -35,13 +35,13 @@ public class Session implements Closeable { private final Logger log = LoggerFactory.getLogger(getClass()); private final Map<String, Entry> things = Maps.newConcurrentMap(); - private final AtomicLong lastUseMs = new AtomicLong(System.currentTimeMillis()); private final BlockingQueue<StatusMessage> messages = Queues.newLinkedBlockingQueue(); private static class Entry { final Object thing; final Closer closer; + final AtomicLong lastUseMs = new AtomicLong(System.currentTimeMillis()); private Entry(Object thing, Closer closer) { @@ -50,26 +50,9 @@ public class Session implements Closeable } } - public void updateLastUse() - { - lastUseMs.set(System.currentTimeMillis()); - } - - public long getLastUseMs() - { - return lastUseMs.get(); - } - @Override - public void close() + public synchronized void close() { - closeThings(); - } - - public void closeThings() - { - pushMessage(new StatusMessage(Constants.CLOSING, "", "", "")); - for ( 
Map.Entry<String, Entry> mapEntry : things.entrySet() ) { Entry entry = mapEntry.getValue(); @@ -80,6 +63,29 @@ public class Session implements Closeable entry.closer.close(entry.thing); // lack of generics is safe because addThing() is type-safe } } + things.clear(); + } + + public synchronized void checkExpiredThings(long sessionLengthMs) + { + for ( Map.Entry<String, Entry> mapEntry : things.entrySet() ) + { + Entry entry = mapEntry.getValue(); + long elapsedSinceLastUse = System.currentTimeMillis() - entry.lastUseMs.get(); + if ( elapsedSinceLastUse > sessionLengthMs ) + { + String id = mapEntry.getKey(); + pushMessage(new StatusMessage(Constants.EXPIRED, id, "expired", entry.thing.getClass().getName())); + log.warn(String.format("Expiring object. Elapsed time: %d, id: %s, Class: %s", elapsedSinceLastUse, id, entry.thing.getClass().getName())); + + things.remove(id); + if ( entry.closer != null ) + { + //noinspection unchecked + entry.closer.close(entry.thing); // lack of generics is safe because addThing() is type-safe + } + } + } } void pushMessage(StatusMessage message) @@ -94,6 +100,17 @@ public class Session implements Closeable return localMessages; } + boolean updateThingLastUse(String id) + { + Entry entry = things.get(id); + if ( entry != null ) + { + entry.lastUseMs.set(System.currentTimeMillis()); + return true; + } + return false; + } + <T> String addThing(T thing, Closer<T> closer) { String id = Constants.newId();