Re: Concurrent modification of DefaultExchange

2017-02-22 Thread rsteppac2
Hello Claus,


Claus Ibsen-2 wrote
> If you want an volatile exception member then provide a github PR

Personally I think that for almost all integrators it makes sense for the
DefaultExchange to not be thread safe. Incurring the cost of volatile member
access for everyone, whether they need it or not, seems unnecessary to me.
A more generic solution to allow integrators to choose whether they would
like to pay the cost of synchronization if wanted and needed would be
better. I think. 


Ralf



--
View this message in context: 
http://camel.465427.n5.nabble.com/Concurrent-modification-of-DefaultExchange-tp5793968p5794279.html
Sent from the Camel - Users mailing list archive at Nabble.com.


Re: Concurrent modification of DefaultExchange

2017-02-22 Thread Claus Ibsen
Hi

If you want an volatile exception member then provide a github PR

On Thu, Feb 16, 2017 at 12:01 PM, rsteppac2 <r...@steppacher.name> wrote:
> Hello all,
>
> I have to ensure that the thread that runs a route (and the error handler)
> is guaranteed to see an exception set on the exchange instance if that
> instance is shared by many threads and the exception is set by one of those
> threads.
>
> I have a route step that massages the data in an input stream (jetty
> endpoint) before proxying to the actual web server. For the stream data
> manipulation I use stream pipelines (PipedInputStream/PipedOutputStream)
> which need to run in their own thread per pipeline element. Each pipeline
> element holds a reference to the exchange and if an error is encountered the
> exception is set on the exchange.
> This appears to work alright. But I don't think it is guaranteed to work as
> "exception" is not a volatile member of DefaultExchange. Thus the main
> thread is not guaranteed to see an exception set on the exchange by a worker
> thread. The worker threads themselves all use a ReentrantLock I provide as
> an exchange property (the property map is a ConcurrentHashMap) to
> synchronize their access to the exchange. So data visibility between the
> worker threads should not be an issue.
> But how to do it for the main thread?
>
> I had a look at the Splitter implementation and how it deals with parallel
> execution and aggregation. As far as I understand it the Splitter enforces
> visibility by returning an AtomicReference of an exchange that the main
> thread has never seen before. The exchange referenced by the atomic
> reference is treated as read-only after it is set on the atomic reference.
> So a get() on the atomic reference in the main thread is guaranteed to see
> the latest version of the referenced exchange instance.
>
> I cannot apply this approach as it requires the main thread to wait for the
> workers to finish processing. If I block the main thread my workers would be
> blocked as well as the main thread is responsible for consuming the modified
> input stream contents from the last PipedInputStream element and forwarding
> it to the web server.
>
> I also could not find a factory mechanism that would allow me to tell Camel
> to instantiate my own implementation of the Exchange interface (with a
> volatile member "exception").
> (DefaultException was also not written with sub-classing in mind, it
> appears. E.g. DefaultException::isFailed() uses the private member
> "exception" instead of DefaultException::getException() for the answer. But
> that's a separate issue.)
>
> Does anyone have any other ideas?
>
>
>
> Thanks!
> Ralf
>
>
>
> --
> View this message in context: 
> http://camel.465427.n5.nabble.com/Concurrent-modification-of-DefaultExchange-tp5793968.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



-- 
Claus Ibsen
-
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2


Concurrent modification of DefaultExchange

2017-02-16 Thread rsteppac2
Hello all,

I have to ensure that the thread that runs a route (and the error handler)
is guaranteed to see an exception set on the exchange instance if that
instance is shared by many threads and the exception is set by one of those
threads.

I have a route step that massages the data in an input stream (jetty
endpoint) before proxying to the actual web server. For the stream data
manipulation I use stream pipelines (PipedInputStream/PipedOutputStream)
which need to run in their own thread per pipeline element. Each pipeline
element holds a reference to the exchange and if an error is encountered the
exception is set on the exchange.
This appears to work alright. But I don't think it is guaranteed to work as
"exception" is not a volatile member of DefaultExchange. Thus the main
thread is not guaranteed to see an exception set on the exchange by a worker
thread. The worker threads themselves all use a ReentrantLock I provide as
an exchange property (the property map is a ConcurrentHashMap) to
synchronize their access to the exchange. So data visibility between the
worker threads should not be an issue. 
But how to do it for the main thread?

I had a look at the Splitter implementation and how it deals with parallel
execution and aggregation. As far as I understand it the Splitter enforces
visibility by returning an AtomicReference of an exchange that the main
thread has never seen before. The exchange referenced by the atomic
reference is treated as read-only after it is set on the atomic reference.
So a get() on the atomic reference in the main thread is guaranteed to see
the latest version of the referenced exchange instance.

I cannot apply this approach as it requires the main thread to wait for the
workers to finish processing. If I block the main thread my workers would be
blocked as well as the main thread is responsible for consuming the modified
input stream contents from the last PipedInputStream element and forwarding
it to the web server. 

I also could not find a factory mechanism that would allow me to tell Camel
to instantiate my own implementation of the Exchange interface (with a
volatile member "exception").
(DefaultException was also not written with sub-classing in mind, it
appears. E.g. DefaultException::isFailed() uses the private member
"exception" instead of DefaultException::getException() for the answer. But
that's a separate issue.)

Does anyone have any other ideas?



Thanks!
Ralf



--
View this message in context: 
http://camel.465427.n5.nabble.com/Concurrent-modification-of-DefaultExchange-tp5793968.html
Sent from the Camel - Users mailing list archive at Nabble.com.