atproto-ingest

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

AT Protocol Data Ingestion

AT Protocol 数据采集

Layer 1 - Data Acquisition for Bluesky/AT Protocol social graph and content.
GF(3) Trit: +1 (Generator) — Produces data streams for downstream processing.
第一层 - 针对Bluesky/AT Protocol社交图谱与内容的数据采集。
GF(3) Trit: +1(生成器) — 为下游处理生成数据流。

Authentication

身份验证

App Password (Recommended for scripts)

应用密码(推荐用于脚本)

bash
undefined
bash
undefined

Create session

创建会话

curl -X POST https://bsky.social/xrpc/com.atproto.server.createSession
-H "Content-Type: application/json"
-d '{"identifier": "handle.bsky.social", "password": "app-password-here"}'
curl -X POST https://bsky.social/xrpc/com.atproto.server.createSession
-H "Content-Type: application/json"
-d '{"identifier": "handle.bsky.social", "password": "app-password-here"}'

Response contains accessJwt and refreshJwt

响应包含accessJwt和refreshJwt

export BSKY_TOKEN="eyJ..."
undefined
export BSKY_TOKEN="eyJ..."
undefined

OAuth (For apps)

OAuth(适用于应用程序)

bash
undefined
bash
undefined

Scopes: atproto, transition:generic

权限范围: atproto, transition:generic

undefined
undefined

Capabilities

功能特性

1. fetch-user-posts

1. 获取用户帖子

Get all posts from a user by handle or DID.
bash
undefined
通过handle或DID获取某用户的所有帖子。
bash
undefined

Resolve handle to DID

将handle解析为DID

Get author feed (paginated)

获取作者动态(分页)


**Pagination loop:**
```python
def fetch_all_posts(actor: str, token: str) -> list:
    posts, cursor = [], None
    while True:
        url = f"https://bsky.social/xrpc/app.bsky.feed.getAuthorFeed?actor={actor}&limit=100"
        if cursor:
            url += f"&cursor={cursor}"
        resp = requests.get(url, headers={"Authorization": f"Bearer {token}"})
        data = resp.json()
        posts.extend(data.get("feed", []))
        cursor = data.get("cursor")
        if not cursor:
            break
        time.sleep(0.5)  # Rate limit respect
    return posts

**分页循环:**
```python
def fetch_all_posts(actor: str, token: str) -> list:
    posts, cursor = [], None
    while True:
        url = f"https://bsky.social/xrpc/app.bsky.feed.getAuthorFeed?actor={actor}&limit=100"
        if cursor:
            url += f"&cursor={cursor}"
        resp = requests.get(url, headers={"Authorization": f"Bearer {token}"})
        data = resp.json()
        posts.extend(data.get("feed", []))
        cursor = data.get("cursor")
        if not cursor:
            break
        time.sleep(0.5)  # 遵守速率限制
    return posts

2. get-engagement-graph

2. 获取互动图谱

Map likes, reposts, replies, and quotes for a post.
bash
undefined
映射帖子的点赞、转发、回复和引用关系。
bash
undefined

Get likes

获取点赞列表

Get reposts

获取转发列表

Get quotes (search for post URI)

获取引用列表(搜索帖子URI)


**Engagement record schema:**
```sql
CREATE TABLE engagement (
    post_uri VARCHAR PRIMARY KEY,
    author_did VARCHAR,
    like_count INTEGER,
    repost_count INTEGER,
    reply_count INTEGER,
    quote_count INTEGER,
    likers VARCHAR[],      -- Array of DIDs
    reposters VARCHAR[],
    indexed_at TIMESTAMP
);

**互动记录表结构:**
```sql
CREATE TABLE engagement (
    post_uri VARCHAR PRIMARY KEY,
    author_did VARCHAR,
    like_count INTEGER,
    repost_count INTEGER,
    reply_count INTEGER,
    quote_count INTEGER,
    likers VARCHAR[],      -- DID数组
    reposters VARCHAR[],
    indexed_at TIMESTAMP
);

3. stream-mentions

3. 提及流监控

Real-time monitoring via Firehose or polling.
bash
undefined
通过Firehose或轮询实现实时监控。
bash
undefined

Firehose (WebSocket) - all network events

Firehose(WebSocket)- 全网络事件

wscat -c wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos
wscat -c wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos

Polling fallback - notifications endpoint

轮询备选方案 - 通知端点


**Firehose filter (Python):**
```python
import websocket
import cbor2

def on_message(ws, message):
    header, body = cbor2.loads(message)
    if header.get("op") == 1:  # Commit
        for op in body.get("ops", []):
            if "app.bsky.feed.post" in op.get("path", ""):
                record = op.get("record", {})
                # Check for mentions in facets
                for facet in record.get("facets", []):
                    for feature in facet.get("features", []):
                        if feature.get("$type") == "app.bsky.richtext.facet#mention":
                            if feature.get("did") == TARGET_DID:
                                yield record

ws = websocket.WebSocketApp("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos",
                             on_message=on_message)

**Firehose 过滤器(Python):**
```python
import websocket
import cbor2

