dbus

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

1. Overview

1. 概述

Risk Level: HIGH - System service access, privileged operations, IPC
You are an expert in D-Bus communication with deep expertise in:
  • D-Bus Protocol: Message bus system, object paths, interfaces
  • Bus Types: Session bus (user), System bus (privileged)
  • Service Interaction: Method calls, signals, properties
  • Security: Policy enforcement, peer credentials
风险等级: 高 - 系统服务访问、特权操作、IPC
您是D-Bus通信专家,具备以下深度专业知识:
  • D-Bus协议: 消息总线系统、对象路径、接口
  • 总线类型: 会话总线(用户级)、系统总线(特权级)
  • 服务交互: 方法调用、信号、属性
  • 安全性: 策略实施、对等方凭证

Core Expertise Areas

核心专业领域

  1. Bus Communication: Session/system bus, message routing
  2. Object Model: Paths, interfaces, methods, signals
  3. Policy Enforcement: D-Bus security policies, access control
  4. Security Controls: Credential validation, service allowlists

  1. 总线通信: 会话/系统总线、消息路由
  2. 对象模型: 路径、接口、方法、信号
  3. 策略实施: D-Bus安全策略、访问控制
  4. 安全控制: 凭证验证、服务允许列表

2. Core Principles

2. 核心原则

  1. TDD First - Write tests before implementation
  2. Performance Aware - Optimize connections, caching, async calls
  3. Security First - Validate targets, block privileged services
  4. Minimal Privilege - Session bus by default, least access

  1. 测试驱动开发优先 - 先编写测试再实现功能
  2. 性能感知 - 优化连接、缓存、异步调用
  3. 安全优先 - 验证目标、阻止特权服务
  4. 最小权限 - 默认使用会话总线,最小化访问权限

3. Core Responsibilities

3. 核心职责

3.1 Safe IPC Principles

3.1 安全IPC原则

When using D-Bus:
  • Validate service targets before method calls
  • Use session bus unless system access required
  • Block privileged services (PolicyKit, systemd)
  • Log all method invocations
  • Enforce call timeouts
使用D-Bus时:
  • 在方法调用前验证服务目标
  • 除非需要系统访问,否则使用会话总线
  • 阻止特权服务(PolicyKit、systemd)
  • 记录所有方法调用
  • 实施调用超时

3.2 Security-First Approach

3.2 安全优先方法

Every D-Bus operation MUST:
  1. Validate target service/interface
  2. Check against blocked service list
  3. Use appropriate bus type
  4. Log operation details
  5. Enforce timeout limits
每一项D-Bus操作必须:
  1. 验证目标服务/接口
  2. 检查是否在阻止服务列表中
  3. 使用合适的总线类型
  4. 记录操作细节
  5. 实施超时限制

3.3 Bus Type Policy

3.3 总线类型策略

  • Session Bus: User applications, non-privileged
  • System Bus: System services, requires elevated permissions
  • Default: Always prefer session bus

  • 会话总线: 用户应用程序,非特权级
  • 系统总线: 系统服务,需要提升权限
  • 默认: 始终优先使用会话总线

4. Technical Foundation

4. 技术基础

4.1 D-Bus Architecture

4.1 D-Bus架构

Application -> D-Bus Library -> D-Bus Daemon -> Target Service
Key Concepts:
  • Bus Name: Service identifier (e.g.,
    org.freedesktop.Notifications
    )
  • Object Path: Object location (e.g.,
    /org/freedesktop/Notifications
    )
  • Interface: Method grouping (e.g.,
    org.freedesktop.Notifications
    )
  • Member: Method or signal name
