The camel version I am using is: 2.24.1 but I also tried this with version
2.23.4
I have the following route that I simplified for the purpose of this
report. This simplified route was tested and, in terms of the bug, behaves
the same way as the the original route.
from("timer:some-timer?period=5s")
//This query select all the rows/values from tableA that are
not in tableB
.to("sql:select a.fieldA from tableA a left join tableB b on
(a.fieldA=b.fieldB) where fieldB is
null;?dataSource=dataSource&outputType=SelectList")
.log(LoggingLevel.INFO, "LOG1 - MAIN ROUTE - BEFORE SPLIT")
.to("direct:split")
.to("direct:change-datasource");
from("direct:split")
.split(body()).stopOnException().parallelProcessing()
.log(LoggingLevel.INFO, "LOG2 - SPLIT ROUTE - BEFORE INSERT")
.to("sql:insert into tableB (fieldB) values
(:#${body[fieldA]});?dataSource=dataSource")
.log(LoggingLevel.INFO, "LOG3 - SPLIT ROUTE - AFTER INSERT")
.end();
from("direct:change-datasource")
.log(LoggingLevel.INFO, "LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT")
.bean("changeDataSourceBean","changeDataSource");
*How the route (should) work:*
1 - the route triggers with a timer every 5 seconds
2 - reads a list of values from a tableA, that are not in other tableB
3 - iterate over the list in parallel
3.1 insert the iterated item in tableB.
4 - change the data source using AbstractRoutingDataSource
<https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jdbc/datasource/lookup/AbstractRoutingDataSource.html>
.
*Current buggy behavior:*
Step 4 is executed before step 3.1. Even though the logs show in the
correct order. The change of data source(step 4) happens before the value
is inserted in tableB (step 3.1).
So even when in tableA is only one value from dataSource1, that value is
inserted in tableB but in the dataSource2.
*This behavior not happen when I remove '.parallelProcessing()'. When
removed the application behaves as expected.*
*Expected behavior:*
Step 4 should be executed ONLY after step 3 fully finished. Reference:
documentation
<https://camel.apache.org/manual/latest/split-eip.html>of the splitter
states:
parallelProcessing
If enabled then processing each splitted messages occurs concurrently.*
Note the caller thread will still wait until all messages has been fully
processed, before it continues. Its only processing the sub messages from
the splitter which happens concurrently.*
Below are the logs from a two tests I made. One test using
'.parallelProcessing()' and other without '.parallelProcessing()'. Both
test are executed with a "pool " of 3 dataSources (dataSource1,
dataSource2, dataSource3). Only tableA from dataSource2 has one row. All
the other tables from all dataSources are empty.
*Logs using '.parallelProcessing()':*
Incorrect behavior the value from table from dataSource2 should be
processed only once but it loops with no end:
//dataSource1
2019-10-01 20:50:33.990 INFO 1408 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 20:50:34.000 INFO 1408 --- [er://some-timer] route3
: LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource2
*2019-10-01 20:50:38.653 INFO 1408 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT2019-10-01
20:50:38.666 INFO 1408 --- [read #2 - Split] route2
: LOG2 - SPLIT ROUTE - BEFORE INSERT*
*2019-10-01 20:50:40.644 INFO 1408 --- [read #2 - Split] route2
: LOG3 - SPLIT ROUTE - AFTER INSERT2019-10-01
20:50:40.648 INFO 1408 --- [er://some-timer] route3
: LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT*
//dataSource3
2019-10-01 20:50:44.430 INFO 1408 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 20:50:44.431 INFO 1408 --- [er://some-timer] route3
: LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource1
2019-10-01 20:50:49.246 INFO 1408 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 20:50:49.246 INFO 1408 --- [er://some-timer] route3
: LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource2
*2019-10-01 20:50:54.054 INFO 1408 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT2019-10-01
20:50:54.059 INFO 1408 --- [read #4 - Split] route2
: LOG2 - SPLIT ROUTE - BEFORE INSERT2019-10-01 20:50:56.306
INFO 1408 --- [read #4 - Split] route2 :
LOG3 - SPLIT ROUTE - AFTER INSERT*
*2019-10-01 20:50:56.307 INFO 1408 --- [er://some-timer] route3
: LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT*
//dataSource3
2019-10-01 20:50:59.080 INFO 1408 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 20:50:59.081 INFO 1408 --- [er://some-timer] route3
: LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource1
2019-10-01 20:51:03.966 INFO 1408 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 20:51:03.967 INFO 1408 --- [er://some-timer] route3
: LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource2
*2019-10-01 20:51:09.004 INFO 1408 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT2019-10-01
20:51:09.006 INFO 1408 --- [read #5 - Split] route2
: LOG2 - SPLIT ROUTE - BEFORE INSERT2019-10-01 20:51:11.062
INFO 1408 --- [read #5 - Split] route2 :
LOG3 - SPLIT ROUTE - AFTER INSERT2019-10-01 20:51:11.063 INFO 1408 ---
[er://some-timer] route3 : LOG4 - CHANGE
DATASOURCE ROUTE - AFTER SPLIT*
//dataSource3
2019-10-01 20:51:13.816 INFO 1408 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 20:51:13.816 INFO 1408 --- [er://some-timer] route3
: LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource1
2019-10-01 20:51:19.041 INFO 1408 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 20:51:19.042 INFO 1408 --- [er://some-timer] route3
: LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
...
*Logs not using '.parallelProcessing()':*
//dataSource1
2019-10-01 21:00:10.294 INFO 24400 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 21:00:10.298 INFO 24400 --- [er://some-timer] route3
: LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource2
*2019-10-01 21:00:15.413 INFO 24400 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT2019-10-01
21:00:15.419 INFO 24400 --- [er://some-timer] route2
: LOG2 - SPLIT ROUTE - BEFORE INSERT*
*2019-10-01 21:00:17.559 INFO 24400 --- [er://some-timer] route2
: LOG3 - SPLIT ROUTE - AFTER INSERT2019-10-01
21:00:17.561 INFO 24400 --- [er://some-timer] route3
: LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT*
//dataSource3
2019-10-01 21:00:20.122 INFO 24400 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 21:00:20.123 INFO 24400 --- [er://some-timer] route3
: LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource1
2019-10-01 21:00:25.239 INFO 24400 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 21:00:25.240 INFO 24400 --- [er://some-timer] route3
: LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource2
*2019-10-01 21:00:30.052 INFO 24400 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT2019-10-01
21:00:30.053 INFO 24400 --- [er://some-timer] route3
: LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT*
//dataSource3
2019-10-01 21:00:35.215 INFO 24400 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 21:00:35.216 INFO 24400 --- [er://some-timer] route3
: LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource1
2019-10-01 21:00:40.497 INFO 24400 --- [er://some-timer] route1
: LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 21:00:40.498 INFO 24400 --- [er://some-timer] route3
: LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
...
*P.S I am not an native english speak, sorry for possible errors*
Regards,
Santiago.