[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-10-09 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r332900640
 
 

 ##
 File path: 
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainMultiAddressTest.java
 ##
 @@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.junit.Assert.assertEquals;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.DummyWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.admin.ZooKeeperAdmin;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ReconfigTest;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class QuorumPeerMainMultiAddressTest extends QuorumPeerTestBase {
+
+  private static final int FIRST_SERVER = 0;
+  private static final int SECOND_SERVER = 1;
+  private static final int THIRD_SERVER = 2;
+  private static final int FIRST_ADDRESS = 0;
+  private static final int SECOND_ADDRESS = 1;
+  private static final String UNREACHABLE_HOST = 
"invalid.hostname.unreachable.com";
+  private static final String IPV6_LOCALHOST = "[0:0:0:0:0:0:0:1]";
+
+  // IPv4 by default, change to IPV6_LOCALHOST to test with servers binding to 
IPv6
+  private String hostName = "127.0.0.1";
+
+  private int zNodeId = 0;
+
+  @Before
+  public void setUp() throws Exception {
+ClientBase.setupTestEnv();
+System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", 
"super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/);
+QuorumPeerConfig.setReconfigEnabled(true);
+  }
+
+
+  @Test
+  public void shouldStartClusterWithMultipleAddresses() throws Exception {
+// we have three ZK servers, each server has two quorumPort and two 
electionPort registered
+QuorumConfigBuilder quorumConfig = new QuorumConfigBuilder(hostName, 3, 2);
+
+// we launch the three servers, each server having the same configuration
+QuorumConfigBuilder builderForServer1 = new 
QuorumConfigBuilder(quorumConfig);
+QuorumConfigBuilder builderForServer2 = new 
QuorumConfigBuilder(quorumConfig);
+QuorumConfigBuilder builderForServer3 = new 
QuorumConfigBuilder(quorumConfig);
+launchServers(Arrays.asList(builderForServer1, builderForServer2, 
builderForServer3));
+
+checkIfZooKeeperQuorumWorks(quorumConfig);
+  }
+
+
+  @Test
+  public void shouldStartClusterWithMultipleAddresses_IPv6() throws Exception {
+hostName = IPV6_LOCALHOST;
+
+shouldStartClusterWithMultipleAddresses();
+  }
+
+
+  @Test
+  public void shouldStartClusterWhenSomeAddressesAreUnreachable() throws 
Exception {
+// we have three ZK servers, each server has two quorumPort and two 
electionPort registered
+QuorumConfigBuilder quorumConfig = new QuorumConfigBuilder(hostName, 3, 2);
+
+// we launch the three servers
+// in the config of each server, we misconfigure one of the addresses for 
the other two servers
+QuorumConfigBuilder builderForServer1 = new 
QuorumConfigBuilder(quorumConfig)
+  .changeHostName(SECOND_SERVER, SECOND_ADDRESS, UNREACHABLE_HOST)
+  .changeHostName(THIRD_SERVER, SECOND_ADDRESS, UNREACHABLE_HOST);
+
+QuorumConfigBuilder builderForServer2 = new 
QuorumConfigBuilder(quorumConfig)
+  .changeHostName(FIRST_SERVER, SECOND_ADDRESS, UNREACHABLE_HOST)
+  .changeHostName(THIRD_SERVER, SECOND_ADDRESS, UNREACHABLE_HOST);
+
+QuorumConfigBuilder builderForServer3 = new 
QuorumConfigBuilder(quorumConfig)
+  .changeHostName(FIRST_SERVER, SECOND_ADDRESS, UNREACHABLE_HOST)
+  .changeHostName(SECOND_SERVER, SECOND_ADDRESS, UNREACHABLE_HOST);
+
+launchServers(Arrays.asList(builderForServer1, builderForServer2, 
builderForServer3));
+
+checkIfZooKeeperQuorumWorks(quorumConfig);
+  }
+
+

[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-10-09 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r332900576
 
 

 ##
 File path: 
zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
 ##
 @@ -1152,4 +1171,74 @@ private void 
assertRemotePeerMXBeanAttributes(QuorumServer qs, String beanName)
 getAddrPortFromBean(beanName, "QuorumAddress"));
 }
 
+
+/*
+ * A helper class to parse / compare server address config lines.
+ * Example: 
server.1=127.0.0.1:11228:11231|127.0.0.1:11230:11229:participant;0.0.0.0:11227
+ */
+private static class ServerConfigLine {
 
 Review comment:
   I know about this place in `QuorumPeer`: 
https://github.com/apache/zookeeper/blob/2dcb5e799ec02a2c6a6c7bad80c47169dc095271/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java#L240
   
   In my case I want to compare complete server config lines during the test 
for verification (without caring about the syntactic correctness or the order 
of the actual addresses), so what I need is a bit different.
   
   It might be nice to create a re-usable helper class for this, but also I 
don't mind if this logic gets duplicated in the test code. 
   
   Besides, it would be even nicer to use a more extensible config format, like 
JSON (but that is out of scope here).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-10-04 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331468963
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
 ##
 @@ -418,66 +426,108 @@ public boolean isQuorumSynced(QuorumVerifier qv) {
 
 class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
 
-private volatile boolean stop = false;
+private final AtomicBoolean stop = new AtomicBoolean(false);
+private final AtomicBoolean fail = new AtomicBoolean(false);
 
-public LearnerCnxAcceptor() {
-super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), 
zk.getZooKeeperServerListener());
+LearnerCnxAcceptor() {
+super("LearnerCnxAcceptor-" + serverSockets.stream()
+  .map(ServerSocket::getLocalSocketAddress)
+  .map(Objects::toString)
+  .collect(Collectors.joining(",")),
+  zk.getZooKeeperServerListener());
 }
 
 @Override
 public void run() {
-try {
-while (!stop) {
-Socket s = null;
-boolean error = false;
-try {
-s = ss.accept();
-
-// start with the initLimit, once the ack is processed
-// in LearnerHandler switch to the syncLimit
-s.setSoTimeout(self.tickTime * self.initLimit);
-s.setTcpNoDelay(nodelay);
-
-BufferedInputStream is = new 
BufferedInputStream(s.getInputStream());
-LearnerHandler fh = new LearnerHandler(s, is, 
Leader.this);
-fh.start();
-} catch (SocketException e) {
-error = true;
-if (stop) {
-LOG.info("exception while shutting down acceptor: 
" + e);
-
-// When Leader.shutdown() calls ss.close(),
-// the call to accept throws an exception.
-// We catch and set stop to true.
-stop = true;
-} else {
-throw e;
-}
-} catch (SaslException e) {
-LOG.error("Exception while connecting to quorum 
learner", e);
-error = true;
-} catch (Exception e) {
-error = true;
+if (!stop.get() && !serverSockets.isEmpty()) {
+ExecutorService executor = 
Executors.newFixedThreadPool(serverSockets.size());
+CountDownLatch latch = new 
CountDownLatch(serverSockets.size());
+
+serverSockets.forEach(serverSocket ->
+executor.submit(new 
LearnerCnxAcceptorHandler(serverSocket, latch)));
+
+try {
+latch.await();
+} catch (InterruptedException ie) {
+LOG.error("Interrupted while sleeping. Ignoring 
exception", ie);
+} finally {
+closeSockets();
 
 Review comment:
   Yes, you are right, I will add the awaitTermination call here (and also to 
the Learner class).
   I will log a message if any tasks are still alive after the 1-second 
wait.
   
   (you are right that it is very unlikely in real use)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-10-04 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331463634
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java
 ##
 @@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NoRouteToHostException;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class allows to store several quorum and electing addresses.
+ *
+ * See ZOOKEEPER-3188 for a discussion of this feature.
+ */
+public class MultipleAddresses {
+private static final int DEFAULT_TIMEOUT = 100;
+
+private Set addresses;
+private int timeout;
+
+public MultipleAddresses() {
+addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
+timeout = DEFAULT_TIMEOUT;
+}
+
+public MultipleAddresses(List addresses) {
+this(addresses, DEFAULT_TIMEOUT);
+}
+
+public MultipleAddresses(InetSocketAddress address) {
+this(address, DEFAULT_TIMEOUT);
+}
+
+public MultipleAddresses(List addresses, int timeout) {
+this.addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
+this.addresses.addAll(addresses);
+this.timeout = timeout;
+}
+
+public MultipleAddresses(InetSocketAddress address, int timeout) {
+addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
+addresses.add(address);
+this.timeout = timeout;
+}
+
+public int getTimeout() {
+return timeout;
+}
+
+public void setTimeout(int timeout) {
+this.timeout = timeout;
+}
+
+public boolean isEmpty() {
+return addresses.isEmpty();
+}
+
+/**
+ * Returns all addresses.
+ *
+ * @return set of all InetSocketAddress
+ */
+public Set getAllAddresses() {
+return Collections.unmodifiableSet(addresses);
+}
+
+/**
+ * Returns wildcard addresses for all ports
+ *
+ * @return set of InetSocketAddress with wildcards for all ports
+ */
+public Set getWildcardAddresses() {
+return addresses.stream().map(a -> new 
InetSocketAddress(a.getPort())).collect(Collectors.toSet());
+}
+
+/**
+ * Returns all ports
+ *
+ * @return list of all ports
+ */
+public List getAllPorts() {
+return 
addresses.stream().map(InetSocketAddress::getPort).distinct().collect(Collectors.toList());
+}
+
+/**
+ * Returns distinct list of all host strings
+ *
+ * @return list of all hosts
+ */
+public List getAllHostStrings() {
+return 
addresses.stream().map(InetSocketAddress::getHostString).distinct().collect(Collectors.toList());
+}
+
+public void addAddress(InetSocketAddress address) {
+addresses.add(address);
+}
+
+/**
+ * Returns reachable address. If none is reachable than throws exception.
+ *
+ * @return address which is reachable.
+ * @throws NoRouteToHostException if none address is reachable
+ */
+public InetSocketAddress getReachableAddress() throws 
NoRouteToHostException {
+AtomicReference address = new 
AtomicReference<>(null);
+getInetSocketAddressStream().forEach(addr -> 
checkIfAddressIsReachableAndSet(addr, address));
+
+if (address.get() != null) {
+return address.get();
+} else {
+throw new NoRouteToHostException("No valid address among " + 
addresses);
+}
+}
+
+/**
+ * Returns reachable address or first one, if none is reachable.
+ *
+ * @return address which is reachable or fist one.
+ */
+public InetSocketAddress getReachableOrOne() {
+InetSocketAddress address;
+  

[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-10-04 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331445109
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java
 ##
 @@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NoRouteToHostException;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class allows to store several quorum and electing addresses.
+ *
+ * See ZOOKEEPER-3188 for a discussion of this feature.
+ */
+public class MultipleAddresses {
+private static final int DEFAULT_TIMEOUT = 100;
+
+private Set addresses;
+private int timeout;
+
+public MultipleAddresses() {
+addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
+timeout = DEFAULT_TIMEOUT;
+}
+
+public MultipleAddresses(List addresses) {
+this(addresses, DEFAULT_TIMEOUT);
+}
+
+public MultipleAddresses(InetSocketAddress address) {
+this(address, DEFAULT_TIMEOUT);
+}
+
+public MultipleAddresses(List addresses, int timeout) {
+this.addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
+this.addresses.addAll(addresses);
+this.timeout = timeout;
+}
+
+public MultipleAddresses(InetSocketAddress address, int timeout) {
+addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
+addresses.add(address);
+this.timeout = timeout;
+}
+
+public int getTimeout() {
+return timeout;
+}
+
+public void setTimeout(int timeout) {
+this.timeout = timeout;
+}
+
+public boolean isEmpty() {
+return addresses.isEmpty();
+}
+
+/**
+ * Returns all addresses.
+ *
+ * @return set of all InetSocketAddress
+ */
+public Set getAllAddresses() {
+return Collections.unmodifiableSet(addresses);
+}
+
+/**
+ * Returns wildcard addresses for all ports
+ *
+ * @return set of InetSocketAddress with wildcards for all ports
+ */
+public Set getWildcardAddresses() {
+return addresses.stream().map(a -> new 
InetSocketAddress(a.getPort())).collect(Collectors.toSet());
+}
+
+/**
+ * Returns all ports
+ *
+ * @return list of all ports
+ */
+public List getAllPorts() {
+return 
addresses.stream().map(InetSocketAddress::getPort).distinct().collect(Collectors.toList());
+}
+
+/**
+ * Returns distinct list of all host strings
+ *
+ * @return list of all hosts
+ */
+public List getAllHostStrings() {
+return 
addresses.stream().map(InetSocketAddress::getHostString).distinct().collect(Collectors.toList());
+}
+
+public void addAddress(InetSocketAddress address) {
+addresses.add(address);
+}
+
+/**
+ * Returns reachable address. If none is reachable than throws exception.
+ *
+ * @return address which is reachable.
+ * @throws NoRouteToHostException if none address is reachable
+ */
+public InetSocketAddress getReachableAddress() throws 
NoRouteToHostException {
+AtomicReference address = new 
AtomicReference<>(null);
+getInetSocketAddressStream().forEach(addr -> 
checkIfAddressIsReachableAndSet(addr, address));
+
+if (address.get() != null) {
+return address.get();
+} else {
+throw new NoRouteToHostException("No valid address among " + 
addresses);
+}
+}
+
+/**
+ * Returns reachable address or first one, if none is reachable.
+ *
+ * @return address which is reachable or fist one.
+ */
+public InetSocketAddress getReachableOrOne() {
+InetSocketAddress address;
+  

[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-10-04 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331445109
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java
 ##
 @@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NoRouteToHostException;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class allows to store several quorum and electing addresses.
+ *
+ * See ZOOKEEPER-3188 for a discussion of this feature.
+ */
+public class MultipleAddresses {
+private static final int DEFAULT_TIMEOUT = 100;
+
+private Set addresses;
+private int timeout;
+
+public MultipleAddresses() {
+addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
+timeout = DEFAULT_TIMEOUT;
+}
+
+public MultipleAddresses(List addresses) {
+this(addresses, DEFAULT_TIMEOUT);
+}
+
+public MultipleAddresses(InetSocketAddress address) {
+this(address, DEFAULT_TIMEOUT);
+}
+
+public MultipleAddresses(List addresses, int timeout) {
+this.addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
+this.addresses.addAll(addresses);
+this.timeout = timeout;
+}
+
+public MultipleAddresses(InetSocketAddress address, int timeout) {
+addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
+addresses.add(address);
+this.timeout = timeout;
+}
+
+public int getTimeout() {
+return timeout;
+}
+
+public void setTimeout(int timeout) {
+this.timeout = timeout;
+}
+
+public boolean isEmpty() {
+return addresses.isEmpty();
+}
+
+/**
+ * Returns all addresses.
+ *
+ * @return set of all InetSocketAddress
+ */
+public Set getAllAddresses() {
+return Collections.unmodifiableSet(addresses);
+}
+
+/**
+ * Returns wildcard addresses for all ports
+ *
+ * @return set of InetSocketAddress with wildcards for all ports
+ */
+public Set getWildcardAddresses() {
+return addresses.stream().map(a -> new 
InetSocketAddress(a.getPort())).collect(Collectors.toSet());
+}
+
+/**
+ * Returns all ports
+ *
+ * @return list of all ports
+ */
+public List getAllPorts() {
+return 
addresses.stream().map(InetSocketAddress::getPort).distinct().collect(Collectors.toList());
+}
+
+/**
+ * Returns distinct list of all host strings
+ *
+ * @return list of all hosts
+ */
+public List getAllHostStrings() {
+return 
addresses.stream().map(InetSocketAddress::getHostString).distinct().collect(Collectors.toList());
+}
+
+public void addAddress(InetSocketAddress address) {
+addresses.add(address);
+}
+
+/**
+ * Returns reachable address. If none is reachable than throws exception.
+ *
+ * @return address which is reachable.
+ * @throws NoRouteToHostException if none address is reachable
+ */
+public InetSocketAddress getReachableAddress() throws 
NoRouteToHostException {
+AtomicReference address = new 
AtomicReference<>(null);
+getInetSocketAddressStream().forEach(addr -> 
checkIfAddressIsReachableAndSet(addr, address));
+
+if (address.get() != null) {
+return address.get();
+} else {
+throw new NoRouteToHostException("No valid address among " + 
addresses);
+}
+}
+
+/**
+ * Returns reachable address or first one, if none is reachable.
+ *
+ * @return address which is reachable or fist one.
+ */
+public InetSocketAddress getReachableOrOne() {
+InetSocketAddress address;
+  

[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-10-04 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331433800
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java
 ##
 @@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NoRouteToHostException;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class allows to store several quorum and electing addresses.
+ *
+ * See ZOOKEEPER-3188 for a discussion of this feature.
+ */
+public class MultipleAddresses {
+private static final int DEFAULT_TIMEOUT = 100;
+
+private Set addresses;
+private int timeout;
+
+public MultipleAddresses() {
+addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
+timeout = DEFAULT_TIMEOUT;
+}
+
+public MultipleAddresses(List addresses) {
+this(addresses, DEFAULT_TIMEOUT);
+}
+
+public MultipleAddresses(InetSocketAddress address) {
+this(address, DEFAULT_TIMEOUT);
+}
+
+public MultipleAddresses(List addresses, int timeout) {
+this.addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
+this.addresses.addAll(addresses);
+this.timeout = timeout;
+}
+
+public MultipleAddresses(InetSocketAddress address, int timeout) {
+addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
+addresses.add(address);
+this.timeout = timeout;
+}
+
+public int getTimeout() {
+return timeout;
+}
+
+public void setTimeout(int timeout) {
+this.timeout = timeout;
+}
+
+public boolean isEmpty() {
+return addresses.isEmpty();
+}
+
+/**
+ * Returns all addresses.
+ *
+ * @return set of all InetSocketAddress
+ */
+public Set getAllAddresses() {
+return Collections.unmodifiableSet(addresses);
+}
+
+/**
+ * Returns wildcard addresses for all ports
+ *
+ * @return set of InetSocketAddress with wildcards for all ports
+ */
+public Set getWildcardAddresses() {
+return addresses.stream().map(a -> new 
InetSocketAddress(a.getPort())).collect(Collectors.toSet());
+}
+
+/**
+ * Returns all ports
+ *
+ * @return list of all ports
+ */
+public List getAllPorts() {
+return 
addresses.stream().map(InetSocketAddress::getPort).distinct().collect(Collectors.toList());
+}
+
+/**
+ * Returns distinct list of all host strings
+ *
+ * @return list of all hosts
+ */
+public List getAllHostStrings() {
+return 
addresses.stream().map(InetSocketAddress::getHostString).distinct().collect(Collectors.toList());
+}
+
+public void addAddress(InetSocketAddress address) {
+addresses.add(address);
+}
+
+/**
+ * Returns reachable address. If none is reachable than throws exception.
+ *
+ * @return address which is reachable.
+ * @throws NoRouteToHostException if none address is reachable
+ */
+public InetSocketAddress getReachableAddress() throws 
NoRouteToHostException {
+AtomicReference address = new 
AtomicReference<>(null);
+getInetSocketAddressStream().forEach(addr -> 
checkIfAddressIsReachableAndSet(addr, address));
+
+if (address.get() != null) {
+return address.get();
+} else {
+throw new NoRouteToHostException("No valid address among " + 
addresses);
+}
+}
+
+/**
+ * Returns reachable address or first one, if none is reachable.
+ *
+ * @return address which is reachable or fist one.
+ */
+public InetSocketAddress getReachableOrOne() {
+InetSocketAddress address;
+  

[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-10-04 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331414154
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java
 ##
 @@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NoRouteToHostException;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class allows to store several quorum and electing addresses.
+ *
+ * See ZOOKEEPER-3188 for a discussion of this feature.
+ */
+public class MultipleAddresses {
+private static final int DEFAULT_TIMEOUT = 100;
+
+private Set addresses;
+private int timeout;
+
+public MultipleAddresses() {
+addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
+timeout = DEFAULT_TIMEOUT;
+}
+
+public MultipleAddresses(List addresses) {
+this(addresses, DEFAULT_TIMEOUT);
+}
+
+public MultipleAddresses(InetSocketAddress address) {
+this(address, DEFAULT_TIMEOUT);
+}
+
+public MultipleAddresses(List addresses, int timeout) {
+this.addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
+this.addresses.addAll(addresses);
+this.timeout = timeout;
+}
+
+public MultipleAddresses(InetSocketAddress address, int timeout) {
+addresses = Collections.newSetFromMap(new ConcurrentHashMap<>());
+addresses.add(address);
+this.timeout = timeout;
+}
+
+public int getTimeout() {
+return timeout;
+}
+
+public void setTimeout(int timeout) {
+this.timeout = timeout;
+}
+
+public boolean isEmpty() {
+return addresses.isEmpty();
+}
+
+/**
+ * Returns all addresses.
+ *
+ * @return set of all InetSocketAddress
+ */
+public Set getAllAddresses() {
+return Collections.unmodifiableSet(addresses);
+}
+
+/**
+ * Returns wildcard addresses for all ports
+ *
+ * @return set of InetSocketAddress with wildcards for all ports
+ */
+public Set getWildcardAddresses() {
+return addresses.stream().map(a -> new 
InetSocketAddress(a.getPort())).collect(Collectors.toSet());
+}
+
+/**
+ * Returns all ports
+ *
+ * @return list of all ports
+ */
+public List getAllPorts() {
+return 
addresses.stream().map(InetSocketAddress::getPort).distinct().collect(Collectors.toList());
+}
+
+/**
+ * Returns distinct list of all host strings
+ *
+ * @return list of all hosts
+ */
+public List getAllHostStrings() {
+return 
addresses.stream().map(InetSocketAddress::getHostString).distinct().collect(Collectors.toList());
+}
+
+public void addAddress(InetSocketAddress address) {
+addresses.add(address);
+}
+
+/**
+ * Returns reachable address. If none is reachable than throws exception.
+ *
+ * @return address which is reachable.
+ * @throws NoRouteToHostException if none address is reachable
+ */
+public InetSocketAddress getReachableAddress() throws 
NoRouteToHostException {
+AtomicReference address = new 
AtomicReference<>(null);
+getInetSocketAddressStream().forEach(addr -> 
checkIfAddressIsReachableAndSet(addr, address));
+
+if (address.get() != null) {
+return address.get();
+} else {
+throw new NoRouteToHostException("No valid address among " + 
addresses);
+}
+}
+
+/**
+ * Returns reachable address or first one, if none is reachable.
+ *
+ * @return address which is reachable or fist one.
+ */
+public InetSocketAddress getReachableOrOne() {
+InetSocketAddress address;
+  

[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-10-04 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331407319
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java
 ##
 @@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NoRouteToHostException;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class allows to store several quorum and electing addresses.
+ *
+ * See ZOOKEEPER-3188 for a discussion of this feature.
+ */
+public class MultipleAddresses {
 
 Review comment:
   thx


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-10-04 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331406937
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
 ##
 @@ -255,65 +260,114 @@ protected void sockConnect(Socket sock, 
InetSocketAddress addr, int timeout) thr
  * @throws X509Exception
  * @throws InterruptedException
  */
-protected void connectToLeader(InetSocketAddress addr, String hostname) 
throws IOException, InterruptedException, X509Exception {
-this.sock = createSocket();
+protected void connectToLeader(MultipleAddresses addr, String hostname)
+throws IOException, InterruptedException {
+
 this.leaderAddr = addr;
+Set addresses = addr.getAllAddresses();
+ExecutorService executor = 
Executors.newFixedThreadPool(addresses.size());
 
 Review comment:
   thanks, I will do it


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-10-04 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331403117
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
 ##
 @@ -418,66 +426,108 @@ public boolean isQuorumSynced(QuorumVerifier qv) {
 
 class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
 
-private volatile boolean stop = false;
+private final AtomicBoolean stop = new AtomicBoolean(false);
+private final AtomicBoolean fail = new AtomicBoolean(false);
 
-public LearnerCnxAcceptor() {
-super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), 
zk.getZooKeeperServerListener());
+LearnerCnxAcceptor() {
+super("LearnerCnxAcceptor-" + serverSockets.stream()
+  .map(ServerSocket::getLocalSocketAddress)
+  .map(Objects::toString)
+  .collect(Collectors.joining(",")),
+  zk.getZooKeeperServerListener());
 }
 
 @Override
 public void run() {
-try {
-while (!stop) {
-Socket s = null;
-boolean error = false;
-try {
-s = ss.accept();
-
-// start with the initLimit, once the ack is processed
-// in LearnerHandler switch to the syncLimit
-s.setSoTimeout(self.tickTime * self.initLimit);
-s.setTcpNoDelay(nodelay);
-
-BufferedInputStream is = new 
BufferedInputStream(s.getInputStream());
-LearnerHandler fh = new LearnerHandler(s, is, 
Leader.this);
-fh.start();
-} catch (SocketException e) {
-error = true;
-if (stop) {
-LOG.info("exception while shutting down acceptor: 
" + e);
-
-// When Leader.shutdown() calls ss.close(),
-// the call to accept throws an exception.
-// We catch and set stop to true.
-stop = true;
-} else {
-throw e;
-}
-} catch (SaslException e) {
-LOG.error("Exception while connecting to quorum 
learner", e);
-error = true;
-} catch (Exception e) {
-error = true;
+if (!stop.get() && !serverSockets.isEmpty()) {
+ExecutorService executor = 
Executors.newFixedThreadPool(serverSockets.size());
+CountDownLatch latch = new 
CountDownLatch(serverSockets.size());
+
+serverSockets.forEach(serverSocket ->
+executor.submit(new 
LearnerCnxAcceptorHandler(serverSocket, latch)));
+
+try {
+latch.await();
+} catch (InterruptedException ie) {
+LOG.error("Interrupted while sleeping. Ignoring 
exception", ie);
+} finally {
+closeSockets();
 
 Review comment:
   thanks, nice catch. I will use executor.shutdownNow() to terminate the tasks 
immediately.
   
   I was thinking of adding a timeout first to wait for the tasks to shut down 
gracefully in case of an interrupted exception, but I decided against it. At 
this point we are in a phase where all the tasks are finished (or will be 
finished anyway because of closing the sockets). At this point I think it is 
better to finish everything quickly so that a new leader election can take 
place.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-10-04 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331403263
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
 ##
 @@ -418,66 +426,108 @@ public boolean isQuorumSynced(QuorumVerifier qv) {
 
 class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
 
-private volatile boolean stop = false;
+private final AtomicBoolean stop = new AtomicBoolean(false);
+private final AtomicBoolean fail = new AtomicBoolean(false);
 
-public LearnerCnxAcceptor() {
-super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), 
zk.getZooKeeperServerListener());
+LearnerCnxAcceptor() {
+super("LearnerCnxAcceptor-" + serverSockets.stream()
+  .map(ServerSocket::getLocalSocketAddress)
+  .map(Objects::toString)
+  .collect(Collectors.joining(",")),
+  zk.getZooKeeperServerListener());
 }
 
 @Override
 public void run() {
-try {
-while (!stop) {
-Socket s = null;
-boolean error = false;
-try {
-s = ss.accept();
-
-// start with the initLimit, once the ack is processed
-// in LearnerHandler switch to the syncLimit
-s.setSoTimeout(self.tickTime * self.initLimit);
-s.setTcpNoDelay(nodelay);
-
-BufferedInputStream is = new 
BufferedInputStream(s.getInputStream());
-LearnerHandler fh = new LearnerHandler(s, is, 
Leader.this);
-fh.start();
-} catch (SocketException e) {
-error = true;
-if (stop) {
-LOG.info("exception while shutting down acceptor: 
" + e);
-
-// When Leader.shutdown() calls ss.close(),
-// the call to accept throws an exception.
-// We catch and set stop to true.
-stop = true;
-} else {
-throw e;
-}
-} catch (SaslException e) {
-LOG.error("Exception while connecting to quorum 
learner", e);
-error = true;
-} catch (Exception e) {
-error = true;
+if (!stop.get() && !serverSockets.isEmpty()) {
+ExecutorService executor = 
Executors.newFixedThreadPool(serverSockets.size());
+CountDownLatch latch = new 
CountDownLatch(serverSockets.size());
+
+serverSockets.forEach(serverSocket ->
+executor.submit(new 
LearnerCnxAcceptorHandler(serverSocket, latch)));
+
+try {
+latch.await();
+} catch (InterruptedException ie) {
+LOG.error("Interrupted while sleeping. Ignoring 
exception", ie);
+} finally {
+closeSockets();
+}
+}
+}
+
+public void halt() {
+stop.set(true);
+closeSockets();
+}
+
+class LearnerCnxAcceptorHandler implements Runnable {
+private ServerSocket serverSocket;
+private CountDownLatch latch;
+
+LearnerCnxAcceptorHandler(ServerSocket serverSocket, 
CountDownLatch latch) {
+this.serverSocket = serverSocket;
+this.latch = latch;
+}
+
+@Override
+public void run() {
+try {
+
Thread.currentThread().setName("LearnerCnxAcceptorHandler-" + 
serverSocket.getLocalSocketAddress());
+
+while (!stop.get()) {
+acceptConnections();
+}
+} catch (Exception e) {
+LOG.warn("Exception while accepting follower", e);
+if (fail.compareAndSet(false, true)) {
+handleException(getName(), e);
+halt();
+}
+} finally {
+latch.countDown();
+}
+}
+
+private void acceptConnections() throws IOException {
+Socket socket = null;
+boolean error = false;
+try {
+socket = serverSocket.accept();
+
+// start with the initLimit, once the ack is processed
+// in LearnerHandler switch to the syncLimit
+socket.setSoTimeout(self.tickTime * self.initLimit);
+  

[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-08-23 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r317189846
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
 ##
 @@ -417,71 +427,111 @@ public boolean isQuorumSynced(QuorumVerifier qv) {
 protected final Proposal newLeaderProposal = new Proposal();
 
 class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
-private volatile boolean stop = false;
+private final AtomicBoolean stop = new AtomicBoolean(false);
+private final AtomicBoolean fail = new AtomicBoolean(false);
 
-public LearnerCnxAcceptor() {
-super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk
-.getZooKeeperServerListener());
+LearnerCnxAcceptor() {
+super("LearnerCnxAcceptor-" + serverSockets.stream()
+  .map(ServerSocket::getLocalSocketAddress)
+  .map(Objects::toString)
+  .collect(Collectors.joining(",")),
+  zk.getZooKeeperServerListener());
 }
 
 @Override
 public void run() {
-try {
-while (!stop) {
-Socket s = null;
-boolean error = false;
-try {
-s = ss.accept();
-
-// start with the initLimit, once the ack is processed
-// in LearnerHandler switch to the syncLimit
-s.setSoTimeout(self.tickTime * self.initLimit);
-s.setTcpNoDelay(nodelay);
-
-BufferedInputStream is = new BufferedInputStream(
-s.getInputStream());
-LearnerHandler fh = new LearnerHandler(s, is,
-Leader.this);
-fh.start();
-} catch (SocketException e) {
-error = true;
-if (stop) {
-LOG.info("exception while shutting down acceptor: "
-+ e);
-
-// When Leader.shutdown() calls ss.close(),
-// the call to accept throws an exception.
-// We catch and set stop to true.
-stop = true;
-} else {
-throw e;
-}
-} catch (SaslException e){
-LOG.error("Exception while connecting to quorum 
learner", e);
-error = true;
-} catch (Exception e) {
-error = true;
+if (!stop.get() && !serverSockets.isEmpty()) {
+ExecutorService executor = 
Executors.newFixedThreadPool(serverSockets.size());
+CountDownLatch latch = new 
CountDownLatch(serverSockets.size());
+
+serverSockets.forEach(serverSocket ->
+executor.submit(new 
LearnerCnxAcceptorHandler(serverSocket, latch)));
+
+try {
+latch.await();
 
 Review comment:
   The code now basically starts listener threads on all local addresses, 
then waits until all the listeners are dead, and then simply starts over again 
(assuming no stop was requested by Leader.shutdown() and no unexpected failure 
happened in the listener threads). 
   
   I don't think we need a timeout here... ideally we want to wait forever :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-08-23 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r317167254
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java
 ##
 @@ -49,10 +50,11 @@ public String getQuorumAddress() {
 
 public String getLearnerMaster() {
 QuorumPeer.QuorumServer learnerMaster = 
observer.getCurrentLearnerMaster();
-if (learnerMaster == null || learnerMaster.addr == null) {
+InetSocketAddress address = learnerMaster.addr.getReachableOrOne();
+if (learnerMaster == null || address == null) {
 
 Review comment:
   thanks, nice catch!
   (actually, besides this, we also get a NoSuchElementException if 
learnerMaster.addr is empty)
   I will fix this in the next commit


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-08-14 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r313774702
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
 ##
 @@ -657,11 +647,31 @@ public CommandResponse run(ZooKeeperServer zkServer, 
Map kwargs)
 TreeMap::new));
 }
 
+private String getMultiAddressString(QuorumPeer.QuorumServer qs) {
+return qs.addr.getAllAddresses().stream()
+.map(address -> getSingleAddressString(qs, address))
+.collect(Collectors.joining(","));
+}
+
+private String getSingleAddressString(QuorumPeer.QuorumServer qs, 
InetSocketAddress address) {
 
 Review comment:
   I changed the format, this is how the `voting_view` admin command responds 
now:
   ```
   {
 "current_config" : {
   "1" : {
 "server_addresses" : [ "/172.16.101.11:2888", "/172.16.102.11:2888" ],
 "election_addresses" : [ "/172.16.101.11:3888", "/172.16.102.11:3888" 
],
 "client_address" : "/0.0.0.0:2181",
 "learner_type" : "participant"
   },
   "2" : {
 "server_addresses" : [ "/172.16.101.22:2888", "/172.16.102.22:2888" ],
 "election_addresses" : [ "/172.16.101.22:3888", "/172.16.102.22:3888" 
],
 "client_address" : "/0.0.0.0:2181",
 "learner_type" : "participant"
   },
   "3" : {
 "server_addresses" : [ "/172.16.101.33:2888", "/172.16.102.33:2888" ],
 "election_addresses" : [ "/172.16.101.33:3888", "/172.16.102.33:3888" 
],
 "client_address" : "/0.0.0.0:2181",
 "learner_type" : "participant"
   }
 },
 "command" : "voting_view",
 "error" : null
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-08-14 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r313738065
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
 ##
 @@ -657,11 +647,31 @@ public CommandResponse run(ZooKeeperServer zkServer, 
Map kwargs)
 TreeMap::new));
 }
 
+private String getMultiAddressString(QuorumPeer.QuorumServer qs) {
+return qs.addr.getAllAddresses().stream()
+.map(address -> getSingleAddressString(qs, address))
+.collect(Collectors.joining(","));
+}
+
+private String getSingleAddressString(QuorumPeer.QuorumServer qs, 
InetSocketAddress address) {
 
 Review comment:
   Yes, I agree, I will change it. 
   
   This admin command is meant to show the internal view of the voting members 
(who the ZooKeeper server thinks the voting members are and where they 
listen). I wouldn't complicate this PR any further, but it might be a good idea 
to create a follow-up ticket for an admin command showing whether the given 
server can actually reach all the different ports of the other servers (and not 
only the voting members). It can help debug network problems, showing if 
certain network interfaces on some servers are unreachable. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-08-14 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r313820217
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
 ##
 @@ -228,28 +240,33 @@ static public InitialMessage parse(Long protocolVersion, 
DataInputStream din)
 num_read, remaining, sid);
 }
 
-String addr = new String(b);
-String[] host_port;
-try {
-host_port = ConfigUtils.getHostAndPort(addr);
-} catch (ConfigException e) {
-throw new InitialMessageException("Badly formed address: %s", 
addr);
-}
+String[] addressStrings = new String(b).split(",");
 
 Review comment:
   referring to our second offline discussion:
   - I incremented the PROTOCOL_VERSION
   - I did not do the JSON format modification, as it would complicate future 
code, which would always have to ensure both forward and backward 
compatibility of the election protocol during rolling upgrades. The current 
solution (simply failing if the protocol versions mismatch) is simpler and 
still works just fine: as the servers are restarted one by one, the nodes with 
the old protocol version and the nodes with the new protocol version will form 
two partitions, but at any given time only one partition will have the quorum.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-08-14 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r313776919
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java
 ##
 @@ -271,39 +278,42 @@ public boolean isQuorumSynced(QuorumVerifier qv) {
return qv.containsQuorum(ids);
 }
 
-private final ServerSocket ss;
+private final List serverSockets = new LinkedList<>();
 
 Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
 this.self = self;
 this.proposalStats = new BufferStats();
+
+Set addresses;
+if (self.getQuorumListenOnAllIPs()) {
+addresses = self.getQuorumAddress().getWildcardAddresses();
+} else {
+addresses = self.getQuorumAddress().getAllAddresses();
+}
+
+for (InetSocketAddress address : addresses) {
+serverSockets.add(createServerSocket(address, 
self.shouldUsePortUnification(), self.isSslQuorum()));
+}
+
+this.zk = zk;
+}
+
+ServerSocket createServerSocket(InetSocketAddress address, boolean 
portUnification, boolean sslQuorum)
+throws IOException {
+ServerSocket serverSocket;
 try {
-if (self.shouldUsePortUnification() || self.isSslQuorum()) {
-boolean allowInsecureConnection = 
self.shouldUsePortUnification();
-if (self.getQuorumListenOnAllIPs()) {
-ss = new UnifiedServerSocket(self.getX509Util(), 
allowInsecureConnection, self.getQuorumAddress().getPort());
-} else {
-ss = new UnifiedServerSocket(self.getX509Util(), 
allowInsecureConnection);
-}
+if (portUnification || sslQuorum) {
+serverSocket = new UnifiedServerSocket(self.getX509Util(), 
portUnification);
 } else {
-if (self.getQuorumListenOnAllIPs()) {
-ss = new ServerSocket(self.getQuorumAddress().getPort());
-} else {
-ss = new ServerSocket();
-}
-}
-ss.setReuseAddress(true);
-if (!self.getQuorumListenOnAllIPs()) {
-ss.bind(self.getQuorumAddress());
+serverSocket = new ServerSocket();
 }
+serverSocket.setReuseAddress(true);
+serverSocket.bind(address);
+return serverSocket;
 } catch (BindException e) {
-if (self.getQuorumListenOnAllIPs()) {
-LOG.error("Couldn't bind to port " + 
self.getQuorumAddress().getPort(), e);
-} else {
-LOG.error("Couldn't bind to " + self.getQuorumAddress(), e);
-}
+LOG.error("Couldn't bind to " + self.getQuorumAddress(), e);
 
 Review comment:
   thanks!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-08-14 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r313774702
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
 ##
 @@ -657,11 +647,31 @@ public CommandResponse run(ZooKeeperServer zkServer, 
Map kwargs)
 TreeMap::new));
 }
 
+private String getMultiAddressString(QuorumPeer.QuorumServer qs) {
+return qs.addr.getAllAddresses().stream()
+.map(address -> getSingleAddressString(qs, address))
+.collect(Collectors.joining(","));
+}
+
+private String getSingleAddressString(QuorumPeer.QuorumServer qs, 
InetSocketAddress address) {
 
 Review comment:
   I changed the format; this is how the `voting_view` admin command responds 
now (I haven't pushed the commit yet):
   ```
   {
 "current_config" : {
   "1" : {
 "server_addresses" : [ "/172.16.101.11:2888", "/172.16.102.11:2888" ],
 "election_addresses" : [ "/172.16.101.11:3888", "/172.16.102.11:3888" 
],
 "client_address" : "/0.0.0.0:2181",
 "learner_type" : "participant"
   },
   "2" : {
 "server_addresses" : [ "/172.16.101.22:2888", "/172.16.102.22:2888" ],
 "election_addresses" : [ "/172.16.101.22:3888", "/172.16.102.22:3888" 
],
 "client_address" : "/0.0.0.0:2181",
 "learner_type" : "participant"
   },
   "3" : {
 "server_addresses" : [ "/172.16.101.33:2888", "/172.16.102.33:2888" ],
 "election_addresses" : [ "/172.16.101.33:3888", "/172.16.102.33:3888" 
],
 "client_address" : "/0.0.0.0:2181",
 "learner_type" : "participant"
   }
 },
 "command" : "voting_view",
 "error" : null
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-08-14 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r313774702
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
 ##
 @@ -657,11 +647,31 @@ public CommandResponse run(ZooKeeperServer zkServer, 
Map kwargs)
 TreeMap::new));
 }
 
+private String getMultiAddressString(QuorumPeer.QuorumServer qs) {
+return qs.addr.getAllAddresses().stream()
+.map(address -> getSingleAddressString(qs, address))
+.collect(Collectors.joining(","));
+}
+
+private String getSingleAddressString(QuorumPeer.QuorumServer qs, 
InetSocketAddress address) {
 
 Review comment:
   I changed the format; this is how the admin command responds now (I haven't 
pushed the commit yet):
   ```
   {
 "current_config" : {
   "1" : {
 "server_addresses" : [ "/172.16.101.11:2888", "/172.16.102.11:2888" ],
 "election_addresses" : [ "/172.16.101.11:3888", "/172.16.102.11:3888" 
],
 "client_address" : "/0.0.0.0:2181",
 "learner_type" : "participant"
   },
   "2" : {
 "server_addresses" : [ "/172.16.101.22:2888", "/172.16.102.22:2888" ],
 "election_addresses" : [ "/172.16.101.22:3888", "/172.16.102.22:3888" 
],
 "client_address" : "/0.0.0.0:2181",
 "learner_type" : "participant"
   },
   "3" : {
 "server_addresses" : [ "/172.16.101.33:2888", "/172.16.102.33:2888" ],
 "election_addresses" : [ "/172.16.101.33:3888", "/172.16.102.33:3888" 
],
 "client_address" : "/0.0.0.0:2181",
 "learner_type" : "participant"
   }
 },
 "command" : "voting_view",
 "error" : null
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-08-14 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r313739495
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
 ##
 @@ -18,16 +18,8 @@
 
 package org.apache.zookeeper.server.admin;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.net.InetSocketAddress;
+import java.util.*;
 
 Review comment:
   sure, thanks!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network

2019-08-14 Thread GitBox
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve 
resilience to network
URL: https://github.com/apache/zookeeper/pull/1048#discussion_r313738065
 
 

 ##
 File path: 
zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
 ##
 @@ -657,11 +647,31 @@ public CommandResponse run(ZooKeeperServer zkServer, 
Map kwargs)
 TreeMap::new));
 }
 
+private String getMultiAddressString(QuorumPeer.QuorumServer qs) {
+return qs.addr.getAllAddresses().stream()
+.map(address -> getSingleAddressString(qs, address))
+.collect(Collectors.joining(","));
+}
+
+private String getSingleAddressString(QuorumPeer.QuorumServer qs, 
InetSocketAddress address) {
 
 Review comment:
   Yes, I agree, I will change it. 
   
   This admin command is meant to show the internal view of the voting members 
(who the ZooKeeper server thinks the voting members are and where they 
listen). I wouldn't complicate this PR any further, but it might be a good idea 
to create a follow-up ticket for an admin command showing whether the 
given server can reach all the different ports of the other servers (not only 
the voting members). It can help debug network problems, showing if certain 
network interfaces on some servers are unreachable. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services