Repository: deltaspike
Updated Branches:
  refs/heads/master a571ac650 -> dbd4fb78e


DELTASPIKE-1094 DELTASPIKE-1093 @Throttled and @Futureable


Project: http://git-wip-us.apache.org/repos/asf/deltaspike/repo
Commit: http://git-wip-us.apache.org/repos/asf/deltaspike/commit/dbd4fb78
Tree: http://git-wip-us.apache.org/repos/asf/deltaspike/tree/dbd4fb78
Diff: http://git-wip-us.apache.org/repos/asf/deltaspike/diff/dbd4fb78

Branch: refs/heads/master
Commit: dbd4fb78e1af1acb1e59e3f2de59979b012486b1
Parents: a571ac6
Author: Romain manni-Bucau <rmannibu...@gmail.com>
Authored: Thu Mar 17 12:09:18 2016 +0100
Committer: Romain manni-Bucau <rmannibu...@gmail.com>
Committed: Thu Mar 17 12:09:18 2016 +0100

----------------------------------------------------------------------
 .../deltaspike/core/api/future/Futureable.java  |  44 +++
 .../core/api/throttling/Throttled.java          |  56 ++++
 .../core/api/throttling/Throttling.java         |  73 +++++
 .../core/impl/future/FutureableInterceptor.java | 324 +++++++++++++++++++
 .../impl/throttling/ThrottledInterceptor.java   | 197 +++++++++++
 .../impl/src/main/resources/META-INF/beans.xml  |   4 +
 .../test/core/impl/future/FutureableTest.java   |  66 ++++
 .../test/core/impl/future/Service.java          |  61 ++++
 .../test/core/impl/throttling/Service.java      |  60 ++++
 .../test/core/impl/throttling/Service2.java     |  69 ++++
 .../core/impl/throttling/ThrottledTest.java     | 277 ++++++++++++++++
 11 files changed, 1231 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/future/Futureable.java
----------------------------------------------------------------------
diff --git 
a/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/future/Futureable.java
 