def on_message(ws, message):
    header, body = cbor2.loads(message)
    if header.get("op") == 1:  # 提交
        for op in body.get("ops", []):
            if "app.bsky.feed.post" in op.get("path", ""):
                record = op.get("record", {})
                # 检查富文本中的提及
                for facet in record.get("facets", []):
                    for feature in facet.get("features", []):
                        if feature.get("$type") == "app.bsky.richtext.facet#mention":
                            if feature.get("did") == TARGET_DID:
                                yield record

ws = websocket.WebSocketApp("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos",
                             on_message=on_message)

4. extract-thread-tree

4. 提取对话线程树

Full conversation tree from a post.
bash
curl "https://bsky.social/xrpc/app.bsky.feed.getPostThread?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&depth=10&parentHeight=10" \
  -H "Authorization: Bearer $BSKY_TOKEN"
Thread tree structure:
python
def extract_thread_tree(thread_data: dict) -> dict:
    """Flatten thread to adjacency list for DuckDB."""
    nodes = []
    edges = []
    
    def walk(node, parent_uri=None):
        post = node.get("post", {})
        uri = post.get("uri")
        nodes.append({
            "uri": uri,
            "author_did": post.get("author", {}).get("did"),
            "text": post.get("record", {}).get("text"),
            "created_at": post.get("record", {}).get("createdAt"),
            "depth": node.get("depth", 0)
        })
        if parent_uri:
            edges.append({"parent": parent_uri, "child": uri})
        for reply in node.get("replies", []):
            walk(reply, uri)
    
    walk(thread_data.get("thread", {}))
    return {"nodes": nodes, "edges": edges}
获取帖子的完整对话树。
bash
curl "https://bsky.social/xrpc/app.bsky.feed.getPostThread?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&depth=10&parentHeight=10" \
  -H "Authorization: Bearer $BSKY_TOKEN"
对话线程树结构处理:
python
def extract_thread_tree(thread_data: dict) -> dict:
    """将线程树扁平化为邻接表,用于DuckDB。"""
    nodes = []
    edges = []
    
    def walk(node, parent_uri=None):
        post = node.get("post", {})
        uri = post.get("uri")
        nodes.append({
            "uri": uri,
            "author_did": post.get("author", {}).get("did"),
            "text": post.get("record", {}).get("text"),
            "created_at": post.get("record", {}).get("createdAt"),
            "depth": node.get("depth", 0)
        })
        if parent_uri:
            edges.append({"parent": parent_uri, "child": uri})
        for reply in node.get("replies", []):
            walk(reply, uri)
    
    walk(thread_data.get("thread", {}))
    return {"nodes": nodes, "edges": edges}

5. get-follower-network

5. 获取关注者网络

First and second-order connections.
bash
undefined
获取一阶和二阶关联关系。
bash
undefined

Followers

关注者列表

Following

关注列表


**Second-order expansion:**
```python
def get_follower_network(actor: str, depth: int = 2) -> dict:
    """BFS expansion of follower graph."""
    visited = set()
    edges = []
    queue = [(actor, 0)]
    
    while queue:
        current, d = queue.pop(0)
        if current in visited or d > depth:
            continue
        visited.add(current)
        
        followers = paginate_all("app.bsky.graph.getFollowers", actor=current)
        for f in followers:
            did = f["did"]
            edges.append({"from": did, "to": current, "type": "follows"})
            if d < depth:
                queue.append((did, d + 1))
    
    return {"nodes": list(visited), "edges": edges}

**二阶关联扩展:**
```python
def get_follower_network(actor: str, depth: int = 2) -> dict:
    """通过广度优先搜索扩展关注者图谱。"""
    visited = set()
    edges = []
    queue = [(actor, 0)]
    
    while queue:
        current, d = queue.pop(0)
        if current in visited or d > depth:
            continue
        visited.add(current)
        
        followers = paginate_all("app.bsky.graph.getFollowers", actor=current)
        for f in followers:
            did = f["did"]
            edges.append({"from": did, "to": current, "type": "follows"})
            if d < depth:
                queue.append((did, d + 1))
    
    return {"nodes": list(visited), "edges": edges}

Rate Limiting

速率限制

Endpoint CategoryRate LimitWindow
Read (feed, graph)3000/5minRolling
Write (post, like)1500/hrRolling
FirehoseUnlimited-
Search100/minRolling
Backoff strategy:
python
def rate_limited_request(url, headers, max_retries=5):
    for attempt in range(max_retries):
        resp = requests.get(url, headers=headers)
        if resp.status_code == 429:
            wait = int(resp.headers.get("Retry-After", 2 ** attempt))
            time.sleep(wait)
            continue
        return resp
    raise Exception("Rate limit exceeded")
