https://github.com/python/cpython/commit/45a24f54af4a65c14cc15fc13d3258726e2fe73b
commit: 45a24f54af4a65c14cc15fc13d3258726e2fe73b
branch: main
author: Fredrik Ahlberg <fred...@z80.se>
committer: encukou <encu...@gmail.com>
date: 2025-02-27T12:51:47Z
summary:

gh-129288: Add optional l2_cid and l2_bdaddr_type in BTPROTO_L2CAP socket 
address tuple (#129293)

Add two optional, traling elements in the AF_BLUETOOTH socket address tuple:

- l2_cid, to allow e.g raw LE ATT connections
- l2_bdaddr_type. To be able to connect L2CAP sockets to Bluetooth LE devices,
  the l2_bdaddr_type must be set to BDADDR_LE_PUBLIC or BDADDR_LE_RANDOM.

files:
A Misc/NEWS.d/next/Library/2025-01-26-15-35-53.gh-issue-129288.wB3uxU.rst
M Doc/library/socket.rst
M Lib/test/test_socket.py
M Modules/socketmodule.c

diff --git a/Doc/library/socket.rst b/Doc/library/socket.rst
index b936a502ca886e..67f3074e63c3c6 100644
--- a/Doc/library/socket.rst
+++ b/Doc/library/socket.rst
@@ -137,8 +137,19 @@ created.  Socket addresses are represented as follows:
 - :const:`AF_BLUETOOTH` supports the following protocols and address
   formats:
 
-  - :const:`BTPROTO_L2CAP` accepts ``(bdaddr, psm)`` where ``bdaddr`` is
-    the Bluetooth address as a string and ``psm`` is an integer.
+  - :const:`BTPROTO_L2CAP` accepts a tuple
+    ``(bdaddr, psm[, cid[, bdaddr_type]])`` where:
+
+    - ``bdaddr`` is a string specifying the Bluetooth address.
+    - ``psm`` is an integer specifying the Protocol/Service Multiplexer.
+    - ``cid`` is an optional integer specifying the Channel Identifier.
+      If not given, defaults to zero.
+    - ``bdaddr_type`` is an optional integer specifying the address type;
+      one of :const:`BDADDR_BREDR` (default), :const:`BDADDR_LE_PUBLIC`,
+      :const:`BDADDR_LE_RANDOM`.
+
+    .. versionchanged:: next
+      Added ``cid`` and ``bdaddr_type`` fields.
 
   - :const:`BTPROTO_RFCOMM` accepts ``(bdaddr, channel)`` where ``bdaddr``
     is the Bluetooth address as a string and ``channel`` is an integer.
@@ -626,6 +637,14 @@ Constants
    This constant contains a boolean value which indicates if IPv6 is supported 
on
    this platform.
 
+.. data:: AF_BLUETOOTH
+          BTPROTO_L2CAP
+          BTPROTO_RFCOMM
+          BTPROTO_HCI
+          BTPROTO_SCO
+
+   Integer constants for use with Bluetooth addresses.
+
 .. data:: BDADDR_ANY
           BDADDR_LOCAL
 
@@ -634,6 +653,15 @@ Constants
    any address when specifying the binding socket with
    :const:`BTPROTO_RFCOMM`.
 
+.. data:: BDADDR_BREDR
+          BDADDR_LE_PUBLIC
+          BDADDR_LE_RANDOM
+
+   These constants describe the Bluetooth address type when binding or
+   connecting a :const:`BTPROTO_L2CAP` socket.
+
+    .. versionadded:: next
+
 .. data:: HCI_FILTER
           HCI_TIME_STAMP
           HCI_DATA_DIR
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index b77fa3cb21512a..fc6e8593ae30bb 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -177,6 +177,17 @@ def _have_socket_bluetooth():
     return True
 
 
+def _have_socket_bluetooth_l2cap():
+    """Check whether BTPROTO_L2CAP sockets are supported on this host."""
+    try:
+        s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, 
socket.BTPROTO_L2CAP)
+    except (AttributeError, OSError):
+        return False
+    else:
+        s.close()
+    return True
+
+
 def _have_socket_hyperv():
     """Check whether AF_HYPERV sockets are supported on this host."""
     try:
@@ -219,6 +230,8 @@ def socket_setdefaulttimeout(timeout):
 
 HAVE_SOCKET_BLUETOOTH = _have_socket_bluetooth()
 
+HAVE_SOCKET_BLUETOOTH_L2CAP = _have_socket_bluetooth_l2cap()
+
 HAVE_SOCKET_HYPERV = _have_socket_hyperv()
 
 # Size in bytes of the int type
@@ -2608,6 +2621,33 @@ def testCreateScoSocket(self):
         with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, 
socket.BTPROTO_SCO) as s:
             pass
 
