## Keep Passing the Open Windows

You did read the classic [v2Archive OSGi enRoute App note][5] about this topic? 
It has been archived by the OSGi to [v2Archive OSGi enRoute web site][3]. It 
handles a lot of similar cases. There is an accompanying workspace [v2Archive 
OSGi enRoute osgi.enroute.examples.concurrency 
<https://github.com/osgi/osgi.enroute.examples.concurrency>][7]

Anyway, I am not sure if you want to solve this pragmatic or pure?

## Pragmatic 

Pragmatic means there is a tiny chance you hit the window where you check if 
the MyService is unregistered and then use it. If you're really unlucky you 
just hit the unregistration after you checked it but before you can use it. It 
works when the unregistration of MyService is rare and the work is long. Yes, 
it can fail but so can anything so you should be prepared for it. 

Pragmatic works best as follows:

   @Component
   public class MyClass extends Thread {   
      @Reference MyService myService;
   
      @Activate void activate()         { start(); }
      @Deactivate void deactivate()     { interrupt(); }
   
      public void run() {
         while (!isInterrupted()) {
            try {
                MyResult result = doHardWork();
                if (!isInterrupted())
                    myService.setResult(result);
            } catch (Exception e) { /* TODO */ }
         }
      }
   }

Clearly there is a race condition. 




## Pure 

I once had a use case where we had whiteboard listeners that received events. 
The frequency and some not so good event listeners that took too much time in 
their callback. This created a quite long window where it could fail so it 
often did. For that use case I created a special highly optimized class that 
could delay the removal of the listener while it was being dispatched. To make 
it have absolutely minimal overhead was tricky, I even made an Alloy model of 
it that found some design errors. Anyway, sometimes you have pick one of the 
bad sides, this was one where delaying the deactivate was worth it.

So how would you make this 'purer' by delaying the deactivation until you 
stopped using it? Since the service is still supposed to be valid during 
deactivate we could make the setResult() and the deactivate() methods exclude 
each other. That is, we need to make sure that no interrupt can happen when we 
check for the isInterrupted() and call myService.setResult(). We could use 
heavy locks but synchronized works fine for me when you realize some of its 
caveats:

* Short blocks
* Ensure you cannot create deadlocks

So there must be an explicit contract that the MyService is not going to stay 
away for a long time nor call lots of other unknown code that could cause 
deadlocks. After all, we're blocking the deactivate() method which is very bad 
practice in general. So you will trade off one purity for another.

   @Component
   public class MyClass extends Thread {   
      @Reference MyService myService;
   
      @Activate void activate()                         { start(); }
      @Deactivate synchronized void deactivate()        { interrupt(); }
   
      public void run() {
         while (!isInterrupted()) {
            try {
                MyResult result = doHardWork();
                synchronized(this) {
                    if (!isInterrupted()) {
                        myService.setResult(result);
                    }
                }
            } catch (Exception e) { /* TODO */ }
         }
      }
   }

This guarantees what you want … However (you knew this was coming!) there is a 
reason the service gets deactivated. Even though the _service_ is still valid 
at that point, there is a reason the _service object_ indicated its 
unwillingness to play. For example, if MyService was remoted then the 
connection might have been lost. In general, when you call a service you should 
be prepared that it fails. (That is why you should always take exceptions into 
account even if they're not checked.)

## Better API

The best solution is usually to turn the problem around. This clearly can only 
happen when you can influence the API so that is often not a choice. If you 
can, you can pass a Promise to the myService and calculate in the background. 
Clearly that means you keep churning doing the hard work. Unless the 
calculation is very expensive and the unregistration happens often, doing the 
calculation unnecessary should normally have no practical concerns. If it is, 
you might want to consider CompletableFuture instead of Promise since it has a 
cancel() method. (We rejected a cancel since it makes the Promise mutable, but 
admittedly it is useful. However, it has the same race issues as we discuss 
here.)

   @Component
   public class MyClass {
   
      @Reference MyService myService;
      @Reference PromiseFactory promiseFactory;

      @Activate void activate()                         { 
        Promise<MyResult> result = promiseFactory.submit( this::doHardWork );
        myService.setResult( result );
      }
   }

This is an example where you see a very weird effect that I first noticed in 
the eighties during my first big OO design. At first you think the problem is 
now moved from MyClass to MyService? I think when you try to implement this 
that you find that the problem mostly _disappeared_. During one of the first 
large systems I designed I kept feeling we were kicking the hard problems down 
the road and we still run into a brick wall. However, one day we realized we 
were done. For some reason the hard problems were solved in the structure of 
the application and not in specific code. Weird. However, realizing this I 
often have to cry a bit when I realize how some designs are doing the opposite 
and make simple things complex :-(

## Multiple Results

If you have multiple results to deliver you might want to take a look at the 
[OSGi PushStream][1]. When I made the initial design for ASyncStreams (feels 
eons ago :-( ) that inspired the OSGi Push Stream specification  this was one 
of the use cases I had in mind. The Push Stream are intended to handle all the 
nasty cases and shield you from them. As a bonus, it actually works for 
multiple receivers as well. Push Streams provide a simple low cost backlink to 
handle the case where the MyService gets closed. Haven't looked at where Push 
Stream's ended up but as far as I know they should still be useful when your 
hard work delivers multiple results. Ah well, I wanted to take a look at it 
anyway since it has been released now. Let's see how that would look like:

        @Component
        public class ProviderImpl extends Thread {
        
                @Reference PushStreamProvider           psp;
                @Reference MyService                    myService;

                volatile SimplePushEventSource<MyResult>        dispatcher;
        
                @Activate void activate() throws Exception {
                        dispatcher = 
psp.createSimpleEventSource(MyResult.class);
                        myService.setResult(dispatcher);
                        start();
                }
        
                @Deactivate void deactivate() {
                        interrupt();
                }
        
                @Override
                public void run() {
                        try {
                                MyResult r = doHardWork();
                                while (!isInterrupted()) {
                                        dispatcher.publish(r);
                                        r = doHardWork();
                                }
                        } finally {
                                dispatcher.close();
                        }
                }
        }

## Use of Executors

As a side note. I've been in systems where everybody was mucking around with 
ExecutorServices and it became a mess. In [v2Archive OSGi enRoute][3] I always 
provided an [Executor service][4] that is shared and does proper cleanup when 
the service getter goes away. (The [v2Archive OSGi enRoute Scheduler][6] was 
also very nice for this since it provides Cancelable Promises.) Executor 
Services created statically are horror in OSGi since they are completely 
oblivious of the OSGi dynamics. And in your case they are totally unnecessary. 
The only utility they provide to you is that they interrupt the threads. This 
is trivial to do when you create your own thread. (And messages about the 
expensiveness of threads are highly exaggerated.) Even if you use an Executor 
you can pass the thread.

        Deferred<Thread> deferred = new Deferred<>();   
        Promise<MyResult> promiseFactory.submit( () -> {
                deferred.resolve( Thread.currentThread() );

                while ( result == null && !Thread 
<http://thread.is/>.currentThread().isInterrupted() {
                … do some hard work
                }
                return result;
        });

        // deactivate
        deferred.getPromise().getValue().interrupt();

In general, if you go this route, suggest you clearly separate the strategies 
from the code. I.e. make a separate class to capture the strategy of handling 
these things. Worst designs are where these are mixed.

## Disclaimer

I guess this became a tad long, I guess I will turn it into a blog.
 
Anyway, usually disclaimer: none of the code has been tested so use it at your 
own peril!

Good luck, kind regards,

        Peter Kriens

[1]: https://osgi.org/specification/osgi.cmpn/7.0.0/util.pushstream.html 
<https://osgi.org/specification/osgi.cmpn/7.0.0/util.pushstream.html>
[2]: 
http://www.plantuml.com/plantuml/png/RP2n2i8m48RtF4NST6WVe4Cj24M7Ka71EII71jjKxYwLlhsXMXghO-w3Z-zFGQoGVTk8QZW1zbQ3J79PNcGc4QwM6524LxXLmwvHH07epX6Zr_mcCo1WsKwU9LIQRQyOn7GAplCDGPa0nmoHfgdud69ekhr2y-pm_ezQEZW6HFzWCDlHyRl5ksXDN6LWsPNaiteIhpUBjk_D2EGRZeVD1PayrdMv4WKu4_xv1G00
 
<http://www.plantuml.com/plantuml/png/RP2n2i8m48RtF4NST6WVe4Cj24M7Ka71EII71jjKxYwLlhsXMXghO-w3Z-zFGQoGVTk8QZW1zbQ3J79PNcGc4QwM6524LxXLmwvHH07epX6Zr_mcCo1WsKwU9LIQRQyOn7GAplCDGPa0nmoHfgdud69ekhr2y-pm_ezQEZW6HFzWCDlHyRl5ksXDN6LWsPNaiteIhpUBjk_D2EGRZeVD1PayrdMv4WKu4_xv1G00>
[3]: https://v2archive.enroute.osgi.org/ <https://v2archive.enroute.osgi.org/>
[4]: 
https://github.com/osgi/v2archive.osgi.enroute/tree/master/osgi.enroute.executor.simple.provider
 
<https://github.com/osgi/v2archive.osgi.enroute/tree/master/osgi.enroute.executor.simple.provider>
[5]: https://v2archive.enroute.osgi.org/appnotes/concurrency.html 
<https://v2archive.enroute.osgi.org/appnotes/concurrency.html> 
[6]: 
https://github.com/osgi/v2archive.osgi.enroute/tree/master/osgi.enroute.scheduler.simple.provider
 
<https://github.com/osgi/v2archive.osgi.enroute/tree/master/osgi.enroute.scheduler.simple.provider>
[7]: https://github.com/osgi/osgi.enroute.examples.concurrency 
<https://github.com/osgi/osgi.enroute.examples.concurrency>

> 
> On 2 Aug 2018, at 02:01, David Leangen via osgi-dev <osgi-dev@mail.osgi.org> 
> wrote:
> 
> 
> Hi Tim,
> 
> Thanks, and this is good advice. The example you give is when the thread is 
> in the same component that is being deactivated. In this case, as you show, 
> it is quite trivial to track the activation state of the component in order 
> to shut down the thread.
> 
> In my case, the trouble I am having is that the long-running thread is in a 
> component that is different from the component that is getting deactivated. 
> For instance, building on your example:
> 
> @Component
> public class MyClass {
> 
>    // Note that I am using a STATIC service
>    @Reference private MyService myService;
> 
>    private final AtomicBoolean closed = new AtomicBoolean();
> 
>    @Activate
>    void start() {
>        new Thread(this::longStart).run()
>    }
> 
> 
>    @Deactivate
>    void stop() {
>        closed.set(true);
>    }
> 
>    void longStart() {
>        for(int i = 0; i < 1000000; i++) {
> 
>            // This only works if the service object is not stateful, 
> otherwise we need
>            // to do a check and throw away an intermediate invalidated result
> 
>            // Understood, but unfortunately the service object is stateful.
> 
>            // The problem is that the dependency can be deactivated at any 
> time, and this
>            // is happening before “closed" in this component get set to 
> “true". I do not know how
>            // to detect the deactivation of the dependency. I need to 
> determine this pre-emptively,
>            // not after-the-fact. Otherwise the result will be destructive.
> 
>            doSomethingWithMyService(myService);
> 
>            // Ideally I would like to do something like this:
>            if (myServiceIsStillActive())
>                doSomethingWithMyService(myService);
>        }
>    }
> }
> 
> In the second example, there is a dynamic @Reference, so I see the point of 
> using an AtomicReference. However, I am using a static @Reference, so I doubt 
> that just putting in an AtomicReference will change the timing problem.
> 
> Any thoughts?
> 
> 
> 
> By the way, instead of using a “closed” variable, I am doing something like 
> this:
> 
>     @Activate
>     void activate()
>     {
>         executor = Executors.newSingleThreadExecutor();
>     }
> 
>     void deactivate()
>     {
>         executor.shutdownNow();
>     }
> 
> Then I only need to test for Thread.interrupted(). I assume this has the same 
> effect as having the check for “closed".
> 
> Cheers,
> =David
> 
> 
> 
>> On Aug 1, 2018, at 16:59, Tim Ward <tim.w...@paremus.com 
>> <mailto:tim.w...@paremus.com>> wrote:
>> 
>> Hi David,
>> 
>> In addition to interrupting the worker thread (which is a good idea). There 
>> are a couple of useful things that you can do using the support from 
>> java.util.concurrent. For example, setting a closed state:
>> 
>> 
>> @Component
>> public class MyClass {
>> 
>>    private final AtomicBoolean closed = new AtomicBoolean();
>> 
>>    @Activate
>>    void start() {
>>        new Thread(this::longStart).run()
>>    }
>> 
>> 
>>    @Deactivate
>>    void stop() {
>>        closed.set(true);
>>    }
>> 
>>    void longStart() {
>>        for(int i = 0; i < 1000000; i++) {
>>            if(closed.get()) {
>>                break;
>>            }
>>            doSomething();
>>        }
>>    }
>> }
>> 
>> Also if your references are dynamic then you should treat them carefully
>> 
>> @Component
>> public class MyClass implements MySlowService {
>> 
>>    private final AtomicReference<MyService> myRef = new AtomicReference<>();
>> 
>>    @Reference(policy=DYNAMIC)
>>    void setReference(MyService service) {
>>        myRef.set(service)
>>    }
>> 
>>    void unsetReference(MyService service) {
>>        // Note that it is *not* safe to just do a set null, see Compendium 
>> 112.5.12
>>        myRef.compareAndSet(service, null);
>>    }
>> 
>>    public void longRunningTask() {
>>        for(int i = 0; i < 1000000; i++) {
>>            // This only works if the service object is not stateful, 
>> otherwise we need
>>            // to do a check and throw away an intermediate invalidated result
>> 
>>            MyService myService = myRef.get();
>>            doSomethingWithMyService(myService);
>>        }
>>    }
>> }
>> 
>> I hope you find these helpful.
>> 
>> Tim
>> 
>>> On 1 Aug 2018, at 05:44, David Leangen via osgi-dev <osgi-dev@mail.osgi.org 
>>> <mailto:osgi-dev@mail.osgi.org>> wrote:
>>> 
>>> 
>>> Hi!
>>> 
>>> I am running into a situation where, what I think is happening is:
>>> 
>>> Component A gets instantiated
>>> Component B
>>> - references A
>>> - gets satisfied once A is satisfied 
>>> - kicks off a long-running process when one of its methods are called
>>> - the long-running process is run in a different thread, with a Promise
>>> Component A is no longer satisfied
>>> But
>>> - the long-running process is still running
>>> - the long-running process now references an invalid Component A
>>> - the long-running thread fails because of the invalid state of Component A
>>> Component B is no longer satisfied
>>> 
>>> 
>>> So, the long-running component messes things up, but its component has not 
>>> yet shut down even though its process is still happily running in another 
>>> thread.
>>> 
>>> I can think of two possible solutions, but not sure which is best and not 
>>> sure how to implement:
>>> 
>>> 1) Figure out a way to share an ExecutorService between “related” 
>>> components so that when one component 
>>>     shuts down it will signal to the other related components that their 
>>> threads are now interrupted
>>> 
>>> 2) In the long-running process, determine if the component that provides 
>>> the required service
>>>      is still active before continuing with the havoc-wreaking process
>>> 
>>> 
>>> Does this sound about right?
>>> 
>>> How would I actually accomplish either of these?
>>> 
>>> 
>>> Thanks!
>>> =David
>>> 
>>> 
>>> _______________________________________________
>>> OSGi Developer Mail List
>>> osgi-dev@mail.osgi.org <mailto:osgi-dev@mail.osgi.org>
>>> https://mail.osgi.org/mailman/listinfo/osgi-dev
>> 
> 
> _______________________________________________
> OSGi Developer Mail List
> osgi-dev@mail.osgi.org
> https://mail.osgi.org/mailman/listinfo/osgi-dev

_______________________________________________
OSGi Developer Mail List
osgi-dev@mail.osgi.org
https://mail.osgi.org/mailman/listinfo/osgi-dev

Reply via email to