CocoIndex v1

CocoIndex v1 is a Python library for building incremental data processing pipelines with declarative target states. Think spreadsheets or React for data pipelines: declare what the output should look like based on current input, and CocoIndex automatically handles incremental updates, change detection, and syncing to external systems.

Overview


CocoIndex v1 enables building data pipelines that:
  • Automatically handle incremental updates: Only reprocess changed data
  • Use declarative target states: Declare what should exist, not how to update
  • Support any Python types: No custom DSL—use dataclasses, Pydantic, NamedTuple
  • Provide function memoization: Skip expensive operations when inputs/code unchanged
  • Sync to multiple targets: PostgreSQL, SQLite, LanceDB, Qdrant, file systems
Key principle: `TargetState = Transform(SourceState)`
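
To make the principle concrete, here is a minimal sketch of a component in that style; `render()` and `item.key` are hypothetical stand-ins for your own logic:

```python
# Sketch: declare what should exist for the current input;
# CocoIndex diffs against the previous run and syncs only the changes.
@coco.function
def transform_item(item, outdir):
    localfs.declare_file(outdir / f"{item.key}.txt", render(item))  # render() is hypothetical
```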

When to Use This Skill


Use this skill when building pipelines that involve:
  • Document processing: PDF/Markdown conversion, text extraction, chunking
  • Vector embeddings: Embedding documents/code for semantic search
  • Database transformations: ETL from source DB to target DB
  • Knowledge graphs: Extract entities and relationships from data
  • LLM-based extraction: Structured data extraction using LLMs
  • File-based pipelines: Transform files from one format to another
  • Incremental indexing: Keep search indexes up-to-date with source changes

Quick Start: Creating a New Project


Initialize Project


Use the built-in CLI to create a new project:
```bash
cocoindex init my-project
cd my-project
```

This creates:
  • main.py - Main app definition
  • pyproject.toml - Dependencies with pre-release config
  • .env - Environment configuration
  • README.md - Quick start guide

Add Dependencies for Specific Use Cases


Add dependencies to `pyproject.toml` based on your needs:

```toml
# For vector embeddings
dependencies = ["cocoindex>=1.0.0a1", "sentence-transformers", "asyncpg"]

# For PostgreSQL only
dependencies = ["cocoindex>=1.0.0a1", "asyncpg"]

# For LLM extraction
dependencies = ["cocoindex>=1.0.0a1", "litellm", "instructor", "pydantic>=2.0"]
```

See [references/setup_project.md](references/setup_project.md) for complete examples.

Set Up Database (if using Postgres/Qdrant)


For PostgreSQL with Docker:

```bash
# Create docker-compose.yml with pgvector image
docker-compose up -d
```

For Qdrant with Docker:

```bash
# Create docker-compose.yml with Qdrant image
docker-compose up -d
```

See [references/setup_database.md](references/setup_database.md) for detailed setup instructions.
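
If you prefer plain `docker run` over compose, roughly equivalent one-liners look like this (image tags, credentials, and ports are assumptions to adapt):

```bash
# PostgreSQL with the pgvector extension preinstalled (assumed image tag)
docker run -d --name cocoindex-pg \
  -e POSTGRES_PASSWORD=postgres \
  -p 5432:5432 \
  pgvector/pgvector:pg16

# Qdrant on its default HTTP/gRPC ports
docker run -d --name cocoindex-qdrant \
  -p 6333:6333 -p 6334:6334 \
  qdrant/qdrant
```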

Run the Pipeline


```bash
pip install -e .
cocoindex update main.py
```

Core Concepts


1. Apps


An app is the top-level executable that binds a main function with parameters:
```python
import pathlib

import cocoindex as coco

@coco.function
def app_main(sourcedir: pathlib.Path, outdir: pathlib.Path) -> None:
    # Processing logic here
    ...

app = coco.App(
    coco.AppConfig(name="MyApp"),
    app_main,
    sourcedir=pathlib.Path("./data"),
    outdir=pathlib.Path("./output"),
)

if __name__ == "__main__":
    app.update(report_to_stdout=True)
```

2. Processing Components

A processing component groups an item's processing with its target states.

Mount independent components with `coco_aio.mount_each()` (preferred) or `coco_aio.mount()`:

```python
# Preferred: mount one component per item (async, keyed iterable)
await coco_aio.mount_each(process_file, files.items(), target_table)

# Equivalent async manual loop
for key, f in files.items():
    await coco_aio.mount(coco.component_subpath(key), process_file, f, target_table)

# Sync mount — only for CPU-intensive leaf components (no I/O)
coco.mount(coco.component_subpath(str(f.file_path.path)), process_file, f, target_table)
```

**Mount dependent components** with `use_mount()` when you need the return value:

```python
result = await coco_aio.use_mount(subpath, fn, *args)
```

Mount targets using connector convenience methods (async, subpath is automatic):

```python
target_table = await target_db.mount_table_target(
    table_name="my_table",
    table_schema=await postgres.TableSchema.from_class(MyRecord, primary_key=["id"]),
)
```

Key points:
  • Each component runs independently
  • Async-first: prefer `coco_aio.mount_each()` / `coco_aio.mount()` for all components; use sync `coco.mount()` only for CPU-intensive leaf work (no I/O)
  • Use `use_mount()` when you need the return value of a child component
  • Use stable paths for proper memoization
  • Component path determines target state ownership
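
As a concrete illustration of `use_mount()`, a parent component can mount a child under a stable subpath and consume its return value; `parse_doc` and `Row` below are hypothetical:

```python
@coco.function
async def process_doc(doc, table):
    # The child component's return value flows back to the parent
    parsed = await coco_aio.use_mount(coco.component_subpath("parse"), parse_doc, doc)
    table.declare_row(row=Row(id=parsed.id, text=parsed.text))
```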

3. Function Memoization


Add `memo=True` to skip re-execution when inputs/code are unchanged:

```python
@coco.function(memo=True)
def expensive_operation(data: str) -> Result:
    # LLM call, embedding generation, heavy computation
    result = expensive_transform(data)
    return result
```

4. Target States


Declare what should exist—CocoIndex handles creation/update/deletion:
```python
# File target
localfs.declare_file(outdir / "output.txt", content)

# Database row target
table.declare_row(row=MyRecord(id=1, name="example"))

# Vector point target (Qdrant)
collection.declare_point(point=PointStruct(id="1", vector=[...]))
```

5. Context for Shared Resources


Use `ContextKey` to share expensive resources across components:

```python
EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder")

@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder):
    embedder = SentenceTransformerEmbedder("all-MiniLM-L6-v2")
    builder.provide(EMBEDDER, embedder)
    yield
```

The `@coco.lifespan` decorator registers the function to the default CocoIndex environment, which is shared among all apps by default.

```python
@coco.function
def process_item(text: str) -> None:
    embedder = coco.use_context(EMBEDDER)
    embedding = embedder.embed(text)
```

6. ID Generation


Generate stable, unique identifiers that persist across incremental updates:
```python
from cocoindex.resources.id import generate_id, IdGenerator

# Deterministic: same dep → same ID
chunk_id = generate_id(chunk.content)

# Always distinct: each call → new ID, even with same dep
id_gen = IdGenerator()
for chunk in chunks:
    chunk_id = id_gen.next_id(chunk.content)
    table.declare_row(row=Row(id=chunk_id, content=chunk.content))
```

Use `generate_id(dep)` when the same content should yield the same ID. Use `IdGenerator` when you need distinct IDs even for duplicate content. See [ID Generation docs](https://cocoindex.io/docs-v1/resource_types#id-generation) for details.

Common Pipeline Patterns


Pattern 1: File Transformation


Transform files from input to output directory:
```python
import pathlib
import cocoindex as coco
import cocoindex.asyncio as coco_aio
from cocoindex.connectors import localfs
from cocoindex.resources.file import PatternFilePathMatcher

@coco.function(memo=True)
def process_file(file, outdir):
    # CPU-bound transform — sync is fine here at the leaf
    content = file.read_text()
    transformed = transform_content(content)  # Your logic
    outname = file.file_path.path.stem + ".out"
    localfs.declare_file(outdir / outname, transformed, create_parent_dirs=True)

@coco.function
async def app_main(sourcedir, outdir):
    files = localfs.walk_dir(
        sourcedir,
        recursive=True,
        path_matcher=PatternFilePathMatcher(
            included_patterns=["*.txt", "*.md"],
            excluded_patterns=[".*/**"],
        ),
    )
    await coco_aio.mount_each(process_file, files.items(), outdir)

app = coco_aio.App(
    coco_aio.AppConfig(name="Transform"),
    app_main,
    sourcedir=pathlib.Path("./data"),
    outdir=pathlib.Path("./out"),
)
```

Pattern 2: Vector Embedding Pipeline


Chunk and embed documents for semantic search:
```python
import os
import pathlib
from dataclasses import dataclass
from typing import Annotated, AsyncIterator

import cocoindex as coco
import cocoindex.asyncio as coco_aio
from cocoindex.connectors import localfs, postgres
from cocoindex.ops.text import RecursiveSplitter
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
from cocoindex.resources.chunk import Chunk
from cocoindex.resources.file import FileLike
from cocoindex.resources.id import IdGenerator
from numpy.typing import NDArray

DATABASE_URL = os.environ["DATABASE_URL"]  # assumed: supplied via the environment (e.g. from .env)

PG_DB = coco.ContextKey[postgres.PgDatabase]("pg_db")
_embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
_splitter = RecursiveSplitter()

@dataclass
class DocEmbedding:
    id: int  # Generated stable ID
    filename: str
    text: str
    embedding: Annotated[NDArray, _embedder]  # Auto-infer dimensions
    chunk_start: int
    chunk_end: int

@coco_aio.lifespan
async def coco_lifespan(builder: coco_aio.EnvironmentBuilder) -> AsyncIterator[None]:
    async with await postgres.create_pool(DATABASE_URL) as pool:
        builder.provide(PG_DB, postgres.register_db("embedding_db", pool))
        yield

@coco.function
async def process_chunk(chunk: Chunk, filename: pathlib.PurePath, id_gen: IdGenerator, table):
    table.declare_row(
        row=DocEmbedding(
            id=await id_gen.next_id(chunk.text),
            filename=str(filename),
            text=chunk.text,
            embedding=await _embedder.embed_async(chunk.text),
            chunk_start=chunk.start.char_offset,
            chunk_end=chunk.end.char_offset,
        ),
    )

@coco.function(memo=True)
async def process_file(file: FileLike, table):
    text = file.read_text()
    chunks = _splitter.split(text, chunk_size=1000, chunk_overlap=200)
    id_gen = IdGenerator()
    await coco_aio.map(process_chunk, chunks, file.file_path.path, id_gen, table)

@coco.function
async def app_main(sourcedir: pathlib.Path):
    target_db = coco.use_context(PG_DB)
    target_table = await target_db.mount_table_target(
        table_name="embeddings",
        table_schema=await postgres.TableSchema.from_class(
            DocEmbedding, primary_key=["id"],
        ),
    )

    files = localfs.walk_dir(sourcedir, recursive=True)
    await coco_aio.mount_each(process_file, files.items(), target_table)

app = coco_aio.App(coco_aio.AppConfig(name="Embedding"), app_main, sourcedir=pathlib.Path("./data"))
```

Pattern 3: LLM-Based Extraction


Extract structured data using LLMs:
```python
import cocoindex as coco
import instructor
from litellm import acompletion
from pydantic import BaseModel

_instructor_client = instructor.from_litellm(acompletion, mode=instructor.Mode.JSON)

class ExtractionResult(BaseModel):
    title: str
    topics: list[str]

@coco.function(memo=True)  # Memo avoids re-calling LLM
async def extract_and_store(content, message_id, table):
    result = await _instructor_client.chat.completions.create(
        model="gpt-4",
        response_model=ExtractionResult,
        messages=[{"role": "user", "content": f"Extract topics: {content}"}],
    )
    # Message is the target row dataclass, declared elsewhere in the app
    table.declare_row(row=Message(id=message_id, title=result.title, content=content))
```

Connectors and Operations


CocoIndex v1 provides connectors for reading from and writing to various external systems including databases (SQL and vector), file systems, and more.
For detailed connector documentation, see:
  • references/connectors.md - Complete connector reference with examples
  • Pattern examples - Real-world usage in pipelines
  • AI-optimized docs - Comprehensive online documentation

Text and Embedding Operations


Text Splitting


```python
from cocoindex.ops.text import RecursiveSplitter, detect_code_language

splitter = RecursiveSplitter()
language = detect_code_language(filename="example.py")

chunks = splitter.split(
    text,
    chunk_size=1000,
    min_chunk_size=300,
    chunk_overlap=200,
    language=language,  # Syntax-aware splitting
)
```

Embeddings


```python
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder

embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")

# Sync
embedding = embedder.embed(text)

# Async
embedding = await embedder.embed_async(text)
```

CLI Commands


Run Pipeline


```bash
cocoindex update main.py              # Run app in main.py
cocoindex update main.py:my_app       # Run specific app
cocoindex update my_module:my_app     # Run from module
```

Drop All State


```bash
cocoindex drop main.py [-f]           # Drop and reset
```

List Apps


```bash
cocoindex ls main.py                  # List apps in file
cocoindex ls --db ./cocoindex.db      # List apps in DB
```

Show Component Paths


```bash
cocoindex show main.py                # Show component tree
```

Best Practices


1. Use Stable Component Paths


```python
# ✅ Good: Stable identifiers
coco.component_subpath("file", str(file.file_path.path))
coco.component_subpath("record", record.id)

# ❌ Bad: Unstable identifiers
coco.component_subpath("file", file)  # Object reference
coco.component_subpath("idx", idx)    # Index changes
```

2. Add Memoization for Expensive Operations


```python
# ✅ Good: Memoize expensive ops
@coco.function(memo=True)
async def process_chunk(chunk, table):
    embedding = await embedder.embed_async(chunk.text)  # Expensive!
    table.declare_row(...)

# ❌ Bad: No memoization
@coco.function  # Re-embeds every run
async def process_chunk(chunk, table):
    embedding = await embedder.embed_async(chunk.text)
```

3. Use Context for Shared Resources


```python
# ✅ Good: Load model once
@coco.lifespan
def coco_lifespan(builder):
    model = load_expensive_model()
    builder.provide(MODEL_KEY, model)
    yield

# ❌ Bad: Load model every time
@coco.function
def process(data):
    model = load_expensive_model()  # Loaded repeatedly!
```

4. Use Type Annotations


```python
# ✅ Good: Type-safe
from dataclasses import dataclass
from typing import Annotated
from numpy.typing import NDArray

@dataclass
class Record:
    id: int
    name: str
    vector: Annotated[NDArray, embedder]  # Auto-infer dimensions

# ❌ Bad: No type safety
record = {"id": 1, "name": "example", "vector": [...]}
```

5. Use Convenience APIs for Targets and Iteration


```python
# Target setup — subpath is automatic
table = await target_db.mount_table_target(
    table_name="my_table",
    table_schema=await postgres.TableSchema.from_class(MyRecord, primary_key=["id"]),
)

# Iterate with mount_each — keys become component subpaths
await coco_aio.mount_each(process_item, items.items(), table)
```

6. Prefer Async Mount


```python
# ✅ Default: async mount for I/O-bound or general-purpose components
@coco.function
async def app_main(sourcedir):
    await coco_aio.mount_each(process_file, files.items(), table)    # list of items
    await coco_aio.mount(coco.component_subpath("setup"), setup_fn)  # single component

# ✅ Sync mount only when the leaf function is CPU-intensive (no I/O)
@coco.function(memo=True)
def cpu_heavy_leaf(data: str) -> Result:
    return expensive_computation(data)  # Pure CPU work, no async needed

# ❌ Don't use sync mount inside async app_main for general components
@coco.function
async def app_main(sourcedir):
    for key, f in files.items():
        coco.mount(coco.component_subpath(key), process_file, f)  # Use await coco_aio.mount() instead
```

Migration from Old API


| Before | After |
| --- | --- |
| `await mount_run(subpath, fn, *args).result()` | `await use_mount(subpath, fn, *args)` |
| `for key, item in items: mount(subpath(key), fn, item, *args)` | `mount_each(fn, items, *args)` |
| `with component_subpath("setup"): await mount_run(...)` | `await mount_target(target)` or `await db.mount_table_target(...)` |
| `await asyncio.gather(*(fn(item) for item in items))` | `await map(fn, items)` |
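
For example, the manual per-item loop in the second row collapses to a single call (using the table's shorthand names):

```python
# Before: one mount per item, with hand-built subpaths
for key, item in items:
    mount(subpath(key), fn, item, table)

# After: keys become component subpaths automatically
await mount_each(fn, items, table)
```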

Troubleshooting


"Module not found" Error

"Module not found"错误

Ensure `pyproject.toml` has pre-release config:

```toml
[tool.uv]
prerelease = "explicit"
```

PostgreSQL pgvector Not Found


Enable the pgvector extension:
```bash
# Connect to your database and enable the extension
psql "postgres://localhost/db" -c "CREATE EXTENSION IF NOT EXISTS vector;"
```

See [references/setup_database.md](references/setup_database.md) for detailed setup instructions.

Memoization Not Working


Check component paths are stable:
```python
# Use stable IDs, not object references
coco.component_subpath(file.stable_key)  # ✅
coco.component_subpath(file)             # ❌
```

Everything Reprocessing


Add `memo=True` to expensive functions:

```python
@coco.function(memo=True)  # Add this
async def process_item(item):
    ...
```

Resources


references/


  • setup_project.md: Project setup guide with dependency examples for different use cases
  • setup_database.md: Database setup guide (PostgreSQL, SQLite, LanceDB, Qdrant)
  • connectors.md: Complete connector reference with usage examples
  • patterns.md: Detailed pipeline patterns with full working code
  • api_reference.md: Quick API reference for common functions

assets/


  • simple-template/: Minimal project template structure

Additional Resources


For AI Agents:
For Humans:

Version Note


This skill is for CocoIndex v1 (pre-release: `>=1.0.0a1`). It uses a completely different API from v0. Key differences:
  • Python-native (no DSL)
  • Any Python types supported
  • No flow definitions required
  • More flexible and seamless experience