Github user shijinkui commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/7#discussion_r84816760
  
    --- Diff: 
flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/NettyUtil.scala
 ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.netty.example
    +
    +import java.io.{BufferedReader, InputStreamReader}
    +import java.net._
    +
    +import org.apache.commons.lang3.SystemUtils
    +import org.mortbay.util.MultiException
    +import org.slf4j.LoggerFactory
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    + * Netty Utility class for start netty service and retry tcp port
    + */
    +object NettyUtil {
    +  private lazy val logger = LoggerFactory.getLogger(getClass)
    +
    +  /** find local inet addresses */
    +  def findLocalInetAddress(): InetAddress = {
    +
    +    val address = InetAddress.getLocalHost
    +    address.isLoopbackAddress match {
    +      case true =>
    +        // Address resolves to something like 127.0.1.1, which happens on 
Debian; try to find
    +        // a better address using the local network interfaces
    +        // getNetworkInterfaces returns ifs in reverse order compared to 
ifconfig output order
    +        // on unix-like system. On windows, it returns in index order.
    +        // It's more proper to pick ip address following system output 
order.
    +        val activeNetworkIFs = 
NetworkInterface.getNetworkInterfaces.asScala.toSeq
    +        val reOrderedNetworkIFs = SystemUtils.IS_OS_WINDOWS match {
    +          case true => activeNetworkIFs
    +          case false => activeNetworkIFs.reverse
    +        }
    +
    +        reOrderedNetworkIFs.find { ni: NetworkInterface =>
    +          val addr = ni.getInetAddresses.asScala.toSeq.filterNot { addr =>
    +            addr.isLinkLocalAddress || addr.isLoopbackAddress
    +          }
    +          addr.nonEmpty
    +        } match {
    +          case Some(ni) =>
    +            val addr = ni.getInetAddresses.asScala.toSeq.filterNot { inet 
=>
    +              inet.isLinkLocalAddress || inet.isLoopbackAddress
    +            }
    +            val address = 
addr.find(_.isInstanceOf[Inet4Address]).getOrElse(addr.head).getAddress
    +            // because of Inet6Address.toHostName may add interface at the 
end if it knows about it
    +            InetAddress.getByAddress(address)
    +          case None => address
    +        }
    +      case false => address
    +    }
    +  }
    +
    +  /** start service, if port is collision, retry 128 times */
    +  def startServiceOnPort[T](
    +    startPort: Int,
    +    startService: Int => T,
    +    maxRetries: Int = 128,
    +    serviceName: String = ""): T = {
    +
    +    if (startPort != 0 && (startPort < 1024 || startPort > 65536)) {
    +      throw new Exception("startPort should be between 1024 and 65535 
(inclusive), " +
    +        "or 0 for a random free port.")
    +    }
    +
    +    val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
    +    for (offset <- 0 to maxRetries) {
    +      // Do not increment port if startPort is 0, which is treated as a 
special port
    +      val tryPort = if (startPort == 0) {
    +        startPort
    +      } else {
    +        // If the new port wraps around, do not try a privilege port
    +        ((startPort + offset - 1024) % (65536 - 1024)) + 1024
    +      }
    +
    +      try {
    +        val result = startService(tryPort)
    +        logger.info(s"Successfully started service$serviceString, 
result:$result.")
    +        return result
    +      } catch {
    +        case e: Exception if isBindCollision(e) =>
    +          if (offset >= maxRetries) {
    +            val exceptionMessage = s"${e.getMessage}: 
Service$serviceString failed after " +
    +              s"$maxRetries retries! Consider explicitly setting the 
appropriate port for the " +
    +              s"service$serviceString (for example spark.ui.port for 
SparkUI) to an available " +
    +              "port or increasing spark.port.maxRetries."
    --- End diff --
    
    @rmetzger OK, added


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to