[ https://issues.apache.org/jira/browse/HADOOP-18024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Masatake Iwasaki resolved HADOOP-18024.
---------------------------------------
    Fix Version/s: 3.4.0
     Hadoop Flags: Reviewed
       Resolution: Fixed

> SocketChannel is not closed when IOException happens in Server$Listener.doAccept
> --------------------------------------------------------------------------------
>
>                 Key: HADOOP-18024
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18024
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: ipc
>    Affects Versions: 3.2.2
>            Reporter: Haoze Wu
>            Assignee: Haoze Wu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.4.0
>
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>
> This is a follow-up to HADOOP-17552.
> When the symptom described in HADOOP-17552 occurs, the client may time out 
> after 2 minutes, according to the default RPC timeout configuration specified 
> in HADOOP-17552. Until this timeout fires, the client simply waits and has no 
> way of knowing that the problem has occurred.
> However, we recently found that the client does not actually need to waste 
> these 2 minutes, and the server’s availability can also be improved. If an 
> IOException is thrown at line 1402, 1403, or 1404, we can simply close the 
> problematic `SocketChannel` and continue accepting new socket connections. 
> The client can then detect the closed socket immediately, instead of waiting 
> 2 minutes.
> The old implementation:
> {code:java}
> //hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
>    public void run() {
>       while (running) {
>         // ...
>         try {
>           // ...
>           while (iter.hasNext()) {
>             // ...
>             try {
>               if (key.isValid()) {
>                 if (key.isAcceptable())
>                   doAccept(key);                              // line 1348
>               }
>             } catch (IOException e) {                         // line 1350
>             }
>             // ...
>           }
>         } catch (OutOfMemoryError e) {
>           // ...
>         } catch (Exception e) {
>           // ...
>         }
>       }
>     } {code}
> {code:java}
> //hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
>     void doAccept(SelectionKey key) throws InterruptedException, IOException, 
>         OutOfMemoryError {
>       ServerSocketChannel server = (ServerSocketChannel) key.channel();
>       SocketChannel channel;
>       while ((channel = server.accept()) != null) {           // line 1400
>         channel.configureBlocking(false);                     // line 1402
>         channel.socket().setTcpNoDelay(tcpNoDelay);           // line 1403
>         channel.socket().setKeepAlive(true);                  // line 1404
>         Reader reader = getReader();
>         Connection c = connectionManager.register(channel,
>             this.listenPort, this.isOnAuxiliaryPort);
>         // If the connectionManager can't take it, close the connection.
>         if (c == null) {
>           if (channel.isOpen()) {
>             IOUtils.cleanup(null, channel);
>           }
>           connectionManager.droppedConnections.getAndIncrement();
>           continue;
>         }
>         key.attach(c);  // so closeCurrentConnection can get the object
>         reader.addConnection(c);
>       }
>     } {code}
>  
> We propose the following implementation instead:
> {code:java}
>     void doAccept(SelectionKey key) throws InterruptedException, IOException, 
>         OutOfMemoryError {
>       ServerSocketChannel server = (ServerSocketChannel) key.channel();
>       SocketChannel channel;
>       while ((channel = server.accept()) != null) {           // line 1400
>         try {
>           channel.configureBlocking(false);                   // line 1402
>           channel.socket().setTcpNoDelay(tcpNoDelay);         // line 1403
>           channel.socket().setKeepAlive(true);                // line 1404
>         } catch (IOException e) {
>           LOG.warn(...);
>           try {
>             channel.socket().close();
>             channel.close();
>           } catch (IOException ignored) { }
>           continue;
>         }
>         // ...
>       }
>     }{code}
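The proposal above is an excerpt that depends on Hadoop's `Server` internals. Below is a minimal, self-contained sketch of the same pattern using only the JDK; it is not Hadoop code. The class name `AcceptLoopSketch`, the `Configurer` hook (standing in for the three configuration calls at lines 1402 to 1404), and the simulated failure of the second accepted connection are all hypothetical, chosen so that a configuration `IOException` can be triggered on demand.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

public class AcceptLoopSketch {
  /** Hypothetical hook standing in for configureBlocking/setTcpNoDelay/setKeepAlive. */
  public interface Configurer {
    void configure(SocketChannel ch) throws IOException;
  }

  /**
   * Drains every pending connection on a non-blocking server channel. A channel whose
   * configuration throws is closed and skipped, so one bad socket neither leaks nor
   * ends the loop. Successfully configured channels are added to {@code out}.
   * Returns the number of rejected channels.
   */
  public static int acceptAll(ServerSocketChannel server, Configurer cfg,
      List<SocketChannel> out) throws IOException {
    int rejected = 0;
    SocketChannel channel;
    while ((channel = server.accept()) != null) { // null: no more pending connections
      try {
        cfg.configure(channel);
      } catch (IOException e) {
        System.err.println("WARN: closing accepted channel: " + e.getMessage());
        try {
          channel.close(); // the peer sees EOF instead of waiting for a timeout
        } catch (IOException ignored) {
          // nothing more can be done for this channel
        }
        rejected++;
        continue; // keep draining the accept backlog
      }
      out.add(channel);
    }
    return rejected;
  }

  /** Connects three clients and makes the configuration of the second accepted channel fail. */
  public static int[] demo() throws IOException {
    List<SocketChannel> accepted = new ArrayList<>();
    List<SocketChannel> clients = new ArrayList<>();
    int[] calls = {0};
    Configurer cfg = ch -> {
      if (++calls[0] == 2) {
        throw new IOException("simulated configuration failure");
      }
      ch.configureBlocking(false);
      ch.socket().setTcpNoDelay(true);
      ch.socket().setKeepAlive(true);
    };
    int rejected = 0;
    try (Selector selector = Selector.open();
         ServerSocketChannel server = ServerSocketChannel.open()) {
      server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
      server.configureBlocking(false); // accept() now returns null when nothing is pending
      server.register(selector, SelectionKey.OP_ACCEPT);
      for (int i = 0; i < 3; i++) {
        clients.add(SocketChannel.open(server.getLocalAddress()));
      }
      long deadline = System.currentTimeMillis() + 5000;
      while (accepted.size() + rejected < 3 && System.currentTimeMillis() < deadline) {
        selector.select(100); // wait up to 100 ms for an acceptable connection
        selector.selectedKeys().clear();
        rejected += acceptAll(server, cfg, accepted);
      }
    } finally {
      for (SocketChannel ch : accepted) ch.close();
      for (SocketChannel ch : clients) ch.close();
    }
    return new int[] {accepted.size(), rejected};
  }

  public static void main(String[] args) throws IOException {
    int[] r = demo();
    System.out.println("accepted=" + r[0] + " rejected=" + r[1]);
  }
}
```

Running `main` should report two accepted connections and one rejected one: the failing channel is closed and skipped, while the loop still drains the other two connections from the backlog.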
> The advantages include:
>  # {*}In the old implementation{*}, a single exception on a single 
> `SocketChannel` causes the accept loop over the `ServerSocketChannel` to be 
> abandoned, because the only exception handler is at line 1350. {*}In the new 
> implementation{*}, a try-catch handles the exception thrown at line 1402, 
> 1403, or 1404, so the `ServerSocketChannel` can continue to accept new 
> connections without having to return to line 1348 on the next iteration of 
> the while loop in the `run` method.
>  # {*}In the old implementation{*}, the client (the other endpoint of this 
> `SocketChannel`) is not aware of the problem, because the `SocketChannel` has 
> been accepted but is never closed. {*}In the new implementation{*}, we close 
> the `SocketChannel` when the IOException occurs, so the client immediately 
> receives EOF from the socket and can decide, at its own discretion, whether 
> to retry or throw an exception.
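The client-side difference described in the second point can be observed with plain JDK sockets. This is a hedged, self-contained sketch, not Hadoop code; the class `ClientViewSketch` and its method `clientSees` are hypothetical, and the 500 ms read timeout is an arbitrary short stand-in for the 2-minute RPC timeout. If the accepted channel is left open, the client's read ends only when its own timeout fires; if the server closes the channel, the read returns -1 (EOF) right away.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class ClientViewSketch {
  /**
   * Accepts one connection, then either closes it (new behavior) or leaves it open
   * (old behavior after an IOException skipped the cleanup), and reports what a
   * blocking read on the client side observes: "EOF", "timeout", or "data".
   */
  public static String clientSees(boolean serverClosesChannel) throws IOException {
    try (ServerSocketChannel server = ServerSocketChannel.open()) {
      server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
      InetSocketAddress addr = (InetSocketAddress) server.getLocalAddress();
      try (Socket client = new Socket(addr.getAddress(), addr.getPort())) {
        SocketChannel accepted = server.accept(); // blocking accept
        try {
          if (serverClosesChannel) {
            accepted.close();
          }
          client.setSoTimeout(500); // short stand-in for the 2-minute RPC timeout
          InputStream in = client.getInputStream();
          try {
            return in.read() == -1 ? "EOF" : "data";
          } catch (SocketTimeoutException e) {
            return "timeout";
          }
        } finally {
          accepted.close(); // no effect if already closed
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println("channel left open: " + clientSees(false));
    System.out.println("channel closed:    " + clientSees(true));
  }
}
```

The client uses `java.net.Socket` rather than a `SocketChannel` because `setSoTimeout` applies to reads through the socket's input stream, which makes the "waits until its own timeout" case easy to show.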
>  
> We have confirmed that this patch works as expected on our local machine.
>  
> This code pattern has also been adopted by other projects, for example in Kafka 
> [https://github.com/apache/kafka/blob/23e9818e625976c22fe6d4297a5ab76b01f92ef6/core/src/main/scala/kafka/network/SocketServer.scala#L714-L740]:
> {code:java}
>    /**
>    * Accept a new connection
>    */
>   private def accept(key: SelectionKey): Option[SocketChannel] = {
>     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
>     val socketChannel = serverSocketChannel.accept()
>     try {
>       connectionQuotas.inc(endPoint.listenerName,
>         socketChannel.socket.getInetAddress, blockedPercentMeter)
>       configureAcceptedSocketChannel(socketChannel)
>       Some(socketChannel)
>     } catch {
>       case e: TooManyConnectionsException =>
>         info(...)
>         close(endPoint.listenerName, socketChannel)
>         None
>       case e: ConnectionThrottledException =>
>         // ...
>         None
>       case e: IOException =>
>         error(...)
>         close(endPoint.listenerName, socketChannel)
>         None
>     }
>   }
>   /**
>    * Close `channel` and decrement the connection count.
>    */
>   def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
>     if (channel != null) {
>       // ...
>       closeSocket(channel)
>     }
>   }
>   protected def closeSocket(channel: SocketChannel): Unit = {
>     CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
>     CoreUtils.swallow(channel.close(), this, Level.ERROR)
>   }
> {code}
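Kafka's `closeSocket` wraps each close call in its own `CoreUtils.swallow`, so a failure closing the socket adaptor does not prevent `channel.close()` from running. In the Hadoop proposal quoted earlier, both calls share a single try block, so an exception from `channel.socket().close()` would skip `channel.close()`. A hedged Java sketch of the close-each-step-independently idea follows; the class `CloseSketch` and its `closeEach`/`closeSocket` methods are hypothetical names, and logging is reduced to collecting messages in a list.

```java
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

public class CloseSketch {
  /** One close step that may throw, such as socket.close() or channel.close(). */
  public interface IoAction {
    void run() throws IOException;
  }

  /**
   * Runs every step even if an earlier one throws, logging instead of propagating,
   * similar in spirit to wrapping each call in Kafka's CoreUtils.swallow.
   * Returns the number of steps that failed.
   */
  public static int closeEach(List<String> log, IoAction... steps) {
    int failures = 0;
    for (IoAction step : steps) {
      try {
        step.run();
      } catch (IOException e) {
        failures++;
        log.add("ERROR while closing: " + e.getMessage());
      }
    }
    return failures;
  }

  /** Mirrors the shape of Kafka's closeSocket: close the socket adaptor, then the channel. */
  public static int closeSocket(SocketChannel channel, List<String> log) {
    if (channel == null) {
      return 0;
    }
    return closeEach(log, () -> channel.socket().close(), channel::close);
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    boolean[] secondRan = {false};
    int failures = closeEach(log,
        () -> { throw new IOException("simulated failure"); },
        () -> secondRan[0] = true);
    System.out.println(failures + " failure(s); second step ran: " + secondRan[0]);
    System.out.println(log);
  }
}
```

For a `SocketChannel`, closing the socket adaptor already closes the channel, and a second `close()` on a closed channel has no effect, so the two-step close is defensive rather than strictly required.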



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
