Repository: deltaspike Updated Branches: refs/heads/master a571ac650 -> dbd4fb78e
DELTASPIKE-1094 DELTASPIKE-1093 @Throttled and @Futureable Project: http://git-wip-us.apache.org/repos/asf/deltaspike/repo Commit: http://git-wip-us.apache.org/repos/asf/deltaspike/commit/dbd4fb78 Tree: http://git-wip-us.apache.org/repos/asf/deltaspike/tree/dbd4fb78 Diff: http://git-wip-us.apache.org/repos/asf/deltaspike/diff/dbd4fb78 Branch: refs/heads/master Commit: dbd4fb78e1af1acb1e59e3f2de59979b012486b1 Parents: a571ac6 Author: Romain manni-Bucau <rmannibu...@gmail.com> Authored: Thu Mar 17 12:09:18 2016 +0100 Committer: Romain manni-Bucau <rmannibu...@gmail.com> Committed: Thu Mar 17 12:09:18 2016 +0100 ---------------------------------------------------------------------- .../deltaspike/core/api/future/Futureable.java | 44 +++ .../core/api/throttling/Throttled.java | 56 ++++ .../core/api/throttling/Throttling.java | 73 +++++ .../core/impl/future/FutureableInterceptor.java | 324 +++++++++++++++++++ .../impl/throttling/ThrottledInterceptor.java | 197 +++++++++++ .../impl/src/main/resources/META-INF/beans.xml | 4 + .../test/core/impl/future/FutureableTest.java | 66 ++++ .../test/core/impl/future/Service.java | 61 ++++ .../test/core/impl/throttling/Service.java | 60 ++++ .../test/core/impl/throttling/Service2.java | 69 ++++ .../core/impl/throttling/ThrottledTest.java | 277 ++++++++++++++++ 11 files changed, 1231 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/future/Futureable.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/future/Futureable.java b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/future/Futureable.java new file mode 100644 index 0000000..2a725c2 --- /dev/null +++ b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/future/Futureable.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.core.api.future; + +import javax.enterprise.util.Nonbinding; +import javax.interceptor.InterceptorBinding; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * Mark the method as execute in a thread pool and not synchronously. + * Note: it should return a CompletionStage or Future. + */ +@InterceptorBinding +@Retention(RUNTIME) +@Target({ TYPE, METHOD }) +public @interface Futureable +{ + /** + * @return pool name, if not existing will use a default one based on processor count. + */ + @Nonbinding + String value() default ""; +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/throttling/Throttled.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/throttling/Throttled.java b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/throttling/Throttled.java new file mode 100644 index 0000000..4dcc0d2 --- /dev/null +++ b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/throttling/Throttled.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.core.api.throttling; + +import javax.enterprise.util.Nonbinding; +import javax.interceptor.InterceptorBinding; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.concurrent.TimeUnit; + +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * Mark a bean/method as relying on a throttler. + */ +@InterceptorBinding +@Retention(RUNTIME) +@Target({ TYPE, METHOD }) +public @interface Throttled +{ + /** + * @return the duration to wait to acquire the permits. + */ + @Nonbinding + long timeout() default 0L; + + /** + * @return the unit of timeout(). + */ + @Nonbinding + TimeUnit timeoutUnit() default TimeUnit.MILLISECONDS; + + /** + * @return how many permits to require. + */ + @Nonbinding + int weight() default 1; +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/throttling/Throttling.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/throttling/Throttling.java b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/throttling/Throttling.java new file mode 100644 index 0000000..fdf2cf1 --- /dev/null +++ b/deltaspike/core/api/src/main/java/org/apache/deltaspike/core/api/throttling/Throttling.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.core.api.throttling; + +import javax.enterprise.util.Nonbinding; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import javax.enterprise.inject.spi.AnnotatedMethod; +import java.util.concurrent.Semaphore; + +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +/** + * Configure the throttler associated to the class/method. + */ +@Retention(RUNTIME) +@Target({ TYPE, METHOD }) +public @interface Throttling +{ + /** + * @return how to get the semaphore. Default to a plain Semaphore of the JVM. + */ + @Nonbinding + Class<? extends SemaphoreFactory> factory() default SemaphoreFactory.class; + + /** + * @return true if the semaphore is fair false otherwise. + */ + @Nonbinding + boolean fair() default false; + + /** + * @return how many permits has the semaphore. + */ + @Nonbinding + int permits() default 1; + + /** + * @return name/bucket of this configuration (allow to have multiple buckets per class but default is 1 per class). + */ + @Nonbinding + String name() default ""; + + interface SemaphoreFactory + { + /** + * @param method the intercepted method. + * @param name bucket name. + * @param fair should the semaphore be fair. + * @param permits maximum permits the semaphore shoulg get. + * @return the semaphore build accordingly the parameters. + */ + Semaphore newSemaphore(AnnotatedMethod<?> method, String name, boolean fair, int permits); + } +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java new file mode 100644 index 0000000..a1ae35b --- /dev/null +++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/future/FutureableInterceptor.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.core.impl.future; + +import org.apache.deltaspike.core.api.config.ConfigResolver; +import org.apache.deltaspike.core.api.future.Futureable; + +import javax.annotation.PreDestroy; +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.inject.spi.AnnotatedMethod; +import javax.enterprise.inject.spi.AnnotatedType; +import javax.enterprise.inject.spi.BeanManager; +import javax.inject.Inject; +import javax.interceptor.AroundInvoke; +import javax.interceptor.Interceptor; +import javax.interceptor.InvocationContext; +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +@Interceptor +@Futureable("") +public class FutureableInterceptor implements Serializable +{ + private static final Class<?> COMPLETION_STAGE; + private static final Class<?> COMPLETABLE_FUTURE; + private static final Method COMPLETABLE_STAGE_TO_FUTURE; + private static final Method COMPLETABLE_FUTURE_COMPLETE; + private static final Method COMPLETABLE_FUTURE_COMPLETE_ERROR; + + static + { + Class<?> completionStageClass = null; + Class<?> completableFutureClass = null; + Method completionStageClassToCompletableFuture = null; + Method completableFutureComplete = null; + Method completableFutureCompleteError = null; + try + { + final ClassLoader classLoader = ClassLoader.getSystemClassLoader(); + completionStageClass = classLoader.loadClass("java.util.concurrent.CompletionStage"); + completionStageClassToCompletableFuture = completionStageClass.getMethod("toCompletableFuture"); + completableFutureClass = classLoader.loadClass("java.util.concurrent.CompletableFuture"); + completableFutureComplete = completableFutureClass.getMethod("complete", Object.class); + completableFutureCompleteError = completableFutureClass.getMethod("completeExceptionally", Throwable.class); + } + catch (final Exception e) + { + // not on java 8 + } + COMPLETION_STAGE = completionStageClass; + COMPLETABLE_FUTURE = completableFutureClass; + COMPLETABLE_STAGE_TO_FUTURE = completionStageClassToCompletableFuture; + COMPLETABLE_FUTURE_COMPLETE = completableFutureComplete; + COMPLETABLE_FUTURE_COMPLETE_ERROR = completableFutureCompleteError; + } + + @Inject + private ThreadPoolManager manager; + + @Inject + private BeanManager beanManager; + + private transient ConcurrentMap<Method, ExecutorService> configByMethod = + new ConcurrentHashMap<Method, ExecutorService>(); + + @AroundInvoke + public Object invoke(final InvocationContext ic) throws Exception + { + // validate usage + final Class<?> returnType = ic.getMethod().getReturnType(); + if (!COMPLETION_STAGE.isAssignableFrom(returnType) && !Future.class.isAssignableFrom(returnType)) + { + throw new IllegalArgumentException("Return type should be a CompletableStage or Future"); + } + + if (configByMethod == null) + { + synchronized (this) + { + if (configByMethod == null) + { + configByMethod = new ConcurrentHashMap<Method, ExecutorService>(); + } + } + } + + // running < j8 we cant have cancellation + //final AtomicReference<Callable<?>> cancelHook = new AtomicReference<Callable<?>>(); + final Callable<Object> invocation = new Callable<Object>() + { + @Override + public Object call() throws Exception + { + try + { + final Object proceed = ic.proceed(); + final Future<?> future = !COMPLETION_STAGE.isInstance(proceed) ? + Future.class.cast(proceed) : + Future.class.cast(COMPLETABLE_STAGE_TO_FUTURE.invoke(proceed)); + /* + cancelHook.set(new Callable<Boolean>() + { + @Override + public Boolean call() + { + return future.cancel(true); + } + }); + */ + return future.get(); + } + catch (final InvocationTargetException e) + { + throw rethrow(e.getCause()); + } + catch (final Exception e) + { + throw rethrow(e); + } + } + }; + + final ExecutorService pool = getOrCreatePool(ic); + if (COMPLETABLE_FUTURE == null) // not on java 8 can only be a future + { + return pool.submit(invocation); + } + + // java 8, use CompletableFuture, it impl CompletionStage and Future so everyone is happy + final Object completableFuture = COMPLETABLE_FUTURE.newInstance(); + pool.submit(new J8PromiseCompanionTask(completableFuture, invocation)); + // TODO: handle cancel + return completableFuture; + } + + private RuntimeException rethrow(final Throwable cause) + { + if (RuntimeException.class.isInstance(cause)) + { + return RuntimeException.class.cast(cause); + } + return new IllegalStateException(cause); + } + + private ExecutorService getOrCreatePool(final InvocationContext ic) + { + final Method method = ic.getMethod(); + ExecutorService executorService = configByMethod.get(method); + if (executorService == null) + { + final AnnotatedType<?> annotatedType = beanManager.createAnnotatedType(method.getDeclaringClass()); + AnnotatedMethod<?> annotatedMethod = null; + for (final AnnotatedMethod<?> am : annotatedType.getMethods()) + { + if (am.getJavaMember().equals(method)) + { + annotatedMethod = am; + break; + } + } + if (annotatedMethod == null) + { + throw new IllegalStateException("No annotated method for " + method); + } + final Futureable methodConfig = annotatedMethod.getAnnotation(Futureable.class); + final ExecutorService instance = manager.find( + (methodConfig == null ? annotatedType.getAnnotation(Futureable.class) : methodConfig).value()); + configByMethod.putIfAbsent(method, instance); + executorService = instance; + } + return executorService; + } + + @ApplicationScoped + public static class ThreadPoolManager + { + private final ConcurrentMap<String, ExecutorService> pools = new ConcurrentHashMap<String, ExecutorService>(); + private volatile ExecutorService defaultPool; + private volatile boolean closed = false; + + @PreDestroy + private void shutdown() + { + closed = true; + final String propertyValue = ConfigResolver.getPropertyValue("deltaspike.future.timeout"); + final long timeout = propertyValue == null ? TimeUnit.MINUTES.toMillis(1) : Integer.parseInt(propertyValue); + for (final ExecutorService es : pools.values()) + { + es.shutdown(); + try + { + es.awaitTermination(timeout, TimeUnit.MILLISECONDS); + } + catch (final InterruptedException e) + { + Thread.interrupted(); + } + } + if (defaultPool != null) + { + defaultPool.shutdown(); + try + { + defaultPool.awaitTermination(timeout, TimeUnit.MILLISECONDS); + } + catch (final InterruptedException e) + { + Thread.interrupted(); + } + } + pools.clear(); + } + + // open door for users until we have a config, should be part of API but since it can change keeping it there + public void register(final String name, final ExecutorService es) + { + pools.putIfAbsent(name, es); + } + + public ExecutorService find(final String name) + { + if (closed) + { + throw new IllegalStateException("Container is shutting down"); + } + ExecutorService pool = pools.get(name); + if (pool == null) + { + ensureDefaultPool(); + pool = defaultPool; + } + return pool; + } + + private void ensureDefaultPool() + { + if (defaultPool == null) + { + synchronized (this) + { + if (defaultPool == null) + { + defaultPool = Executors.newFixedThreadPool( + Math.max(2, Runtime.getRuntime().availableProcessors())); + } + } + } + } + } + + private static final class J8PromiseCompanionTask<T> implements Runnable + { + private Object dep; + private Callable<T> fn; + + J8PromiseCompanionTask(final Object dep, Callable<T> fn) + { + this.dep = dep; + this.fn = fn; + } + + public void run() + { + try + { + COMPLETABLE_FUTURE_COMPLETE.invoke(dep, fn.call()); + } + catch (final InvocationTargetException e) + { + try + { + COMPLETABLE_FUTURE_COMPLETE_ERROR.invoke(dep, e.getCause()); + } + catch (IllegalAccessException e1) + { + throw new IllegalStateException(e1); + } + catch (final InvocationTargetException e1) + { + throw new IllegalStateException(e1.getCause()); + } + } + catch (Exception e) + { + try + { + COMPLETABLE_FUTURE_COMPLETE_ERROR.invoke(dep, e); + } + catch (IllegalAccessException e1) + { + throw new IllegalStateException(e1); + } + catch (final InvocationTargetException e1) + { + throw new IllegalStateException(e1.getCause()); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java new file mode 100644 index 0000000..6b9e1f8 --- /dev/null +++ b/deltaspike/core/impl/src/main/java/org/apache/deltaspike/core/impl/throttling/ThrottledInterceptor.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.core.impl.throttling; + +import org.apache.deltaspike.core.api.throttling.Throttled; +import org.apache.deltaspike.core.api.throttling.Throttling; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.inject.Typed; +import javax.enterprise.inject.spi.AnnotatedMethod; +import javax.enterprise.inject.spi.AnnotatedType; +import javax.enterprise.inject.spi.BeanManager; +import javax.inject.Inject; +import javax.interceptor.AroundInvoke; +import javax.interceptor.Interceptor; +import javax.interceptor.InvocationContext; +import java.io.Serializable; +import java.lang.reflect.Method; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +@Throttled +@Interceptor +public class ThrottledInterceptor implements Serializable +{ + @Inject + private LocalCache metadata; + + @AroundInvoke + public Object invoke(final InvocationContext ic) throws Exception + { + return metadata.getOrCreateInvocation(ic).invoke(ic); + } + + private static Semaphore onInterruption(final InterruptedException e) + { + Thread.interrupted(); + throw new IllegalStateException("acquire() interrupted", e); + } + + @ApplicationScoped + @Typed(LocalCache.class) + static class LocalCache implements Throttling.SemaphoreFactory + { + private final ConcurrentMap<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>(); + private final ConcurrentMap<Method, Invocation> providers = new ConcurrentHashMap<Method, Invocation>(); + + @Inject + private BeanManager beanManager; + + Invocation getOrCreateInvocation(final InvocationContext ic) + { + final Method method = ic.getMethod(); + Invocation i = providers.get(method); + if (i == null) + { + final Class declaringClass = method.getDeclaringClass(); + final AnnotatedType<Object> annotatedType = beanManager.createAnnotatedType(declaringClass); + AnnotatedMethod<?> annotatedMethod = null; + for (final AnnotatedMethod<?> am : annotatedType.getMethods()) + { + if (am.getJavaMember().equals(method)) + { + annotatedMethod = am; + break; + } + } + if (annotatedMethod == null) + { + throw new IllegalStateException("No annotated method for " + method); + } + + Throttled config = annotatedMethod.getAnnotation(Throttled.class); + if (config == null) + { + config = annotatedType.getAnnotation(Throttled.class); + } + Throttling sharedConfig = annotatedMethod.getAnnotation(Throttling.class); + if (sharedConfig == null) + { + sharedConfig = annotatedType.getAnnotation(Throttling.class); + } + + final Throttling.SemaphoreFactory factory = + sharedConfig != null && sharedConfig.factory() != Throttling.SemaphoreFactory.class ? + Throttling.SemaphoreFactory.class.cast( + beanManager.getReference(beanManager.resolve( + beanManager.getBeans( + sharedConfig.factory())), + Throttling.SemaphoreFactory.class, null)) : this; + + final Semaphore semaphore = factory.newSemaphore( + annotatedMethod, + sharedConfig != null && !sharedConfig.name().isEmpty() ? + sharedConfig.name() : declaringClass.getName(), + sharedConfig != null && sharedConfig.fair(), + sharedConfig != null ? sharedConfig.permits() : 1); + final long timeout = config.timeoutUnit().toMillis(config.timeout()); + final int weigth = config.weight(); + i = new Invocation(semaphore, weigth, timeout); + final Invocation existing = providers.putIfAbsent(ic.getMethod(), i); + if (existing != null) + { + i = existing; + } + } + return i; + } + + @Override + public Semaphore newSemaphore(final AnnotatedMethod<?> method, final String name, + final boolean fair, final int permits) + { + Semaphore semaphore = semaphores.get(name); + if (semaphore == null) + { + semaphore = new Semaphore(permits, fair); + final Semaphore existing = semaphores.putIfAbsent(name, semaphore); + if (existing != null) + { + semaphore = existing; + } + } + return semaphore; + } + } + + private static final class Invocation + { + private final int weight; + private final Semaphore semaphore; + private final long timeout; + + private Invocation(final Semaphore semaphore, final int weight, final long timeout) + { + this.semaphore = semaphore; + this.weight = weight; + this.timeout = timeout; + } + + Object invoke(final InvocationContext context) throws Exception + { + if (timeout > 0) + { + try + { + if (!semaphore.tryAcquire(weight, timeout, TimeUnit.MILLISECONDS)) + { + throw new IllegalStateException("Can't acquire " + weight + + " permits for " + context.getMethod() + " in " + timeout + "ms"); + } + } + catch (final InterruptedException e) + { + return onInterruption(e); + } + } + else + { + try + { + semaphore.acquire(weight); + } + catch (final InterruptedException e) + { + return onInterruption(e); + } + } + try + { + return context.proceed(); + } + finally + { + semaphore.release(weight); + } + } + } +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/impl/src/main/resources/META-INF/beans.xml ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/main/resources/META-INF/beans.xml b/deltaspike/core/impl/src/main/resources/META-INF/beans.xml index a2308ff..9994be5 100644 --- a/deltaspike/core/impl/src/main/resources/META-INF/beans.xml +++ b/deltaspike/core/impl/src/main/resources/META-INF/beans.xml @@ -20,4 +20,8 @@ <beans xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/beans_1_0.xsd"> + <interceptors> + <class>org.apache.deltaspike.core.impl.throttling.ThrottledInterceptor</class> + <class>org.apache.deltaspike.core.impl.future.FutureableInterceptor</class> + </interceptors> </beans> http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/future/FutureableTest.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/future/FutureableTest.java b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/future/FutureableTest.java new file mode 100644 index 0000000..afc37e7 --- /dev/null +++ b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/future/FutureableTest.java @@ -0,0 +1,66 @@ +package org.apache.deltaspike.test.core.impl.future; + +import org.apache.deltaspike.test.util.ArchiveUtils; +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +import javax.inject.Inject; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(Arquillian.class) +public class FutureableTest { + @Deployment + public static WebArchive deploy() + { + JavaArchive testJar = ShrinkWrap.create(JavaArchive.class, "FutureableTest.jar") + .addPackage(Service.class.getPackage().getName()) + .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml"); + + return ShrinkWrap.create(WebArchive.class, "FutureableTest.war") + .addAsLibraries(ArchiveUtils.getDeltaSpikeCoreArchive()) + .addAsLibraries(testJar) + .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml"); + } + + @Inject + private Service service; + + @Test + public void future() + { + final Future<String> future = service.thatSLong(1000); + int count = 0; + for (int i = 0; i < 1000; i++) + { + if (future.isDone()) + { + break; + } + count++; + } + try + { + assertEquals("done", future.get()); + } + catch (final InterruptedException e) + { + Thread.interrupted(); + fail(); + } + catch (final ExecutionException e) + { + fail(e.getMessage()); + } + assertEquals(1000, count); + } +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/future/Service.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/future/Service.java b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/future/Service.java new file mode 100644 index 0000000..2212302 --- /dev/null +++ b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/future/Service.java @@ -0,0 +1,61 @@ +package org.apache.deltaspike.test.core.impl.future; + +import org.apache.deltaspike.core.api.future.Futureable; + +import javax.enterprise.context.ApplicationScoped; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +@ApplicationScoped +public class Service +{ + @Futureable // or CompletableFuture<String> + public Future<String> thatSLong(final long sleep) + { + try + { + Thread.sleep(sleep); + // return CompletableFuture.completedFuture("done"); + return new Future<String>() // EE will have AsyncFuture but more designed for j8 ^^ + { + @Override + public boolean cancel(final boolean mayInterruptIfRunning) + { + return false; + } + + @Override + public boolean isCancelled() + { + return false; + } + + @Override + public boolean isDone() + { + return true; + } + + @Override + public String get() throws InterruptedException, ExecutionException + { + return "done"; + } + + @Override + public String get(final long timeout, final TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException + { + return "done"; + } + }; + } + catch (final InterruptedException e) + { + Thread.interrupted(); + throw new IllegalStateException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/Service.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/Service.java b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/Service.java new file mode 100644 index 0000000..95f7e7f --- /dev/null +++ b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/Service.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.test.core.impl.throttling; + +import org.apache.deltaspike.core.api.throttling.Throttled; + +import javax.enterprise.context.ApplicationScoped; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.fail; + +@ApplicationScoped +public class Service +{ + private final Map<String, String> entries = new HashMap<String, String>(); + + @Throttled(timeout = 1, timeoutUnit = TimeUnit.SECONDS) + public String read(final String k) + { + return entries.get(k); + } + + @Throttled(timeout = 1, timeoutUnit = TimeUnit.SECONDS) + public void write(final String k, final String v) + { + entries.put(k, v); + } + + @Throttled + public void force() + { + try + { + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); + } + catch (final InterruptedException e) + { + Thread.interrupted(); + fail(); + } + } +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/Service2.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/Service2.java b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/Service2.java new file mode 100644 index 0000000..4d81d51 --- /dev/null +++ b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/Service2.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.test.core.impl.throttling; + +import org.apache.deltaspike.core.api.throttling.Throttled; +import org.apache.deltaspike.core.api.throttling.Throttling; + +import javax.enterprise.context.ApplicationScoped; +import java.util.ArrayList; +import java.util.Collection; + +@Throttling(permits = 2) +@ApplicationScoped +public class Service2 +{ + private final Collection<String> called = new ArrayList<String>(); + + @Throttled(timeout = 750) + public void call(final String k) + { + synchronized (called) + { + called.add(k); + } + try + { + Thread.sleep(1000); + } + catch (final InterruptedException e) + { + Thread.interrupted(); + } + } + + @Throttled(weight = 2) + public void heavy(final Runnable inTask) + { + inTask.run(); + try + { + Thread.sleep(5000); + } + catch (final InterruptedException e) + { + Thread.interrupted(); + } + } + + public Collection<String> getCalled() + { + return called; + } +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/dbd4fb78/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/ThrottledTest.java ---------------------------------------------------------------------- diff --git a/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/ThrottledTest.java b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/ThrottledTest.java new file mode 100644 index 0000000..261a434 --- /dev/null +++ b/deltaspike/core/impl/src/test/java/org/apache/deltaspike/test/core/impl/throttling/ThrottledTest.java @@ -0,0 +1,277 @@ +package org.apache.deltaspike.test.core.impl.throttling; + +import org.apache.deltaspike.test.category.SeCategory; +import org.apache.deltaspike.test.util.ArchiveUtils; +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import javax.inject.Inject; +import java.util.HashSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.Arrays.asList; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +@RunWith(Arquillian.class) +@Category(SeCategory.class) +public class ThrottledTest { + @Deployment + public static WebArchive deploy() + { + JavaArchive testJar = ShrinkWrap.create(JavaArchive.class, "ThrottledTest.jar") + .addPackage(Service.class.getPackage().getName()) + .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml"); + + return ShrinkWrap.create(WebArchive.class, "ThrottledTest.war") + .addAsLibraries(ArchiveUtils.getDeltaSpikeCoreArchive()) + .addAsLibraries(testJar) + .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml"); + } + + @Inject + private Service service; + + @Inject + private Service2 service2; + + @Test + public void permits() + { + {// failling case now + final AtomicReference<Exception> failed = new AtomicReference<Exception>(); + final CountDownLatch latch = new CountDownLatch(2); + final Thread[] concurrents = new Thread[] + { + new Thread() + { + @Override + public void run() + { + service2.heavy(new Runnable() + { + @Override + public void run() + { + latch.countDown(); + } + }); + } + }, + new Thread() + { + @Override + public void run() + { + try + { + latch.await(); + } + catch (final InterruptedException e) + { + Thread.interrupted(); + fail(); + } + try + { + service2.call("failed"); + fail(); + } + catch (final IllegalStateException ise) + { + failed.set(ise); + } + } + } + }; + for (final Thread t : concurrents) + { + t.start(); + } + latch.countDown(); + waitForThreads(concurrents); + assertNotNull(failed.get()); + assertThat(failed.get(), instanceOf(IllegalStateException.class)); + } + { // passing + final CountDownLatch latch = new CountDownLatch(1); + final Thread[] concurrents = new Thread[] + { + new Thread() + { + @Override + public void run() + { + try + { + latch.await(); + } + catch (final InterruptedException e) + { + Thread.interrupted(); + fail(); + } + service2.call("1"); + } + }, + new Thread() + { + @Override + public void run() + { + try + { + latch.await(); + } + catch (final InterruptedException e) + { + Thread.interrupted(); + fail(); + } + service2.call("2"); + } + } + }; + for (final Thread t : concurrents) + { + t.start(); + } + latch.countDown(); + waitForThreads(concurrents); + assertEquals(new HashSet<String>(asList("1", "2")), new HashSet<String>(service2.getCalled())); + } + } + + private void waitForThreads(final Thread[] concurrents) + { + for (final Thread t : concurrents) + { + try + { + t.join(); + } + catch (final InterruptedException e) + { + Thread.interrupted(); + fail(); + } + } + } + + @Test + public void simpleNotConcurrent() + { // ~lock case + final CountDownLatch synchro = new CountDownLatch(1); + final Thread writer = new Thread() + { + @Override + public void run() + { + service.write("test", "value"); + synchro.countDown(); + } + }; + + final CountDownLatch end = new CountDownLatch(1); + final AtomicReference<String> val = new AtomicReference<String>(); + final Thread reader = new Thread() + { + @Override + public void run() + { + try + { + synchro.await(1, TimeUnit.MINUTES); + } + catch (final InterruptedException e) + { + Thread.interrupted(); + fail(); + } + val.set(service.read("test")); + end.countDown(); + } + }; + + reader.start(); + writer.start(); + try + { + end.await(1, TimeUnit.MINUTES); + } + catch (final InterruptedException e) + { + Thread.interrupted(); + fail(); + } + assertEquals("value", val.get()); + } + + @Test + public void concurrentTimeout() + { + final AtomicBoolean doAgain = new AtomicBoolean(true); + final CountDownLatch endWriter = new CountDownLatch(1); + final Thread writer = new Thread() + { + @Override + public void run() + { + while (doAgain.get()) + { + service.write("test", "value"); + service.force(); + } + endWriter.countDown(); + } + }; + + final CountDownLatch endReader = new CountDownLatch(1); + final Thread reader = new Thread() + { + @Override + public void run() + { + while (doAgain.get()) + { + try + { + service.read("test"); + } + catch (final IllegalStateException e) + { + doAgain.set(false); + } + } + endReader.countDown(); + } + }; + + reader.start(); + writer.start(); + try + { + endReader.await(1, TimeUnit.MINUTES); + endWriter.await(1, TimeUnit.MINUTES); + } + catch (final InterruptedException e) + { + Thread.interrupted(); + fail(); + } + assertEquals("value", service.read("test")); + } +}