Hi,
I want to listen for ZooKeeper node changes inside a Spark closure so that behavior can be changed dynamically, so I use the PathChildrenCache class provided by Curator. But somehow the changes are not captured.
The main function in the closure:
public class DpiScenarioBasedFilter implements
        FlatMapFunction<Iterator<DPIFlow>, Tuple2<String, String>> {
    ...
    @Override
    public Iterator<Tuple2<String, String>> call(Iterator<DPIFlow> iter) throws Exception {
        // scenarioRuleManager is an instance of the ScenarioRuleManager class,
        // which is responsible for listening to ZooKeeper
        if (!scenarioRuleManager.isRegistered()) {
            synchronized (scenarioRuleManager) {
                if (!scenarioRuleManager.isRegistered()) {
                    ZkRuleManager.getInstance(zkConfig).register(scenarioRuleManager);
                    scenarioRuleManager.setRegistered(true);
                }
            }
        }
        List<Tuple2<String, String>> list = new LinkedList<Tuple2<String, String>>();
        Map<String, IRule> rules = scenarioRuleManager.getRules();
        while (iter.hasNext()) {
            DPIFlow df = iter.next();
            Key k = df.getKey();
            String isdn = k.getIsdn();
            String protocol = k.getProtocol();
            list.add(new Tuple2<String, String>(rules.size() + "", isdn));
        }
        return list.iterator();
    }
    ...
}
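If I understand Spark correctly (this is part of my question), the function object above is serialized on the driver and deserialized again on each executor, so every task would work with its own copy. A minimal pure-JDK sketch of what I mean, showing that a serialization round trip produces a distinct object with equal state (class and field names here are hypothetical, just to illustrate):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the function object Spark ships to executors.
// Serializing and deserializing it (as Spark does per task) yields a new,
// independent instance: reference identity is lost, field values are kept.
public class ClosureCopyDemo {
    static class Task implements Serializable {
        final String name;
        Task(String name) { this.name = name; }
    }

    static Task roundTrip(Task t) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(t); // "driver" side: closure is serialized
            }
            try (ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))) {
                return (Task) ois.readObject(); // "executor" side: fresh copy
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Task original = new Task("filter");
        Task copy = roundTrip(original);
        System.out.println(copy != original);                // true: different object
        System.out.println(copy.name.equals(original.name)); // true: same state
    }
}
```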
The ZooKeeper listener:
public class ScenarioRuleManager implements IRuleChangeListener, Serializable {
    // initialized here; otherwise the callbacks below would throw a NullPointerException
    private final ConcurrentHashMap<String, IRule> rules = new ConcurrentHashMap<String, IRule>();
    private volatile boolean registered = false;

    @Override
    public void onCreate(String key) { // declared in IRuleChangeListener
        rules.put(key, rule); // construction of "rule" from the node data is elided
    }

    @Override
    public void onDelete(String key) { // declared in IRuleChangeListener
        rules.remove(key);
    }

    @Override
    public void onUpdate(String key) { // declared in IRuleChangeListener
        rules.put(key, rule); // construction of "rule" from the node data is elided
    }
}
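One pattern I have seen suggested for sharing state like this across task copies on the same executor is to keep it in a static field, since static fields are not serialized with the closure and exist once per JVM (per classloader). A hedged pure-JDK sketch, with RulesHolder as my hypothetical name and String standing in for IRule:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-JVM holder: every deserialized copy of the Spark function
// running in the same executor JVM sees the same map, because the static
// field lives once per classloader rather than once per deserialized copy.
public final class RulesHolder {
    private static final Map<String, String> RULES = new ConcurrentHashMap<String, String>();

    private RulesHolder() {}

    public static Map<String, String> rules() {
        return RULES;
    }

    public static void main(String[] args) {
        // Two lookups return the very same map instance.
        System.out.println(rules() == rules()); // true
    }
}
```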
The ZooKeeper listener management:
public class ZkRuleManager implements Closeable {
    private static volatile ZkRuleManager INSTANCE = null;
    private static final Object _lock = new Object();
    private CuratorFramework _curator;
    private ZkConfig _zkConfig;
    private String zkRoot = "/rtm/keys";
    private ExecutorService es = Executors.newCachedThreadPool();
    private IRuleChangeListener listener;
    private PathChildrenCache childCache;

    private ZkRuleManager(ZkConfig zkConfig) {
        _zkConfig = zkConfig;
        _curator = ZkClientHolder.getZkClient(zkConfig);
        _curator.start();
        init();
        childCache = new PathChildrenCache(_curator, zkRoot, true);
        try {
            childCache.start(StartMode.BUILD_INITIAL_CACHE);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void init() {
        try {
            createZkRoot();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public void register(final IRuleChangeListener listener) {
        childCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework cf, PathChildrenCacheEvent pcce)
                    throws Exception {
                switch (pcce.getType()) {
                case CHILD_ADDED:
                    listener.onCreate(new String(pcce.getData().getData()));
                    break;
                case CHILD_REMOVED:
                    listener.onDelete(new String(pcce.getData().getData()));
                    break;
                case CHILD_UPDATED:
                    listener.onUpdate(new String(pcce.getData().getData()));
                    break;
                default:
                    break;
                }
            }
        }, es);
    }
    private void createZkRoot() throws Exception {
        if (_curator.checkExists().forPath(zkRoot) == null) {
            _curator.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT).forPath(zkRoot);
        }
    }

    public static ZkRuleManager getInstance(ZkConfig zkConfig) {
        if (INSTANCE == null) {
            synchronized (_lock) {
                if (INSTANCE == null) {
                    INSTANCE = new ZkRuleManager(zkConfig);
                }
            }
        }
        return INSTANCE;
    }

    public CuratorFramework getCurator() {
        return _curator;
    }
    public void writeRule(String ruleKey) throws Exception {
        String path = ZKPaths.makePath(zkRoot, ruleKey);
        if (_curator.checkExists().forPath(path) == null) { // node does not exist
            _curator.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT).forPath(path, ruleKey.getBytes());
        } else {
            _curator.setData().forPath(path, ruleKey.getBytes());
        }
    }
    @Override
    public void close() {
        try {
            childCache.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        _curator.close();
    }
}
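As an aside on the getInstance() above: the double-checked locking could also be written with the initialization-on-demand holder idiom, which gives the same lazy, thread-safe singleton with no explicit locking (ignoring the ZkConfig parameter here; the class names are hypothetical). A minimal sketch:

```java
// Hypothetical alternative to double-checked locking: the JVM guarantees that
// a class is initialized at most once, so the nested Holder class gives lazy,
// thread-safe construction without any synchronized block.
public class LazySingleton {
    private LazySingleton() {}

    private static class Holder {
        static final LazySingleton INSTANCE = new LazySingleton();
    }

    public static LazySingleton getInstance() {
        return Holder.INSTANCE; // Holder is initialized on first access
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance()); // true
    }
}
```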
What's the problem?
I am not quite clear on Spark's mechanics. I know that the tasks (defined by
DpiScenarioBasedFilter in this case) are executed on the worker nodes after
instances of the class are deserialized there. How many instances are there?
One per task? Are they equal to each other but not the same object within each
JVM? If so, do their identical fields (scenarioRuleManager) refer to a single
object or to different ones?
Could anyone help me?
[email protected]