## Keep Passing the Open Windows

Have you read the classic [v2Archive OSGi enRoute App note][5] about this topic? It has been archived by OSGi on the [v2Archive OSGi enRoute web site][3]. It handles a lot of similar cases. There is an accompanying workspace: [v2Archive OSGi enRoute osgi.enroute.examples.concurrency][7].

Anyway, I am not sure if you want to solve this pragmatically or purely?

## Pragmatic

Pragmatic means you accept a tiny chance of hitting the window between checking whether MyService is unregistered and using it. If you're really unlucky, the unregistration happens just after you checked but before you can use the service. This works when the unregistration of MyService is rare and the work is long. Yes, it can fail, but so can anything, so you should be prepared for it. Pragmatic works best as follows:

    @Component
    public class MyClass extends Thread {
        @Reference MyService myService;

        @Activate void activate() { start(); }
        @Deactivate void deactivate() { interrupt(); }

        public void run() {
            while (!isInterrupted()) {
                try {
                    MyResult result = doHardWork();
                    // The window: deactivate() can still interrupt us
                    // between this check and the call.
                    if (!isInterrupted())
                        myService.setResult(result);
                } catch (Exception e) { /* TODO */ }
            }
        }
    }

Clearly there is a race condition.

## Pure

I once had a use case where we had whiteboard listeners that received events. The events were frequent, and some not-so-good event listeners took too much time in their callbacks. This created quite a long window where it could fail, so it often did. For that use case I created a special, highly optimized class that could delay the removal of the listener while it was being dispatched. Making it have absolutely minimal overhead was tricky; I even made an Alloy model of it, which found some design errors. Anyway, sometimes you have to pick one of the bad sides, and this was one where delaying the deactivate was worth it.

So how would you make this 'purer' by delaying the deactivation until you have stopped using the service? Since the service is still supposed to be valid during deactivate, we could make the setResult() and deactivate() methods exclude each other. That is, we need to make sure that no interrupt can happen between the isInterrupted() check and the call to myService.setResult().

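To make that window concrete, here is a small sketch in plain Java that forces the unlucky interleaving with latches instead of waiting for it to happen by chance. Everything in it is made up for illustration: `RaceWindowDemo`, the `closed` flag standing in for "MyService is unregistered", and the latches. It has nothing to do with the OSGi APIs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: replays the check-then-use race deterministically.
final class RaceWindowDemo {

    /** Returns true when the "use" step ran although the flag was already closed. */
    static boolean usedAfterClose() {
        AtomicBoolean closed = new AtomicBoolean();          // stands in for "service unregistered"
        AtomicBoolean usedWhileClosed = new AtomicBoolean();
        CountDownLatch checked = new CountDownLatch(1);      // worker has done its check
        CountDownLatch closedNow = new CountDownLatch(1);    // main thread has closed

        Thread worker = new Thread(() -> {
            if (!closed.get()) {                             // 1. check: still open
                checked.countDown();
                try {
                    closedNow.await();                       // 2. force the unlucky timing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                usedWhileClosed.set(closed.get());           // 3. use: the check is stale by now
            }
        });
        worker.start();
        try {
            checked.await();
            closed.set(true);                                // the close lands inside the window
            closedNow.countDown();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return usedWhileClosed.get();
    }
}
```

The check in step 1 passes, yet by step 3 the flag has flipped. Closing that gap means making steps 1 to 3 and the close mutually exclusive.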
We could use heavy locks, but synchronized works fine for me as long as you keep its caveats in mind:

* Short blocks
* Ensure you cannot create deadlocks

So there must be an explicit contract that MyService is not going to stay away for a long time nor call lots of other unknown code that could cause deadlocks. After all, we're blocking the deactivate() method, which is very bad practice in general. So you will trade off one purity for another.

    @Component
    public class MyClass extends Thread {
        @Reference MyService myService;

        @Activate void activate() { start(); }

        // Shares the monitor with the block in run(), so the interrupt
        // cannot land between the check and the call.
        @Deactivate synchronized void deactivate() { interrupt(); }

        public void run() {
            while (!isInterrupted()) {
                try {
                    MyResult result = doHardWork();
                    synchronized (this) {
                        if (!isInterrupted()) {
                            myService.setResult(result);
                        }
                    }
                } catch (Exception e) { /* TODO */ }
            }
        }
    }

This guarantees what you want … However (you knew this was coming!) there is a reason the service gets deactivated. Even though the _service_ is still valid at that point, there is a reason the _service object_ indicated its unwillingness to play. For example, if MyService was remoted then the connection might have been lost. In general, when you call a service you should be prepared for it to fail. (That is why you should always take exceptions into account, even if they're not checked.)

## Better API

The best solution is usually to turn the problem around. This clearly can only happen when you can influence the API, so that is often not a choice. If you can, you can pass a Promise to myService and calculate in the background. Clearly that means you keep churning away at the hard work. Unless the calculation is very expensive and the unregistration happens often, doing the calculation unnecessarily should normally be of no practical concern. If it is, you might want to consider CompletableFuture instead of Promise since it has a cancel() method. (We rejected a cancel since it makes the Promise mutable, but admittedly it is useful.
However, it has the same race issues as we discuss here.)

    @Component
    public class MyClass {
        @Reference MyService myService;
        @Reference PromiseFactory promiseFactory;

        @Activate void activate() {
            Promise<MyResult> result = promiseFactory.submit(this::doHardWork);
            myService.setResult(result);
        }
    }

This is an example where you see a very weird effect that I first noticed in the eighties during my first big OO design. At first you think the problem has just moved from MyClass to MyService. I think that when you try to implement this, you will find that the problem has mostly _disappeared_. During one of the first large systems I designed I kept feeling we were kicking the hard problems down the road and that we would still run into a brick wall. However, one day we realized we were done. For some reason the hard problems were solved in the structure of the application and not in specific code. Weird. However, realizing this, I often have to cry a bit when I see how some designs do the opposite and make simple things complex :-(

## Multiple Results

If you have multiple results to deliver you might want to take a look at the [OSGi PushStream][1]. When I made the initial design for ASyncStreams (feels eons ago :-( ) that inspired the OSGi Push Stream specification, this was one of the use cases I had in mind. Push Streams are intended to handle all the nasty cases and shield you from them. As a bonus, they actually work for multiple receivers as well. Push Streams provide a simple, low-cost backlink to handle the case where MyService gets closed. I haven't looked at where Push Streams ended up, but as far as I know they should still be useful when your hard work delivers multiple results. Ah well, I wanted to take a look at them anyway since they have been released now.

Let's see what that would look like:

    @Component
    public class ProviderImpl extends Thread {
        @Reference PushStreamProvider psp;
        @Reference MyService myService;

        volatile SimplePushEventSource<MyResult> dispatcher;

        @Activate
        void activate() throws Exception {
            dispatcher = psp.createSimpleEventSource(MyResult.class);
            myService.setResult(dispatcher);
            start();
        }

        @Deactivate
        void deactivate() { interrupt(); }

        @Override
        public void run() {
            try {
                MyResult r = doHardWork();
                while (!isInterrupted()) {
                    dispatcher.publish(r);
                    r = doHardWork();
                }
            } finally {
                dispatcher.close();
            }
        }
    }

## Use of Executors

As a side note: I've been in systems where everybody was mucking around with ExecutorServices and it became a mess. In [v2Archive OSGi enRoute][3] I always provided an [Executor service][4] that is shared and does proper cleanup when the service getter goes away. (The [v2Archive OSGi enRoute Scheduler][6] was also very nice for this since it provides Cancelable Promises.) Executor Services created statically are a horror in OSGi since they are completely oblivious of the OSGi dynamics.

And in your case they are totally unnecessary. The only utility they provide to you is that they interrupt the threads. This is trivial to do when you create your own thread. (And messages about the expensiveness of threads are highly exaggerated.) Even if you use an Executor you can pass the thread.

    Deferred<Thread> deferred = new Deferred<>();
    Promise<MyResult> promise = promiseFactory.submit(() -> {
        // Hand the worker thread to whoever needs to interrupt it
        deferred.resolve(Thread.currentThread());
        MyResult result = null;
        while (result == null && !Thread.currentThread().isInterrupted()) {
            // … do some hard work
        }
        return result;
    });

    // deactivate
    deferred.getPromise().getValue().interrupt();

In general, if you go this route, I suggest you clearly separate the strategies from the code, i.e. make a separate class to capture the strategy of handling these things. The worst designs are the ones where these are mixed.

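One possible reading of "a separate class to capture the strategy" is sketched below in plain Java. It moves the synchronized check-then-use idea from the Pure section into a small helper, so the component itself only says what to do with the service. `ServiceGuard` and its methods are hypothetical names made up for this sketch, not an OSGi API, and the caveats from the Pure section still apply: the action must be short and must not be able to deadlock.

```java
import java.util.function.Consumer;

// Hypothetical helper (not part of OSGi): keeps "check, then use" and close()
// mutually exclusive so callers do not have to repeat the locking.
final class ServiceGuard<T> {
    private final T service;
    private boolean closed; // guarded by 'this'

    ServiceGuard(T service) {
        this.service = service;
    }

    /** Runs the action unless the guard is closed; returns whether it ran. */
    synchronized boolean use(Consumer<? super T> action) {
        if (closed) {
            return false;
        }
        action.accept(service); // keep this short: close() waits for it to finish
        return true;
    }

    /** Intended to be called from deactivate(); blocks while a use() is in progress. */
    synchronized void close() {
        closed = true;
    }
}
```

With such a helper, the worker loop would call something like `guard.use(s -> s.setResult(result))` and stop when it returns false, while deactivate() would call `guard.close()` before interrupting the thread. A remoted or otherwise failing service can still throw from inside the action, so the exception handling stays the caller's job.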
## Disclaimer

I guess this became a tad long; I guess I will turn it into a blog. Anyway, the usual disclaimer: none of the code has been tested, so use it at your own peril!

Good luck, kind regards,

Peter Kriens

[1]: https://osgi.org/specification/osgi.cmpn/7.0.0/util.pushstream.html
[2]: http://www.plantuml.com/plantuml/png/RP2n2i8m48RtF4NST6WVe4Cj24M7Ka71EII71jjKxYwLlhsXMXghO-w3Z-zFGQoGVTk8QZW1zbQ3J79PNcGc4QwM6524LxXLmwvHH07epX6Zr_mcCo1WsKwU9LIQRQyOn7GAplCDGPa0nmoHfgdud69ekhr2y-pm_ezQEZW6HFzWCDlHyRl5ksXDN6LWsPNaiteIhpUBjk_D2EGRZeVD1PayrdMv4WKu4_xv1G00
[3]: https://v2archive.enroute.osgi.org/
[4]: https://github.com/osgi/v2archive.osgi.enroute/tree/master/osgi.enroute.executor.simple.provider
[5]: https://v2archive.enroute.osgi.org/appnotes/concurrency.html
[6]: https://github.com/osgi/v2archive.osgi.enroute/tree/master/osgi.enroute.scheduler.simple.provider
[7]: https://github.com/osgi/osgi.enroute.examples.concurrency

> On 2 Aug 2018, at 02:01, David Leangen via osgi-dev <osgi-dev@mail.osgi.org> wrote:
>
> Hi Tim,
>
> Thanks, and this is good advice. The example you give is when the thread is in the same component that is being deactivated.
> In this case, as you show, it is quite trivial to track the activation state of the component in order to shut down the thread.
>
> In my case, the trouble I am having is that the long-running thread is in a component that is different from the component that is getting deactivated. For instance, building on your example:
>
>     @Component
>     public class MyClass {
>
>         // Note that I am using a STATIC service
>         @Reference private MyService myService;
>
>         private final AtomicBoolean closed = new AtomicBoolean();
>
>         @Activate
>         void start() {
>             new Thread(this::longStart).start();
>         }
>
>         @Deactivate
>         void stop() {
>             closed.set(true);
>         }
>
>         void longStart() {
>             for (int i = 0; i < 1000000; i++) {
>
>                 // This only works if the service object is not stateful, otherwise we need
>                 // to do a check and throw away an intermediate invalidated result
>
>                 // Understood, but unfortunately the service object is stateful.
>
>                 // The problem is that the dependency can be deactivated at any time, and this
>                 // is happening before "closed" in this component gets set to "true". I do not know how
>                 // to detect the deactivation of the dependency. I need to determine this pre-emptively,
>                 // not after-the-fact. Otherwise the result will be destructive.
>
>                 doSomethingWithMyService(myService);
>
>                 // Ideally I would like to do something like this:
>                 if (myServiceIsStillActive())
>                     doSomethingWithMyService(myService);
>             }
>         }
>     }
>
> In the second example, there is a dynamic @Reference, so I see the point of using an AtomicReference. However, I am using a static @Reference, so I doubt that just putting in an AtomicReference will change the timing problem.
>
> Any thoughts?
>
> By the way, instead of using a "closed" variable, I am doing something like this:
>
>     @Activate
>     void activate()
>     {
>         executor = Executors.newSingleThreadExecutor();
>     }
>
>     @Deactivate
>     void deactivate()
>     {
>         executor.shutdownNow();
>     }
>
> Then I only need to test for Thread.interrupted().
> I assume this has the same effect as having the check for "closed".
>
> Cheers,
> =David
>
>> On Aug 1, 2018, at 16:59, Tim Ward <tim.w...@paremus.com> wrote:
>>
>> Hi David,
>>
>> In addition to interrupting the worker thread (which is a good idea), there are a couple of useful things that you can do using the support from java.util.concurrent. For example, setting a closed state:
>>
>>     @Component
>>     public class MyClass {
>>
>>         private final AtomicBoolean closed = new AtomicBoolean();
>>
>>         @Activate
>>         void start() {
>>             new Thread(this::longStart).start();
>>         }
>>
>>         @Deactivate
>>         void stop() {
>>             closed.set(true);
>>         }
>>
>>         void longStart() {
>>             for (int i = 0; i < 1000000; i++) {
>>                 if (closed.get()) {
>>                     break;
>>                 }
>>                 doSomething();
>>             }
>>         }
>>     }
>>
>> Also, if your references are dynamic then you should treat them carefully:
>>
>>     @Component
>>     public class MyClass implements MySlowService {
>>
>>         private final AtomicReference<MyService> myRef = new AtomicReference<>();
>>
>>         @Reference(policy=DYNAMIC)
>>         void setReference(MyService service) {
>>             myRef.set(service);
>>         }
>>
>>         void unsetReference(MyService service) {
>>             // Note that it is *not* safe to just do a set null, see Compendium 112.5.12
>>             myRef.compareAndSet(service, null);
>>         }
>>
>>         public void longRunningTask() {
>>             for (int i = 0; i < 1000000; i++) {
>>                 // This only works if the service object is not stateful, otherwise we need
>>                 // to do a check and throw away an intermediate invalidated result
>>
>>                 MyService myService = myRef.get();
>>                 doSomethingWithMyService(myService);
>>             }
>>         }
>>     }
>>
>> I hope you find these helpful.
>>
>> Tim
>>
>>> On 1 Aug 2018, at 05:44, David Leangen via osgi-dev <osgi-dev@mail.osgi.org> wrote:
>>>
>>> Hi!
>>>
>>> I am running into a situation where, what I think is happening is:
>>>
>>> Component A gets instantiated
>>> Component B
>>>  - references A
>>>  - gets satisfied once A is satisfied
>>>  - kicks off a long-running process when one of its methods is called
>>>  - the long-running process is run in a different thread, with a Promise
>>> Component A is no longer satisfied
>>> But
>>>  - the long-running process is still running
>>>  - the long-running process now references an invalid Component A
>>>  - the long-running thread fails because of the invalid state of Component A
>>> Component B is no longer satisfied
>>>
>>> So, the long-running component messes things up, but its component has not yet shut down even though its process is still happily running in another thread.
>>>
>>> I can think of two possible solutions, but not sure which is best and not sure how to implement:
>>>
>>> 1) Figure out a way to share an ExecutorService between "related" components so that when one component shuts down it will signal to the other related components that their threads are now interrupted
>>>
>>> 2) In the long-running process, determine if the component that provides the required service is still active before continuing with the havoc-wreaking process
>>>
>>> Does this sound about right?
>>>
>>> How would I actually accomplish either of these?
>>>
>>> Thanks!
>>> =David
>>>
>>> _______________________________________________
>>> OSGi Developer Mail List
>>> osgi-dev@mail.osgi.org
>>> https://mail.osgi.org/mailman/listinfo/osgi-dev