reddit-marketing-agent

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Reddit Marketing Agent

Reddit营销Agent

Skill by ara.so — Marketing Skills collection.
This skill enables AI coding agents to help developers use the Reddit Marketing Agent, a system for researching relevant Reddit threads, generating useful responses, and turning repeatable Reddit workflows into scalable growth operations. The agent identifies high-intent conversations across subreddits, drafts context-aware responses, and supports repeatable workflows for research and optimization.
ara.so提供的Skill——营销技能合集。
该Skill可让AI编码Agent帮助开发者使用Reddit营销Agent,这是一个用于研究相关Reddit帖子、生成实用回复,并将可重复的Reddit工作流转化为可扩展的增长运营的系统。该Agent能在各子版块中识别高意向对话、草拟贴合上下文的回复,并为研究和优化提供可重复的工作流支持。

What It Does

功能介绍

The Reddit Marketing Agent helps you:
  • Identify high-intent conversations and opportunities across relevant subreddits
  • Draft context-aware, non-spammy responses and content angles
  • Support repeatable workflows for research, posting support, and iteration
  • Keep strategy, logs, and source context organized for ongoing optimization
  • Scale Reddit community engagement without appearing spammy
Built by the AI Automation Mastery community for systematic Reddit growth.
Reddit营销Agent可帮助您:
  • 在相关子版块中识别高意向对话与潜在机会
  • 草拟贴合上下文、非垃圾营销性质的回复及内容方向
  • 为研究、发帖支持和迭代提供可重复的工作流
  • 整理策略、日志和源上下文,以便持续优化
  • 在不显得像垃圾营销的前提下,扩大Reddit社区参与规模
由AI自动化精通社区打造,用于系统化的Reddit增长运营。

Installation

安装步骤

Clone the repository:
bash
git clone https://github.com/lucaswalter/reddit-marketing-agent.git
cd reddit-marketing-agent
Install dependencies (typically Python-based):
bash
pip install -r requirements.txt
克隆仓库:
bash
git clone https://github.com/lucaswalter/reddit-marketing-agent.git
cd reddit-marketing-agent
安装依赖(基于Python):
bash
pip install -r requirements.txt

or

pip install praw openai python-dotenv

Set up environment variables in `.env`:

```bash
REDDIT_CLIENT_ID=your_reddit_client_id
REDDIT_CLIENT_SECRET=your_reddit_client_secret
REDDIT_USER_AGENT=your_user_agent
OPENAI_API_KEY=your_openai_api_key
pip install praw openai python-dotenv

在`.env`文件中设置环境变量:

```bash
REDDIT_CLIENT_ID=your_reddit_client_id
REDDIT_CLIENT_SECRET=your_reddit_client_secret
REDDIT_USER_AGENT=your_user_agent
OPENAI_API_KEY=your_openai_api_key

Configuration

配置说明

Create a
config.yaml
or
config.json
file to define your target subreddits and search parameters:
yaml
subreddits:
  - "Entrepreneur"
  - "startups"
  - "SaaS"
  - "marketing"

keywords:
  - "need automation"
  - "looking for tools"
  - "recommend software"
  - "help with marketing"

filters:
  min_upvotes: 5
  max_age_hours: 48
  exclude_patterns:
    - "spam"
    - "promotion"

response_settings:
  tone: "helpful"
  max_length: 300
  include_value_first: true
创建
config.yaml
config.json
文件,定义目标子版块和搜索参数:
yaml
subreddits:
  - "Entrepreneur"
  - "startups"
  - "SaaS"
  - "marketing"

keywords:
  - "need automation"
  - "looking for tools"
  - "recommend software"
  - "help with marketing"

filters:
  min_upvotes: 5
  max_age_hours: 48
  exclude_patterns:
    - "spam"
    - "promotion"

response_settings:
  tone: "helpful"
  max_length: 300
  include_value_first: true

Core Usage Patterns

核心使用场景

1. Search for Relevant Threads

1. 搜索相关帖子

python
import praw
from dotenv import load_dotenv
import os

load_dotenv()
python
import praw
from dotenv import load_dotenv
import os

load_dotenv()

Initialize Reddit API client

初始化Reddit API客户端

