local-llm-router


Local LLM Router for Air-Gapped Networks


Intelligent routing of AI coding queries to local LLMs with Serena LSP integration for secure, offline-capable development environments.

Prerequisites (CRITICAL)


Before using this skill, ensure:
  1. Serena MCP Server installed and running (PRIMARY TOOL)
  2. At least one local LLM service running (Ollama, LM Studio, Jan, etc.)
```bash
# Install Serena (required)
pip install serena

# Or via uvx
uvx --from git+https://github.com/oraios/serena serena start-mcp-server
```

Verify local LLM service

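
For example, hit the health endpoints listed in the Supported Services table below; Ollama and LM Studio are shown here:

```bash
# Ollama health check (default endpoint)
curl http://localhost:11434/api/version

# OpenAI-compatible services (LM Studio shown)
curl http://localhost:1234/v1/models
```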

Quick Start


```python
import httpx
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Optional

class TaskCategory(Enum):
    CODING = "coding"
    REASONING = "reasoning"
    ANALYSIS = "analysis"
    DOCUMENTATION = "documentation"

@dataclass
class RouterConfig:
    """Local LLM Router configuration."""
    ollama_url: str = "http://localhost:11434"
    lmstudio_url: str = "http://localhost:1234"
    jan_url: str = "http://localhost:1337"
    serena_enabled: bool = True
    timeout: int = 30

async def quick_route(query: str, config: RouterConfig = RouterConfig()):
    """Quick routing example - detects services and routes query."""

    # 1. Detect available services
    services = await discover_services(config)
    if not services:
        raise RuntimeError("No local LLM services available")

    # 2. Classify task
    category = classify_task(query)

    # 3. Select best model for task
    model = select_model(category, services)

    # 4. Execute query
    return await execute_query(query, model, services[0])
```

Example usage


```python
async def main():
    response = await quick_route("Write a function to parse JSON safely")
    print(response)

asyncio.run(main())
```

Serena Integration (PRIMARY TOOL)


CRITICAL: Serena MCP MUST be invoked FIRST for all code-related tasks. This provides semantic understanding of the codebase before routing to an LLM.

Why Serena First?


  1. Token Efficiency: Serena extracts only relevant code context
  2. Accuracy: Symbol-level operations vs grep-style searches
  3. Codebase Awareness: Understands types, references, call hierarchies
  4. Edit Precision: Applies changes at symbol level, not string matching

Serena MCP Setup


```python
import subprocess
import json
from typing import Any

class SerenaMCP:
    """Serena MCP client for code intelligence."""

    def __init__(self, workspace_root: str):
        self.workspace = workspace_root
        self.process = None

    async def start(self):
        """Start Serena MCP server."""
        self.process = subprocess.Popen(
            ["serena", "start-mcp-server", "--workspace", self.workspace],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

    async def call(self, method: str, params: dict) -> Any:
        """Call Serena MCP method."""
        request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": method,
            "params": params
        }
        self.process.stdin.write(json.dumps(request).encode() + b"\n")
        self.process.stdin.flush()
        response = self.process.stdout.readline()
        return json.loads(response)

    async def find_symbol(self, name: str) -> dict:
        """Find symbol definition by name."""
        return await self.call("find_symbol", {"name": name})

    async def get_references(self, file: str, line: int, char: int) -> list:
        """Get all references to symbol at position."""
        return await self.call("get_references", {
            "file": file,
            "line": line,
            "character": char
        })

    async def get_hover_info(self, file: str, line: int, char: int) -> dict:
        """Get type/documentation info at position."""
        return await self.call("get_hover_info", {
            "file": file,
            "line": line,
            "character": char
        })

    async def get_diagnostics(self, file: str) -> list:
        """Get errors/warnings for file."""
        return await self.call("get_diagnostics", {"file": file})

    async def apply_edit(self, file: str, edits: list) -> bool:
        """Apply code edits to file."""
        return await self.call("apply_edit", {"file": file, "edits": edits})
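
A minimal usage sketch (the workspace path and symbol name are placeholders):

```python
import asyncio

async def demo():
    serena = SerenaMCP(workspace_root="/path/to/project")  # placeholder path
    await serena.start()
    symbol = await serena.find_symbol("RouterConfig")      # placeholder symbol
    print(symbol)

asyncio.run(demo())
```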

Serena tools by priority (always use higher priority first)


```python
SERENA_TOOLS = {
    # Priority 1: Symbol-level operations (highest)
    "find_symbol": {"priority": 1, "use_for": ["navigation", "definition"]},
    "get_references": {"priority": 1, "use_for": ["refactoring", "impact analysis"]},
    "get_hover_info": {"priority": 1, "use_for": ["type info", "documentation"]},

    # Priority 2: Code navigation
    "go_to_definition": {"priority": 2, "use_for": ["navigation"]},
    "go_to_type_definition": {"priority": 2, "use_for": ["type navigation"]},
    "go_to_implementation": {"priority": 2, "use_for": ["interface impl"]},

    # Priority 3: Code understanding
    "get_document_symbols": {"priority": 3, "use_for": ["file structure"]},
    "get_workspace_symbols": {"priority": 3, "use_for": ["codebase search"]},
    "get_call_hierarchy": {"priority": 3, "use_for": ["call analysis"]},

    # Priority 4: Code modification
    "apply_edit": {"priority": 4, "use_for": ["editing"]},
    "rename_symbol": {"priority": 4, "use_for": ["refactoring"]},

    # Priority 5: Diagnostics
    "get_diagnostics": {"priority": 5, "use_for": ["errors", "warnings"]},
    "get_code_actions": {"priority": 5, "use_for": ["quick fixes"]},
}
```
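
The priority field lets callers resolve tool choice mechanically; a small helper sketch (pick_tool is illustrative, not part of Serena):

```python
def pick_tool(use_case: str) -> str:
    """Return the highest-priority Serena tool registered for a use case."""
    matches = [
        (info["priority"], name)
        for name, info in SERENA_TOOLS.items()
        if use_case in info["use_for"]
    ]
    if not matches:
        raise ValueError(f"no tool registered for {use_case!r}")
    return min(matches)[1]  # lowest priority number wins

print(pick_tool("refactoring"))  # get_references (priority 1) beats rename_symbol (4)
```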

Serena-First Request Handler


```python
async def handle_code_request(
    query: str,
    file_context: Optional[dict] = None,
    serena: Optional[SerenaMCP] = None,
    router: Optional["LLMRouter"] = None
):
    """
    Handle code request with Serena-first pattern.

    CRITICAL: Serena is ALWAYS invoked first for code tasks.
    """

    # Step 1: Classify the task
    category = classify_task(query)

    # Step 2: ALWAYS use Serena for code context (if available)
    serena_context = {}
    if serena and file_context:
        # Gather semantic context from Serena
        if file_context.get("file") and file_context.get("position"):
            file = file_context["file"]
            line = file_context["position"]["line"]
            char = file_context["position"]["character"]

            # Get hover info (type, docs)
            serena_context["hover"] = await serena.get_hover_info(file, line, char)

            # For refactoring/analysis, get references
            if category in [TaskCategory.ANALYSIS, TaskCategory.CODING]:
                if "refactor" in query.lower() or "rename" in query.lower():
                    serena_context["references"] = await serena.get_references(
                        file, line, char
                    )

            # Always get diagnostics for the file
            serena_context["diagnostics"] = await serena.get_diagnostics(file)

    # Step 3: Build enriched prompt with Serena context
    enriched_query = build_enriched_query(query, serena_context)

    # Step 4: Select and route to appropriate LLM
    model = router.select_model(category)
    response = await router.execute(enriched_query, model)

    # Step 5: If response contains edits, apply via Serena
    if serena and file_context and contains_code_edit(response):
        edits = parse_code_edits(response)
        await serena.apply_edit(file_context["file"], edits)

    return response

def build_enriched_query(query: str, serena_context: dict) -> str:
    """Build query enriched with Serena context."""
    parts = [query]

    if serena_context.get("hover"):
        hover = serena_context["hover"]
        parts.append(f"\n## Type Information\n```\n{hover}\n```")

    if serena_context.get("references"):
        refs = serena_context["references"]
        parts.append(f"\n## References ({len(refs)} found)\n")
        for ref in refs[:10]:  # Limit to first 10
            parts.append(f"- {ref['file']}:{ref['line']}")

    if serena_context.get("diagnostics"):
        diags = serena_context["diagnostics"]
        if diags:
            parts.append(f"\n## Current Issues ({len(diags)})\n")
            for diag in diags[:5]:
                parts.append(f"- Line {diag['line']}: {diag['message']}")

    return "\n".join(parts)

Service Discovery


Supported Services


| Service | Default Endpoint | Health Check | Models Endpoint | Chat Endpoint | API Style |
|---|---|---|---|---|---|
| Ollama | localhost:11434 | /api/version | /api/tags | /api/chat | Native |
| LM Studio | localhost:1234 | /v1/models | /v1/models | /v1/chat/completions | OpenAI |
| Jan | localhost:1337 | /v1/models | /v1/models | /v1/chat/completions | OpenAI |
| OpenWebUI | localhost:3000 | /api/health | /api/models | /api/chat | Custom |
| LocalAI | localhost:8080 | /readyz | /v1/models | /v1/chat/completions | OpenAI |
| vLLM | localhost:8000 | /health | /v1/models | /v1/chat/completions | OpenAI |
| llama.cpp | localhost:8080 | /health | /v1/models | /v1/chat/completions | OpenAI |
| Kobold.cpp | localhost:5001 | /api/v1/info | /api/v1/models | /api/v1/generate | Custom |
| GPT4All | localhost:4891 | /v1/models | /v1/models | /v1/chat/completions | OpenAI |
| text-generation-webui | localhost:5000 | /api/v1/model | /api/v1/models | /api/v1/chat | Custom |
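
Most of these services expose the OpenAI-compatible chat endpoint, so a single request shape covers them. A sketch against LM Studio's default port (the model name is a placeholder for whatever model you have loaded):

```bash
curl http://localhost:1234/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "your-loaded-model",
    "messages": [{"role": "user", "content": "Write a haiku about routers"}]
  }'
```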

OS Detection


```python
import sys
import os
import platform
from dataclasses import dataclass

@dataclass
class OSInfo:
    platform: str      # 'windows', 'linux', 'darwin'
    release: str
    arch: str          # 'x64', 'arm64'
    is_wsl: bool
    is_container: bool

def detect_os() -> OSInfo:
    """Detect operating system and environment."""
    plat = sys.platform

    # Normalize platform name
    if plat == 'win32':
        plat = 'windows'
    elif plat == 'darwin':
        plat = 'darwin'
    else:
        plat = 'linux'

    # WSL detection
    is_wsl = False
    if plat == 'linux':
        try:
            with open('/proc/version', 'r') as f:
                is_wsl = 'microsoft' in f.read().lower()
        except FileNotFoundError:
            pass
        is_wsl = is_wsl or os.environ.get('WSL_DISTRO_NAME') is not None

    # Container detection
    is_container = (
        os.path.exists('/.dockerenv') or
        os.environ.get('KUBERNETES_SERVICE_HOST') is not None
    )
    if not is_container and plat == 'linux':
        try:
            with open('/proc/1/cgroup', 'r') as f:
                is_container = 'docker' in f.read() or 'kubepods' in f.read()
        except FileNotFoundError:
            pass

    return OSInfo(
        platform=plat,
        release=platform.release(),
        arch=platform.machine(),
        is_wsl=is_wsl,
        is_container=is_container
    )

def adjust_endpoint_for_os(endpoint: str, os_info: OSInfo) -> str:
    """Adjust endpoint based on OS environment."""
    if os_info.is_wsl or os_info.is_container:
        # In WSL/containers, localhost services are on the host
        return endpoint.replace('localhost', 'host.docker.internal')
    return endpoint
```
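
Wiring the two helpers together:

```python
os_info = detect_os()
endpoint = adjust_endpoint_for_os("http://localhost:11434", os_info)
print(os_info.platform, os_info.is_wsl, endpoint)
```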

Service Discovery Implementation


```python
import httpx
import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class DiscoveredModel:
    id: str
    name: str
    size: int = 0
    family: Optional[str] = None
    context_length: int = 4096
    quantization: Optional[str] = None

@dataclass
class LLMService:
    name: str
    type: str  # 'ollama', 'lmstudio', 'jan', 'openwebui', 'custom'
    endpoint: str
    status: str = 'unknown'  # 'online', 'offline', 'unknown'
    models: list = field(default_factory=list)
    last_checked: Optional[datetime] = None
    api_style: str = 'openai'  # 'openai', 'native'

    # Endpoint paths
    health_path: str = '/v1/models'
    models_path: str = '/v1/models'
    chat_path: str = '/v1/chat/completions'
```

Default service configurations


```python
SERVICE_DEFAULTS = {
    'ollama': LLMService(
        name='Ollama', type='ollama', endpoint='http://localhost:11434',
        health_path='/api/version', models_path='/api/tags',
        chat_path='/api/chat', api_style='native'
    ),
    'lmstudio': LLMService(
        name='LM Studio', type='lmstudio', endpoint='http://localhost:1234',
        health_path='/v1/models', models_path='/v1/models',
        chat_path='/v1/chat/completions', api_style='openai'
    ),
    'jan': LLMService(
        name='Jan', type='jan', endpoint='http://localhost:1337',
        health_path='/v1/models', models_path='/v1/models',
        chat_path='/v1/chat/completions', api_style='openai'
    ),
    'openwebui': LLMService(
        name='Open WebUI', type='openwebui', endpoint='http://localhost:3000',
        health_path='/api/health', models_path='/api/models',
        chat_path='/api/chat', api_style='custom'
    ),
    'localai': LLMService(
        name='LocalAI', type='localai', endpoint='http://localhost:8080',
        health_path='/readyz', models_path='/v1/models',
        chat_path='/v1/chat/completions', api_style='openai'
    ),
    'vllm': LLMService(
        name='vLLM', type='vllm', endpoint='http://localhost:8000',
        health_path='/health', models_path='/v1/models',
        chat_path='/v1/chat/completions', api_style='openai'
    ),
    'llamacpp': LLMService(
        name='llama.cpp', type='llamacpp', endpoint='http://localhost:8080',
        health_path='/health', models_path='/v1/models',
        chat_path='/v1/chat/completions', api_style='openai'
    ),
    'koboldcpp': LLMService(
        name='Kobold.cpp', type='koboldcpp', endpoint='http://localhost:5001',
        health_path='/api/v1/info', models_path='/api/v1/model',
        chat_path='/api/v1/generate', api_style='custom'
    ),
    'gpt4all': LLMService(
        name='GPT4All', type='gpt4all', endpoint='http://localhost:4891',
        health_path='/v1/models', models_path='/v1/models',
        chat_path='/v1/chat/completions', api_style='openai'
    ),
}

class ServiceDiscovery:
    """Discover and monitor local LLM services."""

    def __init__(self, custom_endpoints: list = None):
        self.services: dict[str, LLMService] = {}
        self.os_info = detect_os()
        self.custom_endpoints = custom_endpoints or []
        self._client = httpx.AsyncClient(timeout=5.0)

    async def discover_all(self) -> list[LLMService]:
        """Discover all available LLM services."""
        discovered = []

        # Check default services
        tasks = []
        for key, default in SERVICE_DEFAULTS.items():
            service = LLMService(
                name=default.name,
                type=default.type,
                endpoint=adjust_endpoint_for_os(default.endpoint, self.os_info),
                health_path=default.health_path,
                models_path=default.models_path,
                chat_path=default.chat_path,
                api_style=default.api_style
            )
            tasks.append(self._check_service(service))

        # Check custom endpoints
        for custom in self.custom_endpoints:
            service = LLMService(
                name=custom.get('name', 'Custom'),
                type='custom',
                endpoint=custom['endpoint'],
                health_path=custom.get('health_path', '/v1/models'),
                models_path=custom.get('models_path', '/v1/models'),
                chat_path=custom.get('chat_path', '/v1/chat/completions'),
                api_style=custom.get('api_style', 'openai')
            )
            tasks.append(self._check_service(service))

        results = await asyncio.gather(*tasks, return_exceptions=True)

        for result in results:
            if isinstance(result, LLMService) and result.status == 'online':
                discovered.append(result)
                self.services[result.type] = result

        return discovered

    async def _check_service(self, service: LLMService) -> LLMService:
        """Check if service is online and discover models."""
        try:
            # Health check
            response = await self._client.get(
                f"{service.endpoint}{service.health_path}"
            )

            if response.status_code == 200:
                service.status = 'online'
                service.last_checked = datetime.now()

                # Discover models
                service.models = await self._discover_models(service)
            else:
                service.status = 'offline'

        except (httpx.ConnectError, httpx.TimeoutException):
            service.status = 'offline'

        return service

    async def _discover_models(self, service: LLMService) -> list[DiscoveredModel]:
        """Discover available models on service."""
        try:
            response = await self._client.get(
                f"{service.endpoint}{service.models_path}"
            )
            data = response.json()

            # Parse based on service type
            if service.type == 'ollama':
                return [
                    DiscoveredModel(
                        id=m['name'],
                        name=m['name'],
                        size=m.get('size', 0),
                        family=m.get('details', {}).get('family'),
                        context_length=self._infer_context_length(m['name'])
                    )
                    for m in data.get('models', [])
                ]
            else:  # OpenAI-style
                return [
                    DiscoveredModel(
                        id=m['id'],
                        name=m['id'],
                        context_length=m.get('context_length', 4096)
                    )
                    for m in data.get('data', [])
                ]
        except Exception:
            return []

    def _infer_context_length(self, model_name: str) -> int:
        """Infer context length from model name."""
        name_lower = model_name.lower()

        # Check for explicit context markers
        if '128k' in name_lower or '131k' in name_lower:
            return 131072
        if '64k' in name_lower:
            return 65536
        if '32k' in name_lower:
            return 32768
        if '16k' in name_lower:
            return 16384

        # Model family defaults
        if 'qwen' in name_lower:
            return 131072  # Qwen models typically have 128K+
        if 'deepseek' in name_lower:
            return 128000
        if 'llama-3' in name_lower or 'llama3' in name_lower:
            return 128000
        if 'codellama' in name_lower:
            return 100000
        if 'mixtral' in name_lower:
            return 65536

        return 8192  # Safe default
```
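
Running discovery end to end, a minimal sketch:

```python
async def main():
    discovery = ServiceDiscovery()
    for svc in await discovery.discover_all():
        print(f"{svc.name} at {svc.endpoint}: {[m.id for m in svc.models]}")

asyncio.run(main())
```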

Task Classification


Classification System


```python
import re
from enum import Enum
from dataclasses import dataclass

class TaskCategory(Enum):
    CODING = "coding"
    REASONING = "reasoning"
    ANALYSIS = "analysis"
    DOCUMENTATION = "documentation"

@dataclass
class ClassificationResult:
    category: TaskCategory
    confidence: float  # 0.0 - 1.0
    requires_serena: bool
    keywords_matched: list[str]
```

Task patterns (regex)


```python
TASK_PATTERNS = {
    TaskCategory.CODING: [
        r"(?:write|create|implement|code|generate)\s+(?:a\s+)?(?:function|class|method|component)",
        r"(?:fix|debug|solve)\s+(?:this|the)\s+(?:bug|error|issue)",
        r"refactor\s+(?:this|the)",
        r"add\s+(?:error\s+handling|validation|logging|tests?)",
        r"complete\s+(?:this|the)\s+code",
        r"(?:convert|translate)\s+(?:this|the)\s+code",
        r"(?:optimize|improve)\s+(?:this|the)\s+(?:function|code|performance)",
    ],
    TaskCategory.REASONING: [
        r"(?:design|architect|plan)\s+(?:a|the)\s+(?:system|architecture|solution)",
        r"how\s+should\s+(?:I|we)\s+(?:approach|structure|implement)",
        r"what\s+(?:is|would\s+be)\s+the\s+best\s+(?:way|approach|pattern)",
        r"explain\s+the\s+(?:logic|reasoning|algorithm)",
        r"compare\s+(?:and\s+contrast|between)",
        r"(?:recommend|suggest)\s+(?:an?\s+)?(?:approach|solution|pattern)",
        r"trade-?offs?\s+(?:between|of)",
    ],
    TaskCategory.ANALYSIS: [
        r"(?:review|analyze|audit)\s+(?:this|the)\s+code",
        r"find\s+(?:potential\s+)?(?:issues|vulnerabilities|bugs|problems)",
        r"(?:security|performance)\s+(?:review|analysis|audit)",
        r"what\s+(?:could|might)\s+go\s+wrong",
        r"identify\s+(?:problems|improvements|issues)",
        r"(?:check|scan)\s+for\s+(?:vulnerabilities|issues)",
    ],
    TaskCategory.DOCUMENTATION: [
        r"(?:write|create|generate)\s+(?:documentation|docs|docstring)",
        r"(?:add|write)\s+(?:comments|jsdoc|docstring|type\s+hints)",
        r"(?:document|explain)\s+(?:this|the)\s+(?:code|function|api)",
        r"(?:create|write)\s+(?:a\s+)?readme",
        r"(?:generate|write)\s+(?:api\s+)?documentation",
        r"describe\s+(?:what|how)\s+(?:this|the)",
    ],
}
```

Keyword weights for scoring


```python
KEYWORD_WEIGHTS = {
    # Coding
    "function": (TaskCategory.CODING, 0.3),
    "implement": (TaskCategory.CODING, 0.4),
    "code": (TaskCategory.CODING, 0.2),
    "debug": (TaskCategory.CODING, 0.5),
    "refactor": (TaskCategory.CODING, 0.6),
    "fix": (TaskCategory.CODING, 0.4),
    "test": (TaskCategory.CODING, 0.3),
    "bug": (TaskCategory.CODING, 0.5),

    # Reasoning
    "architecture": (TaskCategory.REASONING, 0.6),
    "design": (TaskCategory.REASONING, 0.4),
    "approach": (TaskCategory.REASONING, 0.3),
    "strategy": (TaskCategory.REASONING, 0.5),
    "tradeoff": (TaskCategory.REASONING, 0.5),
    "compare": (TaskCategory.REASONING, 0.4),
    "recommend": (TaskCategory.REASONING, 0.4),

    # Analysis
    "review": (TaskCategory.ANALYSIS, 0.5),
    "analyze": (TaskCategory.ANALYSIS, 0.6),
    "security": (TaskCategory.ANALYSIS, 0.4),
    "vulnerability": (TaskCategory.ANALYSIS, 0.7),
    "performance": (TaskCategory.ANALYSIS, 0.3),
    "audit": (TaskCategory.ANALYSIS, 0.6),

    # Documentation
    "document": (TaskCategory.DOCUMENTATION, 0.6),
    "readme": (TaskCategory.DOCUMENTATION, 0.8),
    "docstring": (TaskCategory.DOCUMENTATION, 0.8),
    "comment": (TaskCategory.DOCUMENTATION, 0.4),
    "explain": (TaskCategory.DOCUMENTATION, 0.3),
}

def classify_task(query: str) -> ClassificationResult:
    """Classify a query into a task category."""
    query_lower = query.lower()
    scores = {cat: 0.0 for cat in TaskCategory}
    matched_keywords = []

    # Pattern matching (weight: 0.5)
    for category, patterns in TASK_PATTERNS.items():
        for pattern in patterns:
            if re.search(pattern, query_lower):
                scores[category] += 0.5

    # Keyword scoring (weight: 0.5)
    words = re.findall(r'\w+', query_lower)
    for word in words:
        if word in KEYWORD_WEIGHTS:
            category, weight = KEYWORD_WEIGHTS[word]
            scores[category] += weight * 0.5
            matched_keywords.append(word)

    # Find highest scoring category
    best_category = max(scores, key=scores.get)
    confidence = min(scores[best_category], 1.0)

    # Default to CODING if no clear match
    if confidence < 0.2:
        best_category = TaskCategory.CODING
        confidence = 0.5

    # Determine if Serena is required
    requires_serena = (
        best_category == TaskCategory.ANALYSIS or
        any(kw in query_lower for kw in [
            'definition', 'reference', 'symbol', 'rename',
            'where is', 'find all', 'go to', 'jump to'
        ])
    )

    return ClassificationResult(
        category=best_category,
        confidence=confidence,
        requires_serena=requires_serena,
        keywords_matched=matched_keywords
    )
```
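
A quick check of the classifier:

```python
result = classify_task("Review this code for security issues")
print(result.category)         # TaskCategory.ANALYSIS
print(result.requires_serena)  # True (analysis always routes through Serena)
```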

Model Selection


Model Capability Matrix


```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ModelCapability:
    id: str
    family: str
    context_window: int
    vram_gb: float
    categories: list[TaskCategory]
    performance_scores: dict[TaskCategory, int]  # 0-100
    tier: int  # 1=best, 2=good, 3=basic
    quantization: Optional[str] = None
```

Comprehensive model database (40+ models) - Updated January 2025


```python
MODEL_DATABASE: dict[str, ModelCapability] = {
# === CODING SPECIALISTS (Tier 1) ===
"deepseek-v3": ModelCapability(
    id="deepseek-v3",
    family="deepseek",
    context_window=128000,
    vram_gb=48,  # MoE: 685B total, 37B active
    categories=[TaskCategory.CODING, TaskCategory.REASONING, TaskCategory.ANALYSIS],
    performance_scores={
        TaskCategory.CODING: 99,
        TaskCategory.REASONING: 97,
        TaskCategory.ANALYSIS: 96,
        TaskCategory.DOCUMENTATION: 92
    },
    tier=1
),
"qwen2.5-coder-32b": ModelCapability(
    id="qwen2.5-coder-32b",
    family="qwen",
    context_window=131072,
    vram_gb=22,
    categories=[TaskCategory.CODING, TaskCategory.ANALYSIS],
    performance_scores={
        TaskCategory.CODING: 96,
        TaskCategory.REASONING: 82,
        TaskCategory.ANALYSIS: 92,
        TaskCategory.DOCUMENTATION: 88
    },
    tier=1
),
"deepseek-coder-v2": ModelCapability(
    id="deepseek-coder-v2",
    family="deepseek",
    context_window=128000,
    vram_gb=48,  # MoE: 236B total, 21B active
    categories=[TaskCategory.CODING, TaskCategory.ANALYSIS, TaskCategory.REASONING],
    performance_scores={
        TaskCategory.CODING: 95,
        TaskCategory.REASONING: 88,
        TaskCategory.ANALYSIS: 92,
        TaskCategory.DOCUMENTATION: 80
    },
    tier=1
),
"codellama-70b": ModelCapability(
    id="codellama-70b",
    family="llama",
    context_window=100000,
    vram_gb=40,
    categories=[TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 90,
        TaskCategory.REASONING: 70,
        TaskCategory.ANALYSIS: 85,
        TaskCategory.DOCUMENTATION: 75
    },
    tier=1
),
"codellama-34b": ModelCapability(
    id="codellama-34b",
    family="llama",
    context_window=100000,
    vram_gb=20,
    categories=[TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 85,
        TaskCategory.REASONING: 65,
        TaskCategory.ANALYSIS: 80,
        TaskCategory.DOCUMENTATION: 70
    },
    tier=2
),
"qwen2.5-coder-14b": ModelCapability(
    id="qwen2.5-coder-14b",
    family="qwen",
    context_window=131072,
    vram_gb=10,
    categories=[TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 82,
        TaskCategory.REASONING: 60,
        TaskCategory.ANALYSIS: 75,
        TaskCategory.DOCUMENTATION: 70
    },
    tier=2
),
"starcoder2-15b": ModelCapability(
    id="starcoder2-15b",
    family="starcoder",
    context_window=16384,
    vram_gb=10,
    categories=[TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 80,
        TaskCategory.REASONING: 50,
        TaskCategory.ANALYSIS: 70,
        TaskCategory.DOCUMENTATION: 60
    },
    tier=2
),
"deepseek-coder-6.7b": ModelCapability(
    id="deepseek-coder-6.7b",
    family="deepseek",
    context_window=16384,
    vram_gb=5,
    categories=[TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 75,
        TaskCategory.REASONING: 50,
        TaskCategory.ANALYSIS: 65,
        TaskCategory.DOCUMENTATION: 55
    },
    tier=3
),
"codellama-7b": ModelCapability(
    id="codellama-7b",
    family="llama",
    context_window=16384,
    vram_gb=5,
    categories=[TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 70,
        TaskCategory.REASONING: 45,
        TaskCategory.ANALYSIS: 60,
        TaskCategory.DOCUMENTATION: 50
    },
    tier=3
),
# === REASONING SPECIALISTS ===
"deepseek-r1": ModelCapability(
    id="deepseek-r1",
    family="deepseek",
    context_window=128000,
    vram_gb=160,  # 671B total
    categories=[TaskCategory.REASONING, TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 92,
        TaskCategory.REASONING: 99,
        TaskCategory.ANALYSIS: 95,
        TaskCategory.DOCUMENTATION: 90
    },
    tier=1
),
"deepseek-r1-distill-70b": ModelCapability(
    id="deepseek-r1-distill-70b",
    family="deepseek",
    context_window=128000,
    vram_gb=42,
    categories=[TaskCategory.REASONING, TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 88,
        TaskCategory.REASONING: 94,
        TaskCategory.ANALYSIS: 90,
        TaskCategory.DOCUMENTATION: 86
    },
    tier=1
),
"qwen2.5-72b-instruct": ModelCapability(
    id="qwen2.5-72b-instruct",
    family="qwen",
    context_window=131072,
    vram_gb=48,
    categories=[TaskCategory.REASONING, TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 88,
        TaskCategory.REASONING: 95,
        TaskCategory.ANALYSIS: 92,
        TaskCategory.DOCUMENTATION: 94
    },
    tier=1
),
"llama-3.3-70b-instruct": ModelCapability(
    id="llama-3.3-70b-instruct",
    family="llama",
    context_window=128000,
    vram_gb=42,
    categories=[TaskCategory.REASONING, TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 85,
        TaskCategory.REASONING: 92,
        TaskCategory.ANALYSIS: 88,
        TaskCategory.DOCUMENTATION: 90
    },
    tier=1
),
"deepseek-r1-distill-32b": ModelCapability(
    id="deepseek-r1-distill-32b",
    family="deepseek",
    context_window=128000,
    vram_gb=22,
    categories=[TaskCategory.REASONING, TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 82,
        TaskCategory.REASONING: 90,
        TaskCategory.ANALYSIS: 85,
        TaskCategory.DOCUMENTATION: 82
    },
    tier=2
),
"mistral-small-24b": ModelCapability(
    id="mistral-small-24b",
    family="mistral",
    context_window=32768,
    vram_gb=16,
    categories=[TaskCategory.REASONING, TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 80,
        TaskCategory.REASONING: 85,
        TaskCategory.ANALYSIS: 82,
        TaskCategory.DOCUMENTATION: 84
    },
    tier=2
),
"qwen2.5-32b-instruct": ModelCapability(
    id="qwen2.5-32b-instruct",
    family="qwen",
    context_window=131072,
    vram_gb=22,
    categories=[TaskCategory.REASONING, TaskCategory.DOCUMENTATION],
    performance_scores={
        TaskCategory.CODING: 78,
        TaskCategory.REASONING: 86,
        TaskCategory.ANALYSIS: 82,
        TaskCategory.DOCUMENTATION: 88
    },
    tier=2
),
"phi-4": ModelCapability(
    id="phi-4",
    family="phi",
    context_window=16384,
    vram_gb=10,
    categories=[TaskCategory.REASONING, TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 82,
        TaskCategory.REASONING: 88,
        TaskCategory.ANALYSIS: 80,
        TaskCategory.DOCUMENTATION: 78
    },
    tier=2
),
"deepseek-r1-distill-14b": ModelCapability(
    id="deepseek-r1-distill-14b",
    family="deepseek",
    context_window=128000,
    vram_gb=10,
    categories=[TaskCategory.REASONING],
    performance_scores={
        TaskCategory.CODING: 75,
        TaskCategory.REASONING: 85,
        TaskCategory.ANALYSIS: 78,
        TaskCategory.DOCUMENTATION: 76
    },
    tier=2
),
"llama-3.2-11b-vision": ModelCapability(
    id="llama-3.2-11b-vision",
    family="llama",
    context_window=128000,
    vram_gb=8,
    categories=[TaskCategory.REASONING, TaskCategory.DOCUMENTATION],
    performance_scores={
        TaskCategory.CODING: 68,
        TaskCategory.REASONING: 78,
        TaskCategory.ANALYSIS: 75,
        TaskCategory.DOCUMENTATION: 80
    },
    tier=2
),
"gemma-2-27b": ModelCapability(
    id="gemma-2-27b",
    family="gemma",
    context_window=8192,
    vram_gb=18,
    categories=[TaskCategory.REASONING, TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 78,
        TaskCategory.REASONING: 82,
        TaskCategory.ANALYSIS: 78,
        TaskCategory.DOCUMENTATION: 80
    },
    tier=2
),
"deepseek-r1-distill-8b": ModelCapability(
    id="deepseek-r1-distill-8b",
    family="deepseek",
    context_window=128000,
    vram_gb=6,
    categories=[TaskCategory.REASONING],
    performance_scores={
        TaskCategory.CODING: 68,
        TaskCategory.REASONING: 78,
        TaskCategory.ANALYSIS: 70,
        TaskCategory.DOCUMENTATION: 68
    },
    tier=3
),
"gemma-2-9b": ModelCapability(
    id="gemma-2-9b",
    family="gemma",
    context_window=8192,
    vram_gb=7,
    categories=[TaskCategory.REASONING, TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 72,
        TaskCategory.REASONING: 75,
        TaskCategory.ANALYSIS: 70,
        TaskCategory.DOCUMENTATION: 74
    },
    tier=3
),
"llama-3.2-3b": ModelCapability(
    id="llama-3.2-3b",
    family="llama",
    context_window=128000,
    vram_gb=3,
    categories=[TaskCategory.REASONING],
    performance_scores={
        TaskCategory.CODING: 55,
        TaskCategory.REASONING: 65,
        TaskCategory.ANALYSIS: 58,
        TaskCategory.DOCUMENTATION: 65
    },
    tier=3
),

# === ANALYSIS SPECIALISTS (Serena Required) ===
"codellama-34b-instruct": ModelCapability(
    id="codellama-34b-instruct",
    family="llama",
    context_window=100000,
    vram_gb=20,
    categories=[TaskCategory.ANALYSIS],
    performance_scores={
        TaskCategory.CODING: 80,
        TaskCategory.REASONING: 70,
        TaskCategory.ANALYSIS: 88,
        TaskCategory.DOCUMENTATION: 75
    },
    tier=2
),

# === DOCUMENTATION SPECIALISTS ===
"mistral-nemo-12b": ModelCapability(
    id="mistral-nemo-12b",
    family="mistral",
    context_window=128000,
    vram_gb=8,
    categories=[TaskCategory.DOCUMENTATION],
    performance_scores={
        TaskCategory.CODING: 65,
        TaskCategory.REASONING: 70,
        TaskCategory.ANALYSIS: 65,
        TaskCategory.DOCUMENTATION: 82
    },
    tier=2
),
"mistral-7b": ModelCapability(
    id="mistral-7b",
    family="mistral",
    context_window=32768,
    vram_gb=5,
    categories=[TaskCategory.DOCUMENTATION],
    performance_scores={
        TaskCategory.CODING: 55,
        TaskCategory.REASONING: 60,
        TaskCategory.ANALYSIS: 55,
        TaskCategory.DOCUMENTATION: 72
    },
    tier=3
),

# === ADDITIONAL MODELS ===
"phi-3-medium": ModelCapability(
    id="phi-3-medium",
    family="phi",
    context_window=128000,
    vram_gb=8,
    categories=[TaskCategory.CODING, TaskCategory.REASONING],
    performance_scores={
        TaskCategory.CODING: 72,
        TaskCategory.REASONING: 75,
        TaskCategory.ANALYSIS: 68,
        TaskCategory.DOCUMENTATION: 70
    },
    tier=2
),
"gemma-2-27b": ModelCapability(
    id="gemma-2-27b",
    family="gemma",
    context_window=8192,
    vram_gb=18,
    categories=[TaskCategory.CODING, TaskCategory.REASONING],
    performance_scores={
        TaskCategory.CODING: 78,
        TaskCategory.REASONING: 80,
        TaskCategory.ANALYSIS: 75,
        TaskCategory.DOCUMENTATION: 78
    },
    tier=2
),
"yi-34b": ModelCapability(
    id="yi-34b",
    family="yi",
    context_window=200000,
    vram_gb=20,
    categories=[TaskCategory.REASONING, TaskCategory.DOCUMENTATION],
    performance_scores={
        TaskCategory.CODING: 72,
        TaskCategory.REASONING: 82,
        TaskCategory.ANALYSIS: 75,
        TaskCategory.DOCUMENTATION: 80
    },
    tier=2
),
"command-r-plus": ModelCapability(
    id="command-r-plus",
    family="cohere",
    context_window=128000,
    vram_gb=48,
    categories=[TaskCategory.REASONING, TaskCategory.DOCUMENTATION],
    performance_scores={
        TaskCategory.CODING: 70,
        TaskCategory.REASONING: 85,
        TaskCategory.ANALYSIS: 78,
        TaskCategory.DOCUMENTATION: 88
    },
    tier=1
),
"wizardcoder-33b": ModelCapability(
    id="wizardcoder-33b",
    family="wizard",
    context_window=16384,
    vram_gb=20,
    categories=[TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 85,
        TaskCategory.REASONING: 60,
        TaskCategory.ANALYSIS: 75,
        TaskCategory.DOCUMENTATION: 65
    },
    tier=2
),
"magicoder-7b": ModelCapability(
    id="magicoder-7b",
    family="magicoder",
    context_window=16384,
    vram_gb=5,
    categories=[TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 78,
        TaskCategory.REASONING: 50,
        TaskCategory.ANALYSIS: 65,
        TaskCategory.DOCUMENTATION: 55
    },
    tier=3
),
"dolphin-mixtral-8x7b": ModelCapability(
    id="dolphin-mixtral-8x7b",
    family="dolphin",
    context_window=32768,
    vram_gb=28,
    categories=[TaskCategory.CODING, TaskCategory.REASONING],
    performance_scores={
        TaskCategory.CODING: 75,
        TaskCategory.REASONING: 78,
        TaskCategory.ANALYSIS: 72,
        TaskCategory.DOCUMENTATION: 75
    },
    tier=2
),
"nous-hermes-2-mixtral": ModelCapability(
    id="nous-hermes-2-mixtral",
    family="nous",
    context_window=32768,
    vram_gb=28,
    categories=[TaskCategory.REASONING],
    performance_scores={
        TaskCategory.CODING: 72,
        TaskCategory.REASONING: 82,
        TaskCategory.ANALYSIS: 75,
        TaskCategory.DOCUMENTATION: 78
    },
    tier=2
),
"solar-10.7b": ModelCapability(
    id="solar-10.7b",
    family="solar",
    context_window=4096,
    vram_gb=7,
    categories=[TaskCategory.REASONING, TaskCategory.DOCUMENTATION],
    performance_scores={
        TaskCategory.CODING: 60,
        TaskCategory.REASONING: 72,
        TaskCategory.ANALYSIS: 65,
        TaskCategory.DOCUMENTATION: 75
    },
    tier=3
),
}
```
    vram_gb=8,
    categories=[TaskCategory.REASONING, TaskCategory.DOCUMENTATION],
    performance_scores={
        TaskCategory.CODING: 68,
        TaskCategory.REASONING: 78,
        TaskCategory.ANALYSIS: 75,
        TaskCategory.DOCUMENTATION: 80
    },
    tier=2
),
"gemma-2-27b": ModelCapability(
    id="gemma-2-27b",
    family="gemma",
    context_window=8192,
    vram_gb=18,
    categories=[TaskCategory.REASONING, TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 78,
        TaskCategory.REASONING: 82,
        TaskCategory.ANALYSIS: 78,
        TaskCategory.DOCUMENTATION: 80
    },
    tier=2
),
"deepseek-r1-distill-8b": ModelCapability(
    id="deepseek-r1-distill-8b",
    family="deepseek",
    context_window=128000,
    vram_gb=6,
    categories=[TaskCategory.REASONING],
    performance_scores={
        TaskCategory.CODING: 68,
        TaskCategory.REASONING: 78,
        TaskCategory.ANALYSIS: 70,
        TaskCategory.DOCUMENTATION: 68
    },
    tier=3
),
"gemma-2-9b": ModelCapability(
    id="gemma-2-9b",
    family="gemma",
    context_window=8192,
    vram_gb=7,
    categories=[TaskCategory.REASONING, TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 72,
        TaskCategory.REASONING: 75,
        TaskCategory.ANALYSIS: 70,
        TaskCategory.DOCUMENTATION: 74
    },
    tier=3
),
"llama-3.2-3b": ModelCapability(
    id="llama-3.2-3b",
    family="llama",
    context_window=128000,
    vram_gb=3,
    categories=[TaskCategory.REASONING],
    performance_scores={
        TaskCategory.CODING: 55,
        TaskCategory.REASONING: 65,
        TaskCategory.ANALYSIS: 58,
        TaskCategory.DOCUMENTATION: 65
    },
    tier=3
),

# === 分析专家(需要Serena) ===
"codellama-34b-instruct": ModelCapability(
    id="codellama-34b-instruct",
    family="llama",
    context_window=100000,
    vram_gb=20,
    categories=[TaskCategory.ANALYSIS],
    performance_scores={
        TaskCategory.CODING: 80,
        TaskCategory.REASONING: 70,
        TaskCategory.ANALYSIS: 88,
        TaskCategory.DOCUMENTATION: 75
    },
    tier=2
),

# === 文档专家 ===
"mistral-nemo-12b": ModelCapability(
    id="mistral-nemo-12b",
    family="mistral",
    context_window=128000,
    vram_gb=8,
    categories=[TaskCategory.DOCUMENTATION],
    performance_scores={
        TaskCategory.CODING: 65,
        TaskCategory.REASONING: 70,
        TaskCategory.ANALYSIS: 65,
        TaskCategory.DOCUMENTATION: 82
    },
    tier=2
),
"mistral-7b": ModelCapability(
    id="mistral-7b",
    family="mistral",
    context_window=32768,
    vram_gb=5,
    categories=[TaskCategory.DOCUMENTATION],
    performance_scores={
        TaskCategory.CODING: 55,
        TaskCategory.REASONING: 60,
        TaskCategory.ANALYSIS: 55,
        TaskCategory.DOCUMENTATION: 72
    },
    tier=3
),

# === 其他模型 ===
"phi-3-medium": ModelCapability(
    id="phi-3-medium",
    family="phi",
    context_window=128000,
    vram_gb=8,
    categories=[TaskCategory.CODING, TaskCategory.REASONING],
    performance_scores={
        TaskCategory.CODING: 72,
        TaskCategory.REASONING: 75,
        TaskCategory.ANALYSIS: 68,
        TaskCategory.DOCUMENTATION: 70
    },
    tier=2
),
"gemma-2-27b": ModelCapability(
    id="gemma-2-27b",
    family="gemma",
    context_window=8192,
    vram_gb=18,
    categories=[TaskCategory.CODING, TaskCategory.REASONING],
    performance_scores={
        TaskCategory.CODING: 78,
        TaskCategory.REASONING: 80,
        TaskCategory.ANALYSIS: 75,
        TaskCategory.DOCUMENTATION: 78
    },
    tier=2
),
"yi-34b": ModelCapability(
    id="yi-34b",
    family="yi",
    context_window=200000,
    vram_gb=20,
    categories=[TaskCategory.REASONING, TaskCategory.DOCUMENTATION],
    performance_scores={
        TaskCategory.CODING: 72,
        TaskCategory.REASONING: 82,
        TaskCategory.ANALYSIS: 75,
        TaskCategory.DOCUMENTATION: 80
    },
    tier=2
),
"command-r-plus": ModelCapability(
    id="command-r-plus",
    family="cohere",
    context_window=128000,
    vram_gb=48,
    categories=[TaskCategory.REASONING, TaskCategory.DOCUMENTATION],
    performance_scores={
        TaskCategory.CODING: 70,
        TaskCategory.REASONING: 85,
        TaskCategory.ANALYSIS: 78,
        TaskCategory.DOCUMENTATION: 88
    },
    tier=1
),
"wizardcoder-33b": ModelCapability(
    id="wizardcoder-33b",
    family="wizard",
    context_window=16384,
    vram_gb=20,
    categories=[TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 85,
        TaskCategory.REASONING: 60,
        TaskCategory.ANALYSIS: 75,
        TaskCategory.DOCUMENTATION: 65
    },
    tier=2
),
"magicoder-7b": ModelCapability(
    id="magicoder-7b",
    family="magicoder",
    context_window=16384,
    vram_gb=5,
    categories=[TaskCategory.CODING],
    performance_scores={
        TaskCategory.CODING: 78,
        TaskCategory.REASONING: 50,
        TaskCategory.ANALYSIS: 65,
        TaskCategory.DOCUMENTATION: 55
    },
    tier=3
),
"dolphin-mixtral-8x7b": ModelCapability(
    id="dolphin-mixtral-8x7b",
    family="dolphin",
    context_window=32768,
    vram_gb=28,
    categories=[TaskCategory.CODING, TaskCategory.REASONING],
    performance_scores={
        TaskCategory.CODING: 75,
        TaskCategory.REASONING: 78,
        TaskCategory.ANALYSIS: 72,
        TaskCategory.DOCUMENTATION: 75
    },
    tier=2
),
"nous-hermes-2-mixtral": ModelCapability(
    id="nous-hermes-2-mixtral",
    family="nous",
    context_window=32768,
    vram_gb=28,
    categories=[TaskCategory.REASONING],
    performance_scores={
        TaskCategory.CODING: 72,
        TaskCategory.REASONING: 82,
        TaskCategory.ANALYSIS: 75,
        TaskCategory.DOCUMENTATION: 78
    },
    tier=2
),
"solar-10.7b": ModelCapability(
    id="solar-10.7b",
    family="solar",
    context_window=4096,
    vram_gb=7,
    categories=[TaskCategory.REASONING, TaskCategory.DOCUMENTATION],
    performance_scores={
        TaskCategory.CODING: 60,
        TaskCategory.REASONING: 72,
        TaskCategory.ANALYSIS: 65,
        TaskCategory.DOCUMENTATION: 75
    },
    tier=3
),
}
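
The database can be queried directly for ad-hoc views; a minimal sketch, assuming the definitions above (the 24 GB budget is illustrative):

python
# Illustrative query: models that fit a 24 GB GPU, ranked by coding score
fits_24gb = sorted(
    (m for m in MODEL_DATABASE.values() if m.vram_gb <= 24),
    key=lambda m: m.performance_scores[TaskCategory.CODING],
    reverse=True,
)
for m in fits_24gb[:3]:
    print(f"{m.id}: coding={m.performance_scores[TaskCategory.CODING]}, vram={m.vram_gb} GB")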

Task-to-model priority mapping (Updated January 2025)

任务到模型的优先级映射(2025年1月更新)

python
TASK_MODEL_PRIORITY = {
    TaskCategory.CODING: [
        # Tier 1 - Best
        "deepseek-v3", "qwen2.5-coder-32b", "deepseek-coder-v2",
        # Tier 2 - Good
        "codellama-70b", "qwen2.5-coder-14b", "codellama-34b",
        "starcoder2-15b", "phi-4",
        # Tier 3 - Basic
        "qwen2.5-coder-7b", "codellama-7b", "deepseek-coder-6.7b"
    ],
    TaskCategory.REASONING: [
        # Tier 1 - Best
        "deepseek-r1", "deepseek-v3", "deepseek-r1-distill-70b",
        "qwen2.5-72b-instruct", "llama-3.3-70b-instruct",
        # Tier 2 - Good
        "deepseek-r1-distill-32b", "mistral-small-24b", "qwen2.5-32b-instruct",
        "phi-4", "gemma-2-27b",
        # Tier 3 - Basic
        "deepseek-r1-distill-14b", "deepseek-r1-distill-8b", "gemma-2-9b"
    ],
    TaskCategory.ANALYSIS: [
        # Requires Serena LSP
        "deepseek-v3", "qwen2.5-coder-32b", "deepseek-coder-v2",
        "codellama-34b-instruct", "qwen2.5-72b-instruct"
    ],
    TaskCategory.DOCUMENTATION: [
        "qwen2.5-72b-instruct", "llama-3.3-70b-instruct", "qwen2.5-32b-instruct",
        "mistral-small-24b", "mistral-nemo-12b", "gemma-2-27b"
    ],
}
python
TASK_MODEL_PRIORITY = {
    TaskCategory.CODING: [
        # Tier 1 - 最佳
        "deepseek-v3", "qwen2.5-coder-32b", "deepseek-coder-v2",
        # Tier 2 - 良好
        "codellama-70b", "qwen2.5-coder-14b", "codellama-34b",
        "starcoder2-15b", "phi-4",
        # Tier 3 - 基础
        "qwen2.5-coder-7b", "codellama-7b", "deepseek-coder-6.7b"
    ],
    TaskCategory.REASONING: [
        # Tier 1 - 最佳
        "deepseek-r1", "deepseek-v3", "deepseek-r1-distill-70b",
        "qwen2.5-72b-instruct", "llama-3.3-70b-instruct",
        # Tier 2 - 良好
        "deepseek-r1-distill-32b", "mistral-small-24b", "qwen2.5-32b-instruct",
        "phi-4", "gemma-2-27b",
        # Tier 3 - 基础
        "deepseek-r1-distill-14b", "deepseek-r1-distill-8b", "gemma-2-9b"
    ],
    TaskCategory.ANALYSIS: [
        # 需要Serena LSP
        "deepseek-v3", "qwen2.5-coder-32b", "deepseek-coder-v2",
        "codellama-34b-instruct", "qwen2.5-72b-instruct"
    ],
    TaskCategory.DOCUMENTATION: [
        "qwen2.5-72b-instruct", "llama-3.3-70b-instruct", "qwen2.5-32b-instruct",
        "mistral-small-24b", "mistral-nemo-12b", "gemma-2-27b"
    ],
}

Model Selection Logic

模型选择逻辑

python
from typing import Optional

class ModelSelector:
    """Select optimal model for task based on availability and requirements."""

    def __init__(self, available_models: list[str]):
        self.available = set(m.lower() for m in available_models)

    def select(
        self,
        category: TaskCategory,
        required_context: int = 0,
        max_vram_gb: Optional[float] = None
    ) -> Optional[str]:
        """Select best available model for task category."""

        # Get priority list for category
        priority_list = TASK_MODEL_PRIORITY.get(category, [])

        for model_id in priority_list:
            # Check if model is available
            if not self._is_available(model_id):
                continue

            # Check model capability
            capability = MODEL_DATABASE.get(model_id)
            if not capability:
                continue

            # Check context window requirement
            if required_context > 0 and capability.context_window < required_context:
                continue

            # Check VRAM constraint
            if max_vram_gb and capability.vram_gb > max_vram_gb:
                continue

            return model_id

        # Fallback: return any available model
        for model_id in MODEL_DATABASE:
            if self._is_available(model_id):
                return model_id

        return None

    def _is_available(self, model_id: str) -> bool:
        """Check if model is available (fuzzy matching)."""
        model_lower = model_id.lower()

        # Exact match
        if model_lower in self.available:
            return True

        # Partial match (model name contained in available)
        for avail in self.available:
            if model_lower in avail or avail in model_lower:
                return True

        return False

    def get_fallback_models(self, category: TaskCategory) -> list[str]:
        """Get list of fallback models for category."""
        priority_list = TASK_MODEL_PRIORITY.get(category, [])

        available_in_priority = [
            m for m in priority_list if self._is_available(m)
        ]

        # Return tier 2 and 3 models as fallbacks
        fallbacks = []
        for model_id in available_in_priority:
            capability = MODEL_DATABASE.get(model_id)
            if capability and capability.tier >= 2:
                fallbacks.append(model_id)

        return fallbacks
python
from typing import Optional

class ModelSelector:
    """根据可用性和需求为任务选择最优模型。"""

    def __init__(self, available_models: list[str]):
        self.available = set(m.lower() for m in available_models)

    def select(
        self,
        category: TaskCategory,
        required_context: int = 0,
        max_vram_gb: Optional[float] = None
    ) -> Optional[str]:
        """为任务类别选择最佳可用模型。"""

        # 获取类别的优先级列表
        priority_list = TASK_MODEL_PRIORITY.get(category, [])

        for model_id in priority_list:
            # 检查模型是否可用
            if not self._is_available(model_id):
                continue

            # 检查模型能力
            capability = MODEL_DATABASE.get(model_id)
            if not capability:
                continue

            # 检查上下文窗口需求
            if required_context > 0 and capability.context_window < required_context:
                continue

            # 检查VRAM限制
            if max_vram_gb and capability.vram_gb > max_vram_gb:
                continue

            return model_id

        # 回退:返回任何可用模型
        for model_id in MODEL_DATABASE:
            if self._is_available(model_id):
                return model_id

        return None

    def _is_available(self, model_id: str) -> bool:
        """检查模型是否可用(模糊匹配)。"""
        model_lower = model_id.lower()

        # 精确匹配
        if model_lower in self.available:
            return True

        # 部分匹配(模型名称包含在可用模型中)
        for avail in self.available:
            if model_lower in avail or avail in model_lower:
                return True

        return False

    def get_fallback_models(self, category: TaskCategory) -> list[str]:
        """获取类别的回退模型列表。"""
        priority_list = TASK_MODEL_PRIORITY.get(category, [])

        available_in_priority = [
            m for m in priority_list if self._is_available(m)
        ]

        # 返回Tier 2和3模型作为回退
        fallbacks = []
        for model_id in available_in_priority:
            capability = MODEL_DATABASE.get(model_id)
            if capability and capability.tier >= 2:
                fallbacks.append(model_id)

        return fallbacks
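
A short usage sketch (the available-model list is illustrative; real names come from whatever your services actually report):

python
# Hypothetical model list, e.g. as returned by a local Ollama instance
available = ["qwen2.5-coder-32b", "deepseek-r1-distill-14b", "mistral-7b"]
selector = ModelSelector(available)

# Best coding model that fits a 24 GB GPU and a 32k-token prompt
print(selector.select(TaskCategory.CODING, required_context=32768, max_vram_gb=24))
# -> "qwen2.5-coder-32b" (22 GB VRAM, 131k context)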

Context Management

上下文管理

Token Counting

Token计数

python
from abc import ABC, abstractmethod

class TokenCounter(ABC):
    """Base class for token counting."""

    @abstractmethod
    def count(self, text: str) -> int:
        pass

class EstimationCounter(TokenCounter):
    """Estimation-based token counter (no external dependencies)."""

    def __init__(self, chars_per_token: float = 4.0):
        self.chars_per_token = chars_per_token

    def count(self, text: str) -> int:
        return int(len(text) / self.chars_per_token)

class QwenCounter(TokenCounter):
    """Token counter for Qwen models."""

    def count(self, text: str) -> int:
        # Qwen uses slightly different tokenization
        return int(len(text) / 3.5)

class LlamaCounter(TokenCounter):
    """Token counter for Llama models."""

    def count(self, text: str) -> int:
        # Llama uses SentencePiece
        return int(len(text) / 3.8)
python
from abc import ABC, abstractmethod

class TokenCounter(ABC):
    """Token计数基类。"""

    @abstractmethod
    def count(self, text: str) -> int:
        pass

class EstimationCounter(TokenCounter):
    """基于估算的Token计数器(无外部依赖)。"""

    def __init__(self, chars_per_token: float = 4.0):
        self.chars_per_token = chars_per_token

    def count(self, text: str) -> int:
        return int(len(text) / self.chars_per_token)

class QwenCounter(TokenCounter):
    """Qwen模型的Token计数器。"""

    def count(self, text: str) -> int:
        # Qwen使用略有不同的Token化方式
        return int(len(text) / 3.5)

class LlamaCounter(TokenCounter):
    """Llama模型的Token计数器。"""

    def count(self, text: str) -> int:
        # Llama使用SentencePiece
        return int(len(text) / 3.8)

Model family to counter mapping

模型系列到计数器的映射

python
TOKEN_COUNTERS = {
    "qwen": QwenCounter(),
    "deepseek": EstimationCounter(4.0),
    "llama": LlamaCounter(),
    "mistral": EstimationCounter(4.0),
    "mixtral": EstimationCounter(4.0),
    "default": EstimationCounter(4.0),
}

def get_token_counter(model_id: str) -> TokenCounter:
    """Get appropriate token counter for model."""
    capability = MODEL_DATABASE.get(model_id)
    if capability:
        return TOKEN_COUNTERS.get(capability.family, TOKEN_COUNTERS["default"])
    return TOKEN_COUNTERS["default"]
python
TOKEN_COUNTERS = {
    "qwen": QwenCounter(),
    "deepseek": EstimationCounter(4.0),
    "llama": LlamaCounter(),
    "mistral": EstimationCounter(4.0),
    "mixtral": EstimationCounter(4.0),
    "default": EstimationCounter(4.0),
}

def get_token_counter(model_id: str) -> TokenCounter:
    """为模型获取合适的Token计数器。"""
    capability = MODEL_DATABASE.get(model_id)
    if capability:
        return TOKEN_COUNTERS.get(capability.family, TOKEN_COUNTERS["default"])
    return TOKEN_COUNTERS["default"]
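
A quick sanity check of the counters above (model names are illustrative; unknown models fall back to the default estimator):

python
text = "def parse(data): return json.loads(data)"

for model in ["qwen2.5-coder-32b", "llama-3.3-70b-instruct", "unknown-model"]:
    counter = get_token_counter(model)
    print(model, type(counter).__name__, counter.count(text))
# Qwen estimates ~3.5 chars per token, Llama ~3.8, the default ~4.0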

Context Manager

上下文管理器

python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class Message:
    role: str  # 'system', 'user', 'assistant', 'tool'
    content: str
    timestamp: datetime = field(default_factory=datetime.now)
    token_count: int = 0
    metadata: dict = field(default_factory=dict)

@dataclass
class ConversationContext:
    session_id: str
    messages: list[Message] = field(default_factory=list)
    total_tokens: int = 0
    system_prompt: str = ""
    system_prompt_tokens: int = 0
    active_model: str = ""
    model_history: list[str] = field(default_factory=list)
    compaction_count: int = 0

class ContextManager:
    """Manage conversation context with compaction support."""

    def __init__(
        self,
        session_id: str,
        system_prompt: str = "",
        compaction_threshold: float = 0.8,  # 80% of context window
        compaction_target: float = 0.5,      # Compact to 50%
        preserve_recent: int = 10            # Keep last N messages
    ):
        self.context = ConversationContext(
            session_id=session_id,
            system_prompt=system_prompt
        )
        self.compaction_threshold = compaction_threshold
        self.compaction_target = compaction_target
        self.preserve_recent = preserve_recent
        self._counter: Optional[TokenCounter] = None

    def set_model(self, model_id: str):
        """Set active model and update token counter."""
        if self.context.active_model:
            self.context.model_history.append(self.context.active_model)
        self.context.active_model = model_id
        self._counter = get_token_counter(model_id)

        # Recount all tokens with new counter
        self._recount_tokens()

    def add_message(self, role: str, content: str, metadata: Optional[dict] = None):
        """Add message to context."""
        token_count = self._counter.count(content) if self._counter else 0

        message = Message(
            role=role,
            content=content,
            token_count=token_count,
            metadata=metadata or {}
        )

        self.context.messages.append(message)
        self.context.total_tokens += token_count

    def check_and_compact(self, max_tokens: int) -> bool:
        """Check if compaction needed and perform if so."""
        threshold = int(max_tokens * self.compaction_threshold)

        if self.context.total_tokens > threshold:
            self._compact(max_tokens)
            return True
        return False

    def _compact(self, max_tokens: int):
        """Compact context to target size."""
        target = int(max_tokens * self.compaction_target)

        # Step 1: Truncate large tool outputs
        for msg in self.context.messages:
            if msg.role == 'tool' and msg.token_count > 500:
                original = msg.token_count
                msg.content = f"[Tool output truncated - {msg.metadata.get('tool_name', 'unknown')}]"
                msg.token_count = self._counter.count(msg.content)
                msg.metadata['truncated'] = True
                msg.metadata['original_tokens'] = original

        self._recalculate_total()

        if self.context.total_tokens <= target:
            return

        # Step 2: Summarize older messages
        if len(self.context.messages) > self.preserve_recent:
            older = self.context.messages[:-self.preserve_recent]
            recent = self.context.messages[-self.preserve_recent:]

            # Create summary of older messages
            summary = self._create_summary(older)
            summary_content = f"[Previous conversation summary]\n{summary}"
            summary_msg = Message(
                role='system',
                content=summary_content,
                token_count=self._counter.count(summary_content),
                metadata={'compacted': True}
            )

            self.context.messages = [summary_msg] + recent
            self.context.compaction_count += 1

        self._recalculate_total()

    def _create_summary(self, messages: list[Message]) -> str:
        """Create summary of messages (simple implementation)."""
        # In production, this would use a lightweight LLM
        key_points = []

        for msg in messages:
            if msg.role == 'user':
                # Extract first sentence of user queries
                first_sentence = msg.content.split('.')[0][:100]
                key_points.append(f"- User asked: {first_sentence}")
            elif msg.role == 'assistant' and len(key_points) < 10:
                # Extract key decisions/results
                if 'created' in msg.content.lower() or 'implemented' in msg.content.lower():
                    first_sentence = msg.content.split('.')[0][:100]
                    key_points.append(f"- Assistant: {first_sentence}")

        return "\n".join(key_points[:10])

    def _recount_tokens(self):
        """Recount all tokens with current counter."""
        if not self._counter:
            return

        self.context.system_prompt_tokens = self._counter.count(self.context.system_prompt)
        for msg in self.context.messages:
            msg.token_count = self._counter.count(msg.content)
        self._recalculate_total()

    def _recalculate_total(self):
        """Recalculate total token count."""
        self.context.total_tokens = (
            self.context.system_prompt_tokens +
            sum(m.token_count for m in self.context.messages)
        )

    def export_for_api(self) -> list[dict]:
        """Export messages in API format."""
        messages = []

        if self.context.system_prompt:
            messages.append({
                "role": "system",
                "content": self.context.system_prompt
            })

        for msg in self.context.messages:
            messages.append({
                "role": msg.role,
                "content": msg.content
            })

        return messages

    def prepare_handoff(self, new_model: str) -> "ContextManager":
        """Prepare context for model switch."""
        self.set_model(new_model)
        return self
python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class Message:
    role: str  # 'system', 'user', 'assistant', 'tool'
    content: str
    timestamp: datetime = field(default_factory=datetime.now)
    token_count: int = 0
    metadata: dict = field(default_factory=dict)

@dataclass
class ConversationContext:
    session_id: str
    messages: list[Message] = field(default_factory=list)
    total_tokens: int = 0
    system_prompt: str = ""
    system_prompt_tokens: int = 0
    active_model: str = ""
    model_history: list[str] = field(default_factory=list)
    compaction_count: int = 0

class ContextManager:
    """管理对话上下文,支持压缩。"""

    def __init__(
        self,
        session_id: str,
        system_prompt: str = "",
        compaction_threshold: float = 0.8,  # 上下文窗口的80%
        compaction_target: float = 0.5,      # 压缩到50%
        preserve_recent: int = 10            # 保留最后N条消息
    ):
        self.context = ConversationContext(
            session_id=session_id,
            system_prompt=system_prompt
        )
        self.compaction_threshold = compaction_threshold
        self.compaction_target = compaction_target
        self.preserve_recent = preserve_recent
        self._counter: Optional[TokenCounter] = None

    def set_model(self, model_id: str):
        """设置激活模型并更新Token计数器。"""
        if self.context.active_model:
            self.context.model_history.append(self.context.active_model)
        self.context.active_model = model_id
        self._counter = get_token_counter(model_id)

        # 使用新计数器重新计算所有Token
        self._recount_tokens()

    def add_message(self, role: str, content: str, metadata: Optional[dict] = None):
        """添加消息到上下文。"""
        token_count = self._counter.count(content) if self._counter else 0

        message = Message(
            role=role,
            content=content,
            token_count=token_count,
            metadata=metadata or {}
        )

        self.context.messages.append(message)
        self.context.total_tokens += token_count

    def check_and_compact(self, max_tokens: int) -> bool:
        """检查是否需要压缩并执行压缩。"""
        threshold = int(max_tokens * self.compaction_threshold)

        if self.context.total_tokens > threshold:
            self._compact(max_tokens)
            return True
        return False

    def _compact(self, max_tokens: int):
        """将上下文压缩到目标大小。"""
        target = int(max_tokens * self.compaction_target)

        # 步骤1:截断大型工具输出
        for msg in self.context.messages:
            if msg.role == 'tool' and msg.token_count > 500:
                original = msg.token_count
                msg.content = f"[工具输出已截断 - {msg.metadata.get('tool_name', 'unknown')}]"
                msg.token_count = self._counter.count(msg.content)
                msg.metadata['truncated'] = True
                msg.metadata['original_tokens'] = original

        self._recalculate_total()

        if self.context.total_tokens <= target:
            return

        # 步骤2:总结旧消息
        if len(self.context.messages) > self.preserve_recent:
            older = self.context.messages[:-self.preserve_recent]
            recent = self.context.messages[-self.preserve_recent:]

            # 创建旧消息的摘要
            summary = self._create_summary(older)
            summary_content = f"[之前对话摘要]\n{summary}"
            summary_msg = Message(
                role='system',
                content=summary_content,
                token_count=self._counter.count(summary_content),
                metadata={'compacted': True}
            )

            self.context.messages = [summary_msg] + recent
            self.context.compaction_count += 1

        self._recalculate_total()

    def _create_summary(self, messages: list[Message]) -> str:
        """创建消息摘要(简单实现)。"""
        # 生产环境中,这会使用轻量级LLM
        key_points = []

        for msg in messages:
            if msg.role == 'user':
                # 提取用户请求的第一句话
                first_sentence = msg.content.split('.')[0][:100]
                key_points.append(f"- 用户询问: {first_sentence}")
            elif msg.role == 'assistant' and len(key_points) < 10:
                # 提取关键决策/结果
                if 'created' in msg.content.lower() or 'implemented' in msg.content.lower():
                    first_sentence = msg.content.split('.')[0][:100]
                    key_points.append(f"- 助手回复: {first_sentence}")

        return "\n".join(key_points[:10])

    def _recount_tokens(self):
        """使用当前计数器重新计算所有Token。"""
        if not self._counter:
            return

        self.context.system_prompt_tokens = self._counter.count(self.context.system_prompt)
        for msg in self.context.messages:
            msg.token_count = self._counter.count(msg.content)
        self._recalculate_total()

    def _recalculate_total(self):
        """重新计算总Token数。"""
        self.context.total_tokens = (
            self.context.system_prompt_tokens +
            sum(m.token_count for m in self.context.messages)
        )

    def export_for_api(self) -> list[dict]:
        """以API格式导出消息。"""
        messages = []

        if self.context.system_prompt:
            messages.append({
                "role": "system",
                "content": self.context.system_prompt
            })

        for msg in self.context.messages:
            messages.append({
                "role": msg.role,
                "content": msg.content
            })

        return messages

    def prepare_handoff(self, new_model: str) -> "ContextManager":
        """为模型切换准备上下文。"""
        self.set_model(new_model)
        return self
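
A minimal end-to-end sketch of the compaction flow, assuming the classes above (the 32k token budget and message volume are illustrative):

python
manager = ContextManager(session_id="demo", system_prompt="You are a coding assistant.")
manager.set_model("qwen2.5-coder-32b")

# Simulate a long session that overflows the illustrative budget
for i in range(200):
    manager.add_message("user", f"Question {i}: how do I parse JSON safely?")
    manager.add_message("assistant", "Use json.loads inside a try/except block. " * 20)

if manager.check_and_compact(max_tokens=32768):
    print(f"Compacted to {manager.context.total_tokens} tokens "
          f"({manager.context.compaction_count} compaction(s))")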

Configuration

配置

Inline Configuration Schema

内联配置Schema

python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class ServiceConfig:
    """Configuration for a single LLM service."""
    enabled: bool = True
    endpoint: str = ""
    priority: int = 1
    timeout: int = 30000
    max_retries: int = 3
    api_style: str = "openai"

@dataclass
class TaskRoutingConfig:
    """Configuration for task routing."""
    primary_models: list[str] = field(default_factory=list)
    fallback_models: list[str] = field(default_factory=list)
    min_context: int = 8192
    require_serena: bool = False

@dataclass
class SecurityConfig:
    """Security configuration for air-gapped networks."""
    allow_external: bool = False
    allowed_hosts: list[str] = field(default_factory=lambda: [
        "localhost", "127.0.0.1", "host.docker.internal"
    ])
    allowed_cidrs: list[str] = field(default_factory=lambda: [
        "192.168.0.0/16", "10.0.0.0/8", "172.16.0.0/12"
    ])
    audit_enabled: bool = True
    audit_log_path: str = "./audit.log"
    log_queries: bool = True
    log_responses: bool = False  # Don't log sensitive responses
    verify_checksums: bool = True

@dataclass
class ContextConfig:
    """Context management configuration."""
    compaction_threshold: float = 0.8
    compaction_target: float = 0.5
    preserve_recent_messages: int = 10
    preserve_recent_tool_calls: int = 5
    max_tool_output_tokens: int = 500

@dataclass
class RouterConfig:
    """Complete router configuration."""
    # Services
    ollama: ServiceConfig = field(default_factory=lambda: ServiceConfig(
        endpoint="http://localhost:11434",
        priority=1
    ))
    lmstudio: ServiceConfig = field(default_factory=lambda: ServiceConfig(
        endpoint="http://localhost:1234",
        priority=2
    ))
    jan: ServiceConfig = field(default_factory=lambda: ServiceConfig(
        endpoint="http://localhost:1337",
        priority=3
    ))
    custom_endpoints: list[dict] = field(default_factory=list)

    # Task routing (Updated January 2025)
    coding: TaskRoutingConfig = field(default_factory=lambda: TaskRoutingConfig(
        primary_models=["deepseek-v3", "qwen2.5-coder-32b", "deepseek-coder-v2"],
        fallback_models=["codellama-34b", "qwen2.5-coder-14b", "phi-4"],
        min_context=8192
    ))
    reasoning: TaskRoutingConfig = field(default_factory=lambda: TaskRoutingConfig(
        primary_models=["deepseek-r1", "deepseek-v3", "qwen2.5-72b-instruct"],
        fallback_models=["deepseek-r1-distill-32b", "mistral-small-24b"],
        min_context=16384
    ))
    analysis: TaskRoutingConfig = field(default_factory=lambda: TaskRoutingConfig(
        primary_models=["deepseek-v3", "qwen2.5-coder-32b"],
        fallback_models=["codellama-34b-instruct", "qwen2.5-72b-instruct"],
        min_context=16384,
        require_serena=True
    ))
    documentation: TaskRoutingConfig = field(default_factory=lambda: TaskRoutingConfig(
        primary_models=["qwen2.5-72b-instruct", "llama-3.3-70b-instruct"],
        fallback_models=["qwen2.5-32b-instruct", "mistral-nemo-12b"],
        min_context=8192
    ))

    # Serena
    serena_enabled: bool = True
    serena_priority: str = "always_first"

    # Context
    context: ContextConfig = field(default_factory=ContextConfig)

    # Security
    security: SecurityConfig = field(default_factory=SecurityConfig)
python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class ServiceConfig:
    """单个LLM服务的配置。"""
    enabled: bool = True
    endpoint: str = ""
    priority: int = 1
    timeout: int = 30000
    max_retries: int = 3
    api_style: str = "openai"

@dataclass
class TaskRoutingConfig:
    """任务路由配置。"""
    primary_models: list[str] = field(default_factory=list)
    fallback_models: list[str] = field(default_factory=list)
    min_context: int = 8192
    require_serena: bool = False

@dataclass
class SecurityConfig:
    """气隙网络的安全配置。"""
    allow_external: bool = False
    allowed_hosts: list[str] = field(default_factory=lambda: [
        "localhost", "127.0.0.1", "host.docker.internal"
    ])
    allowed_cidrs: list[str] = field(default_factory=lambda: [
        "192.168.0.0/16", "10.0.0.0/8", "172.16.0.0/12"
    ])
    audit_enabled: bool = True
    audit_log_path: str = "./audit.log"
    log_queries: bool = True
    log_responses: bool = False  # 不记录敏感响应
    verify_checksums: bool = True

@dataclass
class ContextConfig:
    """上下文管理配置。"""
    compaction_threshold: float = 0.8
    compaction_target: float = 0.5
    preserve_recent_messages: int = 10
    preserve_recent_tool_calls: int = 5
    max_tool_output_tokens: int = 500

@dataclass
class RouterConfig:
    """完整的路由配置。"""
    # 服务
    ollama: ServiceConfig = field(default_factory=lambda: ServiceConfig(
        endpoint="http://localhost:11434",
        priority=1
    ))
    lmstudio: ServiceConfig = field(default_factory=lambda: ServiceConfig(
        endpoint="http://localhost:1234",
        priority=2
    ))
    jan: ServiceConfig = field(default_factory=lambda: ServiceConfig(
        endpoint="http://localhost:1337",
        priority=3
    ))
    custom_endpoints: list[dict] = field(default_factory=list)

    # 任务路由(2025年1月更新)
    coding: TaskRoutingConfig = field(default_factory=lambda: TaskRoutingConfig(
        primary_models=["deepseek-v3", "qwen2.5-coder-32b", "deepseek-coder-v2"],
        fallback_models=["codellama-34b", "qwen2.5-coder-14b", "phi-4"],
        min_context=8192
    ))
    reasoning: TaskRoutingConfig = field(default_factory=lambda: TaskRoutingConfig(
        primary_models=["deepseek-r1", "deepseek-v3", "qwen2.5-72b-instruct"],
        fallback_models=["deepseek-r1-distill-32b", "mistral-small-24b"],
        min_context=16384
    ))
    analysis: TaskRoutingConfig = field(default_factory=lambda: TaskRoutingConfig(
        primary_models=["deepseek-v3", "qwen2.5-coder-32b"],
        fallback_models=["codellama-34b-instruct", "qwen2.5-72b-instruct"],
        min_context=16384,
        require_serena=True
    ))
    documentation: TaskRoutingConfig = field(default_factory=lambda: TaskRoutingConfig(
        primary_models=["qwen2.5-72b-instruct", "llama-3.3-70b-instruct"],
        fallback_models=["qwen2.5-32b-instruct", "mistral-nemo-12b"],
        min_context=8192
    ))

    # Serena
    serena_enabled: bool = True
    serena_priority: str = "always_first"

    # 上下文
    context: ContextConfig = field(default_factory=ContextConfig)

    # 安全
    security: SecurityConfig = field(default_factory=SecurityConfig)
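
Because the schema is plain dataclasses, overrides can be applied directly on an instance; an illustrative sketch (the LAN address and log path are hypothetical):

python
config = RouterConfig()
config.ollama.endpoint = "http://192.168.1.50:11434"  # hypothetical LAN GPU box
config.jan.enabled = False
config.security.audit_log_path = "/var/log/llm-router/audit.log"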

Default configuration instance

默认配置实例

python
DEFAULT_CONFIG = RouterConfig()

def load_config_from_dict(data: dict) -> RouterConfig:
    """Load configuration from dictionary (e.g., parsed YAML)."""
    config = RouterConfig()

    # Update services
    if 'services' in data:
        for service_name, service_data in data['services'].items():
            if hasattr(config, service_name):
                setattr(config, service_name, ServiceConfig(**service_data))

    # Update task routing
    for category in ['coding', 'reasoning', 'analysis', 'documentation']:
        if category in data.get('task_routing', {}):
            setattr(config, category, TaskRoutingConfig(**data['task_routing'][category]))

    # Update security
    if 'security' in data:
        config.security = SecurityConfig(**data['security'])

    return config
python
DEFAULT_CONFIG = RouterConfig()

def load_config_from_dict(data: dict) -> RouterConfig:
    """从字典加载配置(例如,解析后的YAML)。"""
    config = RouterConfig()

    # 更新服务
    if 'services' in data:
        for service_name, service_data in data['services'].items():
            if hasattr(config, service_name):
                setattr(config, service_name, ServiceConfig(**service_data))

    # 更新任务路由
    for category in ['coding', 'reasoning', 'analysis', 'documentation']:
        if category in data.get('task_routing', {}):
            setattr(config, category, TaskRoutingConfig(**data['task_routing'][category]))

    # 更新安全配置
    if 'security' in data:
        config.security = SecurityConfig(**data['security'])

    return config
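
A hypothetical loading sketch, assuming PyYAML is available from an offline package mirror and the reference file below is saved as local-llm-router.yaml. Note that load_config_from_dict maps only the services, task_routing, and security keys; serena and context settings would need separate handling:

python
import yaml  # PyYAML, assumed installed offline

with open("local-llm-router.yaml") as f:
    config = load_config_from_dict(yaml.safe_load(f))

print(config.ollama.endpoint)        # "http://localhost:11434"
print(config.coding.primary_models)  # ["deepseek-v3", ...]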

Example YAML Configuration (for reference)

示例YAML配置(参考)


local-llm-router.yaml

local-llm-router.yaml

Copy this to your project and customize

复制到项目中并自定义

version: "1.0" environment: "air-gapped"
services: ollama: enabled: true endpoint: "http://localhost:11434" priority: 1 timeout: 30000
lmstudio: enabled: true endpoint: "http://localhost:1234" priority: 2
jan: enabled: false endpoint: "http://localhost:1337" priority: 3
custom_endpoints: - name: "internal-gpu-server" endpoint: "http://192.168.1.100:8000" priority: 0 api_style: "openai"
task_routing: coding: primary_models: - "deepseek-v3" - "qwen2.5-coder-32b" - "deepseek-coder-v2" fallback_models: - "codellama-34b" - "qwen2.5-coder-14b" - "phi-4" min_context: 8192
reasoning: primary_models: - "deepseek-r1" - "deepseek-v3" - "qwen2.5-72b-instruct" fallback_models: - "deepseek-r1-distill-32b" - "mistral-small-24b" min_context: 16384
analysis: primary_models: - "deepseek-v3" - "qwen2.5-coder-32b" require_serena: true
documentation: primary_models: - "qwen2.5-72b-instruct" - "llama-3.3-70b-instruct" fallback_models: - "mistral-nemo-12b"
serena: enabled: true priority: "always_first" workspace: "${WORKSPACE_ROOT}"
context: compaction_threshold: 0.8 preserve_recent_messages: 10
security: allow_external: false allowed_hosts: - "localhost" - "127.0.0.1" - "192.168.0.0/16" audit_enabled: true audit_log_path: "./llm-router-audit.log"
version: "1.0" environment: "air-gapped"
services: ollama: enabled: true endpoint: "http://localhost:11434" priority: 1 timeout: 30000
lmstudio: enabled: true endpoint: "http://localhost:1234" priority: 2
jan: enabled: false endpoint: "http://localhost:1337" priority: 3
custom_endpoints: - name: "internal-gpu-server" endpoint: "http://192.168.1.100:8000" priority: 0 api_style: "openai"
task_routing: coding: primary_models: - "deepseek-v3" - "qwen2.5-coder-32b" - "deepseek-coder-v2" fallback_models: - "codellama-34b" - "qwen2.5-coder-14b" - "phi-4" min_context: 8192
reasoning: primary_models: - "deepseek-r1" - "deepseek-v3" - "qwen2.5-72b-instruct" fallback_models: - "deepseek-r1-distill-32b" - "mistral-small-24b" min_context: 16384
analysis: primary_models: - "deepseek-v3" - "qwen2.5-coder-32b" require_serena: true
documentation: primary_models: - "qwen2.5-72b-instruct" - "llama-3.3-70b-instruct" fallback_models: - "mistral-nemo-12b"
serena: enabled: true priority: "always_first" workspace: "${WORKSPACE_ROOT}"
context: compaction_threshold: 0.8 preserve_recent_messages: 10
security: allow_external: false allowed_hosts: - "localhost" - "127.0.0.1" - "192.168.0.0/16" audit_enabled: true audit_log_path: "./llm-router-audit.log"

Fallback Strategy

回退策略

Graceful Degradation

优雅降级

python
from enum import IntEnum
from dataclasses import dataclass
from typing import Optional, Any

class FallbackLevel(IntEnum):
    PRIMARY = 0
    FALLBACK_MODELS = 1
    REDUCED_CONTEXT = 2
    SMALLEST_MODEL = 3
    FAILED = 4

@dataclass
class ExecutionResult:
    success: bool
    model: Optional[str] = None
    service: Optional[str] = None
    response: Any = None
    fallback_level: FallbackLevel = FallbackLevel.PRIMARY
    error: Optional[str] = None

class FallbackExecutor:
    """Execute queries with multi-level fallback."""

    def __init__(
        self,
        discovery: ServiceDiscovery,
        context_manager: ContextManager,
        config: RouterConfig
    ):
        self.discovery = discovery
        self.context = context_manager
        self.config = config

    async def execute_with_fallback(
        self,
        query: str,
        category: TaskCategory
    ) -> ExecutionResult:
        """Execute query with fallback strategy."""

        # Get model lists
        task_config = getattr(self.config, category.value)
        primary_models = task_config.primary_models
        fallback_models = task_config.fallback_models

        # Level 0: Try primary models
        for model in primary_models:
            result = await self._try_model(model, query)
            if result.success:
                result.fallback_level = FallbackLevel.PRIMARY
                return result

        # Level 1: Try fallback models
        for model in fallback_models:
            result = await self._try_model(model, query)
            if result.success:
                result.fallback_level = FallbackLevel.FALLBACK_MODELS
                return result

        # Level 2: Reduce context and retry
        self.context._compact(task_config.min_context)
        for model in primary_models + fallback_models:
            result = await self._try_model(model, query)
            if result.success:
                result.fallback_level = FallbackLevel.REDUCED_CONTEXT
                return result

        # Level 3: Use smallest available model
        smallest = await self._find_smallest_model()
        if smallest:
            result = await self._try_model(smallest, query)
            if result.success:
                result.fallback_level = FallbackLevel.SMALLEST_MODEL
                return result

        # Level 4: All failed
        return ExecutionResult(
            success=False,
            fallback_level=FallbackLevel.FAILED,
            error="All fallback strategies exhausted"
        )

    async def _try_model(self, model_id: str, query: str) -> ExecutionResult:
        """Try executing query on specific model."""
        # Find service with this model
        service = await self._find_service_with_model(model_id)
        if not service:
            return ExecutionResult(
                success=False,
                error=f"Model {model_id} not available"
            )

        try:
            response = await self._execute_on_service(service, model_id, query)
            return ExecutionResult(
                success=True,
                model=model_id,
                service=service.name,
                response=response
            )
        except Exception as e:
            return ExecutionResult(
                success=False,
                error=str(e)
            )

    async def _find_service_with_model(self, model_id: str) -> Optional[LLMService]:
        """Find service that has the specified model."""
        services = list(self.discovery.services.values())

        # Sort by priority
        services.sort(key=lambda s: getattr(self.config, s.type, ServiceConfig()).priority)

        for service in services:
            for model in service.models:
                if model_id.lower() in model.id.lower() or model.id.lower() in model_id.lower():
                    return service

        return None

    async def _find_smallest_model(self) -> Optional[str]:
        """Find smallest available model by VRAM requirement."""
        smallest = None
        smallest_vram = float('inf')

        for service in self.discovery.services.values():
            for model in service.models:
                capability = MODEL_DATABASE.get(model.id)
                if capability and capability.vram_gb < smallest_vram:
                    smallest = model.id
                    smallest_vram = capability.vram_gb

        return smallest

    async def _execute_on_service(
        self,
        service: LLMService,
        model_id: str,
        query: str
    ) -> str:
        """Execute query on specific service."""
        import httpx

        messages = self.context.export_for_api()
        messages.append({"role": "user", "content": query})

        async with httpx.AsyncClient() as client:
            if service.api_style == 'native' and service.type == 'ollama':
                # Ollama native API
                response = await client.post(
                    f"{service.endpoint}{service.chat_path}",
                    json={
                        "model": model_id,
                        "messages": messages,
                        "stream": False
                    },
                    timeout=self.config.ollama.timeout / 1000
                )
                data = response.json()
                return data.get('message', {}).get('content', '')

            else:
                # OpenAI-compatible API
                response = await client.post(
                    f"{service.endpoint}{service.chat_path}",
                    json={
                        "model": model_id,
                        "messages": messages,
                        "stream": False
                    },
                    timeout=30
                )
                data = response.json()
                return data.get('choices', [{}])[0].get('message', {}).get('content', '')
python
from enum import IntEnum
from dataclasses import dataclass
from typing import Optional, Any

class FallbackLevel(IntEnum):
    PRIMARY = 0
    FALLBACK_MODELS = 1
    REDUCED_CONTEXT = 2
    SMALLEST_MODEL = 3
    FAILED = 4

@dataclass
class ExecutionResult:
    success: bool
    model: Optional[str] = None
    service: Optional[str] = None
    response: Any = None
    fallback_level: FallbackLevel = FallbackLevel.PRIMARY
    error: Optional[str] = None

class FallbackExecutor:
    """使用多级回退执行请求。"""

    def __init__(
        self,
        discovery: ServiceDiscovery,
        context_manager: ContextManager,
        config: RouterConfig
    ):
        self.discovery = discovery
        self.context = context_manager
        self.config = config

    async def execute_with_fallback(
        self,
        query: str,
        category: TaskCategory
    ) -> ExecutionResult:
        """使用回退策略执行请求。"""

        # 获取模型列表
        task_config = getattr(self.config, category.value)
        primary_models = task_config.primary_models
        fallback_models = task_config.fallback_models

        # 级别0:尝试主模型
        for model in primary_models:
            result = await self._try_model(model, query)
            if result.success:
                result.fallback_level = FallbackLevel.PRIMARY
                return result

        # 级别1:尝试回退模型
        for model in fallback_models:
            result = await self._try_model(model, query)
            if result.success:
                result.fallback_level = FallbackLevel.FALLBACK_MODELS
                return result

        # 级别2:减少上下文并重试
        self.context._compact(task_config.min_context)
        for model in primary_models + fallback_models:
            result = await self._try_model(model, query)
            if result.success:
                result.fallback_level = FallbackLevel.REDUCED_CONTEXT
                return result

        # 级别3:使用最小可用模型
        smallest = await self._find_smallest_model()
        if smallest:
            result = await self._try_model(smallest, query)
            if result.success:
                result.fallback_level = FallbackLevel.SMALLEST_MODEL
                return result

        # 级别4:全部失败
        return ExecutionResult(
            success=False,
            fallback_level=FallbackLevel.FAILED,
            error="所有回退策略均已耗尽"
        )

    async def _try_model(self, model_id: str, query: str) -> ExecutionResult:
        """尝试在特定模型上执行请求。"""
        # 查找拥有该模型的服务
        service = await self._find_service_with_model(model_id)
        if not service:
            return ExecutionResult(
                success=False,
                error=f"模型 {model_id} 不可用"
            )

        try:
            response = await self._execute_on_service(service, model_id, query)
            return ExecutionResult(
                success=True,
                model=model_id,
                service=service.name,
                response=response
            )
        except Exception as e:
            return ExecutionResult(
                success=False,
                error=str(e)
            )

    async def _find_service_with_model(self, model_id: str) -> Optional[LLMService]:
        """查找拥有指定模型的服务。"""
        services = list(self.discovery.services.values())

        # 按优先级排序
        services.sort(key=lambda s: getattr(self.config, s.type, ServiceConfig()).priority)

        for service in services:
            for model in service.models:
                if model_id.lower() in model.id.lower() or model.id.lower() in model_id.lower():
                    return service

        return None

    async def _find_smallest_model(self) -> Optional[str]:
        """查找VRAM需求最小的可用模型。"""
        smallest = None
        smallest_vram = float('inf')

        for service in self.discovery.services.values():
            for model in service.models:
                capability = MODEL_DATABASE.get(model.id)
                if capability and capability.vram_gb < smallest_vram:
                    smallest = model.id
                    smallest_vram = capability.vram_gb

        return smallest

    async def _execute_on_service(
        self,
        service: LLMService,
        model_id: str,
        query: str
    ) -> str:
        """在特定服务上执行请求。"""
        import httpx

        messages = self.context.export_for_api()
        messages.append({"role": "user", "content": query})

        async with httpx.AsyncClient() as client:
            if service.api_style == 'native' and service.type == 'ollama':
                # Ollama原生API
                response = await client.post(
                    f"{service.endpoint}{service.chat_path}",
                    json={
                        "model": model_id,
                        "messages": messages,
                        "stream": False
                    },
                    timeout=self.config.ollama.timeout / 1000
                )
                data = response.json()
                return data.get('message', {}).get('content', '')

            else:
                # OpenAI兼容API
                response = await client.post(
                    f"{service.endpoint}{service.chat_path}",
                    json={
                        "model": model_id,
                        "messages": messages,
                        "stream": False
                    },
                    timeout=30
                )
                data = response.json()
                return data.get('choices', [{}])[0].get('message', {}).get('content', '')
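
Tying the pieces together, a hypothetical driver. The ServiceDiscovery instance comes from the discovery section earlier in this document; how it is constructed and refreshed is assumed here:

python
import asyncio

async def answer(discovery: "ServiceDiscovery", query: str) -> None:
    config = RouterConfig()
    context = ContextManager(session_id="demo")
    context.set_model("qwen2.5-coder-32b")  # illustrative; pick from discovered models

    executor = FallbackExecutor(discovery, context, config)
    result = await executor.execute_with_fallback(query, TaskCategory.CODING)

    if result.success:
        print(f"[{result.fallback_level.name}] {result.model} via {result.service}")
        print(result.response)
    else:
        print("Routing failed:", result.error)

# asyncio.run(answer(discovery, "Refactor this function to use pathlib"))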

Security (Air-Gapped)

安全(气隙网络)

Network Isolation

网络隔离

python
import hashlib
import json
from datetime import datetime
from dataclasses import dataclass
from typing import Optional
import ipaddress
import logging

@dataclass
class AuditLogEntry:
    timestamp: str
    event_type: str
    session_id: Optional[str] = None
    model: Optional[str] = None
    service: Optional[str] = None
    query_hash: Optional[str] = None  # Hashed, not plaintext
    tokens_in: int = 0
    tokens_out: int = 0
    success: bool = True
    error: Optional[str] = None

class SecurityModule:
    """Security enforcement for air-gapped networks."""

    def __init__(self, config: SecurityConfig):
        self.config = config
        self._allowed_ips = self._parse_allowed_networks()
        self._logger = self._setup_audit_logger()

    def _parse_allowed_networks(self) -> list:
        """Parse allowed hosts and CIDRs."""
        networks = []

        for host in self.config.allowed_hosts:
            if '/' in host:
                # CIDR notation
                networks.append(ipaddress.ip_network(host, strict=False))
            else:
                # Single host
                try:
                    ip = ipaddress.ip_address(host)
                    networks.append(ipaddress.ip_network(f"{ip}/32"))
                except ValueError:
                    # Hostname like 'localhost'
                    if host == 'localhost':
                        networks.append(ipaddress.ip_network("127.0.0.0/8"))
                    elif host == 'host.docker.internal':
                        # Allow common Docker host IPs
                        networks.append(ipaddress.ip_network("172.17.0.0/16"))

        for cidr in self.config.allowed_cidrs:
            networks.append(ipaddress.ip_network(cidr, strict=False))

        return networks

    def _setup_audit_logger(self) -> logging.Logger:
        """Setup audit logger."""
        logger = logging.getLogger('llm-router-audit')
        logger.setLevel(logging.INFO)

        if self.config.audit_enabled:
            handler = logging.FileHandler(self.config.audit_log_path)
            handler.setFormatter(logging.Formatter('%(message)s'))
            logger.addHandler(handler)

        return logger

    def validate_endpoint(self, url: str) -> bool:
        """Validate that endpoint is in allowed network."""
        if self.config.allow_external:
            return True

        try:
            from urllib.parse import urlparse
            parsed = urlparse(url)
            host = parsed.hostname

            # Check for localhost
            if host in ['localhost', '127.0.0.1', '::1']:
                return True

            # Check against allowed networks
            try:
                ip = ipaddress.ip_address(host)
                for network in self._allowed_ips:
                    if ip in network:
                        return True
            except ValueError:
                # Hostname - only allow specific ones
                return host in ['localhost', 'host.docker.internal']

            return False

        except Exception:
            return False

    def log_query(
        self,
        session_id: str,
        model: str,
        service: str,
        query: str,
        tokens_in: int,
        tokens_out: int,
        success: bool,
        error: Optional[str] = None
    ):
        """Log query for audit trail."""
        if not self.config.audit_enabled:
            return

        entry = AuditLogEntry(
            timestamp=datetime.now().isoformat(),
            event_type='query',
            session_id=session_id,
            model=model,
            service=service,
            query_hash=self._hash_content(query) if self.config.log_queries else None,
            tokens_in=tokens_in,
            tokens_out=tokens_out,
            success=success,
            error=error
        )

        self._logger.info(json.dumps(entry.__dict__))

    def log_security_event(self, event_type: str, details: dict):
        """Log security-related event."""
        if not self.config.audit_enabled:
            return

        entry = {
            'timestamp': datetime.now().isoformat(),
            'event_type': f'security:{event_type}',
            **details
        }

        self._logger.warning(json.dumps(entry))

    def _hash_content(self, content: str) -> str:
        """Hash content for audit logging (privacy)."""
        return hashlib.sha256(content.encode()).hexdigest()[:16]
python
import hashlib
import json
from datetime import datetime
from dataclasses import dataclass
from typing import Optional
import ipaddress
import logging

@dataclass
class AuditLogEntry:
    timestamp: str
    event_type: str
    session_id: Optional[str] = None
    model: Optional[str] = None
    service: Optional[str] = None
    query_hash: Optional[str] = None  # 哈希值,而非明文
    tokens_in: int = 0
    tokens_out: int = 0
    success: bool = True
    error: Optional[str] = None

class SecurityModule:
    """气隙网络的安全实施。"""

    def __init__(self, config: SecurityConfig):
        self.config = config
        self._allowed_ips = self._parse_allowed_networks()
        self._logger = self._setup_audit_logger()

    def _parse_allowed_networks(self) -> list:
        """解析允许的主机和CIDR。"""
        networks = []

        for host in self.config.allowed_hosts:
            if '/' in host:
                # CIDR表示法
                networks.append(ipaddress.ip_network(host, strict=False))
            else:
                # 单个主机
                try:
                    ip = ipaddress.ip_address(host)
                    networks.append(ipaddress.ip_network(f"{ip}/32"))
                except ValueError:
                    # 主机名如'localhost'
                    if host == 'localhost':
                        networks.append(ipaddress.ip_network("127.0.0.0/8"))
                    elif host == 'host.docker.internal':
                        # 允许常见的Docker主机IP
                        networks.append(ipaddress.ip_network("172.17.0.0/16"))

        for cidr in self.config.allowed_cidrs:
            networks.append(ipaddress.ip_network(cidr, strict=False))

        return networks

    def _setup_audit_logger(self) -> logging.Logger:
        """设置审计日志记录器。"""
        logger = logging.getLogger('llm-router-audit')
        logger.setLevel(logging.INFO)

        if self.config.audit_enabled:
            handler = logging.FileHandler(self.config.audit_log_path)
            handler.setFormatter(logging.Formatter('%(message)s'))
            logger.addHandler(handler)

        return logger

    def validate_endpoint(self, url: str) -> bool:
        """验证端点是否在允许的网络中。"""
        if self.config.allow_external:
            return True

        try:
            from urllib.parse import urlparse
            parsed = urlparse(url)
            host = parsed.hostname

            # 检查localhost
            if host in ['localhost', '127.0.0.1', '::1']:
                return True

            # 检查允许的网络
            try:
                ip = ipaddress.ip_address(host)
                for network in self._allowed_ips:
                    if ip in network:
                        return True
            except ValueError:
                # 主机名 - 仅允许特定主机
                return host in ['localhost', 'host.docker.internal']

            return False

        except Exception:
            return False

    def log_query(
        self,
        session_id: str,
        model: str,
        service: str,
        query: str,
        tokens_in: int,
        tokens_out: int,
        success: bool,
        error: Optional[str] = None
    ):
        """记录请求用于审计跟踪。"""
        if not self.config.audit_enabled:
            return

        entry = AuditLogEntry(
            timestamp=datetime.now().isoformat(),
            event_type='query',
            session_id=session_id,
            model=model,
            service=service,
            query_hash=self._hash_content(query) if self.config.log_queries else None,
            tokens_in=tokens_in,
            tokens_out=tokens_out,
            success=success,
            error=error
        )

        self._logger.info(json.dumps(entry.__dict__))

    def log_security_event(self, event_type: str, details: dict):
        """记录安全相关事件。"""
        if not self.config.audit_enabled:
            return

        entry = {
            'timestamp': datetime.now().isoformat(),
            'event_type': f'security:{event_type}',
            **details
        }

        self._logger.warning(json.dumps(entry))

    def _hash_content(self, content: str) -> str:
        """对内容进行哈希以用于审计日志(隐私保护)。"""
        return hashlib.sha256(content.encode()).hexdigest()[:16]
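
A minimal usage sketch of the module above. The SecurityConfig constructor arguments shown here are assumptions inferred from the fields the class reads, not a confirmed signature:

python
# Hypothetical config values for illustration only
config = SecurityConfig(
    allowed_hosts=['localhost', '192.168.1.50'],
    allowed_cidrs=['10.0.0.0/8'],
    allow_external=False,
    audit_enabled=True,
    audit_log_path='audit.log',
    log_queries=True,
)
security = SecurityModule(config)

# Endpoints outside the allowed networks are rejected before any request is made
assert security.validate_endpoint('http://localhost:11434')       # loopback: allowed
assert not security.validate_endpoint('https://api.example.com')  # external: blocked

# The audit trail records a truncated SHA256 of the query, never the plaintext
security.log_query(
    session_id='abc123', model='qwen2.5-coder', service='ollama',
    query='Write a function to parse JSON safely',
    tokens_in=12, tokens_out=140, success=True,
)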

Security checklist for air-gapped deployment

气隙网络部署检查清单

python
AIR_GAPPED_CHECKLIST = """
Air-Gapped Deployment Checklist

Network
  • Verify no external DNS resolution
  • Block all egress traffic at the firewall
  • Whitelist only internal IP ranges
  • Disable IPv6 if not needed

Model Verification
  • Pre-download all required models
  • Generate SHA256 checksums for all models
  • Store checksums in a tamper-evident location
  • Verify checksums before loading models

Access Control
  • Implement role-based access to LLM services
  • Require authentication for all endpoints
  • Use short-lived tokens for API access
  • Log all access attempts

Audit
  • Enable comprehensive audit logging
  • Log queries (hashed, not plaintext)
  • Log model usage patterns
  • Log all security events
  • Implement log rotation and retention
"""

python
AIR_GAPPED_CHECKLIST = """
气隙网络部署检查清单

网络
  • 验证无外部DNS解析
  • 在防火墙阻止所有出站流量
  • 仅将内部IP范围加入白名单
  • 如果不需要则禁用IPv6

模型验证
  • 预下载所有必需模型
  • 为所有模型生成SHA256校验和
  • 将校验和存储在防篡改位置
  • 在加载模型前验证校验和

访问控制
  • 为LLM服务实现基于角色的访问
  • 所有端点需要认证
  • 为API访问使用短期令牌
  • 记录所有访问尝试

审计
  • 启用全面审计日志
  • 记录请求(哈希值,而非明文)
  • 记录模型使用模式
  • 记录所有安全事件
  • 实现日志轮转和保留策略
"""
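
The model-verification items in the checklist can be automated with a short script. A minimal sketch, assuming models are plain files under a local directory and that checksums were recorded at import time in a checksums.json manifest (both paths are illustrative):

python
import hashlib
import json
from pathlib import Path

def sha256_file(path: Path) -> str:
    """Stream-hash a model file (models are large; avoid reading them whole)."""
    h = hashlib.sha256()
    with path.open('rb') as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b''):
            h.update(chunk)
    return h.hexdigest()

def verify_models(model_dir: str, manifest_path: str) -> list:
    """Return names of models whose checksum does not match the manifest."""
    manifest = json.loads(Path(manifest_path).read_text())
    return [
        name for name, expected in manifest.items()
        if sha256_file(Path(model_dir) / name) != expected
    ]

# Example: refuse to start if any model fails verification
bad = verify_models('/opt/models', '/opt/models/checksums.json')
if bad:
    raise RuntimeError(f"Model checksum mismatch: {bad}")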

Coding Agent Detection

编码Agent检测

Detect Active Coding Agent

检测活跃编码Agent

python
import os
import sys
from dataclasses import dataclass
from typing import Optional

@dataclass
class CodingAgentInfo:
    name: str
    type: str
    version: Optional[str] = None
    config_path: Optional[str] = None
python
import os
import sys
from dataclasses import dataclass
from typing import Optional

@dataclass
class CodingAgentInfo:
    name: str
    type: str
    version: Optional[str] = None
    config_path: Optional[str] = None

Environment variable markers for different agents

不同Agent的环境变量标记

AGENT_ENV_MARKERS = {
    # CLI-based agents
    'QWEN_CLI_VERSION': ('qwen-cli', 'cli'),
    'OPENCODE_SESSION': ('opencode', 'cli'),
    'AIDER_SESSION': ('aider', 'cli'),
    'CODEX_SESSION': ('codex', 'cli'),
    'GEMINI_CLI_SESSION': ('gemini-cli', 'cli'),

    # IDE extensions
    'CONTINUE_SESSION': ('continue', 'ide'),
    'CLINE_SESSION': ('cline', 'ide'),
    'ROO_CODE_SESSION': ('roo-code', 'ide'),
    'CURSOR_SESSION': ('cursor', 'ide'),

    # Local GUI apps
    'OPENWEBUI_SESSION': ('openwebui', 'gui'),
    'JAN_SESSION': ('jan', 'gui'),
    'AGNO_SESSION': ('agno', 'gui'),

    # Generic markers
    'LLM_AGENT': ('generic', 'unknown'),
}

def detect_coding_agent() -> CodingAgentInfo:
    """Detect which coding agent is invoking the router."""
    # Check environment variables
    for env_var, (name, agent_type) in AGENT_ENV_MARKERS.items():
        value = os.environ.get(env_var)
        if value:
            return CodingAgentInfo(
                name=name,
                type=agent_type,
                version=value if value != '1' else None
            )

    # Check process name / parent process
    try:
        import psutil
        parent = psutil.Process(os.getppid())
        parent_name = parent.name().lower()

        agent_process_names = {
            'qwen': 'qwen-cli',
            'aider': 'aider',
            'codex': 'codex',
            'continue': 'continue',
            'cursor': 'cursor',
        }

        for proc_name, agent_name in agent_process_names.items():
            if proc_name in parent_name:
                return CodingAgentInfo(name=agent_name, type='detected')

    except ImportError:
        pass  # psutil not available
    except Exception:
        pass  # parent process lookup failed

    # Check for MCP client markers
    if os.environ.get('MCP_CLIENT'):
        return CodingAgentInfo(
            name=os.environ.get('MCP_CLIENT', 'mcp-client'),
            type='mcp'
        )

    # Default: unknown
    return CodingAgentInfo(name='unknown', type='unknown')

def get_agent_specific_config(agent: CodingAgentInfo) -> dict:
    """Get agent-specific configuration overrides."""
    configs = {
        'qwen-cli': {
            'default_model_preference': 'qwen',
            'context_format': 'qwen',
        },
        'aider': {
            'default_model_preference': 'gpt',
            'context_format': 'openai',
        },
        'cursor': {
            'default_model_preference': 'claude',
            'context_format': 'anthropic',
        },
        'continue': {
            'supports_streaming': True,
            'context_format': 'openai',
        },
    }

    return configs.get(agent.name, {})
AGENT_ENV_MARKERS = {
    # 基于CLI的Agent
    'QWEN_CLI_VERSION': ('qwen-cli', 'cli'),
    'OPENCODE_SESSION': ('opencode', 'cli'),
    'AIDER_SESSION': ('aider', 'cli'),
    'CODEX_SESSION': ('codex', 'cli'),
    'GEMINI_CLI_SESSION': ('gemini-cli', 'cli'),

    # IDE扩展
    'CONTINUE_SESSION': ('continue', 'ide'),
    'CLINE_SESSION': ('cline', 'ide'),
    'ROO_CODE_SESSION': ('roo-code', 'ide'),
    'CURSOR_SESSION': ('cursor', 'ide'),

    # 本地GUI应用
    'OPENWEBUI_SESSION': ('openwebui', 'gui'),
    'JAN_SESSION': ('jan', 'gui'),
    'AGNO_SESSION': ('agno', 'gui'),

    # 通用标记
    'LLM_AGENT': ('generic', 'unknown'),
}

def detect_coding_agent() -> CodingAgentInfo:
    """检测调用路由的编码Agent。"""
    # 检查环境变量
    for env_var, (name, agent_type) in AGENT_ENV_MARKERS.items():
        value = os.environ.get(env_var)
        if value:
            return CodingAgentInfo(
                name=name,
                type=agent_type,
                version=value if value != '1' else None
            )

    # 检查进程名/父进程
    try:
        import psutil
        parent = psutil.Process(os.getppid())
        parent_name = parent.name().lower()

        agent_process_names = {
            'qwen': 'qwen-cli',
            'aider': 'aider',
            'codex': 'codex',
            'continue': 'continue',
            'cursor': 'cursor',
        }

        for proc_name, agent_name in agent_process_names.items():
            if proc_name in parent_name:
                return CodingAgentInfo(name=agent_name, type='detected')

    except ImportError:
        pass  # psutil不可用
    except Exception:
        pass  # 父进程查询失败

    # 检查MCP客户端标记
    if os.environ.get('MCP_CLIENT'):
        return CodingAgentInfo(
            name=os.environ.get('MCP_CLIENT', 'mcp-client'),
            type='mcp'
        )

    # 默认:未知
    return CodingAgentInfo(name='unknown', type='unknown')

def get_agent_specific_config(agent: CodingAgentInfo) -> dict:
    """获取Agent特定的配置覆盖。"""
    configs = {
        'qwen-cli': {
            'default_model_preference': 'qwen',
            'context_format': 'qwen',
        },
        'aider': {
            'default_model_preference': 'gpt',
            'context_format': 'openai',
        },
        'cursor': {
            'default_model_preference': 'claude',
            'context_format': 'anthropic',
        },
        'continue': {
            'supports_streaming': True,
            'context_format': 'openai',
        },
    }

    return configs.get(agent.name, {})
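
Putting detection and configuration together, a sketch of how a router entry point might merge agent-specific overrides into its defaults (the defaults dict is illustrative):

python
agent = detect_coding_agent()
print(f"Detected agent: {agent.name} ({agent.type})")

# Agent-specific overrides win over router defaults
defaults = {'default_model_preference': 'any', 'context_format': 'openai'}
effective = {**defaults, **get_agent_specific_config(agent)}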

Complete Router Implementation

完整路由实现

python
class LocalLLMRouter:
    """
    Complete Local LLM Router with Serena integration.

    Usage:
        router = LocalLLMRouter(workspace="/path/to/project")
        await router.initialize()

        response = await router.route("Implement a binary search function")
        print(response)
    """

    def __init__(
        self,
        workspace: str,
        config: RouterConfig = None,
        session_id: str = None
    ):
        self.workspace = workspace
        self.config = config or DEFAULT_CONFIG
        self.session_id = session_id or self._generate_session_id()

        # Components
        self.serena: Optional[SerenaMCP] = None
        self.discovery: Optional[ServiceDiscovery] = None
        self.context: Optional[ContextManager] = None
        self.security: Optional[SecurityModule] = None
        self.selector: Optional[ModelSelector] = None
        self.fallback: Optional[FallbackExecutor] = None

        # State
        self.os_info = detect_os()
        self.coding_agent = detect_coding_agent()
        self._initialized = False

    async def initialize(self):
        """Initialize all router components."""

        # Security module
        self.security = SecurityModule(self.config.security)

        # Service discovery
        self.discovery = ServiceDiscovery(self.config.custom_endpoints)
        services = await self.discovery.discover_all()

        if not services:
            raise RuntimeError("No local LLM services available")

        # Model selector
        all_models = []
        for service in services:
            all_models.extend(m.id for m in service.models)
        self.selector = ModelSelector(all_models)

        # Context manager
        self.context = ContextManager(
            session_id=self.session_id,
            system_prompt=self._build_system_prompt(),
            compaction_threshold=self.config.context.compaction_threshold,
            compaction_target=self.config.context.compaction_target,
            preserve_recent=self.config.context.preserve_recent_messages
        )

        # Serena MCP (if enabled)
        if self.config.serena_enabled:
            self.serena = SerenaMCP(self.workspace)
            try:
                await self.serena.start()
            except Exception as e:
                logging.warning(f"Serena MCP failed to start: {e}")
                self.serena = None

        # Fallback executor
        self.fallback = FallbackExecutor(
            self.discovery,
            self.context,
            self.config
        )

        self._initialized = True

    async def route(
        self,
        query: str,
        file_context: dict = None
    ) -> str:
        """
        Route query to appropriate LLM.

        Args:
            query: The user's query
            file_context: Optional dict with 'file', 'position' for code context

        Returns:
            LLM response string
        """
        if not self._initialized:
            await self.initialize()

        # Step 1: Classify task
        classification = classify_task(query)

        # Step 2: Serena first (if code-related)
        serena_context = {}
        if self.serena and (classification.requires_serena or file_context):
            serena_context = await self._gather_serena_context(
                query, file_context, classification
            )

        # Step 3: Build enriched query
        enriched_query = self._build_enriched_query(query, serena_context)

        # Step 4: Select model
        model = self.selector.select(
            classification.category,
            required_context=self.context.context.total_tokens + len(query) // 4
        )

        if not model:
            raise RuntimeError("No suitable model available")

        # Step 5: Update context manager with selected model
        self.context.set_model(model)

        # Step 6: Check context and compact if needed
        model_capability = MODEL_DATABASE.get(model)
        if model_capability:
            self.context.check_and_compact(model_capability.context_window)

        # Step 7: Execute with fallback
        result = await self.fallback.execute_with_fallback(
            enriched_query,
            classification.category
        )

        # Step 8: Log for audit
        self.security.log_query(
            session_id=self.session_id,
            model=result.model or model,
            service=result.service or 'unknown',
            query=query,
            tokens_in=len(query) // 4,
            tokens_out=len(result.response or '') // 4,
            success=result.success,
            error=result.error
        )

        if not result.success:
            raise RuntimeError(f"Query failed: {result.error}")

        # Step 9: Update context with response
        self.context.add_message('user', query)
        self.context.add_message('assistant', result.response)

        # Step 10: Apply edits via Serena if needed
        if self.serena and file_context and contains_code_edit(result.response):
            await self._apply_serena_edits(result.response, file_context)

        return result.response

    async def _gather_serena_context(
        self,
        query: str,
        file_context: dict,
        classification: ClassificationResult
    ) -> dict:
        """Gather code context from Serena."""
        context = {}

        if not file_context:
            return context

        file = file_context.get('file')
        position = file_context.get('position', {})
        line = position.get('line', 0)
        char = position.get('character', 0)

        try:
            # Always get hover info
            context['hover'] = await self.serena.get_hover_info(file, line, char)

            # Get references for refactoring tasks
            if 'refactor' in query.lower() or 'rename' in query.lower():
                context['references'] = await self.serena.get_references(file, line, char)

            # Get diagnostics for analysis
            if classification.category == TaskCategory.ANALYSIS:
                context['diagnostics'] = await self.serena.get_diagnostics(file)

        except Exception as e:
            logging.warning(f"Serena context gathering failed: {e}")

        return context

    def _build_enriched_query(self, query: str, serena_context: dict) -> str:
        """Build query enriched with Serena context."""
        return build_enriched_query(query, serena_context)

    async def _apply_serena_edits(self, response: str, file_context: dict):
        """Apply code edits from response via Serena."""
        edits = parse_code_edits(response)
        if edits:
            await self.serena.apply_edit(file_context['file'], edits)

    def _build_system_prompt(self) -> str:
        """Build system prompt with router context."""
        return f"""You are a coding assistant running in a local, air-gapped environment.

Environment:
- OS: {self.os_info.platform} ({self.os_info.arch})
- Coding Agent: {self.coding_agent.name}
- Serena LSP: {'enabled' if self.config.serena_enabled else 'disabled'}

Guidelines:
- Provide concise, accurate code
- Use Serena's semantic information when provided
- Respect security constraints (no external calls)
- Focus on the specific task at hand
"""

    def _generate_session_id(self) -> str:
        """Generate unique session ID."""
        import uuid
        return str(uuid.uuid4())[:8]
python
class LocalLLMRouter:
    """
    完整的本地LLM路由,集成Serena。

    使用方法:
        router = LocalLLMRouter(workspace="/path/to/project")
        await router.initialize()

        response = await router.route("实现一个二分查找函数")
        print(response)
    """

    def __init__(
        self,
        workspace: str,
        config: RouterConfig = None,
        session_id: str = None
    ):
        self.workspace = workspace
        self.config = config or DEFAULT_CONFIG
        self.session_id = session_id or self._generate_session_id()

        # 组件
        self.serena: Optional[SerenaMCP] = None
        self.discovery: Optional[ServiceDiscovery] = None
        self.context: Optional[ContextManager] = None
        self.security: Optional[SecurityModule] = None
        self.selector: Optional[ModelSelector] = None
        self.fallback: Optional[FallbackExecutor] = None

        # 状态
        self.os_info = detect_os()
        self.coding_agent = detect_coding_agent()
        self._initialized = False

    async def initialize(self):
        """初始化所有路由组件。"""

        # 安全模块
        self.security = SecurityModule(self.config.security)

        # 服务发现
        self.discovery = ServiceDiscovery(self.config.custom_endpoints)
        services = await self.discovery.discover_all()

        if not services:
            raise RuntimeError("无可用本地LLM服务")

        # 模型选择器
        all_models = []
        for service in services:
            all_models.extend(m.id for m in service.models)
        self.selector = ModelSelector(all_models)

        # 上下文管理器
        self.context = ContextManager(
            session_id=self.session_id,
            system_prompt=self._build_system_prompt(),
            compaction_threshold=self.config.context.compaction_threshold,
            compaction_target=self.config.context.compaction_target,
            preserve_recent=self.config.context.preserve_recent_messages
        )

        # Serena MCP(如果启用)
        if self.config.serena_enabled:
            self.serena = SerenaMCP(self.workspace)
            try:
                await self.serena.start()
            except Exception as e:
                logging.warning(f"Serena MCP启动失败: {e}")
                self.serena = None

        # 回退执行器
        self.fallback = FallbackExecutor(
            self.discovery,
            self.context,
            self.config
        )

        self._initialized = True

    async def route(
        self,
        query: str,
        file_context: dict = None
    ) -> str:
        """
        将请求路由至合适的LLM。

        参数:
            query: 用户的请求
            file_context: 可选字典,包含'file'、'position'用于代码上下文

        返回:
            LLM响应字符串
        """
        if not self._initialized:
            await self.initialize()

        # 步骤1:分类任务
        classification = classify_task(query)

        # 步骤2:优先使用Serena(如果是代码相关任务)
        serena_context = {}
        if self.serena and (classification.requires_serena or file_context):
            serena_context = await self._gather_serena_context(
                query, file_context, classification
            )

        # 步骤3:构建增强请求
        enriched_query = self._build_enriched_query(query, serena_context)

        # 步骤4:选择模型
        model = self.selector.select(
            classification.category,
            required_context=self.context.context.total_tokens + len(query) // 4
        )

        if not model:
            raise RuntimeError("无合适模型可用")

        # 步骤5:使用选中的模型更新上下文管理器
        self.context.set_model(model)

        # 步骤6:检查上下文并在需要时压缩
        model_capability = MODEL_DATABASE.get(model)
        if model_capability:
            self.context.check_and_compact(model_capability.context_window)

        # 步骤7:使用回退执行
        result = await self.fallback.execute_with_fallback(
            enriched_query,
            classification.category
        )

        # 步骤8:审计日志
        self.security.log_query(
            session_id=self.session_id,
            model=result.model or model,
            service=result.service or 'unknown',
            query=query,
            tokens_in=len(query) // 4,
            tokens_out=len(result.response or '') // 4,
            success=result.success,
            error=result.error
        )

        if not result.success:
            raise RuntimeError(f"请求失败: {result.error}")

        # 步骤9:使用响应更新上下文
        self.context.add_message('user', query)
        self.context.add_message('assistant', result.response)

        # 步骤10:如果需要,通过Serena应用编辑
        if self.serena and file_context and contains_code_edit(result.response):
            await self._apply_serena_edits(result.response, file_context)

        return result.response

    async def _gather_serena_context(
        self,
        query: str,
        file_context: dict,
        classification: ClassificationResult
    ) -> dict:
        """从Serena收集代码上下文。"""
        context = {}

        if not file_context:
            return context

        file = file_context.get('file')
        position = file_context.get('position', {})
        line = position.get('line', 0)
        char = position.get('character', 0)

        try:
            # 始终获取悬停信息
            context['hover'] = await self.serena.get_hover_info(file, line, char)

            # 对于重构任务,获取引用
            if 'refactor' in query.lower() or 'rename' in query.lower():
                context['references'] = await self.serena.get_references(file, line, char)

            # 对于分析任务,获取诊断信息
            if classification.category == TaskCategory.ANALYSIS:
                context['diagnostics'] = await self.serena.get_diagnostics(file)

        except Exception as e:
            logging.warning(f"Serena上下文收集失败: {e}")

        return context

    def _build_enriched_query(self, query: str, serena_context: dict) -> str:
        """使用Serena上下文构建增强请求。"""
        return build_enriched_query(query, serena_context)

    async def _apply_serena_edits(self, response: str, file_context: dict):
        """通过Serena应用响应中的代码编辑。"""
        edits = parse_code_edits(response)
        if edits:
            await self.serena.apply_edit(file_context['file'], edits)

    def _build_system_prompt(self) -> str:
        """使用路由上下文构建系统提示词。"""
        return f"""你是运行在本地气隙环境中的编码助手。

环境信息:
- 操作系统: {self.os_info.platform} ({self.os_info.arch})
- 编码Agent: {self.coding_agent.name}
- Serena LSP: {'已启用' if self.config.serena_enabled else '已禁用'}

指南:
- 提供简洁、准确的代码
- 当提供Serena语义信息时,请加以利用
- 遵守安全约束(无外部调用)
- 专注于当前具体任务
"""

    def _generate_session_id(self) -> str:
        """生成唯一会话ID。"""
        import uuid
        return str(uuid.uuid4())[:8]
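
Beyond the docstring example, route() also accepts a file_context so Serena can anchor its symbol lookups. A sketch (the file path and position are illustrative):

python
import asyncio

async def main():
    router = LocalLLMRouter(workspace="/path/to/project")
    await router.initialize()

    # The cursor position lets Serena fetch hover info and references
    # for the symbol before the query is routed to a local model
    response = await router.route(
        "Rename this function and update all call sites",
        file_context={
            'file': 'src/utils.py',
            'position': {'line': 42, 'character': 8},
        },
    )
    print(response)

asyncio.run(main())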

Utility functions

工具函数

python
def contains_code_edit(response: str) -> bool:
    """Check if response contains code edits."""
    markers = ['```', 'def ', 'class ', 'function ', 'const ', 'let ', 'var ']
    return any(marker in response for marker in markers)

def parse_code_edits(response: str) -> list:
    """Parse code edits from response."""
    # Simple implementation - extract fenced code blocks
    import re
    code_blocks = re.findall(r'```(?:\w+)?\n(.*?)```', response, re.DOTALL)
    return [{'content': block.strip()} for block in code_blocks]
python
def contains_code_edit(response: str) -> bool:
    """检查响应是否包含代码编辑。"""
    markers = ['```', 'def ', 'class ', 'function ', 'const ', 'let ', 'var ']
    return any(marker in response for marker in markers)

def parse_code_edits(response: str) -> list:
    """从响应中解析代码编辑。"""
    # 简单实现 - 提取围栏代码块
    import re
    code_blocks = re.findall(r'```(?:\w+)?\n(.*?)```', response, re.DOTALL)
    return [{'content': block.strip()} for block in code_blocks]
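
A quick check of the helpers above:

python
sample = "Here is the fix:\n```python\ndef add(a, b):\n    return a + b\n```"
assert contains_code_edit(sample)
print(parse_code_edits(sample))
# -> [{'content': 'def add(a, b):\n    return a + b'}]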

Resources

资源