I didn't see the original message from dvlato.

We are doing pretty much exactly what you describe. We are using
blueprint.xml to describe our routes.

We start with a timer:
<from id="_timer1" uri="timer:theTimer?period=10s&amp;fixedRate=true"/>


Then we have some intermediate code using a bean that determines the status
we are going to look for - in our case, records that have changed since a
certain time.

Then we set "body" to the SQL to be executed and use the JDBC component to
execute the query. This is where things get tricky:

<to id="jdbcQuery"
    uri="jdbc:TheDataSource?outputType=StreamList&amp;resetAutoCommit=false"/>

<split id="_split1" streaming="true">
  <simple>${body}</simple>
  <aggregate completionSize="100" completionTimeout="5000"
             id="_aggregate1" strategyRef="theAggregationStrategy">
    <correlationExpression>
      <constant>true</constant>
      <!-- aggregate all the items. -->
    </correlationExpression>
    <to id="_to4" uri="direct:doStuffWithBatch"/>
  </aggregate>
</split>


We use outputType=StreamList so the result is streamed rather than loaded
all at once, which keeps us from getting too many records at one time. We
set resetAutoCommit=false because of performance problems we were seeing -
it was really slow otherwise.
<split id="_split1" streaming="true"> processes each record in the result
stream as it arrives. The <simple>${body}</simple> expression says we want
to split the message body, i.e. work with each record in the JDBC result.


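Then we have an aggregator as you mentioned. The aggregator simply adds
the records to an ArrayList - it batches them. With completionSize="100" a
batch is released as soon as it holds 100 records, and with
completionTimeout="5000" any smaller leftover batch is released after 5
seconds without new records, so a partial batch is not held back forever.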

public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
  Object newBody = newExchange.getIn().getBody();
  if (oldExchange == null) {
    // First record of a batch: start a new list and carry it on this exchange.
    ArrayList<Object> list = new ArrayList<Object>();
    list.add(newBody);
    newExchange.getIn().setBody(list);
    return newExchange;
  }
  // Later records: append to the list already held by the aggregated exchange.
  @SuppressWarnings("unchecked")
  ArrayList<Object> list = oldExchange.getIn().getBody(ArrayList.class);
  list.add(newBody);
  return oldExchange;
}


Finally we call the route that does whatever we want with the batch of
records.


We were doing the "split" without an aggregator, but then every record was
being processed individually, and this created a lot of SQL traffic on the
insert side, i.e. it was really slow. Using the aggregator improved
performance tremendously.
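
To make the batching behaviour concrete, here is a minimal plain-Java
sketch (no Camel involved) of the net effect of the streaming split plus
the aggregator: rows are pulled one at a time and handed on in lists of at
most N. The class name BatchingSketch and the method forEachBatch are made
up for illustration and are not part of our route or of Camel. One
simplification to be aware of: the sketch flushes the leftover rows when
the iterator is exhausted, whereas in the route above the leftover batch is
released by completionTimeout.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class BatchingSketch {

    /**
     * Pulls rows lazily from the iterator and passes them to the handler in
     * lists of at most batchSize rows. Returns the number of batches delivered.
     */
    public static <T> int forEachBatch(Iterator<T> rows, int batchSize,
                                       Consumer<List<T>> handler) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int delivered = 0;
        List<T> batch = new ArrayList<>(batchSize);
        while (rows.hasNext()) {
            batch.add(rows.next());
            // A full batch plays the role of completionSize being reached.
            if (batch.size() == batchSize) {
                handler.accept(batch);
                delivered++;
                batch = new ArrayList<>(batchSize);
            }
        }
        // Leftover rows: a smaller final batch instead of waiting for more.
        if (!batch.isEmpty()) {
            handler.accept(batch);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            rows.add(i);
        }
        // Hypothetical handler; ours would do one bulk insert per batch.
        forEachBatch(rows.iterator(), 100,
                b -> System.out.println("batch of " + b.size()));
    }
}
```

With 250 rows and a batch size of 100, the handler is called three times,
with 100, 100 and 50 rows, so the insert side sees three calls instead of
250.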

I hope this helps.

- Jasen.

On 3/26/17, 3:27 AM, "Claus Ibsen" <[email protected]> wrote:

>Yeah
>
>The other sql components are likely a bit better at this, such as
>mybatis or the sql component. For the latter you can set
>useIterator=false to get the List. And you can set that
>maxMessagesPerPoll to 10 but that is after the poll, so try to set it
>on the jdbc driver, eg on template.maxRows=10 for SQL. MyBatis ought
>to have something as well to configure a max limit to the jdbc driver.
>
>The JPA component was originally not intended for arbitrary SQL and
>thus was a 1:1 jpa entity/table thingy, and it could use a bit of love
>to make the consumer to poll X together. You are welcome to log a JIRA
>
>
>On Fri, Mar 24, 2017 at 1:02 PM, dvlato <[email protected]> wrote:
>> Hello,
>> I have just started working with Camel and I have what I believe is a
>> pretty common requirement: poll the database to fetch all the records
>> with a certain status, and process them in chunks (let's say 10 rows at
>> a time). I thought I could leverage the existing components to do this
>> easily but I am not finding the correct way:
>>
>> What I have tried so far is to use the JPA component (as we already have
>> JPA entities in place) to poll the database and use the @Consumed
>> annotation to change the status of the record (some pointers about how
>> to deal with transactionality are also most welcome). The problems with
>> this approach are:
>> 1) Here the rows are received one by one, and we can later merge them
>> with an aggregator, which seems suboptimal.
>> 2) Even if we ignored the performance problem with that approach, I'm
>> not sure how to configure the aggregator to have fixed-size blocks
>> while taking into account the whole size of the batch (I mean, if after
>> chunking we only have 5 rows in a message, just return that instead of
>> waiting for the 10th - maybe it works like that by default - I don't
>> really know).
>>
>> I've seen that there is a class called "JpaPollingConsumer" which
>> returns all the rows instead of one by one, but I don't know if there is
>> any way to use it with a "from()" clause; it seems that Camel calls
>> "createConsumer" and not "createPollingConsumer", right? Is there a way
>> to use from("jpa://...") and have Camel use the polling consumer? I have
>> tried using pollEnrich but in that case the @Consumed code is not
>> executed. Is there any other alternative?
>>
>>
>> I apologise for such basic questions. I am sure I could probably figure
>> out a "good enough" approach using my nonexistent knowledge of Camel (we
>> have Camel in Action, which has been great so far, but I don't see
>> anything in Chapter 6 regarding database polling) but, on the other
>> hand, it's such a common task that someone might have best practices
>> already available. I have browsed the myBatis and SQL docs and I think
>> that they might be better suited (if I am not using JPA incorrectly,
>> which is quite likely the case), but I will have the same problem of not
>> knowing if I am using Camel correctly.