reddit = praw.Reddit( client_id=os.getenv('REDDIT_CLIENT_ID'), client_secret=os.getenv('REDDIT_CLIENT_SECRET'), user_agent=os.getenv('REDDIT_USER_AGENT') )
def search_relevant_threads(subreddit_name, keywords, limit=25): """Search for threads matching keywords in a subreddit.""" subreddit = reddit.subreddit(subreddit_name) relevant_threads = []
for keyword in keywords:
    for submission in subreddit.search(keyword, time_filter='week', limit=limit):
        if submission.score >= 5:  # Filter by upvotes
            relevant_threads.append({
                'title': submission.title,
                'url': submission.url,
                'score': submission.score,
                'num_comments': submission.num_comments,
                'created_utc': submission.created_utc,
                'id': submission.id,
                'selftext': submission.selftext
            })

return relevant_threads
reddit = praw.Reddit( client_id=os.getenv('REDDIT_CLIENT_ID'), client_secret=os.getenv('REDDIT_CLIENT_SECRET'), user_agent=os.getenv('REDDIT_USER_AGENT') )
def search_relevant_threads(subreddit_name, keywords, limit=25): """在子版块中搜索匹配关键词的帖子。""" subreddit = reddit.subreddit(subreddit_name) relevant_threads = []
for keyword in keywords:
    for submission in subreddit.search(keyword, time_filter='week', limit=limit):
        if submission.score >= 5:  # 按点赞数过滤
            relevant_threads.append({
                'title': submission.title,
                'url': submission.url,
                'score': submission.score,
                'num_comments': submission.num_comments,
                'created_utc': submission.created_utc,
                'id': submission.id,
                'selftext': submission.selftext
            })

return relevant_threads

Example usage

使用示例

threads = search_relevant_threads('Entrepreneur', ['automation tools', 'marketing help'], limit=10) for thread in threads: print(f"{thread['title']} - Score: {thread['score']}")
undefined
threads = search_relevant_threads('Entrepreneur', ['automation tools', 'marketing help'], limit=10) for thread in threads: print(f"{thread['title']} - 点赞数: {thread['score']}")
undefined

2. Generate Context-Aware Responses

2. 生成上下文相关回复

python
import openai
import os

openai.api_key = os.getenv('OPENAI_API_KEY')

def generate_response(thread_title, thread_content, tone='helpful'):
    """Generate a contextual, non-spammy Reddit response."""
    
    prompt = f"""You are a helpful community member on Reddit. Generate a genuine, value-first response to this thread.

Thread Title: {thread_title}
Thread Content: {thread_content}

Guidelines:
- Be genuinely helpful and specific
- Provide value before any mentions
- Keep tone {tone} and conversational
- Avoid obvious promotion
- Max 250 words

Response:"""

    response = openai.ChatCompletion.create(
        model='gpt-4',
        messages=[
            {'role': 'system', 'content': 'You are an experienced marketer who engages authentically on Reddit.'},
            {'role': 'user', 'content': prompt}
        ],
        temperature=0.7,
        max_tokens=400
    )
    
    return response.choices[0].message.content
python
import openai
import os

openai.api_key = os.getenv('OPENAI_API_KEY')

def generate_response(thread_title, thread_content, tone='helpful'):
    """生成贴合上下文、非垃圾营销性质的Reddit回复。"""
    
    prompt = f"""您是Reddit上乐于助人的社区成员。请针对此帖子生成真诚、优先提供价值的回复。

帖子标题: {thread_title}
帖子内容: {thread_content}

准则:
- 真诚且具体地提供帮助
- 先提供价值再提及其他内容
- 保持{tone}的语气和对话感
- 避免明显的推广
- 最多250词

回复:"""

    response = openai.ChatCompletion.create(
        model='gpt-4',
        messages=[
            {'role': 'system', 'content': '您是一位在Reddit上真诚互动的资深营销人员。'},
            {'role': 'user', 'content': prompt}
        ],
        temperature=0.7,
        max_tokens=400
    )
    
    return response.choices[0].message.content

Example usage

使用示例

response = generate_response( "Need help automating my social media", "I'm spending 3 hours a day on social media for my startup. Any tools or strategies?" ) print(response)
undefined
response = generate_response( "Need help automating my social media", "I'm spending 3 hours a day on social media for my startup. Any tools or strategies?" ) print(response)
undefined

3. Complete Workflow Script

3. 完整工作流脚本

python
import json
from datetime import datetime