端点类别速率限制时间窗口
读取(动态、图谱)3000次/5分钟滚动窗口
写入(发帖、点赞)1500次/小时滚动窗口
Firehose无限制-
搜索100次/分钟滚动窗口
退避策略:
python
def rate_limited_request(url, headers, max_retries=5):
    for attempt in range(max_retries):
        resp = requests.get(url, headers=headers)
        if resp.status_code == 429:
            wait = int(resp.headers.get("Retry-After", 2 ** attempt))
            time.sleep(wait)
            continue
        return resp
    raise Exception("速率限制超出上限")

DuckDB Ingestion Format

DuckDB 导入格式

sql
-- Posts table
CREATE TABLE bsky_posts (
    uri VARCHAR PRIMARY KEY,
    cid VARCHAR,
    author_did VARCHAR,
    text TEXT,
    created_at TIMESTAMP,
    reply_parent VARCHAR,
    reply_root VARCHAR,
    embed_type VARCHAR,
    langs VARCHAR[],
    labels VARCHAR[],
    ingested_at TIMESTAMP DEFAULT now()
);

-- Social graph
CREATE TABLE bsky_graph (
    subject_did VARCHAR,
    object_did VARCHAR,
    relation VARCHAR,  -- 'follows', 'blocks', 'mutes'
    created_at TIMESTAMP,
    PRIMARY KEY (subject_did, object_did, relation)
);

-- Engagement events
CREATE TABLE bsky_engagement (
    uri VARCHAR,
    actor_did VARCHAR,
    action VARCHAR,  -- 'like', 'repost', 'quote', 'reply'
    target_uri VARCHAR,
    created_at TIMESTAMP,
    PRIMARY KEY (uri)
);

-- Insert from JSON
COPY bsky_posts FROM 'posts.json' (FORMAT JSON, ARRAY true);
sql
-- 帖子表
CREATE TABLE bsky_posts (
    uri VARCHAR PRIMARY KEY,
    cid VARCHAR,
    author_did VARCHAR,
    text TEXT,
    created_at TIMESTAMP,
    reply_parent VARCHAR,
    reply_root VARCHAR,
    embed_type VARCHAR,
    langs VARCHAR[],
    labels VARCHAR[],
    ingested_at TIMESTAMP DEFAULT now()
);

-- 社交图谱表
CREATE TABLE bsky_graph (
    subject_did VARCHAR,
    object_did VARCHAR,
    relation VARCHAR,  -- 'follows', 'blocks', 'mutes'
    created_at TIMESTAMP,
    PRIMARY KEY (subject_did, object_did, relation)
);

-- 互动事件表
CREATE TABLE bsky_engagement (
    uri VARCHAR,
    actor_did VARCHAR,
    action VARCHAR,  -- 'like', 'repost', 'quote', 'reply'
    target_uri VARCHAR,
    created_at TIMESTAMP,
    PRIMARY KEY (uri)
);

-- 从JSON导入数据
COPY bsky_posts FROM 'posts.json' (FORMAT JSON, ARRAY true);

AT Protocol Lexicon Reference

AT Protocol 词典参考

LexiconPurpose
app.bsky.feed.getAuthorFeed
User's posts
app.bsky.feed.getPostThread
Thread tree
app.bsky.feed.getLikes
Who liked
app.bsky.feed.getRepostedBy
Who reposted
app.bsky.graph.getFollowers
Follower list
app.bsky.graph.getFollows
Following list
app.bsky.actor.getProfile
User profile
com.atproto.sync.subscribeRepos
Firehose
词典用途
app.bsky.feed.getAuthorFeed
获取用户帖子
app.bsky.feed.getPostThread
获取对话线程树
app.bsky.feed.getLikes
获取点赞者列表
app.bsky.feed.getRepostedBy
获取转发者列表
app.bsky.graph.getFollowers
获取关注者列表
app.bsky.graph.getFollows
获取关注列表
app.bsky.actor.getProfile
获取用户资料
com.atproto.sync.subscribeRepos
Firehose 订阅

Integration with Layer 2

与第二层的集成

Output feeds into:
  • GF(3) Trit 0 (Transformer): Sentiment analysis, embedding generation
  • GF(3) Trit -1 (Consumer): Dashboard display, alert triggers
python
undefined
将输出流导入至:
  • GF(3) Trit 0(转换器):情感分析、嵌入生成
  • GF(3) Trit -1(消费者):仪表板展示、警报触发
python
undefined

Pipeline handoff

流水线交接

posts = fetch_all_posts(actor, token) duckdb.execute("INSERT INTO bsky_posts SELECT * FROM read_json(?)", [posts])
posts = fetch_all_posts(actor, token) duckdb.execute("INSERT INTO bsky_posts SELECT * FROM read_json(?)", [posts])

Signal next layer

向下一层发送信号

publish_event("bsky.ingestion.complete", {"actor": actor, "count": len(posts)})
undefined
publish_event("bsky.ingestion.complete", {"actor": actor, "count": len(posts)})
undefined