Loading...
Loading...
This skill should be used when building data processing pipelines with CocoIndex v1, a Python library for incremental data transformation. Use when the task involves processing files/data into databases, creating vector embeddings, building knowledge graphs, ETL workflows, or any data pipeline requiring automatic change detection and incremental updates. CocoIndex v1 is Python-native (supports any Python types), has no DSL, and is currently under pre-release (version 1.0.0a1 or later).
npx skill4agent add cocoindex-io/cocoindex-claude cocoindex-v1

Core model: TargetState = Transform(SourceState)

cocoindex init my-project
cd my-project

Generated project files: main.py, pyproject.toml, .env, README.md

pyproject.toml:

# For vector embeddings
dependencies = ["cocoindex>=1.0.0a1", "sentence-transformers", "asyncpg"]
# For PostgreSQL only
dependencies = ["cocoindex>=1.0.0a1", "asyncpg"]
# For LLM extraction
dependencies = ["cocoindex>=1.0.0a1", "litellm", "instructor", "pydantic>=2.0"]

# Create docker-compose.yml with pgvector image
docker-compose up -d

# Create docker-compose.yml with Qdrant image
docker-compose up -d

pip install -e .
cocoindex update main.py

import cocoindex as coco
@coco.function
def app_main(sourcedir: pathlib.Path, outdir: pathlib.Path) -> None:
# Processing logic here
...
app = coco.App(
coco.AppConfig(name="MyApp"),
app_main,
sourcedir=pathlib.Path("./data"),
outdir=pathlib.Path("./output"),
)
if __name__ == "__main__":
app.update(report_to_stdout=True)

Mounting components: coco_aio.mount_each() vs coco_aio.mount()

# Preferred: mount one component per item (async, keyed iterable)
await coco_aio.mount_each(process_file, files.items(), target_table)
# Equivalent async manual loop
for key, f in files.items():
await coco_aio.mount(coco.component_subpath(key), process_file, f, target_table)
# Sync mount — only for CPU-intensive leaf components (no I/O)
coco.mount(coco.component_subpath(str(f.file_path.path)), process_file, f, target_table)

To get a component's result back, use use_mount():

result = await coco_aio.use_mount(subpath, fn, *args)

target_table = await target_db.mount_table_target(
    table_name="my_table",
    table_schema=await postgres.TableSchema.from_class(MyRecord, primary_key=["id"]),
)

Mounting APIs: coco_aio.mount_each(), coco_aio.mount(), coco.mount(), use_mount()

Memoization with memo=True:

@coco.function(memo=True)
def expensive_operation(data: str) -> Result:
# LLM call, embedding generation, heavy computation
result = expensive_transform(data)
return result

# File target
localfs.declare_file(outdir / "output.txt", content)
# Database row target
table.declare_row(row=MyRecord(id=1, name="example"))
# Vector point target (Qdrant)
collection.declare_point(point=PointStruct(id="1", vector=[...]))

Sharing resources via ContextKey:

EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder")
@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder):
embedder = SentenceTransformerEmbedder("all-MiniLM-L6-v2")
builder.provide(EMBEDDER, embedder)
yield

Resources provided in the @coco.lifespan are consumed with coco.use_context:

@coco.function
def process_item(text: str) -> None:
embedder = coco.use_context(EMBEDDER)
embedding = embedder.embed(text)

from cocoindex.resources.id import generate_id, IdGenerator
# Deterministic: same dep → same ID
chunk_id = generate_id(chunk.content)
# Always distinct: each call → new ID, even with same dep
id_gen = IdGenerator()
for chunk in chunks:
chunk_id = id_gen.next_id(chunk.content)
table.declare_row(row=Row(id=chunk_id, content=chunk.content))

Use generate_id(dep) for deterministic IDs and IdGenerator for always-distinct IDs.

import pathlib
import cocoindex as coco
import cocoindex.asyncio as coco_aio
from cocoindex.connectors import localfs
from cocoindex.resources.file import PatternFilePathMatcher
@coco.function(memo=True)
def process_file(file, outdir):
# CPU-bound transform — sync is fine here at the leaf
content = file.read_text()
transformed = transform_content(content) # Your logic
outname = file.file_path.path.stem + ".out"
localfs.declare_file(outdir / outname, transformed, create_parent_dirs=True)
@coco.function
async def app_main(sourcedir, outdir):
files = localfs.walk_dir(
sourcedir,
recursive=True,
path_matcher=PatternFilePathMatcher(
included_patterns=["*.txt", "*.md"],
excluded_patterns=[".*/**"],
),
)
await coco_aio.mount_each(process_file, files.items(), outdir)
app = coco_aio.App(coco_aio.AppConfig(name="Transform"), app_main, sourcedir=pathlib.Path("./data"), outdir=pathlib.Path("./out"))