def reddit_marketing_workflow(config_path='config.json'):
    """Complete workflow: search, analyze, generate responses, log results."""
    
    # Load configuration
    with open(config_path, 'r') as f:
        config = json.load(f)
    
    results = []
    
    for subreddit_name in config['subreddits']:
        print(f"\n🔍 Searching r/{subreddit_name}...")
        
        # Search threads
        threads = search_relevant_threads(
            subreddit_name, 
            config['keywords'],
            limit=config.get('limit', 25)
        )
        
        # Filter by criteria
        filtered_threads = [
            t for t in threads 
            if t['score'] >= config['filters']['min_upvotes']
        ]
        
        print(f"✅ Found {len(filtered_threads)} relevant threads")
        
        # Generate responses for top threads
        for thread in filtered_threads[:5]:  # Top 5
            response = generate_response(
                thread['title'],
                thread['selftext'],
                tone=config['response_settings']['tone']
            )
            
            results.append({
                'subreddit': subreddit_name,
                'thread_id': thread['id'],
                'thread_title': thread['title'],
                'thread_url': thread['url'],
                'score': thread['score'],
                'generated_response': response,
                'timestamp': datetime.now().isoformat()
            })
    
    # Save results
    output_file = f"reddit_opportunities_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(output_file, 'w') as f:
        json.dump(results, f, indent=2)
    
    print(f"\n💾 Results saved to {output_file}")
    return results
python
import json
from datetime import datetime

def reddit_marketing_workflow(config_path='config.json'):
    """完整工作流:搜索、分析、生成回复、记录结果。"""
    
    # 加载配置
    with open(config_path, 'r') as f:
        config = json.load(f)
    
    results = []
    
    for subreddit_name in config['subreddits']:
        print(f"\n🔍 正在搜索r/{subreddit_name}...")
        
        # 搜索帖子
        threads = search_relevant_threads(
            subreddit_name, 
            config['keywords'],
            limit=config.get('limit', 25)
        )
        
        # 按条件过滤
        filtered_threads = [
            t for t in threads 
            if t['score'] >= config['filters']['min_upvotes']
        ]
        
        print(f"✅ 找到{len(filtered_threads)}条相关帖子")
        
        # 为热门帖子生成回复
        for thread in filtered_threads[:5]:  # 前5条
            response = generate_response(
                thread['title'],
                thread['selftext'],
                tone=config['response_settings']['tone']
            )
            
            results.append({
                'subreddit': subreddit_name,
                'thread_id': thread['id'],
                'thread_title': thread['title'],
                'thread_url': thread['url'],
                'score': thread['score'],
                'generated_response': response,
                'timestamp': datetime.now().isoformat()
            })
    
    # 保存结果
    output_file = f"reddit_opportunities_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(output_file, 'w') as f:
        json.dump(results, f, indent=2)
    
    print(f"\n💾 结果已保存至{output_file}")
    return results

Run workflow

运行工作流

results = reddit_marketing_workflow()
undefined
results = reddit_marketing_workflow()
undefined

4. Sentiment and Opportunity Scoring

4. 情感与机会评分

python
def score_opportunity(thread):
    """Score a thread's marketing opportunity potential."""
    score = 0
    
    # Engagement metrics
    if thread['score'] > 20:
        score += 2
    if thread['num_comments'] > 10:
        score += 2
    
    # Intent signals in title/content
    high_intent_phrases = ['recommend', 'looking for', 'need help', 'suggestions', 'alternatives']
    text = (thread['title'] + ' ' + thread['selftext']).lower()
    
    for phrase in high_intent_phrases:
        if phrase in text:
            score += 3
    
    # Recency (threads less than 24 hours old)
    age_hours = (datetime.now().timestamp() - thread['created_utc']) / 3600
    if age_hours < 24:
        score += 2
    
    return score
python
def score_opportunity(thread):
    """为帖子的营销机会潜力打分。"""
    score = 0
    
    # 参与度指标
    if thread['score'] > 20:
        score += 2
    if thread['num_comments'] > 10:
        score += 2
    
    # 标题/内容中的意向信号
    high_intent_phrases = ['recommend', 'looking for', 'need help', 'suggestions', 'alternatives']
    text = (thread['title'] + ' ' + thread['selftext']).lower()
    
    for phrase in high_intent_phrases:
        if phrase in text:
            score += 3
    
    # 时效性(发布不到24小时的帖子)
    age_hours = (datetime.now().timestamp() - thread['created_utc']) / 3600
    if age_hours < 24:
        score += 2
    
    return score

Example usage

使用示例

