asfgit closed pull request #7203: [FLINK-10149][mesos] Don't allocate extra 
mesos port for TM unless configured to do so
URL: https://github.com/apache/flink/pull/7203
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:

diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
index 426a891e814..0c4e1f6bcba 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
@@ -136,8 +136,9 @@
        /**
         * Config parameter to configure which configuration keys will 
dynamically get a port assigned through Mesos.
         */
-       public static final ConfigOption<String> PORT_ASSIGNMENTS = 
key("mesos.resourcemanager.tasks.port-assignments")
-               .defaultValue("")
+       public static final ConfigOption<String> PORT_ASSIGNMENTS =
+               key("mesos.resourcemanager.tasks.port-assignments")
+               .noDefaultValue()
                .withDescription(Description.builder()
                        .text("Comma-separated list of configuration keys which 
represent a configurable port. " +
                                "All port keys will dynamically get a port 
assigned through Mesos.")
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 84ec2229a2a..637442c899d 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -41,6 +41,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -64,12 +65,13 @@
 public class LaunchableMesosWorker implements LaunchableTask {
 
        protected static final Logger LOG = 
LoggerFactory.getLogger(LaunchableMesosWorker.class);
+
        /**
         * The set of configuration keys to be dynamically configured with a 
port allocated from Mesos.
         */
-       static final String[] TM_PORT_KEYS = {
+       static final Set<String> TM_PORT_KEYS = Collections.unmodifiableSet(new 
HashSet<>(Arrays.asList(
                "taskmanager.rpc.port",
-               "taskmanager.data.port"};
+               "taskmanager.data.port")));
 
        private final MesosArtifactResolver resolver;
        private final ContainerSpecification containerSpec;
@@ -342,16 +344,18 @@ public String toString() {
         * @return A deterministically ordered Set of port keys to expose from 
the TM container
         */
        static Set<String> extractPortKeys(Configuration config) {
-               final LinkedHashSet<String> tmPortKeys = new 
LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS));
+               final LinkedHashSet<String> tmPortKeys = new 
LinkedHashSet<>(TM_PORT_KEYS);
 
                final String portKeys = config.getString(PORT_ASSIGNMENTS);
 
-               Arrays.stream(portKeys.split(","))
-                       .map(String::trim)
-                       .peek(key -> LOG.debug("Adding port key " + key + " to 
mesos request"))
-                       .forEach(tmPortKeys::add);
+               if (portKeys != null) {
+                       Arrays.stream(portKeys.split(","))
+                               .map(String::trim)
+                               .peek(key -> LOG.debug("Adding port key {} to 
mesos request"))
+                               .forEach(tmPortKeys::add);
+               }
 
-               return tmPortKeys;
+               return Collections.unmodifiableSet(tmPortKeys);
        }
 
        @Override
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
index 6784e427c1f..48a436cb995 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
@@ -23,11 +23,14 @@
 
 import org.junit.Test;
 
-import java.util.Iterator;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Set;
 
 import static 
org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS;
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
 
 /**
  * Test that mesos config are extracted correctly from the configuration.
@@ -36,20 +39,31 @@
 
        @Test
        public void canGetPortKeys() {
+               // Setup
+               Set<String> additionalPorts = new 
HashSet<>(Arrays.asList("someport.here", "anotherport"));
+
+               Configuration config = new Configuration();
+               config.setString(PORT_ASSIGNMENTS, String.join(",", 
additionalPorts));
+
+               // Act
+               Set<String> portKeys = 
LaunchableMesosWorker.extractPortKeys(config);
+
+               // Assert
+               Set<String> expectedPorts = new 
HashSet<>(LaunchableMesosWorker.TM_PORT_KEYS);
+               expectedPorts.addAll(additionalPorts);
+               assertThat(portKeys, is(equalTo(expectedPorts)));
+       }
+
+       @Test
+       public void canGetNoPortKeys() {
                // Setup
                Configuration config = new Configuration();
-               config.setString(PORT_ASSIGNMENTS, "someport.here,anotherport");
 
                // Act
                Set<String> portKeys = 
LaunchableMesosWorker.extractPortKeys(config);
 
                // Assert
-               assertEquals("Must get right number of port keys", 4, 
portKeys.size());
-               Iterator<String> iterator = portKeys.iterator();
-               assertEquals("port key must be correct", 
LaunchableMesosWorker.TM_PORT_KEYS[0], iterator.next());
-               assertEquals("port key must be correct", 
LaunchableMesosWorker.TM_PORT_KEYS[1], iterator.next());
-               assertEquals("port key must be correct", "someport.here", 
iterator.next());
-               assertEquals("port key must be correct", "anotherport", 
iterator.next());
+               assertThat(portKeys, 
is(equalTo(LaunchableMesosWorker.TM_PORT_KEYS)));
        }
 
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to