Hi

I have committed a first cut of this feature in rev 830861.

Please feel free to take a look.

And feedback on the locking in ThrottlingRoutePolicy class is welcome.
I did consider using try locks in case there was a race, as after all
we just want at least one of them to acquire the lock and invoke the
suspension / resume.
There may be clever ways to archive this.

For example running it by throttling this route
    <bean id="myPolicy" class="org.apache.camel.impl.ThrottlingRoutePolicy">
        <property name="maxInflightExchanges" value="16"/>
        <property name="resumePercentOfMax" value="25"/>
    </bean>

    <camelContext xmlns="http://camel.apache.org/schema/spring";>
        <route routePolicyRef="myPolicy">
            <from uri="activemq:queue:foo?concurrentConsumers=20"/>
            <transacted/>
            <delay><constant>100</constant></delay>
            <to uri="log:foo?groupSize=10"/>
            <to uri="mock:result"/>
        </route>
    </camelContext>

Will yield this output. Yes its throttling very often but this is due
to the low number of concurrent consumers and the fact the exchanges
are routed at a constant pace.



2009-10-29 10:02:12,788 [main           ] INFO  DefaultCamelContext
        - Apache Camel 2.1-SNAPSHOT (CamelContext:camelContext)
started
2009-10-29 10:02:13,179 [enerContainer-6] INFO  foo
        - Received: 10 messages so far. Last group took: 4 millis
which is: 2,500 messages per second. average: 2,500
2009-10-29 10:02:13,184 [nerContainer-19] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:13,193 [nerContainer-13] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:13,203 [nerContainer-16] INFO  foo
        - Received: 20 messages so far. Last group took: 25 millis
which is: 400 messages per second. average: 689.655
2009-10-29 10:02:13,302 [nerContainer-10] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:13,320 [enerContainer-9] INFO  foo
        - Received: 30 messages so far. Last group took: 117 millis
which is: 85.47 messages per second. average: 205.479
2009-10-29 10:02:13,333 [nerContainer-18] INFO  foo
        - Received: 40 messages so far. Last group took: 13 millis
which is: 769.231 messages per second. average: 251.572
2009-10-29 10:02:13,344 [enerContainer-3] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:13,444 [nerContainer-10] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:13,446 [nerContainer-15] INFO  foo
        - Received: 50 messages so far. Last group took: 113 millis
which is: 88.496 messages per second. average: 183.824
2009-10-29 10:02:13,455 [enerContainer-3] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:13,457 [nerContainer-11] INFO  foo
        - Received: 60 messages so far. Last group took: 11 millis
which is: 909.091 messages per second. average: 212.014
2009-10-29 10:02:13,560 [enerContainer-2] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:13,570 [nerContainer-13] INFO  foo
        - Received: 70 messages so far. Last group took: 113 millis
which is: 88.496 messages per second. average: 176.768
2009-10-29 10:02:13,574 [nerContainer-11] INFO  foo
        - Received: 80 messages so far. Last group took: 4 millis
