Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/33#discussion_r15896126
  
    --- Diff: 
core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -489,10 +508,133 @@ private[spark] class ConnectionManager(port: Int, 
conf: SparkConf) extends Loggi
         /*handleMessage(connection, message)*/
       }
     
    -  private def handleMessage(connectionManagerId: ConnectionManagerId, 
message: Message) {
    +  private def handleClientAuthentication(
    +      waitingConn: SendingConnection,
    +      securityMsg: SecurityMessage, 
    +      connectionId : ConnectionId) {
    +    if (waitingConn.isSaslComplete()) {
    +      logDebug("Client sasl completed for id: "  + 
waitingConn.connectionId)
    +      connectionsAwaitingSasl -= waitingConn.connectionId
    +      waitingConn.getAuthenticated().synchronized {
    +        waitingConn.getAuthenticated().notifyAll();
    +      }
    +      return
    +    } else {
    +      var replyToken : Array[Byte] = null
    +      try {
    +        replyToken = 
waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken);
    +        if (waitingConn.isSaslComplete()) {
    +          logDebug("Client sasl completed after evaluate for id: " + 
waitingConn.connectionId)
    +          connectionsAwaitingSasl -= waitingConn.connectionId
    +          waitingConn.getAuthenticated().synchronized {
    +            waitingConn.getAuthenticated().notifyAll()
    +          }
    +          return
    +        }
    +        var securityMsgResp = SecurityMessage.fromResponse(replyToken, 
    +          securityMsg.getConnectionId.toString())
    +        var message = securityMsgResp.toBufferMessage
    +        if (message == null) throw new Exception("Error creating security 
message")
    +        sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), 
message)
    +      } catch  {
    +        case e: Exception => {
    +          logError("Error handling sasl client authentication", e)
    +          waitingConn.close()
    +          throw new Exception("Error evaluating sasl response: " + e)
    +        }
    +      }
    +    }
    +  }
    +
    +  private def handleServerAuthentication(
    +      connection: Connection, 
    +      securityMsg: SecurityMessage,
    +      connectionId: ConnectionId) {
    +    if (!connection.isSaslComplete()) {
    +      logDebug("saslContext not established")
    +      var replyToken : Array[Byte] = null
    +      try {
    +        connection.synchronized {
    +          if (connection.sparkSaslServer == null) {
    +            logDebug("Creating sasl Server")
    +            connection.sparkSaslServer = new 
SparkSaslServer(securityManager)
    +          }
    +        }
    +        replyToken = 
connection.sparkSaslServer.response(securityMsg.getToken)
    +        if (connection.isSaslComplete()) {
    +          logDebug("Server sasl completed: " + connection.connectionId) 
    +        } else {
    +          logDebug("Server sasl not completed: " + connection.connectionId)
    +        }
    +        if (replyToken != null) {
    +          var securityMsgResp = SecurityMessage.fromResponse(replyToken,
    +            securityMsg.getConnectionId)
    +          var message = securityMsgResp.toBufferMessage
    +          if (message == null) throw new Exception("Error creating 
security Message")
    +          sendSecurityMessage(connection.getRemoteConnectionManagerId(), 
message)
    +        } 
    +      } catch {
    +        case e: Exception => {
    +          logError("Error in server auth negotiation: " + e)
    +          // It would probably be better to send an error message telling 
other side auth failed
    +          // but for now just close
    +          connection.close()
    +        }
    +      }
    +    } else {
    +      logDebug("connection already established for this connection id: " + 
connection.connectionId) 
    +    }
    +  }
    +
    +
    +  private def handleAuthentication(conn: Connection, bufferMessage: 
BufferMessage): Boolean = {
    +    if (bufferMessage.isSecurityNeg) {
    +      logDebug("This is security neg message")
    +
    +      // parse as SecurityMessage
    +      val securityMsg = SecurityMessage.fromBufferMessage(bufferMessage)
    +      val connectionId = 
ConnectionId.createConnectionIdFromString(securityMsg.getConnectionId)
    +
    +      connectionsAwaitingSasl.get(connectionId) match {
    --- End diff --
    
    @tgravescs 
    
    Is this prone to race conditions if both connection managers simultaneously 
try to initiate connections with each other?  Here's the scenario I'm worried 
about:
    
    - `ConnectionManager`s Alice and Bob, both newly-created, simultaneously 
attempt to send messages to each other.
    - Alice adds Bob to `connectionsAwaitingSasl` and sends a message to Bob to 
begin negotiating the connection.
    - Bob's first message to Alice, negotiating his sending connection, arrives 
at Alice before Bob receives and responds to Alice's request.
    - When Bob receives Alice's first message, he incorrectly assumes that it's 
a response to his send and chooses to handle it with 
`handleClientAuthentication`, even though he should have handled it with 
`handleServerAuthentication`.  
    
    Can this problematic scenario occur?  If so, it would be safer to add a 
field to the SecurityMessage to indicate whether it's from a SASL client or 
server.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to