rusackas commented on code in PR #40673:
URL: https://github.com/apache/superset/pull/40673#discussion_r3415592098


##########
superset/extensions/ssh.py:
##########
@@ -15,26 +15,61 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import base64
+import binascii
 import logging
+import socket
 from io import StringIO
 from typing import TYPE_CHECKING
 
+import paramiko
 import sshtunnel
 from flask import Flask
 from paramiko import RSAKey
+from paramiko.pkey import UnknownKeyType
 
-from superset.commands.database.ssh_tunnel.exceptions import 
SSHTunnelDatabasePortError
+from superset.commands.database.ssh_tunnel.exceptions import (
+    SSHTunnelDatabasePortError,
+    SSHTunnelHostKeyVerificationError,
+)
 from superset.databases.utils import make_url_safe
 from superset.utils.class_utils import load_class_from_name
 
 if TYPE_CHECKING:
     from superset.databases.ssh_tunnel.models import SSHTunnel
 
+logger = logging.getLogger(__name__)
+
+
+def _parse_authorized_key(authorized_key: str) -> paramiko.PKey:
+    """
+    Parse a host key in authorized-key form (``"<type> <base64>[ comment]"``) 
into a
+    :class:`paramiko.PKey`. The optional trailing comment field and surrounding
+    whitespace are ignored.
+
+    :raises ValueError: if the value is empty or cannot be parsed as a host 
key.
+    """
+    fields = authorized_key.strip().split()
+    if len(fields) < 2:
+        raise ValueError("Host key must be in 'ssh-<type> <base64>' form")
+    key_type, key_b64 = fields[0], fields[1]
+    try:
+        key_bytes = base64.b64decode(key_b64)

Review Comment:
   Good catch — switched to `base64.b64decode(key_b64, validate=True)` so a 
malformed payload raises instead of being silently coerced.



