Hi,

I want to listen for ZooKeeper node changes inside a Spark closure in order to change
behavior dynamically, so I use the PathChildrenCache class provided by Curator. But
somehow the changes are never captured.

The main function in the closure:
public class DpiScenarioBasedFilter
        implements FlatMapFunction<Iterator<DPIFlow>, Tuple2<String, String>> {
    ...
    @Override
    public Iterator<Tuple2<String, String>> call(Iterator<DPIFlow> iter) throws Exception {
        // scenarioRuleManager is an instance of the ScenarioRuleManager class,
        // which is responsible for listening to ZooKeeper
        if (!scenarioRuleManager.isRegistered()) {
            synchronized (scenarioRuleManager) {
                if (!scenarioRuleManager.isRegistered()) {
                    ZkRuleManager.getInstance(zkConfig).register(scenarioRuleManager);
                    scenarioRuleManager.setRegistered(true);
                }
            }
        }
        List<Tuple2<String, String>> list = new LinkedList<Tuple2<String, String>>();
        Map<String, IRule> rules = scenarioRuleManager.getRules();

        while (iter.hasNext()) {
            DPIFlow df = iter.next();
            Key k = df.getKey();
            String isdn = k.getIsdn();
            String protocol = k.getProtocol();

            list.add(new Tuple2<String, String>(rules.size() + "", isdn));
        }
        return list.iterator();
    }
    ...
}
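For reference, the only way I have found to get one truly shared object per executor JVM is a static holder, since instance fields live on each deserialized copy of the function. A minimal standalone sketch of what I mean (all names here are made up for illustration, not my real classes, and the map value is String instead of IRule to keep it self-contained):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: one shared rule registry per executor JVM via a static holder.
public class RuleHolder {
    // Static fields are per-JVM (per executor), not per-task:
    // every deserialized copy of the closure sees the same map.
    private static final Map<String, String> RULES = new ConcurrentHashMap<String, String>();
    private static volatile boolean registered = false;

    public static Map<String, String> rules() {
        if (!registered) {
            synchronized (RuleHolder.class) {
                if (!registered) {
                    // attach the ZooKeeper listener exactly once per JVM here
                    registered = true;
                }
            }
        }
        return RULES;
    }
}
```

Every task running in the same executor would then read from the same map, and a listener registered once per JVM could keep it fresh.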

The ZooKeeper listener:
public class ScenarioRuleManager implements IRuleChangeListener, Serializable {

    private ConcurrentHashMap<String, IRule> rules = new ConcurrentHashMap<String, IRule>();

    private volatile boolean registered = false;

    @Override
    public void onCreate(String key) { // declared in IRuleChangeListener
        rules.put(key, rule);
    }

    @Override
    public void onDelete(String key) { // declared in IRuleChangeListener
        rules.remove(key);
    }

    @Override
    public void onUpdate(String key) { // declared in IRuleChangeListener
        rules.put(key, rule);
    }
}
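If this class travels inside the closure, I assume any per-JVM state has to be rebuilt after deserialization rather than serialized along. One pattern I have seen for that is a transient field re-created lazily on first use; a sketch of the idea (hypothetical class, with String standing in for IRule so it compiles on its own):

```java
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: rebuild per-copy state after deserialization via a transient field.
public class LazyRuleManager implements Serializable {
    // transient: not serialized, so it is null in every deserialized copy
    private transient volatile Map<String, String> rules;

    public Map<String, String> getRules() {
        Map<String, String> r = rules;
        if (r == null) {
            synchronized (this) {
                if (rules == null) {
                    rules = new ConcurrentHashMap<String, String>();
                    // (re)attach the ZooKeeper listener here,
                    // once per deserialized copy
                }
                r = rules;
            }
        }
        return r;
    }
}
```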

ZooKeeper listener management:
public class ZkRuleManager implements Closeable {
    private static volatile ZkRuleManager INSTANCE = null;
    private static final Object _lock = new Object();

    private CuratorFramework _curator;
    private ZkConfig _zkConfig;

    private String zkRoot = "/rtm/keys";

    private ExecutorService es = Executors.newCachedThreadPool();
    private IRuleChangeListener listener;
    private PathChildrenCache childCache;

    private ZkRuleManager(ZkConfig zkConfig) {
        _zkConfig = zkConfig;
        _curator = ZkClientHolder.getZkClient(zkConfig);
        _curator.start();
        init();
        childCache = new PathChildrenCache(_curator, zkRoot, true);
        try {
            childCache.start(StartMode.BUILD_INITIAL_CACHE);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void init() {
        try {
            createZkRoot();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void register(final IRuleChangeListener listener) {
        childCache.getListenable().addListener(new PathChildrenCacheListener() {

            @Override
            public void childEvent(CuratorFramework cf, PathChildrenCacheEvent pcce) throws Exception {
                switch (pcce.getType()) {
                    case CHILD_ADDED:
                        listener.onCreate(new String(pcce.getData().getData()));
                        break;
                    case CHILD_REMOVED:
                        listener.onDelete(new String(pcce.getData().getData()));
                        break;
                    case CHILD_UPDATED:
                        listener.onUpdate(new String(pcce.getData().getData()));
                        break;
                    default:
                        break;
                }
            }
        }, es);
    }

    private void createZkRoot() throws Exception {
        if (_curator.checkExists().forPath(zkRoot) == null) {
            _curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkRoot);
        }
    }

    public static ZkRuleManager getInstance(ZkConfig zkConfig) {
        if (INSTANCE == null) {
            synchronized (_lock) {
                if (INSTANCE == null) {
                    INSTANCE = new ZkRuleManager(zkConfig);
                }
            }
        }
        return INSTANCE;
    }

    public CuratorFramework getCurator() {
        return _curator;
    }

    public void writeRule(String ruleKey) throws Exception {
        String path = ZKPaths.makePath(zkRoot, ruleKey);
        if (_curator.checkExists().forPath(path) == null) { // node does not exist
            _curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                    .forPath(path, ruleKey.getBytes());
        } else {
            _curator.setData().forPath(path, ruleKey.getBytes());
        }
    }

    @Override
    public void close() {
        try {
            childCache.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        _curator.close();
    }
}
What's the problem?
I am not quite clear about Spark's mechanics here. I know that the tasks (defined by
DpiScenarioBasedFilter in this case) are executed on the worker nodes after instances
of the class are deserialized. How many instances are there? One per task? Within a
single JVM, are they equal but not the same object? If so, does the scenarioRuleManager
field of each instance refer to one shared object, or to different ones?
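To check my understanding of the serialization side on its own (no Spark involved), I tried picturing it like this: every deserialization produces an independent copy with its own rules map, so updates made by the listener on one copy would never reach the copy a task is reading. A standalone sketch of that suspicion (Manager is a stand-in for my real class, with String instead of IRule):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ConcurrentHashMap;

public class CopySemantics {
    static class Manager implements Serializable {
        ConcurrentHashMap<String, String> rules = new ConcurrentHashMap<String, String>();
    }

    // Serialize and deserialize, the way a closure is shipped to a task.
    static Manager roundTrip(Manager m) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(m);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (Manager) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Manager driver = new Manager();
        Manager task1 = roundTrip(driver); // copy for one task
        Manager task2 = roundTrip(driver); // copy for another task

        // Each deserialization yields an independent map:
        task1.rules.put("k", "v");
        System.out.println(task1.rules.containsKey("k")); // true
        System.out.println(task2.rules.containsKey("k")); // false: separate copy
        System.out.println(driver.rules.containsKey("k")); // false: original untouched
    }
}
```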

Could anyone help me?