# CocoIndex v1
CocoIndex v1 is a Python library for building incremental data processing pipelines with declarative target states. Think spreadsheets or React for data pipelines: declare what the output should look like based on current input, and CocoIndex automatically handles incremental updates, change detection, and syncing to external systems.
## Overview
CocoIndex v1 enables building data pipelines that:
- Automatically handle incremental updates: Only reprocess changed data
- Use declarative target states: Declare what should exist, not how to update
- Support any Python types: No custom DSL—use dataclasses, Pydantic, NamedTuple
- Provide function memoization: Skip expensive operations when inputs/code unchanged
- Sync to multiple targets: PostgreSQL, SQLite, LanceDB, Qdrant, file systems
Key principle:
```
TargetState = Transform(SourceState)
```
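The principle can be illustrated outside CocoIndex with a minimal reconciliation sketch (plain Python with hypothetical names, not library API): compute the desired target state as a pure function of the source, diff it against what currently exists, and apply only the differences.

```python
def transform(source: dict[str, str]) -> dict[str, str]:
    # Desired target state is a pure function of the source state
    return {key: text.upper() for key, text in source.items()}

def reconcile(current: dict[str, str], desired: dict[str, str]) -> dict[str, list[str]]:
    """Diff current vs. desired target state; return the minimal change set."""
    upserts = sorted(k for k, v in desired.items() if current.get(k) != v)
    deletes = sorted(k for k in current if k not in desired)
    return {"upsert": upserts, "delete": deletes}

source = {"a.txt": "hello", "b.txt": "world"}
target = transform(source)   # initial build materializes everything

source["a.txt"] = "hello!"   # one file changed...
del source["b.txt"]          # ...and one file was removed
changes = reconcile(target, transform(source))
# changes == {"upsert": ["a.txt"], "delete": ["b.txt"]}: only the delta is applied
```

This is the sense in which a pipeline behaves like a spreadsheet: you write only the transform, never the update logic.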
## When to Use This Skill
Use this skill when building pipelines that involve:
- Document processing: PDF/Markdown conversion, text extraction, chunking
- Vector embeddings: Embedding documents/code for semantic search
- Database transformations: ETL from source DB to target DB
- Knowledge graphs: Extract entities and relationships from data
- LLM-based extraction: Structured data extraction using LLMs
- File-based pipelines: Transform files from one format to another
- Incremental indexing: Keep search indexes up-to-date with source changes
## Quick Start: Creating a New Project
### Initialize Project
Use the built-in CLI to create a new project:
```bash
cocoindex init my-project
cd my-project
```

This creates:
- `main.py`: Main app definition
- `pyproject.toml`: Dependencies with pre-release config
- `.env`: Environment configuration
- `README.md`: Quick start guide
### Add Dependencies for Specific Use Cases
Add dependencies to `pyproject.toml` based on your needs:

```toml
# For vector embeddings
dependencies = ["cocoindex>=1.0.0a1", "sentence-transformers", "asyncpg"]

# For PostgreSQL only
dependencies = ["cocoindex>=1.0.0a1", "asyncpg"]

# For LLM extraction
dependencies = ["cocoindex>=1.0.0a1", "litellm", "instructor", "pydantic>=2.0"]
```

See [references/setup_project.md](references/setup_project.md) for complete examples.

### Set Up Database (if using Postgres/Qdrant)
For PostgreSQL with Docker:

```bash
# Create docker-compose.yml with pgvector image
docker-compose up -d
```

For Qdrant with Docker:

```bash
# Create docker-compose.yml with Qdrant image
docker-compose up -d
```

See [references/setup_database.md](references/setup_database.md) for detailed setup instructions.

### Run the Pipeline
```bash
pip install -e .
cocoindex update main.py
```

## Core Concepts
### 1. Apps
An app is the top-level executable that binds a main function with parameters:
```python
import pathlib

import cocoindex as coco

@coco.function
def app_main(sourcedir: pathlib.Path, outdir: pathlib.Path) -> None:
    # Processing logic here
    ...

app = coco.App(
    coco.AppConfig(name="MyApp"),
    app_main,
    sourcedir=pathlib.Path("./data"),
    outdir=pathlib.Path("./output"),
)

if __name__ == "__main__":
    app.update(report_to_stdout=True)
```

### 2. Processing Components
A processing component groups an item's processing with its target states.
Mount independent components with `coco_aio.mount_each()` (preferred) or `coco_aio.mount()`:

```python
# Preferred: mount one component per item (async, keyed iterable)
await coco_aio.mount_each(process_file, files.items(), target_table)

# Equivalent async manual loop
for key, f in files.items():
    await coco_aio.mount(coco.component_subpath(key), process_file, f, target_table)

# Sync mount: only for CPU-intensive leaf components (no I/O)
coco.mount(coco.component_subpath(str(f.file_path.path)), process_file, f, target_table)
```

**Mount dependent components** with `use_mount()` when you need the return value:

```python
result = await coco_aio.use_mount(subpath, fn, *args)
```

Mount targets using connector convenience methods (async, subpath is automatic):

```python
target_table = await target_db.mount_table_target(
    table_name="my_table",
    table_schema=await postgres.TableSchema.from_class(MyRecord, primary_key=["id"]),
)
```

Key points:
- Each component runs independently
- Async-first: prefer `coco_aio.mount_each()` / `coco_aio.mount()` for all components; use sync `coco.mount()` only for CPU-intensive leaf work (no I/O)
- Use `use_mount()` when you need the return value of a child component
- Use stable paths for proper memoization
- Component path determines target state ownership
### 3. Function Memoization
Add `memo=True` to skip re-execution when inputs and code are unchanged:

```python
@coco.function(memo=True)
def expensive_operation(data: str) -> Result:
    # LLM call, embedding generation, heavy computation
    result = expensive_transform(data)
    return result
```
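Conceptually, the memo key covers both the arguments and the function's code, so editing the function invalidates stale cache entries. A simplified in-memory stand-in (not CocoIndex's actual implementation) makes the idea concrete:

```python
import hashlib

_memo_cache: dict[tuple, object] = {}

def memoized(fn):
    """Cache results keyed on the arguments AND the function's bytecode."""
    code_hash = hashlib.sha256(fn.__code__.co_code).hexdigest()
    def wrapper(*args):
        key = (fn.__name__, code_hash, args)
        if key not in _memo_cache:
            _memo_cache[key] = fn(*args)
        return _memo_cache[key]
    return wrapper

calls = []

@memoized
def expensive(x: int) -> int:
    calls.append(x)  # track real executions
    return x * 2

expensive(21)
expensive(21)  # cache hit: the body does not run a second time
```

Because the key includes a hash of the code, changing the function body produces a different key and forces recomputation, matching the "inputs/code unchanged" rule above.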
### 4. Target States
Declare what should exist; CocoIndex handles creation, update, and deletion:

```python
# File target
localfs.declare_file(outdir / "output.txt", content)

# Database row target
table.declare_row(row=MyRecord(id=1, name="example"))

# Vector point target (Qdrant)
collection.declare_point(point=PointStruct(id="1", vector=[...]))
```

### 5. Context for Shared Resources
Use `ContextKey` to share expensive resources across components:

```python
EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder")

@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder):
    embedder = SentenceTransformerEmbedder("all-MiniLM-L6-v2")
    builder.provide(EMBEDDER, embedder)
    yield
```

The `@coco.lifespan` decorator registers the function to the default CocoIndex environment, which is shared among all apps by default.

```python
@coco.function
def process_item(text: str) -> None:
    embedder = coco.use_context(EMBEDDER)
    embedding = embedder.embed(text)
```

### 6. ID Generation
Generate stable, unique identifiers that persist across incremental updates:

```python
from cocoindex.resources.id import generate_id, IdGenerator

# Deterministic: same dep → same ID
chunk_id = generate_id(chunk.content)

# Always distinct: each call → new ID, even with the same dep
id_gen = IdGenerator()
for chunk in chunks:
    chunk_id = id_gen.next_id(chunk.content)
    table.declare_row(row=Row(id=chunk_id, content=chunk.content))
```

Use `generate_id(dep)` when the same content should yield the same ID. Use `IdGenerator` when you need distinct IDs even for duplicate content. See the [ID Generation docs](https://cocoindex.io/docs-v1/resource_types#id-generation) for details.
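The two behaviors can be mimicked in plain Python (a conceptual stand-in, not the library's implementation): a deterministic ID hashes the dependency, while a generator mixes in a call counter so duplicates still get distinct yet reproducible IDs.

```python
import hashlib
import itertools

def stable_id(dep: str) -> str:
    # Deterministic: the same dependency always hashes to the same ID
    return hashlib.sha256(dep.encode()).hexdigest()[:16]

class CountingIdGenerator:
    """Distinct IDs even for duplicate content, reproducible across runs."""
    def __init__(self) -> None:
        self._counter = itertools.count()

    def next_id(self, dep: str) -> str:
        # Mixing in the call index keeps duplicate chunks distinct
        return stable_id(f"{next(self._counter)}:{dep}")

gen = CountingIdGenerator()
first = gen.next_id("same text")
second = gen.next_id("same text")
# stable_id("same text") always equals itself, while first != second
```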
## Common Pipeline Patterns
### Pattern 1: File Transformation
Transform files from input to output directory:
```python
import pathlib

import cocoindex as coco
import cocoindex.asyncio as coco_aio
from cocoindex.connectors import localfs
from cocoindex.resources.file import PatternFilePathMatcher

@coco.function(memo=True)
def process_file(file, outdir):
    # CPU-bound transform: sync is fine here at the leaf
    content = file.read_text()
    transformed = transform_content(content)  # Your logic
    outname = file.file_path.path.stem + ".out"
    localfs.declare_file(outdir / outname, transformed, create_parent_dirs=True)

@coco.function
async def app_main(sourcedir, outdir):
    files = localfs.walk_dir(
        sourcedir,
        recursive=True,
        path_matcher=PatternFilePathMatcher(
            included_patterns=["*.txt", "*.md"],
            excluded_patterns=[".*/**"],
        ),
    )
    await coco_aio.mount_each(process_file, files.items(), outdir)

app = coco_aio.App(
    coco_aio.AppConfig(name="Transform"),
    app_main,
    sourcedir=pathlib.Path("./data"),
    outdir=pathlib.Path("./out"),
)
```

### Pattern 2: Vector Embedding Pipeline
Chunk and embed documents for semantic search:
```python
import os
import pathlib
from dataclasses import dataclass
from typing import Annotated, AsyncIterator

import cocoindex as coco
import cocoindex.asyncio as coco_aio
from cocoindex.connectors import localfs, postgres
from cocoindex.ops.text import RecursiveSplitter
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
from cocoindex.resources.chunk import Chunk
from cocoindex.resources.file import FileLike, PatternFilePathMatcher
from cocoindex.resources.id import IdGenerator
from numpy.typing import NDArray

DATABASE_URL = os.environ["DATABASE_URL"]  # e.g. loaded from .env

PG_DB = coco.ContextKey[postgres.PgDatabase]("pg_db")
_embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
_splitter = RecursiveSplitter()

@dataclass
class DocEmbedding:
    id: int  # Generated stable ID
    filename: str
    text: str
    embedding: Annotated[NDArray, _embedder]  # Auto-infer dimensions
    chunk_start: int
    chunk_end: int

@coco_aio.lifespan
async def coco_lifespan(builder: coco_aio.EnvironmentBuilder) -> AsyncIterator[None]:
    async with await postgres.create_pool(DATABASE_URL) as pool:
        builder.provide(PG_DB, postgres.register_db("embedding_db", pool))
        yield

@coco.function
async def process_chunk(chunk: Chunk, filename: pathlib.PurePath, id_gen: IdGenerator, table):
    table.declare_row(
        row=DocEmbedding(
            id=await id_gen.next_id(chunk.text),
            filename=str(filename),
            text=chunk.text,
            embedding=await _embedder.embed(chunk.text),
            chunk_start=chunk.start.char_offset,
            chunk_end=chunk.end.char_offset,
        ),
    )

@coco.function(memo=True)
async def process_file(file: FileLike, table):
    text = file.read_text()
    chunks = _splitter.split(text, chunk_size=1000, chunk_overlap=200)
    id_gen = IdGenerator()
    await coco_aio.map(process_chunk, chunks, file.file_path.path, id_gen, table)

@coco.function
async def app_main(sourcedir: pathlib.Path):
    target_db = coco.use_context(PG_DB)
    target_table = await target_db.mount_table_target(
        table_name="embeddings",
        table_schema=await postgres.TableSchema.from_class(
            DocEmbedding, primary_key=["id"],
        ),
    )
    files = localfs.walk_dir(sourcedir, recursive=True)
    await coco_aio.mount_each(process_file, files.items(), target_table)

app = coco_aio.App(coco_aio.AppConfig(name="Embedding"), app_main, sourcedir=pathlib.Path("./data"))
```
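The pipeline above only builds the index; querying it is ordinary application code outside CocoIndex. As a sketch of the retrieval side (pure-Python cosine scoring standing in for a pgvector `ORDER BY embedding <=> $1` query), semantic search ranks stored chunks by similarity to the query embedding:

```python
import math

def dot(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))

def cosine_similarity(a: list[float], b: list[float]) -> float:
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))

def top_k(query_vec: list[float], rows: list[tuple[str, list[float]]], k: int = 3):
    """Rank (text, embedding) rows by cosine similarity to the query embedding."""
    scored = [(text, cosine_similarity(query_vec, vec)) for text, vec in rows]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]

# Toy 2-d "embeddings"; in practice these come from the embeddings table
rows = [("cats purr", [1.0, 0.0]), ("dogs bark", [0.6, 0.8])]
best = top_k([1.0, 0.1], rows, k=1)
# best[0][0] == "cats purr"
```

In production you would embed the query with the same `_embedder` used at index time and let the database do the ranking.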
### Pattern 3: LLM-Based Extraction
Extract structured data using LLMs:
```python
import cocoindex as coco
import instructor
from litellm import acompletion
from pydantic import BaseModel

_instructor_client = instructor.from_litellm(acompletion, mode=instructor.Mode.JSON)

class ExtractionResult(BaseModel):
    title: str
    topics: list[str]

@coco.function(memo=True)  # Memoization avoids re-calling the LLM
async def extract_and_store(content, message_id, table):
    result = await _instructor_client.chat.completions.create(
        model="gpt-4",
        response_model=ExtractionResult,
        messages=[{"role": "user", "content": f"Extract topics: {content}"}],
    )
    table.declare_row(row=Message(id=message_id, title=result.title, content=content))
```

## Connectors and Operations
CocoIndex v1 provides connectors for reading from and writing to various external systems including databases (SQL and vector), file systems, and more.
For detailed connector documentation, see:
- references/connectors.md: Complete connector reference with examples
- Pattern examples: Real-world usage in pipelines
- AI-optimized docs: Comprehensive online documentation
## Text and Embedding Operations
### Text Splitting
```python
from cocoindex.ops.text import RecursiveSplitter, detect_code_language

splitter = RecursiveSplitter()
language = detect_code_language(filename="example.py")
chunks = splitter.split(
    text,
    chunk_size=1000,
    min_chunk_size=300,
    chunk_overlap=200,
    language=language,  # Syntax-aware splitting
)
```
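The effect of `chunk_size` and `chunk_overlap` can be seen with a simplified fixed-window splitter (a conceptual sketch; `RecursiveSplitter` additionally respects syntax and natural boundaries):

```python
def split_fixed(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Slice text into windows of chunk_size, advancing by size - overlap."""
    step = chunk_size - chunk_overlap
    return [text[i : i + chunk_size] for i in range(0, len(text), step)]

chunks = split_fixed("abcdefghij", chunk_size=4, chunk_overlap=2)
# chunks == ["abcd", "cdef", "efgh", "ghij", "ij"]
# Each chunk repeats the previous chunk's last 2 characters, so content cut
# at a boundary still appears whole in one of the two adjacent chunks.
```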
### Embeddings
```python
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder

embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")

# Sync
embedding = embedder.embed(text)

# Async
embedding = await embedder.embed_async(text)
```

## CLI Commands
### Run Pipeline

```bash
cocoindex update main.py          # Run app in main.py
cocoindex update main.py:my_app   # Run specific app
cocoindex update my_module:my_app # Run from module
```

### Drop All State

```bash
cocoindex drop main.py [-f]  # Drop and reset
```

### List Apps

```bash
cocoindex ls main.py              # List apps in file
cocoindex ls --db ./cocoindex.db  # List apps in DB
```

### Show Component Paths

```bash
cocoindex show main.py  # Show component tree
```

## Best Practices
### 1. Use Stable Component Paths

```python
# ✅ Good: Stable identifiers
coco.component_subpath("file", str(file.file_path.path))
coco.component_subpath("record", record.id)

# ❌ Bad: Unstable identifiers
coco.component_subpath("file", file)  # Object reference
coco.component_subpath("idx", idx)    # Index changes
```
undefined2. Add Memoization for Expensive Operations
2. 为昂贵的操作添加记忆化
python
undefinedpython
undefined✅ Good: Memoize expensive ops
✅ 推荐:为昂贵操作添加记忆化
@coco.function(memo=True)
async def process_chunk(chunk, table):
embedding = await embedder.embed_async(chunk.text) # Expensive!
table.declare_row(...)
@coco.function(memo=True)
async def process_chunk(chunk, table):
embedding = await embedder.embed_async(chunk.text) # 昂贵操作!
table.declare_row(...)
❌ Bad: No memoization
❌ 不推荐:无记忆化
@coco.function # Re-embeds every run
async def process_chunk(chunk, table):
embedding = await embedder.embed_async(chunk.text)
undefined@coco.function # 每次运行都会重新生成嵌入
async def process_chunk(chunk, table):
embedding = await embedder.embed_async(chunk.text)
undefined3. Use Context for Shared Resources
3. 使用上下文共享资源
python
undefinedpython
undefined✅ Good: Load model once
✅ 推荐:仅加载一次模型
@coco.lifespan
def coco_lifespan(builder):
model = load_expensive_model()
builder.provide(MODEL_KEY, model)
yield
@coco.lifespan
def coco_lifespan(builder):
model = load_expensive_model()
builder.provide(MODEL_KEY, model)
yield
❌ Bad: Load model every time
❌ 不推荐:每次都加载模型
@coco.function
def process(data):
model = load_expensive_model() # Loaded repeatedly!
undefined@coco.function
def process(data):
model = load_expensive_model() # 重复加载!
undefined4. Use Type Annotations
4. 使用类型注解
python
undefinedpython
undefined✅ Good: Type-safe
✅ 推荐:类型安全
from dataclasses import dataclass
from typing import Annotated
from numpy.typing import NDArray
@dataclass
class Record:
id: int
name: str
vector: Annotated[NDArray, embedder] # Auto-infer dimensions
from dataclasses import dataclass
from typing import Annotated
from numpy.typing import NDArray
@dataclass
class Record:
id: int
name: str
vector: Annotated[NDArray, embedder] # 自动推断维度
❌ Bad: No type safety
❌ 不推荐:无类型安全
record = {"id": 1, "name": "example", "vector": [...]}
undefinedrecord = {"id": 1, "name": "example", "vector": [...]}
### 5. Use Convenience APIs for Targets and Iteration

```python
# Target setup: subpath is automatic
table = await target_db.mount_table_target(
    table_name="my_table",
    table_schema=await postgres.TableSchema.from_class(MyRecord, primary_key=["id"]),
)

# Iterate with mount_each: keys become component subpaths
await coco_aio.mount_each(process_item, items.items(), table)
```
undefined6. Prefer Async Mount
6. 优先使用异步挂载
python
undefinedpython
undefined✅ Default: async mount for I/O-bound or general-purpose components
✅ 默认:对I/O密集型或通用组件使用异步挂载
@coco.function
async def app_main(sourcedir):
await coco_aio.mount_each(process_file, files.items(), table) # list of items
await coco_aio.mount(coco.component_subpath("setup"), setup_fn) # single component
@coco.function
async def app_main(sourcedir):
await coco_aio.mount_each(process_file, files.items(), table) # 项列表
await coco_aio.mount(coco.component_subpath("setup"), setup_fn) # 单个组件
✅ Sync mount only when the leaf function is CPU-intensive (no I/O)
✅ 仅当叶子函数是CPU密集型(无I/O)时使用同步挂载
@coco.function(memo=True)
def cpu_heavy_leaf(data: str) -> Result:
return expensive_computation(data) # Pure CPU work, no async needed
@coco.function(memo=True)
def cpu_heavy_leaf(data: str) -> Result:
return expensive_computation(data) # 纯CPU工作,无需异步
❌ Don't use sync mount inside async app_main for general components
❌ 不要在异步app_main中对通用组件使用同步挂载
@coco.function
async def app_main(sourcedir):
for key, f in files.items():
coco.mount(coco.component_subpath(key), process_file, f) # Use await coco_aio.mount() instead
undefined@coco.function
async def app_main(sourcedir):
for key, f in files.items():
coco.mount(coco.component_subpath(key), process_file, f) # 应使用await coco_aio.mount()
undefinedMigration from Old API
从旧API迁移
| Before | After |
|---|---|
| | |
| | |
| | |
| | |
## Troubleshooting

### "Module not found" Error
Ensure `pyproject.toml` has the pre-release config:

```toml
[tool.uv]
prerelease = "explicit"
```

### PostgreSQL pgvector Not Found
Enable the pgvector extension:

```bash
# Connect to your database and enable the extension
psql "postgres://localhost/db" -c "CREATE EXTENSION IF NOT EXISTS vector;"
```

See [references/setup_database.md](references/setup_database.md) for detailed setup instructions.

### Memoization Not Working
Check that component paths are stable:

```python
# Use stable IDs, not object references
coco.component_subpath(file.stable_key)  # ✅
coco.component_subpath(file)             # ❌
```

### Everything Reprocessing
Add `memo=True` to expensive functions:

```python
@coco.function(memo=True)  # Add this
async def process_item(item):
    ...
```

## Resources
### references/
- setup_project.md: Project setup guide with dependency examples for different use cases
- setup_database.md: Database setup guide (PostgreSQL, SQLite, LanceDB, Qdrant)
- connectors.md: Complete connector reference with usage examples
- patterns.md: Detailed pipeline patterns with full working code
- api_reference.md: Quick API reference for common functions
### assets/

- simple-template/: Minimal project template structure
## Additional Resources
For AI agents:
- AI-Optimized Documentation: Comprehensive documentation optimized for LLM consumption

For humans:
- CocoIndex Documentation: Full documentation site
- Programming Guide: Core concepts and patterns
- GitHub Examples: Real-world example projects
- CocoIndex on PyPI: Package repository (pre-release)
## Version Note

This skill is for CocoIndex v1 (pre-release: `>=1.0.0a1`). It uses a completely different API from v0. Key differences:
- Python-native (no DSL)
- Any Python type supported
- No flow definitions required
- More flexible, seamless experience