Hi,

I'd like to implement a custom Zookeeper data source which reads zookeeper
if NodeDataChange. Now it's not working perfectly because the thread needs
to sleep otherwise it doesn't work.

public static class ZKSource implements SourceFunction<String> {
                private static final long serialVersionUID = 1L;
                private static String zkData;
                private static CuratorFramework client;
                private static boolean isChanged = false;

                @Override
                public void run(SourceContext<String> ctx) throws Exception {
                        // first collect 
                        ctx.collect(getZKData());                       
                
                        while (true) {
                                Thread.sleep(1); // without this doesn't work 
                                if(isChanged){
                                        ctx.collect(getZKData());
                                        isChanged = false;
                                 }                              
                        }
                }

                 @Override
                public void cancel() {
                         client.close();
                }

                private String getZKData() throws Exception {
                        RetryPolicy retryPolicy = new 
ExponentialBackoffRetry(1000, 3);

                        client = 
CuratorFrameworkFactory.newClient(zookeeperConnectionString,
retryPolicy);
                        
                        client.getCuratorListenable().addListener(new 
ZKListener());

                        client.start();
                        zkData = new 
String(client.getData().watched().forPath(node),
StandardCharsets.UTF_8);
                        
                        return zkData;
                }

                static class ZKListener implements CuratorListener {

                        @Override
                        public void eventReceived(CuratorFramework 
curatorFramework, CuratorEvent
curatorEvent) throws    Exception {
                                EventType changeEvent = 
curatorEvent.getWatchedEvent().getType();                               
                                try {
                                        switch (changeEvent){
                                                case NodeDataChanged:
                                                        isChanged = true;       
                                
                                                        break;
                                                default:
                                                        break;
                                        }

                                }

                                catch (Exception e){
                                        LOG.warn("Exception", e);
                                        client.close();
                                }
                        }

                }
        }

I was thinking ctx.collect needs some time to complete and without
Thread.sleep(), ctx.collect() doesn't start and isChanged becomes false
quicker than ctx.collect(). Besides, the time needed to sleep may vary based
on how busy the Flink cluster is which sounds not very robust? 
Would be glad to know any better implementation and mistakes I have made.

Best,

Sendoh



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Need-to-sleep-the-thread-to-let-my-Flink-Zookeeper-datasource-with-NodeDataChanged-work-tp6249.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to