This is an automated email from the ASF dual-hosted git repository.

vrozov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git
The following commit(s) were added to refs/heads/master by this push:
     new 436785b  APEXCORE-810 Fixing race condition between publisher and subscriber teardowns
436785b is described below

commit 436785bd63be0e90265cf8f8f18882647b8ecab0
Author: Pramod Immaneni <pra...@apache.org>
AuthorDate: Wed Jan 17 13:48:32 2018 -0800

    APEXCORE-810 Fixing race condition between publisher and subscriber teardowns
---
 .../bufferserver/internal/LogicalNode.java         |  7 +--
 .../datatorrent/bufferserver/server/Server.java    | 62 +++++++++-------------
 2 files changed, 27 insertions(+), 42 deletions(-)

diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index 3e8846d..af5db09 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -115,12 +115,7 @@ public class LogicalNode implements DataListener
    */
   public void removeChannel(WriteOnlyClient client)
   {
-    for (PhysicalNode pn : physicalNodes) {
-      if (pn.getClient() == client) {
-        physicalNodes.remove(pn);
-        break;
-      }
-    }
+    physicalNodes.removeIf(node -> (node.getClient().equals(client)));
   }

   /**
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index c5700f2..6332a18 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -24,9 +24,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map.Entry;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -117,9 +114,11 @@ public class Server extends AbstractServer
   @Override
   public void unregistered(SelectionKey key)
   {
+    logger.debug("Unregistered {}", this);
     for (LogicalNode ln : subscriberGroups.values()) {
       ln.boot();
     }
+    super.unregistered(key);
     /*
      * There may be un-register tasks scheduled to run on the event loop that use serverHelperExecutor.
      */
@@ -860,41 +859,32 @@
       }
       torndown = true;

-      /*
-       * if the publisher unregistered, all the downstream guys are going to be unregistered anyways
-       * in our world. So it makes sense to kick them out proactively. Otherwise these clients since
-       * are not being written to, just stick around till the next publisher shows up and eat into
-       * the data it's publishing for the new subscribers.
-       */
-
-      /**
-       * since the publisher server died, the queue which it was using would stop pumping the data unless
-       * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node
-       * with the same identifier as the one which just died.
-       */
-      if (publisherChannels.containsValue(this)) {
-        final Iterator<Entry<String, AbstractLengthPrependerClient>> i = publisherChannels.entrySet().iterator();
-        while (i.hasNext()) {
-          if (i.next().getValue() == this) {
-            i.remove();
-            break;
-          }
-        }
-      }
-
-      ArrayList<LogicalNode> list = new ArrayList<>();
-      String publisherIdentifier = datalist.getIdentifier();
-      Iterator<LogicalNode> iterator = subscriberGroups.values().iterator();
-      while (iterator.hasNext()) {
-        LogicalNode ln = iterator.next();
-        if (publisherIdentifier.equals(ln.getUpstream())) {
-          list.add(ln);
+      serverHelperExecutor.submit(() ->
+      {
+        /*
+         * if the publisher unregistered, all the downstream guys are going to be unregistered anyways
+         * in our world. So it makes sense to kick them out proactively. Otherwise these clients since
+         * are not being written to, just stick around till the next publisher shows up and eat into
+         * the data it's publishing for the new subscribers.
+         */
+
+        /**
+         * since the publisher server died, the queue which it was using would stop pumping the data unless
+         * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node
+         * with the same identifier as the one which just died.
+         */
+        String publisherIdentifier = datalist.getIdentifier();
+        if (!publisherChannels.remove(publisherIdentifier, Publisher.this)) {
+          logger.warn("{} could not be removed from channels", Publisher.this);
         }
-      }
-      for (LogicalNode ln : list) {
-        ln.boot();
-      }
+        subscriberGroups.forEach((type, ln) -> {
+          if (publisherIdentifier.equals(ln.getUpstream())) {
+            logger.debug("Booting logical node {} from publisher", ln);
+            ln.boot();
+          }
+        });
+      });
     }
   }
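A note on the LogicalNode change: the removed loop walked physicalNodes with a for-each,
compared clients by identity (==), and removed the first match mid-iteration, a sequence
that can race with a concurrent teardown mutating the same collection. Collection.removeIf
pushes the test-and-remove into the collection itself and removes every matching node, and
equals() also catches clients that are logically equal but not the same instance. A minimal
sketch of the pattern, assuming physicalNodes is a concurrent set (its declared type is not
shown in this diff) and substituting hypothetical Client/Node classes for
WriteOnlyClient/PhysicalNode:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: the concrete type of physicalNodes is an assumption, and
// Client/Node are hypothetical stand-ins for WriteOnlyClient/PhysicalNode.
class RemoveIfSketch
{
  static class Client
  {
  }

  static class Node
  {
    private final Client client;

    Node(Client client)
    {
      this.client = client;
    }

    Client getClient()
    {
      return client;
    }
  }

  // ConcurrentHashMap.newKeySet() gives a thread-safe Set whose removeIf
  // does not throw ConcurrentModificationException under concurrent writes.
  private final Set<Node> physicalNodes = ConcurrentHashMap.newKeySet();

  void removeChannel(Client client)
  {
    // One call removes every node whose client matches, instead of an
    // external iterate-compare-remove-break sequence that can race.
    physicalNodes.removeIf(node -> node.getClient().equals(client));
  }
}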
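On the Server change, two details do the work. ConcurrentMap.remove(key, value) is a single
atomic step: the entry goes away only if publisherIdentifier is still mapped to this exact
publisher, so a stale teardown can never evict a replacement publisher registered under the
same identifier, unlike the old two-step containsValue check followed by an iterator scan.
And the whole cleanup now runs on serverHelperExecutor instead of the event loop, keeping it
ordered with the un-register tasks mentioned in unregistered(). A minimal sketch of the
combination, with hypothetical names and the assumption (not visible in this diff) that the
helper executor serializes its tasks:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: field and method names mirror the diff loosely; the
// single-threaded executor is an assumption made to illustrate why
// submitting teardown work removes the publisher/subscriber race.
class TeardownSketch
{
  private final ConcurrentMap<String, Object> publisherChannels = new ConcurrentHashMap<>();
  // One worker thread makes publisher and subscriber teardown tasks run
  // one after another instead of interleaving.
  private final ExecutorService serverHelperExecutor = Executors.newSingleThreadExecutor();

  void teardownPublisher(String publisherIdentifier, Object publisher)
  {
    serverHelperExecutor.submit(() -> {
      // Atomic test-and-remove: succeeds only if the identifier still
      // maps to exactly this publisher instance.
      if (!publisherChannels.remove(publisherIdentifier, publisher)) {
        System.err.println(publisher + " could not be removed from channels");
      }
    });
  }
}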
-- 
To stop receiving notification emails like this one, please contact
vro...@apache.org.