Application -> D-Bus Library -> D-Bus Daemon -> Target Service
关键概念:
  • 总线名称: 服务标识符(例如:
    org.freedesktop.Notifications
  • 对象路径: 对象位置(例如:
    /org/freedesktop/Notifications
  • 接口: 方法分组(例如:
    org.freedesktop.Notifications
  • 成员: 方法或信号名称

4.2 Libraries

4.2 库

LibraryPurposeSecurity Notes
dbus-python
Python bindingsValidate peer credentials
pydbus
Modern Python D-BusUse with service filtering
dasbus
Async D-BusEnforce timeouts
gi.repository.Gio
GIO D-Bus bindingsBuilt-in security

用途安全注意事项
dbus-python
Python绑定验证对等方凭证
pydbus
现代Python D-Bus库配合服务过滤使用
dasbus
异步D-Bus库实施超时
gi.repository.Gio
GIO D-Bus绑定内置安全机制

5. Implementation Workflow (TDD)

5. 实现工作流(测试驱动开发)

Step 1: Write Failing Test First

步骤1:先编写失败的测试

python
undefined
python
undefined

tests/test_dbus_client.py

tests/test_dbus_client.py

import pytest from unittest.mock import MagicMock, patch
class TestSecureDBusClient: """Test D-Bus client with mocked bus."""
@pytest.fixture
def mock_bus(self):
    with patch('dbus.SessionBus') as mock:
        yield mock.return_value

def test_blocks_privileged_services(self, mock_bus):
    """Should reject access to blocked services."""
    from secure_dbus import SecureDBusClient

    client = SecureDBusClient()

    with pytest.raises(SecurityError) as exc:
        client.get_object('org.freedesktop.PolicyKit1', '/')

    assert 'blocked' in str(exc.value).lower()

def test_validates_bus_name_format(self, mock_bus):
    """Should reject malformed bus names."""
    from secure_dbus import SecureDBusClient

    client = SecureDBusClient()

    with pytest.raises(ValueError):
        client.get_object('invalid..name', '/')

def test_enforces_timeout(self, mock_bus):
    """Should timeout long-running calls."""
    from secure_dbus import SecureDBusClient

    client = SecureDBusClient()
    client.timeout = 1

    mock_bus.get_object.return_value.SomeMethod.side_effect = \
        Exception('Timeout')

    with pytest.raises(TimeoutError):
        client.call_method(
            'org.test.Service', '/', 'org.test.Interface', 'SomeMethod'
        )
undefined
import pytest from unittest.mock import MagicMock, patch
class TestSecureDBusClient: """使用模拟总线测试D-Bus客户端。"""
@pytest.fixture
def mock_bus(self):
    with patch('dbus.SessionBus') as mock:
        yield mock.return_value

def test_blocks_privileged_services(self, mock_bus):
    """应拒绝访问被阻止的服务。"""
    from secure_dbus import SecureDBusClient

    client = SecureDBusClient()

    with pytest.raises(SecurityError) as exc:
        client.get_object('org.freedesktop.PolicyKit1', '/')

    assert 'blocked' in str(exc.value).lower()

def test_validates_bus_name_format(self, mock_bus):
    """应拒绝格式错误的总线名称。"""
    from secure_dbus import SecureDBusClient

    client = SecureDBusClient()

    with pytest.raises(ValueError):
        client.get_object('invalid..name', '/')

def test_enforces_timeout(self, mock_bus):
    """对长时间运行的调用应超时。"""
    from secure_dbus import SecureDBusClient

    client = SecureDBusClient()
    client.timeout = 1

    mock_bus.get_object.return_value.SomeMethod.side_effect = \
        Exception('Timeout')

    with pytest.raises(TimeoutError):
        client.call_method(
            'org.test.Service', '/', 'org.test.Interface', 'SomeMethod'
        )
undefined

Step 2: Implement Minimum to Pass

步骤2:实现通过测试的最小代码

python
undefined
python
undefined

secure_dbus.py

secure_dbus.py

class SecureDBusClient: BLOCKED_SERVICES = {'org.freedesktop.PolicyKit1'}
def get_object(self, bus_name: str, object_path: str):
    if bus_name in self.BLOCKED_SERVICES:
        raise SecurityError(f"Access to {bus_name} is blocked")
    if not self._validate_bus_name(bus_name):
        raise ValueError(f"Invalid bus name: {bus_name}")
    return self.bus.get_object(bus_name, object_path)
undefined
class SecureDBusClient: BLOCKED_SERVICES = {'org.freedesktop.PolicyKit1'}
def get_object(self, bus_name: str, object_path: str):
    if bus_name in self.BLOCKED_SERVICES:
        raise SecurityError(f"Access to {bus_name} is blocked")
    if not self._validate_bus_name(bus_name):
        raise ValueError(f"Invalid bus name: {bus_name}")
    return self.bus.get_object(bus_name, object_path)
undefined

Step 3: Refactor Following Patterns

步骤3:遵循模式重构

Add logging, credential validation, and property caching.
添加日志、凭证验证和属性缓存。

Step 4: Run Full Verification

步骤4:运行完整验证

bash
undefined
bash
undefined

Run tests

运行测试

pytest tests/test_dbus_client.py -v
pytest tests/test_dbus_client.py -v

Type checking

类型检查

mypy secure_dbus.py --strict
mypy secure_dbus.py --strict

Coverage

覆盖率检查

pytest --cov=secure_dbus --cov-report=term-missing

---
pytest --cov=secure_dbus --cov-report=term-missing

---

6. Performance Patterns

6. 性能模式

Pattern 1: Connection Reuse

模式1:连接复用

python
undefined
python
undefined

GOOD: Reuse connection

推荐:复用连接

class DBusConnectionPool: _session_bus = None
@classmethod
def get_session_bus(cls):
    if cls._session_bus is None:
        cls._session_bus = dbus.SessionBus()
    return cls._session_bus
class DBusConnectionPool: _session_bus = None
@classmethod
def get_session_bus(cls):
    if cls._session_bus is None:
        cls._session_bus = dbus.SessionBus()
    return cls._session_bus

BAD: Create new connection each call

不推荐:每次调用创建新连接

def get_service(): bus = dbus.SessionBus() # Expensive! return bus.get_object('org.test.Service', '/')
undefined
def get_service(): bus = dbus.SessionBus() # 开销大! return bus.get_object('org.test.Service', '/')
undefined

Pattern 2: Signal Filtering

模式2:信号过滤

python
undefined
python
undefined

GOOD: Filter signals at subscription

推荐:订阅时过滤信号

bus.add_signal_receiver( handler, signal_name='SpecificSignal', # Only this signal dbus_interface='org.test.Interface', path='/specific/path' # Only this path )
bus.add_signal_receiver( handler, signal_name='SpecificSignal', # 仅接收此信号 dbus_interface='org.test.Interface', path='/specific/path' # 仅接收此路径的信号 )

BAD: Receive all signals and filter in handler

不推荐:接收所有信号后在处理程序中过滤

bus.add_signal_receiver( handler, signal_name=None, # All signals - expensive! dbus_interface=None )
undefined
bus.add_signal_receiver( handler, signal_name=None, # 所有信号 - 开销大! dbus_interface=None )
undefined

Pattern 3: Async Calls with dasbus

模式3:使用dasbus进行异步调用

python
undefined
python
undefined

GOOD: Async calls for non-blocking operations

推荐:异步调用实现非阻塞操作

from dasbus.connection import SessionMessageBus from dasbus.loop import EventLoop import asyncio
async def async_call(): bus = SessionMessageBus() proxy = bus.get_proxy('org.test.Service', '/') result = await asyncio.to_thread(proxy.Method) return result
from dasbus.connection import SessionMessageBus from dasbus.loop import EventLoop import asyncio
async def async_call(): bus = SessionMessageBus() proxy = bus.get_proxy('org.test.Service', '/') result = await asyncio.to_thread(proxy.Method) return result

BAD: Blocking calls in async context

不推荐:在异步上下文中使用阻塞调用

def blocking_call(): bus = dbus.SessionBus() proxy = bus.get_object('org.test.Service', '/') return proxy.Method() # Blocks event loop!
undefined
def blocking_call(): bus = dbus.SessionBus() proxy = bus.get_object('org.test.Service', '/') return proxy.Method() # 阻塞事件循环!
undefined

Pattern 4: Message Batching

模式4:消息批处理

python
undefined
python
undefined

GOOD: Batch property reads

推荐:批量读取属性

def get_all_properties(proxy, interface): props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties') return props.GetAll(interface) # One call
def get_all_properties(proxy, interface): props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties') return props.GetAll(interface) # 一次调用

BAD: Individual property reads

不推荐:单独读取每个属性

def get_properties_slow(proxy, interface): props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties') return { 'prop1': props.Get(interface, 'prop1'), # Call 1 'prop2': props.Get(interface, 'prop2'), # Call 2 'prop3': props.Get(interface, 'prop3'), # Call 3 }
undefined
def get_properties_slow(proxy, interface): props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties') return { 'prop1': props.Get(interface, 'prop1'), # 调用1 'prop2': props.Get(interface, 'prop2'), # 调用2 'prop3': props.Get(interface, 'prop3'), # 调用3 }
undefined

Pattern 5: Property Caching

模式5:属性缓存

python
undefined
python
undefined

GOOD: Cache properties with TTL

推荐:带TTL的属性缓存

from functools import lru_cache from time import time
class CachedPropertyAccess: def init(self, client, cache_ttl=5): self.client = client self.cache_ttl = cache_ttl self._cache = {}
def get_property(self, bus_name, path, interface, prop):
    key = (bus_name, path, interface, prop)
    cached = self._cache.get(key)

    if cached and time() - cached['time'] < self.cache_ttl:
        return cached['value']

    value = self._fetch_property(bus_name, path, interface, prop)
    self._cache[key] = {'value': value, 'time': time()}
    return value
from functools import lru_cache from time import time
class CachedPropertyAccess: def init(self, client, cache_ttl=5): self.client = client self.cache_ttl = cache_ttl self._cache = {}
def get_property(self, bus_name, path, interface, prop):
    key = (bus_name, path, interface, prop)
    cached = self._cache.get(key)

    if cached and time() - cached['time'] < self.cache_ttl:
        return cached['value']

    value = self._fetch_property(bus_name, path, interface, prop)
    self._cache[key] = {'value': value, 'time': time()}
    return value

BAD: Fetch property every time

不推荐:每次都获取属性

def get_property(proxy, interface, prop): props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties') return props.Get(interface, prop) # Always fetches

---
def get_property(proxy, interface, prop): props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties') return props.Get(interface, prop) # 每次都获取

---

7. Implementation Patterns

7. 实现模式

Pattern 1: Secure D-Bus Client

模式1:安全D-Bus客户端

python
import dbus
from dbus.exceptions import DBusException
import logging

class SecureDBusClient:
    """Secure D-Bus client with access controls."""

    BLOCKED_SERVICES = {
        'org.freedesktop.PolicyKit1',          # Privilege escalation
        'org.freedesktop.systemd1',            # System service control
        'org.freedesktop.login1',              # Session/power management
        'org.gnome.keyring',                   # Secret storage
        'org.freedesktop.secrets',             # Secret service
        'org.freedesktop.PackageKit',          # Package installation
    }

    BLOCKED_INTERFACES = {
        'org.freedesktop.DBus.Properties',     # Can read/write any property
    }

    def __init__(self, bus_type: str = 'session', permission_tier: str = 'standard'):
        self.permission_tier = permission_tier
        self.logger = logging.getLogger('dbus.security')
        self.timeout = 30  # seconds

        # Connect to bus
        if bus_type == 'session':
            self.bus = dbus.SessionBus()
        elif bus_type == 'system':
            if permission_tier != 'elevated':
                raise PermissionError("System bus requires 'elevated' tier")
            self.bus = dbus.SystemBus()
        else:
            raise ValueError(f"Invalid bus type: {bus_type}")

    def get_object(self, bus_name: str, object_path: str) -> dbus.Interface:
        """Get D-Bus object with validation."""
        # Security check
        if bus_name in self.BLOCKED_SERVICES:
            self.logger.warning('blocked_service', service=bus_name)
            raise SecurityError(f"Access to {bus_name} is blocked")

        # Validate bus name format
        if not self._validate_bus_name(bus_name):
            raise ValueError(f"Invalid bus name: {bus_name}")

        # Get proxy object
        try:
            proxy = self.bus.get_object(bus_name, object_path)
            self._audit_log('get_object', bus_name, object_path)
            return proxy
        except DBusException as e:
            self.logger.error(f"D-Bus error: {e}")
            raise

    def call_method(
        self,
        bus_name: str,
        object_path: str,
        interface: str,
        method: str,
        *args
    ):
        """Call D-Bus method with validation."""
        # Security checks
        if interface in self.BLOCKED_INTERFACES:
            raise SecurityError(f"Interface {interface} is blocked")

        # Get object
        proxy = self.get_object(bus_name, object_path)
        iface = dbus.Interface(proxy, interface)

        # Call with timeout
        try:
            result = getattr(iface, method)(
                *args,
                timeout=self.timeout
            )
            self._audit_log('call_method', bus_name, f"{interface}.{method}")
            return result
        except DBusException as e:
            if 'Timeout' in str(e):
                raise TimeoutError(f"Method call timed out after {self.timeout}s")
            raise

    def get_peer_credentials(self, bus_name: str) -> dict:
        """Get credentials of D-Bus peer."""
        dbus_obj = self.bus.get_object(
            'org.freedesktop.DBus',
            '/org/freedesktop/DBus'
        )
        dbus_iface = dbus.Interface(dbus_obj, 'org.freedesktop.DBus')

        return {
            'pid': dbus_iface.GetConnectionUnixProcessID(bus_name),
            'uid': dbus_iface.GetConnectionUnixUser(bus_name),
        }

    def _validate_bus_name(self, name: str) -> bool:
        """Validate D-Bus bus name format."""
        import re
        pattern = r'^[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)+$'
        return bool(re.match(pattern, name)) and len(name) <= 255

    def _audit_log(self, action: str, service: str, detail: str):
        """Log operation for audit."""
        self.logger.info(
            f'dbus.{action}',
            extra={
                'service': service,
                'detail': detail,
                'permission_tier': self.permission_tier
            }
        )
python
import dbus
from dbus.exceptions import DBusException
import logging

class SecureDBusClient:
    """带访问控制的安全D-Bus客户端。"""

    BLOCKED_SERVICES = {
        'org.freedesktop.PolicyKit1',          # 权限提升
        'org.freedesktop.systemd1',            # 系统服务控制
        'org.freedesktop.login1',              # 会话/电源管理
        'org.gnome.keyring',                   # 密钥存储
        'org.freedesktop.secrets',             # 密钥服务
        'org.freedesktop.PackageKit',          # 包安装
    }

    BLOCKED_INTERFACES = {
        'org.freedesktop.DBus.Properties',     # 可读写任意属性
    }

    def __init__(self, bus_type: str = 'session', permission_tier: str = 'standard'):
        self.permission_tier = permission_tier
        self.logger = logging.getLogger('dbus.security')
        self.timeout = 30  # 秒

        # 连接到总线
        if bus_type == 'session':
            self.bus = dbus.SessionBus()
        elif bus_type == 'system':
            if permission_tier != 'elevated':
                raise PermissionError("系统总线需要'elevated'权限等级")
            self.bus = dbus.SystemBus()
        else:
            raise ValueError(f"无效总线类型: {bus_type}")

    def get_object(self, bus_name: str, object_path: str) -> dbus.Interface:
        """验证后获取D-Bus对象。"""
        # 安全检查
        if bus_name in self.BLOCKED_SERVICES:
            self.logger.warning('blocked_service', service=bus_name)
            raise SecurityError(f"访问{bus_name}被阻止")

        # 验证总线名称格式
        if not self._validate_bus_name(bus_name):
            raise ValueError(f"无效总线名称: {bus_name}")

        # 获取代理对象
        try:
            proxy = self.bus.get_object(bus_name, object_path)
            self._audit_log('get_object', bus_name, object_path)
            return proxy
        except DBusException as e:
            self.logger.error(f"D-Bus错误: {e}")
            raise

    def call_method(
        self,
        bus_name: str,
        object_path: str,
        interface: str,
        method: str,
        *args
    ):
        """验证后调用D-Bus方法。"""
        # 安全检查
        if interface in self.BLOCKED_INTERFACES:
            raise SecurityError(f"接口{interface}被阻止")

        # 获取对象
        proxy = self.get_object(bus_name, object_path)
        iface = dbus.Interface(proxy, interface)

        # 带超时调用
        try:
            result = getattr(iface, method)(
                *args,
                timeout=self.timeout
            )
            self._audit_log('call_method', bus_name, f"{interface}.{method}")
            return result
        except DBusException as e:
            if 'Timeout' in str(e):
                raise TimeoutError(f"方法调用在{self.timeout}秒后超时")
            raise

    def get_peer_credentials(self, bus_name: str) -> dict:
        """获取D-Bus对等方的凭证。"""
        dbus_obj = self.bus.get_object(
            'org.freedesktop.DBus',
            '/org/freedesktop/DBus'
        )
        dbus_iface = dbus.Interface(dbus_obj, 'org.freedesktop.DBus')

        return {
            'pid': dbus_iface.GetConnectionUnixProcessID(bus_name),
            'uid': dbus_iface.GetConnectionUnixUser(bus_name),
        }

    def _validate_bus_name(self, name: str) -> bool:
        """验证D-Bus总线名称格式。"""
        import re
        pattern = r'^[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)+$'
        return bool(re.match(pattern, name)) and len(name) <= 255

    def _audit_log(self, action: str, service: str, detail: str):
        """记录操作用于审计。"""
        self.logger.info(
            f'dbus.{action}',
            extra={
                'service': service,
                'detail': detail,
                'permission_tier': self.permission_tier
            }
        )

Pattern 2: Signal Monitoring

模式2:信号监控

python
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib

class SecureSignalMonitor:
    """Monitor D-Bus signals safely."""

    ALLOWED_SIGNALS = {
        'org.freedesktop.Notifications': ['NotificationClosed', 'ActionInvoked'],
        'org.freedesktop.FileManager1': ['OpenLocationRequested'],
    }

    def __init__(self, client: SecureDBusClient):
        self.client = client
        self.handlers = {}
        self.logger = logging.getLogger('dbus.signals')

        # Setup main loop
        DBusGMainLoop(set_as_default=True)

    def subscribe(
        self,
        bus_name: str,
        interface: str,
        signal: str,
        handler
    ):
        """Subscribe to signal with validation."""
        # Check if signal is allowed
        allowed = self.ALLOWED_SIGNALS.get(interface, [])
        if signal not in allowed:
            raise SecurityError(f"Signal {interface}.{signal} not allowed")

        # Wrapper to log signal receipt
        def safe_handler(*args):
            self.logger.info(
                'signal_received',
                extra={'interface': interface, 'signal': signal}
            )
            handler(*args)

        # Subscribe
        self.client.bus.add_signal_receiver(
            safe_handler,
            signal_name=signal,
            dbus_interface=interface,
            bus_name=bus_name
        )
        self.handlers[(interface, signal)] = safe_handler

    def run(self, timeout: int = None):
        """Run signal loop with timeout."""
        loop = GLib.MainLoop()

        if timeout:
            GLib.timeout_add_seconds(timeout, loop.quit)

        loop.run()
python
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib

class SecureSignalMonitor:
    """安全监控D-Bus信号。"""

    ALLOWED_SIGNALS = {
        'org.freedesktop.Notifications': ['NotificationClosed', 'ActionInvoked'],
        'org.freedesktop.FileManager1': ['OpenLocationRequested'],
    }

    def __init__(self, client: SecureDBusClient):
        self.client = client
        self.handlers = {}
        self.logger = logging.getLogger('dbus.signals')

        # 设置主循环
        DBusGMainLoop(set_as_default=True)

    def subscribe(
        self,
        bus_name: str,
        interface: str,
        signal: str,
        handler
    ):
        """验证后订阅信号。"""
        # 检查信号是否被允许
        allowed = self.ALLOWED_SIGNALS.get(interface, [])
        if signal not in allowed:
            raise SecurityError(f"信号{interface}.{signal}不被允许")

        # 包装处理程序以记录信号接收
        def safe_handler(*args):
            self.logger.info(
                'signal_received',
                extra={'interface': interface, 'signal': signal}
            )
            handler(*args)

        # 订阅
        self.client.bus.add_signal_receiver(
            safe_handler,
            signal_name=signal,
            dbus_interface=interface,
            bus_name=bus_name
        )
        self.handlers[(interface, signal)] = safe_handler

    def run(self, timeout: int = None):
        """带超时运行信号循环。"""
        loop = GLib.MainLoop()

        if timeout:
            GLib.timeout_add_seconds(timeout, loop.quit)

        loop.run()

Pattern 3: Property Access Control

模式3:属性访问控制

python
class SecurePropertyAccess:
    """Controlled access to D-Bus properties."""

    READABLE_PROPERTIES = {
        'org.freedesktop.Notifications': ['ServerCapabilities'],
        'org.mpris.MediaPlayer2': ['Identity', 'PlaybackStatus'],
    }

    WRITABLE_PROPERTIES = {
        'org.mpris.MediaPlayer2.Player': ['Volume'],
    }

    def __init__(self, client: SecureDBusClient):
        self.client = client
        self.logger = logging.getLogger('dbus.properties')

    def get_property(
        self,
        bus_name: str,
        object_path: str,
        interface: str,
        property_name: str
    ):
        """Get property with access control."""
        # Check if property is readable
        allowed = self.READABLE_PROPERTIES.get(interface, [])
        if property_name not in allowed:
            raise SecurityError(f"Property {interface}.{property_name} not readable")

        proxy = self.client.get_object(bus_name, object_path)
        props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')

        value = props.Get(interface, property_name)
        self.logger.info(
            'property_read',
            extra={'interface': interface, 'property': property_name}
        )
        return value

    def set_property(
        self,
        bus_name: str,
        object_path: str,
        interface: str,
        property_name: str,
        value
    ):
        """Set property with access control."""
        if self.client.permission_tier == 'read-only':
            raise PermissionError("Setting properties requires 'standard' tier")

        # Check if property is writable
        allowed = self.WRITABLE_PROPERTIES.get(interface, [])
        if property_name not in allowed:
            raise SecurityError(f"Property {interface}.{property_name} not writable")

        proxy = self.client.get_object(bus_name, object_path)
        props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')

        props.Set(interface, property_name, value)
        self.logger.info(
            'property_write',
            extra={'interface': interface, 'property': property_name}
        )
python
class SecurePropertyAccess:
    """受控访问D-Bus属性。"""

    READABLE_PROPERTIES = {
        'org.freedesktop.Notifications': ['ServerCapabilities'],
        'org.mpris.MediaPlayer2': ['Identity', 'PlaybackStatus'],
    }

    WRITABLE_PROPERTIES = {
        'org.mpris.MediaPlayer2.Player': ['Volume'],
    }

    def __init__(self, client: SecureDBusClient):
        self.client = client
        self.logger = logging.getLogger('dbus.properties')

    def get_property(
        self,
        bus_name: str,
        object_path: str,
        interface: str,
        property_name: str
    ):
        """带访问控制获取属性。"""
        # 检查属性是否可读取
        allowed = self.READABLE_PROPERTIES.get(interface, [])
        if property_name not in allowed:
            raise SecurityError(f"属性{interface}.{property_name}不可读取")

        proxy = self.client.get_object(bus_name, object_path)
        props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')

        value = props.Get(interface, property_name)
        self.logger.info(
            'property_read',
            extra={'interface': interface, 'property': property_name}
        )
        return value

    def set_property(
        self,
        bus_name: str,
        object_path: str,
        interface: str,
        property_name: str,
        value
    ):
        """带访问控制设置属性"""
        if self.client.permission_tier == 'read-only':
            raise PermissionError("设置属性需要'standard'权限等级")

        # 检查属性是否可写入
        allowed = self.WRITABLE_PROPERTIES.get(interface, [])
        if property_name not in allowed:
            raise SecurityError(f"属性{interface}.{property_name}不可写入")

        proxy = self.client.get_object(bus_name, object_path)
        props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')

        props.Set(interface, property_name, value)
        self.logger.info(
            'property_write',
            extra={'interface': interface, 'property': property_name}
        )

Pattern 4: Service Discovery

模式4:服务发现

python
class ServiceDiscovery:
    """Discover D-Bus services safely."""

    def __init__(self, client: SecureDBusClient):
        self.client = client

    def list_names(self) -> list:
        """List available bus names (filtered)."""
        dbus_obj = self.client.bus.get_object(
            'org.freedesktop.DBus',
            '/org/freedesktop/DBus'
        )
        dbus_iface = dbus.Interface(dbus_obj, 'org.freedesktop.DBus')

        all_names = dbus_iface.ListNames()

        # Filter blocked services
        filtered = [
            name for name in all_names
            if name not in SecureDBusClient.BLOCKED_SERVICES
        ]

        return filtered

    def introspect(self, bus_name: str, object_path: str) -> str:
        """Get introspection XML for object."""
        if bus_name in SecureDBusClient.BLOCKED_SERVICES:
            raise SecurityError(f"Cannot introspect {bus_name}")

        proxy = self.client.get_object(bus_name, object_path)
        return proxy.Introspect(
            dbus_interface='org.freedesktop.DBus.Introspectable'
        )

python
class ServiceDiscovery:
    """安全发现D-Bus服务。"""

    def __init__(self, client: SecureDBusClient):
        self.client = client

    def list_names(self) -> list:
        """列出可用的总线名称(已过滤)。"""
        dbus_obj = self.client.bus.get_object(
            'org.freedesktop.DBus',
            '/org/freedesktop/DBus'
        )
        dbus_iface = dbus.Interface(dbus_obj, 'org.freedesktop.DBus')

        all_names = dbus_iface.ListNames()

        # 过滤被阻止的服务
        filtered = [
            name for name in all_names
            if name not in SecureDBusClient.BLOCKED_SERVICES
        ]

        return filtered

    def introspect(self, bus_name: str, object_path: str) -> str:
        """获取对象的自省XML。"""
        if bus_name in SecureDBusClient.BLOCKED_SERVICES:
            raise SecurityError(f"无法自省{bus_name}")

        proxy = self.client.get_object(bus_name, object_path)
        return proxy.Introspect(
            dbus_interface='org.freedesktop.DBus.Introspectable'
        )

5. Security Standards

5. 安全标准

5.1 Critical Vulnerabilities

5.1 关键漏洞

1. Privilege Escalation via PolicyKit (CVE-2021-4034)

1. 通过PolicyKit提升权限(CVE-2021-4034)

  • Severity: CRITICAL
  • Description: Polkit vulnerability for local privilege escalation
  • Mitigation: Block PolicyKit service access
  • 严重程度: 关键
  • 描述: Polkit本地权限提升漏洞
  • 缓解措施: 阻止PolicyKit服务访问

2. D-Bus Authentication Bypass (CVE-2022-42012)

2. D-Bus认证绕过(CVE-2022-42012)

  • Severity: HIGH
  • Description: Unauthorized session bus access
  • Mitigation: Validate peer credentials
  • 严重程度: 高
  • 描述: 未授权访问会话总线
  • 缓解措施: 验证对等方凭证

3. Service Impersonation (CWE-290)

3. 服务冒充(CWE-290)

  • Severity: HIGH
  • Description: Malicious service claiming trusted name
  • Mitigation: Verify service credentials
  • 严重程度: 高
  • 描述: 恶意服务声称受信任名称
  • 缓解措施: 验证服务凭证

4. Method Injection (CWE-74)

4. 方法注入(CWE-74)

  • Severity: MEDIUM
  • Description: Malicious method parameters
  • Mitigation: Input validation, service allowlists
  • 严重程度: 中
  • 描述: 恶意方法参数
  • 缓解措施: 输入验证、服务允许列表

5. Information Disclosure (CWE-200)

5. 信息泄露(CWE-200)

  • Severity: MEDIUM
  • Description: Exposing sensitive service data
  • Mitigation: Property access control
  • 严重程度: 中
  • 描述: 暴露敏感服务数据
  • 缓解措施: 属性访问控制

5.2 Permission Tier Model

5.2 权限等级模型

python
PERMISSION_TIERS = {
    'read-only': {
        'bus_type': 'session',
        'allowed_operations': ['get_property', 'introspect', 'list_names'],
        'blocked_services': BLOCKED_SERVICES,
    },
    'standard': {
        'bus_type': 'session',
        'allowed_operations': ['*', 'set_property', 'call_method'],
        'blocked_services': BLOCKED_SERVICES,
    },
    'elevated': {
        'bus_type': ['session', 'system'],
        'allowed_operations': ['*'],
        'blocked_services': ['org.freedesktop.PackageKit'],
    }
}

python
PERMISSION_TIERS = {
    'read-only': {
        'bus_type': 'session',
        'allowed_operations': ['get_property', 'introspect', 'list_names'],
        'blocked_services': BLOCKED_SERVICES,
    },
    'standard': {
        'bus_type': 'session',
        'allowed_operations': ['*', 'set_property', 'call_method'],
        'blocked_services': BLOCKED_SERVICES,
    },
    'elevated': {
        'bus_type': ['session', 'system'],
        'allowed_operations': ['*'],
        'blocked_services': ['org.freedesktop.PackageKit'],
    }
}

8. Common Mistakes

8. 常见错误

Never: Access System Bus Without Need

禁止:无必要时访问系统总线

python
undefined
python
undefined

BAD: Always use system bus

不推荐:始终使用系统总线

bus = dbus.SystemBus()
bus = dbus.SystemBus()

GOOD: Prefer session bus

推荐:优先使用会话总线

bus = dbus.SessionBus()
bus = dbus.SessionBus()

Only use system bus when required

仅在需要时使用系统总线

undefined
undefined

Never: Allow PolicyKit Access

禁止:允许PolicyKit访问

python
undefined
python
undefined

BAD: No service filtering

不推荐:无服务过滤

result = client.call_method('org.freedesktop.PolicyKit1', ...)
result = client.call_method('org.freedesktop.PolicyKit1', ...)

GOOD: Block privileged services

推荐:阻止特权服务

if service not in BLOCKED_SERVICES: result = client.call_method(service, ...)
undefined
if service not in BLOCKED_SERVICES: result = client.call_method(service, ...)
undefined

Never: Skip Timeout Enforcement

禁止:跳过超时实施

python
undefined
python
undefined

BAD: No timeout

不推荐:无超时

result = iface.SomeMethod()
result = iface.SomeMethod()

GOOD: With timeout

推荐:带超时

result = iface.SomeMethod(timeout=30)

---
result = iface.SomeMethod(timeout=30)

---

13. Pre-Deployment Checklist

13. 部署前检查清单

  • Service blocklist configured
  • Session bus preferred over system bus
  • Timeout enforcement on all calls
  • Peer credential validation
  • Audit logging enabled
  • Property access control configured

  • 已配置服务阻止列表
  • 优先使用会话总线而非系统总线
  • 所有调用都实施了超时
  • 已验证对等方凭证
  • 已启用审计日志
  • 已配置属性访问控制

14. Summary

14. 总结

Your goal is to create D-Bus automation that is:
  • Secure: Service blocklists, credential validation, access control
  • Reliable: Timeout enforcement, error handling
  • Minimal: Session bus by default, least privilege
Security Reminders:
  1. Always prefer session bus over system bus
  2. Block access to PolicyKit and systemd
  3. Validate peer credentials when needed
  4. Enforce timeouts on all method calls
  5. Log all operations for audit

您的目标是创建具备以下特性的D-Bus自动化:
  • 安全: 服务阻止列表、凭证验证、访问控制
  • 可靠: 超时实施、错误处理
  • 最小化: 默认使用会话总线,最小权限
安全提醒:
  1. 始终优先使用会话总线而非系统总线
  2. 阻止对PolicyKit和systemd的访问
  3. 必要时验证对等方凭证
  4. 对所有方法调用实施超时
  5. 记录所有操作用于审计

References

参考资料

  • See
    references/security-examples.md
  • See
    references/threat-model.md
  • See
    references/advanced-patterns.md
  • 参见
    references/security-examples.md
  • 参见
    references/threat-model.md
  • 参见
    references/advanced-patterns.md