This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 54ba891  AMBARI-23222. Ambari-agent fails to connect to server with 
two_way_auth enabled (aonishuk)
54ba891 is described below

commit 54ba89133f61b8ad013d96f4f8d881844229abd9
Author: Andrew Onishuk <aonis...@hortonworks.com>
AuthorDate: Tue Mar 13 20:56:07 2018 +0200

    AMBARI-23222. Ambari-agent fails to connect to server with two_way_auth 
enabled (aonishuk)
---
 .../main/python/ambari_agent/HeartbeatThread.py    |  3 +-
 .../src/main/python/ambari_agent/security.py       | 94 ++++++++--------------
 .../src/test/python/ambari_agent/TestSecurity.py   | 53 ------------
 .../main/python/ambari_stomp/adapter/websocket.py  |  8 +-
 4 files changed, 40 insertions(+), 118 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py 
b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index d559fac..2a4ae56 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -213,7 +213,8 @@ class HeartbeatThread(threading.Thread):
     Create a stomp connection
     """
     connection_url = 
'wss://{0}:{1}/agent/stomp/v1'.format(self.config.server_hostname, 
self.config.secured_url_port)
-    self.connection = security.establish_connection(connection_url)
+    connection_helper = 
security.VerifiedHTTPSConnection(self.config.server_hostname, connection_url, 
self.config)
+    self.connection = connection_helper.connect()
 
   def add_listeners(self):
     """
diff --git a/ambari-agent/src/main/python/ambari_agent/security.py 
b/ambari-agent/src/main/python/ambari_agent/security.py
index 5fb21b0..215608e 100644
--- a/ambari-agent/src/main/python/ambari_agent/security.py
+++ b/ambari-agent/src/main/python/ambari_agent/security.py
@@ -42,12 +42,12 @@ GEN_AGENT_KEY = 'openssl req -new -newkey rsa:1024 -nodes 
-keyout "%(keysdir)s'
 KEY_FILENAME = '%(hostname)s.key'
 
 
-class VerifiedHTTPSConnection(httplib.HTTPSConnection):
+class VerifiedHTTPSConnection:
   """ Connecting using ssl wrapped sockets """
-  def __init__(self, host, port=None, config=None):
-    httplib.HTTPSConnection.__init__(self, host, port=port)
+  def __init__(self, host, connection_url, config):
     self.two_way_ssl_required = False
     self.host = host
+    self.connection_url = connection_url
     self.config = config
 
   def connect(self):
@@ -57,11 +57,11 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection):
       logger.info(
         'Server require two-way SSL authentication. Use it instead of 
one-way...')
 
+    logging.info("Connecting to {0}".format(self.connection_url))
+
+
     if not self.two_way_ssl_required:
-      sock = self.create_connection()
-      self.sock = ssl.wrap_socket(sock, cert_reqs=ssl.CERT_NONE)
-      logger.info('SSL connection established. Two-way SSL authentication is '
-                  'turned off on the server.')
+      conn = AmbariStompConnection(self.connection_url)
     else:
       self.certMan = CertificateManager(self.config, self.host)
       self.certMan.initSecurity()
@@ -69,67 +69,41 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection):
       agent_crt = self.certMan.getAgentCrtName()
       server_crt = self.certMan.getSrvrCrtName()
 
-      sock = self.create_connection()
+      ssl_options = {
+        'keyfile': agent_key,
+        'certfile': agent_crt,
+        'cert_reqs': ssl.CERT_REQUIRED,
+        'ca_certs': server_crt
+      }
 
-      try:
-        self.sock = ssl.wrap_socket(sock,
-                                    keyfile=agent_key,
-                                    certfile=agent_crt,
-                                    cert_reqs=ssl.CERT_REQUIRED,
-                                    ca_certs=server_crt)
-        logger.info('SSL connection established. Two-way SSL authentication '
-                    'completed successfully.')
-      except ssl.SSLError as err:
-        logger.error('Two-way SSL authentication failed. Ensure that '
-                     'server and agent certificates were signed by the same CA 
'
-                     'and restart the agent. '
-                     '\nIn order to receive a new agent certificate, remove '
-                     'existing certificate file from keys directory. As a '
-                     'workaround you can turn off two-way SSL authentication 
in '
-                     'server configuration(ambari.properties) '
-                     '\nExiting..')
-        raise err
-
-  def create_connection(self):
-    if self.sock:
-      self.sock.close()
-    logger.info("SSL Connect being called.. connecting to the server")
-    sock = socket.create_connection((self.host, self.port), 60)
-    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-    if self._tunnel_host:
-      self.sock = sock
-      self._tunnel()
-
-    return sock
-
-def establish_connection(connection_url):
-  """
-  Create a stomp connection
-  """
-  logging.info("Connecting to {0}".format(connection_url))
-
-  conn = AmbariStompConnection(connection_url)
-  try:
-    conn.start()
-    conn.connect(wait=True)
-  except Exception as ex:
-    try:
-      conn.disconnect()
-    except:
-      logger.exception("Exception during conn.disconnect()")
+      conn = AmbariStompConnection(self.connection_url, 
ssl_options=ssl_options)
 