+    @unittest.skipUnless(HAVE_SOCKET_BLUETOOTH_L2CAP, 'Bluetooth L2CAP sockets 
required for this test')
+    def testBindLeAttL2capSocket(self):
+        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, 
socket.BTPROTO_L2CAP) as f:
+            # ATT is the only CID allowed in userspace by the Linux kernel
+            CID_ATT = 4
+            f.bind((socket.BDADDR_ANY, 0, CID_ATT, socket.BDADDR_LE_PUBLIC))
+            addr = f.getsockname()
+            self.assertEqual(addr, (socket.BDADDR_ANY, 0, CID_ATT, 
socket.BDADDR_LE_PUBLIC))
+
+    @unittest.skipUnless(HAVE_SOCKET_BLUETOOTH_L2CAP, 'Bluetooth L2CAP sockets 
required for this test')
+    def testBindLePsmL2capSocket(self):
+        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, 
socket.BTPROTO_L2CAP) as f:
+            # First user PSM in LE L2CAP
+            psm = 0x80
+            f.bind((socket.BDADDR_ANY, psm, 0, socket.BDADDR_LE_RANDOM))
+            addr = f.getsockname()
+            self.assertEqual(addr, (socket.BDADDR_ANY, psm, 0, 
socket.BDADDR_LE_RANDOM))
+
+    @unittest.skipUnless(HAVE_SOCKET_BLUETOOTH_L2CAP, 'Bluetooth L2CAP sockets 
required for this test')
+    def testBindBrEdrL2capSocket(self):
+        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, 
socket.BTPROTO_L2CAP) as f:
+            # First user PSM in BR/EDR L2CAP
+            psm = 0x1001
+            f.bind((socket.BDADDR_ANY, psm))
+            addr = f.getsockname()
+            self.assertEqual(addr, (socket.BDADDR_ANY, psm))
+
 
 @unittest.skipUnless(HAVE_SOCKET_HYPERV,
                      'Hyper-V sockets required for this test.')
diff --git 
a/Misc/NEWS.d/next/Library/2025-01-26-15-35-53.gh-issue-129288.wB3uxU.rst 
b/Misc/NEWS.d/next/Library/2025-01-26-15-35-53.gh-issue-129288.wB3uxU.rst
new file mode 100644
index 00000000000000..e23cb735a5b046
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2025-01-26-15-35-53.gh-issue-129288.wB3uxU.rst
@@ -0,0 +1 @@
+Add optional ``l2_cid`` and ``l2_bdaddr_type`` fields to :mod:`socket` 
``BTPROTO_L2CAP`` sockaddr tuple.
diff --git a/Modules/socketmodule.c b/Modules/socketmodule.c
index 4b6d2dd1c5fc7b..916ad35623e94d 100644
--- a/Modules/socketmodule.c
+++ b/Modules/socketmodule.c
@@ -1495,9 +1495,20 @@ makesockaddr(SOCKET_T sockfd, struct sockaddr *addr, 
size_t addrlen, int proto)
             PyObject *addrobj = makebdaddr(&_BT_L2_MEMB(a, bdaddr));
             PyObject *ret = NULL;
             if (addrobj) {
-                ret = Py_BuildValue("Oi",
-                                    addrobj,
-                                    _BT_L2_MEMB(a, psm));
+                /* Retain old format for non-LE address.
+                   (cid may be set for BR/EDR, but we're discarding it for now)
+                 */
+                if (_BT_L2_MEMB(a, bdaddr_type) == BDADDR_BREDR) {
+                    ret = Py_BuildValue("Oi",
+                                        addrobj,
+                                        _BT_L2_MEMB(a, psm));
+                } else {
+                    ret = Py_BuildValue("OiiB",
+                                        addrobj,
+                                        _BT_L2_MEMB(a, psm),
+                                        _BT_L2_MEMB(a, cid),
+                                        _BT_L2_MEMB(a, bdaddr_type));
+                }
                 Py_DECREF(addrobj);
             }
             return ret;
@@ -2047,8 +2058,11 @@ getsockaddrarg(PySocketSockObject *s, PyObject *args,
             struct sockaddr_l2 *addr = &addrbuf->bt_l2;
             memset(addr, 0, sizeof(struct sockaddr_l2));
             _BT_L2_MEMB(addr, family) = AF_BLUETOOTH;
-            if (!PyArg_ParseTuple(args, "si", &straddr,
-                                  &_BT_L2_MEMB(addr, psm))) {
+            _BT_L2_MEMB(addr, bdaddr_type) = BDADDR_BREDR;
+            if (!PyArg_ParseTuple(args, "si|iB", &straddr,
+                                  &_BT_L2_MEMB(addr, psm),
+                                  &_BT_L2_MEMB(addr, cid),
+                                  &_BT_L2_MEMB(addr, bdaddr_type))) {
                 PyErr_Format(PyExc_OSError,
                              "%s(): wrong format", caller);
                 return 0;
@@ -7782,6 +7796,9 @@ socket_exec(PyObject *m)
     ADD_INT_MACRO(m, AF_BLUETOOTH);
 #ifdef BTPROTO_L2CAP
     ADD_INT_MACRO(m, BTPROTO_L2CAP);
+    ADD_INT_MACRO(m, BDADDR_BREDR);
+    ADD_INT_MACRO(m, BDADDR_LE_PUBLIC);
+    ADD_INT_MACRO(m, BDADDR_LE_RANDOM);
 #endif /* BTPROTO_L2CAP */
 #ifdef BTPROTO_HCI
     ADD_INT_MACRO(m, BTPROTO_HCI);

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: arch...@mail-archive.com

Reply via email to