This is an automated email from the ASF dual-hosted git repository. danhaywood pushed a commit to branch ISIS-1852_parallel_stream_patch in repository https://gitbox.apache.org/repos/asf/isis.git
commit e29b26f535079e051b3d84f6a148dfff9c4bd7bc Author: Andi Huber <ahu...@apache.org> AuthorDate: Mon Jan 29 15:06:27 2018 +0100 ISIS-1852 allow local thread variable propagation to any child threads --- .../core/runtime/services/ServiceInstantiator.java | 6 +- .../runtime/services/ServiceInstantiatorTest.java | 281 ++++++++++++--------- 2 files changed, 167 insertions(+), 120 deletions(-) diff --git a/core/runtime/src/main/java/org/apache/isis/core/runtime/services/ServiceInstantiator.java b/core/runtime/src/main/java/org/apache/isis/core/runtime/services/ServiceInstantiator.java index 2b6591b..77adb02 100644 --- a/core/runtime/src/main/java/org/apache/isis/core/runtime/services/ServiceInstantiator.java +++ b/core/runtime/src/main/java/org/apache/isis/core/runtime/services/ServiceInstantiator.java @@ -157,11 +157,15 @@ public final class ServiceInstantiator { final T newInstance = proxySubclass.newInstance(); final ProxyObject proxyObject = (ProxyObject) newInstance; proxyObject.setHandler(new MethodHandler() { - private ThreadLocal<T> serviceByThread = new ThreadLocal<>(); + // Allow serviceByThread to be propagated from the thread that starts the request + // to any child-threads, hence InheritableThreadLocal. + private InheritableThreadLocal<T> serviceByThread = new InheritableThreadLocal<>(); @Override public Object invoke(final Object proxied, final Method proxyMethod, final Method proxiedMethod, final Object[] args) throws Throwable { + System.out.println("invoke: "+proxyMethod.getName()+" "+this); + cacheMethodsIfNecessary(cls); if(proxyMethod.getName().equals("__isis_startRequest")) { diff --git a/core/runtime/src/test/java/org/apache/isis/core/runtime/services/ServiceInstantiatorTest.java b/core/runtime/src/test/java/org/apache/isis/core/runtime/services/ServiceInstantiatorTest.java index 3237b51..5b39423 100644 --- a/core/runtime/src/test/java/org/apache/isis/core/runtime/services/ServiceInstantiatorTest.java +++ b/core/runtime/src/test/java/org/apache/isis/core/runtime/services/ServiceInstantiatorTest.java @@ -16,134 +16,177 @@ */ package org.apache.isis.core.runtime.services; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; + import javax.enterprise.context.RequestScoped; + +import org.apache.isis.core.commons.config.IsisConfigurationDefault; +import org.apache.isis.core.metamodel.services.ServicesInjector; +import org.apache.isis.core.unittestsupport.jmocking.JUnitRuleMockery2; import org.jmock.auto.Mock; import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.apache.isis.core.commons.config.IsisConfigurationDefault; -import org.apache.isis.core.metamodel.services.ServicesInjector; -import org.apache.isis.core.unittestsupport.jmocking.JUnitRuleMockery2; +public class ServiceInstantiatorTest { -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; + @Rule + public JUnitRuleMockery2 context = JUnitRuleMockery2.createFor(JUnitRuleMockery2.Mode.INTERFACES_AND_CLASSES); -public class ServiceInstantiatorTest { + private ServiceInstantiator serviceInstantiator; + + @JUnitRuleMockery2.Ignoring + @Mock + private ServicesInjector mockServiceInjector; + + @Before + public void setUp() throws Exception { + + serviceInstantiator = new ServiceInstantiator(); + serviceInstantiator.setConfiguration(new IsisConfigurationDefault()); + } + + @Test + public void singleton() { + SingletonCalculator calculator = serviceInstantiator.createInstance(SingletonCalculator.class); + assertThat(calculator.add(3,4), is(7)); + } + + @Test + public void requestScoped_instantiate() { + AccumulatingCalculator calculator = serviceInstantiator.createInstance(AccumulatingCalculator.class); + assertThat(calculator instanceof RequestScopedService, is(true)); + } + + @Test + public void requestScoped_justOneThread() { + AccumulatingCalculator calculator = serviceInstantiator.createInstance(AccumulatingCalculator.class); + try { + ((RequestScopedService)calculator).__isis_startRequest(mockServiceInjector); + assertThat(calculator.add(3), is(3)); + assertThat(calculator.add(4), is(7)); + assertThat(calculator.getTotal(), is(7)); + } finally { + ((RequestScopedService)calculator).__isis_endRequest(); + } + } + + @Test + public void requestScoped_multipleThreads() throws InterruptedException, BrokenBarrierException { + + final AccumulatingCalculator calculator = serviceInstantiator.createInstance(AccumulatingCalculator.class); + + // will ask each thread's calculator to increment 10 times + final int[] steps = new int[]{10}; + + // each thread will post its totals here + final int[] totals = new int[]{0,0,0}; + + // after each step, all threads wait. The +1 is for this thread (the co-ordinator) + final CyclicBarrier barrier = + new CyclicBarrier(totals.length+1, new Runnable() { + public void run() { + // all threads waiting; decrement number of steps + steps[0]--; + } + }); + + // start off all threads + for(int i=0; i<totals.length; i++) { + final int j=i; + new Thread() { + public void run() { + try { + ((RequestScopedService)calculator).__isis_startRequest(mockServiceInjector); + // keep incrementing, till no more steps + while(steps[0]>0) { + try { + calculator.add((j+1)); + totals[j] = calculator.getTotal(); + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + } + } finally { + ((RequestScopedService)calculator).__isis_endRequest(); + } + }; + }.start(); + } + + // this thread is the co-ordinator; move onto next step when all are waiting + while(steps[0]>0) { + barrier.await(); + } + + assertThat(totals[0], is(10)); + assertThat(totals[1], is(20)); + assertThat(totals[2], is(30)); + } + + @Test + public void requestScoped_childThreads() throws InterruptedException { + + final AccumulatingCalculator calculator = + serviceInstantiator.createInstance(AccumulatingCalculator.class); + + final LongAdder counter = new LongAdder(); + + final int n = 100; + final int nThreads = 8; + final ExecutorService execService = Executors.newFixedThreadPool(nThreads); + + // initialize the request scoped calculator on current thread ('main') + ((RequestScopedService)calculator).__isis_startRequest(mockServiceInjector); + + for(int i=1;i<=n;++i) { + final int j=i; + execService.submit(()->{ + try { + + // access the request scoped calculator on a child thread of 'main' + calculator.add(j); + counter.add(calculator.getTotal()); + + } catch (Exception e) { + System.err.println(e.getMessage()); + } + }); + } + + execService.shutdown(); + + execService.awaitTermination(10, TimeUnit.SECONDS); + + ((RequestScopedService)calculator).__isis_endRequest(); + + assertThat(counter.intValue(), is(n*(n+1)/2)); + } + + public static class SingletonCalculator { + public int add(int x, int y) { + return x+y; + } + } - @Rule - public JUnitRuleMockery2 context = JUnitRuleMockery2.createFor(JUnitRuleMockery2.Mode.INTERFACES_AND_CLASSES); - - private ServiceInstantiator serviceInstantiator; - - @JUnitRuleMockery2.Ignoring - @Mock - private ServicesInjector mockServiceInjector; - - @Before - public void setUp() throws Exception { - - serviceInstantiator = new ServiceInstantiator(); - serviceInstantiator.setConfiguration(new IsisConfigurationDefault()); - } - - @Test - public void singleton() { - SingletonCalculator calculator = serviceInstantiator.createInstance(SingletonCalculator.class); - assertThat(calculator.add(3,4), is(7)); - } - - @Test - public void requestScoped_instantiate() { - AccumulatingCalculator calculator = serviceInstantiator.createInstance(AccumulatingCalculator.class); - assertThat(calculator instanceof RequestScopedService, is(true)); - } - - @Test - public void requestScoped_justOneThread() { - AccumulatingCalculator calculator = serviceInstantiator.createInstance(AccumulatingCalculator.class); - try { - ((RequestScopedService)calculator).__isis_startRequest(mockServiceInjector); - assertThat(calculator.add(3), is(3)); - assertThat(calculator.add(4), is(7)); - assertThat(calculator.getTotal(), is(7)); - } finally { - ((RequestScopedService)calculator).__isis_endRequest(); - } - } - - @Test - public void requestScoped_multipleThreads() throws InterruptedException, BrokenBarrierException { - - final AccumulatingCalculator calculator = serviceInstantiator.createInstance(AccumulatingCalculator.class); - - // will ask each thread's calculator to increment 10 times - final int[] steps = new int[]{10}; - - // each thread will post its totals here - final int[] totals = new int[]{0,0,0}; - - // after each step, all threads wait. The +1 is for this thread (the co-ordinator) - final CyclicBarrier barrier = - new CyclicBarrier(totals.length+1, new Runnable() { - public void run() { - // all threads waiting; decrement number of steps - steps[0]--; - } - }); - - // start off all threads - for(int i=0; i<totals.length; i++) { - final int j=i; - new Thread() { - public void run() { - try { - ((RequestScopedService)calculator).__isis_startRequest(mockServiceInjector); - // keep incrementing, til no more steps - while(steps[0]>0) { - try { - calculator.add((j+1)); - totals[j] = calculator.getTotal(); - barrier.await(); - } catch (InterruptedException | BrokenBarrierException e) { - throw new RuntimeException(e); - } - } - } finally { - ((RequestScopedService)calculator).__isis_endRequest(); - } - }; - }.start(); - } - - // this thread is the co-ordinator; move onto next step when all are waiting - while(steps[0]>0) { - barrier.await(); - } - - assertThat(totals[0], is(10)); - assertThat(totals[1], is(20)); - assertThat(totals[2], is(30)); - } - - - public static class SingletonCalculator { - public int add(int x, int y) { - return x+y; - } - } - - @RequestScoped - public static class AccumulatingCalculator { - private int total; - public int add(int x) { - total += x; - return getTotal(); - } - public int getTotal() { - return total; - } - } + @RequestScoped + public static class AccumulatingCalculator { + private int total; + public int add(int x) { + total += x; + return getTotal(); + } + public int getTotal() { + return total; + } + } } -- To stop receiving notification emails like this one, please contact danhayw...@apache.org.