threads = search_relevant_threads('SaaS', ['automation'], limit=20) scored_threads = sorted( [(score_opportunity(t), t) for t in threads], key=lambda x: x[0], reverse=True )
print("Top opportunities:") for score, thread in scored_threads[:5]: print(f"Score {score}: {thread['title']}")
undefined
threads = search_relevant_threads('SaaS', ['automation'], limit=20) scored_threads = sorted( [(score_opportunity(t), t) for t in threads], key=lambda x: x[0], reverse=True )
print("Top opportunities:") for score, thread in scored_threads[:5]: print(f"Score {score}: {thread['title']}")
undefined

Command Line Interface

命令行界面

If the project includes a CLI script:
bash
undefined
如果项目包含CLI脚本:
bash
undefined

Search for opportunities

搜索潜在机会

python reddit_agent.py search --subreddits "Entrepreneur,startups" --keywords "automation,tools"
python reddit_agent.py search --subreddits "Entrepreneur,startups" --keywords "automation,tools"

Generate responses for saved threads

为已保存的帖子生成回复

python reddit_agent.py generate --input threads.json --output responses.json
python reddit_agent.py generate --input threads.json --output responses.json

Run full workflow

运行完整工作流

python reddit_agent.py workflow --config config.yaml
python reddit_agent.py workflow --config config.yaml

Analyze subreddit activity

分析子版块活动

python reddit_agent.py analyze --subreddit SaaS --days 7
undefined
python reddit_agent.py analyze --subreddit SaaS --days 7
undefined

Data Storage and Logging

数据存储与日志

Keep track of opportunities and responses:
python
import sqlite3

def setup_database():
    """Create SQLite database for tracking threads and responses."""
    conn = sqlite3.connect('reddit_marketing.db')
    c = conn.cursor()
    
    c.execute('''CREATE TABLE IF NOT EXISTS threads
                 (id TEXT PRIMARY KEY,
                  subreddit TEXT,
                  title TEXT,
                  url TEXT,
                  score INTEGER,
                  opportunity_score INTEGER,
                  found_date TEXT,
                  status TEXT)''')
    
    c.execute('''CREATE TABLE IF NOT EXISTS responses
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  thread_id TEXT,
                  generated_response TEXT,
                  posted BOOLEAN,
                  posted_date TEXT,
                  FOREIGN KEY(thread_id) REFERENCES threads(id))''')
    
    conn.commit()
    conn.close()

def log_thread(thread_data):
    """Log a discovered thread to database."""
    conn = sqlite3.connect('reddit_marketing.db')
    c = conn.cursor()
    
    c.execute('''INSERT OR IGNORE INTO threads 
                 (id, subreddit, title, url, score, opportunity_score, found_date, status)
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
              (thread_data['id'], thread_data['subreddit'], thread_data['title'],
               thread_data['url'], thread_data['score'], thread_data['opportunity_score'],
               datetime.now().isoformat(), 'new'))
    
    conn.commit()
    conn.close()
跟踪潜在机会和回复:
python
import sqlite3

def setup_database():
    """创建SQLite数据库用于跟踪帖子和回复。"""
    conn = sqlite3.connect('reddit_marketing.db')
    c = conn.cursor()
    
    c.execute('''CREATE TABLE IF NOT EXISTS threads
                 (id TEXT PRIMARY KEY,
                  subreddit TEXT,
                  title TEXT,
                  url TEXT,
                  score INTEGER,
                  opportunity_score INTEGER,
                  found_date TEXT,
                  status TEXT)''')
    
    c.execute('''CREATE TABLE IF NOT EXISTS responses
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  thread_id TEXT,
                  generated_response TEXT,
                  posted BOOLEAN,
                  posted_date TEXT,
                  FOREIGN KEY(thread_id) REFERENCES threads(id))''')
    
    conn.commit()
    conn.close()

def log_thread(thread_data):
    """将发现的帖子记录到数据库。"""
    conn = sqlite3.connect('reddit_marketing.db')
    c = conn.cursor()
    
    c.execute('''INSERT OR IGNORE INTO threads 
                 (id, subreddit, title, url, score, opportunity_score, found_date, status)
                 VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
              (thread_data['id'], thread_data['subreddit'], thread_data['title'],
               thread_data['url'], thread_data['score'], thread_data['opportunity_score'],
               datetime.now().isoformat(), 'new'))
    
    conn.commit()
    conn.close()

Best Practices

最佳实践

Non-Spammy Engagement

非垃圾营销式互动

python
ENGAGEMENT_RULES = {
    'value_first': True,
    'max_daily_comments': 5,
    'wait_between_comments_minutes': 60,
    'personalize_each_response': True,
    'avoid_direct_promotion': True
}