-    if isinstance(ex, socket_error):
-      logger.warn("Could not connect to {0}".format(connection_url))
+    self.establish_connection(conn)
+    return conn
+
+  def establish_connection(self, conn):
+    """
+    Create a stomp connection
+    """
+    try:
+      conn.start()
+      conn.connect(wait=True)
+    except Exception as ex:
+      try:
+        conn.disconnect()
+      except:
+        logger.exception("Exception during conn.disconnect()")
 
-    raise
+      if isinstance(ex, socket_error):
+        logger.warn("Could not connect to {0}. 
{1}".format(self.connection_url, str(ex)))
 
-  return conn
+      raise
 
 class AmbariStompConnection(WsConnection):
-  def __init__(self, url):
+  def __init__(self, *args, **kwargs):
     self.lock = threading.RLock()
     self.correlation_id = -1
-    WsConnection.__init__(self, url)
+    WsConnection.__init__(self, *args, **kwargs)
 
   def send(self, destination, message, content_type=None, headers=None, 
**keyword_headers):
     with self.lock:
diff --git a/ambari-agent/src/test/python/ambari_agent/TestSecurity.py 
b/ambari-agent/src/test/python/ambari_agent/TestSecurity.py
index cd2a6c9..3387895 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestSecurity.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestSecurity.py
@@ -59,59 +59,6 @@ class TestSecurity(unittest.TestCase):
   def tearDown(self):
     # enable stdout
     sys.stdout = sys.__stdout__
-
-
-  ### VerifiedHTTPSConnection ###
-
-  @patch.object(security.CertificateManager, "initSecurity")
-  @patch("socket.create_connection")
-  @patch("ssl.wrap_socket")
-  def test_VerifiedHTTPSConnection_connect(self, wrap_socket_mock,
-                                           create_connection_mock,
-                                            init_security_mock):
-    init_security_mock.return_value = None
-    self.config.set('security', 'keysdir', '/dummy-keysdir')
-    connection = security.VerifiedHTTPSConnection("example.com",
-      self.config.get('server', 'secured_url_port'), self.config)
-    connection._tunnel_host = False
-    connection.sock = None
-    connection.connect()
-    self.assertTrue(wrap_socket_mock.called)
-
-  ### VerifiedHTTPSConnection with no certificates creation
-  @patch.object(security.CertificateManager, "initSecurity")
-  @patch("socket.create_connection")
-  @patch("ssl.wrap_socket")
-  def test_Verified_HTTPSConnection_non_secure_connect(self, wrap_socket_mock,
-                                                    create_connection_mock,
-                                                    init_security_mock):
-    connection = security.VerifiedHTTPSConnection("example.com",
-      self.config.get('server', 'secured_url_port'), self.config)
-    connection._tunnel_host = False
-    connection.sock = None
-    connection.connect()
-    self.assertFalse(init_security_mock.called)
-
-  ### VerifiedHTTPSConnection with two-way SSL authentication enabled
-  @patch.object(security.CertificateManager, "initSecurity")
-  @patch("socket.create_connection")
-  @patch("ssl.wrap_socket")
-  def test_Verified_HTTPSConnection_two_way_ssl_connect(self, wrap_socket_mock,
-                                                    create_connection_mock,
-                                                    init_security_mock):
-    wrap_socket_mock.side_effect=ssl.SSLError()
-    connection = security.VerifiedHTTPSConnection("example.com",
-      self.config.get('server', 'secured_url_port'), self.config)
-    self.config.isTwoWaySSLConnection = MagicMock(return_value=True)
-
-    connection._tunnel_host = False
-    connection.sock = None
-    try:
-      connection.connect()
-    except ssl.SSLError:
-      pass
-    self.assertTrue(init_security_mock.called)
-
   ### CachedHTTPSConnection ###
 
   @patch.object(security.VerifiedHTTPSConnection, "connect")
diff --git a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py 
b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
index 6cf19db..ad61866 100644
--- a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
+++ b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py
@@ -64,11 +64,11 @@ class QueuedWebSocketClient(WebSocketClient):
     self.messages.put(StopIteration)
 
 class WsTransport(Transport):
-  def __init__(self, url):
+  def __init__(self, url, ssl_options=None):
     Transport.__init__(self, (0, 0), False, False, 0.0, 0.0, 0.0, 0.0, 0, 
False, None, None, None, None, False,
     DEFAULT_SSL_VERSION, None, None, None)
     self.current_host_and_port = (0, 0) # mocking
-    self.ws = QueuedWebSocketClient(url, protocols=['http-only', 'chat'])
+    self.ws = QueuedWebSocketClient(url, protocols=['http-only', 'chat'], 
ssl_options=ssl_options)
     self.ws.daemon = False
 
   def wait_for_connection(self, timeout=DEFAULT_CONNECTION_TIMEOUT):
@@ -124,8 +124,8 @@ class WsTransport(Transport):
       logger.exception("Exception during Transport.stop(self)")
 
 class WsConnection(BaseConnection, Protocol12):
-  def __init__(self, url):
-    self.transport = WsTransport(url)
+  def __init__(self, url, ssl_options=None):
+    self.transport = WsTransport(url, ssl_options=ssl_options)
     self.transport.set_listener('ws-listener', self)
     self.transactions = {}
     Protocol12.__init__(self, self.transport, (0, 0))

-- 
To stop receiving notification emails like this one, please contact
aonis...@apache.org.

Reply via email to