import pathlib
from dataclasses import dataclass
from typing import Annotated, AsyncIterator
import cocoindex as coco
import cocoindex.asyncio as coco_aio
from cocoindex.connectors import localfs, postgres
from cocoindex.ops.text import RecursiveSplitter
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
from cocoindex.resources.chunk import Chunk
from cocoindex.resources.file import FileLike, PatternFilePathMatcher
from cocoindex.resources.id import IdGenerator
from numpy.typing import NDArray
PG_DB = coco.ContextKey[postgres.PgDatabase]("pg_db")
_embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
_splitter = RecursiveSplitter()
@dataclass
class DocEmbedding:
id: int # Generated stable ID
filename: str
text: str
embedding: Annotated[NDArray, _embedder] # Auto-infer dimensions
chunk_start: int
chunk_end: int
@coco_aio.lifespan
async def coco_lifespan(builder: coco_aio.EnvironmentBuilder) -> AsyncIterator[None]:
async with await postgres.create_pool(DATABASE_URL) as pool:
builder.provide(PG_DB, postgres.register_db("embedding_db", pool))
yield
@coco.function
async def process_chunk(chunk: Chunk, filename: pathlib.PurePath, id_gen: IdGenerator, table):
table.declare_row(
row=DocEmbedding(
id=await id_gen.next_id(chunk.text),
filename=str(filename),
text=chunk.text,
embedding=await _embedder.embed(chunk.text),
chunk_start=chunk.start.char_offset,
chunk_end=chunk.end.char_offset,
),
)
@coco.function(memo=True)
async def process_file(file: FileLike, table):
text = file.read_text()
chunks = _splitter.split(text, chunk_size=1000, chunk_overlap=200)
id_gen = IdGenerator()
await coco_aio.map(process_chunk, chunks, file.file_path.path, id_gen, table)
@coco.function
async def app_main(sourcedir: pathlib.Path):
target_db = coco.use_context(PG_DB)
target_table = await target_db.mount_table_target(
table_name="embeddings",
table_schema=await postgres.TableSchema.from_class(
DocEmbedding, primary_key=["id"],
),
)
files = localfs.walk_dir(sourcedir, recursive=True)
await coco_aio.mount_each(process_file, files.items(), target_table)
app = coco_aio.App(coco_aio.AppConfig(name="Embedding"), app_main, sourcedir=pathlib.Path("./data"))

import instructor
from pydantic import BaseModel
from litellm import acompletion
_instructor_client = instructor.from_litellm(acompletion, mode=instructor.Mode.JSON)
class ExtractionResult(BaseModel):
title: str
topics: list[str]
@coco.function(memo=True) # Memo avoids re-calling LLM
async def extract_and_store(content, message_id, table):
result = await _instructor_client.chat.completions.create(
model="gpt-4",
response_model=ExtractionResult,
messages=[{"role": "user", "content": f"Extract topics: {content}"}],
)
table.declare_row(row=Message(id=message_id, title=result.title, content=content))

from cocoindex.ops.text import RecursiveSplitter, detect_code_language
splitter = RecursiveSplitter()
language = detect_code_language(filename="example.py")
chunks = splitter.split(
text,
chunk_size=1000,
min_chunk_size=300,
chunk_overlap=200,
language=language, # Syntax-aware splitting
)

from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
# Sync
embedding = embedder.embed(text)
# Async
embedding = await embedder.embed_async(text)

cocoindex update main.py           # Run app in main.py
cocoindex update main.py:my_app    # Run specific app
cocoindex update my_module:my_app  # Run from module
cocoindex drop main.py [-f]        # Drop and reset
cocoindex ls main.py               # List apps in file
cocoindex ls --db ./cocoindex.db   # List apps in DB
cocoindex show main.py             # Show component tree

# ✅ Good: Stable identifiers
coco.component_subpath("file", str(file.file_path.path))
coco.component_subpath("record", record.id)
# ❌ Bad: Unstable identifiers
coco.component_subpath("file", file) # Object reference
coco.component_subpath("idx", idx)  # Index changes

# ✅ Good: Memoize expensive ops
@coco.function(memo=True)
async def process_chunk(chunk, table):
embedding = await embedder.embed_async(chunk.text) # Expensive!
table.declare_row(...)
# ❌ Bad: No memoization
@coco.function # Re-embeds every run
async def process_chunk(chunk, table):
embedding = await embedder.embed_async(chunk.text)

# ✅ Good: Load model once
@coco.lifespan
def coco_lifespan(builder):
model = load_expensive_model()
builder.provide(MODEL_KEY, model)
yield
# ❌ Bad: Load model every time
@coco.function
def process(data):
model = load_expensive_model()  # Loaded repeatedly!

# ✅ Good: Type-safe
from dataclasses import dataclass
from typing import Annotated
from numpy.typing import NDArray
@dataclass
class Record:
id: int
name: str
vector: Annotated[NDArray, embedder] # Auto-infer dimensions
# ❌ Bad: No type safety
record = {"id": 1, "name": "example", "vector": [...]}

# Target setup — subpath is automatic
table = await target_db.mount_table_target(
table_name="my_table",
table_schema=await postgres.TableSchema.from_class(MyRecord, primary_key=["id"]),
)
# Iterate with mount_each — keys become component subpaths
await coco_aio.mount_each(process_item, items.items(), table)

# ✅ Default: async mount for I/O-bound or general-purpose components
@coco.function
async def app_main(sourcedir):
await coco_aio.mount_each(process_file, files.items(), table) # list of items
await coco_aio.mount(coco.component_subpath("setup"), setup_fn) # single component
# ✅ Sync mount only when the leaf function is CPU-intensive (no I/O)
@coco.function(memo=True)
def cpu_heavy_leaf(data: str) -> Result:
return expensive_computation(data) # Pure CPU work, no async needed
# ❌ Don't use sync mount inside async app_main for general components
@coco.function
async def app_main(sourcedir):
for key, f in files.items():
coco.mount(coco.component_subpath(key), process_file, f)  # Use await coco_aio.mount() instead

| Before | After |
|---|---|
| |
| |
| |
| |
[tool.uv]
prerelease = "explicit"

# Connect to your database and enable the extension
psql "postgres://localhost/db" -c "CREATE EXTENSION IF NOT EXISTS vector;"

# Use stable IDs, not object references
coco.component_subpath(file.stable_key) # ✅
coco.component_subpath(file)  # ❌

Add memo=True to avoid re-running expensive work:

@coco.function(memo=True)  # Add this
async def process_item(item):
    ...

Requires cocoindex >=1.0.0a1.