zephyr/scripts/logging/dictionary/dictionary_parser/log_parser_v3.py
Georges Oates_Larsen a4599219b7 logging: dictionary: Support unsigned integers
Add unsigned integer support to the log parser.

This does not change the underlying log format,
it only allows the log parser to more accurately
read the log format.

Signed-off-by: Georges Oates_Larsen <georges.larsen@nordicsemi.no>
2024-10-03 11:40:41 +01:00

386 lines
12 KiB
Python

#!/usr/bin/env python3
#
# Copyright (c) 2021, 2024 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0
# Parsers are gonig to have very similar code.
# So tell pylint not to care.
# pylint: disable=duplicate-code
"""
Dictionary-based Logging Parser Version 3
This contains the implementation of the parser for
version 3 databases.
"""
import logging
import struct
import colorama
from colorama import Fore
from .log_parser import (LogParser, get_log_level_str_color, formalize_fmt_string)
from .data_types import DataTypes
HEX_BYTES_IN_LINE = 16
# Need to keep sync with struct log_dict_output_msg_hdr in
# include/logging/log_output_dict.h.
#
# struct log_dict_output_normal_msg_hdr_t {
# uint8_t type;
# uint32_t domain:4;
# uint32_t level:4;
# uint32_t package_len:16;
# uint32_t data_len:16;
# uintptr_t source;
# log_timestamp_t timestamp;
# } __packed;
#
# Note "type" and "timestamp" are encoded separately below.
FMT_MSG_HDR_32 = "BHHI"
FMT_MSG_HDR_64 = "BHHQ"
# Message type
# 0: normal message
# 1: number of dropped messages
FMT_MSG_TYPE = "B"
# Depends on CONFIG_LOG_TIMESTAMP_64BIT
FMT_MSG_TIMESTAMP_32 = "I"
FMT_MSG_TIMESTAMP_64 = "Q"
# Keep message types in sync with include/logging/log_output_dict.h
MSG_TYPE_NORMAL = 0
MSG_TYPE_DROPPED = 1
# Number of dropped messages
FMT_DROPPED_CNT = "H"
logger = logging.getLogger("parser")
class LogParserV3(LogParser):
"""Log Parser V1"""
def __init__(self, database):
super().__init__(database=database)
if self.database.is_tgt_little_endian():
endian = "<"
self.is_big_endian = False
else:
endian = ">"
self.is_big_endian = True
self.fmt_msg_type = endian + FMT_MSG_TYPE
self.fmt_dropped_cnt = endian + FMT_DROPPED_CNT
if self.database.is_tgt_64bit():
self.fmt_msg_hdr = endian + FMT_MSG_HDR_64
else:
self.fmt_msg_hdr = endian + FMT_MSG_HDR_32
if "CONFIG_LOG_TIMESTAMP_64BIT" in self.database.get_kconfigs():
self.fmt_msg_timestamp = endian + FMT_MSG_TIMESTAMP_64
else:
self.fmt_msg_timestamp = endian + FMT_MSG_TIMESTAMP_32
def __get_string(self, arg, arg_offset, string_tbl):
one_str = self.database.find_string(arg)
if one_str is not None:
ret = one_str
else:
# The index from the string table is basically
# the order in va_list. Need to add to the index
# to skip the packaged string header and
# the format string.
str_idx = arg_offset + self.data_types.get_sizeof(DataTypes.PTR) * 2
str_idx /= self.data_types.get_sizeof(DataTypes.INT)
if int(str_idx) not in string_tbl:
ret = f'<string@0x{arg:x}>'
else:
ret = string_tbl[int(str_idx)]
return ret
def process_one_fmt_str(self, fmt_str, arg_list, string_tbl):
"""Parse the format string to extract arguments from
the binary arglist and return a tuple usable with
Python's string formatting"""
idx = 0
arg_offset = 0
arg_data_type = None
is_parsing = False
do_extract = False
args = []
# Translated from cbvprintf_package()
for idx, fmt in enumerate(fmt_str):
if not is_parsing:
if fmt == '%':
is_parsing = True
arg_data_type = DataTypes.INT
continue
elif fmt == '%':
# '%%' -> literal percentage sign
is_parsing = False
continue
elif fmt == '*':
pass
elif fmt.isdecimal() or str.lower(fmt) == 'l' \
or fmt in (' ', '#', '-', '+', '.', 'h'):
# formatting modifiers, just ignore
continue
elif fmt in ('j', 'z', 't'):
# intmax_t, size_t or ptrdiff_t
arg_data_type = DataTypes.LONG
elif fmt in ('c', 'd', 'i', 'o', 'u', 'x', 'X'):
unsigned = fmt in ('c', 'o', 'u', 'x', 'X')
if fmt_str[idx - 1] == 'l':
if fmt_str[idx - 2] == 'l':
arg_data_type = DataTypes.ULONG_LONG if unsigned else DataTypes.LONG_LONG
else:
arg_data_type = DataTypes.ULONG if unsigned else DataTypes.LONG
else:
arg_data_type = DataTypes.UINT if unsigned else DataTypes.INT
is_parsing = False
do_extract = True
elif fmt in ('s', 'p', 'n'):
arg_data_type = DataTypes.PTR
is_parsing = False
do_extract = True
elif str.lower(fmt) in ('a', 'e', 'f', 'g'):
# Python doesn't do"long double".
#
# Parse it as double (probably incorrect), but
# still have to skip enough bytes.
if fmt_str[idx - 1] == 'L':
arg_data_type = DataTypes.LONG_DOUBLE
else:
arg_data_type = DataTypes.DOUBLE
is_parsing = False
do_extract = True
else:
is_parsing = False
continue
if do_extract:
do_extract = False
align = self.data_types.get_alignment(arg_data_type)
size = self.data_types.get_sizeof(arg_data_type)
unpack_fmt = self.data_types.get_formatter(arg_data_type)
# Align the argument list by rounding up
stack_align = self.data_types.get_stack_alignment(arg_data_type)
if stack_align > 1:
arg_offset = int((arg_offset + (align - 1)) / align) * align
one_arg = struct.unpack_from(unpack_fmt, arg_list, arg_offset)[0]
if fmt == 's':
one_arg = self.__get_string(one_arg, arg_offset, string_tbl)
args.append(one_arg)
arg_offset += size
# Align the offset
if stack_align > 1:
arg_offset = int((arg_offset + align - 1) / align) * align
return tuple(args)
@staticmethod
def extract_string_table(str_tbl):
"""Extract string table in a packaged log message"""
tbl = {}
one_str = ""
next_new_string = True
# Translated from cbvprintf_package()
for one_ch in str_tbl:
if next_new_string:
str_idx = one_ch
next_new_string = False
continue
if one_ch == 0:
tbl[str_idx] = one_str
one_str = ""
next_new_string = True
continue
one_str += chr(one_ch)
return tbl
@staticmethod
def print_hexdump(hex_data, prefix_len, color):
"""Print hex dump"""
hex_vals = ""
chr_vals = ""
chr_done = 0
for one_hex in hex_data:
hex_vals += f'{one_hex:02x} '
chr_vals += chr(one_hex)
chr_done += 1
if chr_done == HEX_BYTES_IN_LINE / 2:
hex_vals += " "
chr_vals += " "
elif chr_done == HEX_BYTES_IN_LINE:
print(f"{color}%s%s|%s{Fore.RESET}" % ((" " * prefix_len),
hex_vals, chr_vals))
hex_vals = ""
chr_vals = ""
chr_done = 0
if len(chr_vals) > 0:
hex_padding = " " * (HEX_BYTES_IN_LINE - chr_done)
print(f"{color}%s%s%s|%s{Fore.RESET}" % ((" " * prefix_len),
hex_vals, hex_padding, chr_vals))
def parse_one_normal_msg(self, logdata, offset):
"""Parse one normal log message and print the encoded message"""
# Parse log message header
domain_lvl, pkg_len, data_len, source_id = struct.unpack_from(self.fmt_msg_hdr,
logdata, offset)
offset += struct.calcsize(self.fmt_msg_hdr)
timestamp = struct.unpack_from(self.fmt_msg_timestamp, logdata, offset)[0]
offset += struct.calcsize(self.fmt_msg_timestamp)
# domain_id, level
if self.is_big_endian:
level = domain_lvl & 0x0F
domain_id = (domain_lvl >> 4) & 0x0F
else:
domain_id = domain_lvl & 0x0F
level = (domain_lvl >> 4) & 0x0F
level_str, color = get_log_level_str_color(level)
source_id_str = self.database.get_log_source_string(domain_id, source_id)
# Skip over data to point to next message (save as return value)
next_msg_offset = offset + pkg_len + data_len
# Offset from beginning of cbprintf_packaged data to end of va_list arguments
offset_end_of_args = struct.unpack_from("B", logdata, offset)[0]
offset_end_of_args *= self.data_types.get_sizeof(DataTypes.INT)
offset_end_of_args += offset
# Extra data after packaged log
extra_data = logdata[(offset + pkg_len):next_msg_offset]
# Number of appended strings in package
num_packed_strings = struct.unpack_from("B", logdata, offset+1)[0]
# Number of read-only string indexes
num_ro_str_indexes = struct.unpack_from("B", logdata, offset+2)[0]
offset_end_of_args += num_ro_str_indexes
# Number of read-write string indexes
num_rw_str_indexes = struct.unpack_from("B", logdata, offset+3)[0]
offset_end_of_args += num_rw_str_indexes
# Extract the string table in the packaged log message
string_tbl = self.extract_string_table(logdata[offset_end_of_args:(offset + pkg_len)])
if len(string_tbl) != num_packed_strings:
logger.error("------ Error extracting string table")
return None
# Skip packaged string header
offset += self.data_types.get_sizeof(DataTypes.PTR)
# Grab the format string
#
# Note the negative offset to __get_string(). It is because
# the offset begins at 0 for va_list. However, the format string
# itself is before the va_list, so need to go back the width of
# a pointer.
fmt_str_ptr = struct.unpack_from(self.data_types.get_formatter(DataTypes.PTR),
logdata, offset)[0]
fmt_str = self.__get_string(fmt_str_ptr,
-self.data_types.get_sizeof(DataTypes.PTR),
string_tbl)
offset += self.data_types.get_sizeof(DataTypes.PTR)
if not fmt_str:
logger.error("------ Error getting format string at 0x%x", fmt_str_ptr)
return None
args = self.process_one_fmt_str(fmt_str, logdata[offset:offset_end_of_args], string_tbl)
fmt_str = formalize_fmt_string(fmt_str)
log_msg = fmt_str % args
if level == 0:
print(f"{log_msg}", end='')
log_prefix = ""
else:
log_prefix = f"[{timestamp:>10}] <{level_str}> {source_id_str}: "
print(f"{color}%s%s{Fore.RESET}" % (log_prefix, log_msg))
if data_len > 0:
# Has hexdump data
self.print_hexdump(extra_data, len(log_prefix), color)
# Point to next message
return next_msg_offset
def parse_log_data(self, logdata, debug=False):
"""Parse binary log data and print the encoded log messages"""
offset = 0
while offset < len(logdata):
# Get message type
msg_type = struct.unpack_from(self.fmt_msg_type, logdata, offset)[0]
offset += struct.calcsize(self.fmt_msg_type)
if msg_type == MSG_TYPE_DROPPED:
num_dropped = struct.unpack_from(self.fmt_dropped_cnt, logdata, offset)
offset += struct.calcsize(self.fmt_dropped_cnt)
print(f"--- {num_dropped} messages dropped ---")
elif msg_type == MSG_TYPE_NORMAL:
ret = self.parse_one_normal_msg(logdata, offset)
if ret is None:
return False
offset = ret
else:
logger.error("------ Unknown message type: %s", msg_type)
return False
return True
colorama.init()