tests: Bluetooth: BR: Add test suite of bonding for SMP

IUT works as an SM Initiator/Responder. The peer device,
SM Responder/Initiator, is a PC running bumble on it.

Add shell commands to create and manage L2CAP server and channel with
different security level.

In the test suite, there are 5 test cases. The test matrix combines
each bonding flag:
| Test Case ID | Initiator | Responder | Expected Result |
|--------------|-----------|-----------|-----------------|
| BR_SM_Bonding_INIT_001 | Non-bondable | Non-bondable | Pass |
| BR_SM_Bonding_INIT_005 | General Bonding | General Bonding | Pass |
| BR_SM_Bonding_RSP_010 | Non-bondable | Non-bondable | Pass |
| BR_SM_Bonding_RSP_011 | General Bonding | Non-bondable | Fail |
| BR_SM_Bonding_RSP_014 | General Bonding | General Bonding | Pass |

Signed-off-by: Can Wang <can.wang@nxp.com>
This commit is contained in:
Can Wang 2025-06-03 18:03:58 +08:00 committed by Daniel DeGrasse
parent 9937be7c30
commit d56f1e018d
9 changed files with 965 additions and 0 deletions

View File

@ -0,0 +1,10 @@
# SPDX-License-Identifier: Apache-2.0
cmake_minimum_required(VERSION 3.20.0)
set(NO_QEMU_SERIAL_BT_SERVER 1)
find_package(Zephyr REQUIRED HINTS $ENV{ZEPHYR_BASE})
project(bluetooth)
FILE(GLOB app_sources src/*.c)
target_sources(app PRIVATE ${app_sources})

View File

@ -0,0 +1,74 @@
.. _bluetooth_classic_smp_bonding_tests:
Bluetooth Classic SMP Bonding Tests
##################################
Overview
********
This test suite uses ``bumble`` for testing Bluetooth Classic communication between a host
PC (running :ref:`Twister <twister_script>`) and a device under test (DUT) running Zephyr.
Prerequisites
*************
The test suite has the following prerequisites:
* The ``bumble`` library installed on the host PC.
The Bluetooth Classic controller on PC side is required. Refer to getting started of `bumble`_
for details.
The HCI transport for ``bumble`` can be configured as follows:
* A specific configuration context can be provided along with the ``usb_hci`` fixture separated by
a ``:`` (i.e. specify fixture ``usb_hci:usb:0`` to use the ``usb:0`` as hci transport for
``bumble``).
* The configuration context can be overridden using the `hci transport`_ can be provided using the
``--hci-transport`` test suite argument (i.e. run ``twister`` with the
``--pytest-args=--hci-transport=usb:0`` argument to use the ``usb:0`` as hci transport for
``bumble``).
Building and Running
********************
Running on mimxrt1170_evk@B/mimxrt1176/cm7
==========================================
Running the test suite on :ref:`mimxrt1170_evk` relies on configuration of ``bumble``.
On the host PC, a HCI transport needs to be required. Refer to `bumble platforms`_ page of
``bumble`` for details.
For example, on windows, a PTS dongle is used. After `WinUSB driver`_ has been installed,
the HCI transport would be USB transport interface ``usb:<index>``.
If the HCI transport is ``usb:0`` and debug console port is ``COM4``, the test suite can be
launched using Twister:
.. code-block:: shell
west twister -v -p mimxrt1170_evk@B/mimxrt1176/cm7 --device-testing --device-serial COM4 -T tests/bluetooth/classic/smp_bonding -O smp_bonding --force-platform --west-flash --west-runner=jlink -X usb_hci:usb:0
Running on Hardware
===================
Running the test suite on hardware requires a HCI transport connected to the host PC.
The test suite can be launched using Twister. Below is an example for running on the
:zephyr:board:`mimxrt1170_evk@B/mimxrt1176/cm7`:
.. code-block:: shell
west twister -v -p mimxrt1170_evk@B/mimxrt1176/cm7 --device-testing --device-serial COM4 -T tests/bluetooth/classic/smp_bonding -O smp_bonding --force-platform --west-flash --west-runner=jlink -X usb_hci:usb:0
.. _bumble:
https://google.github.io/bumble/getting_started.html
.. _hci transport:
https://google.github.io/bumble/transports/index.html
.. _bumble platforms:
https://google.github.io/bumble/platforms/index.html
.. _WinUSB driver:
https://google.github.io/bumble/platforms/windows.html

View File

@ -0,0 +1,4 @@
#select NXP NW612 Chipset
CONFIG_BT_NXP_NW612=y
CONFIG_ENTROPY_GENERATOR=y

View File

@ -0,0 +1,11 @@
/*
* Copyright 2025 NXP
*
* SPDX-License-Identifier: Apache-2.0
*/
/ {
chosen {
zephyr,sram = &dtcm;
};
};

View File

@ -0,0 +1,16 @@
CONFIG_BT=y
CONFIG_BT_CLASSIC=y
CONFIG_BT_SMP=y
CONFIG_BT_SHELL=y
CONFIG_LOG=y
CONFIG_ZTEST=y
CONFIG_BT_SETTINGS=y
CONFIG_FLASH=y
CONFIG_FLASH_MAP=y
CONFIG_NVS=y
CONFIG_SETTINGS=y
CONFIG_BT_DEVICE_NAME="smp_bonding"
CONFIG_BT_CREATE_CONN_TIMEOUT=30
CONFIG_BT_PAGE_TIMEOUT=0xFFFF

View File

@ -0,0 +1,57 @@
# Copyright 2025 NXP
#
# SPDX-License-Identifier: Apache-2.0
import logging
import re
import pytest
from twister_harness import DeviceAdapter, Shell
logger = logging.getLogger(__name__)
def pytest_addoption(parser) -> None:
"""Add local parser options to pytest."""
parser.addoption('--hci-transport', default=None, help='Configuration HCI transport for bumble')
@pytest.fixture(name='initialize', scope='session')
def fixture_initialize(request, shell: Shell, dut: DeviceAdapter):
"""Session initializtion"""
# Get HCI transport for bumble
hci = request.config.getoption('--hci-transport')
if hci is None:
for fixture in dut.device_config.fixtures:
if fixture.startswith('usb_hci:'):
hci = fixture.split(sep=':', maxsplit=1)[1]
break
assert hci is not None
shell.exec_command("bt init")
lines = dut.readlines_until("Settings Loaded")
regex = r'Identity: *(?P<bd_addr>([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}) *\((.*?)\))'
bd_addr = None
for line in lines:
logger.info(f"Shell log {line}")
m = re.search(regex, line)
if m:
bd_addr = m.group('bd_addr')
if bd_addr is None:
logger.error('Fail to get IUT BD address')
raise AssertionError
shell.exec_command("br pscan on")
shell.exec_command("br iscan on")
logger.info('initialized')
return hci, bd_addr
@pytest.fixture
def smp_initiator_dut(initialize):
logger.info('Start running testcase')
yield initialize
logger.info('Done')

View File

@ -0,0 +1,495 @@
# Copyright 2025 NXP
#
# SPDX-License-Identifier: Apache-2.0
import asyncio
import logging
import re
import sys
from bumble.core import (
BT_BR_EDR_TRANSPORT,
)
from bumble.device import Device
from bumble.hci import Address, HCI_Write_Page_Timeout_Command
from bumble.l2cap import ClassicChannelSpec
from bumble.pairing import PairingConfig, PairingDelegate
from bumble.smp import (
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
)
from bumble.snoop import BtSnooper
from bumble.transport import open_transport_or_link
from twister_harness import DeviceAdapter, Shell
logger = logging.getLogger(__name__)
l2cap_server_psm = 0x1001
# power on dongle
async def device_power_on(device) -> None:
while True:
try:
await device.power_on()
break
except Exception:
continue
def check_shell_response(lines: list[str], regex: str) -> bool:
found = False
try:
for line in lines:
if re.search(regex, line):
found = True
break
except Exception as e:
logger.error(f'{e}!', exc_info=True)
raise e
return found
def search_messages(result, messages, read_lines):
for message in messages:
for line in read_lines:
if re.search(message, line):
result[message] = True
if False not in result.values():
return True
break
return False
async def wait_for_shell_response(dut, regex: list[str] | str, max_wait_sec=10):
found = False
lines = []
logger.debug('wait_for_shell_response')
messages = [regex] if isinstance(regex, str) else regex
result = dict.fromkeys(messages, False)
try:
for _ in range(0, max_wait_sec):
read_lines = dut.readlines(print_output=True)
lines += read_lines
for line in read_lines:
logger.debug(f'DUT response: {str(line)}')
found = search_messages(result, messages, read_lines)
if found is True:
break
await asyncio.sleep(1)
except Exception as e:
logger.error(f'{e}!', exc_info=True)
raise e
for key in result:
logger.debug(f'Expected DUT response: "{key}", Matched: {result[key]}')
return found, lines
async def send_cmd_to_iut(shell, dut, cmd, parse=None):
shell.exec_command(cmd)
if parse is not None:
found, lines = await wait_for_shell_response(dut, parse)
else:
found = True
lines = []
assert found is True
return lines
async def bumble_acl_connect(shell, dut, device, target_address):
connection = None
try:
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
logger.info(f'=== Connected to {connection.peer_address}!')
except Exception as e:
logger.error(f'Fail to connect to {target_address}!')
raise e
return connection
async def sm_bonding_init_001(hci_port, shell, dut, address, snoop_file) -> None:
async with await open_transport_or_link(hci_port) as hci_transport:
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
device.classic_enabled = True
device.le_enabled = False
device.classic_sc_enabled = False
delegate = PairingDelegate(SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY)
device.pairing_config_factory = lambda connection: PairingConfig(
sc=True, mitm=False, bonding=False, delegate=delegate
)
device.host.snooper = BtSnooper(snoop_file)
await device_power_on(device)
await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF))
bumble_address = device.public_address.to_string().split('/P')[0]
# Initiator as l2cap client: non-bondable
await send_cmd_to_iut(shell, dut, "bt clear all", None)
await send_cmd_to_iut(shell, dut, "br clear all", None)
await send_cmd_to_iut(shell, dut, "bt auth none", None)
await send_cmd_to_iut(shell, dut, "bt bondable off", None)
# Responder as l2cap server: non-bondable
server = device.l2cap_channel_manager.create_classic_server(
spec=ClassicChannelSpec(psm=l2cap_server_psm)
)
assert server is not None
# Initiator ceate connection
await send_cmd_to_iut(
shell, dut, f"br connect {bumble_address}", f"Connected: {bumble_address}"
)
# Initiator l2cap connect
await send_cmd_to_iut(
shell, dut, f"l2cap_br connect {format(l2cap_server_psm, 'x')} sec 2", None
)
# Initiator check pairing success
found, lines = await wait_for_shell_response(
dut,
[f"Paired with {bumble_address}", f"Security changed: {bumble_address} level 2"],
)
assert found is True
# Initiator check l2cap connection
found = check_shell_response(lines, r"Channel \w+ connected")
if not found:
found, _ = await wait_for_shell_response(dut, [r"Channel \w+ connected"])
assert found is True
# Initiator disconnect
await send_cmd_to_iut(shell, dut, "bt disconnect", f"Disconnected: {bumble_address}")
# Initiator check bond removal
lines = shell.exec_command("br bonds")
assert check_shell_response(lines, r"Total 0")
async def sm_bonding_init_005(hci_port, shell, dut, address, snoop_file) -> None:
async with await open_transport_or_link(hci_port) as hci_transport:
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
device.classic_enabled = True
device.le_enabled = False
device.classic_sc_enabled = False
delegate = PairingDelegate(SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY)
device.pairing_config_factory = lambda connection: PairingConfig(
sc=True, mitm=False, bonding=True, delegate=delegate
)
device.host.snooper = BtSnooper(snoop_file)
await device_power_on(device)
await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF))
bumble_address = device.public_address.to_string().split('/P')[0]
# Initiator as l2cap client: General Bonding
await send_cmd_to_iut(shell, dut, "bt clear all", None)
await send_cmd_to_iut(shell, dut, "br clear all", None)
await send_cmd_to_iut(shell, dut, "bt auth none", None)
await send_cmd_to_iut(shell, dut, "bt bondable on", None)
# Responder as l2cap server: General Bonding
server = device.l2cap_channel_manager.create_classic_server(
spec=ClassicChannelSpec(psm=l2cap_server_psm)
)
assert server is not None
# Initiator as l2cap client: General Bonding
await send_cmd_to_iut(
shell, dut, f"br connect {bumble_address}", f"Connected: {bumble_address}"
)
# Initiator l2cap connect
await send_cmd_to_iut(
shell, dut, f"l2cap_br connect {format(l2cap_server_psm, 'x')} sec 2", None
)
# Initiator check bonding success
found, lines = await wait_for_shell_response(
dut,
[f"Bonded with {bumble_address}", f"Security changed: {bumble_address} level 2"],
)
assert found is True
# Initiator check l2cap connection
found = check_shell_response(lines, r"Channel \w+ connected")
if not found:
found, _ = await wait_for_shell_response(dut, [r"Channel \w+ connected"])
assert found is True
# Initiator disconnect
await send_cmd_to_iut(shell, dut, "bt disconnect", f"Disconnected: {bumble_address}")
# Initiator check bonded
lines = shell.exec_command("br bonds")
assert check_shell_response(lines, f"Remote Identity: {bumble_address}")
async def sm_bonding_rsp_010(hci_port, shell, dut, address, snoop_file) -> None:
async with await open_transport_or_link(hci_port) as hci_transport:
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
device.classic_enabled = True
device.le_enabled = False
device.classic_sc_enabled = False
delegate = PairingDelegate(SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY)
device.pairing_config_factory = lambda connection: PairingConfig(
sc=True, mitm=False, bonding=False, delegate=delegate
)
device.host.snooper = BtSnooper(snoop_file)
await device_power_on(device)
await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF))
bumble_address = device.public_address.to_string().split('/P')[0]
iut_address = address.split(" ")[0]
# Responder as l2cap server: non-bondable
await send_cmd_to_iut(shell, dut, "bt clear all", None)
await send_cmd_to_iut(shell, dut, "br clear all", None)
await send_cmd_to_iut(shell, dut, "bt auth none", None)
await send_cmd_to_iut(shell, dut, "bt bondable off", None)
await send_cmd_to_iut(
shell, dut, f"l2cap_br register {format(l2cap_server_psm, 'x')}", None
)
await send_cmd_to_iut(
shell, dut, f"l2cap_br security {format(l2cap_server_psm, 'x')} 2", None
)
# Initiator as l2cap client: non-bondable
# Initiator create connection
connection = await device.connect(iut_address, transport=BT_BR_EDR_TRANSPORT)
# Responder check connection
found, _ = await wait_for_shell_response(dut, [f"Connected: {bumble_address}"])
assert found is True
# Initiator init authentication and encryption
await device.authenticate(connection)
await device.encrypt(connection)
# Responder check pairing success
found, _ = await wait_for_shell_response(
dut,
[f"Paired with {bumble_address}", f"Security changed: {bumble_address} level 2"],
)
assert found is True
# Initiator create l2cap connection
await device.l2cap_channel_manager.create_classic_channel(
connection=connection, spec=ClassicChannelSpec(psm=l2cap_server_psm)
)
# Responder check l2cap channel connection
found, _ = await wait_for_shell_response(dut, [r"Channel \w+ connected"])
assert found is True
# Responder disconnect
await send_cmd_to_iut(shell, dut, "bt disconnect", f"Disconnected: {bumble_address}")
# Responder check no bonds
lines = shell.exec_command("br bonds")
assert check_shell_response(lines, r"Total 0")
async def sm_bonding_rsp_011(hci_port, shell, dut, address, snoop_file) -> None:
async with await open_transport_or_link(hci_port) as hci_transport:
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
device.classic_enabled = True
device.le_enabled = False
device.classic_sc_enabled = False
delegate = PairingDelegate(SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY)
device.pairing_config_factory = lambda connection: PairingConfig(
sc=True, mitm=False, bonding=True, delegate=delegate
)
device.host.snooper = BtSnooper(snoop_file)
await device_power_on(device)
await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF))
bumble_address = device.public_address.to_string().split('/P')[0]
iut_address = address.split(" ")[0]
# Responder as l2cap server: non-bondable
await send_cmd_to_iut(shell, dut, "bt clear all", None)
await send_cmd_to_iut(shell, dut, "br clear all", None)
await send_cmd_to_iut(shell, dut, "bt auth none", None)
await send_cmd_to_iut(shell, dut, "bt bondable off", None)
await send_cmd_to_iut(
shell, dut, f"l2cap_br register {format(l2cap_server_psm, 'x')}", None
)
await send_cmd_to_iut(
shell, dut, f"l2cap_br security {format(l2cap_server_psm, 'x')} 2", None
)
# Initiator as l2cap client: General Bonding
# Initiator create connection
connection = await device.connect(iut_address, transport=BT_BR_EDR_TRANSPORT)
found, _ = await wait_for_shell_response(dut, [f"Connected: {bumble_address}"])
assert found is True
# Initiator init authentication and encryption
try:
await device.authenticate(connection)
await device.encrypt(connection)
except Exception as e:
logger.error(f"Authentication or encryption failed: {e}")
# Responder check pairing failure
found, _ = await wait_for_shell_response(
dut,
[
f"Pairing failed with {bumble_address} reason: Authentication failure",
rf"Disconnected: {bumble_address} \(reason 0x16\)",
],
)
assert found is True
async def sm_bonding_rsp_014(hci_port, shell, dut, address, snoop_file) -> None:
async with await open_transport_or_link(hci_port) as hci_transport:
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
device.classic_enabled = True
device.le_enabled = False
device.classic_sc_enabled = False
delegate = PairingDelegate(SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY)
device.pairing_config_factory = lambda connection: PairingConfig(
sc=True, mitm=False, bonding=True, delegate=delegate
)
device.host.snooper = BtSnooper(snoop_file)
await device_power_on(device)
await device.send_command(HCI_Write_Page_Timeout_Command(page_timeout=0xFFFF))
bumble_address = device.public_address.to_string().split('/P')[0]
iut_address = address.split(" ")[0]
# Responder as l2cap server: General Bonding
await send_cmd_to_iut(shell, dut, "bt clear all", None)
await send_cmd_to_iut(shell, dut, "br clear all", None)
await send_cmd_to_iut(shell, dut, "bt auth none", None)
await send_cmd_to_iut(shell, dut, "bt bondable on", None)
await send_cmd_to_iut(
shell, dut, f"l2cap_br register {format(l2cap_server_psm, 'x')}", None
)
await send_cmd_to_iut(
shell, dut, f"l2cap_br security {format(l2cap_server_psm, 'x')} 2", None
)
# Initiator as l2cap client: General Bonding
# Initiator create connection
connection = await device.connect(iut_address, transport=BT_BR_EDR_TRANSPORT)
# Responder check connection
found, _ = await wait_for_shell_response(dut, [f"Connected: {bumble_address}"])
assert found is True
# Initiator init authentication and encryption
await device.authenticate(connection)
await device.encrypt(connection)
# Responder check bonding success
found, _ = await wait_for_shell_response(
dut,
[f"Bonded with {bumble_address}", f"Security changed: {bumble_address} level 2"],
)
assert found is True
# Initiator create l2cap connection
await device.l2cap_channel_manager.create_classic_channel(
connection=connection, spec=ClassicChannelSpec(psm=l2cap_server_psm)
)
# Responder check l2cap channel connection
found, _ = await wait_for_shell_response(dut, [r"Channel \w+ connected"])
assert found is True
# Responder disconnect
await send_cmd_to_iut(shell, dut, "bt disconnect", f"Disconnected: {bumble_address}")
# Responder check bonds
lines = shell.exec_command("br bonds")
assert check_shell_response(lines, f"Remote Identity: {bumble_address}")
class TestSmpServer:
def test_sm_bonding_init_001(self, shell: Shell, dut: DeviceAdapter, smp_initiator_dut):
"""Verify pairing can be established when both devices are in Non-bondable mode with
local device as L2CAP client."""
logger.info(f'test_sm_bonding_init_001 {smp_initiator_dut}')
hci, iut_address = smp_initiator_dut
with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file:
asyncio.run(sm_bonding_init_001(hci, shell, dut, iut_address, snoop_file))
def test_sm_bonding_init_005(self, shell: Shell, dut: DeviceAdapter, smp_initiator_dut):
"""Verify pairing and bonding can be established when both devices are
in General Bonding mode."""
logger.info(f'test_sm_bonding_init_005 {smp_initiator_dut}')
hci, iut_address = smp_initiator_dut
with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file:
asyncio.run(sm_bonding_init_005(hci, shell, dut, iut_address, snoop_file))
def test_sm_bonding_rsp_010(self, shell: Shell, dut: DeviceAdapter, smp_initiator_dut):
"""To verify that pairing succeeds when both local and peer devices are configured
as Non-bondable and the local device acts as an L2CAP server."""
logger.info(f'test_sm_bonding_rsp_010 {smp_initiator_dut}')
hci, iut_address = smp_initiator_dut
with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file:
asyncio.run(sm_bonding_rsp_010(hci, shell, dut, iut_address, snoop_file))
def test_sm_bonding_rsp_011(self, shell: Shell, dut: DeviceAdapter, smp_initiator_dut):
"""To verify that pairing fails when local device is Non-bondable and peer device
is General Bonding, with the local device acting as an L2CAP server."""
logger.info(f'test_sm_bonding_rsp_011 {smp_initiator_dut}')
hci, iut_address = smp_initiator_dut
with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file:
asyncio.run(sm_bonding_rsp_011(hci, shell, dut, iut_address, snoop_file))
def test_sm_bonding_rsp_014(self, shell: Shell, dut: DeviceAdapter, smp_initiator_dut):
"""To verify that pairing succeeds when both local and peer devices are configured
as General Bonding, with the local device acting as an L2CAP server."""
logger.info(f'test_sm_bonding_rsp_014 {smp_initiator_dut}')
hci, iut_address = smp_initiator_dut
with open(f"bumble_hci_{sys._getframe().f_code.co_name}.log", "wb") as snoop_file:
asyncio.run(sm_bonding_rsp_014(hci, shell, dut, iut_address, snoop_file))

