This is an automated email from the ASF dual-hosted git repository. ilyak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 5f5caf4 IGNITE-14607 Regex Based Filtering For Discovery IP Addresses - Fixes #9048. 5f5caf4 is described below commit 5f5caf452582f1fb1219704d93d4e8bc9dc9c7f2 Author: Atri Sharma <atri.j...@gmail.com> AuthorDate: Sat May 1 18:24:33 2021 +0300 IGNITE-14607 Regex Based Filtering For Discovery IP Addresses - Fixes #9048. Signed-off-by: Ilya Kasnacheev <ilya.kasnach...@gmail.com> --- .../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 23 ++++++ .../tcp/TcpDiscoveryWithAddressFilterTest.java | 86 ++++++++++++++++++++++ .../tcp/TcpDiscoveryWithWrongServerTest.java | 20 ++--- .../IgniteSpiDiscoverySelfTestSuite.java | 3 + 4 files changed, 122 insertions(+), 10 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 6417195..0f4fc15 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -72,6 +72,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; @@ -319,6 +320,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery /** IP finder. */ protected TcpDiscoveryIpFinder ipFinder; + /** Address filter */ + private IgnitePredicate<InetSocketAddress> addressFilter; + /** Socket operations timeout. */ private long sockTimeout; // Must be initialized in the constructor of child class. @@ -926,6 +930,22 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery } /** + * Sets filter for IP addresses. Each address found by {@link TcpDiscoveryIpFinder} will be checked against + * this filter and only passing addresses will be used for discovery. + * <p> + * If not specified or null, all found addresses are used. + * + * @param addressFilter Address filter to use + * @return {@code this} for chaining. + */ + @IgniteSpiConfiguration(optional = true) + public TcpDiscoverySpi setAddressFilter(IgnitePredicate<InetSocketAddress> addressFilter) { + this.addressFilter = addressFilter; + + return this; + } + + /** * Sets socket operations timeout. This timeout is used to limit connection time and * write-to-socket time. * <p> @@ -1994,6 +2014,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery assert addr != null; try { + if (addressFilter != null && !addressFilter.apply(addr)) + continue; + InetSocketAddress resolved = addr.isUnresolved() ? new InetSocketAddress(InetAddress.getByName(addr.getHostName()), addr.getPort()) : addr; diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithAddressFilterTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithAddressFilterTest.java new file mode 100644 index 0000000..5c8b251 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithAddressFilterTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.Collections; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.junit.Test; + +/** + * Test discovery SPI with address filter present + */ +public class TcpDiscoveryWithAddressFilterTest extends TcpDiscoveryWithWrongServerTest { + /** Address filter predicate which allows filtering IP addresses duringd discovery */ + private IgnitePredicate<InetSocketAddress> addressFilter = new P1<InetSocketAddress>() { + @Override public boolean apply(InetSocketAddress address) { + // Compile regular expression + Pattern pattern = Pattern.compile("^/127\\.0\\.0\\.1:47503$", Pattern.CASE_INSENSITIVE); + // Match regex against input + Matcher matcher = pattern.matcher(address.toString()); + // Use results... + return !(matcher.matches()); + } + }; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); + + ipFinder.setAddresses(Collections.singleton("127.0.0.1:" + SERVER_PORT + ".." + LAST_SERVER_PORT)); + + cfg.setDiscoverySpi(new TcpDiscoveryWithAddressFilter().setIpFinder(ipFinder).setAddressFilter(addressFilter)); + + return cfg; + } + + /** + * Basic test + */ + @Test + public void testBasic() throws Exception { + startTcpThread(new NoResponseWorker(), SERVER_PORT); + startTcpThread(new NoResponseWorker(), LAST_SERVER_PORT); + + simpleTest(); + } + + /** + * Check for incoming addresses and check that the filter was applied + */ + private class TcpDiscoveryWithAddressFilter extends TcpDiscoverySpi { + /** {@inheritDoc} */ + @Override protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException { + Collection<InetSocketAddress> res = super.resolvedAddresses(); + + for (InetSocketAddress addr : res) + assertFalse(addr.getHostName().matches("^/127\\.0\\.0\\.1:47503$")); + + return res; + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithWrongServerTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithWrongServerTest.java index 09ba74b..df9b6dc 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithWrongServerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryWithWrongServerTest.java @@ -43,16 +43,16 @@ import org.junit.Test; */ public class TcpDiscoveryWithWrongServerTest extends GridCommonAbstractTest { /** Non-Ignite Server port #1. */ - private static final int SERVER_PORT = 47500; + protected static final int SERVER_PORT = 47500; /** Non-Ignite Server port #2. */ - private static final int LAST_SERVER_PORT = SERVER_PORT + 5; + protected static final int LAST_SERVER_PORT = SERVER_PORT + 5; /** Non-Ignite Server sockets. */ - private List<ServerSocket> srvSocks = new ArrayList<>(); + protected List<ServerSocket> srvSocks = new ArrayList<>(); /** Count of accepted connections to non-Ignite Server. */ - private AtomicInteger connCnt = new AtomicInteger(0); + protected AtomicInteger connCnt = new AtomicInteger(0); /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { @@ -81,7 +81,7 @@ public class TcpDiscoveryWithWrongServerTest extends GridCommonAbstractTest { * Starts tcp test thread * @param workerFactory one of WorkerFactory */ - private void startTcpThread(final WorkerFactory workerFactory, final int port) throws Exception { + protected void startTcpThread(final WorkerFactory workerFactory, final int port) throws Exception { final ServerSocket srvSock = new ServerSocket(port, 10, InetAddress.getByName("127.0.0.1")); srvSocks.add(srvSock); @@ -173,7 +173,7 @@ public class TcpDiscoveryWithWrongServerTest extends GridCommonAbstractTest { * It is expected that both client and server could successfully perform Discovery Procedure when there is * unknown (test) server in the ipFinder list. */ - private void simpleTest() { + protected void simpleTest() { try { Ignite srv = startGrid("server"); Ignite client = startClientGrid("client"); @@ -196,7 +196,7 @@ public class TcpDiscoveryWithWrongServerTest extends GridCommonAbstractTest { /** * Just a factory for runnable workers */ - private interface WorkerFactory { + protected interface WorkerFactory { /** * Creates a new worker for socket * @param clientSock socket for worker @@ -248,7 +248,7 @@ public class TcpDiscoveryWithWrongServerTest extends GridCommonAbstractTest { /** * SomeResponseWorker. */ - private class SomeResponseWorker implements WorkerFactory { + protected class SomeResponseWorker implements WorkerFactory { /** {@inheritDoc} */ @Override public Runnable newWorker(Socket clientSock) { return new SocketWorker(clientSock) { @@ -264,7 +264,7 @@ public class TcpDiscoveryWithWrongServerTest extends GridCommonAbstractTest { /** * NoResponseWorker. */ - private class NoResponseWorker implements WorkerFactory { + protected class NoResponseWorker implements WorkerFactory { /** {@inheritDoc} */ @Override public Runnable newWorker(Socket clientSock) { return new SocketWorker(clientSock) { @@ -320,7 +320,7 @@ public class TcpDiscoveryWithWrongServerTest extends GridCommonAbstractTest { * TcpDiscoverySpi with non-shuffled resolved IP addresses. We should ensure that in this test non-Ignite server * is the first element of the addresses list */ - class TcpDiscoverySpiWithOrderedIps extends TcpDiscoverySpi { + protected class TcpDiscoverySpiWithOrderedIps extends TcpDiscoverySpi { /** {@inheritDoc} */ @Override protected Collection<InetSocketAddress> resolvedAddresses() throws IgniteSpiException { Collection<InetSocketAddress> shuffled = super.resolvedAddresses(); diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index 8f19f2d..3757ce7 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -72,6 +72,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslSecuredUnsecuredTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslTrustedSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySslTrustedUntrustedTest; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryWithAddressFilterTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryWithWrongServerTest; import org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinderSelfTest; import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinderSelfTest; @@ -136,6 +137,8 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP TcpDiscoveryWithWrongServerTest.class, + TcpDiscoveryWithAddressFilterTest.class, + TcpDiscoverySpiReconnectDelayTest.class, TcpDiscoveryNetworkIssuesTest.class,