CocoIndex

Overview


CocoIndex is an ultra-performant real-time data transformation framework for AI with incremental processing. This skill enables building indexing flows that extract data from sources, apply transformations (chunking, embedding, LLM extraction), and export to targets (vector databases, graph databases, relational databases).
Core capabilities:
  1. Write indexing flows - Define ETL pipelines using Python
  2. Create custom functions - Build reusable transformation logic
  3. Operate flows - Run and manage flows using CLI or Python API
Key features:
  • Incremental processing (only processes changed data)
  • Live updates (continuously sync source changes to targets)
  • Built-in functions (text chunking, embeddings, LLM extraction)
  • Multiple data sources (local files, S3, Azure Blob, Google Drive, Postgres)
  • Multiple targets (Postgres+pgvector, Qdrant, LanceDB, Neo4j, Kuzu)
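Incremental processing can be pictured with a tiny content-fingerprint sketch. This is illustrative only - CocoIndex tracks changes internally and this is not its implementation; the `processed` cache and `needs_processing` helper are hypothetical names:

```python
import hashlib

def fingerprint(content: str) -> str:
    """Stable fingerprint of a document's content."""
    return hashlib.sha256(content.encode()).hexdigest()

# Hypothetical cache of previously processed fingerprints, keyed by document id.
processed: dict[str, str] = {}

def needs_processing(doc_id: str, content: str) -> bool:
    """True only when the document is new or its content changed."""
    fp = fingerprint(content)
    if processed.get(doc_id) == fp:
        return False
    processed[doc_id] = fp
    return True

print(needs_processing("a.md", "hello"))   # True (new)
print(needs_processing("a.md", "hello"))   # False (unchanged, skipped)
print(needs_processing("a.md", "hello!"))  # True (changed)
```

The same idea - skip rows whose inputs have not changed - is what lets an indexing flow re-run cheaply on large sources.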

When to Use This Skill


Use when users request:
  • "Build a vector search index for my documents"
  • "Create an embedding pipeline for code/PDFs/images"
  • "Extract structured information using LLMs"
  • "Build a knowledge graph from documents"
  • "Set up live document indexing"
  • "Create custom transformation functions"
  • "Run/update my CocoIndex flow"

Flow Writing Workflow


Step 1: Understand Requirements


Ask clarifying questions to understand:
Data source:
  • Where is the data? (local files, S3, database, etc.)
  • What file types? (text, PDF, JSON, images, code, etc.)
  • How often does it change? (one-time, periodic, continuous)
Transformations:
  • What processing is needed? (chunking, embedding, extraction, etc.)
  • Which embedding model? (SentenceTransformer, OpenAI, custom)
  • Any custom logic? (filtering, parsing, enrichment)
Target:
  • Where should results go? (Postgres, Qdrant, Neo4j, etc.)
  • What schema? (fields, primary keys, indexes)
  • Vector search needed? (specify similarity metric)

Step 2: Set Up Dependencies


Guide user to add CocoIndex with appropriate extras to their project based on their needs:
Required dependency:
  • cocoindex - Core functionality, CLI, and most built-in functions
Optional extras (add as needed):
  • cocoindex[embeddings] - For SentenceTransformer embeddings (when using SentenceTransformerEmbed)
  • cocoindex[colpali] - For ColPali image/document embeddings (when using ColPaliEmbedImage or ColPaliEmbedQuery)
  • cocoindex[lancedb] - For LanceDB target (when exporting to LanceDB)
  • cocoindex[embeddings,lancedb] - Multiple extras can be combined
What's included:
  • Base package: Core functionality, CLI, most built-in functions, Postgres/Qdrant/Neo4j/Kuzu targets
  • embeddings extra: SentenceTransformers library for local embedding models
  • colpali extra: ColPali engine for multimodal document/image embeddings
  • lancedb extra: LanceDB client library for LanceDB vector database support
Users can install using their preferred package manager (pip, uv, poetry, etc.) or add to pyproject.toml.
Step 3: Set Up Environment


Check existing environment first:
  1. Check if COCOINDEX_DATABASE_URL exists in environment variables
    • If not found, use default: postgres://cocoindex:cocoindex@localhost/cocoindex
  2. For flows requiring LLM APIs (embeddings, extraction):
    • Ask user which LLM provider they want to use:
      • OpenAI - Both generation and embeddings
      • Anthropic - Generation only
      • Gemini - Both generation and embeddings
      • Voyage - Embeddings only
      • Ollama - Local models (generation and embeddings)
    • Check if the corresponding API key exists in environment variables
    • If not found, ask user to provide the API key value
    • Never create simplified examples without LLM - always get the proper API key and use the real LLM functions
Guide user to create .env file:

```bash
# Database connection (required - internal storage)
COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex

# LLM API keys (add the ones you need)
OPENAI_API_KEY=sk-...         # For OpenAI (generation + embeddings)
ANTHROPIC_API_KEY=sk-ant-...  # For Anthropic (generation only)
GOOGLE_API_KEY=...            # For Gemini (generation + embeddings)
VOYAGE_API_KEY=pa-...         # For Voyage (embeddings only)

# Ollama requires no API key (local)
```

**For more LLM options:** <https://cocoindex.io/docs/ai/llm>

Create basic project structure:

```python
# main.py
from dotenv import load_dotenv
import cocoindex

@cocoindex.flow_def(name="FlowName")
def my_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    # Flow definition here
    pass

if __name__ == "__main__":
    load_dotenv()
    cocoindex.init()
    my_flow.update()
```

Step 4: Write the Flow


Follow this structure:

```python
@cocoindex.flow_def(name="DescriptiveName")
def flow_name(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    # 1. Import source data
    data_scope["source_name"] = flow_builder.add_source(
        cocoindex.sources.SourceType(...)
    )

    # 2. Create collector(s) for outputs
    collector = data_scope.add_collector()

    # 3. Transform data (iterate through rows)
    with data_scope["source_name"].row() as item:
        # Apply transformations
        item["new_field"] = item["existing_field"].transform(
            cocoindex.functions.FunctionName(...)
        )

        ...

        # Nested iteration (e.g., chunks within documents)
        with item["nested_table"].row() as nested_item:
            # More transformations
            nested_item["embedding"] = nested_item["text"].transform(...)

            # Collect data for export
            collector.collect(
                field1=nested_item["field1"],
                field2=item["field2"],
                generated_id=cocoindex.GeneratedField.UUID
            )

    # 4. Export to target
    collector.export(
        "target_name",
        cocoindex.targets.TargetType(...),
        primary_key_fields=["field1"],
        vector_indexes=[...]  # If needed
    )
```

Key principles:
  • Each source creates a field in the top-level data scope
  • Use .row() to iterate through table data
  • CRITICAL: Always assign transformed data to row fields - use item["new_field"] = item["existing_field"].transform(...), NOT local variables like new_field = item["existing_field"].transform(...)
  • Transformations create new fields without mutating existing data
  • Collectors gather data from any scope level
  • Export must happen at top level (not within row iterations)
Common mistakes to avoid:
Wrong: Using local variables for transformations

```python
with data_scope["files"].row() as file:
    summary = file["content"].transform(...)  # ❌ Local variable
    summaries_collector.collect(filename=file["filename"], summary=summary)
```

Correct: Assigning to row fields

```python
with data_scope["files"].row() as file:
    file["summary"] = file["content"].transform(...)  # ✅ Field assignment
    summaries_collector.collect(filename=file["filename"], summary=file["summary"])
```

Wrong: Creating unnecessary dataclasses to mirror flow fields

```python
from dataclasses import dataclass

@dataclass
class FileSummary:  # ❌ Unnecessary - CocoIndex manages fields automatically
    filename: str
    summary: str
    embedding: list[float]
```
This dataclass is never used in the flow!

Step 5: Design the Flow Solution


IMPORTANT: The patterns listed below are common starting points, but you cannot exhaustively enumerate all possible scenarios. When user requirements don't match existing patterns:
  1. Combine elements from multiple patterns - Mix and match sources, transformations, and targets creatively
  2. Review additional examples - See https://github.com/cocoindex-io/cocoindex?tab=readme-ov-file#-examples-and-demo for diverse real-world use cases (face recognition, multimodal search, product recommendations, patient form extraction, etc.)
  3. Think from first principles - Use the core APIs (sources, transforms, collectors, exports) and apply common sense to solve novel problems
  4. Be creative - CocoIndex is flexible; unique combinations of components can solve unique problems
Common starting patterns (use references for detailed examples):
For text embedding: Load
references/flow_patterns.md
and refer to "Pattern 1: Simple Text Embedding"
For code embedding: Load
references/flow_patterns.md
and refer to "Pattern 2: Code Embedding with Language Detection"
For LLM extraction + knowledge graph: Load
references/flow_patterns.md
and refer to "Pattern 3: LLM-based Extraction to Knowledge Graph"
For live updates: Load
references/flow_patterns.md
and refer to "Pattern 4: Live Updates with Refresh Interval"
For custom functions: Load
references/flow_patterns.md
and refer to "Pattern 5: Custom Transform Function"
For reusable query logic: Load
references/flow_patterns.md
and refer to "Pattern 6: Transform Flow for Reusable Logic"
For concurrency control: Load
references/flow_patterns.md
and refer to "Pattern 7: Concurrency Control"
Example of pattern composition:
If a user asks to "index images from S3, generate captions with a vision API, and store in Qdrant", combine:
  • AmazonS3 source (from S3 examples)
  • Custom function for vision API calls (from custom functions pattern)
  • EmbedText to embed the captions (from embedding patterns)
  • Qdrant target (from target examples)
No single pattern covers this exact scenario, but the building blocks are composable.

Step 6: Test and Run


Guide user through testing:

```bash
# 1. Run with setup
cocoindex update --setup -f main   # -f force setup without confirmation prompts

# 2. Start a server and redirect users to CocoInsight
cocoindex server -ci main
# Then open CocoInsight at https://cocoindex.io/cocoinsight
```

Data Types


CocoIndex has a type system independent of programming languages. All data types are determined at flow definition time, making schemas clear and predictable.
IMPORTANT: When to define types:
  • Custom functions: Type annotations are required for return values (these are the source of truth for type inference)
  • Flow fields: Type annotations are NOT needed - CocoIndex automatically infers types from sources, functions, and transformations
  • Dataclasses/Pydantic models: Only create them when they're actually used (as function parameters/returns or ExtractByLlm output_type), NOT to mirror flow field schemas
Type annotation requirements:
  • Return values of custom functions: Must use specific type annotations - these are the source of truth for type inference
  • Arguments of custom functions: Relaxed - can use Any, dict[str, Any], or omit annotations; the engine already knows the types
  • Flow definitions: No explicit type annotations needed - CocoIndex automatically infers types from sources and functions
Why specific return types matter: Custom function return types let CocoIndex infer field types throughout the flow without processing real data. This enables creating proper target schemas (e.g., vector indexes with fixed dimensions).
Common type categories:
  1. Primitive types: str, int, float, bool, bytes, datetime.date, datetime.datetime, uuid.UUID
  2. Vector types (embeddings): Specify the dimension in the return type if you plan to export vectors to targets, as most targets require a fixed vector dimension
    • cocoindex.Vector[cocoindex.Float32, typing.Literal[768]] - 768-dim float32 vector (recommended)
    • list[float] without a dimension also works
  3. Struct types: Dataclass, NamedTuple, or Pydantic model
    • Return type: Must use a specific class (e.g., Person)
    • Argument: Can use dict[str, Any] or Any
  4. Table types:
    • KTable (keyed): dict[K, V] where K = key type (primitive or frozen struct), V = Struct type
    • LTable (ordered): list[R] where R = Struct type
    • Arguments: Can use dict[Any, Any] or list[Any]
  5. Json type: cocoindex.Json for unstructured/dynamic data
  6. Optional types: T | None for nullable values
Examples:

```python
from dataclasses import dataclass
from typing import Any, Literal
import cocoindex

@dataclass
class Person:
    name: str
    age: int

# ✅ Vector with dimension (recommended for vector search)
@cocoindex.op.function(behavior_version=1)
def embed_text(text: str) -> cocoindex.Vector[cocoindex.Float32, Literal[768]]:
    """Generate 768-dim embedding - dimension needed for vector index."""
    # ... embedding logic ...
    return embedding  # numpy array or list of 768 floats

# ✅ Struct return type, relaxed argument
@cocoindex.op.function(behavior_version=1)
def process_person(person: dict[str, Any]) -> Person:
    """Argument can be dict[str, Any], return must be specific Struct."""
    return Person(name=person["name"], age=person["age"])

# ✅ LTable return type
@cocoindex.op.function(behavior_version=1)
def filter_people(people: list[Any]) -> list[Person]:
    """Return type specifies list of specific Struct."""
    return [p for p in people if p.age >= 18]

# ❌ Wrong: dict[str, str] is not a valid specific CocoIndex type
@cocoindex.op.function(...)
def bad_example(person: Person) -> dict[str, str]:
    return {"name": person.name}
```
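To make the table-type mapping concrete, here is a plain-Python sketch, independent of the CocoIndex runtime. The `Chunk` struct is a hypothetical example type:

```python
from dataclasses import dataclass

@dataclass
class Chunk:
    text: str
    token_count: int

# LTable: ordered rows -> list of a Struct type
ltable: list[Chunk] = [Chunk("hello", 1), Chunk("world wide", 2)]

# KTable: rows keyed by a primitive type -> dict from key to a Struct type
ktable: dict[str, Chunk] = {chunk.text: chunk for chunk in ltable}

print(ktable["hello"].token_count)  # 1
```

A custom function returning `list[Chunk]` would therefore be typed as an LTable, and one returning `dict[str, Chunk]` as a KTable.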


**For comprehensive data types documentation:** <https://cocoindex.io/docs/core/data_types>


Custom Functions


When users need custom transformation logic, create custom functions.

Decision: Standalone vs Spec+Executor


Use standalone function when:
  • Simple transformation
  • No configuration needed
  • No setup/initialization required
Use spec+executor when:
  • Needs configuration (model names, API endpoints, parameters)
  • Requires setup (loading models, establishing connections)
  • Complex multi-step processing

Creating Standalone Functions


```python
@cocoindex.op.function(behavior_version=1)
def my_function(input_arg: str, optional_arg: int | None = None) -> dict:
    """
    Function description.

    Args:
        input_arg: Description
        optional_arg: Optional description
    """
    # Transformation logic
    return {"result": f"processed-{input_arg}"}
```

Requirements:
  • Decorator: @cocoindex.op.function()
  • Type annotations on all arguments and return value
  • Optional parameters: cache=True for expensive ops, behavior_version (required with cache)

Creating Spec+Executor Functions


```python
# 1. Define configuration spec
class MyFunction(cocoindex.op.FunctionSpec):
    """Configuration for MyFunction."""
    model_name: str
    threshold: float = 0.5

# 2. Define executor
@cocoindex.op.executor_class(cache=True, behavior_version=1)
class MyFunctionExecutor:
    spec: MyFunction  # Required: link to spec
    model = None      # Instance variables for state

    def prepare(self) -> None:
        """Optional: run once before execution."""
        # Load model, setup connections, etc.
        self.model = load_model(self.spec.model_name)

    def __call__(self, text: str) -> dict:
        """Required: execute for each data row."""
        # Use self.spec for configuration
        # Use self.model for loaded resources
        result = self.model.process(text)
        return {"result": result}
```
**When to enable cache:**

- LLM API calls
- Model inference
- External API calls
- Computationally expensive operations

**Important:** Increment `behavior_version` when function logic changes to invalidate cache.

For detailed examples and patterns, load `references/custom_functions.md`.

**For more on custom functions:** <https://cocoindex.io/docs/custom_ops/custom_functions>

Operating Flows


CLI Operations


Setup flow (create resources):

```bash
cocoindex setup main
```

One-time update:

```bash
cocoindex update main
```

```bash
# With auto-setup
cocoindex update --setup main

# Force reset everything before setup and update
cocoindex update --reset main
```

**Live update (continuous monitoring):**

```bash
cocoindex update main.py -L
# Requires refresh_interval on source or source-specific change capture
```

**Drop flow (remove all resources):**

```bash
cocoindex drop main.py
```

Inspect flow:

```bash
cocoindex show main.py:FlowName
```

Test without side effects:

```bash
cocoindex evaluate main.py:FlowName --output-dir ./test_output
```

For complete CLI reference, load references/cli_operations.md.
For CLI documentation: https://cocoindex.io/docs/core/cli

API Operations


Basic setup:

```python
from dotenv import load_dotenv
import cocoindex

load_dotenv()
cocoindex.init()

@cocoindex.flow_def(name="MyFlow")
def my_flow(flow_builder, data_scope):
    # ... flow definition ...
    pass
```

One-time update:

```python
stats = my_flow.update()
print(f"Processed {stats.total_rows} rows")

# Async
stats = await my_flow.update_async()
```

**Live update:**

```python
# As context manager
with cocoindex.FlowLiveUpdater(my_flow) as updater:
    # Updater runs in background
    # Your application logic here
    pass

# Manual control
updater = cocoindex.FlowLiveUpdater(
    my_flow,
    cocoindex.FlowLiveUpdaterOptions(
        live_mode=True,
        print_stats=True
    )
)
updater.start()
# ... application logic ...
updater.wait()
```

**Setup/drop:**

```python
my_flow.setup(report_to_stdout=True)
my_flow.drop(report_to_stdout=True)
cocoindex.setup_all_flows()
cocoindex.drop_all_flows()
```

Query with transform flows:

```python
@cocoindex.transform_flow()
def text_to_embedding(text: cocoindex.DataSlice[str]) -> cocoindex.DataSlice[list[float]]:
    return text.transform(
        cocoindex.functions.SentenceTransformerEmbed(model="...")
    )

# Use in flow for indexing
doc["embedding"] = text_to_embedding(doc["content"])

# Use for querying
query_embedding = text_to_embedding.eval("search query")
```

For complete API reference and patterns, load `references/api_operations.md`.

**For API documentation:** <https://cocoindex.io/docs/core/flow_methods>

Built-in Functions


Text Processing


SplitRecursively - Chunk text intelligently

```python
doc["chunks"] = doc["content"].transform(
    cocoindex.functions.SplitRecursively(),
    language="markdown",  # or "python", "javascript", etc.
    chunk_size=2000,
    chunk_overlap=500
)
```
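As a rough intuition for what chunk_size and chunk_overlap mean, here is a simplified character-based sketch. This is not CocoIndex's actual algorithm - SplitRecursively additionally respects document structure when choosing split points:

```python
def naive_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Fixed-size windows where each window shares chunk_overlap characters
    with the previous one. SplitRecursively refines this idea by splitting
    at structural boundaries (headings, paragraphs, code syntax)."""
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

print(naive_chunks("abcdefghij", chunk_size=4, chunk_overlap=2))
# ['abcd', 'cdef', 'efgh', 'ghij', 'ij']
```

Overlap trades storage for recall: sentences that straddle a chunk boundary still appear whole in at least one chunk.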
ParseJson - Parse JSON strings

```python
data = json_string.transform(cocoindex.functions.ParseJson())
```

DetectProgrammingLanguage - Detect language from filename

```python
file["language"] = file["filename"].transform(
    cocoindex.functions.DetectProgrammingLanguage()
)
```

Embeddings


SentenceTransformerEmbed - Local embedding model

```python
# Requires: cocoindex[embeddings]
chunk["embedding"] = chunk["text"].transform(
    cocoindex.functions.SentenceTransformerEmbed(
        model="sentence-transformers/all-MiniLM-L6-v2"
    )
)
```

**EmbedText** - LLM API embeddings

This is the **recommended way** to generate embeddings using LLM APIs (OpenAI, Voyage, etc.).

```python
chunk["embedding"] = chunk["text"].transform(
    cocoindex.functions.EmbedText(
        api_type=cocoindex.LlmApiType.OPENAI,
        model="text-embedding-3-small",
    )
)
```

ColPaliEmbedImage - Multimodal image embeddings

```python
# Requires: cocoindex[colpali]
image["embedding"] = image["img_bytes"].transform(
    cocoindex.functions.ColPaliEmbedImage(model="vidore/colpali-v1.2")
)
```

LLM Extraction


ExtractByLlm - Extract structured data with LLM

This is the recommended way to use LLMs for extraction and summarization tasks. It supports both structured outputs (dataclasses, Pydantic models) and simple text outputs (str).

For structured extraction:

```python
import dataclasses

@dataclasses.dataclass
class ProductInfo:
    name: str
    price: float
    category: str

item["product_info"] = item["text"].transform(
    cocoindex.functions.ExtractByLlm(
        llm_spec=cocoindex.LlmSpec(
            api_type=cocoindex.LlmApiType.OPENAI,
            model="gpt-4o-mini"
        ),
        output_type=ProductInfo,
        instruction="Extract product information"
    )
)
```

For text summarization/generation:

```python
file["summary"] = file["content"].transform(
    cocoindex.functions.ExtractByLlm(
        llm_spec=cocoindex.LlmSpec(
            api_type=cocoindex.LlmApiType.OPENAI,
            model="gpt-4o-mini"
        ),
        output_type=str,
        instruction="Summarize this document in one paragraph"
    )
)
```

ExtractByLlm - 使用LLM抽取结构化数据

这是使用LLM进行抽取和摘要任务的推荐方式。支持结构化输出(数据类、Pydantic模型)和简单文本输出(str)。

结构化抽取:

```python
import dataclasses

@dataclasses.dataclass
class ProductInfo:
    name: str
    price: float
    category: str

item["product_info"] = item["text"].transform(
    cocoindex.functions.ExtractByLlm(
        llm_spec=cocoindex.LlmSpec(
            api_type=cocoindex.LlmApiType.OPENAI,
            model="gpt-4o-mini"
        ),
        output_type=ProductInfo,
        instruction="抽取产品信息"
    )
)
```

文本摘要/生成:

```python
file["summary"] = file["content"].transform(
    cocoindex.functions.ExtractByLlm(
        llm_spec=cocoindex.LlmSpec(
            api_type=cocoindex.LlmApiType.OPENAI,
            model="gpt-4o-mini"
        ),
        output_type=str,
        instruction="将本文档摘要为一段话"
    )
)
```
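The structured-output contract can be exercised without calling an LLM: ExtractByLlm yields values of the declared output_type, so downstream steps get typed field access. A stdlib-only illustration (the sample values are invented):

```python
import dataclasses

@dataclasses.dataclass
class ProductInfo:
    name: str
    price: float
    category: str

# Simulate the kind of structured value an extraction might yield
raw = {"name": "Espresso Machine", "price": 199.0, "category": "kitchen"}
info = ProductInfo(**raw)
print(info.name, info.price)  # → Espresso Machine 199.0
```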

Common Sources and Targets

常见数据源与目标系统

Browse all sources: https://cocoindex.io/docs/sources/ Browse all targets: https://cocoindex.io/docs/targets/

浏览所有数据源: https://cocoindex.io/docs/sources/ 浏览所有目标系统: https://cocoindex.io/docs/targets/

Sources

数据源

LocalFile:

```python
cocoindex.sources.LocalFile(
    path="documents",
    included_patterns=["*.md", "*.txt"],
    excluded_patterns=["**/.*", "node_modules"]
)
```

AmazonS3:

```python
cocoindex.sources.AmazonS3(
    bucket="my-bucket",
    prefix="documents/",
    aws_access_key_id=cocoindex.add_transient_auth_entry("..."),
    aws_secret_access_key=cocoindex.add_transient_auth_entry("...")
)
```

Postgres:

```python
cocoindex.sources.Postgres(
    connection=cocoindex.add_auth_entry("conn", cocoindex.sources.PostgresConnection(...)),
    query="SELECT id, content FROM documents"
)
```

LocalFile:

```python
cocoindex.sources.LocalFile(
    path="documents",
    included_patterns=["*.md", "*.txt"],
    excluded_patterns=["**/.*", "node_modules"]
)
```

AmazonS3:

```python
cocoindex.sources.AmazonS3(
    bucket="my-bucket",
    prefix="documents/",
    aws_access_key_id=cocoindex.add_transient_auth_entry("..."),
    aws_secret_access_key=cocoindex.add_transient_auth_entry("...")
)
```

Postgres:

```python
cocoindex.sources.Postgres(
    connection=cocoindex.add_auth_entry("conn", cocoindex.sources.PostgresConnection(...)),
    query="SELECT id, content FROM documents"
)
```
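The included_patterns/excluded_patterns above are shell-style globs. As a rough stdlib analogy (not CocoIndex's actual matching code), Python's fnmatch shows how such patterns select files:

```python
from fnmatch import fnmatch

included = ["*.md", "*.txt"]
files = ["README.md", "notes.txt", "image.png"]

# Keep a file if any included pattern matches its name
selected = [f for f in files if any(fnmatch(f, p) for p in included)]
print(selected)  # → ['README.md', 'notes.txt']
```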

Targets

目标系统

Postgres (with vector support):

```python
collector.export(
    "target_name",
    cocoindex.targets.Postgres(),
    primary_key_fields=["id"],
    vector_indexes=[
        cocoindex.VectorIndexDef(
            field_name="embedding",
            metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY
        )
    ]
)
```

Qdrant:

```python
collector.export(
    "target_name",
    cocoindex.targets.Qdrant(collection_name="my_collection"),
    primary_key_fields=["id"]
)
```

LanceDB:

Requires: cocoindex[lancedb]

```python
collector.export(
    "target_name",
    cocoindex.targets.LanceDB(uri="lancedb_data", table_name="my_table"),
    primary_key_fields=["id"]
)
```

**Neo4j (nodes):**

```python
collector.export(
    "nodes",
    cocoindex.targets.Neo4j(
        connection=neo4j_conn,
        mapping=cocoindex.targets.Nodes(label="Entity")
    ),
    primary_key_fields=["id"]
)
```

Neo4j (relationships):

```python
collector.export(
    "relationships",
    cocoindex.targets.Neo4j(
        connection=neo4j_conn,
        mapping=cocoindex.targets.Relationships(
            rel_type="RELATES_TO",
            source=cocoindex.targets.NodeFromFields(
                label="Entity",
                fields=[cocoindex.targets.TargetFieldMapping(source="source_id", target="id")]
            ),
            target=cocoindex.targets.NodeFromFields(
                label="Entity",
                fields=[cocoindex.targets.TargetFieldMapping(source="target_id", target="id")]
            )
        )
    ),
    primary_key_fields=["id"]
)
```

Postgres(支持向量):

```python
collector.export(
    "target_name",
    cocoindex.targets.Postgres(),
    primary_key_fields=["id"],
    vector_indexes=[
        cocoindex.VectorIndexDef(
            field_name="embedding",
            metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY
        )
    ]
)
```

Qdrant:

```python
collector.export(
    "target_name",
    cocoindex.targets.Qdrant(collection_name="my_collection"),
    primary_key_fields=["id"]
)
```

LanceDB:

需安装:cocoindex[lancedb]

```python
collector.export(
    "target_name",
    cocoindex.targets.LanceDB(uri="lancedb_data", table_name="my_table"),
    primary_key_fields=["id"]
)
```

**Neo4j(节点):**

```python
collector.export(
    "nodes",
    cocoindex.targets.Neo4j(
        connection=neo4j_conn,
        mapping=cocoindex.targets.Nodes(label="Entity")
    ),
    primary_key_fields=["id"]
)
```

Neo4j(关系):

```python
collector.export(
    "relationships",
    cocoindex.targets.Neo4j(
        connection=neo4j_conn,
        mapping=cocoindex.targets.Relationships(
            rel_type="RELATES_TO",
            source=cocoindex.targets.NodeFromFields(
                label="Entity",
                fields=[cocoindex.targets.TargetFieldMapping(source="source_id", target="id")]
            ),
            target=cocoindex.targets.NodeFromFields(
                label="Entity",
                fields=[cocoindex.targets.TargetFieldMapping(source="target_id", target="id")]
            )
        )
    ),
    primary_key_fields=["id"]
)
```
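The COSINE_SIMILARITY metric used in the Postgres vector index ranks rows by the cosine of the angle between embedding vectors. A minimal pure-Python version of the formula:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    # dot(a, b) / (|a| * |b|)
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # → 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # → 0.0
```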

Common Issues and Solutions

常见问题与解决方案

"Flow not found"

"Flow not found"

  • Check APP_TARGET format: `cocoindex show main.py`
  • Use `--app-dir` if not in project root
  • Verify flow name matches decorator
  • 检查APP_TARGET格式:`cocoindex show main.py`
  • 若不在项目根目录,使用`--app-dir`参数
  • 确认flow名称与装饰器中的名称一致
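On the last point: the name the CLI addresses is the one given to the flow decorator, not the Python function name. A sketch of a flow definition (not runnable standalone; requires the cocoindex package and an initialized database):

```python
import cocoindex

# `cocoindex show main.py DemoFlow` refers to "DemoFlow", not `demo_flow`
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    ...
```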

"Database connection failed"

"Database connection failed"

  • Check `.env` has `COCOINDEX_DATABASE_URL`
  • Test connection: `psql $COCOINDEX_DATABASE_URL`
  • Use `--env-file` to specify a custom location
  • 检查`.env`文件中是否存在`COCOINDEX_DATABASE_URL`
  • 测试连接:`psql $COCOINDEX_DATABASE_URL`
  • 使用`--env-file`指定自定义环境文件位置
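Before reaching for psql, the shape of the URL can be sanity-checked with the standard library alone (the connection string here is a made-up example):

```python
from urllib.parse import urlparse

# Hypothetical value; in practice read COCOINDEX_DATABASE_URL from your .env
url = urlparse("postgresql://cocoindex:secret@localhost:5432/cocoindex")

# A valid Postgres URL carries a scheme, host, port, and database name
assert url.scheme == "postgresql"
print(url.hostname, url.port, url.path.lstrip("/"))  # → localhost 5432 cocoindex
```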

"Schema mismatch"

"Schema mismatch"

  • Re-run setup:
    cocoindex setup main.py
  • Drop and recreate:
    cocoindex drop main.py && cocoindex setup main.py
  • 重新运行初始化:
    cocoindex setup main.py
  • 删除后重新创建:
    cocoindex drop main.py && cocoindex setup main.py

"Live update exits immediately"

"Live update exits immediately"

  • Add `refresh_interval` to the source
  • Or use source-specific change capture (Postgres notifications, S3 events)
  • 为数据源添加`refresh_interval`
  • 或使用数据源特定的变更捕获(Postgres通知、S3事件)
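The refresh interval is passed when the source is added to the flow; a sketch assuming a LocalFile source (requires the cocoindex package; not runnable standalone):

```python
import datetime

import cocoindex

@cocoindex.flow_def(name="LiveDocs")
def live_docs(flow_builder, data_scope):
    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.LocalFile(path="documents"),
        # Poll the source for changes every 30 seconds so live update keeps running
        refresh_interval=datetime.timedelta(seconds=30),
    )
```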

"Out of memory"

"Out of memory"

  • Add concurrency limits on sources: `max_inflight_rows`, `max_inflight_bytes`
  • Set global limits in `.env`: `COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS`
  • 为数据源添加并发限制:`max_inflight_rows`、`max_inflight_bytes`
  • 在`.env`中设置全局限制:`COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS`
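The global cap can live in the same .env file as the database URL (values below are illustrative only):

```
COCOINDEX_DATABASE_URL=postgresql://localhost:5432/cocoindex
COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS=256
```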

Reference Documentation

参考文档

This skill includes comprehensive reference documentation for common patterns and operations:
  • references/flow_patterns.md - Complete examples of common flow patterns (text embedding, code embedding, knowledge graphs, live updates, concurrency control, etc.)
  • references/custom_functions.md - Detailed guide for creating custom functions with examples (standalone functions, spec+executor pattern, LLM calls, external APIs, caching)
  • references/cli_operations.md - Complete CLI reference with all commands, options, and workflows
  • references/api_operations.md - Python API reference with examples for programmatic flow control, live updates, queries, and application integration patterns
Load these references when users need:
  • Detailed examples of specific patterns
  • Complete API documentation
  • Advanced usage scenarios
  • Troubleshooting guidance
For comprehensive documentation: https://cocoindex.io/docs/
Search specific topics: https://cocoindex.io/docs/search?q=url%20encoded%20keyword
本技能包含常见模式和操作的完整参考文档:
  • references/flow_patterns.md - 常见flow模式的完整示例(文本嵌入、代码嵌入、知识图谱、实时更新、并发控制等)
  • references/custom_functions.md - 创建自定义函数的详细指南及示例(独立函数、配置+执行器模式、LLM调用、外部API、缓存)
  • references/cli_operations.md - 完整CLI参考,包含所有命令、选项和工作流
  • references/api_operations.md - Python API参考,包含程序化flow控制、实时更新、查询和应用集成模式的示例
当用户需要以下内容时,可加载这些参考文档:
  • 特定模式的详细示例
  • 完整API文档
  • 高级使用场景
  • 故障排除指南