zephyr/scripts/logging/dictionary/dictionary_parser/log_database.py
Daniel Leung 2e2c8cc931 scripts: logging/dictionary/sys-t: log module info in collateral
This updates the MIPI Sys-T collateral generation to include log
modules in the XML file. This allows the parser to show where
each log message is coming from.

Note that the addition of Name propety in sys:Client is to clarify
that this collateral is from Zephyr.

Also note he addition of Mask property in syst:Guid is for parser
to match this collateral to any incoming messages, as the module
IDs are encoded in the set bits in the GUID. Without the mask,
the parser will discard any messages with non-zero module IDs.
This is based on the observation of the reference parser,
systprint, where pseudo GUIDs are generated from the module/unit
IDs (origin unit) in the incoming messages for matching with
collaterals.

Signed-off-by: Daniel Leung <daniel.leung@intel.com>
2022-05-09 12:54:53 -05:00

321 lines
8.4 KiB
Python

#!/usr/bin/env python3
#
# Copyright (c) 2021 Intel Corporation
#
# SPDX-License-Identifier: Apache-2.0
"""
Class for Dictionary-based Logging Database
"""
import base64
import copy
import json
from .mipi_syst import gen_syst_xml_file
from .utils import extract_one_string_in_section
from .utils import find_string_in_mappings
ARCHS = {
"arc" : {
"kconfig": "CONFIG_ARC",
},
"arm" : {
"kconfig": "CONFIG_ARM",
},
"arm64" : {
"kconfig": "CONFIG_ARM64",
},
"mips" : {
"kconfig": "CONFIG_MIPS",
},
"sparc" : {
"kconfig": "CONFIG_SPARC",
},
"x86" : {
"kconfig": "CONFIG_X86",
},
"nios2" : {
"kconfig": "CONFIG_NIOS2",
# Small static strings are put into section "datas"
# so we need to include them also.
#
# See include/arch/nios2/linker.ld on .sdata.*
# for explanation.
"extra_string_section": ['datas'],
},
"riscv" : {
"kconfig": "CONFIG_RISCV",
},
"xtensa" : {
"kconfig": "CONFIG_XTENSA",
},
}
class LogDatabase():
"""Class of log database"""
# Update this if database format of dictionary based logging
# has changed
ZEPHYR_DICT_LOG_VER = 2
LITTLE_ENDIAN = True
BIG_ENDIAN = False
def __init__(self):
new_db = {}
new_db['version'] = self.ZEPHYR_DICT_LOG_VER
new_db['target'] = {}
new_db['log_subsys'] = {}
new_db['log_subsys']['log_instances'] = {}
new_db['build_id'] = None
new_db['arch'] = None
new_db['kconfigs'] = {}
self.database = new_db
def get_version(self):
"""Get Database Version"""
return self.database['version']
def get_build_id(self):
"""Get Build ID"""
return self.database['build_id']
def set_build_id(self, build_id):
"""Set Build ID in Database"""
self.database['build_id'] = build_id
def get_arch(self):
"""Get the Target Architecture"""
return self.database['arch']
def set_arch(self, arch):
"""Set the Target Architecture"""
self.database['arch'] = arch
def get_tgt_bits(self):
"""Get Target Bitness: 32 or 64"""
if 'bits' in self.database['target']:
return self.database['target']['bits']
return None
def set_tgt_bits(self, bits):
"""Set Target Bitness: 32 or 64"""
self.database['target']['bits'] = bits
def is_tgt_64bit(self):
"""Return True if target is 64-bit, False if 32-bit.
None if error."""
if 'bits' not in self.database['target']:
return None
if self.database['target']['bits'] == 32:
return False
if self.database['target']['bits'] == 64:
return True
return None
def get_tgt_endianness(self):
"""
Get Target Endianness.
Return True if little endian, False if big.
"""
if 'little_endianness' in self.database['target']:
return self.database['target']['little_endianness']
return None
def set_tgt_endianness(self, endianness):
"""
Set Target Endianness
True if little endian, False if big.
"""
self.database['target']['little_endianness'] = endianness
def is_tgt_little_endian(self):
"""Return True if target is little endian"""
if 'little_endianness' not in self.database['target']:
return None
return self.database['target']['little_endianness'] == self.LITTLE_ENDIAN
def get_string_mappings(self):
"""Get string mappings to database"""
return self.database['string_mappings']
def set_string_mappings(self, database):
"""Add string mappings to database"""
self.database['string_mappings'] = database
def has_string_mappings(self):
"""Return True if there are string mappings in database"""
if 'string_mappings' in self.database:
return True
return False
def has_string_sections(self):
"""Return True if there are any static string sections"""
if 'sections' not in self.database:
return False
return len(self.database['sections']) != 0
def __find_string_in_mappings(self, string_ptr):
"""
Find string pointed by string_ptr in the string mapping
list. Return None if not found.
"""
return find_string_in_mappings(self.database['string_mappings'], string_ptr)
def __find_string_in_sections(self, string_ptr):
"""
Find string pointed by string_ptr in the binary data
sections. Return None if not found.
"""
for _, sect in self.database['sections'].items():
one_str = extract_one_string_in_section(sect, string_ptr)
if one_str is not None:
return one_str
return None
def find_string(self, string_ptr):
"""Find string pointed by string_ptr in the database.
Return None if not found."""
one_str = None
if self.has_string_mappings():
one_str = self.__find_string_in_mappings(string_ptr)
if one_str is None and self.has_string_sections():
one_str = self.__find_string_in_sections(string_ptr)
return one_str
def add_log_instance(self, source_id, name, level, address):
"""Add one log instance into database"""
self.database['log_subsys']['log_instances'][source_id] = {
'source_id' : source_id,
'name' : name,
'level' : level,
'addr' : address,
}
def get_log_source_string(self, domain_id, source_id):
"""Get the source string based on source ID"""
# JSON stores key as string, so we need to convert
src_id = str(source_id)
if src_id in self.database['log_subsys']['log_instances']:
return self.database['log_subsys']['log_instances'][src_id]['name']
return f"unknown<{domain_id}:{source_id}>"
def add_kconfig(self, name, val):
"""Add a kconfig name-value pair into database"""
self.database['kconfigs'][name] = val
def get_kconfigs(self):
"""Return kconfig name-value pairs"""
return self.database['kconfigs']
@staticmethod
def read_json_database(db_file_name):
"""Read database from file and return a LogDatabase object"""
try:
with open(db_file_name, "r", encoding="iso-8859-1") as db_fd:
json_db = json.load(db_fd)
except (OSError, json.JSONDecodeError):
return None
# Decode data in JSON back into binary data
if 'sections' in json_db:
for _, sect in json_db['sections'].items():
sect['data'] = base64.b64decode(sect['data_b64'])
database = LogDatabase()
database.database = json_db
# JSON encodes the addresses in string mappings as literal strings.
# So convert them back to integers, as this is needed for partial
# matchings.
if database.has_string_mappings():
new_str_map = {}
for addr, one_str in database.get_string_mappings().items():
new_str_map[int(addr)] = one_str
database.set_string_mappings(new_str_map)
return database
@staticmethod
def write_json_database(db_file_name, database):
"""Write the database into file"""
json_db = copy.deepcopy(database.database)
# Make database object into something JSON can dump
if 'sections' in json_db:
for _, sect in json_db['sections'].items():
encoded = base64.b64encode(sect['data'])
sect['data_b64'] = encoded.decode('ascii')
del sect['data']
try:
with open(db_file_name, "w", encoding="iso-8859-1") as db_fd:
db_fd.write(json.dumps(json_db))
except OSError:
return False
return True
@staticmethod
def write_syst_database(db_file_name, database):
"""
Write the database into MIPI Sys-T Collateral XML file
"""
try:
with open(db_file_name, "w", encoding="iso-8859-1") as db_fd:
xml = gen_syst_xml_file(database)
db_fd.write(xml)
except OSError:
return False
return True