From d56f1e018db89438402324e0f802a10bfb70d2a2 Mon Sep 17 00:00:00 2001 From: Can Wang Date: Tue, 3 Jun 2025 18:03:58 +0800 Subject: [PATCH] tests: Bluetooth: BR: Add test suite of bonding for SMP IUT works as an SM Initiator/Responder. The peer device, SM Responder/Initiator, is a PC running bumble on it. Add shell commands to create and manage L2CAP server and channel with different security level. In the test suite, there are 5 test cases. The test matrix combines each bonding flag: | Test Case ID | Initiator | Responder | Expected Result | |--------------|-----------|-----------|-----------------| | BR_SM_Bonding_INIT_001 | Non-bondable | Non-bondable | Pass | | BR_SM_Bonding_INIT_005 | General Bonding | General Bonding | Pass | | BR_SM_Bonding_RSP_010 | Non-bondable | Non-bondable | Pass | | BR_SM_Bonding_RSP_011 | General Bonding | Non-bondable | Fail | | BR_SM_Bonding_RSP_014 | General Bonding | General Bonding | Pass | Signed-off-by: Can Wang --- .../classic/smp_bonding/CMakeLists.txt | 10 + .../bluetooth/classic/smp_bonding/README.rst | 74 +++ .../mimxrt1170_evk_mimxrt1176_cm7_B.conf | 4 + .../mimxrt1170_evk_mimxrt1176_cm7_B.overlay | 11 + tests/bluetooth/classic/smp_bonding/prj.conf | 16 + .../classic/smp_bonding/pytest/conftest.py | 57 ++ .../classic/smp_bonding/pytest/test_smp.py | 495 ++++++++++++++++++ .../classic/smp_bonding/src/smp_br_bonding.c | 274 ++++++++++ .../classic/smp_bonding/testcase.yaml | 24 + 9 files changed, 965 insertions(+) create mode 100644 tests/bluetooth/classic/smp_bonding/CMakeLists.txt create mode 100644 tests/bluetooth/classic/smp_bonding/README.rst create mode 100644 tests/bluetooth/classic/smp_bonding/boards/mimxrt1170_evk_mimxrt1176_cm7_B.conf create mode 100644 tests/bluetooth/classic/smp_bonding/boards/mimxrt1170_evk_mimxrt1176_cm7_B.overlay create mode 100644 tests/bluetooth/classic/smp_bonding/prj.conf create mode 100644 tests/bluetooth/classic/smp_bonding/pytest/conftest.py create mode 100644 tests/bluetooth/classic/smp_bonding/pytest/test_smp.py create mode 100644 tests/bluetooth/classic/smp_bonding/src/smp_br_bonding.c create mode 100644 tests/bluetooth/classic/smp_bonding/testcase.yaml diff --git a/tests/bluetooth/classic/smp_bonding/CMakeLists.txt b/tests/bluetooth/classic/smp_bonding/CMakeLists.txt new file mode 100644 index 00000000000..89de0b86386 --- /dev/null +++ b/tests/bluetooth/classic/smp_bonding/CMakeLists.txt @@ -0,0 +1,10 @@ +# SPDX-License-Identifier: Apache-2.0 + +cmake_minimum_required(VERSION 3.20.0) +set(NO_QEMU_SERIAL_BT_SERVER 1) + +find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE}) +project(bluetooth) + +FILE(GLOB app_sources src/*.c) +target_sources(app PRIVATE ${app_sources}) diff --git a/tests/bluetooth/classic/smp_bonding/README.rst b/tests/bluetooth/classic/smp_bonding/README.rst new file mode 100644 index 00000000000..5d73372b155 --- /dev/null +++ b/tests/bluetooth/classic/smp_bonding/README.rst @@ -0,0 +1,74 @@ +.. _bluetooth_classic_smp_bonding_tests: + +Bluetooth Classic SMP Bonding Tests +################################## + +Overview +******** + +This test suite uses ``bumble`` for testing Bluetooth Classic communication between a host +PC (running :ref:`Twister `) and a device under test (DUT) running Zephyr. + +Prerequisites +************* + +The test suite has the following prerequisites: + +* The ``bumble`` library installed on the host PC. +The Bluetooth Classic controller on PC side is required. Refer to getting started of `bumble`_ +for details. + +The HCI transport for ``bumble`` can be configured as follows: + +* A specific configuration context can be provided along with the ``usb_hci`` fixture separated by + a ``:`` (i.e. specify fixture ``usb_hci:usb:0`` to use the ``usb:0`` as hci transport for + ``bumble``). +* The configuration context can be overridden using the `hci transport`_ can be provided using the + ``--hci-transport`` test suite argument (i.e. run ``twister`` with the + ``--pytest-args=--hci-transport=usb:0`` argument to use the ``usb:0`` as hci transport for + ``bumble``). + +Building and Running +******************** + +Running on mimxrt1170_evk@B/mimxrt1176/cm7 +========================================== + +Running the test suite on :ref:`mimxrt1170_evk` relies on configuration of ``bumble``. + +On the host PC, a HCI transport needs to be required. Refer to `bumble platforms`_ page of +``bumble`` for details. + +For example, on windows, a PTS dongle is used. After `WinUSB driver`_ has been installed, +the HCI transport would be USB transport interface ``usb:``. + +If the HCI transport is ``usb:0`` and debug console port is ``COM4``, the test suite can be +launched using Twister: + +.. code-block:: shell + + west twister -v -p mimxrt1170_evk@B/mimxrt1176/cm7 --device-testing --device-serial COM4 -T tests/bluetooth/classic/smp_bonding -O smp_bonding --force-platform --west-flash --west-runner=jlink -X usb_hci:usb:0 + +Running on Hardware +=================== + +Running the test suite on hardware requires a HCI transport connected to the host PC. + +The test suite can be launched using Twister. Below is an example for running on the +:zephyr:board:`mimxrt1170_evk@B/mimxrt1176/cm7`: + +.. code-block:: shell + + west twister -v -p mimxrt1170_evk@B/mimxrt1176/cm7 --device-testing --device-serial COM4 -T tests/bluetooth/classic/smp_bonding -O smp_bonding --force-platform --west-flash --west-runner=jlink -X usb_hci:usb:0 + +.. _bumble: + https://google.github.io/bumble/getting_started.html + +.. _hci transport: + https://google.github.io/bumble/transports/index.html + +.. _bumble platforms: + https://google.github.io/bumble/platforms/index.html + +.. _WinUSB driver: + https://google.github.io/bumble/platforms/windows.html diff --git a/tests/bluetooth/classic/smp_bonding/boards/mimxrt1170_evk_mimxrt1176_cm7_B.conf b/tests/bluetooth/classic/smp_bonding/boards/mimxrt1170_evk_mimxrt1176_cm7_B.conf new file mode 100644 index 00000000000..fcba61c9d6f --- /dev/null +++ b/tests/bluetooth/classic/smp_bonding/boards/mimxrt1170_evk_mimxrt1176_cm7_B.conf @@ -0,0 +1,4 @@ +#select NXP NW612 Chipset +CONFIG_BT_NXP_NW612=y + +CONFIG_ENTROPY_GENERATOR=y diff --git a/tests/bluetooth/classic/smp_bonding/boards/mimxrt1170_evk_mimxrt1176_cm7_B.overlay b/tests/bluetooth/classic/smp_bonding/boards/mimxrt1170_evk_mimxrt1176_cm7_B.overlay new file mode 100644 index 00000000000..5e453897a15 --- /dev/null +++ b/tests/bluetooth/classic/smp_bonding/boards/mimxrt1170_evk_mimxrt1176_cm7_B.overlay @@ -0,0 +1,11 @@ +/* + * Copyright 2025 NXP + * + * SPDX-License-Identifier: Apache-2.0 + */ + +/ { + chosen { + zephyr,sram = &dtcm; + }; +}; diff --git a/tests/bluetooth/classic/smp_bonding/prj.conf b/tests/bluetooth/classic/smp_bonding/prj.conf new file mode 100644 index 00000000000..6575df71027 --- /dev/null +++ b/tests/bluetooth/classic/smp_bonding/prj.conf @@ -0,0 +1,16 @@ +CONFIG_BT=y +CONFIG_BT_CLASSIC=y +CONFIG_BT_SMP=y +CONFIG_BT_SHELL=y +CONFIG_LOG=y +CONFIG_ZTEST=y +CONFIG_BT_SETTINGS=y +CONFIG_FLASH=y +CONFIG_FLASH_MAP=y +CONFIG_NVS=y +CONFIG_SETTINGS=y + +CONFIG_BT_DEVICE_NAME="smp_bonding" + +CONFIG_BT_CREATE_CONN_TIMEOUT=30 +CONFIG_BT_PAGE_TIMEOUT=0xFFFF diff --git a/tests/bluetooth/classic/smp_bonding/pytest/conftest.py b/tests/bluetooth/classic/smp_bonding/pytest/conftest.py new file mode 100644 index 00000000000..accfcf20a51 --- /dev/null +++ b/tests/bluetooth/classic/smp_bonding/pytest/conftest.py @@ -0,0 +1,57 @@ +# Copyright 2025 NXP +# +# SPDX-License-Identifier: Apache-2.0 + +import logging +import re + +import pytest +from twister_harness import DeviceAdapter, Shell + +logger = logging.getLogger(__name__) + + +def pytest_addoption(parser) -> None: + """Add local parser options to pytest.""" + parser.addoption('--hci-transport', default=None, help='Configuration HCI transport for bumble') + + +@pytest.fixture(name='initialize', scope='session') +def fixture_initialize(request, shell: Shell, dut: DeviceAdapter): + """Session initializtion""" + # Get HCI transport for bumble + hci = request.config.getoption('--hci-transport') + + if hci is None: + for fixture in dut.device_config.fixtures: + if fixture.startswith('usb_hci:'): + hci = fixture.split(sep=':', maxsplit=1)[1] + break + + assert hci is not None + + shell.exec_command("bt init") + lines = dut.readlines_until("Settings Loaded") + regex = r'Identity: *(?P([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) *\((.*?)\))' + bd_addr = None + for line in lines: + logger.info(f"Shell log {line}") + m = re.search(regex, line) + if m: + bd_addr = m.group('bd_addr') + + if bd_addr is None: + logger.error('Fail to get IUT BD address') + raise AssertionError + + shell.exec_command("br pscan on") + shell.exec_command("br iscan on") + logger.info('initialized') + return hci, bd_addr + + +@pytest.fixture +def smp_initiator_dut(initialize): + logger.info('Start running testcase') + yield initialize + logger.info('Done') diff --git a/tests/bluetooth/classic/smp_bonding/pytest/test_smp.py b/tests/bluetooth/classic/smp_bonding/pytest/test_smp.py new file mode 100644 index 00000000000..9a0985b5520 --- /dev/null +++ b/tests/bluetooth/classic/smp_bonding/pytest/test_smp.py @@ -0,0 +1,495 @@ +# Copyright 2025 NXP +# +# SPDX-License-Identifier: Apache-2.0 + + +import asyncio +import logging +import re +import sys + +from bumble.core import ( + BT_BR_EDR_TRANSPORT, +) +from bumble.device import Device +from bumble.hci import Address, HCI_Write_Page_Timeout_Command +from bumble.l2cap import ClassicChannelSpec +from bumble.pairing import PairingConfig, PairingDelegate +from bumble.smp import ( + SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, +) +from bumble.snoop import BtSnooper +from bumble.transport import open_transport_or_link +from twister_harness import DeviceAdapter, Shell + +logger = logging.getLogger(__name__) + +l2cap_server_psm = 0x1001 + + +# power on dongle +async def device_power_on(device) -> None: + while True: + try: + await device.power_on() + break + except Exception: + continue + + +def check_shell_response(lines: list[str], regex: str) -> bool: + found = False + try: + for line in lines: + if re.search(regex, line): + found = True + break + except Exception as e: + logger.error(f'{e}!', exc_info=True) + raise e + + return found + + +def search_messages(result, messages, read_lines): + for message in messages: + for line in read_lines: + if re.search(message, line): + result[message] = True + if False not in result.values(): + return True + break + + return False + + +async def wait_for_shell_response(dut, regex: list[str] | str, max_wait_sec=10): + found = False + lines = [] + + logger.debug('wait_for_shell_response') + + messages = [regex] if isinstance(regex, str) else regex + result = dict.fromkeys(messages, False) + + try: + for _ in range(0, max_wait_sec): + read_lines = dut.readlines(print_output=True) + lines += read_lines + for line in read_lines: + logger.debug(f'DUT response: {str(line)}') + + found = search_messages(result, messages, read_lines) + if found is True: + break + await asyncio.sleep(1) + except Exception as e: + logger.error(f'{e}!', exc_info=True) + raise e + + for key in result: + logger.debug(f'Expected DUT response: "{key}", Matched: {result[key]}') + + return found, lines + + +async def send_cmd_to_iut(shell, dut, cmd, parse=None): + shell.exec_command(cmd) + if parse is not None: + found, lines = await wait_for_shell_response(dut, parse) + else: + found = True + lines = [] + assert found is True + + return lines + + +async def bumble_acl_connect(shell, dut, device, target_address): + connection = None + try: + connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT) + logger.info(f'=== Connected to {connection.peer_address}!') + except Exception as e: + logger.error(f'Fail to connect to {target_address}!') + raise e + return connection + + +async def sm_bonding_init_001(hci_port, shell, dut, address, snoop_file) -> None: + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + + device.classic_enabled = True + device.le_enabled = False + device.classic_sc_enabled = False + + delegate = PairingDelegate(SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=False, bonding=False, delegate=delegate + ) + + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + bumble_address = device.public_address.to_string().split('/P')[0] + + # Initiator as l2cap client: non-bondable + await send_cmd_to_iut(shell, dut, "bt clear all", None) + await send_cmd_to_iut(shell, dut, "br clear all", None) + await send_cmd_to_iut(shell, dut, "bt auth none", None) + await send_cmd_to_iut(shell, dut, "bt bondable off", None) + + # Responder as l2cap server: non-bondable + server = device.l2cap_channel_manager.create_classic_server( + spec=ClassicChannelSpec(psm=l2cap_server_psm) + ) + assert server is not None + + # Initiator ceate connection + await send_cmd_to_iut( + shell, dut, f"br connect {bumble_address}", f"Connected: {bumble_address}" + ) + + # Initiator l2cap connect + await send_cmd_to_iut( + shell, dut, f"l2cap_br connect {format(l2cap_server_psm, 'x')} sec 2", None + ) + + # Initiator check pairing success + found, lines = await wait_for_shell_response( + dut, + [f"Paired with {bumble_address}", f"Security changed: {bumble_address} level 2"], + ) + assert found is True + + # Initiator check l2cap connection + found = check_shell_response(lines, r"Channel \w+ connected") + if not found: + found, _ = await wait_for_shell_response(dut, [r"Channel \w+ connected"]) + assert found is True + + # Initiator disconnect + await send_cmd_to_iut(shell, dut, "bt disconnect", f"Disconnected: {bumble_address}") + + # Initiator check bond removal + lines = shell.exec_command("br bonds") + assert check_shell_response(lines, r"Total 0") + + +async def sm_bonding_init_005(hci_port, shell, dut, address, snoop_file) -> None: + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + + device.classic_enabled = True + device.le_enabled = False + device.classic_sc_enabled = False + + delegate = PairingDelegate(SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=False, bonding=True, delegate=delegate + ) + + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + bumble_address = device.public_address.to_string().split('/P')[0] + + # Initiator as l2cap client: General Bonding + await send_cmd_to_iut(shell, dut, "bt clear all", None) + await send_cmd_to_iut(shell, dut, "br clear all", None) + await send_cmd_to_iut(shell, dut, "bt auth none", None) + await send_cmd_to_iut(shell, dut, "bt bondable on", None) + + # Responder as l2cap server: General Bonding + server = device.l2cap_channel_manager.create_classic_server( + spec=ClassicChannelSpec(psm=l2cap_server_psm) + ) + assert server is not None + + # Initiator as l2cap client: General Bonding + await send_cmd_to_iut( + shell, dut, f"br connect {bumble_address}", f"Connected: {bumble_address}" + ) + + # Initiator l2cap connect + await send_cmd_to_iut( + shell, dut, f"l2cap_br connect {format(l2cap_server_psm, 'x')} sec 2", None + ) + + # Initiator check bonding success + found, lines = await wait_for_shell_response( + dut, + [f"Bonded with {bumble_address}", f"Security changed: {bumble_address} level 2"], + ) + assert found is True + + # Initiator check l2cap connection + found = check_shell_response(lines, r"Channel \w+ connected") + if not found: + found, _ = await wait_for_shell_response(dut, [r"Channel \w+ connected"]) + assert found is True + + # Initiator disconnect + await send_cmd_to_iut(shell, dut, "bt disconnect", f"Disconnected: {bumble_address}") + + # Initiator check bonded + lines = shell.exec_command("br bonds") + assert check_shell_response(lines, f"Remote Identity: {bumble_address}") + + +async def sm_bonding_rsp_010(hci_port, shell, dut, address, snoop_file) -> None: + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + + device.classic_enabled = True + device.le_enabled = False + device.classic_sc_enabled = False + + delegate = PairingDelegate(SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=False, bonding=False, delegate=delegate + ) + + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + bumble_address = device.public_address.to_string().split('/P')[0] + + iut_address = address.split(" ")[0] + + # Responder as l2cap server: non-bondable + await send_cmd_to_iut(shell, dut, "bt clear all", None) + await send_cmd_to_iut(shell, dut, "br clear all", None) + await send_cmd_to_iut(shell, dut, "bt auth none", None) + await send_cmd_to_iut(shell, dut, "bt bondable off", None) + await send_cmd_to_iut( + shell, dut, f"l2cap_br register {format(l2cap_server_psm, 'x')}", None + ) + await send_cmd_to_iut( + shell, dut, f"l2cap_br security {format(l2cap_server_psm, 'x')} 2", None + ) + + # Initiator as l2cap client: non-bondable + # Initiator create connection + connection = await device.connect(iut_address, transport=BT_BR_EDR_TRANSPORT) + # Responder check connection + found, _ = await wait_for_shell_response(dut, [f"Connected: {bumble_address}"]) + assert found is True + + # Initiator init authentication and encryption + await device.authenticate(connection) + await device.encrypt(connection) + + # Responder check pairing success + found, _ = await wait_for_shell_response( + dut, + [f"Paired with {bumble_address}", f"Security changed: {bumble_address} level 2"], + ) + assert found is True + + # Initiator create l2cap connection + await device.l2cap_channel_manager.create_classic_channel( + connection=connection, spec=ClassicChannelSpec(psm=l2cap_server_psm) + ) + + # Responder check l2cap channel connection + found, _ = await wait_for_shell_response(dut, [r"Channel \w+ connected"]) + assert found is True + + # Responder disconnect + await send_cmd_to_iut(shell, dut, "bt disconnect", f"Disconnected: {bumble_address}") + + # Responder check no bonds + lines = shell.exec_command("br bonds") + assert check_shell_response(lines, r"Total 0") + + +async def sm_bonding_rsp_011(hci_port, shell, dut, address, snoop_file) -> None: + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + + device.classic_enabled = True + device.le_enabled = False + device.classic_sc_enabled = False + + delegate = PairingDelegate(SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=False, bonding=True, delegate=delegate + ) + + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + bumble_address = device.public_address.to_string().split('/P')[0] + + iut_address = address.split(" ")[0] + + # Responder as l2cap server: non-bondable + await send_cmd_to_iut(shell, dut, "bt clear all", None) + await send_cmd_to_iut(shell, dut, "br clear all", None) + await send_cmd_to_iut(shell, dut, "bt auth none", None) + await send_cmd_to_iut(shell, dut, "bt bondable off", None) + await send_cmd_to_iut( + shell, dut, f"l2cap_br register {format(l2cap_server_psm, 'x')}", None + ) + await send_cmd_to_iut( + shell, dut, f"l2cap_br security {format(l2cap_server_psm, 'x')} 2", None + ) + + # Initiator as l2cap client: General Bonding + # Initiator create connection + connection = await device.connect(iut_address, transport=BT_BR_EDR_TRANSPORT) + found, _ = await wait_for_shell_response(dut, [f"Connected: {bumble_address}"]) + assert found is True + + # Initiator init authentication and encryption + try: + await device.authenticate(connection) + await device.encrypt(connection) + except Exception as e: + logger.error(f"Authentication or encryption failed: {e}") + + # Responder check pairing failure + found, _ = await wait_for_shell_response( + dut, + [ + f"Pairing failed with {bumble_address} reason: Authentication failure", + rf"Disconnected: {bumble_address} \(reason 0x16\)", + ], + ) + assert found is True + + +async def sm_bonding_rsp_014(hci_port, shell, dut, address, snoop_file) -> None: + async with await open_transport_or_link(hci_port) as hci_transport: + device = Device.with_hci( + 'Bumble', + Address('F0:F1:F2:F3:F4:F5'), + hci_transport.source, + hci_transport.sink, + ) + + device.classic_enabled = True + device.le_enabled = False + device.classic_sc_enabled = False + + delegate = PairingDelegate(SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY) + device.pairing_config_factory = lambda connection: PairingConfig( + sc=True, mitm=False, bonding=True, delegate=delegate + ) + + device.host.snooper = BtSnooper(snoop_file) + await device_power_on(device) + await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF)) + bumble_address = device.public_address.to_string().split('/P')[0] + + iut_address = address.split(" ")[0] + + # Responder as l2cap server: General Bonding + await send_cmd_to_iut(shell, dut, "bt clear all", None) + await send_cmd_to_iut(shell, dut, "br clear all", None) + await send_cmd_to_iut(shell, dut, "bt auth none", None) + await send_cmd_to_iut(shell, dut, "bt bondable on", None) + await send_cmd_to_iut( + shell, dut, f"l2cap_br register {format(l2cap_server_psm, 'x')}", None + ) + await send_cmd_to_iut( + shell, dut, f"l2cap_br security {format(l2cap_server_psm, 'x')} 2", None + ) + + # Initiator as l2cap client: General Bonding + # Initiator create connection + connection = await device.connect(iut_address, transport=BT_BR_EDR_TRANSPORT) + # Responder check connection + found, _ = await wait_for_shell_response(dut, [f"Connected: {bumble_address}"]) + assert found is True + + # Initiator init authentication and encryption + await device.authenticate(connection) + await device.encrypt(connection) + + # Responder check bonding success + found, _ = await wait_for_shell_response( + dut, + [f"Bonded with {bumble_address}", f"Security changed: {bumble_address} level 2"], + ) + assert found is True + + # Initiator create l2cap connection + await device.l2cap_channel_manager.create_classic_channel( + connection=connection, spec=ClassicChannelSpec(psm=l2cap_server_psm) + ) + + # Responder check l2cap channel connection + found, _ = await wait_for_shell_response(dut, [r"Channel \w+ connected"]) + assert found is True + + # Responder disconnect + await send_cmd_to_iut(shell, dut, "bt disconnect", f"Disconnected: {bumble_address}") + + # Responder check bonds + lines = shell.exec_command("br bonds") + assert check_shell_response(lines, f"Remote Identity: {bumble_address}") + + +class TestSmpServer: + def test_sm_bonding_init_001(self, shell: Shell, dut: DeviceAdapter, smp_initiator_dut): + """Verify pairing can be established when both devices are in Non-bondable mode with + local device as L2CAP client.""" + logger.info(f'test_sm_bonding_init_001 {smp_initiator_dut}') + hci, iut_address = smp_initiator_dut + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + asyncio.run(sm_bonding_init_001(hci, shell, dut, iut_address, snoop_file)) + + def test_sm_bonding_init_005(self, shell: Shell, dut: DeviceAdapter, smp_initiator_dut): + """Verify pairing and bonding can be established when both devices are + in General Bonding mode.""" + logger.info(f'test_sm_bonding_init_005 {smp_initiator_dut}') + hci, iut_address = smp_initiator_dut + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + asyncio.run(sm_bonding_init_005(hci, shell, dut, iut_address, snoop_file)) + + def test_sm_bonding_rsp_010(self, shell: Shell, dut: DeviceAdapter, smp_initiator_dut): + """To verify that pairing succeeds when both local and peer devices are configured + as Non-bondable and the local device acts as an L2CAP server.""" + logger.info(f'test_sm_bonding_rsp_010 {smp_initiator_dut}') + hci, iut_address = smp_initiator_dut + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + asyncio.run(sm_bonding_rsp_010(hci, shell, dut, iut_address, snoop_file)) + + def test_sm_bonding_rsp_011(self, shell: Shell, dut: DeviceAdapter, smp_initiator_dut): + """To verify that pairing fails when local device is Non-bondable and peer device + is General Bonding, with the local device acting as an L2CAP server.""" + logger.info(f'test_sm_bonding_rsp_011 {smp_initiator_dut}') + hci, iut_address = smp_initiator_dut + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + asyncio.run(sm_bonding_rsp_011(hci, shell, dut, iut_address, snoop_file)) + + def test_sm_bonding_rsp_014(self, shell: Shell, dut: DeviceAdapter, smp_initiator_dut): + """To verify that pairing succeeds when both local and peer devices are configured + as General Bonding, with the local device acting as an L2CAP server.""" + logger.info(f'test_sm_bonding_rsp_014 {smp_initiator_dut}') + hci, iut_address = smp_initiator_dut + with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file: + asyncio.run(sm_bonding_rsp_014(hci, shell, dut, iut_address, snoop_file)) diff --git a/tests/bluetooth/classic/smp_bonding/src/smp_br_bonding.c b/tests/bluetooth/classic/smp_bonding/src/smp_br_bonding.c new file mode 100644 index 00000000000..c1b0a0f2ed9 --- /dev/null +++ b/tests/bluetooth/classic/smp_bonding/src/smp_br_bonding.c @@ -0,0 +1,274 @@ +/* smp_br_bonding.c - Bluetooth classic SMP bonding smoke test */ + +/* + * Copyright 2025 NXP + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include + +#include + +#include "host/shell/bt.h" +#include "common/bt_shell_private.h" + +#define DATA_BREDR_MTU 48 + +NET_BUF_POOL_FIXED_DEFINE(data_rx_pool, 1, DATA_BREDR_MTU, 8, NULL); + +struct l2cap_br_chan { + bool active; + struct bt_l2cap_br_chan chan; +}; + +#define APPL_L2CAP_CONNECTION_MAX_COUNT 1 +static struct l2cap_br_chan l2cap_chans[APPL_L2CAP_CONNECTION_MAX_COUNT]; +static struct bt_l2cap_server l2cap_servers[APPL_L2CAP_CONNECTION_MAX_COUNT]; + +static int l2cap_recv(struct bt_l2cap_chan *chan, struct net_buf *buf) +{ + struct l2cap_br_chan *br_chan = CONTAINER_OF( + CONTAINER_OF(chan, struct bt_l2cap_br_chan, chan), struct l2cap_br_chan, chan); + + bt_shell_print("Incoming data channel %d len %u", ARRAY_INDEX(l2cap_chans, br_chan), + buf->len); + + if (buf->len) { + bt_shell_hexdump(buf->data, buf->len); + } + + return 0; +} + +static void l2cap_connected(struct bt_l2cap_chan *chan) +{ + struct l2cap_br_chan *br_chan = CONTAINER_OF( + CONTAINER_OF(chan, struct bt_l2cap_br_chan, chan), struct l2cap_br_chan, chan); + + bt_shell_print("Channel %d connected", ARRAY_INDEX(l2cap_chans, br_chan)); +} + +static void l2cap_disconnected(struct bt_l2cap_chan *chan) +{ + struct l2cap_br_chan *br_chan = CONTAINER_OF( + CONTAINER_OF(chan, struct bt_l2cap_br_chan, chan), struct l2cap_br_chan, chan); + + br_chan->active = false; + bt_shell_print("Channel %d disconnected", ARRAY_INDEX(l2cap_chans, br_chan)); +} + +static struct net_buf *l2cap_alloc_buf(struct bt_l2cap_chan *chan) +{ + bt_shell_print("Channel %p requires buffer", chan); + + return net_buf_alloc(&data_rx_pool, K_NO_WAIT); +} + +static const struct bt_l2cap_chan_ops l2cap_ops = { + .alloc_buf = l2cap_alloc_buf, + .recv = l2cap_recv, + .connected = l2cap_connected, + .disconnected = l2cap_disconnected, +}; + +static struct l2cap_br_chan *l2cap_alloc_chan(void) +{ + ARRAY_FOR_EACH(l2cap_chans, index) { + if (l2cap_chans[index].active == false) { + l2cap_chans[index].active = true; + l2cap_chans[index].chan.chan.ops = &l2cap_ops; + l2cap_chans[index].chan.rx.mtu = DATA_BREDR_MTU; + return &l2cap_chans[index]; + } + } + return NULL; +} + +static int l2cap_accept(struct bt_conn *conn, struct bt_l2cap_server *server, + struct bt_l2cap_chan **chan) +{ + struct l2cap_br_chan *br_chan; + + bt_shell_print("Incoming BR/EDR conn %p", conn); + + br_chan = l2cap_alloc_chan(); + if (br_chan == NULL) { + bt_shell_error("No channels available"); + return -ENOMEM; + } + + *chan = &br_chan->chan.chan; + + return 0; +} + +static struct bt_l2cap_server *l2cap_alloc_server(uint16_t psm) +{ + ARRAY_FOR_EACH(l2cap_servers, index) { + if (l2cap_servers[index].psm == 0) { + l2cap_servers[index].psm = psm; + l2cap_servers[index].accept = l2cap_accept; + return &l2cap_servers[index]; + } + } + return NULL; +} + +static int cmd_l2cap_register(const struct shell *sh, size_t argc, char *argv[]) +{ + uint16_t psm = strtoul(argv[1], NULL, 16); + struct bt_l2cap_server *br_server; + + ARRAY_FOR_EACH(l2cap_servers, index) { + if (l2cap_servers[index].psm == psm) { + shell_print(sh, "Already registered"); + return -ENOEXEC; + } + } + + br_server = l2cap_alloc_server(psm); + if (br_server == NULL) { + shell_error(sh, "No servers available"); + return -ENOMEM; + } + + if ((argc > 3) && (strcmp(argv[2], "sec") == 0)) { + br_server->sec_level = strtoul(argv[3], NULL, 16); + } else { + br_server->sec_level = BT_SECURITY_L1; + } + + if (bt_l2cap_br_server_register(br_server) < 0) { + br_server->psm = 0U; + shell_error(sh, "Unable to register psm"); + return -ENOEXEC; + } + + shell_print(sh, "L2CAP psm %u registered", br_server->psm); + + return 0; +} + +static int cmd_l2cap_connect(const struct shell *sh, size_t argc, char *argv[]) +{ + int err; + struct bt_conn_info info; + struct l2cap_br_chan *br_chan; + uint16_t psm; + + if (default_conn == NULL) { + shell_error(sh, "Not connected"); + return -ENOEXEC; + } + + br_chan = l2cap_alloc_chan(); + if (br_chan == NULL) { + shell_error(sh, "No channels available"); + br_chan->active = false; + return -ENOMEM; + } + + err = bt_conn_get_info(default_conn, &info); + if ((err < 0) || (info.type != BT_CONN_TYPE_BR)) { + shell_error(sh, "Invalid conn type"); + br_chan->active = false; + return -ENOEXEC; + } + + psm = strtoul(argv[1], NULL, 16); + + if ((argc > 3) && (strcmp(argv[2], "sec") == 0)) { + br_chan->chan.required_sec_level = strtoul(argv[3], NULL, 16); + } else { + br_chan->chan.required_sec_level = BT_SECURITY_L1; + } + + err = bt_l2cap_chan_connect(default_conn, &br_chan->chan.chan, psm); + if (err < 0) { + br_chan->active = false; + shell_error(sh, "Unable to connect to psm %u (err %d)", psm, err); + } else { + shell_print(sh, "L2CAP connection pending"); + } + + return err; +} + +static int cmd_l2cap_disconnect(const struct shell *sh, size_t argc, char *argv[]) +{ + int err; + uint8_t id; + + id = strtoul(argv[1], NULL, 16); + if ((id >= ARRAY_SIZE(l2cap_chans)) || (!l2cap_chans[id].active)) { + shell_print(sh, "channel %d not connected", id); + return -ENOEXEC; + } + + err = bt_l2cap_chan_disconnect(&l2cap_chans[id].chan.chan); + if (err) { + shell_error(sh, "Unable to disconnect: %u", -err); + return err; + } + + return 0; +} + +static int cmd_set_security(const struct shell *sh, size_t argc, char *argv[]) +{ + uint16_t psm = strtoul(argv[1], NULL, 16); + uint8_t sec = strtoul(argv[2], NULL, 16); + + if (sec > BT_SECURITY_L4) { + shell_error(sh, "Invalid security level: %d", sec); + return -ENOEXEC; + } + + ARRAY_FOR_EACH(l2cap_servers, index) { + if (l2cap_servers[index].psm == psm) { + l2cap_servers[index].sec_level = sec; + shell_print(sh, "L2CAP psm %u security level %u", psm, sec); + return 0; + } + } + + shell_error(sh, "L2CAP psm %u not registered", psm); + return -ENOEXEC; +} + +SHELL_STATIC_SUBCMD_SET_CREATE( + l2cap_br_cmds, + SHELL_CMD_ARG(register, NULL, " [sec] [sec: 0 - 4]", cmd_l2cap_register, 2, 2), + SHELL_CMD_ARG(connect, NULL, " [sec] [sec: 0 - 4]", cmd_l2cap_connect, 2, 2), + SHELL_CMD_ARG(disconnect, NULL, "", cmd_l2cap_disconnect, 2, 0), + SHELL_CMD_ARG(security, NULL, " ", cmd_set_security, 3, 0), + SHELL_SUBCMD_SET_END); + +static int cmd_default_handler(const struct shell *sh, size_t argc, char **argv) +{ + if (argc == 1) { + shell_help(sh); + return SHELL_CMD_HELP_PRINTED; + } + + shell_error(sh, "%s unknown parameter: %s", argv[0], argv[1]); + + return -EINVAL; +} + +SHELL_CMD_REGISTER(l2cap_br, &l2cap_br_cmds, "Bluetooth classic l2cap shell commands", + cmd_default_handler); diff --git a/tests/bluetooth/classic/smp_bonding/testcase.yaml b/tests/bluetooth/classic/smp_bonding/testcase.yaml new file mode 100644 index 00000000000..57981ec905b --- /dev/null +++ b/tests/bluetooth/classic/smp_bonding/testcase.yaml @@ -0,0 +1,24 @@ +tests: + bluetooth.classic.smp.bonding: + platform_allow: + - native_sim + integration_platforms: + - native_sim + tags: + - bluetooth + - smp + harness: pytest + harness_config: + pytest_dut_scope: session + fixture: usb_hci + timeout: 180 + bluetooth.classic.smp.bonding.no_blobs: + platform_allow: + - mimxrt1170_evk@B/mimxrt1176/cm7 + tags: + - bluetooth + - smp + extra_args: + - CONFIG_BUILD_ONLY_NO_BLOBS=y + timeout: 180 + build_only: true