Loading...
Loading...
Vehicle network security testing framework for automotive CAN bus analysis and penetration testing
npx skill4agent add aradotso/security-skills s800-vehicle-network-security-testingSkill by ara.so — Security Skills collection.
# Clone the repository
git clone https://github.com/zhu-zhu666/S800-Vehicle-Network-Security-Testing-Framework.git
cd S800-Vehicle-Network-Security-Testing-Framework
# Install dependencies
pip install -r requirements.txt
# Install python-can for CAN bus communication
pip install python-can
# Set up SocketCAN interface (Linux)
sudo ip link set can0 type can bitrate 500000
sudo ip link set up can0import can
# Initialize CAN bus connection
bus = can.interface.Bus(channel='can0', bustype='socketcan')
# Read CAN messages
def read_can_traffic(duration=10):
"""Capture CAN bus traffic for specified duration"""
messages = []
start_time = time.time()
while time.time() - start_time < duration:
msg = bus.recv(timeout=1.0)
if msg:
messages.append({
'timestamp': msg.timestamp,
'arbitration_id': hex(msg.arbitration_id),
'data': msg.data.hex(),
'dlc': msg.dlc
})
return messages
# Send CAN message
def send_can_message(arbitration_id, data):
"""Inject CAN message onto the bus"""
msg = can.Message(
arbitration_id=arbitration_id,
data=bytes.fromhex(data),
is_extended_id=False
)
bus.send(msg)
print(f"Sent: ID={hex(arbitration_id)}, Data={data}")from collections import Counter
import json
def analyze_can_traffic(messages):
"""Analyze CAN traffic for security assessment"""
analysis = {
'total_messages': len(messages),
'unique_ids': set(),
'id_frequency': Counter(),
'data_patterns': {}
}
for msg in messages:
arb_id = msg['arbitration_id']
analysis['unique_ids'].add(arb_id)
analysis['id_frequency'][arb_id] += 1
# Track data patterns per ID
if arb_id not in analysis['data_patterns']:
analysis['data_patterns'][arb_id] = []
analysis['data_patterns'][arb_id].append(msg['data'])
# Convert set to list for JSON serialization
analysis['unique_ids'] = list(analysis['unique_ids'])
analysis['id_frequency'] = dict(analysis['id_frequency'])
return analysis
def save_analysis(analysis, output_file='analysis.json'):
"""Save analysis results to file"""
with open(output_file, 'w') as f:
json.dump(analysis, f, indent=2)import random
import time
def fuzz_can_id(target_id, num_iterations=100, delay=0.1):
"""Fuzz a specific CAN ID with random data"""
print(f"Starting fuzzing on CAN ID: {hex(target_id)}")
for i in range(num_iterations):
# Generate random 8-byte payload
fuzz_data = bytes([random.randint(0, 255) for _ in range(8)])
msg = can.Message(
arbitration_id=target_id,
data=fuzz_data,
is_extended_id=False
)
try:
bus.send(msg)
print(f"[{i+1}/{num_iterations}] Sent: {fuzz_data.hex()}")
time.sleep(delay)
except Exception as e:
print(f"Error sending message: {e}")
def intelligent_fuzz(target_id, baseline_data, mutations=50):
"""Perform mutation-based fuzzing on known data"""
baseline = bytes.fromhex(baseline_data)
for i in range(mutations):
mutated = bytearray(baseline)
# Mutate random byte
byte_idx = random.randint(0, len(mutated) - 1)
mutated[byte_idx] = random.randint(0, 255)
msg = can.Message(
arbitration_id=target_id,
data=bytes(mutated),
is_extended_id=False
)
bus.send(msg)
time.sleep(0.05)import pickle
def capture_session(output_file='session.pkl', duration=30):
"""Capture CAN session for replay"""
print(f"Capturing session for {duration} seconds...")
messages = []
start_time = time.time()
while time.time() - start_time < duration:
msg = bus.recv(timeout=1.0)
if msg:
messages.append({
'arbitration_id': msg.arbitration_id,
'data': bytes(msg.data),
'timestamp': msg.timestamp,
'is_extended_id': msg.is_extended_id
})
with open(output_file, 'wb') as f:
pickle.dump(messages, f)
print(f"Captured {len(messages)} messages")
return messages
def replay_session(input_file='session.pkl', speed_multiplier=1.0):
"""Replay captured CAN session"""
with open(input_file, 'rb') as f:
messages = pickle.load(f)
print(f"Replaying {len(messages)} messages...")
if not messages:
return
base_time = messages[0]['timestamp']
start_time = time.time()
for msg_data in messages:
# Calculate timing
original_delay = msg_data['timestamp'] - base_time
target_time = start_time + (original_delay / speed_multiplier)
# Wait until target time
sleep_time = target_time - time.time()
if sleep_time > 0:
time.sleep(sleep_time)
# Send message
msg = can.Message(
arbitration_id=msg_data['arbitration_id'],
data=msg_data['data'],
is_extended_id=msg_data['is_extended_id']
)
bus.send(msg)# CAN interface configuration
export CAN_INTERFACE=can0
export CAN_BITRATE=500000
# Logging configuration
export S800_LOG_LEVEL=INFO
export S800_LOG_FILE=/var/log/s800.log
# Testing parameters
export S800_FUZZ_ITERATIONS=1000
export S800_FUZZ_DELAY=0.1config.json{
"interface": {
"channel": "can0",
"bustype": "socketcan",
"bitrate": 500000
},
"logging": {
"level": "INFO",
"file": "s800.log",
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
},
"fuzzing": {
"default_iterations": 100,
"default_delay": 0.1,
"target_ids": ["0x123", "0x456", "0x789"]
},
"analysis": {
"output_dir": "./results",
"save_pcap": true
}
}import os
from datetime import datetime
def run_security_assessment(target_ids, duration=60):
"""Perform comprehensive security assessment"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = f"assessment_{timestamp}"
os.makedirs(output_dir, exist_ok=True)
# 1. Capture baseline traffic
print("[1/4] Capturing baseline traffic...")
baseline = read_can_traffic(duration=duration)
with open(f"{output_dir}/baseline.json", 'w') as f:
json.dump(baseline, f, indent=2)
# 2. Analyze traffic
print("[2/4] Analyzing traffic...")
analysis = analyze_can_traffic(baseline)
save_analysis(analysis, f"{output_dir}/analysis.json")
# 3. Perform targeted fuzzing
print("[3/4] Fuzzing target IDs...")
for target_id in target_ids:
fuzz_can_id(int(target_id, 16), num_iterations=50)
time.sleep(2)
# 4. Capture post-fuzz traffic
print("[4/4] Capturing post-fuzz traffic...")
post_fuzz = read_can_traffic(duration=30)
with open(f"{output_dir}/post_fuzz.json", 'w') as f:
json.dump(post_fuzz, f, indent=2)
print(f"Assessment complete. Results in {output_dir}/")def monitor_can_bus(alert_ids=None, callback=None):
"""Monitor CAN bus for specific IDs or anomalies"""
alert_ids = alert_ids or []
print("Starting CAN bus monitoring...")
print(f"Alert IDs: {[hex(id) for id in alert_ids]}")
while True:
msg = bus.recv(timeout=1.0)
if msg:
if msg.arbitration_id in alert_ids:
alert_msg = f"ALERT: Detected ID {hex(msg.arbitration_id)} - Data: {msg.data.hex()}"
print(alert_msg)
if callback:
callback(msg)# Check available interfaces
ip link show
# Verify SocketCAN module loaded
lsmod | grep can
# Load modules if needed
sudo modprobe can
sudo modprobe can_raw
sudo modprobe vcan# Add user to dialout group (for USB-CAN adapters)
sudo usermod -a -G dialout $USER
# Or run with sudo (temporary)
sudo python your_script.py# Verify CAN bus is active
def test_can_connection():
"""Test CAN interface connectivity"""
try:
bus = can.interface.Bus(channel='can0', bustype='socketcan')
msg = bus.recv(timeout=5.0)
if msg:
print(f"Connection OK: Received message ID={hex(msg.arbitration_id)}")
return True
else:
print("No messages received - check vehicle connection")
return False
except Exception as e:
print(f"Connection failed: {e}")
return False# Try different bitrates
sudo ip link set can0 type can bitrate 125000
sudo ip link set can0 type can bitrate 250000
sudo ip link set can0 type can bitrate 500000