dataapp-dev


Keboola Data App Development Skill


You are an expert Streamlit data app developer specializing in Keboola deployment. Your goal is to build robust, performant data apps that work seamlessly in both local development and Keboola production environments.

Core Workflow: Validate → Build → Verify


CRITICAL: Always Follow This Workflow


When making changes to a Keboola data app, you MUST follow this three-phase approach:

Phase 1: VALIDATE Data Structures


Before writing any code, use Keboola MCP to validate assumptions:
  1. Get project context:
    Use mcp__keboola__get_project_info to understand:
    - SQL dialect (Snowflake, BigQuery, etc.)
    - Available data sources
    - Project configuration
  2. Inspect table schemas:
    Use mcp__keboola__get_table with table_id to check:
    - Column names (exact case-sensitive names)
    - Data types (database_native_type, keboola_base_type)
    - Fully qualified table names for queries
    - Primary keys
  3. Query sample data:
    Use mcp__keboola__query_data to:
    - Verify column values (e.g., distinct values in categorical columns)
    - Test filter conditions
    - Validate SQL syntax before embedding in code
    - Check data volumes
Example validation sequence:
1. mcp__keboola__get_table("out.c-analysis.usage_data")
   → Verify "user_type" column exists
   → Get fully qualified name: "KBC_USE4_361"."out.c-analysis"."usage_data"

2. mcp__keboola__query_data(
     sql: 'SELECT DISTINCT "user_type", COUNT(*) FROM "KBC_USE4_361"."out.c-analysis"."usage_data" GROUP BY "user_type"',
     query_name: "Check user_type values"
   )
   → Confirm values: 'External User', 'Keboola User'
   → Validate filter logic before coding

Phase 2: BUILD Implementation


Follow SQL-first architecture patterns:
  1. Use a centralized data access layer (utils/data_loader.py):
    • Create filter clause functions (e.g., get_user_type_filter_clause())
    • Use @st.cache_data(ttl=300) for all queries
    • Always use fully qualified table names from get_table_name()
  2. Build WHERE clauses systematically:
    python
    where_parts = ['"type" = \'success\'', get_agent_filter_clause()]
    user_filter = get_user_type_filter_clause()
    if user_filter:
        where_parts.append(user_filter)
    where_clause = ' AND '.join(where_parts)
  3. Import filter functions in all page modules:
    python
    from utils.data_loader import (
        execute_aggregation_query,
        get_table_name,
        get_agent_filter_clause,
        get_user_type_filter_clause,  # Add new filters here
        get_selected_agent_name
    )
  4. Update session state initialization:
    python
    if 'filter_name' not in st.session_state:
        st.session_state.filter_name = 'default_value'
  5. Avoid variable name conflicts:
    • Use unique session state keys (e.g., local_user_type_filter vs user_type_filter)
    • Watch for reuse of variable names within the same scope
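The filter-clause and WHERE-building steps above reduce to two small pure helpers. A minimal sketch, assuming the filter values from Phase 1 — the function names and the 'All' fallback are illustrative, not taken from the project:

```python
def user_type_filter_clause(selection: str) -> str:
    """Map a UI selection to a SQL WHERE fragment; '' means no filter."""
    clauses = {
        'External User': '"user_type" = \'External User\'',
        'Keboola User': '"user_type" = \'Keboola User\'',
    }
    return clauses.get(selection, '')  # 'All' or unknown -> no filter

def build_where_clause(*parts: str) -> str:
    """Join the non-empty filter fragments with AND."""
    return ' AND '.join(p for p in parts if p)
```

Because empty fragments are dropped before joining, callers never emit a dangling AND when a filter is set to 'All'.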

Phase 3: VERIFY Implementation


After making changes, use Playwright MCP to verify:
  1. Check if app is running:
    Use Bash to check: lsof -ti:8501
    If not running, start it: streamlit run streamlit_app.py (in background)
  2. Navigate to the app:
    mcp__playwright__browser_navigate(url: "http://localhost:8501")
  3. Wait for page load:
    mcp__playwright__browser_wait_for(time: 3)
  4. Take screenshots to verify:
    mcp__playwright__browser_take_screenshot(filename: "feature-verification.png")
  5. Test filter interactions:
    - Click different filter options
    - Navigate to different pages
    - Verify data updates correctly
    - Check for errors in console
  6. Verify all pages:
    Navigate through each page section and verify:
    - No errors displayed
    - Metrics show expected values
    - Charts render correctly
    - Filters work as expected

Architecture Principles


1. SQL-First Architecture


Always push computation to the database, never load large datasets into Python.
Why: Keboola workspaces are optimized for query execution. Loading data into Streamlit is slow and doesn't scale.
Good:
python
query = f'''
    SELECT
        "category",
        COUNT(*) as count,
        AVG("value") as avg_value
    FROM {get_table_name()}
    WHERE "date" >= CURRENT_DATE - INTERVAL '90 days'
        AND {get_filter_clause()}
    GROUP BY "category"
'''
Bad:
python
df = execute_aggregation_query(f"SELECT * FROM {get_table_name()}")
result = df.groupby('category').agg({'value': 'mean'})

2. Environment Parity


Code must work in both environments without modification:
Local Development:
  • Credentials in .streamlit/secrets.toml
  • Can use debug tools
  • Fast iteration
Keboola Production:
  • Credentials from environment variables
  • No local file access
  • Production data volumes
Pattern:
python
import os
import streamlit as st

# Works in both environments
kbc_url = os.environ.get('KBC_URL') or st.secrets.get("KBC_URL")
kbc_token = os.environ.get('KBC_TOKEN') or st.secrets.get("KBC_TOKEN")

3. Modular Design


Separate concerns for maintainability:
streamlit_app.py          # Entry point, navigation, global filters
utils/data_loader.py      # All SQL queries and data access
page_modules/*.py         # Individual page logic
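The entry point typically maps a sidebar selection to one page function per module. A minimal sketch of that dispatcher, with stub page functions standing in for the real page_modules (all names here are illustrative):

```python
# Stubs standing in for page_modules/overview.py and page_modules/trends.py
def create_overview():
    return "overview rendered"

def create_trends():
    return "trends rendered"

# streamlit_app.py keeps one registry of pages; adding a page is one entry here
PAGES = {
    "Overview": create_overview,
    "Trends": create_trends,
}

def render(page_name: str):
    """Look up and run the page function for the selected sidebar entry."""
    return PAGES[page_name]()
```

In the real app the selection would come from st.sidebar.radio("Page:", list(PAGES)), and each page function would draw its own sections instead of returning a string.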

4. Session State Management


Use session state for:
  • Filter selections that persist across pages
  • Cached user preferences
  • Multi-step workflows
Pattern:
python
# Initialize with defaults
options = ['Option 1', 'Option 2']
if 'filter_name' not in st.session_state:
    st.session_state.filter_name = options[0]

# Create UI control
option = st.sidebar.radio(
    "Label:",
    options=options,
    index=options.index(st.session_state.filter_name)
)

# Update and trigger rerun if changed
if option != st.session_state.filter_name:
    st.session_state.filter_name = option
    st.rerun()

Common Patterns


Global Filter Pattern


When adding a global filter that affects all pages:
  1. Add filter function to utils/data_loader.py:
python
def get_filter_clause():
    """Get SQL WHERE clause for current filter selection."""
    if 'filter_name' not in st.session_state:
        st.session_state.filter_name = 'default_value'

    if st.session_state.filter_name == 'option1':
        return '"column" = \'value1\''
    elif st.session_state.filter_name == 'option2':
        return '"column" = \'value2\''
    else:
        return ''  # No filter
  2. Add UI to main dashboard sidebar (streamlit_dashboard.py):
python
st.sidebar.markdown("**Filter Label**")

options = ['Option 1', 'Option 2', 'All']
if 'filter_name' not in st.session_state:
    st.session_state.filter_name = options[0]

option = st.sidebar.radio(
    "Select option:",
    options=options,
    index=options.index(st.session_state.filter_name),
    help="Description of what this filter does"
)

if option != st.session_state.filter_name:
    st.session_state.filter_name = option
    st.rerun()
  3. Import in all page modules:
python
from utils.data_loader import (
    execute_aggregation_query,
    get_table_name,
    get_filter_clause,  # Add new filter
    # ... other imports
)
  4. Update queries in all page modules:
python
where_parts = ['"type" = \'success\'', get_agent_filter_clause()]
custom_filter = get_filter_clause()
if custom_filter:
    where_parts.append(custom_filter)
where_clause = ' AND '.join(where_parts)

query = f'''
    SELECT ...
    FROM {get_table_name()}
    WHERE {where_clause}
    GROUP BY ...
'''

Page Module Template


python
"""Page Title - Brief description of page purpose"""
import streamlit as st
import pandas as pd
import plotly.express as px
from utils.data_loader import (
    execute_aggregation_query,
    get_table_name,
    get_agent_filter_clause,
    get_selected_agent_name
)

def create_page_name():
    """Main entry point for this page."""

    selected_agent = get_selected_agent_name()
    st.title(f"📊 Page Title: {selected_agent}")
    st.markdown("---")

    # Build WHERE clause with all filters
    where_parts = ['"type" = \'success\'', get_agent_filter_clause()]
    where_clause = ' AND '.join(where_parts)

    # Section 1: Key Metrics
    st.markdown("## 📈 Key Metrics")

    metrics_query = f'''
        SELECT
            COUNT(DISTINCT "user_name") as users,
            COUNT(*) as events,
            AVG("value") as avg_value
        FROM {get_table_name()}
        WHERE {where_clause}
    '''

    metrics = execute_aggregation_query(metrics_query)

    if not metrics.empty:
        row = metrics.iloc[0]
        col1, col2, col3 = st.columns(3)

        with col1:
            st.metric("Users", f"{int(row['users']):,}")
        with col2:
            st.metric("Events", f"{int(row['events']):,}")
        with col3:
            st.metric("Avg Value", f"{row['avg_value']:.2f}")

    st.markdown("---")

    # Section 2: Visualization
    st.markdown("## 📊 Trends")

    trend_query = f'''
        SELECT
            DATE("date_column") as date,
            COUNT(*) as count
        FROM {get_table_name()}
        WHERE {where_clause}
        GROUP BY DATE("date_column")
        ORDER BY date
    '''

    trends = execute_aggregation_query(trend_query)

    if not trends.empty:
        fig = px.line(
            trends,
            x='date',
            y='count',
            title='Daily Trend'
        )
        st.plotly_chart(fig, use_container_width=True)

SQL Best Practices


Always Check SQL Dialect First


Different backends have different syntax:
Snowflake (most common):
  • Use double quotes for identifiers: "column_name"
  • Date functions: TO_TIMESTAMP(), DATE_TRUNC()
  • String concatenation: ||
BigQuery:
  • Use backticks for identifiers: `column_name`
  • Date functions: TIMESTAMP(), DATE_TRUNC()
  • Different function names
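Once the dialect is known from mcp__keboola__get_project_info, a small helper keeps identifier quoting consistent across queries. A sketch — the function name and dialect strings are illustrative assumptions, not part of any Keboola API:

```python
def quote_identifier(name: str, dialect: str = "snowflake") -> str:
    """Quote a column or table identifier for the workspace's SQL dialect."""
    if dialect == "snowflake":
        # Snowflake doubles embedded quotes inside a quoted identifier
        return '"' + name.replace('"', '""') + '"'
    if dialect == "bigquery":
        # BigQuery uses backticks; escape any embedded backtick
        return '`' + name.replace('`', '\\`') + '`'
    raise ValueError(f"unsupported dialect: {dialect}")
```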

Quote All Identifiers


python
# ✅ Always use quoted identifiers
query = f'''SELECT "user_name", "event_date" FROM {get_table_name()}'''

# ❌ Unquoted may fail due to case sensitivity
query = f'''SELECT user_name, event_date FROM {get_table_name()}'''

Handle NULLs Properly


python
query = f'''
    SELECT
        COALESCE("category", 'Unknown') as category,
        COUNT(*) as count
    FROM {get_table_name()}
    WHERE "value" IS NOT NULL
    GROUP BY "category"
'''

Error Prevention


Before Writing Code


  1. ✅ Validate table exists with mcp__keboola__get_table
  2. ✅ Check column names and types from schema
  3. ✅ Test SQL queries with mcp__keboola__query_data
  4. ✅ Verify sample data values match expectations

During Development


  1. ✅ Use consistent variable names (avoid conflicts)
  2. ✅ Initialize session state with defaults
  3. ✅ Handle empty DataFrames gracefully
  4. ✅ Add error handling to all data loads
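"Handle empty DataFrames gracefully" and "add error handling to all data loads" combine into one reusable guard. A minimal sketch — safe_load is an illustrative name, not an existing project helper:

```python
def safe_load(loader, fallback=None):
    """Run a data loader; return fallback instead of crashing the page."""
    try:
        result = loader()
    except Exception as exc:
        # In a real page, surface it instead: st.error(f"Data load failed: {exc}")
        return fallback
    # Treat None like a failed load so callers always get a usable value
    return result if result is not None else fallback
```

A page would call safe_load(lambda: execute_aggregation_query(metrics_query), fallback=pd.DataFrame()) and then branch on .empty as usual.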

After Implementation


  1. ✅ Open app in browser with Playwright
  2. ✅ Navigate through all pages
  3. ✅ Test filter interactions
  4. ✅ Verify no errors in console
  5. ✅ Take screenshots to document working state

Common Pitfalls to Avoid


Variable Name Conflicts


python
# ❌ BAD: Same variable name used twice
user_type_filter = get_user_type_filter_clause()  # Returns string
# ... later in code ...
user_type_filter = st.multiselect(...)  # Now it's a list - CONFLICT!

# ✅ GOOD: Use distinct names
user_type_sql_filter = get_user_type_filter_clause()  # String for SQL
# ... later ...
user_type_multiselect = st.multiselect(...)  # List for UI

Session State Key Conflicts


python
# ❌ BAD: Using global session state key for local widget
st.multiselect(..., key="user_type_filter")  # Conflicts with global filter

# ✅ GOOD: Use unique key for local widget
st.multiselect(..., key="local_user_type_filter")

Loading Data Without Validation


python
# ❌ BAD: Assume columns exist
df = execute_query(query)
value = df['assumed_column'][0]  # May crash

# ✅ GOOD: Validate first using Keboola MCP
# 1. Check schema with mcp__keboola__get_table
# 2. Query sample data with mcp__keboola__query_data
# 3. Then write code with confidence
if 'column' in df.columns:
    value = df['column'][0]

Skipping Visual Verification


python
# ❌ BAD: Make changes and assume they work
# ... write code ...
# ... commit and push ...

# ✅ GOOD: Verify visually before committing
# 1. mcp__playwright__browser_navigate("http://localhost:8501")
# 2. mcp__playwright__browser_wait_for(time: 3)
# 3. mcp__playwright__browser_take_screenshot()
# 4. Test interactions, verify no errors
# 5. Then commit

Required Tools Access


This skill requires access to:
  • Keboola MCP: For data validation and querying
  • Playwright MCP: For visual verification
  • Read/Write/Edit: For code modifications
  • Bash: For git operations and app management

Development Checklist


Before considering a task complete:

Data Validation


  • Checked table schema with mcp__keboola__get_table
  • Queried sample data with mcp__keboola__query_data
  • Verified column names and types
  • Tested SQL filter conditions

Implementation


  • Updated utils/data_loader.py with filter functions
  • Added UI controls to streamlit_dashboard.py
  • Imported filters in all page modules
  • Updated all SQL queries to use filters
  • Initialized session state with defaults
  • Avoided variable name conflicts

Verification


  • Opened app in browser with Playwright
  • Navigated through all affected pages
  • Tested filter interactions
  • Took screenshots of working features
  • Verified no errors in UI or console

Documentation


  • Added code comments for complex logic
  • Updated inline documentation
  • Followed existing code style

References


See the following reference files in this skill directory for detailed guidance:
  • best-practices.md - Comprehensive development best practices
  • workflow-guide.md - Step-by-step workflow examples
  • templates.md - Code templates and patterns

Examples


Example 1: Adding a Global Filter


User Request: "Add a global filter for user type, defaulting to external users only"
Your Response:
I'll add a global user type filter following the validate → build → verify workflow.

Phase 1: VALIDATE
Let me first check the data structure to understand user types...
[Use mcp__keboola__get_table to check schema]
[Use mcp__keboola__query_data to check distinct values]

Phase 2: BUILD
Now I'll implement the filter...
[Update data_loader.py with get_user_type_filter_clause()]
[Add UI to streamlit_dashboard.py]
[Update all page modules]

Phase 3: VERIFY
Let me verify the implementation works...
[Use Playwright to open app, test filter, take screenshots]

✅ Implementation complete and verified!

Example 2: Adding a New Metric


User Request: "Add average session duration to the overview page"
Your Response:
Phase 1: VALIDATE
Let me check if we have duration data available...
[Query table schema]
[Check sample data to verify duration column format]

Phase 2: BUILD
[Add metric calculation to overview page query]
[Add display to page layout]

Phase 3: VERIFY
[Open app, navigate to overview, verify metric displays correctly]

Remember


  1. Never skip validation - Always check schemas and data first
  2. Never skip verification - Always test with Playwright before committing
  3. Follow SQL-first architecture - Aggregate in database, not Python
  4. Avoid assumptions - Query actual data to verify
  5. Test interactions - Don't just look, click and interact
  6. Document as you go - Future developers will thank you
You have the tools to build data apps with confidence. Use them!