View File

@ -0,0 +1,274 @@
/* smp_br_bonding.c - Bluetooth classic SMP bonding smoke test */
/*
* Copyright 2025 NXP
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <errno.h>
#include <zephyr/types.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>
#include <zephyr/sys/byteorder.h>
#include <zephyr/kernel.h>
#include <zephyr/settings/settings.h>
#include <zephyr/bluetooth/hci.h>
#include <zephyr/bluetooth/bluetooth.h>
#include <zephyr/bluetooth/conn.h>
#include <zephyr/bluetooth/l2cap.h>
#include <zephyr/shell/shell.h>
#include "host/shell/bt.h"
#include "common/bt_shell_private.h"
#define DATA_BREDR_MTU 48
NET_BUF_POOL_FIXED_DEFINE(data_rx_pool, 1, DATA_BREDR_MTU, 8, NULL);
struct l2cap_br_chan {
bool active;
struct bt_l2cap_br_chan chan;
};
#define APPL_L2CAP_CONNECTION_MAX_COUNT 1
static struct l2cap_br_chan l2cap_chans[APPL_L2CAP_CONNECTION_MAX_COUNT];
static struct bt_l2cap_server l2cap_servers[APPL_L2CAP_CONNECTION_MAX_COUNT];
static int l2cap_recv(struct bt_l2cap_chan *chan, struct net_buf *buf)
{
struct l2cap_br_chan *br_chan = CONTAINER_OF(
CONTAINER_OF(chan, struct bt_l2cap_br_chan, chan), struct l2cap_br_chan, chan);
bt_shell_print("Incoming data channel %d len %u", ARRAY_INDEX(l2cap_chans, br_chan),
buf->len);
if (buf->len) {
bt_shell_hexdump(buf->data, buf->len);
}
return 0;
}
static void l2cap_connected(struct bt_l2cap_chan *chan)
{
struct l2cap_br_chan *br_chan = CONTAINER_OF(
CONTAINER_OF(chan, struct bt_l2cap_br_chan, chan), struct l2cap_br_chan, chan);
bt_shell_print("Channel %d connected", ARRAY_INDEX(l2cap_chans, br_chan));
}
static void l2cap_disconnected(struct bt_l2cap_chan *chan)
{
struct l2cap_br_chan *br_chan = CONTAINER_OF(
CONTAINER_OF(chan, struct bt_l2cap_br_chan, chan), struct l2cap_br_chan, chan);
br_chan->active = false;
bt_shell_print("Channel %d disconnected", ARRAY_INDEX(l2cap_chans, br_chan));
}
static struct net_buf *l2cap_alloc_buf(struct bt_l2cap_chan *chan)
{
bt_shell_print("Channel %p requires buffer", chan);
return net_buf_alloc(&data_rx_pool, K_NO_WAIT);
}
static const struct bt_l2cap_chan_ops l2cap_ops = {
.alloc_buf = l2cap_alloc_buf,
.recv = l2cap_recv,
.connected = l2cap_connected,
.disconnected = l2cap_disconnected,
};
static struct l2cap_br_chan *l2cap_alloc_chan(void)
{
ARRAY_FOR_EACH(l2cap_chans, index) {
if (l2cap_chans[index].active == false) {
l2cap_chans[index].active = true;
l2cap_chans[index].chan.chan.ops = &l2cap_ops;
l2cap_chans[index].chan.rx.mtu = DATA_BREDR_MTU;
return &l2cap_chans[index];
}
}
return NULL;
}
static int l2cap_accept(struct bt_conn *conn, struct bt_l2cap_server *server,
struct bt_l2cap_chan **chan)
{
struct l2cap_br_chan *br_chan;
bt_shell_print("Incoming BR/EDR conn %p", conn);
br_chan = l2cap_alloc_chan();
if (br_chan == NULL) {
bt_shell_error("No channels available");
return -ENOMEM;
}
*chan = &br_chan->chan.chan;
return 0;
}
static struct bt_l2cap_server *l2cap_alloc_server(uint16_t psm)
{
ARRAY_FOR_EACH(l2cap_servers, index) {
if (l2cap_servers[index].psm == 0) {
l2cap_servers[index].psm = psm;
l2cap_servers[index].accept = l2cap_accept;
return &l2cap_servers[index];
}
}
return NULL;
}
static int cmd_l2cap_register(const struct shell *sh, size_t argc, char *argv[])
{
uint16_t psm = strtoul(argv[1], NULL, 16);
struct bt_l2cap_server *br_server;
ARRAY_FOR_EACH(l2cap_servers, index) {
if (l2cap_servers[index].psm == psm) {
shell_print(sh, "Already registered");
return -ENOEXEC;
}
}
br_server = l2cap_alloc_server(psm);
if (br_server == NULL) {
shell_error(sh, "No servers available");
return -ENOMEM;
}
if ((argc > 3) && (strcmp(argv[2], "sec") == 0)) {
br_server->sec_level = strtoul(argv[3], NULL, 16);
} else {
br_server->sec_level = BT_SECURITY_L1;
}
if (bt_l2cap_br_server_register(br_server) < 0) {
br_server->psm = 0U;
shell_error(sh, "Unable to register psm");
return -ENOEXEC;
}
shell_print(sh, "L2CAP psm %u registered", br_server->psm);
return 0;
}
static int cmd_l2cap_connect(const struct shell *sh, size_t argc, char *argv[])
{
int err;
struct bt_conn_info info;
struct l2cap_br_chan *br_chan;
uint16_t psm;
if (default_conn == NULL) {
shell_error(sh, "Not connected");
return -ENOEXEC;
}
br_chan = l2cap_alloc_chan();
if (br_chan == NULL) {
shell_error(sh, "No channels available");
br_chan->active = false;
return -ENOMEM;
}
err = bt_conn_get_info(default_conn, &info);
if ((err < 0) || (info.type != BT_CONN_TYPE_BR)) {
shell_error(sh, "Invalid conn type");
br_chan->active = false;
return -ENOEXEC;
}
psm = strtoul(argv[1], NULL, 16);
if ((argc > 3) && (strcmp(argv[2], "sec") == 0)) {
br_chan->chan.required_sec_level = strtoul(argv[3], NULL, 16);
} else {
br_chan->chan.required_sec_level = BT_SECURITY_L1;
}
err = bt_l2cap_chan_connect(default_conn, &br_chan->chan.chan, psm);
if (err < 0) {
br_chan->active = false;
shell_error(sh, "Unable to connect to psm %u (err %d)", psm, err);
} else {
shell_print(sh, "L2CAP connection pending");
}
return err;
}
static int cmd_l2cap_disconnect(const struct shell *sh, size_t argc, char *argv[])
{
int err;
uint8_t id;
id = strtoul(argv[1], NULL, 16);
if ((id >= ARRAY_SIZE(l2cap_chans)) || (!l2cap_chans[id].active)) {
shell_print(sh, "channel %d not connected", id);
return -ENOEXEC;
}
err = bt_l2cap_chan_disconnect(&l2cap_chans[id].chan.chan);
if (err) {
shell_error(sh, "Unable to disconnect: %u", -err);
return err;
}
return 0;
}
static int cmd_set_security(const struct shell *sh, size_t argc, char *argv[])
{
uint16_t psm = strtoul(argv[1], NULL, 16);
uint8_t sec = strtoul(argv[2], NULL, 16);
if (sec > BT_SECURITY_L4) {
shell_error(sh, "Invalid security level: %d", sec);
return -ENOEXEC;
}
ARRAY_FOR_EACH(l2cap_servers, index) {
if (l2cap_servers[index].psm == psm) {
l2cap_servers[index].sec_level = sec;
shell_print(sh, "L2CAP psm %u security level %u", psm, sec);
return 0;
}
}
shell_error(sh, "L2CAP psm %u not registered", psm);
return -ENOEXEC;
}
SHELL_STATIC_SUBCMD_SET_CREATE(
l2cap_br_cmds,
SHELL_CMD_ARG(register, NULL, "<psm> [sec] [sec: 0 - 4]", cmd_l2cap_register, 2, 2),
SHELL_CMD_ARG(connect, NULL, "<psm> [sec] [sec: 0 - 4]", cmd_l2cap_connect, 2, 2),
SHELL_CMD_ARG(disconnect, NULL, "<id>", cmd_l2cap_disconnect, 2, 0),
SHELL_CMD_ARG(security, NULL, "<psm> <security level: 0 - 4>", cmd_set_security, 3, 0),
SHELL_SUBCMD_SET_END);
static int cmd_default_handler(const struct shell *sh, size_t argc, char **argv)
{
if (argc == 1) {
shell_help(sh);
return SHELL_CMD_HELP_PRINTED;
}
shell_error(sh, "%s unknown parameter: %s", argv[0], argv[1]);
return -EINVAL;
}
SHELL_CMD_REGISTER(l2cap_br, &l2cap_br_cmds, "Bluetooth classic l2cap shell commands",
cmd_default_handler);

View File

@ -0,0 +1,24 @@
tests:
bluetooth.classic.smp.bonding:
platform_allow:
- native_sim
integration_platforms:
- native_sim
tags:
- bluetooth
- smp
harness: pytest
harness_config:
pytest_dut_scope: session
fixture: usb_hci
timeout: 180
bluetooth.classic.smp.bonding.no_blobs:
platform_allow:
- mimxrt1170_evk@B/mimxrt1176/cm7
tags:
- bluetooth
- smp
extra_args:
- CONFIG_BUILD_ONLY_NO_BLOBS=y
timeout: 180
build_only: true