[FLINK-1705] [taskmanager] Fix hostname lookup.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5308ac83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5308ac83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5308ac83

Branch: refs/heads/master
Commit: 5308ac8325a5b31627023bfd002a9a3757d15c1f
Parents: 2eea012
Author: Stephan Ewen <se...@apache.org>
Authored: Sat Mar 14 18:20:14 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 15 16:37:11 2015 +0100

----------------------------------------------------------------------
 .../instance/InstanceConnectionInfo.java        | 75 +++++++++-------
 .../instance/InstanceConnectionInfoTest.java    | 95 ++++++++++++++++----
 2 files changed, 122 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5308ac83/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
index a1eec4d..ee79c23 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
@@ -31,7 +31,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class encapsulates all connection information necessary to connect to 
the instance's task manager.
+ * This class encapsulates the connection information of a TaskManager.
+ * It describes the host where the TaskManager operates and its server port
+ * for data exchange. This class also contains utilities to work with the
+ * TaskManager's host name, which is used to localize work assignments.
  */
 public class InstanceConnectionInfo implements IOReadableWritable, 
Comparable<InstanceConnectionInfo>, java.io.Serializable {
 
@@ -56,15 +59,9 @@ public class InstanceConnectionInfo implements 
IOReadableWritable, Comparable<In
        private String fqdnHostName;
        
        /**
-        * The hostname
+        * The hostname, derived from the fully qualified host name.
         */
        private String hostName;
-       
-       /**
-        * This flag indicates if the FQDN hostname cound not be resolved and 
is represented
-        * as an IP address (string).
-        */
-       private boolean fqdnHostNameIsIP = false;
 
 
        /**
@@ -90,14 +87,24 @@ public class InstanceConnectionInfo implements 
IOReadableWritable, Comparable<In
                // get FQDN hostname on this TaskManager.
                try {
                        this.fqdnHostName = 
this.inetAddress.getCanonicalHostName();
-               } catch (Throwable t) {
-                       LOG.warn("Unable to determine hostname for TaskManager. 
The performance might be degraded since HDFS input split assignment is not 
possible");
-                       if(LOG.isDebugEnabled()) {
-                               LOG.debug("getCanonicalHostName() Exception", 
t);
-                       }
-                       // could not determine host name, so take IP textual 
representation
-                       this.fqdnHostName = inetAddress.getHostAddress();
-                       this.fqdnHostNameIsIP = true;
+               }
+               catch (Throwable t) {
+                       LOG.warn("Unable to determine the canonical hostname. 
Input split assignment (such as " +
+                                       "for HDFS files) may be non-local when 
the canonical hostname is missing.");
+                       LOG.debug("getCanonicalHostName() Exception:", t);
+                       this.fqdnHostName = this.inetAddress.getHostAddress();
+               }
+
+               if 
(this.fqdnHostName.equals(this.inetAddress.getHostAddress())) {
+                       // this happens when the name lookup fails, either due 
to an exception,
+                       // or because no hostname can be found for the address
+                       // take IP textual representation
+                       this.hostName = this.fqdnHostName;
+                       LOG.warn("No hostname could be resolved for the IP 
address {}, using IP address as host name. "
+                                       + "Local input split assignment (such 
as for HDFS files) may be impacted.");
+               }
+               else {
+                       this.hostName = 
NetUtils.getHostnameFromFQDN(this.fqdnHostName);
                }
        }
 
@@ -126,27 +133,37 @@ public class InstanceConnectionInfo implements 
IOReadableWritable, Comparable<In
        }
 
        /**
-        * Returns the host name of the instance. If the host name could not be 
determined, the return value will be a
-        * textual representation of the instance's IP address.
+        * Returns the fully-qualified domain name the TaskManager. If the name 
could not be
+        * determined, the return value will be a textual representation of the 
TaskManager's IP address.
         * 
-        * @return the host name of the instance
+        * @return The fully-qualified domain name of the TaskManager.
         */
        public String getFQDNHostname() {
                return this.fqdnHostName;
        }
-       
+
+       /**
+        * Gets the hostname of the TaskManager. The hostname derives from the 
fully qualified
+        * domain name (FQDN, see {@link #getFQDNHostname()}):
+        * <ul>
+        *     <li>If the FQDN is the textual IP address, then the hostname is 
also the IP address</li>
+        *     <li>If the FQDN has only one segment (such as "localhost", or 
"host17"), then this is
+        *         used as the hostname.</li>
+        *     <li>If the FQDN has multiple segments (such as 
"worker3.subgroup.company.net"), then the first
+        *         segment (here "worker3") will be used as the hostname.</li>
+        * </ul>
+        *
+        * @return The hostname of the TaskManager.
+        */
        public String getHostname() {
-               if(hostName == null) {
-                       String fqdn = getFQDNHostname();
-                       if(this.fqdnHostNameIsIP) { // fqdn to hostname 
translation is pointless if FQDN is an ip address.
-                               hostName = fqdn;
-                       } else {
-                               hostName = NetUtils.getHostnameFromFQDN(fqdn);
-                       }
-               }
                return hostName;
        }
 
+       /**
+        * Gets the IP address where the TaskManager operates.
+        *
+        * @return The IP address.
+        */
        public String getInetAdress() {
                return this.inetAddress.toString();
        }
@@ -166,7 +183,6 @@ public class InstanceConnectionInfo implements 
IOReadableWritable, Comparable<In
                
                this.fqdnHostName = StringUtils.readNullableString(in);
                this.hostName = StringUtils.readNullableString(in);
-               this.fqdnHostNameIsIP = in.readBoolean();
 
                try {
                        this.inetAddress = InetAddress.getByAddress(address);
@@ -185,7 +201,6 @@ public class InstanceConnectionInfo implements 
IOReadableWritable, Comparable<In
                
                StringUtils.writeNullableString(fqdnHostName, out);
                StringUtils.writeNullableString(hostName, out);
-               out.writeBoolean(fqdnHostNameIsIP);
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/5308ac83/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
index c072e59..2769183 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.instance;
 
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -27,19 +28,43 @@ import java.net.InetAddress;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.reflect.Whitebox;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the InstanceConnectionInfo, which identifies the location and 
connection
+ * information of a TaskManager.
+ */
 public class InstanceConnectionInfoTest {
 
        @Test
        public void testEqualsHashAndCompareTo() {
                try {
+                       // we mock the addresses to save the times of the 
reverse name lookups
+                       InetAddress address1 = mock(InetAddress.class);
+                       
when(address1.getCanonicalHostName()).thenReturn("localhost");
+                       when(address1.getHostName()).thenReturn("localhost");
+                       when(address1.getHostAddress()).thenReturn("127.0.0.1");
+                       when(address1.getAddress()).thenReturn(new byte[] {127, 
0, 0, 1} );
+
+                       InetAddress address2 = mock(InetAddress.class);
+                       
when(address2.getCanonicalHostName()).thenReturn("testhost1");
+                       when(address2.getHostName()).thenReturn("testhost1");
+                       when(address2.getHostAddress()).thenReturn("0.0.0.0");
+                       when(address2.getAddress()).thenReturn(new byte[] {0, 
0, 0, 0} );
+
+                       InetAddress address3 = mock(InetAddress.class);
+                       
when(address3.getCanonicalHostName()).thenReturn("testhost2");
+                       when(address3.getHostName()).thenReturn("testhost2");
+                       
when(address3.getHostAddress()).thenReturn("192.168.0.1");
+                       when(address3.getAddress()).thenReturn(new byte[] 
{(byte) 192, (byte) 168, 0, 1} );
+
                        // one == four != two != three
-                       InstanceConnectionInfo one = new 
InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871);
-                       InstanceConnectionInfo two = new 
InstanceConnectionInfo(InetAddress.getByName("0.0.0.0"), 19871);
-                       InstanceConnectionInfo three = new 
InstanceConnectionInfo(InetAddress.getByName("192.168.0.1"), 10871);
-                       InstanceConnectionInfo four = new 
InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871);
+                       InstanceConnectionInfo one = new 
InstanceConnectionInfo(address1, 19871);
+                       InstanceConnectionInfo two = new 
InstanceConnectionInfo(address2, 19871);
+                       InstanceConnectionInfo three = new 
InstanceConnectionInfo(address3, 10871);
+                       InstanceConnectionInfo four = new 
InstanceConnectionInfo(address1, 19871);
                        
                        assertTrue(one.equals(four));
                        assertTrue(!one.equals(two));
@@ -101,10 +126,10 @@ public class InstanceConnectionInfoTest {
        public void testGetFQDNHostname() {
                try {
                        InstanceConnectionInfo info1 = new 
InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871);
-                       assertTrue(info1.getFQDNHostname() != null);
+                       assertNotNull(info1.getFQDNHostname());
                        
                        InstanceConnectionInfo info2 = new 
InstanceConnectionInfo(InetAddress.getByName("1.2.3.4"), 8888);
-                       assertTrue(info2.getFQDNHostname() != null);
+                       assertNotNull(info2.getFQDNHostname());
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -115,10 +140,15 @@ public class InstanceConnectionInfoTest {
        @Test
        public void testGetHostname0() {
                try {
-                       final InstanceConnectionInfo info1 = 
PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 
19871));
-                       Whitebox.setInternalState(info1, "fqdnHostName", 
"worker2.cluster.mycompany.com");
-                       Assert.assertEquals("worker2", info1.getHostname());
-               } catch (Exception e) {
+                       InetAddress address = mock(InetAddress.class);
+                       
when(address.getCanonicalHostName()).thenReturn("worker2.cluster.mycompany.com");
+                       
when(address.getHostName()).thenReturn("worker2.cluster.mycompany.com");
+                       when(address.getHostAddress()).thenReturn("127.0.0.1");
+
+                       final InstanceConnectionInfo info = new 
InstanceConnectionInfo(address, 19871);
+                       Assert.assertEquals("worker2", info.getHostname());
+               }
+               catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
@@ -127,14 +157,43 @@ public class InstanceConnectionInfoTest {
        @Test
        public void testGetHostname1() {
                try {
-                       final InstanceConnectionInfo info1 = 
PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 
19871));
-                       Whitebox.setInternalState(info1, "fqdnHostName", 
"worker10");
-                       Assert.assertEquals("worker10", info1.getHostname());
-               } catch (Exception e) {
+                       InetAddress address = mock(InetAddress.class);
+                       
when(address.getCanonicalHostName()).thenReturn("worker10");
+                       when(address.getHostName()).thenReturn("worker10");
+                       when(address.getHostAddress()).thenReturn("127.0.0.1");
+
+                       InstanceConnectionInfo info = new 
InstanceConnectionInfo(address, 19871);
+                       Assert.assertEquals("worker10", info.getHostname());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testGetHostname2() {
+               try {
+                       final String addressString = "192.168.254.254";
+
+                       // we mock the addresses to save the times of the 
reverse name lookups
+                       InetAddress address = mock(InetAddress.class);
+                       
when(address.getCanonicalHostName()).thenReturn("192.168.254.254");
+                       
when(address.getHostName()).thenReturn("192.168.254.254");
+                       
when(address.getHostAddress()).thenReturn("192.168.254.254");
+                       when(address.getAddress()).thenReturn(new byte[] 
{(byte) 192, (byte) 168, (byte) 254, (byte) 254} );
+
+                       InstanceConnectionInfo info = new 
InstanceConnectionInfo(address, 54152);
+
+                       assertNotNull(info.getFQDNHostname());
+                       
assertTrue(info.getFQDNHostname().equals(addressString));
+
+                       assertNotNull(info.getHostname());
+                       assertTrue(info.getHostname().equals(addressString));
+               }
+               catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
        }
-       
-       
 }

Reply via email to