#
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Author : Chia-Tung (Mark) Ho, NVIDIA
#
"""Render VCD waveforms as plain-text tables around a testbench mismatch.

Two renderers are provided: a streaming one built on a ``vcdvcd`` callback
(``tabular_via_callback``) and a pandas based one (``tabular_via_dataframe``).
``get_tabular`` dispatches between them by name.
"""

import io
import math
import os
import re
import subprocess
from typing import List, Optional

import pandas as pd
from vcdvcd import VCDVCD, StreamParserCallbacks, binary_string_to_hex

# Not referenced in this file; the import is kept as-is in case other code
# relies on it being importable from here.
from debug_graph_analyzer import DebugGraph  # noqa: F401

# Marker stored in the numeric waveform matrix for values that are not a pure
# digit string (for example 'x' or 'z').
_UNKNOWN_VALUE = -999

# Matches a bus range suffix such as "[7:0]" in a column name.
_BUS_RANGE_RE = re.compile(r'\[\d+:0\]')

_MISMATCH_PREFIX = "First mismatch occurred at time"
_MISMATCH_TIME_RE = re.compile(re.escape(_MISMATCH_PREFIX) + r"\s*(\d+)")
_MISMATCH_NAME_RE = re.compile(r"Output '(.*?)' ")


class CustomCallback(StreamParserCallbacks):
    """Stream-parser callback that writes a windowed hex table to ``vcd.io``.

    Args:
        printIds: Optional mapping from full signal reference to the label
            printed in the header. When empty or ``None`` every signal in the
            VCD is printed under the last component of its dotted name.
        lines: Number of ``time`` callbacks, counted from ``offset``, for
            which rows may be emitted.
        offset: Index of the first ``time`` callback that may emit a row.
    """

    def __init__(self, printIds=None, lines=20, offset=0):
        self._printIdx = printIds if printIds else {}
        self._references_to_widths = {}
        # The original code hard-coded 20 here and ignored the argument.
        self.lines = lines
        self.counter = 0
        self.offset = offset

    def enddefinitions(self, vcd, signals, cur_sig_vals):
        """Create ``vcd.io`` and print the table header into it."""
        vcd.io = io.StringIO()
        if not self._printIdx:
            self._printIdx = {ref: ref.split('.')[-1] for ref in vcd.signals}
        if signals:
            self._print_dumps_refs = signals
        else:
            self._print_dumps_refs = sorted(
                vcd.data[code].references[0] for code in cur_sig_vals
            )
        # Column width: one hex digit per 4 bits, but at least as wide as the
        # decimal representation of the signal's 1-based position.
        for position, ref in enumerate(self._print_dumps_refs, 1):
            identifier_code = vcd.references_to_ids[ref]
            size = int(vcd.data[identifier_code].size)
            width = max(size // 4, int(math.floor(math.log10(position))) + 1)
            self._references_to_widths[ref] = width
        header = ['// {0:<16}'.format('time')]
        # Iterate the same mapping that time() uses so the header columns
        # always line up with the data rows.
        for ref, label in self._printIdx.items():
            cell = '{0:>{1}s}'.format(label, self._references_to_widths.get(ref, 1))
            header.append('{0:<16}'.format(cell))
        print(''.join(header), file=vcd.io)

    def time(self, vcd, time, cur_sig_vals):
        """Append one row for ``time`` if it falls inside the window."""
        self.counter += 1
        if self.counter > self.offset + self.lines or self.counter < self.offset:
            return
        if not vcd.signal_changed:
            return
        row = ['// {0:<16}'.format(str(time) + 'ns')]
        for ref in self._printIdx:
            identifier_code = vcd.references_to_ids[ref]
            value = cur_sig_vals[identifier_code]
            cell = '{0:>{1}s}'.format(
                binary_string_to_hex(value),
                self._references_to_widths.get(ref, 1))
            row.append('{0:<16}'.format(cell))
        print(''.join(row), file=vcd.io)


# extract raw name from dut and ref
def get_raw_signal_name(name: str) -> str:
    """Return ``name`` truncated at the first ``_dut``, ``_ref`` or ``_tb``.

    The markers are tried in that order; the first one present in ``name``
    decides where the name is cut. Names without any marker are returned
    unchanged.
    """
    for marker in ("_dut", "_ref", "_tb"):
        position = name.find(marker)
        if position != -1:
            return name[:position]
    return name


def tabular_via_callback(vcd_path, offset: int, mismatch_columns: List[str], window_size: int = 10,
                         ori_mismatch_columns: Optional[List[str]] = None):
    """Render the VCD at ``vcd_path`` through ``CustomCallback``.

    ``mismatch_columns`` and ``ori_mismatch_columns`` are accepted so that the
    signature matches ``tabular_via_dataframe``; this renderer does not use
    them.

    Returns:
        The text accumulated in ``vcd.io`` by the callback.
    """
    vcd = VCDVCD(vcd_path, callbacks=CustomCallback(offset=offset, lines=window_size),
                 store_tvs=False, only_sigs=False)
    return vcd.io.getvalue()


def _to_hex_cell(cell: str) -> str:
    """Convert one stringified matrix cell to hex, or 'x' for the unknown marker."""
    if cell == str(_UNKNOWN_VALUE):
        return 'x'
    return binary_string_to_hex(cell)


def _render_trace(rows: pd.DataFrame) -> pd.DataFrame:
    """Turn numeric waveform rows into a hex-string table sorted by column name."""
    cells = rows.astype(int).astype(str)
    # Per-column Series.map avoids the deprecated DataFrame.applymap.
    table = cells.apply(lambda column: column.map(_to_hex_cell))
    table = table.sort_index(axis=1)  # sort the signal
    table.index.names = ['time(ns)']
    return table


def tabular_via_dataframe(vcd_path, offset: int, mismatch_columns: List[str], window_size: int = 5,
                          ori_mismatch_columns: Optional[List[str]] = None):
    """Render the waveform around ``offset`` as text using a pandas DataFrame.

    Args:
        vcd_path: Path of the VCD file to load.
        offset: Time index of the first mismatch; used as a row label.
        mismatch_columns: Signal names to show. A column is selected when a
            target equals its raw name (see ``get_raw_signal_name``), is a
            substring of it, or is a suffix of it. If nothing matches, all
            columns are shown.
        window_size: Maximum number of history rows kept when ``offset`` is
            larger than ``window_size``; also bounds the follow-up trace.
        ori_mismatch_columns: Unused; kept for signature compatibility.

    Returns:
        A multi-section string containing the trace up to the mismatch, the
        last-row values (bus signals expanded to binary), and, when the
        history is shorter than ``window_size``, a trace that also includes
        rows after the mismatch.
    """
    import numpy as np

    vcd = VCDVCD(vcd_path)
    n_row = vcd.endtime + 1
    n_col = len(vcd.signals)

    # fill in the waveform to the np array
    # Each value is stored as the decimal reading of its binary digit string
    # (e.g. '1010' -> 1010.0) and converted back to a string later.
    # NOTE(review): very wide buses may exceed exact float64 precision - confirm.
    matrix = np.full((n_row, n_col), np.nan, dtype=float)
    for col_idx, ref in enumerate(vcd.signals):
        symbol = vcd.references_to_ids[ref]
        for ts, signal in vcd.data[symbol].tv:
            try:
                matrix[ts, col_idx] = int(signal) if signal.isdigit() else _UNKNOWN_VALUE
            except (AttributeError, TypeError, ValueError, OverflowError):
                matrix[ts, col_idx] = _UNKNOWN_VALUE

    # Deal with the signal names including the module inside.
    # Generic mode: use the last dotted component as the signal name. Signals
    # nested deeper than two levels get the enclosing module name as a prefix
    # to reduce name clashes.
    column_names = []
    for signal in vcd.signals:
        fields = signal.split('.')
        if len(fields) > 2:
            column_names.append(f"{fields[-2]}_{fields[-1]}")
        else:
            column_names.append(fields[-1])

    df = pd.DataFrame(matrix, columns=column_names)
    # Keep only the timestamps at which 'clk' has a recorded value; skip the
    # filter instead of raising when the dump has no such column.
    if 'clk' in df.columns:
        df = df.dropna(subset=['clk'])
    # Carry values forward; anything still missing has never been assigned.
    df = df.ffill().fillna(_UNKNOWN_VALUE)
    df = df.loc[:, ~df.columns.duplicated()]

    # Generic mode: match the traced signals directly (no dependency on the
    # _dut/_ref suffixes).
    selected = [
        col for col in df.columns
        if any(target == get_raw_signal_name(col) or target in col or col.endswith(target)
               for target in mismatch_columns)
    ]
    # If nothing matched, fall back to every column.
    if not selected:
        selected = list(df.columns)

    first_row = df.loc[0: 1][selected]
    tail_rows = df.loc[1: offset + 1][selected].drop_duplicates(keep='first')  # Mark: Keep 4 clock cycles
    if offset + window_size > df.shape[0]:
        future_rows = df.loc[offset + 1:][selected]
    else:
        future_rows = df.loc[offset + 1: offset + window_size][selected]

    history = pd.concat([first_row, tail_rows])
    if offset > window_size:
        history = history[-window_size:]
    # Generic mode: no _dut/_ref comparison, just display the waveform.
    trace = _render_trace(history)

    # Build binary strings for multi-bit signals from the last displayed row.
    binary_string_mismatch = {}
    if not trace.empty:
        last_row = trace.iloc[-1]
        for ms in selected:
            value = str(last_row[ms])
            if _BUS_RANGE_RE.search(ms):
                try:
                    value = ''.join(bin(int(c, 16))[2:].zfill(4) for c in value)
                except ValueError:
                    # Non-hex cell such as 'x': keep the displayed text.
                    print("Failed to transform to binary ", ms, " target line: ", last_row[ms])
            binary_string_mismatch[ms] = value
    binary_string_mismatch = dict(sorted(binary_string_mismatch.items()))

    waveform = "### First mismatched signals time(ns) Trace ###\n" + trace.to_string(header=True, index=True) + \
               "\n### First mismatched signals time(ns) End ###\n"
    if binary_string_mismatch:
        waveform += "The values of mismatched signals at the first mismatched signal time above:\n"
        waveform += ''.join(ms + ": " + bin_signal + "\n"
                            for ms, bin_signal in binary_string_mismatch.items())

    # Reference the future waveforms
    # NOTE(review): the source this was reformatted from had lost its
    # indentation; the whole follow-up section is assumed to belong to this
    # branch - confirm against the original layout.
    if trace.shape[0] < window_size:
        print('data frame shape = ', trace.shape[0], " ", window_size)
        extended = pd.concat([first_row, tail_rows, future_rows])
        if extended.shape[0] > window_size + 2:
            extended = extended[-(window_size + 2):]
        extended_trace = _render_trace(extended)
        waveform += "\n### Mismatched signals time(ns) Trace After the First Mismatch ###\n" + \
                    extended_trace.to_string(header=True, index=True) + \
                    "\n### Mismatched signals time(ns) Trace After the First Mismatch End ###\n"
    return waveform


def parse_mismatch(test_output: str):
    """Extract mismatching output names and the earliest mismatch time.

    Only lines containing ``First mismatch occurred at time <int>`` are
    considered. The signal name is taken from ``Output '<name>' `` on the same
    line when present.

    Returns:
        ``(names, first_time)``: the signal names in order of appearance and
        the minimum reported time.

    Raises:
        ValueError: If no line reports a mismatch time.
    """
    mismatch = {}
    timesteps = []
    for line in test_output.splitlines():
        time_match = _MISMATCH_TIME_RE.search(line)
        if time_match is None:
            continue
        timestep = int(time_match.group(1))
        timesteps.append(timestep)
        name_match = _MISMATCH_NAME_RE.search(line)
        if name_match is not None:
            mismatch[name_match.group(1)] = timestep
    if not timesteps:
        raise ValueError(f"no '{_MISMATCH_PREFIX}' line found in test output")
    return list(mismatch), min(timesteps)


def get_tabular(method: str, vcd_path: str, mismatch_columns: List[str], offset: int, window_size: int = 20,
                ori_mismatch_columns: Optional[List[str]] = None):
    """Copy the VCD to ``tmp.vcd`` beside it and render it with ``method``.

    Args:
        method: ``'callback'`` or ``'dataframe'``.
        vcd_path: Path of the source VCD file.
        mismatch_columns: Signal names forwarded to the renderer.
        offset: Mismatch time forwarded to the renderer.
        window_size: Window size forwarded to the renderer.
        ori_mismatch_columns: Forwarded to the renderer unchanged.

    Raises:
        ValueError: If ``method`` is not a supported renderer name.
    """
    gen_func = {
        'callback': tabular_via_callback,
        'dataframe': tabular_via_dataframe,
    }.get(method)
    if gen_func is None:
        raise ValueError(f"get tabular do not support {method} method.")

    with open(vcd_path, 'r', encoding='utf-8') as f:
        waveform = f.read()
    tmp_vcd_path = os.path.join(os.path.dirname(os.path.abspath(vcd_path)), 'tmp.vcd')
    with open(tmp_vcd_path, "w", encoding='utf-8') as f:
        f.write(waveform)

    return gen_func(tmp_vcd_path, offset, mismatch_columns, window_size, ori_mismatch_columns)


# From Yun-Da; Probably will not use it
class WaveformTabular():
    """Convenience wrapper: parse simulator output, then render the waveform."""

    def _run(self, vcd_path: str, test_output: str):
        """Return the dataframe rendering for the first mismatch in ``test_output``.

        The mismatching outputs are extended with a fixed list of extra signal
        names before rendering, and a window size of 20 is used.
        """
        mismatch_columns, offset = parse_mismatch(test_output)
        mismatch_columns.extend(["counter", "state", "done", "in", "data", "byte_r", ])
        return get_tabular('dataframe', vcd_path, mismatch_columns, offset, 20)


if __name__ == '__main__':
    vcd_waveanalyze = WaveformTabular()
    cmds = "vvp /home/scratch.chiatungh_nvresearch/hardware-agent-marco/verilog_tool_tmp/test.vvp".split(' ')
    print(" ".join(cmds))
    try:
        test_output = subprocess.check_output(cmds, stderr=subprocess.STDOUT)
        test_output = test_output.decode("utf-8")
        print(test_output)
    except subprocess.CalledProcessError as e:
        print('Exception')
        raise RuntimeError(
            "command '{}' return with error (code {}): {}".format(e.cmd, e.returncode, e.output)
        ) from e
    debug_wave = vcd_waveanalyze._run(vcd_path="./wave.vcd", test_output=test_output)
    if isinstance(debug_wave, str):
        print(debug_wave)