atproto-ingest
AT Protocol Data Ingestion
Layer 1 - Data Acquisition for Bluesky/AT Protocol social graph and content.
GF(3) Trit: +1 (Generator) — Produces data streams for downstream processing.
Authentication
App Password (Recommended for scripts)
```bash
# Create session
curl -X POST https://bsky.social/xrpc/com.atproto.server.createSession \
  -H "Content-Type: application/json" \
  -d '{"identifier": "handle.bsky.social", "password": "app-password-here"}'

# Response contains accessJwt and refreshJwt
export BSKY_TOKEN="eyJ..."
```
OAuth (For apps)
```bash
# Authorization endpoint: https://bsky.social/oauth/authorize
# Token endpoint: https://bsky.social/oauth/token
# Scopes: atproto, transition:generic
```
Capabilities
1. fetch-user-posts
Get all posts from a user by handle or DID.
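Handles, DIDs, and cursors all end up in the query string, so it is safer to percent-encode them than to splice them in by hand. The following is a minimal sketch of that step; `author_feed_url` and `BASE_URL` are hypothetical names introduced here for illustration, not part of any AT Protocol SDK.

```python
from urllib.parse import urlencode

BASE_URL = "https://bsky.social/xrpc"  # host used in the curl examples


def author_feed_url(actor, limit=100, cursor=None):
    """Build a getAuthorFeed URL with every parameter percent-encoded."""
    params = {"actor": actor, "limit": limit}
    if cursor:
        # Cursors are opaque strings and may contain reserved characters
        params["cursor"] = cursor
    return f"{BASE_URL}/app.bsky.feed.getAuthorFeed?{urlencode(params)}"


print(author_feed_url("did:plc:xxx", cursor="abc+def/=="))
```

The `actor` argument can be either a handle or a DID; only the encoding changes, not the call.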
```bash
# Resolve handle to DID
curl "https://bsky.social/xrpc/com.atproto.identity.resolveHandle?handle=handle.bsky.social"

# Get author feed (paginated)
curl "https://bsky.social/xrpc/app.bsky.feed.getAuthorFeed?actor=did:plc:xxx&limit=100&cursor=$CURSOR" \
  -H "Authorization: Bearer $BSKY_TOKEN"
```
**Pagination loop:**
```python
import time

import requests


def fetch_all_posts(actor: str, token: str) -> list:
    """Follow the cursor until the author feed is exhausted."""
    posts, cursor = [], None
    while True:
        url = f"https://bsky.social/xrpc/app.bsky.feed.getAuthorFeed?actor={actor}&limit=100"
        if cursor:
            url += f"&cursor={cursor}"
        resp = requests.get(url, headers={"Authorization": f"Bearer {token}"})
        resp.raise_for_status()  # fail loudly on auth or rate-limit errors
        data = resp.json()
        posts.extend(data.get("feed", []))
        cursor = data.get("cursor")
        if not cursor:
            break
        time.sleep(0.5)  # Rate limit respect
    return posts
```
2. get-engagement-graph
Map likes, reposts, replies, and quotes for a post.
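Once the likes and reposts for a post have been fetched, they can be folded into a single row shaped like the `engagement` table in this section. The sketch below assumes `post` is a post view carrying `likeCount`, `repostCount`, `replyCount`, and `quoteCount`, that each `getLikes` entry holds the liker under `actor`, and that each `getRepostedBy` entry is a profile with a `did`. The function name `build_engagement_row` is hypothetical.

```python
from datetime import datetime, timezone


def build_engagement_row(post: dict, likes: list, reposted_by: list) -> dict:
    """Combine a post view with its likes and reposts into one table row."""
    return {
        "post_uri": post["uri"],
        "author_did": post.get("author", {}).get("did"),
        # Prefer the server-side counts; fall back to what was fetched
        "like_count": post.get("likeCount", len(likes)),
        "repost_count": post.get("repostCount", len(reposted_by)),
        "reply_count": post.get("replyCount", 0),
        "quote_count": post.get("quoteCount", 0),
        "likers": [like["actor"]["did"] for like in likes],
        "reposters": [actor["did"] for actor in reposted_by],
        "indexed_at": datetime.now(timezone.utc).isoformat(),
    }
```

The counts and the DID lists can disagree when pagination was cut short, which is why both are stored.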
```bash
# Get likes
curl "https://bsky.social/xrpc/app.bsky.feed.getLikes?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&limit=100" \
  -H "Authorization: Bearer $BSKY_TOKEN"

# Get reposts
curl "https://bsky.social/xrpc/app.bsky.feed.getRepostedBy?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&limit=100" \
  -H "Authorization: Bearer $BSKY_TOKEN"

# Get quotes (search for post URI)
curl "https://bsky.social/xrpc/app.bsky.feed.getQuotes?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&limit=100" \
  -H "Authorization: Bearer $BSKY_TOKEN"
```
**Engagement record schema:**
```sql
CREATE TABLE engagement (
    post_uri VARCHAR PRIMARY KEY,
    author_did VARCHAR,
    like_count INTEGER,
    repost_count INTEGER,
    reply_count INTEGER,
    quote_count INTEGER,
    likers VARCHAR[],  -- Array of DIDs
    reposters VARCHAR[],
    indexed_at TIMESTAMP
);
```
3. stream-mentions
Real-time monitoring via Firehose or polling.
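For the polling route, the notifications response can be narrowed to mentions on the client side. This is a small sketch under the assumption that the response has a `notifications` array whose entries carry `reason`, `isRead`, `author`, `record`, and `indexedAt`. The helper name `extract_mentions` is hypothetical.

```python
def extract_mentions(payload: dict, unread_only: bool = False) -> list:
    """Return the notifications whose reason is 'mention'."""
    mentions = []
    for note in payload.get("notifications", []):
        if note.get("reason") != "mention":
            continue
        if unread_only and note.get("isRead"):
            continue
        mentions.append({
            "uri": note.get("uri"),
            "author_did": note.get("author", {}).get("did"),
            "text": note.get("record", {}).get("text"),
            "indexed_at": note.get("indexedAt"),
        })
    return mentions
```

Polling only sees mentions of the authenticated account, whereas the Firehose covers every repository on the network.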
```bash
# Firehose (WebSocket) - all network events
wscat -c wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos

# Polling fallback - notifications endpoint
curl "https://bsky.social/xrpc/app.bsky.notification.listNotifications?limit=50" \
  -H "Authorization: Bearer $BSKY_TOKEN"
```
**Firehose filter (Python):**
```python
import io

import cbor2
import websocket

TARGET_DID = "did:plc:xxx"  # DID to watch for mentions


def on_message(ws, message):
    # Each frame holds two concatenated CBOR objects: header, then body
    decoder = cbor2.CBORDecoder(io.BytesIO(message))
    header, body = decoder.decode(), decoder.decode()
    if header.get("op") != 1 or header.get("t") != "#commit":  # Commit
        return
    for op in body.get("ops", []):
        if "app.bsky.feed.post" in op.get("path", ""):
            # Commit ops list only the path and CID; the record bytes sit in
            # the CAR data under body["blocks"] and need decoding from there.
            record = op.get("record", {})
            # Check for mentions in facets
            for facet in record.get("facets", []):
                for feature in facet.get("features", []):
                    if (feature.get("$type") == "app.bsky.richtext.facet#mention"
                            and feature.get("did") == TARGET_DID):
                        print(record)  # a callback cannot yield to a caller


ws = websocket.WebSocketApp(
    "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos",
    on_message=on_message,
)
ws.run_forever()
```
4. extract-thread-tree
Full conversation tree from a post.
```bash
curl "https://bsky.social/xrpc/app.bsky.feed.getPostThread?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&depth=10&parentHeight=10" \
  -H "Authorization: Bearer $BSKY_TOKEN"
```
**Thread tree structure:**
```python
def extract_thread_tree(thread_data: dict) -> dict:
    """Flatten thread to adjacency list for DuckDB."""
    nodes = []
    edges = []

    def walk(node, parent_uri=None, depth=0):
        post = node.get("post", {})
        uri = post.get("uri")
        if uri is None:  # blocked or missing posts carry no post view
            return
        nodes.append({
            "uri": uri,
            "author_did": post.get("author", {}).get("did"),
            "text": post.get("record", {}).get("text"),
            "created_at": post.get("record", {}).get("createdAt"),
            "depth": depth,  # tracked during the walk, relative to the root
        })
        if parent_uri:
            edges.append({"parent": parent_uri, "child": uri})
        for reply in node.get("replies", []):
            walk(reply, uri, depth + 1)

    walk(thread_data.get("thread", {}))
    return {"nodes": nodes, "edges": edges}
```
5. get-follower-network
First and second-order connections.
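The second-order expansion in this section calls a `paginate_all` helper that the document never defines. One way it might look is sketched below. The signature, the `ITEMS_KEY` mapping, the injectable `fetch_page` callable, and reading the token from the `BSKY_TOKEN` environment variable are all assumptions made for illustration.

```python
import json
import os
import urllib.parse
import urllib.request

BASE_URL = "https://bsky.social/xrpc"  # host used in the curl examples

# Hypothetical mapping from method name to the response key holding the items
ITEMS_KEY = {
    "app.bsky.graph.getFollowers": "followers",
    "app.bsky.graph.getFollows": "follows",
}


def http_fetch_page(method: str, params: dict) -> dict:
    """Fetch one page over HTTP, authenticating with BSKY_TOKEN."""
    url = f"{BASE_URL}/{method}?{urllib.parse.urlencode(params)}"
    req = urllib.request.Request(
        url, headers={"Authorization": f"Bearer {os.environ['BSKY_TOKEN']}"}
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


def paginate_all(method: str, fetch_page=http_fetch_page, **params) -> list:
    """Collect every item from a cursor-paginated endpoint."""
    items, cursor = [], None
    while True:
        page_params = {**params, "limit": 100}
        if cursor:
            page_params["cursor"] = cursor
        page = fetch_page(method, page_params)
        items.extend(page.get(ITEMS_KEY[method], []))
        cursor = page.get("cursor")
        if not cursor:
            return items
```

Passing `fetch_page` as an argument keeps the loop testable without network access; a rate-limited fetcher can be swapped in the same way.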
```bash
# Followers
curl "https://bsky.social/xrpc/app.bsky.graph.getFollowers?actor=did:plc:xxx&limit=100&cursor=$CURSOR" \
  -H "Authorization: Bearer $BSKY_TOKEN"

# Following
curl "https://bsky.social/xrpc/app.bsky.graph.getFollows?actor=did:plc:xxx&limit=100&cursor=$CURSOR" \
  -H "Authorization: Bearer $BSKY_TOKEN"
```
**Second-order expansion:**
```python
from collections import deque


def get_follower_network(actor: str, depth: int = 2) -> dict:
    """BFS expansion of follower graph."""
    # paginate_all is not defined in this document; it is expected to follow
    # cursors and return every follower record for the given actor.
    visited = set()   # actors whose followers were already fetched
    nodes = {actor}   # every DID seen, including unexpanded leaves
    edges = []
    queue = deque([(actor, 0)])
    while queue:
        current, d = queue.popleft()
        # Expand only actors above the limit, so depth=2 stops at
        # followers of followers
        if current in visited or d >= depth:
            continue
        visited.add(current)
        followers = paginate_all("app.bsky.graph.getFollowers", actor=current)
        for f in followers:
            did = f["did"]
            nodes.add(did)
            edges.append({"from": did, "to": current, "type": "follows"})
            queue.append((did, d + 1))
    return {"nodes": sorted(nodes), "edges": edges}
```
Rate Limiting
| Endpoint Category | Rate Limit | Window |
|---|---|---|
| Read (feed, graph) | 3000/5min | Rolling |
| Write (post, like) | 1500/hr | Rolling |
| Firehose | Unlimited | - |
| Search | 100/min | Rolling |
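A budget like 3000 reads per 5 minutes works out to one request every 0.1 seconds if calls are spaced evenly. The sketch below shows one way to pace requests on the client side. The `Pacer` class is a hypothetical helper, and even spacing is a stricter policy than the rolling window in the table, chosen here because it is simple to reason about.

```python
import time


class Pacer:
    """Space calls evenly so a fixed request budget is never exceeded."""

    def __init__(self, max_requests, window_seconds,
                 clock=time.monotonic, sleep=time.sleep):
        self.interval = window_seconds / max_requests
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = 0.0

    def wait(self):
        """Block until the next request slot opens, then reserve it."""
        now = self._clock()
        if now < self._next_allowed:
            self._sleep(self._next_allowed - now)
            now = self._next_allowed
        self._next_allowed = now + self.interval


# Read endpoints: 3000 requests per 300 seconds
read_pacer = Pacer(max_requests=3000, window_seconds=300)
```

Calling `read_pacer.wait()` before each read keeps the client under the limit without waiting for a 429 response.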
**Backoff strategy:**
```python
import time

import requests


def rate_limited_request(url, headers, max_retries=5):
    """GET with retries on HTTP 429, honoring Retry-After when present."""
    for attempt in range(max_retries):
        resp = requests.get(url, headers=headers)
        if resp.status_code == 429:
            # Fall back to exponential backoff when the header is absent
            wait = int(resp.headers.get("Retry-After", 2 ** attempt))
            time.sleep(wait)
            continue
        return resp
    raise RuntimeError("Rate limit exceeded")
```
DuckDB Ingestion Format
```sql
-- Posts table
CREATE TABLE bsky_posts (
    uri VARCHAR PRIMARY KEY,
    cid VARCHAR,
    author_did VARCHAR,
    text TEXT,
    created_at TIMESTAMP,
    reply_parent VARCHAR,
    reply_root VARCHAR,
    embed_type VARCHAR,
    langs VARCHAR[],
    labels VARCHAR[],
    ingested_at TIMESTAMP DEFAULT now()
);

-- Social graph
CREATE TABLE bsky_graph (
    subject_did VARCHAR,
    object_did VARCHAR,
    relation VARCHAR,  -- 'follows', 'blocks', 'mutes'
    created_at TIMESTAMP,
    PRIMARY KEY (subject_did, object_did, relation)
);

-- Engagement events
CREATE TABLE bsky_engagement (
    uri VARCHAR,
    actor_did VARCHAR,
    action VARCHAR,  -- 'like', 'repost', 'quote', 'reply'
    target_uri VARCHAR,
    created_at TIMESTAMP,
    PRIMARY KEY (uri)
);

-- Insert from JSON
COPY bsky_posts FROM 'posts.json' (FORMAT JSON, ARRAY true);
```
AT Protocol Lexicon Reference
| Lexicon | Purpose |
|---|---|
| `app.bsky.feed.getAuthorFeed` | User's posts |
| `app.bsky.feed.getPostThread` | Thread tree |
| `app.bsky.feed.getLikes` | Who liked |
| `app.bsky.feed.getRepostedBy` | Who reposted |
| `app.bsky.graph.getFollowers` | Follower list |
| `app.bsky.graph.getFollows` | Following list |
| `app.bsky.actor.getProfile` | User profile |
| `com.atproto.sync.subscribeRepos` | Firehose |
Integration with Layer 2
Output feeds into:
- GF(3) Trit 0 (Transformer): Sentiment analysis, embedding generation
- GF(3) Trit -1 (Consumer): Dashboard display, alert triggers
```python
import json

import duckdb

# Pipeline handoff
posts = fetch_all_posts(actor, token)
with open("posts.json", "w") as fh:
    json.dump(posts, fh)  # read_json reads from a file path, not a Python list
# Rows in posts.json must already match the bsky_posts columns
duckdb.execute("INSERT INTO bsky_posts SELECT * FROM read_json('posts.json')")

# Signal next layer
publish_event("bsky.ingestion.complete", {"actor": actor, "count": len(posts)})
```
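Feed items come back nested, while the `bsky_posts` table is flat, so a mapping step is needed somewhere before the insert. The following sketch shows one possible mapping. It assumes each item wraps a post view under `post`, with the record's `reply`, `langs`, and `createdAt` fields and the view's `embed` and `labels` fields where present. The function name `feed_item_to_row` is hypothetical.

```python
def feed_item_to_row(item: dict) -> dict:
    """Flatten one author-feed item into the bsky_posts column layout."""
    post = item.get("post", {})
    record = post.get("record", {})
    reply = record.get("reply") or {}   # absent on top-level posts
    embed = post.get("embed") or {}     # absent on text-only posts
    return {
        "uri": post.get("uri"),
        "cid": post.get("cid"),
        "author_did": post.get("author", {}).get("did"),
        "text": record.get("text"),
        "created_at": record.get("createdAt"),
        "reply_parent": reply.get("parent", {}).get("uri"),
        "reply_root": reply.get("root", {}).get("uri"),
        "embed_type": embed.get("$type"),
        "langs": record.get("langs", []),
        "labels": [label.get("val") for label in post.get("labels", [])],
    }
```

Applying it as `rows = [feed_item_to_row(item) for item in posts]` before writing the JSON file keeps the column names aligned with the table; `ingested_at` is left to its default.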