I have two routes that use ZooKeeperRoutePolicy for basic distributed route
election (Camel 2.13.1).
When I use it with a single route, election and failover work fine.
When I set up multiple routes, each with its own ZooKeeperRoutePolicy instance,
one route almost always gets shut down immediately, but NOT 100% of the time.
The behavior seems nondeterministic, like a race condition.
Each RouteBuilder instance has its own ZooKeeperRoutePolicy instance. All of
them use the same server but a different ZooKeeper znode, like this:
Route 1: zookeeper URI: localhost:2181/scheduler-node-election/test
Route 2: zookeeper URI: localhost:2181/scheduler-node-election/googleplay
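The two election URIs differ only in their last path segment. This JDK-only sketch mirrors the concatenation in electionRoutePolicy (zookeeperUri + "scheduler-node-election/" + vendor). The helper name electionUri and the base value zookeeper://127.0.0.1:2181/ are assumptions. The base was chosen to match the endpoints in the log. The helper also adds the slash when the base URI lacks one, which the original concatenation relies on.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ElectionUris {
    // Hypothetical helper: joins a base ZooKeeper URI and a vendor name into the
    // per-vendor election path, adding the separating slash if it is missing.
    public static String electionUri(String baseUri, String vendor) {
        String base = baseUri.endsWith("/") ? baseUri : baseUri + "/";
        return base + "scheduler-node-election/" + vendor;
    }

    public static void main(String[] args) {
        String base = "zookeeper://127.0.0.1:2181/"; // assumed value of zookeeperUri
        Map<String, String> uris = new LinkedHashMap<>();
        for (String vendor : new String[] {"test", "googleplay"}) {
            uris.put(vendor, electionUri(base, vendor));
        }
        // Each vendor gets its own znode under /scheduler-node-election.
        uris.forEach((vendor, uri) -> System.out.println(vendor + " -> " + uri));
    }
}
```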
In the ZooKeeper CLI:
[zk: localhost:2181(CONNECTED) 6] ls /scheduler-node-election
[googleplay, test]
(I have a single ZooKeeper server running.)
Most of the time, one route (either one) gets shut down during Spring
initialization while the other one runs. But once in a while, both routes run.
In the run below, the “googleplay” route is shut down and its ZooKeeper session
is closed.
Here’s the Camel/ZooKeeper logging:
[timer://test-#2] org.apache.zookeeper.ZooKeeper : Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=5000 watcher=org.apache.camel.component.zookeeper.ConnectionHolder@733ed393
[//googleplay-#1] org.apache.zookeeper.ZooKeeper : Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=5000 watcher=org.apache.camel.component.zookeeper.ConnectionHolder@21b49301
[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn : Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn : Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn : Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn : Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn : Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x146de3b813e0032, negotiated timeout = 5000
[127.0.0.1:2181)] org.apache.zookeeper.ClientCnxn : Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x146de3b813e0031, negotiated timeout = 5000
[timer://test-#2] o.a.c.c.zookeeper.ZookeeperProducer : Node '/scheduler-node-election/test/localhost-3fae9f79-a91c-4baa-bdcc-bd23a2623690' did not exist, creating it.
[//googleplay-#1] o.a.c.c.zookeeper.ZookeeperProducer : Node '/scheduler-node-election/googleplay/localhost-2b2634e5-a0b3-4c0f-8ddb-b44e008854db' did not exist, creating it.
[timer://test-#2] o.a.c.c.z.policy.ZooKeeperElection : Candidate node '/scheduler-node-election/test/localhost-3fae9f79-a91c-4baa-bdcc-bd23a2623690' has been created
[//googleplay-#1] o.a.c.c.z.policy.ZooKeeperElection : Candidate node '/scheduler-node-election/googleplay/localhost-2b2634e5-a0b3-4c0f-8ddb-b44e008854db' has been created
[//googleplay-#1] o.a.camel.spring.SpringCamelContext : Route: election-route-localhos started and consuming from: Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/googleplay]
[timer://test-#2] o.a.camel.impl.DefaultShutdownStrategy : Starting to graceful shutdown 1 routes (timeout 300 seconds)
[ShutdownTask-#4] o.a.camel.impl.DefaultShutdownStrategy : Route: election-route-localhos shutdown complete, was consuming from: Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/googleplay]
[timer://test-#2] o.a.camel.impl.DefaultShutdownStrategy : Graceful shutdown of 1 routes completed in 0 seconds
[timer://test-#2] o.a.camel.spring.SpringCamelContext : Route: election-route-localhos is stopped, was consuming from: Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/googleplay]
[-#1-EventThread] org.apache.zookeeper.ClientCnxn : EventThread shut down
[timer://test-#2] org.apache.zookeeper.ZooKeeper : Session: 0x146de3b813e0031 closed
[timer://test-#2] o.a.camel.spring.SpringCamelContext : Route: election-route-localhos is shutdown and removed, was consuming from: Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/googleplay]
[timer://test-#2] o.a.camel.spring.SpringCamelContext : Route: election-route-localhos started and consuming from: Endpoint[zookeeper://127.0.0.1:2181/scheduler-node-election/test]
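One detail in the log is worth spelling out. Both internal election routes report the same route ID, election-route-localhos. The googleplay election route is also stopped and removed on the timer://test-#2 thread right before the test election route starts under that same ID. The truncated "localhos" looks like the first 8 characters of the candidate node name (localhost-<uuid>). The sketch below assumes, without checking Camel's source, that the ID is built exactly that way. It then shows that two candidates on the same host always yield the same ID, whatever their znode. The method name electionRouteId is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class ElectionRouteIds {
    // ASSUMPTION: inferred from the truncated "election-route-localhos" in the log,
    // not checked against Camel's source. The election route ID is taken to be
    // "election-route-" plus the first 8 characters of the candidate node name.
    public static String electionRouteId(String candidateName) {
        return "election-route-" + candidateName.substring(0, 8);
    }

    public static void main(String[] args) {
        // Candidate node names copied from the log; neither contains the vendor.
        String[] candidates = {
            "localhost-3fae9f79-a91c-4baa-bdcc-bd23a2623690", // test
            "localhost-2b2634e5-a0b3-4c0f-8ddb-b44e008854db"  // googleplay
        };
        List<String> ids = new ArrayList<>();
        for (String candidate : candidates) {
            ids.add(electionRouteId(candidate));
        }
        System.out.println(ids);
        // Same host-name prefix, so both election routes get the same ID.
        System.out.println("same ID: " + ids.get(0).equals(ids.get(1)));
    }
}
```

Running it prints [election-route-localhos, election-route-localhos] and same ID: true. If the assumption holds, election routes for different znodes on the same host would collide inside one CamelContext. That would be consistent with the log, where registering the second election route coincides with the first one being shut down and removed.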
My route builder code is below. I call it twice, once for vendor = “test”, and
once for vendor = “googleplay”.
import org.apache.camel.LoggingLevel;
import org.apache.camel.Route;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.zookeeper.policy.ZooKeeperRoutePolicy;
import org.apache.camel.impl.RoutePolicySupport;
import org.apache.camel.spi.RoutePolicy;

private RouteBuilder scheduledEventBatchExecutionTimerRoute(
        final String vendor,
        final int batchSize,
        final int timerPeriodSeconds,
        final boolean autoStart) {
    log.info("scheduledEventBatchExecutionTimerRoute: vendor={}, doStart={}, queryInterval={}s, batchSize={}",
            vendor, autoStart, timerPeriodSeconds, batchSize);
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            // One timer per vendor, e.g. timer://test and timer://googleplay.
            from("timer://" + vendor + "?period=" + timerPeriodSeconds + "s")
                .autoStartup(autoStart)
                // Each route gets its own ZooKeeperRoutePolicy instance.
                .routePolicy(electionRoutePolicy(vendor))
                .setHeader("vendor", constant(vendor))
                .setHeader("batchSize", constant(batchSize))
                .beanRef("scheduleEventRepository",
                        "findExecutableEvents(entitlement.create.${in.header.vendor}, ${in.header.batchSize})")
                .choice()
                    .when().simple("${body.size} == 0")
                        .log(LoggingLevel.DEBUG, "Route: vendor=${in.header.vendor}: No items to process")
                    .when().simple("${body.size} > 0")
                        .to(ROUTE_DIRECT_EVENT_BATCH_EXECUTION + "-" + vendor)
                .endChoice()
                .routeId(vendor + "-timer-execution-route");
        }
    };
}
private RoutePolicy electionRoutePolicy(String vendor) {
    RoutePolicy routePolicy;
    if (doUseDistributedRouteElection) {
        log.info("Creating zookeeper distributed route election policy for {}", vendor);
        // Ensure that only one node is processing work for the specified vendor.
        // See http://camel.apache.org/zookeeper.html - distributed route policy.
        routePolicy = new ZooKeeperRoutePolicy(zookeeperUri + "scheduler-node-election/" + vendor, 1);
    } else {
        // Just return a default route policy that does nothing.
        log.info("No-op route policy for {}", vendor);
        routePolicy = new RoutePolicySupport() {
            @Override
            public void onInit(Route route) {
                super.onInit(route);
            }
        };
    }
    return routePolicy;
}
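Calling the builder once per vendor produces vendor-specific names for everything the code controls directly. The thread names in the log (timer://test-#2, //googleplay-#1) come from these timer endpoints. This JDK-only sketch just reproduces the string concatenations from scheduledEventBatchExecutionTimerRoute and checks that the user-defined route IDs are distinct. The period of 30 seconds is an assumption, since the post does not give timerPeriodSeconds. The class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

public class VendorRouteNames {
    // Same concatenation as the from(...) call in the route builder.
    public static String timerUri(String vendor, int timerPeriodSeconds) {
        return "timer://" + vendor + "?period=" + timerPeriodSeconds + "s";
    }

    // Same concatenation as the .routeId(...) call in the route builder.
    public static String routeId(String vendor) {
        return vendor + "-timer-execution-route";
    }

    public static void main(String[] args) {
        int timerPeriodSeconds = 30; // assumed; the post does not give the value
        Set<String> routeIds = new HashSet<>();
        for (String vendor : new String[] {"test", "googleplay"}) {
            System.out.println(timerUri(vendor, timerPeriodSeconds) + " -> " + routeId(vendor));
            // Set.add returns false if this route ID was already taken.
            if (!routeIds.add(routeId(vendor))) {
                throw new IllegalStateException("duplicate route ID " + routeId(vendor));
            }
        }
        System.out.println("distinct route IDs: " + routeIds.size());
    }
}
```

The point of the check is the contrast with the log. The IDs set through routeId(...) never collide, so any clash has to come from routes that this code does not name itself.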