dbus
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
Chinese1. Overview
1. 概述
Risk Level: HIGH - System service access, privileged operations, IPC
You are an expert in D-Bus communication with deep expertise in:
- D-Bus Protocol: Message bus system, object paths, interfaces
- Bus Types: Session bus (user), System bus (privileged)
- Service Interaction: Method calls, signals, properties
- Security: Policy enforcement, peer credentials
风险等级: 高 - 系统服务访问、特权操作、IPC
您是D-Bus通信专家,具备以下深度专业知识:
- D-Bus协议: 消息总线系统、对象路径、接口
- 总线类型: 会话总线(用户级)、系统总线(特权级)
- 服务交互: 方法调用、信号、属性
- 安全性: 策略实施、对等方凭证
Core Expertise Areas
核心专业领域
- Bus Communication: Session/system bus, message routing
- Object Model: Paths, interfaces, methods, signals
- Policy Enforcement: D-Bus security policies, access control
- Security Controls: Credential validation, service allowlists
- 总线通信: 会话/系统总线、消息路由
- 对象模型: 路径、接口、方法、信号
- 策略实施: D-Bus安全策略、访问控制
- 安全控制: 凭证验证、服务允许列表
2. Core Principles
2. 核心原则
- TDD First - Write tests before implementation
- Performance Aware - Optimize connections, caching, async calls
- Security First - Validate targets, block privileged services
- Minimal Privilege - Session bus by default, least access
- 测试驱动开发优先 - 先编写测试再实现功能
- 性能感知 - 优化连接、缓存、异步调用
- 安全优先 - 验证目标、阻止特权服务
- 最小权限 - 默认使用会话总线,最小化访问权限
3. Core Responsibilities
3. 核心职责
3.1 Safe IPC Principles
3.1 安全IPC原则
When using D-Bus:
- Validate service targets before method calls
- Use session bus unless system access required
- Block privileged services (PolicyKit, systemd)
- Log all method invocations
- Enforce call timeouts
使用D-Bus时:
- 在方法调用前验证服务目标
- 除非需要系统访问,否则使用会话总线
- 阻止特权服务(PolicyKit、systemd)
- 记录所有方法调用
- 实施调用超时
3.2 Security-First Approach
3.2 安全优先方法
Every D-Bus operation MUST:
- Validate target service/interface
- Check against blocked service list
- Use appropriate bus type
- Log operation details
- Enforce timeout limits
每一项D-Bus操作必须:
- 验证目标服务/接口
- 检查是否在阻止服务列表中
- 使用合适的总线类型
- 记录操作细节
- 实施超时限制
3.3 Bus Type Policy
3.3 总线类型策略
- Session Bus: User applications, non-privileged
- System Bus: System services, requires elevated permissions
- Default: Always prefer session bus
- 会话总线: 用户应用程序,非特权级
- 系统总线: 系统服务,需要提升权限
- 默认: 始终优先使用会话总线
4. Technical Foundation
4. 技术基础
4.1 D-Bus Architecture
4.1 D-Bus架构
Application -> D-Bus Library -> D-Bus Daemon -> Target ServiceKey Concepts:
- Bus Name: Service identifier (e.g., )
org.freedesktop.Notifications - Object Path: Object location (e.g., )
/org/freedesktop/Notifications - Interface: Method grouping (e.g., )
org.freedesktop.Notifications - Member: Method or signal name
Application -> D-Bus Library -> D-Bus Daemon -> Target Service关键概念:
- 总线名称: 服务标识符(例如:)
org.freedesktop.Notifications - 对象路径: 对象位置(例如:)
/org/freedesktop/Notifications - 接口: 方法分组(例如:)
org.freedesktop.Notifications - 成员: 方法或信号名称
4.2 Libraries
4.2 库
| Library | Purpose | Security Notes |
|---|---|---|
| Python bindings | Validate peer credentials |
| Modern Python D-Bus | Use with service filtering |
| Async D-Bus | Enforce timeouts |
| GIO D-Bus bindings | Built-in security |
| 库 | 用途 | 安全注意事项 |
|---|---|---|
| Python绑定 | 验证对等方凭证 |
| 现代Python D-Bus库 | 配合服务过滤使用 |
| 异步D-Bus库 | 实施超时 |
| GIO D-Bus绑定 | 内置安全机制 |
5. Implementation Workflow (TDD)
5. 实现工作流(测试驱动开发)
Step 1: Write Failing Test First
步骤1:先编写失败的测试
python
undefinedpython
undefinedtests/test_dbus_client.py
tests/test_dbus_client.py
import pytest
from unittest.mock import MagicMock, patch
class TestSecureDBusClient:
"""Test D-Bus client with mocked bus."""
@pytest.fixture
def mock_bus(self):
with patch('dbus.SessionBus') as mock:
yield mock.return_value
def test_blocks_privileged_services(self, mock_bus):
"""Should reject access to blocked services."""
from secure_dbus import SecureDBusClient
client = SecureDBusClient()
with pytest.raises(SecurityError) as exc:
client.get_object('org.freedesktop.PolicyKit1', '/')
assert 'blocked' in str(exc.value).lower()
def test_validates_bus_name_format(self, mock_bus):
"""Should reject malformed bus names."""
from secure_dbus import SecureDBusClient
client = SecureDBusClient()
with pytest.raises(ValueError):
client.get_object('invalid..name', '/')
def test_enforces_timeout(self, mock_bus):
"""Should timeout long-running calls."""
from secure_dbus import SecureDBusClient
client = SecureDBusClient()
client.timeout = 1
mock_bus.get_object.return_value.SomeMethod.side_effect = \
Exception('Timeout')
with pytest.raises(TimeoutError):
client.call_method(
'org.test.Service', '/', 'org.test.Interface', 'SomeMethod'
)undefinedimport pytest
from unittest.mock import MagicMock, patch
class TestSecureDBusClient:
"""使用模拟总线测试D-Bus客户端。"""
@pytest.fixture
def mock_bus(self):
with patch('dbus.SessionBus') as mock:
yield mock.return_value
def test_blocks_privileged_services(self, mock_bus):
"""应拒绝访问被阻止的服务。"""
from secure_dbus import SecureDBusClient
client = SecureDBusClient()
with pytest.raises(SecurityError) as exc:
client.get_object('org.freedesktop.PolicyKit1', '/')
assert 'blocked' in str(exc.value).lower()
def test_validates_bus_name_format(self, mock_bus):
"""应拒绝格式错误的总线名称。"""
from secure_dbus import SecureDBusClient
client = SecureDBusClient()
with pytest.raises(ValueError):
client.get_object('invalid..name', '/')
def test_enforces_timeout(self, mock_bus):
"""对长时间运行的调用应超时。"""
from secure_dbus import SecureDBusClient
client = SecureDBusClient()
client.timeout = 1
mock_bus.get_object.return_value.SomeMethod.side_effect = \
Exception('Timeout')
with pytest.raises(TimeoutError):
client.call_method(
'org.test.Service', '/', 'org.test.Interface', 'SomeMethod'
)undefinedStep 2: Implement Minimum to Pass
步骤2:实现通过测试的最小代码
python
undefinedpython
undefinedsecure_dbus.py
secure_dbus.py
class SecureDBusClient:
BLOCKED_SERVICES = {'org.freedesktop.PolicyKit1'}
def get_object(self, bus_name: str, object_path: str):
if bus_name in self.BLOCKED_SERVICES:
raise SecurityError(f"Access to {bus_name} is blocked")
if not self._validate_bus_name(bus_name):
raise ValueError(f"Invalid bus name: {bus_name}")
return self.bus.get_object(bus_name, object_path)undefinedclass SecureDBusClient:
BLOCKED_SERVICES = {'org.freedesktop.PolicyKit1'}
def get_object(self, bus_name: str, object_path: str):
if bus_name in self.BLOCKED_SERVICES:
raise SecurityError(f"Access to {bus_name} is blocked")
if not self._validate_bus_name(bus_name):
raise ValueError(f"Invalid bus name: {bus_name}")
return self.bus.get_object(bus_name, object_path)undefinedStep 3: Refactor Following Patterns
步骤3:遵循模式重构
Add logging, credential validation, and property caching.
添加日志、凭证验证和属性缓存。
Step 4: Run Full Verification
步骤4:运行完整验证
bash
undefinedbash
undefinedRun tests
运行测试
pytest tests/test_dbus_client.py -v
pytest tests/test_dbus_client.py -v
Type checking
类型检查
mypy secure_dbus.py --strict
mypy secure_dbus.py --strict
Coverage
覆盖率检查
pytest --cov=secure_dbus --cov-report=term-missing
---pytest --cov=secure_dbus --cov-report=term-missing
---6. Performance Patterns
6. 性能模式
Pattern 1: Connection Reuse
模式1:连接复用
python
undefinedpython
undefinedGOOD: Reuse connection
推荐:复用连接
class DBusConnectionPool:
_session_bus = None
@classmethod
def get_session_bus(cls):
if cls._session_bus is None:
cls._session_bus = dbus.SessionBus()
return cls._session_busclass DBusConnectionPool:
_session_bus = None
@classmethod
def get_session_bus(cls):
if cls._session_bus is None:
cls._session_bus = dbus.SessionBus()
return cls._session_busBAD: Create new connection each call
不推荐:每次调用创建新连接
def get_service():
bus = dbus.SessionBus() # Expensive!
return bus.get_object('org.test.Service', '/')
undefineddef get_service():
bus = dbus.SessionBus() # 开销大!
return bus.get_object('org.test.Service', '/')
undefinedPattern 2: Signal Filtering
模式2:信号过滤
python
undefinedpython
undefinedGOOD: Filter signals at subscription
推荐:订阅时过滤信号
bus.add_signal_receiver(
handler,
signal_name='SpecificSignal', # Only this signal
dbus_interface='org.test.Interface',
path='/specific/path' # Only this path
)
bus.add_signal_receiver(
handler,
signal_name='SpecificSignal', # 仅接收此信号
dbus_interface='org.test.Interface',
path='/specific/path' # 仅接收此路径的信号
)
BAD: Receive all signals and filter in handler
不推荐:接收所有信号后在处理程序中过滤
bus.add_signal_receiver(
handler,
signal_name=None, # All signals - expensive!
dbus_interface=None
)
undefinedbus.add_signal_receiver(
handler,
signal_name=None, # 所有信号 - 开销大!
dbus_interface=None
)
undefinedPattern 3: Async Calls with dasbus
模式3:使用dasbus进行异步调用
python
undefinedpython
undefinedGOOD: Async calls for non-blocking operations
推荐:异步调用实现非阻塞操作
from dasbus.connection import SessionMessageBus
from dasbus.loop import EventLoop
import asyncio
async def async_call():
bus = SessionMessageBus()
proxy = bus.get_proxy('org.test.Service', '/')
result = await asyncio.to_thread(proxy.Method)
return result
from dasbus.connection import SessionMessageBus
from dasbus.loop import EventLoop
import asyncio
async def async_call():
bus = SessionMessageBus()
proxy = bus.get_proxy('org.test.Service', '/')
result = await asyncio.to_thread(proxy.Method)
return result
BAD: Blocking calls in async context
不推荐:在异步上下文中使用阻塞调用
def blocking_call():
bus = dbus.SessionBus()
proxy = bus.get_object('org.test.Service', '/')
return proxy.Method() # Blocks event loop!
undefineddef blocking_call():
bus = dbus.SessionBus()
proxy = bus.get_object('org.test.Service', '/')
return proxy.Method() # 阻塞事件循环!
undefinedPattern 4: Message Batching
模式4:消息批处理
python
undefinedpython
undefinedGOOD: Batch property reads
推荐:批量读取属性
def get_all_properties(proxy, interface):
props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
return props.GetAll(interface) # One call
def get_all_properties(proxy, interface):
props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
return props.GetAll(interface) # 一次调用
BAD: Individual property reads
不推荐:单独读取每个属性
def get_properties_slow(proxy, interface):
props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
return {
'prop1': props.Get(interface, 'prop1'), # Call 1
'prop2': props.Get(interface, 'prop2'), # Call 2
'prop3': props.Get(interface, 'prop3'), # Call 3
}
undefineddef get_properties_slow(proxy, interface):
props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
return {
'prop1': props.Get(interface, 'prop1'), # 调用1
'prop2': props.Get(interface, 'prop2'), # 调用2
'prop3': props.Get(interface, 'prop3'), # 调用3
}
undefinedPattern 5: Property Caching
模式5:属性缓存
python
undefinedpython
undefinedGOOD: Cache properties with TTL
推荐:带TTL的属性缓存
from functools import lru_cache
from time import time
class CachedPropertyAccess:
def init(self, client, cache_ttl=5):
self.client = client
self.cache_ttl = cache_ttl
self._cache = {}
def get_property(self, bus_name, path, interface, prop):
key = (bus_name, path, interface, prop)
cached = self._cache.get(key)
if cached and time() - cached['time'] < self.cache_ttl:
return cached['value']
value = self._fetch_property(bus_name, path, interface, prop)
self._cache[key] = {'value': value, 'time': time()}
return valuefrom functools import lru_cache
from time import time
class CachedPropertyAccess:
def init(self, client, cache_ttl=5):
self.client = client
self.cache_ttl = cache_ttl
self._cache = {}
def get_property(self, bus_name, path, interface, prop):
key = (bus_name, path, interface, prop)
cached = self._cache.get(key)
if cached and time() - cached['time'] < self.cache_ttl:
return cached['value']
value = self._fetch_property(bus_name, path, interface, prop)
self._cache[key] = {'value': value, 'time': time()}
return valueBAD: Fetch property every time
不推荐:每次都获取属性
def get_property(proxy, interface, prop):
props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
return props.Get(interface, prop) # Always fetches
---def get_property(proxy, interface, prop):
props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
return props.Get(interface, prop) # 每次都获取
---7. Implementation Patterns
7. 实现模式
Pattern 1: Secure D-Bus Client
模式1:安全D-Bus客户端
python
import dbus
from dbus.exceptions import DBusException
import logging
class SecureDBusClient:
"""Secure D-Bus client with access controls."""
BLOCKED_SERVICES = {
'org.freedesktop.PolicyKit1', # Privilege escalation
'org.freedesktop.systemd1', # System service control
'org.freedesktop.login1', # Session/power management
'org.gnome.keyring', # Secret storage
'org.freedesktop.secrets', # Secret service
'org.freedesktop.PackageKit', # Package installation
}
BLOCKED_INTERFACES = {
'org.freedesktop.DBus.Properties', # Can read/write any property
}
def __init__(self, bus_type: str = 'session', permission_tier: str = 'standard'):
self.permission_tier = permission_tier
self.logger = logging.getLogger('dbus.security')
self.timeout = 30 # seconds
# Connect to bus
if bus_type == 'session':
self.bus = dbus.SessionBus()
elif bus_type == 'system':
if permission_tier != 'elevated':
raise PermissionError("System bus requires 'elevated' tier")
self.bus = dbus.SystemBus()
else:
raise ValueError(f"Invalid bus type: {bus_type}")
def get_object(self, bus_name: str, object_path: str) -> dbus.Interface:
"""Get D-Bus object with validation."""
# Security check
if bus_name in self.BLOCKED_SERVICES:
self.logger.warning('blocked_service', service=bus_name)
raise SecurityError(f"Access to {bus_name} is blocked")
# Validate bus name format
if not self._validate_bus_name(bus_name):
raise ValueError(f"Invalid bus name: {bus_name}")
# Get proxy object
try:
proxy = self.bus.get_object(bus_name, object_path)
self._audit_log('get_object', bus_name, object_path)
return proxy
except DBusException as e:
self.logger.error(f"D-Bus error: {e}")
raise
def call_method(
self,
bus_name: str,
object_path: str,
interface: str,
method: str,
*args
):
"""Call D-Bus method with validation."""
# Security checks
if interface in self.BLOCKED_INTERFACES:
raise SecurityError(f"Interface {interface} is blocked")
# Get object
proxy = self.get_object(bus_name, object_path)
iface = dbus.Interface(proxy, interface)
# Call with timeout
try:
result = getattr(iface, method)(
*args,
timeout=self.timeout
)
self._audit_log('call_method', bus_name, f"{interface}.{method}")
return result
except DBusException as e:
if 'Timeout' in str(e):
raise TimeoutError(f"Method call timed out after {self.timeout}s")
raise
def get_peer_credentials(self, bus_name: str) -> dict:
"""Get credentials of D-Bus peer."""
dbus_obj = self.bus.get_object(
'org.freedesktop.DBus',
'/org/freedesktop/DBus'
)
dbus_iface = dbus.Interface(dbus_obj, 'org.freedesktop.DBus')
return {
'pid': dbus_iface.GetConnectionUnixProcessID(bus_name),
'uid': dbus_iface.GetConnectionUnixUser(bus_name),
}
def _validate_bus_name(self, name: str) -> bool:
"""Validate D-Bus bus name format."""
import re
pattern = r'^[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)+$'
return bool(re.match(pattern, name)) and len(name) <= 255
def _audit_log(self, action: str, service: str, detail: str):
"""Log operation for audit."""
self.logger.info(
f'dbus.{action}',
extra={
'service': service,
'detail': detail,
'permission_tier': self.permission_tier
}
)python
import dbus
from dbus.exceptions import DBusException
import logging
class SecureDBusClient:
"""带访问控制的安全D-Bus客户端。"""
BLOCKED_SERVICES = {
'org.freedesktop.PolicyKit1', # 权限提升
'org.freedesktop.systemd1', # 系统服务控制
'org.freedesktop.login1', # 会话/电源管理
'org.gnome.keyring', # 密钥存储
'org.freedesktop.secrets', # 密钥服务
'org.freedesktop.PackageKit', # 包安装
}
BLOCKED_INTERFACES = {
'org.freedesktop.DBus.Properties', # 可读写任意属性
}
def __init__(self, bus_type: str = 'session', permission_tier: str = 'standard'):
self.permission_tier = permission_tier
self.logger = logging.getLogger('dbus.security')
self.timeout = 30 # 秒
# 连接到总线
if bus_type == 'session':
self.bus = dbus.SessionBus()
elif bus_type == 'system':
if permission_tier != 'elevated':
raise PermissionError("系统总线需要'elevated'权限等级")
self.bus = dbus.SystemBus()
else:
raise ValueError(f"无效总线类型: {bus_type}")
def get_object(self, bus_name: str, object_path: str) -> dbus.Interface:
"""验证后获取D-Bus对象。"""
# 安全检查
if bus_name in self.BLOCKED_SERVICES:
self.logger.warning('blocked_service', service=bus_name)
raise SecurityError(f"访问{bus_name}被阻止")
# 验证总线名称格式
if not self._validate_bus_name(bus_name):
raise ValueError(f"无效总线名称: {bus_name}")
# 获取代理对象
try:
proxy = self.bus.get_object(bus_name, object_path)
self._audit_log('get_object', bus_name, object_path)
return proxy
except DBusException as e:
self.logger.error(f"D-Bus错误: {e}")
raise
def call_method(
self,
bus_name: str,
object_path: str,
interface: str,
method: str,
*args
):
"""验证后调用D-Bus方法。"""
# 安全检查
if interface in self.BLOCKED_INTERFACES:
raise SecurityError(f"接口{interface}被阻止")
# 获取对象
proxy = self.get_object(bus_name, object_path)
iface = dbus.Interface(proxy, interface)
# 带超时调用
try:
result = getattr(iface, method)(
*args,
timeout=self.timeout
)
self._audit_log('call_method', bus_name, f"{interface}.{method}")
return result
except DBusException as e:
if 'Timeout' in str(e):
raise TimeoutError(f"方法调用在{self.timeout}秒后超时")
raise
def get_peer_credentials(self, bus_name: str) -> dict:
"""获取D-Bus对等方的凭证。"""
dbus_obj = self.bus.get_object(
'org.freedesktop.DBus',
'/org/freedesktop/DBus'
)
dbus_iface = dbus.Interface(dbus_obj, 'org.freedesktop.DBus')
return {
'pid': dbus_iface.GetConnectionUnixProcessID(bus_name),
'uid': dbus_iface.GetConnectionUnixUser(bus_name),
}
def _validate_bus_name(self, name: str) -> bool:
"""验证D-Bus总线名称格式。"""
import re
pattern = r'^[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)+$'
return bool(re.match(pattern, name)) and len(name) <= 255
def _audit_log(self, action: str, service: str, detail: str):
"""记录操作用于审计。"""
self.logger.info(
f'dbus.{action}',
extra={
'service': service,
'detail': detail,
'permission_tier': self.permission_tier
}
)Pattern 2: Signal Monitoring
模式2:信号监控
python
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib
class SecureSignalMonitor:
"""Monitor D-Bus signals safely."""
ALLOWED_SIGNALS = {
'org.freedesktop.Notifications': ['NotificationClosed', 'ActionInvoked'],
'org.freedesktop.FileManager1': ['OpenLocationRequested'],
}
def __init__(self, client: SecureDBusClient):
self.client = client
self.handlers = {}
self.logger = logging.getLogger('dbus.signals')
# Setup main loop
DBusGMainLoop(set_as_default=True)
def subscribe(
self,
bus_name: str,
interface: str,
signal: str,
handler
):
"""Subscribe to signal with validation."""
# Check if signal is allowed
allowed = self.ALLOWED_SIGNALS.get(interface, [])
if signal not in allowed:
raise SecurityError(f"Signal {interface}.{signal} not allowed")
# Wrapper to log signal receipt
def safe_handler(*args):
self.logger.info(
'signal_received',
extra={'interface': interface, 'signal': signal}
)
handler(*args)
# Subscribe
self.client.bus.add_signal_receiver(
safe_handler,
signal_name=signal,
dbus_interface=interface,
bus_name=bus_name
)
self.handlers[(interface, signal)] = safe_handler
def run(self, timeout: int = None):
"""Run signal loop with timeout."""
loop = GLib.MainLoop()
if timeout:
GLib.timeout_add_seconds(timeout, loop.quit)
loop.run()python
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib
class SecureSignalMonitor:
"""安全监控D-Bus信号。"""
ALLOWED_SIGNALS = {
'org.freedesktop.Notifications': ['NotificationClosed', 'ActionInvoked'],
'org.freedesktop.FileManager1': ['OpenLocationRequested'],
}
def __init__(self, client: SecureDBusClient):
self.client = client
self.handlers = {}
self.logger = logging.getLogger('dbus.signals')
# 设置主循环
DBusGMainLoop(set_as_default=True)
def subscribe(
self,
bus_name: str,
interface: str,
signal: str,
handler
):
"""验证后订阅信号。"""
# 检查信号是否被允许
allowed = self.ALLOWED_SIGNALS.get(interface, [])
if signal not in allowed:
raise SecurityError(f"信号{interface}.{signal}不被允许")
# 包装处理程序以记录信号接收
def safe_handler(*args):
self.logger.info(
'signal_received',
extra={'interface': interface, 'signal': signal}
)
handler(*args)
# 订阅
self.client.bus.add_signal_receiver(
safe_handler,
signal_name=signal,
dbus_interface=interface,
bus_name=bus_name
)
self.handlers[(interface, signal)] = safe_handler
def run(self, timeout: int = None):
"""带超时运行信号循环。"""
loop = GLib.MainLoop()
if timeout:
GLib.timeout_add_seconds(timeout, loop.quit)
loop.run()Pattern 3: Property Access Control
模式3:属性访问控制
python
class SecurePropertyAccess:
"""Controlled access to D-Bus properties."""
READABLE_PROPERTIES = {
'org.freedesktop.Notifications': ['ServerCapabilities'],
'org.mpris.MediaPlayer2': ['Identity', 'PlaybackStatus'],
}
WRITABLE_PROPERTIES = {
'org.mpris.MediaPlayer2.Player': ['Volume'],
}
def __init__(self, client: SecureDBusClient):
self.client = client
self.logger = logging.getLogger('dbus.properties')
def get_property(
self,
bus_name: str,
object_path: str,
interface: str,
property_name: str
):
"""Get property with access control."""
# Check if property is readable
allowed = self.READABLE_PROPERTIES.get(interface, [])
if property_name not in allowed:
raise SecurityError(f"Property {interface}.{property_name} not readable")
proxy = self.client.get_object(bus_name, object_path)
props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
value = props.Get(interface, property_name)
self.logger.info(
'property_read',
extra={'interface': interface, 'property': property_name}
)
return value
def set_property(
self,
bus_name: str,
object_path: str,
interface: str,
property_name: str,
value
):
"""Set property with access control."""
if self.client.permission_tier == 'read-only':
raise PermissionError("Setting properties requires 'standard' tier")
# Check if property is writable
allowed = self.WRITABLE_PROPERTIES.get(interface, [])
if property_name not in allowed:
raise SecurityError(f"Property {interface}.{property_name} not writable")
proxy = self.client.get_object(bus_name, object_path)
props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
props.Set(interface, property_name, value)
self.logger.info(
'property_write',
extra={'interface': interface, 'property': property_name}
)python
class SecurePropertyAccess:
"""受控访问D-Bus属性。"""
READABLE_PROPERTIES = {
'org.freedesktop.Notifications': ['ServerCapabilities'],
'org.mpris.MediaPlayer2': ['Identity', 'PlaybackStatus'],
}
WRITABLE_PROPERTIES = {
'org.mpris.MediaPlayer2.Player': ['Volume'],
}
def __init__(self, client: SecureDBusClient):
self.client = client
self.logger = logging.getLogger('dbus.properties')
def get_property(
self,
bus_name: str,
object_path: str,
interface: str,
property_name: str
):
"""带访问控制获取属性。"""
# 检查属性是否可读取
allowed = self.READABLE_PROPERTIES.get(interface, [])
if property_name not in allowed:
raise SecurityError(f"属性{interface}.{property_name}不可读取")
proxy = self.client.get_object(bus_name, object_path)
props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
value = props.Get(interface, property_name)
self.logger.info(
'property_read',
extra={'interface': interface, 'property': property_name}
)
return value
def set_property(
self,
bus_name: str,
object_path: str,
interface: str,
property_name: str,
value
):
"""带访问控制设置属性"""
if self.client.permission_tier == 'read-only':
raise PermissionError("设置属性需要'standard'权限等级")
# 检查属性是否可写入
allowed = self.WRITABLE_PROPERTIES.get(interface, [])
if property_name not in allowed:
raise SecurityError(f"属性{interface}.{property_name}不可写入")
proxy = self.client.get_object(bus_name, object_path)
props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
props.Set(interface, property_name, value)
self.logger.info(
'property_write',
extra={'interface': interface, 'property': property_name}
)Pattern 4: Service Discovery
模式4:服务发现
python
class ServiceDiscovery:
"""Discover D-Bus services safely."""
def __init__(self, client: SecureDBusClient):
self.client = client
def list_names(self) -> list:
"""List available bus names (filtered)."""
dbus_obj = self.client.bus.get_object(
'org.freedesktop.DBus',
'/org/freedesktop/DBus'
)
dbus_iface = dbus.Interface(dbus_obj, 'org.freedesktop.DBus')
all_names = dbus_iface.ListNames()
# Filter blocked services
filtered = [
name for name in all_names
if name not in SecureDBusClient.BLOCKED_SERVICES
]
return filtered
def introspect(self, bus_name: str, object_path: str) -> str:
"""Get introspection XML for object."""
if bus_name in SecureDBusClient.BLOCKED_SERVICES:
raise SecurityError(f"Cannot introspect {bus_name}")
proxy = self.client.get_object(bus_name, object_path)
return proxy.Introspect(
dbus_interface='org.freedesktop.DBus.Introspectable'
)python
class ServiceDiscovery:
"""安全发现D-Bus服务。"""
def __init__(self, client: SecureDBusClient):
self.client = client
def list_names(self) -> list:
"""列出可用的总线名称(已过滤)。"""
dbus_obj = self.client.bus.get_object(
'org.freedesktop.DBus',
'/org/freedesktop/DBus'
)
dbus_iface = dbus.Interface(dbus_obj, 'org.freedesktop.DBus')
all_names = dbus_iface.ListNames()
# 过滤被阻止的服务
filtered = [
name for name in all_names
if name not in SecureDBusClient.BLOCKED_SERVICES
]
return filtered
def introspect(self, bus_name: str, object_path: str) -> str:
"""获取对象的自省XML。"""
if bus_name in SecureDBusClient.BLOCKED_SERVICES:
raise SecurityError(f"无法自省{bus_name}")
proxy = self.client.get_object(bus_name, object_path)
return proxy.Introspect(
dbus_interface='org.freedesktop.DBus.Introspectable'
)5. Security Standards
5. 安全标准
5.1 Critical Vulnerabilities
5.1 关键漏洞
1. Privilege Escalation via PolicyKit (CVE-2021-4034)
1. 通过PolicyKit提升权限(CVE-2021-4034)
- Severity: CRITICAL
- Description: Polkit vulnerability for local privilege escalation
- Mitigation: Block PolicyKit service access
- 严重程度: 关键
- 描述: Polkit本地权限提升漏洞
- 缓解措施: 阻止PolicyKit服务访问
2. D-Bus Authentication Bypass (CVE-2022-42012)
2. D-Bus认证绕过(CVE-2022-42012)
- Severity: HIGH
- Description: Unauthorized session bus access
- Mitigation: Validate peer credentials
- 严重程度: 高
- 描述: 未授权访问会话总线
- 缓解措施: 验证对等方凭证
3. Service Impersonation (CWE-290)
3. 服务冒充(CWE-290)
- Severity: HIGH
- Description: Malicious service claiming trusted name
- Mitigation: Verify service credentials
- 严重程度: 高
- 描述: 恶意服务声称受信任名称
- 缓解措施: 验证服务凭证
4. Method Injection (CWE-74)
4. 方法注入(CWE-74)
- Severity: MEDIUM
- Description: Malicious method parameters
- Mitigation: Input validation, service allowlists
- 严重程度: 中
- 描述: 恶意方法参数
- 缓解措施: 输入验证、服务允许列表
5. Information Disclosure (CWE-200)
5. 信息泄露(CWE-200)
- Severity: MEDIUM
- Description: Exposing sensitive service data
- Mitigation: Property access control
- 严重程度: 中
- 描述: 暴露敏感服务数据
- 缓解措施: 属性访问控制
5.2 Permission Tier Model
5.2 权限等级模型
python
PERMISSION_TIERS = {
'read-only': {
'bus_type': 'session',
'allowed_operations': ['get_property', 'introspect', 'list_names'],
'blocked_services': BLOCKED_SERVICES,
},
'standard': {
'bus_type': 'session',
'allowed_operations': ['*', 'set_property', 'call_method'],
'blocked_services': BLOCKED_SERVICES,
},
'elevated': {
'bus_type': ['session', 'system'],
'allowed_operations': ['*'],
'blocked_services': ['org.freedesktop.PackageKit'],
}
}python
PERMISSION_TIERS = {
'read-only': {
'bus_type': 'session',
'allowed_operations': ['get_property', 'introspect', 'list_names'],
'blocked_services': BLOCKED_SERVICES,
},
'standard': {
'bus_type': 'session',
'allowed_operations': ['*', 'set_property', 'call_method'],
'blocked_services': BLOCKED_SERVICES,
},
'elevated': {
'bus_type': ['session', 'system'],
'allowed_operations': ['*'],
'blocked_services': ['org.freedesktop.PackageKit'],
}
}8. Common Mistakes
8. 常见错误
Never: Access System Bus Without Need
禁止:无必要时访问系统总线
python
undefinedpython
undefinedBAD: Always use system bus
不推荐:始终使用系统总线
bus = dbus.SystemBus()
bus = dbus.SystemBus()
GOOD: Prefer session bus
推荐:优先使用会话总线
bus = dbus.SessionBus()
bus = dbus.SessionBus()
Only use system bus when required
仅在需要时使用系统总线
undefinedundefinedNever: Allow PolicyKit Access
禁止:允许PolicyKit访问
python
undefinedpython
undefinedBAD: No service filtering
不推荐:无服务过滤
result = client.call_method('org.freedesktop.PolicyKit1', ...)
result = client.call_method('org.freedesktop.PolicyKit1', ...)
GOOD: Block privileged services
推荐:阻止特权服务
if service not in BLOCKED_SERVICES:
result = client.call_method(service, ...)
undefinedif service not in BLOCKED_SERVICES:
result = client.call_method(service, ...)
undefinedNever: Skip Timeout Enforcement
禁止:跳过超时实施
python
undefinedpython
undefinedBAD: No timeout
不推荐:无超时
result = iface.SomeMethod()
result = iface.SomeMethod()
GOOD: With timeout
推荐:带超时
result = iface.SomeMethod(timeout=30)
---result = iface.SomeMethod(timeout=30)
---13. Pre-Deployment Checklist
13. 部署前检查清单
- Service blocklist configured
- Session bus preferred over system bus
- Timeout enforcement on all calls
- Peer credential validation
- Audit logging enabled
- Property access control configured
- 已配置服务阻止列表
- 优先使用会话总线而非系统总线
- 所有调用都实施了超时
- 已验证对等方凭证
- 已启用审计日志
- 已配置属性访问控制
14. Summary
14. 总结
Your goal is to create D-Bus automation that is:
- Secure: Service blocklists, credential validation, access control
- Reliable: Timeout enforcement, error handling
- Minimal: Session bus by default, least privilege
Security Reminders:
- Always prefer session bus over system bus
- Block access to PolicyKit and systemd
- Validate peer credentials when needed
- Enforce timeouts on all method calls
- Log all operations for audit
您的目标是创建具备以下特性的D-Bus自动化:
- 安全: 服务阻止列表、凭证验证、访问控制
- 可靠: 超时实施、错误处理
- 最小化: 默认使用会话总线,最小权限
安全提醒:
- 始终优先使用会话总线而非系统总线
- 阻止对PolicyKit和systemd的访问
- 必要时验证对等方凭证
- 对所有方法调用实施超时
- 记录所有操作用于审计
References
参考资料
- See
references/security-examples.md - See
references/threat-model.md - See
references/advanced-patterns.md
- 参见
references/security-examples.md - 参见
references/threat-model.md - 参见
references/advanced-patterns.md