def is_safe_to_engage(thread):
    """Check if engagement is appropriate."""
    # Don't engage if already commented
    # Check rate limits
    # Verify thread age and activity
    # Ensure genuine opportunity
    return True  # Implement your logic
python
ENGAGEMENT_RULES = {
    'value_first': True,
    'max_daily_comments': 5,
    'wait_between_comments_minutes': 60,
    'personalize_each_response': True,
    'avoid_direct_promotion': True
}

def is_safe_to_engage(thread):
    """检查是否适合进行互动。"""
    # 若已评论则不互动
    # 检查频率限制
    # 验证帖子时效性和活跃度
    # 确认是真实的机会
    return True  # 实现您的逻辑

Troubleshooting

故障排除

Reddit API Rate Limits:
  • PRAW handles most rate limiting automatically
  • Add delays between requests:
    time.sleep(2)
  • Use
    reddit.auth.limits
    to check remaining requests
Authentication Errors:
  • Verify
    .env
    file contains correct credentials
  • Ensure Reddit app is configured as "script" type
  • Check that user agent string is descriptive and unique
Empty Search Results:
  • Broaden keyword search terms
  • Increase time filter (e.g., 'month' instead of 'week')
  • Verify subreddit names are correct
  • Check if subreddits are private or restricted
OpenAI API Errors:
  • Verify
    OPENAI_API_KEY
    is set correctly
  • Check API quota and billing status
  • Reduce
    max_tokens
    if hitting limits
  • Add retry logic with exponential backoff
Reddit API频率限制:
  • PRAW会自动处理大部分频率限制
  • 在请求之间添加延迟:
    time.sleep(2)
  • 使用
    reddit.auth.limits
    查看剩余请求数
认证错误:
  • 验证
    .env
    文件包含正确的凭证
  • 确保Reddit应用配置为“script”类型
  • 检查用户代理字符串是否描述清晰且唯一
搜索结果为空:
  • 扩大关键词搜索范围
  • 增加时间过滤器(例如将'week'改为'month')
  • 验证子版块名称是否正确
  • 检查子版块是否为私有或受限
OpenAI API错误:
  • 验证
    OPENAI_API_KEY
    是否设置正确
  • 检查API配额和计费状态
  • 若达到限制则减少
    max_tokens
  • 添加带指数退避的重试逻辑

Advanced Features

高级功能

Multi-Subreddit Monitoring

多子版块监控

python
import schedule
import time

def monitor_subreddits():
    """Continuously monitor subreddits for new opportunities."""
    config = load_config('config.json')
    results = reddit_marketing_workflow(config)
    print(f"Found {len(results)} new opportunities")
python
import schedule
import time

def monitor_subreddits():
    """持续监控子版块以发现新机会。"""
    config = load_config('config.json')
    results = reddit_marketing_workflow(config)
    print(f"Found {len(results)} new opportunities")

Schedule monitoring

设置监控计划

schedule.every(2).hours.do(monitor_subreddits)
while True: schedule.run_pending() time.sleep(60)
undefined
schedule.every(2).hours.do(monitor_subreddits)
while True: schedule.run_pending() time.sleep(60)
undefined

Response Quality Validation

回复质量验证

python
def validate_response(response_text):
    """Ensure generated response meets quality standards."""
    checks = {
        'not_too_short': len(response_text) > 50,
        'not_too_long': len(response_text) < 500,
        'no_spam_words': not any(word in response_text.lower() for word in ['buy now', 'click here', 'limited offer']),
        'has_value': any(word in response_text.lower() for word in ['try', 'suggest', 'recommend', 'help', 'consider'])
    }
    
    return all(checks.values())
This skill provides comprehensive guidance for using the Reddit Marketing Agent to identify opportunities, generate contextual responses, and scale Reddit community engagement systematically.
python
def validate_response(response_text):
    """确保生成的回复符合质量标准。"""
    checks = {
        'not_too_short': len(response_text) > 50,
        'not_too_long': len(response_text) < 500,
        'no_spam_words': not any(word in response_text.lower() for word in ['buy now', 'click here', 'limited offer']),
        'has_value': any(word in response_text.lower() for word in ['try', 'suggest', 'recommend', 'help', 'consider'])
    }
    
    return all(checks.values())
该Skill为使用Reddit营销Agent识别机会、生成上下文回复及系统化扩大Reddit社区参与规模提供了全面指导。