[2/3] curator git commit: CURATOR-344 - do watch event processing tasks in the background and limit shared value watcher to valid change events to avoid work on disconnect
CURATOR-344 - do watch event processing tasks in the background and limit shared value watcher to valid change events to avoid work on disconnect Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/022de392 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/022de392 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/022de392 Branch: refs/heads/CURATOR-3.0 Commit: 022de3921a120c6f86cc6e21442327cc04b66cd2 Parents: ef33ccb Author: gtullyAuthored: Thu Aug 18 19:34:10 2016 +0100 Committer: gtully Committed: Tue Aug 30 13:09:56 2016 +0100 -- .../framework/recipes/shared/SharedValue.java | 24 - .../recipes/shared/TestSharedCount.java | 106 +++ 2 files changed, 127 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/curator/blob/022de392/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java -- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java index dddc471..1f9df37 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java @@ -22,6 +22,8 @@ package org.apache.curator.framework.recipes.shared; import com.google.common.base.Function; import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.state.ConnectionState; @@ -30,6 +32,7 @@ import org.apache.curator.utils.PathUtils; import org.apache.curator.utils.ThreadUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,10 +62,10 @@ public class SharedValue implements Closeable, SharedValueReader @Override public void process(WatchedEvent event) throws Exception { -if ( state.get() == State.STARTED ) +if ( state.get() == State.STARTED && event.getType() != Watcher.Event.EventType.None ) { -readValue(); -notifyListeners(); +// don't block event thread in possible retry +readValueAndNotifyListenersInBackground(); } } }; @@ -248,6 +251,21 @@ public class SharedValue implements Closeable, SharedValueReader updateValue(localStat.getVersion(), bytes); } +private final BackgroundCallback upadateAndNotifyListenerCallback = new BackgroundCallback() { +@Override +public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { +if (event.getResultCode() == KeeperException.Code.OK.intValue()) { +updateValue(event.getStat().getVersion(), event.getData()); +notifyListeners(); +} +} +}; + +private void readValueAndNotifyListenersInBackground() throws Exception +{ + client.getData().usingWatcher(watcher).inBackground(upadateAndNotifyListenerCallback).forPath(path); +} + private void notifyListeners() { final byte[] localValue = getValue(); http://git-wip-us.apache.org/repos/asf/curator/blob/022de392/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java -- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java index a1f4d8c..7939f6e 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java @@ -23,7 +23,11 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; import
curator git commit: CURATOR-344 - do watch event processing tasks in the background and limit shared value watcher to valid change events to avoid work on disconnect
Repository: curator Updated Branches: refs/heads/master ef33ccb11 -> 022de3921 CURATOR-344 - do watch event processing tasks in the background and limit shared value watcher to valid change events to avoid work on disconnect Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/022de392 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/022de392 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/022de392 Branch: refs/heads/master Commit: 022de3921a120c6f86cc6e21442327cc04b66cd2 Parents: ef33ccb Author: gtullyAuthored: Thu Aug 18 19:34:10 2016 +0100 Committer: gtully Committed: Tue Aug 30 13:09:56 2016 +0100 -- .../framework/recipes/shared/SharedValue.java | 24 - .../recipes/shared/TestSharedCount.java | 106 +++ 2 files changed, 127 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/curator/blob/022de392/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java -- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java index dddc471..1f9df37 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java @@ -22,6 +22,8 @@ package org.apache.curator.framework.recipes.shared; import com.google.common.base.Function; import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.state.ConnectionState; @@ -30,6 +32,7 @@ import org.apache.curator.utils.PathUtils; import org.apache.curator.utils.ThreadUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,10 +62,10 @@ public class SharedValue implements Closeable, SharedValueReader @Override public void process(WatchedEvent event) throws Exception { -if ( state.get() == State.STARTED ) +if ( state.get() == State.STARTED && event.getType() != Watcher.Event.EventType.None ) { -readValue(); -notifyListeners(); +// don't block event thread in possible retry +readValueAndNotifyListenersInBackground(); } } }; @@ -248,6 +251,21 @@ public class SharedValue implements Closeable, SharedValueReader updateValue(localStat.getVersion(), bytes); } +private final BackgroundCallback upadateAndNotifyListenerCallback = new BackgroundCallback() { +@Override +public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { +if (event.getResultCode() == KeeperException.Code.OK.intValue()) { +updateValue(event.getStat().getVersion(), event.getData()); +notifyListeners(); +} +} +}; + +private void readValueAndNotifyListenersInBackground() throws Exception +{ + client.getData().usingWatcher(watcher).inBackground(upadateAndNotifyListenerCallback).forPath(path); +} + private void notifyListeners() { final byte[] localValue = getValue(); http://git-wip-us.apache.org/repos/asf/curator/blob/022de392/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java -- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java index a1f4d8c..7939f6e 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java @@ -23,7 +23,11 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import
curator git commit: CURATOR-344 - do watch event processing tasks in the background and limit shared value watcher to valid change events to avoid work on disconnect
Repository: curator Updated Branches: refs/heads/CURATOR-344 [created] 022de3921 CURATOR-344 - do watch event processing tasks in the background and limit shared value watcher to valid change events to avoid work on disconnect Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/022de392 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/022de392 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/022de392 Branch: refs/heads/CURATOR-344 Commit: 022de3921a120c6f86cc6e21442327cc04b66cd2 Parents: ef33ccb Author: gtullyAuthored: Thu Aug 18 19:34:10 2016 +0100 Committer: gtully Committed: Tue Aug 30 13:09:56 2016 +0100 -- .../framework/recipes/shared/SharedValue.java | 24 - .../recipes/shared/TestSharedCount.java | 106 +++ 2 files changed, 127 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/curator/blob/022de392/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java -- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java index dddc471..1f9df37 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java @@ -22,6 +22,8 @@ package org.apache.curator.framework.recipes.shared; import com.google.common.base.Function; import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.state.ConnectionState; @@ -30,6 +32,7 @@ import org.apache.curator.utils.PathUtils; import org.apache.curator.utils.ThreadUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,10 +62,10 @@ public class SharedValue implements Closeable, SharedValueReader @Override public void process(WatchedEvent event) throws Exception { -if ( state.get() == State.STARTED ) +if ( state.get() == State.STARTED && event.getType() != Watcher.Event.EventType.None ) { -readValue(); -notifyListeners(); +// don't block event thread in possible retry +readValueAndNotifyListenersInBackground(); } } }; @@ -248,6 +251,21 @@ public class SharedValue implements Closeable, SharedValueReader updateValue(localStat.getVersion(), bytes); } +private final BackgroundCallback upadateAndNotifyListenerCallback = new BackgroundCallback() { +@Override +public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { +if (event.getResultCode() == KeeperException.Code.OK.intValue()) { +updateValue(event.getStat().getVersion(), event.getData()); +notifyListeners(); +} +} +}; + +private void readValueAndNotifyListenersInBackground() throws Exception +{ + client.getData().usingWatcher(watcher).inBackground(upadateAndNotifyListenerCallback).forPath(path); +} + private void notifyListeners() { final byte[] localValue = getValue(); http://git-wip-us.apache.org/repos/asf/curator/blob/022de392/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java -- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java index a1f4d8c..7939f6e 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java @@ -23,7 +23,11 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import