Repository: curator Updated Branches: refs/heads/CURATOR-70 [created] 896c7c1b3 refs/heads/CURATOR-82 [created] 20d8c066f refs/heads/CURATOR-88 656ecdedc -> 710d78d48 refs/heads/CURATOR-97 [created] bfdef2ddc refs/heads/CURATOR-97-OLDER-MUST-HAVE-BEEN-A-MISTAKE [created] e4cb66acb refs/heads/rest [created] 2fa263e94 refs/heads/websockets [created] a08f3c55d
WIP on a REST api Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/91fb3886 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/91fb3886 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/91fb3886 Branch: refs/heads/rest Commit: 91fb3886edba31e6077c21ca2f0073b8697d1399 Parents: 20d8c06 Author: randgalt <randg...@apache.org> Authored: Wed Jan 8 15:12:32 2014 -0500 Committer: randgalt <randg...@apache.org> Committed: Wed Jan 8 15:12:32 2014 -0500 ---------------------------------------------------------------------- curator-x-rest/pom.xml | 54 ++++++++ .../org/apache/curator/x/rest/ApiResource.java | 24 ++++ .../curator/x/rest/ConnectionResource.java | 113 ++++++++++++++++ .../curator/x/rest/LeaderRecipeResource.java | 132 +++++++++++++++++++ .../curator/x/rest/LockRecipeResource.java | 132 +++++++++++++++++++ .../x/rest/entity/LockRequestEntity.java | 60 +++++++++ .../curator/x/rest/system/Connection.java | 106 +++++++++++++++ .../x/rest/system/ConnectionsManager.java | 94 +++++++++++++ .../rest/system/CuratorFrameworkAllocator.java | 27 ++++ .../apache/curator/x/rest/system/ThingKey.java | 85 ++++++++++++ .../apache/curator/x/rest/system/ThingType.java | 77 +++++++++++ pom.xml | 1 + 12 files changed, 905 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/pom.xml ---------------------------------------------------------------------- diff --git a/curator-x-rest/pom.xml b/curator-x-rest/pom.xml new file mode 100644 index 0000000..8176fb5 --- /dev/null +++ b/curator-x-rest/pom.xml @@ -0,0 +1,54 @@ +<?xml version="1.0" encoding="UTF-8"?><!--~ + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>apache-curator</artifactId> + <groupId>org.apache.curator</groupId> + <version>2.3.2-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>curator-x-rest</artifactId> + <version>2.3.2-SNAPSHOT</version> + + <properties> + <jersey.version>2.5.1</jersey.version> + <osgi.import.package> + * + </osgi.import.package> + <osgi.export.package> + org.apache.curator.x.discovery.server*;version="${project.version}";-noimport:=true + </osgi.export.package> + </properties> + + <dependencies> + <dependency> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-server</artifactId> + <version>${jersey.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/ApiResource.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ApiResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ApiResource.java new file mode 100644 index 0000000..db8bb82 --- /dev/null +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ApiResource.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.x.rest; + +public class ApiResource +{ +} http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java new file mode 100644 index 0000000..88fd202 --- /dev/null +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/ConnectionResource.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.x.rest; + +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.x.rest.system.Connection; +import org.apache.curator.x.rest.system.ConnectionsManager; +import org.apache.curator.x.rest.system.ThingKey; +import org.apache.curator.x.rest.system.ThingType; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.ContextResolver; +import java.util.concurrent.Future; + +@Path("zookeeper/connection") +public class ConnectionResource +{ + private final ConnectionsManager connectionsManager; + + public ConnectionResource(@Context ContextResolver<ConnectionsManager> contextResolver) + { + connectionsManager = contextResolver.getContext(ConnectionsManager.class); + } + + @POST + @Path("{id}/connection-state-change") + public void registerConnectionStateChange(@Suspended final AsyncResponse asyncResponse, @PathParam("id") String id) + { + final Connection connection = connectionsManager.get(id); + if ( connection == null ) + { + asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build()); + return; + } + + Future<?> future = connectionsManager.getExecutorService().submit(new Runnable() + { + @Override + public void run() + { + try + { + ConnectionState state = connection.blockingPopStateChange(); + asyncResponse.resume(Response.ok(state.name()).build()); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); + asyncResponse.resume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build()); + } + } + }); + connection.putThing(new ThingKey<Future>(ThingType.FUTURE), future); + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("{id}") + public Response getState(@PathParam("id") String id) + { + Connection connection = connectionsManager.get(id); + if ( connection == null ) + { + return Response.status(Response.Status.NOT_FOUND).build(); + } + + return Response.ok(connection.getClient().getState().name()).build(); + } + + @POST + @Produces(MediaType.APPLICATION_JSON) + public String newConnection() throws Exception + { + return connectionsManager.newConnection(); + } + + @DELETE + @Path("{id}") + public Response closeConnection(@PathParam("id") String id) + { + if ( connectionsManager.closeConnection(id) ) + { + return Response.ok().build(); + } + return Response.status(Response.Status.NOT_FOUND).build(); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java new file mode 100644 index 0000000..868fb33 --- /dev/null +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/LeaderRecipeResource.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.x.rest; + +import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; +import org.apache.curator.x.rest.entity.LockRequestEntity; +import org.apache.curator.x.rest.system.Connection; +import org.apache.curator.x.rest.system.ConnectionsManager; +import org.apache.curator.x.rest.system.ThingKey; +import org.apache.curator.x.rest.system.ThingType; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.ContextResolver; +import java.util.concurrent.TimeUnit; + +@Path("zookeeper/recipes/leader/{connectionId}") +public class LeaderRecipeResource +{ + private final ConnectionsManager connectionsManager; + + public LeaderRecipeResource(@Context ContextResolver<ConnectionsManager> contextResolver) + { + connectionsManager = contextResolver.getContext(ConnectionsManager.class); + } + + @POST + @Path("{path:.*}") + @Produces(MediaType.APPLICATION_JSON) + public Response reEntrantLockAllocate(@PathParam("connectionId") String connectionId, @PathParam("path") String path) throws Exception + { + Connection connection = connectionsManager.get(connectionId); + if ( connection == null ) + { + return Response.status(Response.Status.NOT_FOUND).build(); + } + + InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(connection.getClient(), path); + ThingKey<InterProcessSemaphoreMutex> key = new ThingKey<InterProcessSemaphoreMutex>(ThingType.MUTEX); + connection.putThing(key, mutex); + + return Response.ok(key.getId()).build(); + } + + @DELETE + @Path("{id}") + public Response reEntrantLockDelete(@PathParam("connectionId") String connectionId, @PathParam("id") String lockId) throws Exception + { + Connection connection = connectionsManager.get(connectionId); + if ( connection == null ) + { + return Response.status(Response.Status.NOT_FOUND).build(); + } + + InterProcessSemaphoreMutex mutex = connection.removeThing(new ThingKey<InterProcessSemaphoreMutex>(lockId, ThingType.MUTEX)); + if ( mutex == null ) + { + return Response.status(Response.Status.NOT_FOUND).build(); + } + + if ( mutex.isAcquiredInThisProcess() ) + { + mutex.release(); + } + + return Response.ok().build(); + } + + @POST + @Consumes(MediaType.APPLICATION_JSON) + public void reEntrantLockAcquire(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, final LockRequestEntity lockRequest) throws Exception + { + Connection connection = connectionsManager.get(connectionId); + if ( connection == null ) + { + asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build()); + return; + } + + final InterProcessSemaphoreMutex mutex = connection.getThing(new ThingKey<InterProcessSemaphoreMutex>(lockRequest.getLockId(), ThingType.MUTEX)); + if ( mutex == null ) + { + asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build()); + return; + } + + connectionsManager.getExecutorService().submit + ( + new Runnable() + { + @Override + public void run() + { + try + { + boolean success = mutex.acquire(lockRequest.getMaxWaitMs(), TimeUnit.MILLISECONDS); + asyncResponse.resume(Response.status(success ? Response.Status.OK : Response.Status.REQUEST_TIMEOUT).build()); + } + catch ( Exception e ) + { + asyncResponse.resume(e); + } + } + } + ); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/LockRecipeResource.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/LockRecipeResource.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/LockRecipeResource.java new file mode 100644 index 0000000..ed7a572 --- /dev/null +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/LockRecipeResource.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.x.rest; + +import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; +import org.apache.curator.x.rest.entity.LockRequestEntity; +import org.apache.curator.x.rest.system.Connection; +import org.apache.curator.x.rest.system.ConnectionsManager; +import org.apache.curator.x.rest.system.ThingKey; +import org.apache.curator.x.rest.system.ThingType; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.ContextResolver; +import java.util.concurrent.TimeUnit; + +@Path("zookeeper/recipes/lock/{connectionId}") +public class LockRecipeResource +{ + private final ConnectionsManager connectionsManager; + + public LockRecipeResource(@Context ContextResolver<ConnectionsManager> contextResolver) + { + connectionsManager = contextResolver.getContext(ConnectionsManager.class); + } + + @POST + @Path("{path:.*}") + @Produces(MediaType.APPLICATION_JSON) + public Response reEntrantLockAllocate(@PathParam("connectionId") String connectionId, @PathParam("path") String path) throws Exception + { + Connection connection = connectionsManager.get(connectionId); + if ( connection == null ) + { + return Response.status(Response.Status.NOT_FOUND).build(); + } + + InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(connection.getClient(), path); + ThingKey<InterProcessSemaphoreMutex> key = new ThingKey<InterProcessSemaphoreMutex>(ThingType.MUTEX); + connection.putThing(key, mutex); + + return Response.ok(key.getId()).build(); + } + + @DELETE + @Path("{id}") + public Response reEntrantLockDelete(@PathParam("connectionId") String connectionId, @PathParam("id") String lockId) throws Exception + { + Connection connection = connectionsManager.get(connectionId); + if ( connection == null ) + { + return Response.status(Response.Status.NOT_FOUND).build(); + } + + InterProcessSemaphoreMutex mutex = connection.removeThing(new ThingKey<InterProcessSemaphoreMutex>(lockId, ThingType.MUTEX)); + if ( mutex == null ) + { + return Response.status(Response.Status.NOT_FOUND).build(); + } + + if ( mutex.isAcquiredInThisProcess() ) + { + mutex.release(); + } + + return Response.ok().build(); + } + + @POST + @Consumes(MediaType.APPLICATION_JSON) + public void reEntrantLockAcquire(@Suspended final AsyncResponse asyncResponse, @PathParam("connectionId") String connectionId, final LockRequestEntity lockRequest) throws Exception + { + Connection connection = connectionsManager.get(connectionId); + if ( connection == null ) + { + asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build()); + return; + } + + final InterProcessSemaphoreMutex mutex = connection.getThing(new ThingKey<InterProcessSemaphoreMutex>(lockRequest.getLockId(), ThingType.MUTEX)); + if ( mutex == null ) + { + asyncResponse.resume(Response.status(Response.Status.NOT_FOUND).build()); + return; + } + + connectionsManager.getExecutorService().submit + ( + new Runnable() + { + @Override + public void run() + { + try + { + boolean success = mutex.acquire(lockRequest.getMaxWaitMs(), TimeUnit.MILLISECONDS); + asyncResponse.resume(Response.status(success ? Response.Status.OK : Response.Status.REQUEST_TIMEOUT).build()); + } + catch ( Exception e ) + { + asyncResponse.resume(e); + } + } + } + ); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/LockRequestEntity.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/LockRequestEntity.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/LockRequestEntity.java new file mode 100644 index 0000000..ef51d15 --- /dev/null +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/entity/LockRequestEntity.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.x.rest.entity; + +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement +public class LockRequestEntity +{ + private String lockId; + private int maxWaitMs; + + public LockRequestEntity() + { + this("/", 0); + } + + public LockRequestEntity(String lockId, int maxWaitMs) + { + this.lockId = lockId; + this.maxWaitMs = maxWaitMs; + } + + public String getLockId() + { + return lockId; + } + + public void setLockId(String lockId) + { + this.lockId = lockId; + } + + public int getMaxWaitMs() + { + return maxWaitMs; + } + + public void setMaxWaitMs(int maxWaitMs) + { + this.maxWaitMs = maxWaitMs; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java new file mode 100644 index 0000000..7f46de1 --- /dev/null +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/Connection.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.x.rest.system; + +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import java.io.Closeable; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +public class Connection implements Closeable, ConnectionStateListener +{ + private final CuratorFramework client; + private final AtomicLong lastUseMs = new AtomicLong(System.currentTimeMillis()); + private final Map<ThingKey, Object> things = Maps.newConcurrentMap(); + private final BlockingQueue<ConnectionState> states = Queues.newLinkedBlockingQueue(); + + public Connection(CuratorFramework client) + { + this.client = client; + client.getConnectionStateListenable().addListener(this); + } + + @Override + public void close() + { + client.close(); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + states.add(newState); + } + + public ConnectionState blockingPopStateChange() throws InterruptedException + { + return states.take(); + } + + public void updateUse() + { + for ( Map.Entry<ThingKey, Object> entry : things.entrySet() ) + { + //noinspection unchecked + entry.getKey().getType().closeFor(entry.getValue()); + } + lastUseMs.set(System.currentTimeMillis()); + } + + public CuratorFramework getClient() + { + return client; + } + + public long getLastUseMs() + { + return lastUseMs.get(); + } + + public <T> void putThing(ThingKey<T> key, T thing) + { + things.put(key, thing); + } + + public <T> T getThing(ThingKey<T> key) + { + Object o = things.get(key); + if ( o != null ) + { + return key.getType().getThingClass().cast(o); + } + return null; + } + + public <T> T removeThing(ThingKey<T> key) + { + Object o = things.remove(key); + if ( o != null ) + { + return key.getType().getThingClass().cast(o); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ConnectionsManager.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ConnectionsManager.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ConnectionsManager.java new file mode 100644 index 0000000..febea7a --- /dev/null +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ConnectionsManager.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.x.rest.system; + +import com.google.common.collect.Maps; +import com.google.common.io.Closeables; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.ThreadUtils; +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +// TODO connection cleanup/timeouts + +public class ConnectionsManager implements Closeable +{ + private final Map<String, Connection> connections = Maps.newConcurrentMap(); + private final CuratorFrameworkAllocator allocator; + private final ExecutorService executorService; + + public ConnectionsManager(CuratorFrameworkAllocator allocator) + { + this(allocator, Executors.newCachedThreadPool(ThreadUtils.newThreadFactory("ConnectionsManager"))); + } + + public ConnectionsManager(CuratorFrameworkAllocator allocator, ExecutorService executorService) + { + this.allocator = allocator; + this.executorService = executorService; + } + + public String newConnection() throws Exception + { + String id = UUID.randomUUID().toString(); + CuratorFramework client = allocator.newCuratorFramework(); + connections.put(id, new Connection(client)); + return id; + } + + public Connection get(String id) + { + Connection connection = connections.get(id); + if ( connection != null ) + { + connection.updateUse(); + } + return connection; + } + + public boolean closeConnection(String id) + { + Connection connection = connections.remove(id); + if ( connection != null ) + { + connection.close(); + return true; + } + return false; + } + + public ExecutorService getExecutorService() + { + return executorService; + } + + @Override + public void close() throws IOException + { + for ( Connection connection : connections.values() ) + { + Closeables.closeQuietly(connection); + } + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/CuratorFrameworkAllocator.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/CuratorFrameworkAllocator.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/CuratorFrameworkAllocator.java new file mode 100644 index 0000000..d6dd768 --- /dev/null +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/CuratorFrameworkAllocator.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.x.rest.system; + +import org.apache.curator.framework.CuratorFramework; + +public interface CuratorFrameworkAllocator +{ + public CuratorFramework newCuratorFramework() throws Exception; +} http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingKey.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingKey.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingKey.java new file mode 100644 index 0000000..3e60cbd --- /dev/null +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingKey.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.x.rest.system; + +import com.google.common.base.Preconditions; +import java.util.UUID; + +public class ThingKey<T> +{ + private final String id; + private final ThingType<T> type; + + public ThingKey(ThingType<T> type) + { + this(UUID.randomUUID().toString(), type); + } + + public ThingKey(String id, ThingType<T> type) + { + this.id = Preconditions.checkNotNull(id, "id cannot be null"); + this.type = Preconditions.checkNotNull(type, "type cannot be null"); + } + + public String getId() + { + return id; + } + + public ThingType<T> getType() + { + return type; + } + + @Override + public boolean equals(Object o) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + + ThingKey thingKey = (ThingKey)o; + + if ( !id.equals(thingKey.id) ) + { + return false; + } + //noinspection RedundantIfStatement + if ( type != thingKey.type ) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + int result = id.hashCode(); + result = 31 * result + type.hashCode(); + return result; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java ---------------------------------------------------------------------- diff --git a/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java new file mode 100644 index 0000000..d09f0cc --- /dev/null +++ b/curator-x-rest/src/main/java/org/apache/curator/x/rest/system/ThingType.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.x.rest.system; + +import com.google.common.io.Closeables; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; +import java.util.concurrent.Future; + +public interface ThingType<T> +{ + public static ThingType<InterProcessSemaphoreMutex> MUTEX = new ThingType<InterProcessSemaphoreMutex>() + { + @Override + public Class<InterProcessSemaphoreMutex> getThingClass() + { + return InterProcessSemaphoreMutex.class; + } + + @Override + public void closeFor(InterProcessSemaphoreMutex instance) + { + // nop + } + }; + + public static ThingType<LeaderLatch> LEADER = new ThingType<LeaderLatch>() + { + @Override + public Class<LeaderLatch> getThingClass() + { + return LeaderLatch.class; + } + + @Override + public void closeFor(LeaderLatch latch) + { + Closeables.closeQuietly(latch); + } + }; + + public static ThingType<Future> FUTURE = new ThingType<Future>() + { + @Override + public Class<Future> getThingClass() + { + return Future.class; + } + + @Override + public void closeFor(Future future) + { + future.cancel(true); + } + }; + + public Class<T> getThingClass(); + + public void closeFor(T instance); +} http://git-wip-us.apache.org/repos/asf/curator/blob/91fb3886/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index af548c2..81a2577 100644 --- a/pom.xml +++ b/pom.xml @@ -224,6 +224,7 @@ <module>curator-examples</module> <module>curator-x-discovery</module> <module>curator-x-discovery-server</module> + <module>curator-x-rest</module> </modules> <dependencyManagement>