Repository: incubator-ratis Updated Branches: refs/heads/master 0235de0ec -> 3e0ad68a0
RATIS-103. LeaderState.updateSenders may throw UnsupportedOperationException. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/3e0ad68a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/3e0ad68a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/3e0ad68a Branch: refs/heads/master Commit: 3e0ad68a0897c622b00e85af0b7c06ed1a80fc12 Parents: 0235de0 Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Authored: Sat Aug 12 15:28:54 2017 -0700 Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Committed: Sat Aug 12 15:28:54 2017 -0700 ---------------------------------------------------------------------- .../apache/ratis/server/impl/LeaderState.java | 85 ++++++++++++-------- .../apache/ratis/server/impl/LogAppender.java | 3 +- 2 files changed, 54 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e0ad68a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index de88382..313a3bb 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -34,7 +34,10 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.ratis.server.impl.LeaderState.StateUpdateEventType.*; @@ -68,6 +71,35 @@ public class 
LeaderState { } } + /** + * Use {@link CopyOnWriteArrayList} to implement a thread-safe list. + * Since each mutation induces a copy of the list, only bulk operations + * (addAll and removeAll) are supported. + */ + static class SenderList { + private final List<LogAppender> senders; + + SenderList(LogAppender[] senders) { + this.senders = new CopyOnWriteArrayList<>(senders); + } + + Stream<LogAppender> stream() { + return senders.stream(); + } + + void forEach(Consumer<LogAppender> action) { + senders.forEach(action); + } + + boolean addAll(Collection<LogAppender> c) { + return senders.addAll(c); + } + + boolean removeAll(Collection<LogAppender> c) { + return senders.removeAll(c); + } + } + static final StateUpdateEvent UPDATE_COMMIT_EVENT = new StateUpdateEvent(StateUpdateEventType.UPDATECOMMIT, -1); static final StateUpdateEvent STAGING_PROGRESS_EVENT = @@ -83,7 +115,7 @@ public class LeaderState { * The list of threads appending entries to followers. * The list is protected by the RaftServer's lock. 
*/ - private final List<LogAppender> senders; + private final SenderList senders; private final BlockingQueue<StateUpdateEvent> eventQ; private final EventProcessor processor; private final PendingRequests pendingRequests; @@ -110,11 +142,11 @@ public class LeaderState { Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId()); final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); placeHolderIndex = raftLog.getNextIndex(); - senders = new CopyOnWriteArrayList<LogAppender>(); - for (RaftPeer p : others) { - senders.add(server.newLogAppender(this, p, t, placeHolderIndex, true)); - } + senders = new SenderList(others.stream().map( + p -> server.newLogAppender(this, p, t, placeHolderIndex, true)) + .toArray(LogAppender[]::new)); + voterLists = divideFollowers(conf); } @@ -146,10 +178,7 @@ public class LeaderState { void stop() { this.running = false; // do not interrupt event processor since it may be in the middle of logSync - for (LogAppender sender : senders) { - sender.stopSender(); - sender.interrupt(); - } + senders.forEach(sender -> sender.stopSender().interrupt()); try { pendingRequests.sendNotLeaderResponses(); } catch (IOException e) { @@ -235,11 +264,18 @@ public class LeaderState { void addSenders(Collection<RaftPeer> newMembers) { final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs()); final long nextIndex = raftLog.getNextIndex(); - for (RaftPeer peer : newMembers) { + + senders.addAll(newMembers.stream().map(peer -> { LogAppender sender = server.newLogAppender(this, peer, t, nextIndex, false); - senders.add(sender); sender.start(); - } + return sender; + }).collect(Collectors.toList())); + } + + void stopAndRemoveSenders(Predicate<LogAppender> predicate) { + final List<LogAppender> toStop = senders.stream().filter(predicate).collect(Collectors.toList()); + toStop.forEach(s -> s.stopSender().interrupt()); + senders.removeAll(toStop); } /** @@ -247,15 +283,7 @@ public class LeaderState { */ private void 
updateSenders(RaftConfiguration conf) { Preconditions.assertTrue(conf.isStable() && !inStagingState()); - Iterator<LogAppender> iterator = senders.iterator(); - while (iterator.hasNext()) { - LogAppender sender = iterator.next(); - if (!conf.containsInConf(sender.getFollower().getPeer().getId())) { - iterator.remove(); - sender.stopSender(); - sender.interrupt(); - } - } + stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollower().getPeer().getId())); } void submitUpdateStateEvent(StateUpdateEvent event) { @@ -384,9 +412,7 @@ public class LeaderState { } else if (!reports.contains(BootStrapProgress.PROGRESSING)) { // all caught up! applyOldNewConf(); - for (LogAppender sender : senders) { - sender.getFollower().startAttendVote(); - } + senders.forEach(s -> s.getFollower().startAttendVote()); } } } @@ -573,15 +599,8 @@ public class LeaderState { } void fail() { - Iterator<LogAppender> iterator = senders.iterator(); - while (iterator.hasNext()) { - LogAppender sender = iterator.next(); - if (!sender.getFollower().isAttendingVote()) { - iterator.remove(); - sender.stopSender(); - sender.interrupt(); - } - } + stopAndRemoveSenders(s -> !s.getFollower().isAttendingVote()); + LeaderState.this.stagingState = null; // send back failure response to client's request pendingRequests.failSetConfiguration( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/3e0ad68a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index 36c6171..a5b0791 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -98,8 +98,9 @@ public class LogAppender extends Daemon { return sending; } - public void stopSender() { + public 
LogAppender stopSender() { this.sending = false; + return this; } public FollowerInfo getFollower() {