[FLINK-1705] [taskmanager] Fix hostname lookup.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5308ac83 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5308ac83 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5308ac83 Branch: refs/heads/master Commit: 5308ac8325a5b31627023bfd002a9a3757d15c1f Parents: 2eea012 Author: Stephan Ewen <se...@apache.org> Authored: Sat Mar 14 18:20:14 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Sun Mar 15 16:37:11 2015 +0100 ---------------------------------------------------------------------- .../instance/InstanceConnectionInfo.java | 75 +++++++++------- .../instance/InstanceConnectionInfoTest.java | 95 ++++++++++++++++---- 2 files changed, 122 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5308ac83/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java index a1eec4d..ee79c23 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java @@ -31,7 +31,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This class encapsulates all connection information necessary to connect to the instance's task manager. + * This class encapsulates the connection information of a TaskManager. + * It describes the host where the TaskManager operates and its server port + * for data exchange. This class also contains utilities to work with the + * TaskManager's host name, which is used to localize work assignments. 
*/ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<InstanceConnectionInfo>, java.io.Serializable { @@ -56,15 +59,9 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In private String fqdnHostName; /** - * The hostname + * The hostname, derived from the fully qualified host name. */ private String hostName; - - /** - * This flag indicates if the FQDN hostname cound not be resolved and is represented - * as an IP address (string). - */ - private boolean fqdnHostNameIsIP = false; /** @@ -90,14 +87,24 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In // get FQDN hostname on this TaskManager. try { this.fqdnHostName = this.inetAddress.getCanonicalHostName(); - } catch (Throwable t) { - LOG.warn("Unable to determine hostname for TaskManager. The performance might be degraded since HDFS input split assignment is not possible"); - if(LOG.isDebugEnabled()) { - LOG.debug("getCanonicalHostName() Exception", t); - } - // could not determine host name, so take IP textual representation - this.fqdnHostName = inetAddress.getHostAddress(); - this.fqdnHostNameIsIP = true; + } + catch (Throwable t) { + LOG.warn("Unable to determine the canonical hostname. Input split assignment (such as " + + "for HDFS files) may be non-local when the canonical hostname is missing."); + LOG.debug("getCanonicalHostName() Exception:", t); + this.fqdnHostName = this.inetAddress.getHostAddress(); + } + + if (this.fqdnHostName.equals(this.inetAddress.getHostAddress())) { + // this happens when the name lookup fails, either due to an exception, + // or because no hostname can be found for the address + // take IP textual representation + this.hostName = this.fqdnHostName; + LOG.warn("No hostname could be resolved for the IP address {}, using IP address as host name. 
" + "Local input split assignment (such as for HDFS files) may be impacted."); + } + else { + this.hostName = NetUtils.getHostnameFromFQDN(this.fqdnHostName); } } @@ -126,27 +133,37 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In } /** - * Returns the host name of the instance. If the host name could not be determined, the return value will be a - * textual representation of the instance's IP address. + * Returns the fully-qualified domain name of the TaskManager. If the name could not be + * determined, the return value will be a textual representation of the TaskManager's IP address. * - * @return the host name of the instance + * @return The fully-qualified domain name of the TaskManager. */ public String getFQDNHostname() { return this.fqdnHostName; } - + + /** + * Gets the hostname of the TaskManager. The hostname derives from the fully qualified + * domain name (FQDN, see {@link #getFQDNHostname()}): + * <ul> + * <li>If the FQDN is the textual IP address, then the hostname is also the IP address</li> + * <li>If the FQDN has only one segment (such as "localhost", or "host17"), then this is + * used as the hostname.</li> + * <li>If the FQDN has multiple segments (such as "worker3.subgroup.company.net"), then the first + * segment (here "worker3") will be used as the hostname.</li> + * </ul> + * + * @return The hostname of the TaskManager. */ public String getHostname() { - if(hostName == null) { - String fqdn = getFQDNHostname(); - if(this.fqdnHostNameIsIP) { // fqdn to hostname translation is pointless if FQDN is an ip address. - hostName = fqdn; - } else { - hostName = NetUtils.getHostnameFromFQDN(fqdn); - } - } return hostName; } + /** + * Gets the IP address where the TaskManager operates. + * + * @return The IP address. 
+ */ public String getInetAdress() { return this.inetAddress.toString(); } @@ -166,7 +183,6 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In this.fqdnHostName = StringUtils.readNullableString(in); this.hostName = StringUtils.readNullableString(in); - this.fqdnHostNameIsIP = in.readBoolean(); try { this.inetAddress = InetAddress.getByAddress(address); @@ -185,7 +201,6 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In StringUtils.writeNullableString(fqdnHostName, out); StringUtils.writeNullableString(hostName, out); - out.writeBoolean(fqdnHostNameIsIP); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5308ac83/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java index c072e59..2769183 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.instance; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -27,19 +28,43 @@ import java.net.InetAddress; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.junit.Assert; import org.junit.Test; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.reflect.Whitebox; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the InstanceConnectionInfo, which 
identifies the location and connection + * information of a TaskManager. + */ public class InstanceConnectionInfoTest { @Test public void testEqualsHashAndCompareTo() { try { + // we mock the addresses to save the time of the reverse name lookups + InetAddress address1 = mock(InetAddress.class); + when(address1.getCanonicalHostName()).thenReturn("localhost"); + when(address1.getHostName()).thenReturn("localhost"); + when(address1.getHostAddress()).thenReturn("127.0.0.1"); + when(address1.getAddress()).thenReturn(new byte[] {127, 0, 0, 1} ); + + InetAddress address2 = mock(InetAddress.class); + when(address2.getCanonicalHostName()).thenReturn("testhost1"); + when(address2.getHostName()).thenReturn("testhost1"); + when(address2.getHostAddress()).thenReturn("0.0.0.0"); + when(address2.getAddress()).thenReturn(new byte[] {0, 0, 0, 0} ); + + InetAddress address3 = mock(InetAddress.class); + when(address3.getCanonicalHostName()).thenReturn("testhost2"); + when(address3.getHostName()).thenReturn("testhost2"); + when(address3.getHostAddress()).thenReturn("192.168.0.1"); + when(address3.getAddress()).thenReturn(new byte[] {(byte) 192, (byte) 168, 0, 1} ); + // one == four != two != three - InstanceConnectionInfo one = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871); - InstanceConnectionInfo two = new InstanceConnectionInfo(InetAddress.getByName("0.0.0.0"), 19871); - InstanceConnectionInfo three = new InstanceConnectionInfo(InetAddress.getByName("192.168.0.1"), 10871); - InstanceConnectionInfo four = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871); + InstanceConnectionInfo one = new InstanceConnectionInfo(address1, 19871); + InstanceConnectionInfo two = new InstanceConnectionInfo(address2, 19871); + InstanceConnectionInfo three = new InstanceConnectionInfo(address3, 10871); + InstanceConnectionInfo four = new InstanceConnectionInfo(address1, 19871); assertTrue(one.equals(four)); assertTrue(!one.equals(two)); @@ -101,10 +126,10 @@ 
public class InstanceConnectionInfoTest { public void testGetFQDNHostname() { try { InstanceConnectionInfo info1 = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871); - assertTrue(info1.getFQDNHostname() != null); + assertNotNull(info1.getFQDNHostname()); InstanceConnectionInfo info2 = new InstanceConnectionInfo(InetAddress.getByName("1.2.3.4"), 8888); - assertTrue(info2.getFQDNHostname() != null); + assertNotNull(info2.getFQDNHostname()); } catch (Exception e) { e.printStackTrace(); @@ -115,10 +140,15 @@ public class InstanceConnectionInfoTest { @Test public void testGetHostname0() { try { - final InstanceConnectionInfo info1 = PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871)); - Whitebox.setInternalState(info1, "fqdnHostName", "worker2.cluster.mycompany.com"); - Assert.assertEquals("worker2", info1.getHostname()); - } catch (Exception e) { + InetAddress address = mock(InetAddress.class); + when(address.getCanonicalHostName()).thenReturn("worker2.cluster.mycompany.com"); + when(address.getHostName()).thenReturn("worker2.cluster.mycompany.com"); + when(address.getHostAddress()).thenReturn("127.0.0.1"); + + final InstanceConnectionInfo info = new InstanceConnectionInfo(address, 19871); + Assert.assertEquals("worker2", info.getHostname()); + } + catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } @@ -127,14 +157,43 @@ public class InstanceConnectionInfoTest { @Test public void testGetHostname1() { try { - final InstanceConnectionInfo info1 = PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871)); - Whitebox.setInternalState(info1, "fqdnHostName", "worker10"); - Assert.assertEquals("worker10", info1.getHostname()); - } catch (Exception e) { + InetAddress address = mock(InetAddress.class); + when(address.getCanonicalHostName()).thenReturn("worker10"); + when(address.getHostName()).thenReturn("worker10"); + when(address.getHostAddress()).thenReturn("127.0.0.1"); + + 
InstanceConnectionInfo info = new InstanceConnectionInfo(address, 19871); + Assert.assertEquals("worker10", info.getHostname()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testGetHostname2() { + try { + final String addressString = "192.168.254.254"; + + // we mock the addresses to save the time of the reverse name lookups + InetAddress address = mock(InetAddress.class); + when(address.getCanonicalHostName()).thenReturn("192.168.254.254"); + when(address.getHostName()).thenReturn("192.168.254.254"); + when(address.getHostAddress()).thenReturn("192.168.254.254"); + when(address.getAddress()).thenReturn(new byte[] {(byte) 192, (byte) 168, (byte) 254, (byte) 254} ); + + InstanceConnectionInfo info = new InstanceConnectionInfo(address, 54152); + + assertNotNull(info.getFQDNHostname()); + assertTrue(info.getFQDNHostname().equals(addressString)); + + assertNotNull(info.getHostname()); + assertTrue(info.getHostname().equals(addressString)); + } + catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } } - - }