[08/16] geode git commit: GEODE-2757: Do not process netsearch reply from a departed node that membership listener already detected.
GEODE-2757: Do not process netsearch reply from a departed node that membership listener already detected. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/d497d63a Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/d497d63a Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/d497d63a Branch: refs/heads/feature/GEODE-2485 Commit: d497d63af422b3b98c480698a9470812539f8a83 Parents: 799548e Author: eshuAuthored: Fri Apr 7 11:37:35 2017 -0700 Committer: eshu Committed: Fri Apr 7 11:37:35 2017 -0700 -- .../geode/internal/cache/DistributedRegion.java | 2 +- .../cache/SearchLoadAndWriteProcessor.java | 61 +++ .../cache/SearchLoadAndWriteProcessorTest.java | 102 ++- 3 files changed, 143 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/geode/blob/d497d63a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java -- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index fa02574..c12a652 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -2126,7 +2126,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA return this.distAdvisor; } - public final CacheDistributionAdvisor getCacheDistributionAdvisor() { + public CacheDistributionAdvisor getCacheDistributionAdvisor() { return this.distAdvisor; } http://git-wip-us.apache.org/repos/asf/geode/blob/d497d63a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java -- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java index 3d969f9..2a10792 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java @@ -67,7 +67,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "search-retry-interval", 2000).longValue(); - private InternalDistributedMember selectedNode; + private volatile InternalDistributedMember selectedNode; private boolean selectedNodeDead = false; private int timeout; private boolean netSearchDone = false; @@ -108,6 +108,8 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { private final Object membersLock = new Object(); + private ArrayList departedMembers; + private Lock lock = null; // if non-null, then needs to be unlocked in release static final int NETSEARCH = 0; @@ -221,6 +223,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { } synchronized (this) { if (id.equals(selectedNode) && (this.requestInProgress) && (this.remoteGetInProgress)) { +if (departedMembers == null) { + departedMembers = new ArrayList(); +} +departedMembers.add(id); selectedNode = null; selectedNodeDead = true; computeRemainingTimeout(); @@ -231,8 +237,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { notifyAll(); // signal the waiter; we are not done; but we need the waiter to call // sendNetSearchRequest } - if (responseQueue != null) + if (responseQueue != null) { responseQueue.remove(id); + } checkIfDone(); } } @@ -378,6 +385,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { /** Package Methods **/ + InternalDistributedMember getSelectedNode() { +return this.selectedNode; + } + /** Private Methods **/ /** * Even though SearchLoadAndWriteProcessor may be in invoked in the context of a local region, @@ -495,25 +506,28 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { synchronized (this.pendingResponders) { this.pendingResponders.clear(); } -this.requestInProgress = true; -this.remoteGetInProgress = true; + synchronized (this) { + this.requestInProgress = true; + this.remoteGetInProgress = true; setSelectedNode(replicate);
geode git commit: GEODE-2757: Do not process netsearch reply from a departed node that membership listener already detected.
Repository: geode Updated Branches: refs/heads/develop 799548ee4 -> d497d63af GEODE-2757: Do not process netsearch reply from a departed node that membership listener already detected. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/d497d63a Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/d497d63a Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/d497d63a Branch: refs/heads/develop Commit: d497d63af422b3b98c480698a9470812539f8a83 Parents: 799548e Author: eshuAuthored: Fri Apr 7 11:37:35 2017 -0700 Committer: eshu Committed: Fri Apr 7 11:37:35 2017 -0700 -- .../geode/internal/cache/DistributedRegion.java | 2 +- .../cache/SearchLoadAndWriteProcessor.java | 61 +++ .../cache/SearchLoadAndWriteProcessorTest.java | 102 ++- 3 files changed, 143 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/geode/blob/d497d63a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java -- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index fa02574..c12a652 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -2126,7 +2126,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA return this.distAdvisor; } - public final CacheDistributionAdvisor getCacheDistributionAdvisor() { + public CacheDistributionAdvisor getCacheDistributionAdvisor() { return this.distAdvisor; } http://git-wip-us.apache.org/repos/asf/geode/blob/d497d63a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java -- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java index 3d969f9..2a10792 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java @@ -67,7 +67,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "search-retry-interval", 2000).longValue(); - private InternalDistributedMember selectedNode; + private volatile InternalDistributedMember selectedNode; private boolean selectedNodeDead = false; private int timeout; private boolean netSearchDone = false; @@ -108,6 +108,8 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { private final Object membersLock = new Object(); + private ArrayList departedMembers; + private Lock lock = null; // if non-null, then needs to be unlocked in release static final int NETSEARCH = 0; @@ -221,6 +223,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { } synchronized (this) { if (id.equals(selectedNode) && (this.requestInProgress) && (this.remoteGetInProgress)) { +if (departedMembers == null) { + departedMembers = new ArrayList(); +} +departedMembers.add(id); selectedNode = null; selectedNodeDead = true; computeRemainingTimeout(); @@ -231,8 +237,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { notifyAll(); // signal the waiter; we are not done; but we need the waiter to call // sendNetSearchRequest } - if (responseQueue != null) + if (responseQueue != null) { responseQueue.remove(id); + } checkIfDone(); } } @@ -378,6 +385,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { /** Package Methods **/ + InternalDistributedMember getSelectedNode() { +return this.selectedNode; + } + /** Private Methods **/ /** * Even though SearchLoadAndWriteProcessor may be in invoked in the context of a local region, @@ -495,25 +506,28 @@ public class SearchLoadAndWriteProcessor implements MembershipListener { synchronized (this.pendingResponders) { this.pendingResponders.clear(); } -this.requestInProgress = true; -this.remoteGetInProgress = true; + synchronized (this) { + this.requestInProgress = true; +