##########
superset/migrations/versions/2026-06-03_10-00_78a40c08b4be_add_server_host_key_to_ssh_tunnels.py:
##########
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""add server_host_key to ssh_tunnels
+
+Adds a nullable ``server_host_key`` column to the ``ssh_tunnels`` table. It 
stores the
+expected SSH server host key in authorized-key form (e.g. "ssh-ed25519 
AAAA...") so
+operators can opt in to verifying the SSH server's host key before a tunnel is 
opened.
+This is a public key and is stored in plaintext (not encrypted). The column is
+nullable, so existing tunnels are unaffected.
+
+Revision ID: 78a40c08b4be
+Revises: b7c9d1e2f3a4
+Create Date: 2026-06-03 10:00:00.000000
+
+"""
+
+import sqlalchemy as sa
+
+from superset.migrations.shared.utils import add_columns, drop_columns
+
+# revision identifiers, used by Alembic.
+revision = "78a40c08b4be"
+down_revision = "b7c9d1e2f3a4"
+
+
+def upgrade() -> None:
+    add_columns(
+        "ssh_tunnels",
+        sa.Column("server_host_key", sa.Text(), nullable=True),
+    )

Review Comment:
   Added a docstring on `upgrade()`.



##########
superset/migrations/versions/2026-06-03_10-00_78a40c08b4be_add_server_host_key_to_ssh_tunnels.py:
##########
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""add server_host_key to ssh_tunnels
+
+Adds a nullable ``server_host_key`` column to the ``ssh_tunnels`` table. It 
stores the
+expected SSH server host key in authorized-key form (e.g. "ssh-ed25519 
AAAA...") so
+operators can opt in to verifying the SSH server's host key before a tunnel is 
opened.
+This is a public key and is stored in plaintext (not encrypted). The column is
+nullable, so existing tunnels are unaffected.
+
+Revision ID: 78a40c08b4be
+Revises: b7c9d1e2f3a4
+Create Date: 2026-06-03 10:00:00.000000
+
+"""
+
+import sqlalchemy as sa
+
+from superset.migrations.shared.utils import add_columns, drop_columns
+
+# revision identifiers, used by Alembic.
+revision = "78a40c08b4be"
+down_revision = "b7c9d1e2f3a4"
+
+
+def upgrade() -> None:
+    add_columns(
+        "ssh_tunnels",
+        sa.Column("server_host_key", sa.Text(), nullable=True),
+    )
+
+
+def downgrade() -> None:
+    drop_columns("ssh_tunnels", "server_host_key")

Review Comment:
   Added a docstring on `downgrade()`.



##########
tests/unit_tests/extensions/ssh_test.py:
##########
@@ -34,3 +67,196 @@ def test_ssh_tunnel_timeout_setting() -> None:
     factory.init_app(app)
     assert sshtunnel.TUNNEL_TIMEOUT == 123.0
     assert sshtunnel.SSH_TIMEOUT == 321.0
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_match(
+    mock_transport_cls: Mock, mock_create_connection: Mock
+) -> None:
+    """The server presents the same key we expect: verification passes."""
+    server_key = paramiko.RSAKey.generate(2048)
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel(_authorized_key(server_key))
+
+    transport = mock_transport_cls.return_value
+    transport.get_remote_server_key.return_value = server_key
+
+    result = manager._verify_host_key(tunnel)  # should not raise
+
+    # The TCP connect is bounded by an explicit timeout, and the resulting
+    # socket is handed to Transport.
+    mock_create_connection.assert_called_once_with(
+        ("ssh.example.com", 22), timeout=321.0
+    )
+    
mock_transport_cls.assert_called_once_with(mock_create_connection.return_value)
+    transport.start_client.assert_called_once()
+    transport.close.assert_called_once()
+    # The parsed expected key is returned so the caller can pin it on the 
tunnel.
+    assert result == server_key
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_mismatch_raises(
+    mock_transport_cls: Mock, mock_create_connection: Mock
+) -> None:
+    """The server presents a different key than expected: verification 
fails."""
+    expected_key = paramiko.RSAKey.generate(2048)
+    presented_key = paramiko.RSAKey.generate(2048)
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel(_authorized_key(expected_key))
+
+    transport = mock_transport_cls.return_value
+    transport.get_remote_server_key.return_value = presented_key
+
+    with pytest.raises(SSHTunnelHostKeyVerificationError):
+        manager._verify_host_key(tunnel)
+
+    mock_create_connection.assert_called_once()
+    transport.close.assert_called_once()
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+def test_verify_host_key_connect_failure_raises(
+    mock_create_connection: Mock,
+) -> None:

Review Comment:
   Added a docstring.



##########
tests/unit_tests/extensions/ssh_test.py:
##########
@@ -34,3 +67,196 @@ def test_ssh_tunnel_timeout_setting() -> None:
     factory.init_app(app)
     assert sshtunnel.TUNNEL_TIMEOUT == 123.0
     assert sshtunnel.SSH_TIMEOUT == 321.0
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_match(
+    mock_transport_cls: Mock, mock_create_connection: Mock
+) -> None:
+    """The server presents the same key we expect: verification passes."""
+    server_key = paramiko.RSAKey.generate(2048)
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel(_authorized_key(server_key))
+
+    transport = mock_transport_cls.return_value
+    transport.get_remote_server_key.return_value = server_key
+
+    result = manager._verify_host_key(tunnel)  # should not raise
+
+    # The TCP connect is bounded by an explicit timeout, and the resulting
+    # socket is handed to Transport.
+    mock_create_connection.assert_called_once_with(
+        ("ssh.example.com", 22), timeout=321.0
+    )
+    
mock_transport_cls.assert_called_once_with(mock_create_connection.return_value)
+    transport.start_client.assert_called_once()
+    transport.close.assert_called_once()
+    # The parsed expected key is returned so the caller can pin it on the 
tunnel.
+    assert result == server_key
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_mismatch_raises(
+    mock_transport_cls: Mock, mock_create_connection: Mock
+) -> None:
+    """The server presents a different key than expected: verification 
fails."""
+    expected_key = paramiko.RSAKey.generate(2048)
+    presented_key = paramiko.RSAKey.generate(2048)
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel(_authorized_key(expected_key))
+
+    transport = mock_transport_cls.return_value
+    transport.get_remote_server_key.return_value = presented_key
+
+    with pytest.raises(SSHTunnelHostKeyVerificationError):
+        manager._verify_host_key(tunnel)
+
+    mock_create_connection.assert_called_once()
+    transport.close.assert_called_once()
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+def test_verify_host_key_connect_failure_raises(
+    mock_create_connection: Mock,
+) -> None:
+    # A bounded TCP connect failure surfaces as a host-key verification error.
+    manager = _make_manager(strict=False)
+    server_key = paramiko.RSAKey.generate(2048)
+    tunnel = _ssh_tunnel(_authorized_key(server_key))
+
+    mock_create_connection.side_effect = OSError("connection refused")
+
+    with pytest.raises(SSHTunnelHostKeyVerificationError):
+        manager._verify_host_key(tunnel)
+
+
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_unset_non_strict_skips(mock_transport_cls: Mock) -> 
None:

Review Comment:
   Added a docstring.



##########
tests/unit_tests/extensions/ssh_test.py:
##########
@@ -34,3 +67,196 @@ def test_ssh_tunnel_timeout_setting() -> None:
     factory.init_app(app)
     assert sshtunnel.TUNNEL_TIMEOUT == 123.0
     assert sshtunnel.SSH_TIMEOUT == 321.0
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_match(
+    mock_transport_cls: Mock, mock_create_connection: Mock
+) -> None:
+    """The server presents the same key we expect: verification passes."""
+    server_key = paramiko.RSAKey.generate(2048)
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel(_authorized_key(server_key))
+
+    transport = mock_transport_cls.return_value
+    transport.get_remote_server_key.return_value = server_key
+
+    result = manager._verify_host_key(tunnel)  # should not raise
+
+    # The TCP connect is bounded by an explicit timeout, and the resulting
+    # socket is handed to Transport.
+    mock_create_connection.assert_called_once_with(
+        ("ssh.example.com", 22), timeout=321.0
+    )
+    
mock_transport_cls.assert_called_once_with(mock_create_connection.return_value)
+    transport.start_client.assert_called_once()
+    transport.close.assert_called_once()
+    # The parsed expected key is returned so the caller can pin it on the 
tunnel.
+    assert result == server_key
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_mismatch_raises(
+    mock_transport_cls: Mock, mock_create_connection: Mock
+) -> None:
+    """The server presents a different key than expected: verification 
fails."""
+    expected_key = paramiko.RSAKey.generate(2048)
+    presented_key = paramiko.RSAKey.generate(2048)
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel(_authorized_key(expected_key))
+
+    transport = mock_transport_cls.return_value
+    transport.get_remote_server_key.return_value = presented_key
+
+    with pytest.raises(SSHTunnelHostKeyVerificationError):
+        manager._verify_host_key(tunnel)
+
+    mock_create_connection.assert_called_once()
+    transport.close.assert_called_once()
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+def test_verify_host_key_connect_failure_raises(
+    mock_create_connection: Mock,
+) -> None:
+    # A bounded TCP connect failure surfaces as a host-key verification error.
+    manager = _make_manager(strict=False)
+    server_key = paramiko.RSAKey.generate(2048)
+    tunnel = _ssh_tunnel(_authorized_key(server_key))
+
+    mock_create_connection.side_effect = OSError("connection refused")
+
+    with pytest.raises(SSHTunnelHostKeyVerificationError):
+        manager._verify_host_key(tunnel)
+
+
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_unset_non_strict_skips(mock_transport_cls: Mock) -> 
None:
+    # Back-compat: no expected key + strict checking off => no verification at 
all.
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel(None)
+
+    assert manager._verify_host_key(tunnel) is None  # should not raise
+
+    mock_transport_cls.assert_not_called()
+
+
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_unset_strict_raises(mock_transport_cls: Mock) -> None:

Review Comment:
   Added a docstring.



##########
tests/unit_tests/extensions/ssh_test.py:
##########
@@ -34,3 +67,196 @@ def test_ssh_tunnel_timeout_setting() -> None:
     factory.init_app(app)
     assert sshtunnel.TUNNEL_TIMEOUT == 123.0
     assert sshtunnel.SSH_TIMEOUT == 321.0
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_match(
+    mock_transport_cls: Mock, mock_create_connection: Mock
+) -> None:
+    """The server presents the same key we expect: verification passes."""
+    server_key = paramiko.RSAKey.generate(2048)
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel(_authorized_key(server_key))
+
+    transport = mock_transport_cls.return_value
+    transport.get_remote_server_key.return_value = server_key
+
+    result = manager._verify_host_key(tunnel)  # should not raise
+
+    # The TCP connect is bounded by an explicit timeout, and the resulting
+    # socket is handed to Transport.
+    mock_create_connection.assert_called_once_with(
+        ("ssh.example.com", 22), timeout=321.0
+    )
+    
mock_transport_cls.assert_called_once_with(mock_create_connection.return_value)
+    transport.start_client.assert_called_once()
+    transport.close.assert_called_once()
+    # The parsed expected key is returned so the caller can pin it on the 
tunnel.
+    assert result == server_key
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_mismatch_raises(
+    mock_transport_cls: Mock, mock_create_connection: Mock
+) -> None:
+    """The server presents a different key than expected: verification 
fails."""
+    expected_key = paramiko.RSAKey.generate(2048)
+    presented_key = paramiko.RSAKey.generate(2048)
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel(_authorized_key(expected_key))
+
+    transport = mock_transport_cls.return_value
+    transport.get_remote_server_key.return_value = presented_key
+
+    with pytest.raises(SSHTunnelHostKeyVerificationError):
+        manager._verify_host_key(tunnel)
+
+    mock_create_connection.assert_called_once()
+    transport.close.assert_called_once()
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+def test_verify_host_key_connect_failure_raises(
+    mock_create_connection: Mock,
+) -> None:
+    # A bounded TCP connect failure surfaces as a host-key verification error.
+    manager = _make_manager(strict=False)
+    server_key = paramiko.RSAKey.generate(2048)
+    tunnel = _ssh_tunnel(_authorized_key(server_key))
+
+    mock_create_connection.side_effect = OSError("connection refused")
+
+    with pytest.raises(SSHTunnelHostKeyVerificationError):
+        manager._verify_host_key(tunnel)
+
+
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_unset_non_strict_skips(mock_transport_cls: Mock) -> 
None:
+    # Back-compat: no expected key + strict checking off => no verification at 
all.
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel(None)
+
+    assert manager._verify_host_key(tunnel) is None  # should not raise
+
+    mock_transport_cls.assert_not_called()
+
+
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_unset_strict_raises(mock_transport_cls: Mock) -> None:
+    # Fail-closed: no expected key + strict checking on => reject.
+    manager = _make_manager(strict=True)
+    tunnel = _ssh_tunnel(None)
+
+    with pytest.raises(SSHTunnelHostKeyVerificationError):
+        manager._verify_host_key(tunnel)
+
+    mock_transport_cls.assert_not_called()
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_match_ignores_comment_and_whitespace(
+    mock_transport_cls: Mock,
+    mock_create_connection: Mock,
+) -> None:
+    # The stored key may carry a trailing comment and extra whitespace.
+    server_key = paramiko.RSAKey.generate(2048)
+    manager = _make_manager(strict=False)
+    stored = f"  {_authorized_key(server_key)} user@host  "
+    tunnel = _ssh_tunnel(stored)
+
+    transport = mock_transport_cls.return_value
+    transport.get_remote_server_key.return_value = server_key
+
+    manager._verify_host_key(tunnel)  # should not raise
+
+    # Whitespace/comment stripping must not short-circuit verification: the
+    # bounded TCP connect and Transport handshake still run as in the plain
+    # match case.
+    mock_create_connection.assert_called_once_with(
+        ("ssh.example.com", 22), timeout=321.0
+    )
+    
mock_transport_cls.assert_called_once_with(mock_create_connection.return_value)
+    transport.start_client.assert_called_once()
+    transport.close.assert_called_once()
+
+
+def test_verify_host_key_invalid_expected_raises() -> None:
+    # A malformed expected key is rejected before any network connection.
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel("not-a-valid-key")
+
+    with pytest.raises(SSHTunnelHostKeyVerificationError):
+        manager._verify_host_key(tunnel)
+
+
+def test_verify_host_key_unknown_key_type_raises() -> None:
+    """An unsupported key type is wrapped in the verification error, not 
leaked."""
+    manager = _make_manager(strict=False)
+    server_key = paramiko.RSAKey.generate(2048)
+    tunnel = _ssh_tunnel(f"ssh-bogus {server_key.get_base64()}")
+
+    with pytest.raises(SSHTunnelHostKeyVerificationError):
+        manager._verify_host_key(tunnel)
+
+
+@patch("superset.extensions.ssh.sshtunnel.open_tunnel")
+@patch("superset.extensions.ssh.socket.create_connection")
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_create_tunnel_pins_verified_host_key(
+    mock_transport_cls: Mock,
+    mock_create_connection: Mock,
+    mock_open_tunnel: Mock,
+) -> None:

Review Comment:
   Added a docstring.



##########
tests/unit_tests/extensions/ssh_test.py:
##########
@@ -34,3 +67,196 @@ def test_ssh_tunnel_timeout_setting() -> None:
     factory.init_app(app)
     assert sshtunnel.TUNNEL_TIMEOUT == 123.0
     assert sshtunnel.SSH_TIMEOUT == 321.0
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_match(
+    mock_transport_cls: Mock, mock_create_connection: Mock
+) -> None:
+    """The server presents the same key we expect: verification passes."""
+    server_key = paramiko.RSAKey.generate(2048)
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel(_authorized_key(server_key))
+
+    transport = mock_transport_cls.return_value
+    transport.get_remote_server_key.return_value = server_key
+
+    result = manager._verify_host_key(tunnel)  # should not raise
+
+    # The TCP connect is bounded by an explicit timeout, and the resulting
+    # socket is handed to Transport.
+    mock_create_connection.assert_called_once_with(
+        ("ssh.example.com", 22), timeout=321.0
+    )
+    
mock_transport_cls.assert_called_once_with(mock_create_connection.return_value)
+    transport.start_client.assert_called_once()
+    transport.close.assert_called_once()
+    # The parsed expected key is returned so the caller can pin it on the 
tunnel.
+    assert result == server_key
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_mismatch_raises(
+    mock_transport_cls: Mock, mock_create_connection: Mock
+) -> None:
+    """The server presents a different key than expected: verification 
fails."""
+    expected_key = paramiko.RSAKey.generate(2048)
+    presented_key = paramiko.RSAKey.generate(2048)
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel(_authorized_key(expected_key))
+
+    transport = mock_transport_cls.return_value
+    transport.get_remote_server_key.return_value = presented_key
+
+    with pytest.raises(SSHTunnelHostKeyVerificationError):
+        manager._verify_host_key(tunnel)
+
+    mock_create_connection.assert_called_once()
+    transport.close.assert_called_once()
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+def test_verify_host_key_connect_failure_raises(
+    mock_create_connection: Mock,
+) -> None:
+    # A bounded TCP connect failure surfaces as a host-key verification error.
+    manager = _make_manager(strict=False)
+    server_key = paramiko.RSAKey.generate(2048)
+    tunnel = _ssh_tunnel(_authorized_key(server_key))
+
+    mock_create_connection.side_effect = OSError("connection refused")
+
+    with pytest.raises(SSHTunnelHostKeyVerificationError):
+        manager._verify_host_key(tunnel)
+
+
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_unset_non_strict_skips(mock_transport_cls: Mock) -> 
None:
+    # Back-compat: no expected key + strict checking off => no verification at 
all.
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel(None)
+
+    assert manager._verify_host_key(tunnel) is None  # should not raise
+
+    mock_transport_cls.assert_not_called()
+
+
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_unset_strict_raises(mock_transport_cls: Mock) -> None:
+    # Fail-closed: no expected key + strict checking on => reject.
+    manager = _make_manager(strict=True)
+    tunnel = _ssh_tunnel(None)
+
+    with pytest.raises(SSHTunnelHostKeyVerificationError):
+        manager._verify_host_key(tunnel)
+
+    mock_transport_cls.assert_not_called()
+
+
+@patch("superset.extensions.ssh.socket.create_connection")
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_verify_host_key_match_ignores_comment_and_whitespace(
+    mock_transport_cls: Mock,
+    mock_create_connection: Mock,
+) -> None:
+    # The stored key may carry a trailing comment and extra whitespace.
+    server_key = paramiko.RSAKey.generate(2048)
+    manager = _make_manager(strict=False)
+    stored = f"  {_authorized_key(server_key)} user@host  "
+    tunnel = _ssh_tunnel(stored)
+
+    transport = mock_transport_cls.return_value
+    transport.get_remote_server_key.return_value = server_key
+
+    manager._verify_host_key(tunnel)  # should not raise
+
+    # Whitespace/comment stripping must not short-circuit verification: the
+    # bounded TCP connect and Transport handshake still run as in the plain
+    # match case.
+    mock_create_connection.assert_called_once_with(
+        ("ssh.example.com", 22), timeout=321.0
+    )
+    
mock_transport_cls.assert_called_once_with(mock_create_connection.return_value)
+    transport.start_client.assert_called_once()
+    transport.close.assert_called_once()
+
+
+def test_verify_host_key_invalid_expected_raises() -> None:
+    # A malformed expected key is rejected before any network connection.
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel("not-a-valid-key")
+
+    with pytest.raises(SSHTunnelHostKeyVerificationError):
+        manager._verify_host_key(tunnel)
+
+
+def test_verify_host_key_unknown_key_type_raises() -> None:
+    """An unsupported key type is wrapped in the verification error, not 
leaked."""
+    manager = _make_manager(strict=False)
+    server_key = paramiko.RSAKey.generate(2048)
+    tunnel = _ssh_tunnel(f"ssh-bogus {server_key.get_base64()}")
+
+    with pytest.raises(SSHTunnelHostKeyVerificationError):
+        manager._verify_host_key(tunnel)
+
+
+@patch("superset.extensions.ssh.sshtunnel.open_tunnel")
+@patch("superset.extensions.ssh.socket.create_connection")
+@patch("superset.extensions.ssh.paramiko.Transport")
+def test_create_tunnel_pins_verified_host_key(
+    mock_transport_cls: Mock,
+    mock_create_connection: Mock,
+    mock_open_tunnel: Mock,
+) -> None:
+    # When an expected host key is configured and verified, it is also pinned 
on the
+    # tunnel's own connection (``ssh_host_key``) so paramiko verifies the host 
that
+    # actually carries traffic on the same transport — closing the 
probe-vs-tunnel
+    # TOCTOU gap rather than trusting only the pre-flight probe.
+    server_key = paramiko.RSAKey.generate(2048)
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel(_authorized_key(server_key))
+    tunnel.username = "user"
+    tunnel.password = None
+    tunnel.private_key = None
+
+    mock_transport_cls.return_value.get_remote_server_key.return_value = 
server_key
+
+    manager.create_tunnel(tunnel, "postgresql://u:p@db:5432/ex")
+
+    _, kwargs = mock_open_tunnel.call_args
+    assert kwargs["ssh_host_key"] == server_key
+
+
+@patch("superset.extensions.ssh.sshtunnel.open_tunnel")
+def test_create_tunnel_without_host_key_does_not_pin(mock_open_tunnel: Mock) 
-> None:
+    # No expected key configured (non-strict): nothing is pinned, preserving 
the
+    # prior behavior.
+    manager = _make_manager(strict=False)
+    tunnel = _ssh_tunnel(None)
+    tunnel.username = "user"
+    tunnel.password = None
+    tunnel.private_key = None
+
+    manager.create_tunnel(tunnel, "postgresql://u:p@db:5432/ex")
+
+    _, kwargs = mock_open_tunnel.call_args
+    assert "ssh_host_key" not in kwargs
+
+
+def test_ssh_tunnel_schema_round_trips_server_host_key() -> None:

Review Comment:
   Added a docstring.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to