which is: 2,500 messages per second. average: 200
2009-10-29 10:02:13,574 [nerContainer-16] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:13,660 [main           ] INFO  MockEndpoint
        - Asserting: Endpoint[mock://result] is satisfied
2009-10-29 10:02:13,681 [nerContainer-20] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:13,693 [nerContainer-13] INFO  foo
        - Received: 90 messages so far. Last group took: 119 millis
which is: 84.034 messages per second. average: 173.41
2009-10-29 10:02:13,701 [enerContainer-5] INFO  foo
        - Received: 100 messages so far. Last group took: 8 millis
which is: 1,250 messages per second. average: 189.753
2009-10-29 10:02:13,725 [nerContainer-16] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:13,809 [enerContainer-2] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:13,826 [enerContainer-4] INFO  foo
        - Received: 110 messages so far. Last group took: 125 millis
which is: 80 messages per second. average: 168.712
2009-10-29 10:02:13,837 [enerContainer-5] INFO  foo
        - Received: 120 messages so far. Last group took: 11 millis
which is: 909.091 messages per second. average: 180.995
2009-10-29 10:02:13,851 [enerContainer-3] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:13,941 [nerContainer-15] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:13,952 [nerContainer-19] INFO  foo
        - Received: 130 messages so far. Last group took: 115 millis
which is: 86.957 messages per second. average: 167.095
2009-10-29 10:02:13,957 [enerContainer-9] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:13,963 [nerContainer-16] INFO  foo
        - Received: 140 messages so far. Last group took: 11 millis
which is: 909.091 messages per second. average: 177.44
2009-10-29 10:02:14,071 [nerContainer-18] INFO  foo
        - Received: 150 messages so far. Last group took: 108 millis
which is: 92.593 messages per second. average: 167.224
2009-10-29 10:02:14,171 [enerContainer-6] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:14,177 [nerContainer-14] INFO  foo
        - Received: 160 messages so far. Last group took: 106 millis
which is: 94.34 messages per second. average: 159.521
2009-10-29 10:02:14,198 [enerContainer-9] INFO  foo
        - Received: 170 messages so far. Last group took: 21 millis
which is: 476.19 messages per second. average: 166.016
2009-10-29 10:02:14,199 [nerContainer-13] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:14,306 [nerContainer-14] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:14,314 [nerContainer-12] INFO  foo
        - Received: 180 messages so far. Last group took: 116 millis
which is: 86.207 messages per second. average: 157.895
2009-10-29 10:02:14,317 [nerContainer-18] INFO  foo
        - Received: 190 messages so far. Last group took: 3 millis
which is: 3,333.333 messages per second. average: 166.229
2009-10-29 10:02:14,317 [nerContainer-10] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:14,432 [enerContainer-3] INFO  foo
        - Received: 200 messages so far. Last group took: 115 millis
which is: 86.957 messages per second. average: 158.983
2009-10-29 10:02:14,434 [main           ] INFO  DefaultCamelContext
        - Apache Camel 2.1-SNAPSHOT (CamelContext:camelContext) is
stopping

On Wed, Oct 28, 2009 at 3:15 PM, Claus Ibsen <[email protected]> wrote:
> Hi
>
> I have done some experiment and I am setting down for something like this
>
> Where we can configure a route policy to use.
> In this case we use a throtteling policy that is based on the number
> of in flight exchanges.
>
> In this example at most 16 in flights messages is aimed for. It can go higher.
> But when the policy is investigated it will react if the current in
> flight > max.
> If so it will suspend/stop the consumer. And when it goes down to 25%
> of 16 (=4) then it will resume / start the consumer again.
>
> Apache CXF have already this feature. So its inspired from this project.
>
>    <bean id="myPolicy" class="org.apache.camel.impl.ThrottelingRoutePolicy">
>        <property name="maxInflightExchanges" value="16"/>
>        <property name="reconnectPercentOfMax" value="25"/>
>    </bean>
>
>    <camelContext xmlns="http://camel.apache.org/schema/spring";>
>        <route routePolicyRef="myPolicy">
>            <from uri="activemq:queue:foo?concurrentConsumers=20"/>
>            <transacted/>
>            <delay><constant>100</constant></delay>
>            <to uri="log:foo?groupSize=10"/>
>            <to uri="mock:result"/>
>        </route>
>    </camelContext>
>
>
>
> You can of course implement your own route policy and do whatever you like.
>
> The route policy is this interface. I amy want to adjust the naming of
> that method and maybe change it to be more routish.
> Its based on a callback that is invoked when an Exchange is done being
> routed. So maybe something as:
>
>      void onRouteDone(Route route, Exchange exchange);
>
> is better as its more clear to the point. And you can get the Consumer
> from the Route so you can grab it and susped/stop it on the fly.
>
>
> public interface RoutePolicy {
>
>    /**
>     * Callback invokes when a {...@link Consumer} have consumed and
> created an {...@link Exchange}
>     *
>     * @param consumer  the consumer
>     * @param exchange  the created exchange
>     */
>    void onConsume(Consumer consumer, Exchange exchange);
> }
>
>
>
>
> On Wed, Oct 28, 2009 at 8:15 AM, Claus Ibsen <[email protected]> wrote:
>> Hi
>>
>> A bit of background is CAMEL-1048
>> https://issues.apache.org/activemq/browse/CAMEL-1048
>>
>> I am looking into this how to implement this nicely in Camel.
>>
>> CAMEL-1048 is maybe a bit too much at current requirements from Camel users.
>> What they are looking for is to be able to dynamic throttle consumers.
>>
>> Apache CXF and ServiceMix has such a feature specially build in their
>> JMS components.
>> What it does is that it can stop the JMS listener when there are too
>> many messages in flight.
>>
>> But on the other hand we have also had Camel users wanting to dynamic
>> start/stop consumers depending on some flag of some sort.
>> So there is also grounds to make this a general solution that can
>> cater both use cases.
>>
>> I do wonder how to move forward.
>> In CAMEL-1048 there is an example using requires to associate a route
>> that this predicate must be true for the route to be running.
>> I do not like the naming requires or require. And having it in the
>> fluent builder / DSL makes it fixed.
>>
>> Wonder if we should name such a configuration a RoutePolicy so you can
>> configure it as
>>
>> <route routePolicy="myRoutePolicy">
>>   <from .../>
>>  ...
>> </route>
>>
>> <bean id="myRoutePolicy" class=...>
>>   <property name="maxInflightExchanges" value="500"/>
>> </bean>
>>
>> This is quite flexible as we can just offer a SPI interface for the
>> RoutePolicy and have people implement it as how they like it.
>> For example as above something that is controlled by the number of
>> current in flight exchanges.
>>
>> Others can control it by CPU utliization, a switch, timer based etc.
>>
>>
>> In the Java DSL its
>>
>> from("xxx").routePolicy(myRoutePolicy).to(yyyy);
>>
>>
>>
>>
>> --
>> Claus Ibsen
>> Apache Camel Committer
>>
>> Author of Camel in Action: http://www.manning.com/ibsen/
>> Open Source Integration: http://fusesource.com
>> Blog: http://davsclaus.blogspot.com/
>> Twitter: http://twitter.com/davsclaus
>>
>
>
>
> --
> Claus Ibsen
> Apache Camel Committer
>
> Author of Camel in Action: http://www.manning.com/ibsen/
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Reply via email to