b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/future/Futureable.java
new file mode 100644
index 0000000..2a725c2
--- /dev/null
+++ 
b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/future/Futureable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.core.api.future;
+
+import javax.enterprise.util.Nonbinding;
+import javax.interceptor.InterceptorBinding;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Mark the method as execute in a thread pool and not synchronously.
+ * Note: it should return a CompletionStage or Future.
+ */
+@InterceptorBinding
+@Retention(RUNTIME)
+@Target({ TYPE, METHOD })
+public @interface Futureable
+{
+    /**
+     * @return pool name, if not existing will use a default one based on 
processor count.
+     */
+    @Nonbinding
+    String value() default "";
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/throttling/Throttled.java
----------------------------------------------------------------------
diff --git 
a/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/throttling/Throttled.java
 
b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/throttling/Throttled.java
new file mode 100644
index 0000000..4dcc0d2
--- /dev/null
+++ 
b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/throttling/Throttled.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.core.api.throttling;
+
+import javax.enterprise.util.Nonbinding;
+import javax.interceptor.InterceptorBinding;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Mark a bean/method as relying on a throttler.
+ */
+@InterceptorBinding
+@Retention(RUNTIME)
+@Target({ TYPE, METHOD })
+public @interface Throttled
+{
+    /**
+     * @return the duration to wait to acquire the permits.
+     */
+    @Nonbinding
+    long timeout() default 0L;
+
+    /**
+     * @return the unit of timeout().
+     */
+    @Nonbinding
+    TimeUnit timeoutUnit() default TimeUnit.MILLISECONDS;
+
+    /**
+     * @return how many permits to require.
+     */
+    @Nonbinding
+    int weight() default 1;
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/throttling/Throttling.java
----------------------------------------------------------------------
diff --git 
a/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/throttling/Throttling.java
 
b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/throttling/Throttling.java
new file mode 100644
index 0000000..fdf2cf1
--- /dev/null
+++ 
b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/throttling/Throttling.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.core.api.throttling;
+
+import javax.enterprise.util.Nonbinding;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import javax.enterprise.inject.spi.AnnotatedMethod;
+import java.util.concurrent.Semaphore;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Configure the throttler associated to the class/method.
+ */
+@Retention(RUNTIME)
+@Target({ TYPE, METHOD })
+public @interface Throttling
+{
+    /**
+     * @return how to get the semaphore. Default to a plain Semaphore of the 
JVM.
+     */
+    @Nonbinding
+    Class<? extends SemaphoreFactory> factory() default SemaphoreFactory.class;
+
+    /**
+     * @return true if the semaphore is fair false otherwise.
+     */
+    @Nonbinding
+    boolean fair() default false;
+
+    /**
+     * @return how many permits has the semaphore.
+     */
+    @Nonbinding
+    int permits() default 1;
+
+    /**
+     * @return name/bucket of this configuration (allow to have multiple 
buckets per class but default is 1 per class).
+     */
+    @Nonbinding
+    String name() default "";
+
+    interface SemaphoreFactory
+    {
+        /**
+         * @param method the intercepted method.
+         * @param name bucket name.
+         * @param fair should the semaphore be fair.
+         * @param permits maximum permits the semaphore shoulg get.
+         * @return the semaphore build accordingly the parameters.
+         */
+        Semaphore newSemaphore(AnnotatedMethod<?> method, String name, boolean 
fair, int permits);
+    }
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java
----------------------------------------------------------------------
diff --git 
a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java
 
b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java
new file mode 100644
index 0000000..a1ae35b
--- /dev/null
+++ 
b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.core.impl.future;
+
+import org.apache.deltaspike.core.api.config.ConfigResolver;
+import org.apache.deltaspike.core.api.future.Futureable;
+
+import javax.annotation.PreDestroy;
+import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.inject.spi.AnnotatedMethod;
+import javax.enterprise.inject.spi.AnnotatedType;
+import javax.enterprise.inject.spi.BeanManager;
+import javax.inject.Inject;
+import javax.interceptor.AroundInvoke;
+import javax.interceptor.Interceptor;
+import javax.interceptor.InvocationContext;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+@Interceptor
+@Futureable("")
+public class FutureableInterceptor implements Serializable
+{
+    private static final Class<?> COMPLETION_STAGE;
+    private static final Class<?> COMPLETABLE_FUTURE;
+    private static final Method COMPLETABLE_STAGE_TO_FUTURE;
+    private static final Method COMPLETABLE_FUTURE_COMPLETE;
+    private static final Method COMPLETABLE_FUTURE_COMPLETE_ERROR;
+
+    static
+    {
+        Class<?> completionStageClass = null;
+        Class<?> completableFutureClass = null;
+        Method completionStageClassToCompletableFuture = null;
+        Method completableFutureComplete = null;
+        Method completableFutureCompleteError = null;
+        try
+        {
+            final ClassLoader classLoader = ClassLoader.getSystemClassLoader();
+            completionStageClass = 
classLoader.loadClass("java.util.concurrent.CompletionStage");
+            completionStageClassToCompletableFuture = 
completionStageClass.getMethod("toCompletableFuture");
+            completableFutureClass = 
classLoader.loadClass("java.util.concurrent.CompletableFuture");
+            completableFutureComplete = 
completableFutureClass.getMethod("complete", Object.class);
+            completableFutureCompleteError = 
completableFutureClass.getMethod("completeExceptionally", Throwable.class);
+        }
+        catch (final Exception e)
+        {
+            // not on java 8
+        }
+        COMPLETION_STAGE = completionStageClass;
+        COMPLETABLE_FUTURE = completableFutureClass;
+        COMPLETABLE_STAGE_TO_FUTURE = completionStageClassToCompletableFuture;
+        COMPLETABLE_FUTURE_COMPLETE = completableFutureComplete;
+        COMPLETABLE_FUTURE_COMPLETE_ERROR = completableFutureCompleteError;
+    }
+
+    @Inject
+    private ThreadPoolManager manager;
+
+    @Inject
+    private BeanManager beanManager;
+
+    private transient ConcurrentMap<Method, ExecutorService> configByMethod =
+            new ConcurrentHashMap<Method, ExecutorService>();
+
+    @AroundInvoke
+    public Object invoke(final InvocationContext ic) throws Exception
+    {
+        // validate usage
+        final Class<?> returnType = ic.getMethod().getReturnType();
+        if (!COMPLETION_STAGE.isAssignableFrom(returnType) && 
!Future.class.isAssignableFrom(returnType))
+        {
+            throw new IllegalArgumentException("Return type should be a 
CompletableStage or Future");
+        }
+
+        if (configByMethod == null)
+        {
+            synchronized (this)
+            {
+                if (configByMethod == null)
+                {
+                    configByMethod = new ConcurrentHashMap<Method, 
ExecutorService>();
+                }
+            }
+        }
+
+        // running < j8 we cant have cancellation
+        //final AtomicReference<Callable<?>> cancelHook = new 
AtomicReference<Callable<?>>();
+        final Callable<Object> invocation = new Callable<Object>()
+        {
+            @Override
+            public Object call() throws Exception
+            {
+                try
+                {
+                    final Object proceed = ic.proceed();
+                    final Future<?> future = 
!COMPLETION_STAGE.isInstance(proceed) ?
+                            Future.class.cast(proceed) :
+                            
Future.class.cast(COMPLETABLE_STAGE_TO_FUTURE.invoke(proceed));
+                    /*
+                    cancelHook.set(new Callable<Boolean>()
+                    {
+                        @Override
+                        public Boolean call()
+                        {
+                            return future.cancel(true);
+                        }
+                    });
+                    */
+                    return future.get();
+                }
+                catch (final InvocationTargetException e)
+                {
+                    throw rethrow(e.getCause());
+                }
+                catch (final Exception e)
+                {
+                    throw rethrow(e);
+                }
+            }
+        };
+
+        final ExecutorService pool = getOrCreatePool(ic);
+        if (COMPLETABLE_FUTURE == null)  // not on java 8 can only be a future
+        {
+            return pool.submit(invocation);
+        }
+
+        // java 8, use CompletableFuture, it impl CompletionStage and Future 
so everyone is happy
+        final Object completableFuture = COMPLETABLE_FUTURE.newInstance();
+        pool.submit(new J8PromiseCompanionTask(completableFuture, invocation));
+        // TODO: handle cancel
+        return completableFuture;
+    }
+
+    private RuntimeException rethrow(final Throwable cause)
+    {
+        if (RuntimeException.class.isInstance(cause))
+        {
+            return RuntimeException.class.cast(cause);
+        }
+        return new IllegalStateException(cause);
+    }
+
+    private ExecutorService getOrCreatePool(final InvocationContext ic)
+    {
+        final Method method = ic.getMethod();
+        ExecutorService executorService = configByMethod.get(method);
+        if (executorService == null)
+        {
+            final AnnotatedType<?> annotatedType = 
beanManager.createAnnotatedType(method.getDeclaringClass());
+            AnnotatedMethod<?> annotatedMethod = null;
+            for (final AnnotatedMethod<?> am : annotatedType.getMethods())
+            {
+                if (am.getJavaMember().equals(method))
+                {
+                    annotatedMethod = am;
+                    break;
+                }
+            }
+            if (annotatedMethod == null)
+            {
+                throw new IllegalStateException("No annotated method for " + 
method);
+            }
+            final Futureable methodConfig = 
annotatedMethod.getAnnotation(Futureable.class);
+            final ExecutorService instance = manager.find(
+                    (methodConfig == null ? 
annotatedType.getAnnotation(Futureable.class) : methodConfig).value());
+            configByMethod.putIfAbsent(method, instance);
+            executorService = instance;
+        }
+        return executorService;
+    }
+
+    @ApplicationScoped
+    public static class ThreadPoolManager
+    {
+        private final ConcurrentMap<String, ExecutorService> pools = new 
ConcurrentHashMap<String, ExecutorService>();
+        private volatile ExecutorService defaultPool;
+        private volatile boolean closed = false;
+
+        @PreDestroy
+        private void shutdown()
+        {
+            closed = true;
+            final String propertyValue = 
ConfigResolver.getPropertyValue("deltaspike.future.timeout");
+            final long timeout = propertyValue == null ? 
TimeUnit.MINUTES.toMillis(1) : Integer.parseInt(propertyValue);
+            for (final ExecutorService es : pools.values())
+            {
+                es.shutdown();
+                try
+                {
+                    es.awaitTermination(timeout, TimeUnit.MILLISECONDS);
+                }
+                catch (final InterruptedException e)
+                {
+                    Thread.interrupted();
+                }
+            }
+            if (defaultPool != null)
+            {
+                defaultPool.shutdown();
+                try
+                {
+                    defaultPool.awaitTermination(timeout, 
TimeUnit.MILLISECONDS);
+                }
+                catch (final InterruptedException e)
+                {
+                    Thread.interrupted();
+                }
+            }
+            pools.clear();
+        }
+
+        // open door for users until we have a config, should be part of API 
but since it can change keeping it there
+        public void register(final String name, final ExecutorService es)
+        {
+            pools.putIfAbsent(name, es);
+        }
+
+        public ExecutorService find(final String name)
+        {
+            if (closed)
+            {
+                throw new IllegalStateException("Container is shutting down");
+            }
+            ExecutorService pool = pools.get(name);
+            if (pool == null)
+            {
+                ensureDefaultPool();
+                pool = defaultPool;
+            }
+            return pool;
+        }
+
+        private void ensureDefaultPool()
+        {
+            if (defaultPool == null)
+            {
+                synchronized (this)
+                {
+                    if (defaultPool == null)
+                    {
+                        defaultPool = Executors.newFixedThreadPool(
+                                Math.max(2, 
Runtime.getRuntime().availableProcessors()));
+                    }
+                }
+            }
+        }
+    }
+
+    private static final class J8PromiseCompanionTask<T> implements Runnable
+    {
+        private Object dep;
+        private Callable<T> fn;
+
+        J8PromiseCompanionTask(final Object dep, Callable<T> fn)
+        {
+            this.dep = dep;
+            this.fn = fn;
+        }
+
+        public void run()
+        {
+            try
+            {
+                COMPLETABLE_FUTURE_COMPLETE.invoke(dep, fn.call());
+            }
+            catch (final InvocationTargetException e)
+            {
+                try
+                {
+                    COMPLETABLE_FUTURE_COMPLETE_ERROR.invoke(dep, 
e.getCause());
+                }
+                catch (IllegalAccessException e1)
+                {
+                    throw new IllegalStateException(e1);
+                }
+                catch (final InvocationTargetException e1)
+                {
+                    throw new IllegalStateException(e1.getCause());
+                }
+            }
+            catch (Exception e)
+            {
+                try
+                {
+                    COMPLETABLE_FUTURE_COMPLETE_ERROR.invoke(dep, e);
+                }
+                catch (IllegalAccessException e1)
+                {
+                    throw new IllegalStateException(e1);
+                }
+                catch (final InvocationTargetException e1)
+                {
+                    throw new IllegalStateException(e1.getCause());
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java
----------------------------------------------------------------------
diff --git 
a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java
 
b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java
new file mode 100644
index 0000000..6b9e1f8
--- /dev/null
+++ 
b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.core.impl.throttling;
+
+import org.apache.deltaspike.core.api.throttling.Throttled;
+import org.apache.deltaspike.core.api.throttling.Throttling;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.inject.Typed;
+import javax.enterprise.inject.spi.AnnotatedMethod;
+import javax.enterprise.inject.spi.AnnotatedType;
+import javax.enterprise.inject.spi.BeanManager;
+import javax.inject.Inject;
+import javax.interceptor.AroundInvoke;
+import javax.interceptor.Interceptor;
+import javax.interceptor.InvocationContext;
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+@Throttled
+@Interceptor
+public class ThrottledInterceptor implements Serializable
+{
+    @Inject
+    private LocalCache metadata;
+
+    @AroundInvoke
+    public Object invoke(final InvocationContext ic) throws Exception
+    {
+        return metadata.getOrCreateInvocation(ic).invoke(ic);
+    }
+
+    private static Semaphore onInterruption(final InterruptedException e)
+    {
+        Thread.interrupted();
+        throw new IllegalStateException("acquire() interrupted", e);
+    }
+
+    @ApplicationScoped
+    @Typed(LocalCache.class)
+    static class LocalCache implements Throttling.SemaphoreFactory
+    {
+        private final ConcurrentMap<String, Semaphore> semaphores = new 
ConcurrentHashMap<String, Semaphore>();
+        private final ConcurrentMap<Method, Invocation> providers = new 
ConcurrentHashMap<Method, Invocation>();
+
+        @Inject
+        private BeanManager beanManager;
+
+        Invocation getOrCreateInvocation(final InvocationContext ic)
+        {
+            final Method method = ic.getMethod();
+            Invocation i = providers.get(method);
+            if (i == null)
+            {
+                final Class declaringClass = method.getDeclaringClass();
+                final AnnotatedType<Object> annotatedType = 
beanManager.createAnnotatedType(declaringClass);
+                AnnotatedMethod<?> annotatedMethod = null;
+                for (final AnnotatedMethod<?> am : annotatedType.getMethods())
+                {
+                    if (am.getJavaMember().equals(method))
+                    {
+                        annotatedMethod = am;
+                        break;
+                    }
+                }
+                if (annotatedMethod == null)
+                {
+                    throw new IllegalStateException("No annotated method for " 
+ method);
+                }
+
+                Throttled config = 
annotatedMethod.getAnnotation(Throttled.class);
+                if (config == null)
+                {
+                    config = annotatedType.getAnnotation(Throttled.class);
+                }
+                Throttling sharedConfig = 
annotatedMethod.getAnnotation(Throttling.class);
+                if (sharedConfig == null)
+                {
+                    sharedConfig = 
annotatedType.getAnnotation(Throttling.class);
+                }
+
+                final Throttling.SemaphoreFactory factory =
+                        sharedConfig != null && sharedConfig.factory() != 
Throttling.SemaphoreFactory.class ?
+                        Throttling.SemaphoreFactory.class.cast(
+                                beanManager.getReference(beanManager.resolve(
+                                        beanManager.getBeans(
+                                                sharedConfig.factory())),
+                                        Throttling.SemaphoreFactory.class, 
null)) : this;
+
+                final Semaphore semaphore = factory.newSemaphore(
+                        annotatedMethod,
+                        sharedConfig != null && !sharedConfig.name().isEmpty() 
?
+                                sharedConfig.name() : declaringClass.getName(),
+                        sharedConfig != null && sharedConfig.fair(),
+                        sharedConfig != null ? sharedConfig.permits() : 1);
+                final long timeout = 
config.timeoutUnit().toMillis(config.timeout());
+                final int weigth = config.weight();
+                i = new Invocation(semaphore, weigth, timeout);
+                final Invocation existing = 
providers.putIfAbsent(ic.getMethod(), i);
+                if (existing != null)
+                {
+                    i = existing;
+                }
+            }
+            return i;
+        }
+
+        @Override
+        public Semaphore newSemaphore(final AnnotatedMethod<?> method, final 
String name,
+                                      final boolean fair, final int permits)
+        {
+            Semaphore semaphore = semaphores.get(name);
+            if (semaphore == null)
+            {
+                semaphore = new Semaphore(permits, fair);
+                final Semaphore existing = semaphores.putIfAbsent(name, 
semaphore);
+                if (existing != null)
+                {
+                    semaphore = existing;
+                }
+            }
+            return semaphore;
+        }
+    }
+
+    private static final class Invocation
+    {
+        private final int weight;
+        private final Semaphore semaphore;
+        private final long timeout;
+
+        private Invocation(final Semaphore semaphore, final int weight, final 
long timeout)
+        {
+            this.semaphore = semaphore;
+            this.weight = weight;
+            this.timeout = timeout;
+        }
+
+        Object invoke(final InvocationContext context) throws Exception
+        {
+            if (timeout > 0)
+            {
+                try
+                {
+                    if (!semaphore.tryAcquire(weight, timeout, 
TimeUnit.MILLISECONDS))
+                    {
+                        throw new IllegalStateException("Can't acquire " + 
weight +
+                                " permits for " + context.getMethod() + " in " 
+ timeout + "ms");
+                    }
+                }
+                catch (final InterruptedException e)
+                {
+                    return onInterruption(e);
+                }
+            }
+            else
+            {
+                try
+                {
+                    semaphore.acquire(weight);
+                }
+                catch (final InterruptedException e)
+                {
+                    return onInterruption(e);
+                }
+            }
+            try
+            {
+                return context.proceed();
+            }
+            finally
+            {
+                semaphore.release(weight);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/impl/src/main/resources/META-INF/beans.xml
----------------------------------------------------------------------
diff --git a/deltaspike/core/impl/src/main/resources/META-INF/beans.xml 
b/deltaspike/core/impl/src/main/resources/META-INF/beans.xml
index a2308ff..9994be5 100644
--- a/deltaspike/core/impl/src/main/resources/META-INF/beans.xml
+++ b/deltaspike/core/impl/src/main/resources/META-INF/beans.xml
@@ -20,4 +20,8 @@
 <beans xmlns="http://java.sun.com/xml/ns/javaee";
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
        xsi:schemaLocation="http://java.sun.com/xml/ns/javaee 
http://java.sun.com/xml/ns/javaee/beans_1_0.xsd";>
+  <interceptors>
+    
<class>org.apache.deltaspike.core.impl.throttling.ThrottledInterceptor</class>
+    <class>org.apache.deltaspike.core.impl.future.FutureableInterceptor</class>
+  </interceptors>
 </beans>

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/future/FutureableTest.java
----------------------------------------------------------------------
diff --git 
a/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/future/FutureableTest.java
 
b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/future/FutureableTest.java
new file mode 100644
index 0000000..afc37e7
--- /dev/null
+++ 
b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/future/FutureableTest.java
@@ -0,0 +1,66 @@
+package org.apache.deltaspike.test.core.impl.future;
+
+import org.apache.deltaspike.test.util.ArchiveUtils;
+import org.jboss.arquillian.container.test.api.Deployment;
+import org.jboss.arquillian.junit.Arquillian;
+import org.jboss.shrinkwrap.api.ShrinkWrap;
+import org.jboss.shrinkwrap.api.asset.EmptyAsset;
+import org.jboss.shrinkwrap.api.spec.JavaArchive;
+import org.jboss.shrinkwrap.api.spec.WebArchive;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import javax.inject.Inject;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(Arquillian.class)
+public class FutureableTest {
+    @Deployment
+    public static WebArchive deploy()
+    {
+        JavaArchive testJar = ShrinkWrap.create(JavaArchive.class, 
"FutureableTest.jar")
+                .addPackage(Service.class.getPackage().getName())
+                .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml");
+
+        return ShrinkWrap.create(WebArchive.class, "FutureableTest.war")
+                .addAsLibraries(ArchiveUtils.getDeltaSpikeCoreArchive())
+                .addAsLibraries(testJar)
+                .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
+    }
+
+    @Inject
+    private Service service;
+
+    @Test
+    public void future()
+    {
+        final Future<String> future = service.thatSLong(1000);
+        int count = 0;
+        for (int i = 0; i < 1000; i++)
+        {
+            if (future.isDone())
+            {
+                break;
+            }
+            count++;
+        }
+        try
+        {
+            assertEquals("done", future.get());
+        }
+        catch (final InterruptedException e)
+        {
+            Thread.interrupted();
+            fail();
+        }
+        catch (final ExecutionException e)
+        {
+            fail(e.getMessage());
+        }
+        assertEquals(1000, count);
+    }
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/future/Service.java
----------------------------------------------------------------------
diff --git 
a/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/future/Service.java
 
b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/future/Service.java
new file mode 100644
index 0000000..2212302
--- /dev/null
+++ 
b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/future/Service.java
@@ -0,0 +1,61 @@
+package org.apache.deltaspike.test.core.impl.future;
+
+import org.apache.deltaspike.core.api.future.Futureable;
+
+import javax.enterprise.context.ApplicationScoped;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+@ApplicationScoped
+public class Service
+{
+    @Futureable // or CompletableFuture<String>
+    public Future<String> thatSLong(final long sleep)
+    {
+        try
+        {
+            Thread.sleep(sleep);
+            // return CompletableFuture.completedFuture("done");
+            return new Future<String>()  // EE will have AsyncFuture but more 
designed for j8 ^^
+            {
+                @Override
+                public boolean cancel(final boolean mayInterruptIfRunning)
+                {
+                    return false;
+                }
+
+                @Override
+                public boolean isCancelled()
+                {
+                    return false;
+                }
+
+                @Override
+                public boolean isDone()
+                {
+                    return true;
+                }
+
+                @Override
+                public String get() throws InterruptedException, 
ExecutionException
+                {
+                    return "done";
+                }
+
+                @Override
+                public String get(final long timeout, final TimeUnit unit)
+                        throws InterruptedException, ExecutionException, 
TimeoutException
+                {
+                    return "done";
+                }
+            };
+        }
+        catch (final InterruptedException e)
+        {
+            Thread.interrupted();
+            throw new IllegalStateException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/Service.java
----------------------------------------------------------------------
diff --git 
a/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/Service.java
 
b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/Service.java
new file mode 100644
index 0000000..95f7e7f
--- /dev/null
+++ 
b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/Service.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.test.core.impl.throttling;
+
+import org.apache.deltaspike.core.api.throttling.Throttled;
+
+import javax.enterprise.context.ApplicationScoped;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.fail;
+
+@ApplicationScoped
+public class Service
+{
+    private final Map<String, String> entries = new HashMap<String, String>();
+
+    @Throttled(timeout = 1, timeoutUnit = TimeUnit.SECONDS)
+    public String read(final String k)
+    {
+        return entries.get(k);
+    }
+
+    @Throttled(timeout = 1, timeoutUnit = TimeUnit.SECONDS)
+    public void write(final String k, final String v)
+    {
+        entries.put(k, v);
+    }
+
+    @Throttled
+    public void force()
+    {
+        try
+        {
+            Thread.sleep(TimeUnit.SECONDS.toMillis(5));
+        }
+        catch (final InterruptedException e)
+        {
+            Thread.interrupted();
+            fail();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/Service2.java
----------------------------------------------------------------------
diff --git 
a/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/Service2.java
 
b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/Service2.java
new file mode 100644
index 0000000..4d81d51
--- /dev/null
+++ 
b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/Service2.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.test.core.impl.throttling;
+
+import org.apache.deltaspike.core.api.throttling.Throttled;
+import org.apache.deltaspike.core.api.throttling.Throttling;
+
+import javax.enterprise.context.ApplicationScoped;
+import java.util.ArrayList;
+import java.util.Collection;
+
+@Throttling(permits = 2)
+@ApplicationScoped
+public class Service2
+{
+    private final Collection<String> called = new ArrayList<String>();
+
+    @Throttled(timeout = 750)
+    public void call(final String k)
+    {
+        synchronized (called)
+        {
+            called.add(k);
+        }
+        try
+        {
+            Thread.sleep(1000);
+        }
+        catch (final InterruptedException e)
+        {
+            Thread.interrupted();
+        }
+    }
+
+    @Throttled(weight = 2)
+    public void heavy(final Runnable inTask)
+    {
+        inTask.run();
+        try
+        {
+            Thread.sleep(5000);
+        }
+        catch (final InterruptedException e)
+        {
+            Thread.interrupted();
+        }
+    }
+
+    public Collection<String> getCalled()
+    {
+        return called;
+    }
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/ThrottledTest.java
----------------------------------------------------------------------
diff --git 
a/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/ThrottledTest.java
 
b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/ThrottledTest.java
new file mode 100644
index 0000000..261a434
--- /dev/null
+++ 
b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/ThrottledTest.java
@@ -0,0 +1,277 @@
+package org.apache.deltaspike.test.core.impl.throttling;
+
+import org.apache.deltaspike.test.category.SeCategory;
+import org.apache.deltaspike.test.util.ArchiveUtils;
+import org.jboss.arquillian.container.test.api.Deployment;
+import org.jboss.arquillian.junit.Arquillian;
+import org.jboss.shrinkwrap.api.ShrinkWrap;
+import org.jboss.shrinkwrap.api.asset.EmptyAsset;
+import org.jboss.shrinkwrap.api.spec.JavaArchive;
+import org.jboss.shrinkwrap.api.spec.WebArchive;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import javax.inject.Inject;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Arrays.asList;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+@RunWith(Arquillian.class)
+@Category(SeCategory.class)
+public class ThrottledTest {
+    @Deployment
+    public static WebArchive deploy()
+    {
+        JavaArchive testJar = ShrinkWrap.create(JavaArchive.class, 
"ThrottledTest.jar")
+                .addPackage(Service.class.getPackage().getName())
+                .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml");
+
+        return ShrinkWrap.create(WebArchive.class, "ThrottledTest.war")
+                .addAsLibraries(ArchiveUtils.getDeltaSpikeCoreArchive())
+                .addAsLibraries(testJar)
+                .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml");
+    }
+
+    @Inject
+    private Service service;
+
+    @Inject
+    private Service2 service2;
+
+    @Test
+    public void permits()
+    {
+        {// failling case now
+            final AtomicReference<Exception> failed = new 
AtomicReference<Exception>();
+            final CountDownLatch latch = new CountDownLatch(2);
+            final Thread[] concurrents = new Thread[]
+                    {
+                    new Thread()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            service2.heavy(new Runnable()
+                            {
+                                @Override
+                                public void run()
+                                {
+                                    latch.countDown();
+                                }
+                            });
+                        }
+                    },
+                    new Thread()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            try
+                            {
+                                latch.await();
+                            }
+                            catch (final InterruptedException e)
+                            {
+                                Thread.interrupted();
+                                fail();
+                            }
+                            try
+                            {
+                                service2.call("failed");
+                                fail();
+                            }
+                            catch (final IllegalStateException ise)
+                            {
+                                failed.set(ise);
+                            }
+                        }
+                    }
+            };
+            for (final Thread t : concurrents)
+            {
+                t.start();
+            }
+            latch.countDown();
+            waitForThreads(concurrents);
+            assertNotNull(failed.get());
+            assertThat(failed.get(), instanceOf(IllegalStateException.class));
+        }
+        { // passing
+            final CountDownLatch latch = new CountDownLatch(1);
+            final Thread[] concurrents = new Thread[]
+                    {
+                    new Thread()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            try
+                            {
+                                latch.await();
+                            }
+                            catch (final InterruptedException e)
+                            {
+                                Thread.interrupted();
+                                fail();
+                            }
+                            service2.call("1");
+                        }
+                    },
+                    new Thread()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            try
+                            {
+                                latch.await();
+                            }
+                            catch (final InterruptedException e)
+                            {
+                                Thread.interrupted();
+                                fail();
+                            }
+                            service2.call("2");
+                        }
+                    }
+            };
+            for (final Thread t : concurrents)
+            {
+                t.start();
+            }
+            latch.countDown();
+            waitForThreads(concurrents);
+            assertEquals(new HashSet<String>(asList("1", "2")), new 
HashSet<String>(service2.getCalled()));
+        }
+    }
+
+    private void waitForThreads(final Thread[] concurrents)
+    {
+        for (final Thread t : concurrents)
+        {
+            try
+            {
+                t.join();
+            }
+            catch (final InterruptedException e)
+            {
+                Thread.interrupted();
+                fail();
+            }
+        }
+    }
+
+    @Test
+    public void simpleNotConcurrent()
+    { // ~lock case
+        final CountDownLatch synchro = new CountDownLatch(1);
+        final Thread writer = new Thread()
+        {
+            @Override
+            public void run()
+            {
+                service.write("test", "value");
+                synchro.countDown();
+            }
+        };
+
+        final CountDownLatch end = new CountDownLatch(1);
+        final AtomicReference<String> val = new AtomicReference<String>();
+        final Thread reader = new Thread()
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    synchro.await(1, TimeUnit.MINUTES);
+                }
+                catch (final InterruptedException e)
+                {
+                    Thread.interrupted();
+                    fail();
+                }
+                val.set(service.read("test"));
+                end.countDown();
+            }
+        };
+
+        reader.start();
+        writer.start();
+        try
+        {
+            end.await(1, TimeUnit.MINUTES);
+        }
+        catch (final InterruptedException e)
+        {
+            Thread.interrupted();
+            fail();
+        }
+        assertEquals("value", val.get());
+    }
+
+    @Test
+    public void concurrentTimeout()
+    {
+        final AtomicBoolean doAgain = new AtomicBoolean(true);
+        final CountDownLatch endWriter = new CountDownLatch(1);
+        final Thread writer = new Thread()
+        {
+            @Override
+            public void run()
+            {
+                while (doAgain.get())
+                {
+                    service.write("test", "value");
+                    service.force();
+                }
+                endWriter.countDown();
+            }
+        };
+
+        final CountDownLatch endReader = new CountDownLatch(1);
+        final Thread reader = new Thread()
+        {
+            @Override
+            public void run()
+            {
+                while (doAgain.get())
+                {
+                    try
+                    {
+                        service.read("test");
+                    }
+                    catch (final IllegalStateException e)
+                    {
+                        doAgain.set(false);
+                    }
+                }
+                endReader.countDown();
+            }
+        };
+
+        reader.start();
+        writer.start();
+        try
+        {
+            endReader.await(1, TimeUnit.MINUTES);
+            endWriter.await(1, TimeUnit.MINUTES);
+        }
+        catch (final InterruptedException e)
+        {
+            Thread.interrupted();
+            fail();
+        }
+        assertEquals("value", service.read("test"));
+    }
+}

Reply via email to