2011/12/24 Eric Charles <[email protected]>:
> Hi Stephano,
>
> Opening the discussion to learn more :)
>
> - Why are you considering that 2 threads is a criteria to use standard
> synchronization rather than some atomic fields.
It is a criteria to not use the ConcurrentLinkedQueue that is a
structure thought to handle many concurrent threads and is overkill
for 2 threads.
> - I can understand you replace a concurrent by a non-concurrent queue.
> However, you now have a blocking queue. Is there an impact due to this
> blocking aspect?
I think the answer is no. That's why I did that.
Remember we have 2 threads and they do 2 different things, they simply
block each other when they add or remove from the queue.
> - You defined isAsync as volatile and sometimes encapsulate access to
> isAsync in a synchronized block, sometime not. Why using 2 different
> thread-safety strategies in this class?
Because some times it needed sincronization, other times I felt it was
not needed (the access to the volatile doesn't need synchronization. I
just synchronize to ensure that the change to the list happens
together with the change in the volatile var).
If you can find a better solution you're welcome to provide one. It
took a couple of hours to reach a working solution.
The previous one was not thread safe at this line:
------
if (listenerAdded == false) {
write.set(false);
-----
It could happen another thread already added a new item to the queue
but skipped to process it because write was true. So we ended up with
an item in the queue never written.
I don't like too much my solution and I felt it a bit hackish, but
that was my best solution for my limited time, so if you can provide a
more elegant solution while still being thread safe, I'm more than
happy :-)
Stefano
>
> Thx,
>
> Eric
>
>
>
> On 21/12/11 15:47, [email protected] wrote:
>>
>> Author: bago
>> Date: Wed Dec 21 14:47:25 2011
>> New Revision: 1221748
>>
>> URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
>> Log:
>> An attempt to refactor AbstractProtocolTransport to be thread safe. I
>> moved back to standard synchronization as we only have max 2 threads
>> competing for the queue so it doesn't make sense to use a non blocking
>> queue. Norman, please overview, and feel free to revert if you don't like
>> the solution (i thought it was better to simply commit instead of opening a
>> JIRA to show you this).
>>
>> Modified:
>>
>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>>
>> Modified:
>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> URL:
>> http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
>>
>> ==============================================================================
>> ---
>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> (original)
>> +++
>> james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
>> Wed Dec 21 14:47:25 2011
>> @@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
>> import java.io.InputStream;
>> import java.io.UnsupportedEncodingException;
>> import java.util.List;
>> -import java.util.concurrent.ConcurrentLinkedQueue;
>> -import java.util.concurrent.atomic.AtomicBoolean;
>> -
>> +import java.util.Queue;
>> +import java.util.concurrent.LinkedBlockingQueue;
>>
>> import org.apache.james.protocols.api.FutureResponse.ResponseListener;
>>
>> @@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
>>
>>
>> // TODO: Should we limit the size ?
>> - private final ConcurrentLinkedQueue<Response> responses = new
>> ConcurrentLinkedQueue<Response>();
>> - private final AtomicBoolean write = new AtomicBoolean(false);
>> + private final Queue<Response> responses = new
>> LinkedBlockingQueue<Response>();
>> + private volatile boolean isAsync = false;
>>
>> /**
>> * @see
>> org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response,
>> org.apache.james.protocols.api.ProtocolSession)
>> */
>> public final void writeResponse(Response response, final
>> ProtocolSession session) {
>> - // just add the response to the queue. We will trigger the write
>> operation later
>> - responses.add(response);
>> -
>> - // trigger the write
>> - writeQueuedResponses(session);
>> + // if we already in asynchrnous mode we simply enqueue the
>> response
>> + // we do this synchronously because we may have a dequeuer thread
>> working on
>> + // isAsync and responses.
>> + boolean enqueued = false;
>> + synchronized(this) {
>> + if (isAsync == true) {
>> + responses.offer(response);
>> + enqueued = true;
>> + }
>> + }
>> +
>> + // if we didn't enqueue then we check if the response is writable
>> or we have to
>> + // set us "asynchrnous" and wait for response to be ready.
>> + if (!enqueued) {
>> + if (isResponseWritable(response)) {
>> + writeResponseToClient(response, session);
>> + } else {
>> + addDequeuerListener(response, session);
>> + isAsync = true;
>> + }
>> + }
>> }
>>
>> /**
>> @@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
>> * @param session
>> */
>> private void writeQueuedResponses(final ProtocolSession session) {
>> - Response queuedResponse = null;
>>
>> - if (write.compareAndSet(false, true)){
>> - boolean listenerAdded = false;
>> - // dequeue Responses until non is left
>> - while ((queuedResponse = responses.poll()) != null) {
>> -
>> - // check if we need to take special care of
>> FutureResponses
>> - if (queuedResponse instanceof FutureResponse) {
>> - FutureResponse futureResponse =(FutureResponse)
>> queuedResponse;
>> - if (futureResponse.isReady()) {
>> - // future is ready so we can write it without
>> blocking the IO-Thread
>> - writeResponseToClient(queuedResponse, session);
>> - } else {
>> -
>> - // future is not ready so we need to write it via
>> a ResponseListener otherwise we MAY block the IO-Thread
>> - futureResponse.addListener(new ResponseListener()
>> {
>> -
>> - public void onResponse(FutureResponse
>> response) {
>> - writeResponseToClient(response, session);
>> - if (write.compareAndSet(true, false)) {
>> - writeQueuedResponses(session);
>> - }
>> - }
>> - });
>> - listenerAdded = true;
>> - // just break here as we will trigger the dequeue
>> later
>> - break;
>> - }
>> -
>> - } else {
>> - // the Response is not a FutureResponse, so just
>> write it back the the remote peer
>> - writeResponseToClient(queuedResponse, session);
>> + // dequeue Responses until non is left
>> + while (true) {
>> +
>> + Response queuedResponse = null;
>> +
>> + // synchrnously we check responses and if it is empty we move
>> back to non asynch
>> + // behaviour
>> + synchronized(this) {
>> + queuedResponse = responses.poll();
>> + if (queuedResponse == null) {
>> + isAsync = false;
>> + break;
>> }
>> -
>> }
>> - // Check if a ResponseListener was added before. If not we
>> can allow to write
>> - // responses again. Otherwise the writing will get triggered
>> from the listener
>> - if (listenerAdded == false) {
>> - write.set(false);
>> +
>> + // if we have something in the queue we continue writing
>> until we
>> + // find something asynchronous.
>> + if (isResponseWritable(queuedResponse)) {
>> + writeResponseToClient(queuedResponse, session);
>> + } else {
>> + addDequeuerListener(queuedResponse, session);
>> + // no changes to isAsync here, because in this method we
>> are always already async.
>> + break;
>> }
>> }
>> -
>> -
>> + }
>> +
>> + private boolean isResponseWritable(Response response) {
>> + return !(response instanceof FutureResponse) || ((FutureResponse)
>> response).isReady();
>> + }
>> +
>> + private void addDequeuerListener(Response response, final
>> ProtocolSession session) {
>> + ((FutureResponse) response).addListener(new ResponseListener() {
>> +
>> + public void onResponse(FutureResponse response) {
>> + writeResponseToClient(response, session);
>> + writeQueuedResponses(session);
>> + }
>> + });
>> }
>>
>> /**
>>
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: [email protected]
>> For additional commands, e-mail: [email protected]
>>
>
> --
> Eric http://about.echarles.net
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]