feat: 集成 waveform_trace 波形调试工具
新增功能: - waveformTracer.ts: 调用 waveform_trace.exe 的工具实现 - toolExecutor.ts: 添加 waveform_trace 工具分发 - types/api.ts: 添加 WaveformTraceArgs 类型定义 工具源码 (tools/waveform_trace/src/): - AST 解析 + BFS 信号追踪 - VCD 波形解析 - 修复通用 testbench 支持 配置文件: - .gitignore: 排除 exe 和打包产物 - .vscodeignore: 发布时排除源码 - build.bat/build.sh: 打包脚本
This commit is contained in:
335
tools/waveform_trace/src/vcd_waveform_analyzer.py
Normal file
335
tools/waveform_trace/src/vcd_waveform_analyzer.py
Normal file
@ -0,0 +1,335 @@
|
||||
#
|
||||
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# Author : Chia-Tung (Mark) Ho, NVIDIA
|
||||
#
|
||||
|
||||
from vcdvcd import VCDVCD, binary_string_to_hex, StreamParserCallbacks
|
||||
import math
|
||||
import io, re
|
||||
import pandas as pd
|
||||
from typing import List
|
||||
import subprocess, os
|
||||
from debug_graph_analyzer import DebugGraph
|
||||
|
||||
class CustomCallback(StreamParserCallbacks):
|
||||
def __init__(self, printIds={}, lines=20, offset=0):
|
||||
self._printIdx = printIds
|
||||
self._references_to_widths = {}
|
||||
self.lines = 20
|
||||
self.counter = 0
|
||||
self.offset = offset
|
||||
|
||||
def enddefinitions(
|
||||
self,
|
||||
vcd,
|
||||
signals,
|
||||
cur_sig_vals
|
||||
):
|
||||
vcd.io = io.StringIO()
|
||||
self._printIdx = self._printIdx if self._printIdx else {i: i.split('.')[-1] for i in vcd.signals}
|
||||
|
||||
if signals:
|
||||
self._print_dumps_refs = signals
|
||||
else:
|
||||
self._print_dumps_refs = sorted(vcd.data[i].references[0] for i in cur_sig_vals.keys())
|
||||
|
||||
for i, ref in enumerate(self._print_dumps_refs, 1):
|
||||
if i == 0:
|
||||
i = 1
|
||||
identifier_code = vcd.references_to_ids[ref]
|
||||
size = int(vcd.data[identifier_code].size)
|
||||
width = max(((size // 4)), int(math.floor(math.log10(i))) + 1)
|
||||
self._references_to_widths[ref] = width
|
||||
|
||||
to_print = '// {0:<16}'.format('time')
|
||||
for ref in vcd.signals:
|
||||
string = '{0:>{1}s}'.format(self._printIdx[ref], self._references_to_widths.get(ref, 1))
|
||||
to_print += '{0:<16}'.format(string)
|
||||
|
||||
print(to_print, file=vcd.io)
|
||||
|
||||
def time(
|
||||
self,
|
||||
vcd,
|
||||
time,
|
||||
cur_sig_vals
|
||||
):
|
||||
self.counter += 1
|
||||
|
||||
if self.counter > self.offset + self.lines or self.counter < self.offset:
|
||||
return
|
||||
|
||||
if (vcd.signal_changed):
|
||||
ss = []
|
||||
ss.append('// {0:<16}'.format(str(time) + 'ns'))
|
||||
for ref in self._printIdx:
|
||||
identifier_code = vcd.references_to_ids[ref]
|
||||
value = cur_sig_vals[identifier_code]
|
||||
string = '{0:>{1}s}'.format(
|
||||
binary_string_to_hex(value),
|
||||
self._references_to_widths.get(ref, 1))
|
||||
ss.append('{0:<16}'.format(string))
|
||||
print(''.join(ss), file=vcd.io)
|
||||
|
||||
# extract raw name from dut and ref
|
||||
def get_raw_signal_name(name: str) -> str:
|
||||
if "_dut" in name:
|
||||
match = re.search("_dut", name)
|
||||
return name[:match.start()]
|
||||
elif "_ref" in name:
|
||||
match = re.search("_ref", name)
|
||||
return name[:match.start()]
|
||||
elif "_tb" in name:
|
||||
match = re.search("_tb", name)
|
||||
return name[:match.start()]
|
||||
else:
|
||||
return name
|
||||
|
||||
def tabular_via_callback(vcd_path, offset: int, mismatch_columns: List[str], window_size: int = 10, ori_mismatch_columns: list[str]=[]):
|
||||
vcd = VCDVCD(vcd_path, callbacks=CustomCallback(offset=offset, lines=window_size), store_tvs=False, only_sigs=False)
|
||||
tabular_text = vcd.io.getvalue()
|
||||
return tabular_text
|
||||
|
||||
|
||||
def tabular_via_dataframe(vcd_path, offset: int, mismatch_columns: List[str], window_size: int = 5,
|
||||
ori_mismatch_columns: list[str]=[]):
|
||||
# print('generated tabular columns = ', mismatch_columns, ' offset =', offset)
|
||||
# from scipy.sparse import csc_matrix
|
||||
import numpy as np
|
||||
|
||||
def insert_field_before_bracket(string, field):
|
||||
index = string.find('[') # Find the index of '['
|
||||
if index != -1: # If '[' is found
|
||||
string = string[:index] + field + string[index:] # Insert '_ref'/_dut before '['
|
||||
else:
|
||||
string += field
|
||||
return string
|
||||
|
||||
vcd = VCDVCD(vcd_path)
|
||||
n_row = vcd.endtime + 1
|
||||
n_col = len(vcd.signals)
|
||||
# fill in the waveform to the np array
|
||||
matrix = np.full((n_row, n_col), np.nan, dtype=float)
|
||||
for e, ref in enumerate(vcd.signals):
|
||||
symbol = vcd.references_to_ids[ref]
|
||||
for ts, signal in vcd.data[symbol].tv:
|
||||
try:
|
||||
matrix[ts, e] = int(signal) if signal.isdigit() else -999
|
||||
except:
|
||||
matrix[ts, e] = -999
|
||||
|
||||
# Deal with the signal names including the module inside.
|
||||
# 通用模式:直接提取信号名(去掉层次前缀)
|
||||
transformed_signals = []
|
||||
tb_signals = []
|
||||
for signal in vcd.signals:
|
||||
signal_fields = signal.split('.')
|
||||
# 直接使用最后一个字段作为信号名
|
||||
signal_name = signal_fields[-1]
|
||||
|
||||
# 如果有多个同名信号,添加模块前缀区分
|
||||
if len(signal_fields) >= 2:
|
||||
module_name = signal_fields[-2]
|
||||
# 顶层 testbench 信号
|
||||
if len(signal_fields) == 2:
|
||||
transformed_signals.append(signal_name)
|
||||
tb_signals.append(signal_name)
|
||||
else:
|
||||
# DUT 内部信号,添加模块前缀避免重名
|
||||
transformed_signals.append(f"{module_name}_{signal_name}")
|
||||
else:
|
||||
transformed_signals.append(signal_name)
|
||||
assert(len(transformed_signals) == n_col)
|
||||
|
||||
df = pd.DataFrame(matrix, columns=[i.split(".")[-1] for i in transformed_signals]).dropna(subset='clk')
|
||||
df = df.fillna(method='ffill')
|
||||
df = df.loc[:, ~df.columns.duplicated()]
|
||||
|
||||
# 通用模式:直接匹配追踪信号(不依赖 _dut/_ref 后缀)
|
||||
matched_columns = []
|
||||
for col in df.columns:
|
||||
raw_name = get_raw_signal_name(col)
|
||||
# 检查是否在追踪信号列表中
|
||||
for target in mismatch_columns:
|
||||
if target == raw_name or target in col or col.endswith(target):
|
||||
matched_columns.append(col)
|
||||
break
|
||||
|
||||
# 去重
|
||||
matched_columns = list(dict.fromkeys(matched_columns))
|
||||
|
||||
# 如果没有匹配到任何信号,使用所有列
|
||||
if not matched_columns:
|
||||
matched_columns = list(df.columns)
|
||||
|
||||
mismatch_columns = matched_columns
|
||||
first_row = df.loc[0: 1][mismatch_columns]
|
||||
# tail_rows = df.loc[1: offset + 1][mismatch_columns]
|
||||
# Whether to drop duplicates?
|
||||
tail_rows = df.loc[1: offset + 1][mismatch_columns].drop_duplicates(keep='first')
|
||||
# Mark: Keep 4 clock cycles
|
||||
if offset + window_size > df.shape[0]:
|
||||
future_rows = df.loc[offset + 1:][mismatch_columns]
|
||||
else:
|
||||
future_rows = df.loc[offset + 1: offset + window_size][mismatch_columns]
|
||||
# print('future row = ', future_rows.shape)
|
||||
# tail_rows = df.loc[1: offset + 1][mismatch_columns]
|
||||
# df = pd.concat([first_row, tail_rows, future_rows])[-window_size:]
|
||||
if offset > window_size:
|
||||
df = pd.concat([first_row, tail_rows])[-window_size:]
|
||||
else:
|
||||
df = pd.concat([first_row, tail_rows])
|
||||
df = df.astype(int).astype(str).applymap(lambda x: binary_string_to_hex(x) if x != -999 else 'x')
|
||||
|
||||
# 通用模式:不再依赖 _dut/_ref 对比,直接显示波形
|
||||
df = df.sort_index(axis=1) # sort the signal
|
||||
df.index.names = ['time(ns)']
|
||||
|
||||
# 生成二进制字符串(用于多位信号)
|
||||
binary_string_mismatch = {}
|
||||
for ms in mismatch_columns:
|
||||
if bool(re.search('\[\d+:0\]', ms)):
|
||||
# matched_res = re.search('\[\d+:0\]', ms); change to binary
|
||||
try:
|
||||
ms_binary = ''.join(bin(int(c, 16))[2:].zfill(4) for c in df.iloc[-1][ms])
|
||||
binary_string_mismatch[ms] = ms_binary
|
||||
except:
|
||||
binary_string_mismatch[ms] = str(df.iloc[-1][ms])
|
||||
print("Failed to transform to binary ", ms, " target line: ", df.iloc[-1][ms])
|
||||
else:
|
||||
binary_string_mismatch[ms] = str(df.iloc[-1][ms])
|
||||
binary_string_mismatch = dict(sorted(binary_string_mismatch.items()))
|
||||
waveform = "### First mismatched signals time(ns) Trace ###\n" + df.to_string(header=True, index=True) + \
|
||||
"\n### First mismatched signals time(ns) End ###\n"
|
||||
# df.loc[offset + 1:].to_string(header=True, index=True)
|
||||
if len(binary_string_mismatch) != 0:
|
||||
waveform += "The values of mismatched signals at the first mismatched signal time above:\n"
|
||||
for ms, bin_signal in binary_string_mismatch.items():
|
||||
waveform += ms + ": " + bin_signal + "\n"
|
||||
|
||||
# Reference the future waveforms
|
||||
if df.shape[0] < window_size:
|
||||
print('data frame shape = ', df.shape[0], " ", window_size)
|
||||
df = pd.concat([first_row, tail_rows, future_rows])
|
||||
if df.shape[0] > window_size + 2:
|
||||
df = df[-(window_size + 2):]
|
||||
df = df.astype(int).astype(str).applymap(lambda x: binary_string_to_hex(x) if x != -999 else 'x')
|
||||
df = df.sort_index(axis=1) # sort the signal
|
||||
df.index.names = ['time(ns)']
|
||||
waveform += "\n### Mismatched signals time(ns) Trace After the First Mismatch ###\n" + df.to_string(header=True, index=True) + \
|
||||
"\n### Mismatched signals time(ns) Trace After the First Mismatch End ###\n"
|
||||
return waveform
|
||||
|
||||
def parse_mismatch(test_output: str):
|
||||
mismatch = {}
|
||||
prefix = "First mismatch occurred at time"
|
||||
for line in test_output.split('\n'):
|
||||
if prefix in line:
|
||||
# signal name
|
||||
st = line.find("Output '")
|
||||
ed = line.find("' ")
|
||||
signal_name = line[st + 8:ed]
|
||||
|
||||
# timestep
|
||||
st = line.find(prefix)
|
||||
mismatch_timestep = int(line[st + len(prefix):-1].strip())
|
||||
|
||||
mismatch[signal_name] = mismatch_timestep
|
||||
|
||||
first_mismatch_timestep = min(mismatch.values())
|
||||
return list(mismatch.keys()), first_mismatch_timestep
|
||||
|
||||
|
||||
def get_tabular(method: str, vcd_path: str, mismatch_columns: list[str], offset:int, window_size: int=20,
|
||||
ori_mismatch_columns: list[str]=[]):
|
||||
with open(vcd_path, 'r', encoding='utf-8') as f:
|
||||
waveform = f.read()
|
||||
f.close()
|
||||
# print('waveform = ', waveform)
|
||||
tmp_vcd_path = os.path.dirname(os.path.abspath(vcd_path)) + '/tmp.vcd'
|
||||
# print('tmp_vcd_path = ', tmp_vcd_path)
|
||||
|
||||
with open(tmp_vcd_path, "w", encoding='utf-8') as f:
|
||||
f.write(waveform)
|
||||
f.seek(0)
|
||||
|
||||
gen_func = {
|
||||
'callback': tabular_via_callback,
|
||||
'dataframe': tabular_via_dataframe,
|
||||
}.get(method)
|
||||
if gen_func is None:
|
||||
raise Exception(f"get tabular do not support {method} method.")
|
||||
|
||||
return gen_func(tmp_vcd_path, offset, mismatch_columns, window_size, ori_mismatch_columns)
|
||||
|
||||
|
||||
# From Yun-Da; Probably will not use it
|
||||
class WaveformTabular():
|
||||
|
||||
def _run(self, vcd_path: str, test_output: str):
|
||||
|
||||
def parse_mismatch(test_output: str):
|
||||
mismatch = {}
|
||||
prefix = "First mismatch occurred at time"
|
||||
for line in test_output.split('\n'):
|
||||
if prefix in line:
|
||||
# signal name
|
||||
st = line.find("Output '")
|
||||
ed = line.find("' ")
|
||||
signal_name = line[st + 8:ed]
|
||||
|
||||
# timestep
|
||||
st = line.find(prefix)
|
||||
mismatch_timestep = int(line[st + len(prefix):-1].strip())
|
||||
|
||||
mismatch[signal_name] = mismatch_timestep
|
||||
|
||||
first_mismatch_timestep = min(mismatch.values())
|
||||
return list(mismatch.keys()), first_mismatch_timestep
|
||||
|
||||
def get_tabular(method: str, vcd_path: str):
|
||||
with open(vcd_path, 'r', encoding='utf-8') as f:
|
||||
waveform = f.read()
|
||||
f.close()
|
||||
# print('waveform = ', waveform)
|
||||
tmp_vcd_path = os.path.dirname(os.path.abspath(vcd_path)) + '/tmp.vcd'
|
||||
# print('tmp_vcd_path = ', tmp_vcd_path)
|
||||
|
||||
with open(tmp_vcd_path, "w", encoding='utf-8') as f:
|
||||
f.write(waveform)
|
||||
f.seek(0)
|
||||
|
||||
mismatch_columns, offset = parse_mismatch(test_output)
|
||||
mismatch_columns.extend(["counter", "state", "done", "in", "data", "byte_r", ])
|
||||
window_size = 20
|
||||
|
||||
gen_func = {
|
||||
'callback': tabular_via_callback,
|
||||
'dataframe': tabular_via_dataframe,
|
||||
}.get(method)
|
||||
if gen_func is None:
|
||||
raise Exception(f"get tabular do not support {method} method.")
|
||||
|
||||
return gen_func(tmp_vcd_path, offset, mismatch_columns, window_size)
|
||||
|
||||
tabular = get_tabular('dataframe', vcd_path)
|
||||
return tabular
|
||||
|
||||
if __name__ == '__main__':
|
||||
vcd_waveanalyze = WaveformTabular()
|
||||
cmds = "vvp /home/scratch.chiatungh_nvresearch/hardware-agent-marco/verilog_tool_tmp/test.vvp".split(' ')
|
||||
print(" ".join(cmds))
|
||||
try:
|
||||
test_output = subprocess.check_output(cmds, stderr=subprocess.STDOUT)
|
||||
test_output = test_output.decode("utf-8")
|
||||
print(test_output)
|
||||
except subprocess.CalledProcessError as e:
|
||||
print('Exception')
|
||||
raise RuntimeError("command '{}' return with error (code {}): {}".format(e.cmd, e.returncode, e.output))
|
||||
|
||||
debug_wave = vcd_waveanalyze._run(vcd_path="./wave.vcd",
|
||||
test_output=test_output)
|
||||
if isinstance(debug_wave, str):
|
||||
print(debug_wave)
|
||||
|
||||
Reference in New Issue
Block a user