code


requirements.txt

python-can>=4.2.2
pandas>=2.0.0
openpyxl>=3.1.2
py_canoe>=3.0.4

can_handler.py

from py_canoe import CANoe, wait
import pandas as pd
import time
from typing import Dict, Any, Optional
import logging
import os
from datetime import datetime
import shutil
import openpyxl
from openpyxl.styles import PatternFill, Font
from openpyxl.styles.colors import Color

class CANHandler:
    """Run Excel-defined test steps against a CANoe measurement.

    Each sheet of the Excel file is a list of steps (one per row). A step
    names a function (``read``, ``write``, ``wait``, ``fault`` or
    ``clear_fault``), the message/signal it applies to, a value and a
    comparison condition. The handler copies the workbook into a timestamped
    results directory, executes the steps and writes the outcome (actual
    value, PASS/FAIL, execution time, comments) back into the copy.
    """

    # Header text expected in row 1 of every sheet, keyed by the short name
    # used inside this class.
    COLUMN_HEADERS = {
        'testcase': 'testcase number',
        'function': 'function name',
        'message': 'message',
        'signal': 'signal',
        'exact_value': 'Exact Value',
        'condition': 'Condition',
        'expected': 'Expected Value',
        'status': 'Status',
        'time': 'Step Execution time',
        'comments': 'Comments',
    }

    # Operators accepted in the "Condition" column, evaluated as
    # ``actual <operator> value-from-sheet``.
    CONDITIONS = {
        '=': lambda actual, expected: actual == expected,
        '==': lambda actual, expected: actual == expected,
        '!=': lambda actual, expected: actual != expected,
        '>=': lambda actual, expected: actual >= expected,
        '<=': lambda actual, expected: actual <= expected,
        '>': lambda actual, expected: actual > expected,
        '<': lambda actual, expected: actual < expected,
    }

    # Signal value written to simulate each supported fault type.
    FAULT_VALUES = {
        'short_to_battery': 5.0,   # Maximum voltage
        'short_to_ground': 0.0,    # Ground voltage
        'over_current': 100.0,     # High current value
    }

    def __init__(self, dbc_file: str, cfg_file: str, excel_file: str):
        """
        Prepare the results directory, logging, the result workbook and CANoe.

        Args:
            dbc_file (str): Path to the DBC file
            cfg_file (str): Path to the CANoe configuration file
            excel_file (str): Path to the Excel file containing test cases
        """
        self.dbc_file = dbc_file
        self.cfg_file = cfg_file
        self.excel_file = excel_file

        # Define colors for formatting
        self.PASS_COLOR = '90EE90'  # Light green
        self.FAIL_COLOR = 'FF0000'  # Red
        self.ERROR_COLOR = 'FF0000'  # Red for error text

        # Create results directory and setup logging
        self.setup_results_directory()
        self.setup_logging()

        # Prepare the workbook first: if the Excel path is wrong we fail
        # before a CANoe measurement has been started (the caller gets no
        # instance back from a failed constructor, so it could not stop it).
        self.copy_and_prepare_excel()
        self.initialize_canoe()

    def setup_results_directory(self):
        """Create results directory with timestamp"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.results_dir = os.path.join(os.path.dirname(self.excel_file), "results", timestamp)
        os.makedirs(self.results_dir, exist_ok=True)

        # Create path for result excel and log file
        self.result_excel = os.path.join(self.results_dir, f"result_{os.path.basename(self.excel_file)}")
        self.log_file = os.path.join(self.results_dir, "execution.log")

    def setup_logging(self):
        """Setup logging to the execution log file and to the console"""
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)

        # The logger object is shared by every instance created from this
        # module. Drop handlers left behind by an earlier instance so that
        # messages are not duplicated or written to a previous run's log.
        for stale_handler in list(self.logger.handlers):
            self.logger.removeHandler(stale_handler)
            stale_handler.close()

        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

        file_handler = logging.FileHandler(self.log_file, encoding='utf-8')
        console_handler = logging.StreamHandler()

        # Remembered so close() can release the log file again.
        self._log_handlers = [file_handler, console_handler]
        for handler in self._log_handlers:
            handler.setLevel(logging.INFO)
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def initialize_canoe(self):
        """Open the CANoe configuration, attach the DBC and start measuring"""
        try:
            self.app = CANoe()
            self.app.open(canoe_cfg=self.cfg_file)
            self.app.add_database(self.dbc_file, 'CAN', 1)
            self.app.start_measurement()
            self.logger.info("CANoe initialized successfully")
        except Exception as e:
            self.logger.error(f"Failed to initialize CANoe: {str(e)}")
            raise

    def copy_and_prepare_excel(self):
        """Copy Excel file to results directory and record its sheet names"""
        try:
            # Copy Excel file
            shutil.copy2(self.excel_file, self.result_excel)
            self.logger.info(f"Excel file copied to: {self.result_excel}")

            # Only the sheet names are needed. The file is closed again right
            # away because the same path is later rewritten by
            # process_excel_sheets(); an open handle can block that save.
            with pd.ExcelFile(self.result_excel) as excel_book:
                self.sheet_names = list(excel_book.sheet_names)

            self.logger.info("Excel file prepared successfully")
        except Exception as e:
            self.logger.error(f"Failed to prepare Excel file: {str(e)}")
            raise

    def process_excel_sheets(self):
        """
        Execute every step of every sheet and write the results back.

        The result workbook is saved after each sheet so that completed
        sheets are kept even if a later sheet aborts the run.

        Raises:
            ValueError: If a sheet lacks one of the required header columns
            Exception: Any workbook load/save error (logged, then re-raised)
        """
        try:
            workbook = openpyxl.load_workbook(self.result_excel)

            for sheet_name in self.sheet_names:
                self.logger.info(f"\nProcessing sheet: {sheet_name}")
                sheet = workbook[sheet_name]
                col_indices = self._column_indices(sheet)

                # Row 1 is the header; steps start at row 2
                for row in range(2, sheet.max_row + 1):
                    if self._is_blank_row(sheet, row, col_indices):
                        # max_row also counts formatted-but-empty rows
                        continue
                    self._process_row(sheet, row, col_indices)

                # Save changes
                workbook.save(self.result_excel)
                self.logger.info(f"Completed processing sheet: {sheet_name}")

        except Exception as e:
            self.logger.error(f"Failed to process Excel sheets: {str(e)}")
            raise

    def _column_indices(self, sheet) -> Dict[str, int]:
        """Map the short column names to 1-based column numbers of ``sheet``."""
        header_row = [cell.value for cell in sheet[1]]
        missing = [title for title in self.COLUMN_HEADERS.values() if title not in header_row]
        if missing:
            raise ValueError(
                f"Sheet '{sheet.title}' is missing required column(s): {', '.join(missing)}"
            )
        return {key: header_row.index(title) + 1 for key, title in self.COLUMN_HEADERS.items()}

    def _is_blank_row(self, sheet, row: int, col_indices: Dict[str, int]) -> bool:
        """Return True when none of the input columns of ``row`` has a value."""
        input_keys = ('testcase', 'function', 'message', 'signal', 'exact_value')
        return all(
            sheet.cell(row=row, column=col_indices[key]).value is None
            for key in input_keys
        )

    def _process_row(self, sheet, row: int, col_indices: Dict[str, int]):
        """Execute the step in ``row`` and fill in its result columns."""
        start_time = time.time()

        def cell(key):
            return sheet.cell(row=row, column=col_indices[key])

        try:
            # Get row values (empty cells become '' rather than 'None')
            function_name = self._text(cell('function').value).lower()
            message = self._text(cell('message').value)
            signal = self._text(cell('signal').value)
            exact_value = cell('exact_value').value
            condition = self._text(cell('condition').value)

            # Execute function and get result
            result, error = self.execute_function(function_name, message, signal, exact_value, condition)

            if error:
                self._mark_error(sheet, row, col_indices, error)
            else:
                cell('expected').value = str(result)

                # Only a read produces a measured value to verify. The other
                # steps are actions and pass when they ran without error.
                if function_name == 'read' and exact_value is not None:
                    is_pass = self._evaluate(result, exact_value, condition)
                else:
                    is_pass = True

                color = self.PASS_COLOR if is_pass else self.FAIL_COLOR
                status_cell = cell('status')
                status_cell.value = "PASS" if is_pass else "FAIL"
                status_cell.fill = PatternFill(start_color=color, end_color=color, fill_type='solid')
                if is_pass:
                    cell('comments').value = "OK"
                else:
                    cell('comments').value = (
                        f"Value mismatch (actual {result}, expected {condition or '='} {exact_value})"
                    )

        except Exception as e:
            self.logger.error(f"Error processing row {row}: {str(e)}")
            self._mark_error(sheet, row, col_indices, e)

        # Update execution time
        cell('time').value = f"{round(time.time() - start_time, 2)}s"

    def _mark_error(self, sheet, row: int, col_indices: Dict[str, int], error: Any):
        """Flag ``row`` as failed with ``error`` as the comment."""
        expected_cell = sheet.cell(row=row, column=col_indices['expected'])
        expected_cell.value = "ERROR"
        expected_cell.font = Font(color=self.ERROR_COLOR)

        status_cell = sheet.cell(row=row, column=col_indices['status'])
        status_cell.value = "FAIL"
        status_cell.fill = PatternFill(start_color=self.FAIL_COLOR, end_color=self.FAIL_COLOR, fill_type='solid')

        sheet.cell(row=row, column=col_indices['comments']).value = str(error)

    @staticmethod
    def _text(value: Any) -> str:
        """Return the stripped text of a cell value, '' for an empty cell."""
        return '' if value is None else str(value).strip()

    @staticmethod
    def _is_missing(text: Optional[str]) -> bool:
        """Return True for None, blank text or the string 'None'."""
        return text is None or str(text).strip() in ('', 'None')

    @classmethod
    def _evaluate(cls, actual: Any, expected: Any, condition: Optional[str]) -> bool:
        """
        Check ``actual <condition> expected``.

        Values are compared as floats when both convert, otherwise as
        stripped strings (equality/inequality only). A missing condition
        means equality.

        Raises:
            ValueError: For an unknown operator, or an ordering operator
                applied to non-numeric values
        """
        operator_text = '=' if cls._is_missing(condition) else str(condition).strip()
        try:
            compare = cls.CONDITIONS[operator_text]
        except KeyError:
            raise ValueError(f"Unsupported condition: {condition!r}") from None

        try:
            left, right = float(actual), float(expected)
        except (TypeError, ValueError):
            if operator_text not in ('=', '==', '!='):
                raise ValueError(
                    f"Condition '{operator_text}' requires numeric values"
                ) from None
            left, right = str(actual).strip(), str(expected).strip()

        return bool(compare(left, right))

    def execute_function(self, function_name: str, message: str, signal: str, exact_value: Any, condition: str) -> tuple:
        """
        Execute one test step.

        Args:
            function_name (str): 'read', 'write', 'wait', 'fault' or 'clear_fault'
            message (str): CAN message name from the row
            signal (str): Signal name from the row
            exact_value (Any): Value from the row (write value, wait seconds
                or fault type, depending on the function)
            condition (str): Comparison operator from the row (not used to
                execute the step; the caller applies it to the result)

        Returns:
            tuple: ``(result, None)`` on success, ``(None, error message)``
            on failure. Exceptions are converted to the error message.
        """
        try:
            if function_name == 'read':
                result = self.read(message, signal)
                return result, None

            elif function_name == 'write':
                if exact_value is None:
                    return None, "Value is required for write operation"
                self.write(message, signal, float(exact_value))
                return exact_value, None

            elif function_name == 'wait':
                if exact_value is None:
                    return None, "Value is required for wait operation"
                self.wait(float(exact_value))
                return exact_value, None

            elif function_name == 'fault':
                if exact_value is None:
                    return None, "Fault type is required"
                # The row already names the target message/signal
                self.create_fault(str(exact_value), message, signal)
                return exact_value, None

            elif function_name == 'clear_fault':
                if exact_value is None:
                    return None, "Fault type is required for clear_fault"
                self.clear_fault(str(exact_value), message, signal)
                return exact_value, None

            else:
                return None, f"Unknown function type: {function_name}"

        except Exception as e:
            return None, str(e)

    def close(self):
        """Stop the measurement, quit CANoe and release the log handlers"""
        try:
            self.app.stop_measurement()
            self.app.quit()
            self.logger.info("\nCANoe connection closed")
            self.logger.info(f"\nResults saved to: {self.result_excel}")
            self.logger.info(f"Log file saved to: {self.log_file}")
        except Exception as e:
            self.logger.error(f"Failed to close CANoe connection: {str(e)}")
            raise
        finally:
            # Detach and close our handlers so the log file is not kept open
            for handler in getattr(self, '_log_handlers', []):
                self.logger.removeHandler(handler)
                handler.close()
            self._log_handlers = []

    def _get_fault_details(self, fault_type: str) -> tuple:
        """
        Look up the message/signal of the first 'fault' step for ``fault_type``.

        Searches every sheet of the result workbook, using the same column
        headers as the test sheets.

        Returns:
            tuple: ``(message name, signal name)``

        Raises:
            ValueError: If no matching fault row exists, the row has no
                message/signal, or the workbook cannot be read
        """
        wanted = str(fault_type).strip().lower()
        required = {'function name', 'Exact Value', 'message', 'signal'}

        try:
            sheets = pd.read_excel(self.result_excel, sheet_name=None)
        except Exception as e:
            raise ValueError(f"Error getting fault details: {str(e)}") from e

        for frame in sheets.values():
            if not required.issubset(frame.columns):
                continue
            functions = frame['function name'].astype(str).str.strip().str.lower()
            values = frame['Exact Value'].astype(str).str.strip().str.lower()
            matches = frame[(functions == 'fault') & (values == wanted)]
            if matches.empty:
                continue

            fault_row = matches.iloc[0]
            message_name = fault_row['message']
            signal_name = fault_row['signal']
            if pd.isna(message_name) or pd.isna(signal_name):
                raise ValueError(f"Missing message name or signal name for fault type: {fault_type}")
            return str(message_name), str(signal_name)

        raise ValueError(f"Fault type '{fault_type}' not found in Excel file")

    def _resolve_fault_target(self, fault_type: str, message_name: Optional[str], signal_name: Optional[str]) -> tuple:
        """Use the given message/signal, falling back to a workbook lookup."""
        if self._is_missing(message_name) or self._is_missing(signal_name):
            return self._get_fault_details(fault_type)
        return message_name, signal_name

    def read(self, message_name: str, signal_name: str) -> float:
        """
        Read a signal value from the CAN bus

        Args:
            message_name (str): Name of the CAN message
            signal_name (str): Name of the signal to read

        Returns:
            float: The current value of the signal
        """
        try:
            value = self.app.get_signal_value(bus='CAN', channel=1, message=message_name, signal=signal_name, raw_value=False)
            self.logger.info(f"Read {signal_name} from {message_name}: {value}")
            return value
        except Exception as e:
            self.logger.error(f"Failed to read signal {signal_name}: {str(e)}")
            raise

    def write(self, message_name: str, signal_name: str, value: float):
        """
        Write a value to a CAN signal

        Args:
            message_name (str): Name of the CAN message
            signal_name (str): Name of the signal to write
            value (float): Value to set (passed with raw_value=False)
        """
        try:
            self.app.set_signal_value(bus='CAN', channel=1, message=message_name, signal=signal_name, value=value, raw_value=False)
            self.logger.info(f"Wrote value {value} to {signal_name} in {message_name}")
        except Exception as e:
            self.logger.error(f"Failed to write to signal {signal_name}: {str(e)}")
            raise

    def wait(self, seconds: float):
        """
        Pause the test sequence

        Args:
            seconds (float): Delay handed to the module-level ``wait`` helper
        """
        # Inside the method body ``wait`` is the imported function, not
        # this method.
        wait(seconds)
        self.logger.info(f"Waited for {seconds} seconds")

    def create_fault(self, fault_type: str, message_name: Optional[str] = None, signal_name: Optional[str] = None):
        """
        Create a fault condition by forcing a signal to a fault value

        Args:
            fault_type (str): Type of fault ('short_to_battery', 'short_to_ground', 'over_current')
            message_name (str, optional): Target message; looked up in the
                Excel file when omitted
            signal_name (str, optional): Target signal; looked up in the
                Excel file when omitted

        Raises:
            ValueError: For an unknown fault type or an unresolvable target
        """
        try:
            fault_key = fault_type.lower()
            if fault_key not in self.FAULT_VALUES:
                raise ValueError(f"Unknown fault type: {fault_type}")

            message_name, signal_name = self._resolve_fault_target(fault_type, message_name, signal_name)
            self.write(message_name, signal_name, self.FAULT_VALUES[fault_key])

            self.logger.info(f"Created {fault_type} fault for {signal_name}")
        except Exception as e:
            self.logger.error(f"Failed to create fault: {str(e)}")
            raise

    def clear_fault(self, fault_type: str, message_name: Optional[str] = None, signal_name: Optional[str] = None):
        """
        Clear a previously set fault by writing 0 to the signal

        Args:
            fault_type (str): Type of fault that was created
            message_name (str, optional): Target message; looked up in the
                Excel file when omitted
            signal_name (str, optional): Target signal; looked up in the
                Excel file when omitted
        """
        try:
            message_name, signal_name = self._resolve_fault_target(fault_type, message_name, signal_name)
            # Reset to nominal value (assuming 0 is nominal)
            self.write(message_name, signal_name, 0.0)
            self.logger.info(f"Cleared {fault_type} fault for {signal_name}")
        except Exception as e:
            self.logger.error(f"Failed to clear fault: {str(e)}")
            raise
        
        

test_cases.xlsx

testcase number,function name,message,signal,Exact Value,Condition,Expected Value,Status,Step Execution time,Comments
1,read,Engine_Status,RPM,1500,>=,,,,
2,write,Engine_Control,Target_RPM,2000,=,,,,
3,wait,,,2.0,,,,,
4,read,Battery_Status,Voltage,12.5,>=,,,,
5,fault,Battery_Status,Voltage,short_to_battery,=,,,,
6,wait,,,1.0,,,,,
7,read,Battery_Status,Voltage,5.0,=,,,,
8,clear_fault,Battery_Status,Voltage,short_to_battery,=,,,,
9,read,Engine_Status,Temperature,90.0,<=,,,,
10,write,Engine_Control,Fan_Speed,100.0,=,,,,

example.py

from can_handler import CANHandler
import os

def main():
    """Run every test sheet of test_cases.xlsx located next to this script."""
    # Get the current directory
    current_dir = os.path.dirname(os.path.abspath(__file__))

    # Initialize the CAN handler with your files
    handler = CANHandler(
        dbc_file="path/to/your.dbc",  # Replace with your DBC file path
        cfg_file="path/to/your.cfg",   # Replace with your CANoe config file path
        excel_file=os.path.join(current_dir, "test_cases.xlsx")
    )

    try:
        # Process all sheets in the Excel file
        handler.process_excel_sheets()
    finally:
        # Always close the connection properly
        handler.close()


if __name__ == "__main__":
    main()

readme.md

"

CAN Testing Framework

A comprehensive Python framework for automated CAN bus testing using Vector CANoe. This framework allows you to define test cases in Excel and execute them automatically while capturing results and timing information.

Table of Contents

Prerequisites

  • Python 3.9 or higher
  • Vector CANoe software (v11 or higher)
  • Required Python packages:
    py_canoe>=3.0.4
    pandas>=2.0.0
    openpyxl>=3.1.2
    

Installation

  1. Install required packages:
pip install -r requirements.txt

Framework Overview

Project Structure

can_framework/
│
├── can_handler.py      # Main framework implementation
├── example.py         # Example usage script
├── test_cases.xlsx   # Test cases definition file
├── requirements.txt  # Python dependencies
│
├── results/          # Auto-generated results directory
│   └── YYYYMMDD_HHMMSS/  # Timestamped test results
│       ├── result_test_cases.xlsx  # Results file
│       └── execution.log          # Detailed log file

Key Components

  1. CANHandler Class (can_handler.py):

    class CANHandler:
        def __init__(self, dbc_file: str, cfg_file: str, excel_file: str):
            """
            Initialize the CAN Handler
            Args:
                dbc_file: Path to the DBC file
                cfg_file: Path to the CANoe configuration file
                excel_file: Path to the Excel file containing test cases
            """
    
  2. Example Usage (example.py):

    from can_handler import CANHandler
    import os
    
    def main():
        current_dir = os.path.dirname(os.path.abspath(__file__))
        handler = CANHandler(
            dbc_file="path/to/your.dbc",
            cfg_file="path/to/your.cfg",
            excel_file=os.path.join(current_dir, "test_cases.xlsx")
        )
        try:
            handler.process_excel_sheets()
        finally:
            handler.close()
    

Code Structure

Main Framework Implementation (can_handler.py)

  1. Initialization:

    def initialize_canoe(self):
        """Initialize CANoe connection"""
        self.app = CANoe()
        self.app.open(canoe_cfg=self.cfg_file)
        self.app.add_database(self.dbc_file, 'CAN', 1)
        self.app.start_measurement()
    
  2. Excel Processing:

    def process_excel_sheets(self):
        """Process all sheets in Excel file"""
        for sheet_name in self.sheet_names:
            # Process each row in the sheet
            for row in range(2, sheet.max_row + 1):
                # Execute test case and record results
    
  3. Function Execution:

    def execute_function(self, function_name: str, message: str, 
                        signal: str, exact_value: Any, condition: str) -> tuple:
        """Execute a test function and return result"""
        if function_name == 'read':
            result = self.read(message, signal)
            return result, None
        # ... other function implementations
    

Excel Format

Test Case File Structure (test_cases.xlsx)

| Column Name        | Description                    | Example           |
|-------------------|--------------------------------|-------------------|
| testcase number   | Unique test identifier         | 1                |
| function name     | Operation to perform           | read             |
| message           | CAN message name               | Engine_Status    |
| signal            | Signal name                    | RPM              |
| Exact Value       | Expected/Write value           | 1500             |
| Condition         | Comparison operator            | >=               |
| Expected Value    | Actual value (auto-filled)     | 1498             |
| Status            | Test result (auto-filled)      | PASS/FAIL        |
| Step Execution time| Time taken (auto-filled)      | 0.23s            |
| Comments          | Result details (auto-filled)   | OK/Error message |

Example Test Cases:

testcase number,function name,message,signal,Exact Value,Condition
1,read,Engine_Status,RPM,1500,>=
2,write,Engine_Control,Target_RPM,2000,=
3,wait,,,2.0,
4,fault,Battery_Status,Voltage,short_to_battery,=

Functions Reference

Core Functions

  1. Read Operation

    def read(self, message_name: str, signal_name: str) -> float:
        """
        Read a signal value from CAN bus
        Returns: Current value of the signal
        """
    
  2. Write Operation

    def write(self, message_name: str, signal_name: str, value: float):
        """
        Write a value to a CAN signal
        """
    
  3. Wait Operation

    def wait(self, seconds: float):
        """
        Implement a delay
        """
    
  4. Fault Operations

    def create_fault(self, fault_type: str):
        """
        Create a fault condition
        Supported types: short_to_battery, short_to_ground, over_current
        """
    
    def clear_fault(self, fault_type: str):
        """
        Clear a previously set fault
        """
    

Usage Guide

1. Prepare Test Cases

Create an Excel file with your test cases:

testcase number,function name,message,signal,Exact Value,Condition
1,read,Engine_Status,RPM,1500,>=

2. Run Tests

# Initialize framework
handler = CANHandler(
    dbc_file="your.dbc",
    cfg_file="your.cfg",
    excel_file="test_cases.xlsx"
)

# Run tests
handler.process_excel_sheets()

3. Example Test Sequences

  1. Read and Verify Engine RPM:

    testcase number: 1
    function name: read
    message: Engine_Status
    signal: RPM
    Exact Value: 1500
    Condition: >=
    
  2. Create and Verify Fault:

    testcase number: 5
    function name: fault
    message: Battery_Status
    signal: Voltage
    Exact Value: short_to_battery
    Condition: =
    

Results and Logging

Results Excel File

  • Location: results/YYYYMMDD_HHMMSS/result_test_cases.xlsx
  • Contains:
    • Original test cases
    • Actual values
    • Pass/Fail status (color-coded)
    • Execution times
    • Error messages

Color Coding

  • PASS: Green background (90EE90)
  • FAIL: Red background (FF0000)
  • ERROR: Red text

Log File

  • Location: results/YYYYMMDD_HHMMSS/execution.log
  • Contains:
    2024-02-27 10:15:23 - INFO - CANoe initialized successfully
    2024-02-27 10:15:24 - INFO - Processing sheet: Sheet1
    2024-02-27 10:15:24 - INFO - Executing: read - Message: Engine_Status, Signal: RPM
    

Error Handling

The framework handles:

  1. CANoe connection issues
  2. Invalid test cases
  3. Execution errors
  4. File handling errors

Each error is:

  • Logged in execution.log
  • Marked in results Excel
  • Displayed in Comments column

Example Test Results

Successful Test:

Expected Value: 1500
Status: PASS (Green)
Comments: OK
Step Execution time: 0.23s

Failed Test:

Expected Value: 1500
Status: FAIL (Red)
Comments: Value mismatch (1498 != 1500)
Step Execution time: 0.25s

Error Case:

Expected Value: ERROR (Red text)
Status: FAIL (Red)
Comments: Signal not found: Invalid_Signal
Step Execution time: 0.15s

"

report_generator.py

import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from fpdf import FPDF
import os
from datetime import datetime
import jinja2
import webbrowser

class ReportGenerator:
    """Build PDF/HTML summary reports from a results workbook and its log."""

    # DataFrame columns shown in the detailed results table, in display order.
    TABLE_FIELDS = ['testcase number', 'function name', 'message', 'signal',
                    'Status', 'Step Execution time', 'Comments']

    def __init__(self, result_excel: str, log_file: str):
        """
        Initialize Report Generator

        Args:
            result_excel (str): Path to the results Excel file
            log_file (str): Path to the log file
        """
        self.result_excel = result_excel
        self.log_file = log_file
        self.report_dir = os.path.join(os.path.dirname(result_excel), 'reports')
        os.makedirs(self.report_dir, exist_ok=True)

        # Load the results. Test steps may be spread over several sheets
        # (CANHandler processes every sheet), so combine all of them.
        sheets = pd.read_excel(result_excel, sheet_name=None)
        self.results_df = pd.concat(list(sheets.values()), ignore_index=True)
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            self.log_content = f.readlines()

    def _execution_times(self):
        """Return 'Step Execution time' as floats (seconds); NaN if unparsable."""
        raw = self.results_df['Step Execution time']
        # astype(str) keeps this working when the column is empty or numeric,
        # where the .str accessor would not be available.
        seconds_text = raw.astype(str).str.strip().str.rstrip('s')
        return pd.to_numeric(seconds_text, errors='coerce')

    def _display_rows(self):
        """Return the results with empty cells as '' instead of NaN."""
        return self.results_df.astype(object).where(self.results_df.notna(), '')

    @staticmethod
    def _pdf_text(value) -> str:
        """Return ``value`` as text restricted to Latin-1.

        Characters outside Latin-1 are replaced with '?'. The PDF is written
        with the built-in Arial font, which is assumed not to support other
        characters (TODO confirm for the installed fpdf version).
        """
        return str(value).encode('latin-1', 'replace').decode('latin-1')

    def generate_statistics(self):
        """
        Generate test statistics

        Returns:
            dict: total_tests, passed_tests, failed_tests, total_time,
            avg_time and pass_rate (percent). Rates and times are 0.0 when
            there is nothing to compute them from.
        """
        status = self.results_df['Status']
        times = self._execution_times().dropna()

        total_tests = len(self.results_df)
        passed_tests = int((status == 'PASS').sum())
        failed_tests = int((status == 'FAIL').sum())

        stats = {
            'total_tests': total_tests,
            'passed_tests': passed_tests,
            'failed_tests': failed_tests,
            'total_time': float(times.sum()) if not times.empty else 0.0,
            'avg_time': float(times.mean()) if not times.empty else 0.0,
        }
        # Guard against an empty results sheet (division by zero)
        stats['pass_rate'] = (passed_tests / total_tests) * 100 if total_tests else 0.0
        return stats

    def create_pie_chart(self):
        """Create pass/fail pie chart"""
        stats = self.generate_statistics()
        fig = go.Figure(data=[go.Pie(
            labels=['Passed', 'Failed'],
            values=[stats['passed_tests'], stats['failed_tests']],
            hole=.3,
            marker_colors=['#90EE90', '#FF0000']
        )])
        fig.update_layout(title='Test Results Distribution')
        return fig

    def create_execution_time_chart(self):
        """Create execution time bar chart"""
        df = self.results_df.copy()
        df['execution_time'] = self._execution_times()
        fig = px.bar(df,
                     x='testcase number',
                     y='execution_time',
                     color='Status',
                     title='Execution Time by Test Case',
                     labels={'execution_time': 'Execution Time (s)',
                             'testcase number': 'Test Case Number'},
                     color_discrete_map={'PASS': '#90EE90', 'FAIL': '#FF0000'})
        return fig

    def generate_pdf_report(self, include_graphs=True, highlight_failures=True):
        """
        Generate PDF report

        Args:
            include_graphs (bool): Append the pie and execution-time charts
                (rendered to PNG files in the report directory)
            highlight_failures (bool): Fill the Status cell of failed rows red

        Returns:
            str: Path of the written PDF file
        """
        stats = self.generate_statistics()
        pdf = FPDF()

        # Title Page
        pdf.add_page()
        pdf.set_font('Arial', 'B', 24)
        pdf.cell(0, 20, 'CAN Test Execution Report', ln=True, align='C')
        pdf.set_font('Arial', '', 12)
        pdf.cell(0, 10, f'Generated on: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}', ln=True, align='C')

        # Summary
        pdf.add_page()
        pdf.set_font('Arial', 'B', 16)
        pdf.cell(0, 10, 'Test Summary', ln=True)
        pdf.set_font('Arial', '', 12)
        summary_lines = [
            f'Total Tests: {stats["total_tests"]}',
            f'Passed Tests: {stats["passed_tests"]}',
            f'Failed Tests: {stats["failed_tests"]}',
            f'Pass Rate: {stats["pass_rate"]:.2f}%',
            f'Total Execution Time: {stats["total_time"]:.2f}s',
            f'Average Execution Time: {stats["avg_time"]:.2f}s',
        ]
        for line in summary_lines:
            pdf.cell(0, 10, line, ln=True)

        # Test Results Table
        pdf.add_page()
        pdf.set_font('Arial', 'B', 16)
        pdf.cell(0, 10, 'Detailed Test Results', ln=True)
        pdf.set_font('Arial', 'B', 10)

        # Table headers
        cols = ['Test #', 'Function', 'Message', 'Signal', 'Status', 'Time', 'Comments']
        col_widths = [15, 25, 30, 30, 20, 20, 50]
        for width, title in zip(col_widths, cols):
            pdf.cell(width, 10, title, 1)
        pdf.ln()

        # Table data
        pdf.set_font('Arial', '', 8)
        pdf.set_fill_color(255, 0, 0)  # used only for highlighted cells
        for _, row in self._display_rows().iterrows():
            failed = highlight_failures and str(row['Status']) == 'FAIL'
            for width, field in zip(col_widths, self.TABLE_FIELDS):
                highlight = bool(failed and field == 'Status')
                pdf.cell(width, 10, self._pdf_text(row[field]), 1, fill=highlight)
            pdf.ln()

        if include_graphs:
            # Save charts as images
            pie_path = os.path.join(self.report_dir, "pie_chart.png")
            exec_path = os.path.join(self.report_dir, "exec_chart.png")
            self.create_pie_chart().write_image(pie_path)
            self.create_execution_time_chart().write_image(exec_path)

            # Add charts to PDF
            pdf.add_page()
            pdf.image(pie_path, x=10, w=190)
            pdf.add_page()
            pdf.image(exec_path, x=10, w=190)

        # Save PDF
        report_path = os.path.join(self.report_dir, f'test_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.pdf')
        pdf.output(report_path)
        return report_path

    def generate_html_report(self, include_graphs=True, highlight_failures=True):
        """
        Generate HTML report

        Args:
            include_graphs (bool): Embed the pie and execution-time charts
            highlight_failures (bool): Color-code the result rows by status

        Returns:
            str: Path of the written HTML file
        """
        stats = self.generate_statistics()

        # Create charts and convert them to HTML fragments
        pie_chart_html = ''
        exec_chart_html = ''
        if include_graphs:
            pie_chart_html = self.create_pie_chart().to_html(full_html=False)
            exec_chart_html = self.create_execution_time_chart().to_html(full_html=False)

        # Prepare template
        template_str = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>CAN Test Execution Report</title>
        <style>
            body { font-family: Arial, sans-serif; margin: 20px; }
            .header { text-align: center; margin-bottom: 30px; }
            .summary { margin-bottom: 30px; }
            .charts { display: flex; flex-wrap: wrap; justify-content: space-around; }
            .chart { margin: 20px; }
            table { border-collapse: collapse; width: 100%; margin-top: 20px; }
            th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
            th { background-color: #f2f2f2; }
            .pass { background-color: #90EE90; }
            .fail { background-color: #FF0000; color: white; }
        </style>
    </head>
    <body>
        <div class="header">
            <h1>CAN Test Execution Report</h1>
            <p>Generated on: {{ timestamp }}</p>
        </div>
        
        <div class="summary">
            <h2>Test Summary</h2>
            <p>Total Tests: {{ stats.total_tests }}</p>
            <p>Passed Tests: {{ stats.passed_tests }}</p>
            <p>Failed Tests: {{ stats.failed_tests }}</p>
            <p>Pass Rate: {{ "%.2f"|format(stats.pass_rate) }}%</p>
            <p>Total Execution Time: {{ "%.2f"|format(stats.total_time) }}s</p>
            <p>Average Execution Time: {{ "%.2f"|format(stats.avg_time) }}s</p>
        </div>
        
        {% if include_graphs %}
        <div class="charts">
            <div class="chart">
                {{ pie_chart|safe }}
            </div>
            <div class="chart">
                {{ exec_chart|safe }}
            </div>
        </div>
        {% endif %}
        
        <h2>Detailed Test Results</h2>
        <table>
            <tr>
                <th>Test #</th>
                <th>Function</th>
                <th>Message</th>
                <th>Signal</th>
                <th>Status</th>
                <th>Time</th>
                <th>Comments</th>
            </tr>
            {% for _, row in results.iterrows() %}
            <tr{% if highlight_failures %} class="{{ 'pass' if row['Status'] == 'PASS' else 'fail' }}"{% endif %}>
                <td>{{ row['testcase number'] }}</td>
                <td>{{ row['function name'] }}</td>
                <td>{{ row['message'] }}</td>
                <td>{{ row['signal'] }}</td>
                <td>{{ row['Status'] }}</td>
                <td>{{ row['Step Execution time'] }}</td>
                <td>{{ row['Comments'] }}</td>
            </tr>
            {% endfor %}
        </table>
    </body>
    </html>
    """

        # Autoescape cell values: comments hold raw error text that may
        # contain '<' or '&'. The chart fragments are trusted HTML and are
        # marked |safe in the template.
        template = jinja2.Environment(autoescape=True).from_string(template_str)
        html_content = template.render(
            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            stats=stats,
            pie_chart=pie_chart_html,
            exec_chart=exec_chart_html,
            include_graphs=include_graphs,
            highlight_failures=highlight_failures,
            results=self._display_rows()
        )

        # Save HTML report
        report_path = os.path.join(self.report_dir, f'test_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.html')
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write(html_content)

        return report_path

    def generate_report(self, include_graphs=True, export_format="PDF", highlight_failures=True):
        """
        Generate a report in the requested format

        Args:
            include_graphs (bool): Include the charts in the report
            export_format (str): 'PDF' or 'HTML' (case-insensitive)
            highlight_failures (bool): Color-code failed results

        Returns:
            str: Path of the written report file

        Raises:
            ValueError: If ``export_format`` is neither 'PDF' nor 'HTML'
        """
        generators = {
            "PDF": self.generate_pdf_report,
            "HTML": self.generate_html_report,
        }
        generator = generators.get(export_format.upper())
        if generator is None:
            raise ValueError("Unsupported export format. Use 'PDF' or 'HTML'")
        return generator(include_graphs=include_graphs, highlight_failures=highlight_failures)