I have some distributed processing that would need to message after the
completion of all the sub-tasks, which is working via Beakstalkd. The design
was based upon existing Python implementations, where the jobs are posted to
a queue and responses are posted to a unique queue based upon the parent job
id. The issue is that it doesn't appear to sent the delete message command
until the end, and even then it sends the same one repeatedly, the message
attempts to delete the last Beanstalkd message the number of time that it
read from any of the queues. I'm assuming that pollEnrich is the incorrect
way of consolidating the messages, but searching for "message dependencies"
produces a lot of noise on the camel website as people talk about Camel/Java
dependencies.
So for example from a SpringBoot, posting a message to the
"dev_job_listener" queue:
{
"ttl": "20",
"parent_job_id":"9999",
"split_count": 5
}
Then posting 5 messages to the "dev_job_9999" queue
This produces the following message:
2016-03-21 11:29:24.588 INFO 30185 --- [nstalk-Producer] route1
: Ending Job : Simple: 139787
2016-03-21 11:29:24.621 WARN 30185 --- [nstalk-Consumer]
o.a.c.c.b.processors.DeleteCommand : Failed to delete job 139787
2016-03-21 11:29:24.640 WARN 30185 --- [nstalk-Consumer]
o.a.c.c.b.processors.DeleteCommand : Failed to delete job 139787
2016-03-21 11:29:24.661 WARN 30185 --- [nstalk-Consumer]
o.a.c.c.b.processors.DeleteCommand : Failed to delete job 139787
2016-03-21 11:29:24.681 WARN 30185 --- [nstalk-Consumer]
o.a.c.c.b.processors.DeleteCommand : Failed to delete job 139787
2016-03-21 11:29:24.704 WARN 30185 --- [nstalk-Consumer]
o.a.c.c.b.processors.DeleteCommand : Failed to delete job 139787
Where job 139787 is the parent job.
@Configuration
public class ApplicationConfig {
@Bean
RouteBuilder exampleRoute() {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
/**
* Step 1: Read a message from the listener tube, this will
take the form:
* {
* parent_job_id : The parent job id
* split_count: Number of responses to expect
* }
**/
from("beanstalk://localhost/dev_job_listener?onFailure=release&jobDelay=20&jobTimeToRun=10")
.unmarshal().json(JsonLibrary.Jackson, Map.class)
.setProperty("message", simple("${body}"))
// Get required tube jobId to allow deletion of job at
end of this route
.process(new Processor() {
@Override
public void process(Exchange exchange) throws
Exception {
final Deque<Long> stack = new ArrayDeque<>();
stack.push(Long.valueOf(exchange.getIn().getHeader("beanstalk.jobId").toString()));
exchange.setProperty("stack", stack);
Map<String, Object> message = (HashMap)
exchange.getProperties().get("message");
exchange.setProperty("parent_job_id",
Integer.valueOf(message.get("parent_job_id").toString()));
exchange.setProperty("message_count",
Integer.valueOf(message.get("split_count").toString()));
}
})
/**
* Step 2: Listed to the response queue
job_{parent_job_id} for the number of completion messages
* from the split_count.
*/
.to("direct:verifySubJobs").end()
/**
* Step 3: Post to the complete queue that all jobs have
completed
*/
.to("beanstalk://localhost/next_step?jobTimeToRun=10")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws
Exception {
log("next_step job id : " +
exchange.getIn().getHeader("beanstalk.jobId").toString());
exchange.getIn().setHeader("beanstalk.jobId",
exchange.getProperty("stack", Deque.class).pop());
}
})
.log("Ending Job : " +
simple("${header.beanstalk.jobId}"));
from("direct:verifySubJobs")
.log("-> direct:verifySubJobs")
.setProperty("url",
simple("beanstalk://localhost/dev_job_" +
simple("${property.parent_job_id}").getText() +
"?onFailure=release&jobTimeToRun=10"))
.pollEnrich().simple("${property.url}")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws
Exception {
Deque<Long> stack =
exchange.getProperty("stack", Deque.class);
stack.push(Long.valueOf(exchange.getIn().getHeader("beanstalk.jobId").toString()));
Integer counter =
exchange.getProperty("message_count", Integer.class);
exchange.setProperty("message_count",
counter);
}
}).end()
.choice().when(simple("${property.message_count}
> 0"))
.to("direct:verifySubJobs")
.end()
.process(new Processor() {
@Override
public void process(Exchange exchange) throws
Exception {
exchange.getIn().setHeader("beanstalk.jobId", exchange.getProperty("stack",
Deque.class).pop());
}
}).end()
.log("<- direct:verifySubJobs");
}
};
}
}
How do I close off message when complete, or perhaps what is the correct
method of chaining message dependencies with Camel?
--
View this message in context:
http://camel.465427.n5.nabble.com/Correct-pattern-for-message-dependencies-tp5779384.html
Sent from the Camel - Users mailing list archive at Nabble.com.