[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r332900640 ## File path: zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainMultiAddressTest.java ## @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.quorum; + +import static org.junit.Assert.assertEquals; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.DummyWatcher; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.admin.ZooKeeperAdmin; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.ReconfigTest; +import org.junit.Before; +import org.junit.Test; + + +public class QuorumPeerMainMultiAddressTest extends QuorumPeerTestBase { + + private static final int FIRST_SERVER = 0; + private static final int SECOND_SERVER = 1; + private static final int THIRD_SERVER = 2; + private static final int FIRST_ADDRESS = 0; + private static final int SECOND_ADDRESS = 1; + private static final String UNREACHABLE_HOST = "invalid.hostname.unreachable.com"; + private static final String IPV6_LOCALHOST = "[0:0:0:0:0:0:0:1]"; + + // IPv4 by default, change to IPV6_LOCALHOST to test with servers binding to IPv6 + private String hostName = "127.0.0.1"; + + private int zNodeId = 0; + + @Before + public void setUp() throws Exception { +ClientBase.setupTestEnv(); +System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU="/* password is 'test'*/); +QuorumPeerConfig.setReconfigEnabled(true); + } + + + @Test + public void shouldStartClusterWithMultipleAddresses() throws Exception { +// we have three ZK servers, each server has two quorumPort and two electionPort registered +QuorumConfigBuilder quorumConfig = new QuorumConfigBuilder(hostName, 3, 2); + +// we launch the three servers, each server having the same configuration +QuorumConfigBuilder builderForServer1 
= new QuorumConfigBuilder(quorumConfig); +QuorumConfigBuilder builderForServer2 = new QuorumConfigBuilder(quorumConfig); +QuorumConfigBuilder builderForServer3 = new QuorumConfigBuilder(quorumConfig); +launchServers(Arrays.asList(builderForServer1, builderForServer2, builderForServer3)); + +checkIfZooKeeperQuorumWorks(quorumConfig); + } + + + @Test + public void shouldStartClusterWithMultipleAddresses_IPv6() throws Exception { +hostName = IPV6_LOCALHOST; + +shouldStartClusterWithMultipleAddresses(); + } + + + @Test + public void shouldStartClusterWhenSomeAddressesAreUnreachable() throws Exception { +// we have three ZK servers, each server has two quorumPort and two electionPort registered +QuorumConfigBuilder quorumConfig = new QuorumConfigBuilder(hostName, 3, 2); + +// we launch the three servers +// in the config of each server, we misconfigure one of the addresses for the other two servers +QuorumConfigBuilder builderForServer1 = new QuorumConfigBuilder(quorumConfig) + .changeHostName(SECOND_SERVER, SECOND_ADDRESS, UNREACHABLE_HOST) + .changeHostName(THIRD_SERVER, SECOND_ADDRESS, UNREACHABLE_HOST); + +QuorumConfigBuilder builderForServer2 = new QuorumConfigBuilder(quorumConfig) + .changeHostName(FIRST_SERVER, SECOND_ADDRESS, UNREACHABLE_HOST) + .changeHostName(THIRD_SERVER, SECOND_ADDRESS, UNREACHABLE_HOST); + +QuorumConfigBuilder builderForServer3 = new QuorumConfigBuilder(quorumConfig) + .changeHostName(FIRST_SERVER, SECOND_ADDRESS, UNREACHABLE_HOST) + .changeHostName(SECOND_SERVER, SECOND_ADDRESS, UNREACHABLE_HOST); + +launchServers(Arrays.asList(builderForServer1, builderForServer2, builderForServer3)); + +checkIfZooKeeperQuorumWorks(quorumConfig); + } + +
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r332900576 ## File path: zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java ## @@ -1152,4 +1171,74 @@ private void assertRemotePeerMXBeanAttributes(QuorumServer qs, String beanName) getAddrPortFromBean(beanName, "QuorumAddress")); } + +/* + * A helper class to parse / compare server address config lines. + * Example: server.1=127.0.0.1:11228:11231|127.0.0.1:11230:11229:participant;0.0.0.0:11227 + */ +private static class ServerConfigLine { Review comment: I know about this place in `QuorumPeer`: https://github.com/apache/zookeeper/blob/2dcb5e799ec02a2c6a6c7bad80c47169dc095271/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java#L240 In my case I want to compare complete server config lines during the test for verification (without caring about the syntactic correctness or the order of the actual addresses), so what I need is a bit different. It might be nice to create a reusable helper class for this, but I also don't mind if this logic gets duplicated in the test code. Besides, it would be even nicer to use a more extensible config format, like JSON (but that is out of scope here). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331468963 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java ## @@ -418,66 +426,108 @@ public boolean isQuorumSynced(QuorumVerifier qv) { class LearnerCnxAcceptor extends ZooKeeperCriticalThread { -private volatile boolean stop = false; +private final AtomicBoolean stop = new AtomicBoolean(false); +private final AtomicBoolean fail = new AtomicBoolean(false); -public LearnerCnxAcceptor() { -super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk.getZooKeeperServerListener()); +LearnerCnxAcceptor() { +super("LearnerCnxAcceptor-" + serverSockets.stream() + .map(ServerSocket::getLocalSocketAddress) + .map(Objects::toString) + .collect(Collectors.joining(",")), + zk.getZooKeeperServerListener()); } @Override public void run() { -try { -while (!stop) { -Socket s = null; -boolean error = false; -try { -s = ss.accept(); - -// start with the initLimit, once the ack is processed -// in LearnerHandler switch to the syncLimit -s.setSoTimeout(self.tickTime * self.initLimit); -s.setTcpNoDelay(nodelay); - -BufferedInputStream is = new BufferedInputStream(s.getInputStream()); -LearnerHandler fh = new LearnerHandler(s, is, Leader.this); -fh.start(); -} catch (SocketException e) { -error = true; -if (stop) { -LOG.info("exception while shutting down acceptor: " + e); - -// When Leader.shutdown() calls ss.close(), -// the call to accept throws an exception. -// We catch and set stop to true. 
-stop = true; -} else { -throw e; -} -} catch (SaslException e) { -LOG.error("Exception while connecting to quorum learner", e); -error = true; -} catch (Exception e) { -error = true; +if (!stop.get() && !serverSockets.isEmpty()) { +ExecutorService executor = Executors.newFixedThreadPool(serverSockets.size()); +CountDownLatch latch = new CountDownLatch(serverSockets.size()); + +serverSockets.forEach(serverSocket -> +executor.submit(new LearnerCnxAcceptorHandler(serverSocket, latch))); + +try { +latch.await(); +} catch (InterruptedException ie) { +LOG.error("Interrupted while sleeping. Ignoring exception", ie); +} finally { +closeSockets(); Review comment: Yes, you are right, I will add the awaitTermination here (and also to the Learner class). I will also log a message if any tasks are still alive after the 1 second wait. (You are right that this is very unlikely in real use.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331463634 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java ## @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NoRouteToHostException; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class allows to store several quorum and electing addresses. + * + * See ZOOKEEPER-3188 for a discussion of this feature. 
+ */ +public class MultipleAddresses { +private static final int DEFAULT_TIMEOUT = 100; + +private Set addresses; +private int timeout; + +public MultipleAddresses() { +addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); +timeout = DEFAULT_TIMEOUT; +} + +public MultipleAddresses(List addresses) { +this(addresses, DEFAULT_TIMEOUT); +} + +public MultipleAddresses(InetSocketAddress address) { +this(address, DEFAULT_TIMEOUT); +} + +public MultipleAddresses(List addresses, int timeout) { +this.addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); +this.addresses.addAll(addresses); +this.timeout = timeout; +} + +public MultipleAddresses(InetSocketAddress address, int timeout) { +addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); +addresses.add(address); +this.timeout = timeout; +} + +public int getTimeout() { +return timeout; +} + +public void setTimeout(int timeout) { +this.timeout = timeout; +} + +public boolean isEmpty() { +return addresses.isEmpty(); +} + +/** + * Returns all addresses. + * + * @return set of all InetSocketAddress + */ +public Set getAllAddresses() { +return Collections.unmodifiableSet(addresses); +} + +/** + * Returns wildcard addresses for all ports + * + * @return set of InetSocketAddress with wildcards for all ports + */ +public Set getWildcardAddresses() { +return addresses.stream().map(a -> new InetSocketAddress(a.getPort())).collect(Collectors.toSet()); +} + +/** + * Returns all ports + * + * @return list of all ports + */ +public List getAllPorts() { +return addresses.stream().map(InetSocketAddress::getPort).distinct().collect(Collectors.toList()); +} + +/** + * Returns distinct list of all host strings + * + * @return list of all hosts + */ +public List getAllHostStrings() { +return addresses.stream().map(InetSocketAddress::getHostString).distinct().collect(Collectors.toList()); +} + +public void addAddress(InetSocketAddress address) { +addresses.add(address); +} + +/** + * Returns reachable address. 
If none is reachable than throws exception. + * + * @return address which is reachable. + * @throws NoRouteToHostException if none address is reachable + */ +public InetSocketAddress getReachableAddress() throws NoRouteToHostException { +AtomicReference address = new AtomicReference<>(null); +getInetSocketAddressStream().forEach(addr -> checkIfAddressIsReachableAndSet(addr, address)); + +if (address.get() != null) { +return address.get(); +} else { +throw new NoRouteToHostException("No valid address among " + addresses); +} +} + +/** + * Returns reachable address or first one, if none is reachable. + * + * @return address which is reachable or fist one. + */ +public InetSocketAddress getReachableOrOne() { +InetSocketAddress address; +
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331445109 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java ## @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NoRouteToHostException; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class allows to store several quorum and electing addresses. + * + * See ZOOKEEPER-3188 for a discussion of this feature. 
+ */ +public class MultipleAddresses { +private static final int DEFAULT_TIMEOUT = 100; + +private Set addresses; +private int timeout; + +public MultipleAddresses() { +addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); +timeout = DEFAULT_TIMEOUT; +} + +public MultipleAddresses(List addresses) { +this(addresses, DEFAULT_TIMEOUT); +} + +public MultipleAddresses(InetSocketAddress address) { +this(address, DEFAULT_TIMEOUT); +} + +public MultipleAddresses(List addresses, int timeout) { +this.addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); +this.addresses.addAll(addresses); +this.timeout = timeout; +} + +public MultipleAddresses(InetSocketAddress address, int timeout) { +addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); +addresses.add(address); +this.timeout = timeout; +} + +public int getTimeout() { +return timeout; +} + +public void setTimeout(int timeout) { +this.timeout = timeout; +} + +public boolean isEmpty() { +return addresses.isEmpty(); +} + +/** + * Returns all addresses. + * + * @return set of all InetSocketAddress + */ +public Set getAllAddresses() { +return Collections.unmodifiableSet(addresses); +} + +/** + * Returns wildcard addresses for all ports + * + * @return set of InetSocketAddress with wildcards for all ports + */ +public Set getWildcardAddresses() { +return addresses.stream().map(a -> new InetSocketAddress(a.getPort())).collect(Collectors.toSet()); +} + +/** + * Returns all ports + * + * @return list of all ports + */ +public List getAllPorts() { +return addresses.stream().map(InetSocketAddress::getPort).distinct().collect(Collectors.toList()); +} + +/** + * Returns distinct list of all host strings + * + * @return list of all hosts + */ +public List getAllHostStrings() { +return addresses.stream().map(InetSocketAddress::getHostString).distinct().collect(Collectors.toList()); +} + +public void addAddress(InetSocketAddress address) { +addresses.add(address); +} + +/** + * Returns reachable address. 
If none is reachable than throws exception. + * + * @return address which is reachable. + * @throws NoRouteToHostException if none address is reachable + */ +public InetSocketAddress getReachableAddress() throws NoRouteToHostException { +AtomicReference address = new AtomicReference<>(null); +getInetSocketAddressStream().forEach(addr -> checkIfAddressIsReachableAndSet(addr, address)); + +if (address.get() != null) { +return address.get(); +} else { +throw new NoRouteToHostException("No valid address among " + addresses); +} +} + +/** + * Returns reachable address or first one, if none is reachable. + * + * @return address which is reachable or fist one. + */ +public InetSocketAddress getReachableOrOne() { +InetSocketAddress address; +
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331445109 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java ## @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NoRouteToHostException; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class allows to store several quorum and electing addresses. + * + * See ZOOKEEPER-3188 for a discussion of this feature. 
+ */ +public class MultipleAddresses { +private static final int DEFAULT_TIMEOUT = 100; + +private Set addresses; +private int timeout; + +public MultipleAddresses() { +addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); +timeout = DEFAULT_TIMEOUT; +} + +public MultipleAddresses(List addresses) { +this(addresses, DEFAULT_TIMEOUT); +} + +public MultipleAddresses(InetSocketAddress address) { +this(address, DEFAULT_TIMEOUT); +} + +public MultipleAddresses(List addresses, int timeout) { +this.addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); +this.addresses.addAll(addresses); +this.timeout = timeout; +} + +public MultipleAddresses(InetSocketAddress address, int timeout) { +addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); +addresses.add(address); +this.timeout = timeout; +} + +public int getTimeout() { +return timeout; +} + +public void setTimeout(int timeout) { +this.timeout = timeout; +} + +public boolean isEmpty() { +return addresses.isEmpty(); +} + +/** + * Returns all addresses. + * + * @return set of all InetSocketAddress + */ +public Set getAllAddresses() { +return Collections.unmodifiableSet(addresses); +} + +/** + * Returns wildcard addresses for all ports + * + * @return set of InetSocketAddress with wildcards for all ports + */ +public Set getWildcardAddresses() { +return addresses.stream().map(a -> new InetSocketAddress(a.getPort())).collect(Collectors.toSet()); +} + +/** + * Returns all ports + * + * @return list of all ports + */ +public List getAllPorts() { +return addresses.stream().map(InetSocketAddress::getPort).distinct().collect(Collectors.toList()); +} + +/** + * Returns distinct list of all host strings + * + * @return list of all hosts + */ +public List getAllHostStrings() { +return addresses.stream().map(InetSocketAddress::getHostString).distinct().collect(Collectors.toList()); +} + +public void addAddress(InetSocketAddress address) { +addresses.add(address); +} + +/** + * Returns reachable address. 
If none is reachable than throws exception. + * + * @return address which is reachable. + * @throws NoRouteToHostException if none address is reachable + */ +public InetSocketAddress getReachableAddress() throws NoRouteToHostException { +AtomicReference address = new AtomicReference<>(null); +getInetSocketAddressStream().forEach(addr -> checkIfAddressIsReachableAndSet(addr, address)); + +if (address.get() != null) { +return address.get(); +} else { +throw new NoRouteToHostException("No valid address among " + addresses); +} +} + +/** + * Returns reachable address or first one, if none is reachable. + * + * @return address which is reachable or fist one. + */ +public InetSocketAddress getReachableOrOne() { +InetSocketAddress address; +
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331433800 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java ## @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NoRouteToHostException; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class allows to store several quorum and electing addresses. + * + * See ZOOKEEPER-3188 for a discussion of this feature. 
+ */ +public class MultipleAddresses { +private static final int DEFAULT_TIMEOUT = 100; + +private Set addresses; +private int timeout; + +public MultipleAddresses() { +addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); +timeout = DEFAULT_TIMEOUT; +} + +public MultipleAddresses(List addresses) { +this(addresses, DEFAULT_TIMEOUT); +} + +public MultipleAddresses(InetSocketAddress address) { +this(address, DEFAULT_TIMEOUT); +} + +public MultipleAddresses(List addresses, int timeout) { +this.addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); +this.addresses.addAll(addresses); +this.timeout = timeout; +} + +public MultipleAddresses(InetSocketAddress address, int timeout) { +addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); +addresses.add(address); +this.timeout = timeout; +} + +public int getTimeout() { +return timeout; +} + +public void setTimeout(int timeout) { +this.timeout = timeout; +} + +public boolean isEmpty() { +return addresses.isEmpty(); +} + +/** + * Returns all addresses. + * + * @return set of all InetSocketAddress + */ +public Set getAllAddresses() { +return Collections.unmodifiableSet(addresses); +} + +/** + * Returns wildcard addresses for all ports + * + * @return set of InetSocketAddress with wildcards for all ports + */ +public Set getWildcardAddresses() { +return addresses.stream().map(a -> new InetSocketAddress(a.getPort())).collect(Collectors.toSet()); +} + +/** + * Returns all ports + * + * @return list of all ports + */ +public List getAllPorts() { +return addresses.stream().map(InetSocketAddress::getPort).distinct().collect(Collectors.toList()); +} + +/** + * Returns distinct list of all host strings + * + * @return list of all hosts + */ +public List getAllHostStrings() { +return addresses.stream().map(InetSocketAddress::getHostString).distinct().collect(Collectors.toList()); +} + +public void addAddress(InetSocketAddress address) { +addresses.add(address); +} + +/** + * Returns reachable address. 
If none is reachable than throws exception. + * + * @return address which is reachable. + * @throws NoRouteToHostException if none address is reachable + */ +public InetSocketAddress getReachableAddress() throws NoRouteToHostException { +AtomicReference address = new AtomicReference<>(null); +getInetSocketAddressStream().forEach(addr -> checkIfAddressIsReachableAndSet(addr, address)); + +if (address.get() != null) { +return address.get(); +} else { +throw new NoRouteToHostException("No valid address among " + addresses); +} +} + +/** + * Returns reachable address or first one, if none is reachable. + * + * @return address which is reachable or fist one. + */ +public InetSocketAddress getReachableOrOne() { +InetSocketAddress address; +
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331414154 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java ## @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NoRouteToHostException; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class allows to store several quorum and electing addresses. + * + * See ZOOKEEPER-3188 for a discussion of this feature. 
+ */ +public class MultipleAddresses { +private static final int DEFAULT_TIMEOUT = 100; + +private Set addresses; +private int timeout; + +public MultipleAddresses() { +addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); +timeout = DEFAULT_TIMEOUT; +} + +public MultipleAddresses(List addresses) { +this(addresses, DEFAULT_TIMEOUT); +} + +public MultipleAddresses(InetSocketAddress address) { +this(address, DEFAULT_TIMEOUT); +} + +public MultipleAddresses(List addresses, int timeout) { +this.addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); +this.addresses.addAll(addresses); +this.timeout = timeout; +} + +public MultipleAddresses(InetSocketAddress address, int timeout) { +addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); +addresses.add(address); +this.timeout = timeout; +} + +public int getTimeout() { +return timeout; +} + +public void setTimeout(int timeout) { +this.timeout = timeout; +} + +public boolean isEmpty() { +return addresses.isEmpty(); +} + +/** + * Returns all addresses. + * + * @return set of all InetSocketAddress + */ +public Set getAllAddresses() { +return Collections.unmodifiableSet(addresses); +} + +/** + * Returns wildcard addresses for all ports + * + * @return set of InetSocketAddress with wildcards for all ports + */ +public Set getWildcardAddresses() { +return addresses.stream().map(a -> new InetSocketAddress(a.getPort())).collect(Collectors.toSet()); +} + +/** + * Returns all ports + * + * @return list of all ports + */ +public List getAllPorts() { +return addresses.stream().map(InetSocketAddress::getPort).distinct().collect(Collectors.toList()); +} + +/** + * Returns distinct list of all host strings + * + * @return list of all hosts + */ +public List getAllHostStrings() { +return addresses.stream().map(InetSocketAddress::getHostString).distinct().collect(Collectors.toList()); +} + +public void addAddress(InetSocketAddress address) { +addresses.add(address); +} + +/** + * Returns reachable address. 
If none is reachable than throws exception. + * + * @return address which is reachable. + * @throws NoRouteToHostException if none address is reachable + */ +public InetSocketAddress getReachableAddress() throws NoRouteToHostException { +AtomicReference address = new AtomicReference<>(null); +getInetSocketAddressStream().forEach(addr -> checkIfAddressIsReachableAndSet(addr, address)); + +if (address.get() != null) { +return address.get(); +} else { +throw new NoRouteToHostException("No valid address among " + addresses); +} +} + +/** + * Returns reachable address or first one, if none is reachable. + * + * @return address which is reachable or fist one. + */ +public InetSocketAddress getReachableOrOne() { +InetSocketAddress address; +
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331407319 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java ## @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NoRouteToHostException; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class allows to store several quorum and electing addresses. + * + * See ZOOKEEPER-3188 for a discussion of this feature. + */ +public class MultipleAddresses { Review comment: thx This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331406937 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java ## @@ -255,65 +260,114 @@ protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) thr * @throws X509Exception * @throws InterruptedException */ -protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception { -this.sock = createSocket(); +protected void connectToLeader(MultipleAddresses addr, String hostname) +throws IOException, InterruptedException { + this.leaderAddr = addr; +Set addresses = addr.getAllAddresses(); +ExecutorService executor = Executors.newFixedThreadPool(addresses.size()); Review comment: thanks, I will do it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331403117 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java ## @@ -418,66 +426,108 @@ public boolean isQuorumSynced(QuorumVerifier qv) { class LearnerCnxAcceptor extends ZooKeeperCriticalThread { -private volatile boolean stop = false; +private final AtomicBoolean stop = new AtomicBoolean(false); +private final AtomicBoolean fail = new AtomicBoolean(false); -public LearnerCnxAcceptor() { -super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk.getZooKeeperServerListener()); +LearnerCnxAcceptor() { +super("LearnerCnxAcceptor-" + serverSockets.stream() + .map(ServerSocket::getLocalSocketAddress) + .map(Objects::toString) + .collect(Collectors.joining(",")), + zk.getZooKeeperServerListener()); } @Override public void run() { -try { -while (!stop) { -Socket s = null; -boolean error = false; -try { -s = ss.accept(); - -// start with the initLimit, once the ack is processed -// in LearnerHandler switch to the syncLimit -s.setSoTimeout(self.tickTime * self.initLimit); -s.setTcpNoDelay(nodelay); - -BufferedInputStream is = new BufferedInputStream(s.getInputStream()); -LearnerHandler fh = new LearnerHandler(s, is, Leader.this); -fh.start(); -} catch (SocketException e) { -error = true; -if (stop) { -LOG.info("exception while shutting down acceptor: " + e); - -// When Leader.shutdown() calls ss.close(), -// the call to accept throws an exception. -// We catch and set stop to true. 
-stop = true; -} else { -throw e; -} -} catch (SaslException e) { -LOG.error("Exception while connecting to quorum learner", e); -error = true; -} catch (Exception e) { -error = true; +if (!stop.get() && !serverSockets.isEmpty()) { +ExecutorService executor = Executors.newFixedThreadPool(serverSockets.size()); +CountDownLatch latch = new CountDownLatch(serverSockets.size()); + +serverSockets.forEach(serverSocket -> +executor.submit(new LearnerCnxAcceptorHandler(serverSocket, latch))); + +try { +latch.await(); +} catch (InterruptedException ie) { +LOG.error("Interrupted while sleeping. Ignoring exception", ie); +} finally { +closeSockets(); Review comment: thanks, nice catch. I will use executor.shutdownNow() to terminate the tasks immediately. I was thinking of adding a timeout first to wait for the tasks to shut down gracefully in case of an interrupted exception, but I decided against it. At this point we are in a phase when all the tasks are finished (or will be finished anyway because of closing the sockets). At this point I think it is better to finish everything quickly so that a new leader election can take place. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331403263 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java ## @@ -418,66 +426,108 @@ public boolean isQuorumSynced(QuorumVerifier qv) { class LearnerCnxAcceptor extends ZooKeeperCriticalThread { -private volatile boolean stop = false; +private final AtomicBoolean stop = new AtomicBoolean(false); +private final AtomicBoolean fail = new AtomicBoolean(false); -public LearnerCnxAcceptor() { -super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk.getZooKeeperServerListener()); +LearnerCnxAcceptor() { +super("LearnerCnxAcceptor-" + serverSockets.stream() + .map(ServerSocket::getLocalSocketAddress) + .map(Objects::toString) + .collect(Collectors.joining(",")), + zk.getZooKeeperServerListener()); } @Override public void run() { -try { -while (!stop) { -Socket s = null; -boolean error = false; -try { -s = ss.accept(); - -// start with the initLimit, once the ack is processed -// in LearnerHandler switch to the syncLimit -s.setSoTimeout(self.tickTime * self.initLimit); -s.setTcpNoDelay(nodelay); - -BufferedInputStream is = new BufferedInputStream(s.getInputStream()); -LearnerHandler fh = new LearnerHandler(s, is, Leader.this); -fh.start(); -} catch (SocketException e) { -error = true; -if (stop) { -LOG.info("exception while shutting down acceptor: " + e); - -// When Leader.shutdown() calls ss.close(), -// the call to accept throws an exception. -// We catch and set stop to true. 
-stop = true; -} else { -throw e; -} -} catch (SaslException e) { -LOG.error("Exception while connecting to quorum learner", e); -error = true; -} catch (Exception e) { -error = true; +if (!stop.get() && !serverSockets.isEmpty()) { +ExecutorService executor = Executors.newFixedThreadPool(serverSockets.size()); +CountDownLatch latch = new CountDownLatch(serverSockets.size()); + +serverSockets.forEach(serverSocket -> +executor.submit(new LearnerCnxAcceptorHandler(serverSocket, latch))); + +try { +latch.await(); +} catch (InterruptedException ie) { +LOG.error("Interrupted while sleeping. Ignoring exception", ie); +} finally { +closeSockets(); +} +} +} + +public void halt() { +stop.set(true); +closeSockets(); +} + +class LearnerCnxAcceptorHandler implements Runnable { +private ServerSocket serverSocket; +private CountDownLatch latch; + +LearnerCnxAcceptorHandler(ServerSocket serverSocket, CountDownLatch latch) { +this.serverSocket = serverSocket; +this.latch = latch; +} + +@Override +public void run() { +try { + Thread.currentThread().setName("LearnerCnxAcceptorHandler-" + serverSocket.getLocalSocketAddress()); + +while (!stop.get()) { +acceptConnections(); +} +} catch (Exception e) { +LOG.warn("Exception while accepting follower", e); +if (fail.compareAndSet(false, true)) { +handleException(getName(), e); +halt(); +} +} finally { +latch.countDown(); +} +} + +private void acceptConnections() throws IOException { +Socket socket = null; +boolean error = false; +try { +socket = serverSocket.accept(); + +// start with the initLimit, once the ack is processed +// in LearnerHandler switch to the syncLimit +socket.setSoTimeout(self.tickTime * self.initLimit); +
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r317189846 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java ## @@ -417,71 +427,111 @@ public boolean isQuorumSynced(QuorumVerifier qv) { protected final Proposal newLeaderProposal = new Proposal(); class LearnerCnxAcceptor extends ZooKeeperCriticalThread { -private volatile boolean stop = false; +private final AtomicBoolean stop = new AtomicBoolean(false); +private final AtomicBoolean fail = new AtomicBoolean(false); -public LearnerCnxAcceptor() { -super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk -.getZooKeeperServerListener()); +LearnerCnxAcceptor() { +super("LearnerCnxAcceptor-" + serverSockets.stream() + .map(ServerSocket::getLocalSocketAddress) + .map(Objects::toString) + .collect(Collectors.joining(",")), + zk.getZooKeeperServerListener()); } @Override public void run() { -try { -while (!stop) { -Socket s = null; -boolean error = false; -try { -s = ss.accept(); - -// start with the initLimit, once the ack is processed -// in LearnerHandler switch to the syncLimit -s.setSoTimeout(self.tickTime * self.initLimit); -s.setTcpNoDelay(nodelay); - -BufferedInputStream is = new BufferedInputStream( -s.getInputStream()); -LearnerHandler fh = new LearnerHandler(s, is, -Leader.this); -fh.start(); -} catch (SocketException e) { -error = true; -if (stop) { -LOG.info("exception while shutting down acceptor: " -+ e); - -// When Leader.shutdown() calls ss.close(), -// the call to accept throws an exception. -// We catch and set stop to true. 
-stop = true; -} else { -throw e; -} -} catch (SaslException e){ -LOG.error("Exception while connecting to quorum learner", e); -error = true; -} catch (Exception e) { -error = true; +if (!stop.get() && !serverSockets.isEmpty()) { +ExecutorService executor = Executors.newFixedThreadPool(serverSockets.size()); +CountDownLatch latch = new CountDownLatch(serverSockets.size()); + +serverSockets.forEach(serverSocket -> +executor.submit(new LearnerCnxAcceptorHandler(serverSocket, latch))); + +try { +latch.await(); Review comment: The code now basically starts listener threads on all local addresses, then waits until all the listeners are dead, and then simply starts over again (assuming no stop was requested by Leader.shutdown() and no unexpected failure happened in the Listener threads). I don't think we need a timeout here... ideally we want to wait forever :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r317167254 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/ObserverBean.java ## @@ -49,10 +50,11 @@ public String getQuorumAddress() { public String getLearnerMaster() { QuorumPeer.QuorumServer learnerMaster = observer.getCurrentLearnerMaster(); -if (learnerMaster == null || learnerMaster.addr == null) { +InetSocketAddress address = learnerMaster.addr.getReachableOrOne(); +if (learnerMaster == null || address == null) { Review comment: thanks, nice catch! (actually, besides this, we also got a NoSuchElementException if learnerMaster.addr is empty) I will fix this in the next commit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r313774702 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java ## @@ -657,11 +647,31 @@ public CommandResponse run(ZooKeeperServer zkServer, Map kwargs) TreeMap::new)); } +private String getMultiAddressString(QuorumPeer.QuorumServer qs) { +return qs.addr.getAllAddresses().stream() +.map(address -> getSingleAddressString(qs, address)) +.collect(Collectors.joining(",")); +} + +private String getSingleAddressString(QuorumPeer.QuorumServer qs, InetSocketAddress address) { Review comment: I changed the format, this is how the `voting_view` admin command responds now: ``` { "current_config" : { "1" : { "server_addresses" : [ "/172.16.101.11:2888", "/172.16.102.11:2888" ], "election_addresses" : [ "/172.16.101.11:3888", "/172.16.102.11:3888" ], "client_address" : "/0.0.0.0:2181", "learner_type" : "participant" }, "2" : { "server_addresses" : [ "/172.16.101.22:2888", "/172.16.102.22:2888" ], "election_addresses" : [ "/172.16.101.22:3888", "/172.16.102.22:3888" ], "client_address" : "/0.0.0.0:2181", "learner_type" : "participant" }, "3" : { "server_addresses" : [ "/172.16.101.33:2888", "/172.16.102.33:2888" ], "election_addresses" : [ "/172.16.101.33:3888", "/172.16.102.33:3888" ], "client_address" : "/0.0.0.0:2181", "learner_type" : "participant" } }, "command" : "voting_view", "error" : null } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r313738065 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java ## @@ -657,11 +647,31 @@ public CommandResponse run(ZooKeeperServer zkServer, Map kwargs) TreeMap::new)); } +private String getMultiAddressString(QuorumPeer.QuorumServer qs) { +return qs.addr.getAllAddresses().stream() +.map(address -> getSingleAddressString(qs, address)) +.collect(Collectors.joining(",")); +} + +private String getSingleAddressString(QuorumPeer.QuorumServer qs, InetSocketAddress address) { Review comment: Yes, I agree, I will change it. This admin command is meant to show the internal view of the voting members (who the ZooKeeper server thinks the voting members are and where they listen). I wouldn't complicate this PR any further, but it might be a good idea to create a follow-up ticket to have some admin command showing if the given server can actually reach all the different ports of the other servers (and not only the voting members). It can help debugging network problems, showing if certain network interfaces on some servers are unreachable. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r313820217 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java ## @@ -228,28 +240,33 @@ static public InitialMessage parse(Long protocolVersion, DataInputStream din) num_read, remaining, sid); } -String addr = new String(b); -String[] host_port; -try { -host_port = ConfigUtils.getHostAndPort(addr); -} catch (ConfigException e) { -throw new InitialMessageException("Badly formed address: %s", addr); -} +String[] addressStrings = new String(b).split(","); Review comment: referring to our second offline discussion: - I incremented the PROTOCOL_VERSION - did not do the JSON format modification, as it would complicate future code, which would always have to ensure both forward and backward compatibility of the election protocol during the rolling upgrades. The current solution (simply failing if the protocol versions mismatch) is simpler and still works just fine: as the servers are restarted one-by-one, the nodes with the old protocol version and the nodes with the new protocol version will form two partitions, but at any given time only one partition will have the quorum. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r313776919 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java ## @@ -271,39 +278,42 @@ public boolean isQuorumSynced(QuorumVerifier qv) { return qv.containsQuorum(ids); } -private final ServerSocket ss; +private final List serverSockets = new LinkedList<>(); Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException { this.self = self; this.proposalStats = new BufferStats(); + +Set addresses; +if (self.getQuorumListenOnAllIPs()) { +addresses = self.getQuorumAddress().getWildcardAddresses(); +} else { +addresses = self.getQuorumAddress().getAllAddresses(); +} + +for (InetSocketAddress address : addresses) { +serverSockets.add(createServerSocket(address, self.shouldUsePortUnification(), self.isSslQuorum())); +} + +this.zk = zk; +} + +ServerSocket createServerSocket(InetSocketAddress address, boolean portUnification, boolean sslQuorum) +throws IOException { +ServerSocket serverSocket; try { -if (self.shouldUsePortUnification() || self.isSslQuorum()) { -boolean allowInsecureConnection = self.shouldUsePortUnification(); -if (self.getQuorumListenOnAllIPs()) { -ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, self.getQuorumAddress().getPort()); -} else { -ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection); -} +if (portUnification || sslQuorum) { +serverSocket = new UnifiedServerSocket(self.getX509Util(), portUnification); } else { -if (self.getQuorumListenOnAllIPs()) { -ss = new ServerSocket(self.getQuorumAddress().getPort()); -} else { -ss = new ServerSocket(); -} -} -ss.setReuseAddress(true); -if (!self.getQuorumListenOnAllIPs()) { -ss.bind(self.getQuorumAddress()); +serverSocket = new ServerSocket(); } +serverSocket.setReuseAddress(true); +serverSocket.bind(address); +return serverSocket; } catch (BindException e) { 
-if (self.getQuorumListenOnAllIPs()) { -LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e); -} else { -LOG.error("Couldn't bind to " + self.getQuorumAddress(), e); -} +LOG.error("Couldn't bind to " + self.getQuorumAddress(), e); Review comment: thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r313774702 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java ## @@ -657,11 +647,31 @@ public CommandResponse run(ZooKeeperServer zkServer, Map kwargs) TreeMap::new)); } +private String getMultiAddressString(QuorumPeer.QuorumServer qs) { +return qs.addr.getAllAddresses().stream() +.map(address -> getSingleAddressString(qs, address)) +.collect(Collectors.joining(",")); +} + +private String getSingleAddressString(QuorumPeer.QuorumServer qs, InetSocketAddress address) { Review comment: I changed the format, this is how the `voting_view` admin command responds now (I haven't pushed the commit yet): ``` { "current_config" : { "1" : { "server_addresses" : [ "/172.16.101.11:2888", "/172.16.102.11:2888" ], "election_addresses" : [ "/172.16.101.11:3888", "/172.16.102.11:3888" ], "client_address" : "/0.0.0.0:2181", "learner_type" : "participant" }, "2" : { "server_addresses" : [ "/172.16.101.22:2888", "/172.16.102.22:2888" ], "election_addresses" : [ "/172.16.101.22:3888", "/172.16.102.22:3888" ], "client_address" : "/0.0.0.0:2181", "learner_type" : "participant" }, "3" : { "server_addresses" : [ "/172.16.101.33:2888", "/172.16.102.33:2888" ], "election_addresses" : [ "/172.16.101.33:3888", "/172.16.102.33:3888" ], "client_address" : "/0.0.0.0:2181", "learner_type" : "participant" } }, "command" : "voting_view", "error" : null } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r313774702 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java ## @@ -657,11 +647,31 @@ public CommandResponse run(ZooKeeperServer zkServer, Map kwargs) TreeMap::new)); } +private String getMultiAddressString(QuorumPeer.QuorumServer qs) { +return qs.addr.getAllAddresses().stream() +.map(address -> getSingleAddressString(qs, address)) +.collect(Collectors.joining(",")); +} + +private String getSingleAddressString(QuorumPeer.QuorumServer qs, InetSocketAddress address) { Review comment: I changed the format, this is how the admin command responds now (I haven't pushed the commit yet): ``` { "current_config" : { "1" : { "server_addresses" : [ "/172.16.101.11:2888", "/172.16.102.11:2888" ], "election_addresses" : [ "/172.16.101.11:3888", "/172.16.102.11:3888" ], "client_address" : "/0.0.0.0:2181", "learner_type" : "participant" }, "2" : { "server_addresses" : [ "/172.16.101.22:2888", "/172.16.102.22:2888" ], "election_addresses" : [ "/172.16.101.22:3888", "/172.16.102.22:3888" ], "client_address" : "/0.0.0.0:2181", "learner_type" : "participant" }, "3" : { "server_addresses" : [ "/172.16.101.33:2888", "/172.16.102.33:2888" ], "election_addresses" : [ "/172.16.101.33:3888", "/172.16.102.33:3888" ], "client_address" : "/0.0.0.0:2181", "learner_type" : "participant" } }, "command" : "voting_view", "error" : null } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r313739495 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java ## @@ -18,16 +18,8 @@ package org.apache.zookeeper.server.admin; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; +import java.net.InetSocketAddress; +import java.util.*; Review comment: sure, thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network
symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r313738065 ## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java ## @@ -657,11 +647,31 @@ public CommandResponse run(ZooKeeperServer zkServer, Map kwargs) TreeMap::new)); } +private String getMultiAddressString(QuorumPeer.QuorumServer qs) { +return qs.addr.getAllAddresses().stream() +.map(address -> getSingleAddressString(qs, address)) +.collect(Collectors.joining(",")); +} + +private String getSingleAddressString(QuorumPeer.QuorumServer qs, InetSocketAddress address) { Review comment: Yes, I agree, I will change it. This admin command is meant to show the internal view of the voting members (who the ZooKeeper server thinks the voting members are and where they listen). I wouldn't complicate this PR any further, but it might be a good idea to create a follow-up ticket to have some admin command showing whether the given server can reach all the different ports of the other servers (not only the voting members). It can help debugging network problems, showing if certain network interfaces on some servers are unreachable. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services