Expert for developing Streamlit data apps for Keboola deployment. Activates when building, modifying, or debugging Keboola data apps, Streamlit dashboards, adding filters, creating pages, or fixing data app issues. Validates data structures using Keboola MCP before writing code, tests implementations with Playwright browser automation, and follows SQL-first architecture patterns.
Install:

```shell
npx skill4agent add keboola/ai-kit dataapp-dev
```

## Validate

Use `mcp__keboola__get_project_info` to understand:
- SQL dialect (Snowflake, BigQuery, etc.)
- Available data sources
- Project configuration

Use `mcp__keboola__get_table` with a `table_id` to check:
- Column names (exact case-sensitive names)
- Data types (database_native_type, keboola_base_type)
- Fully qualified table names for queries
- Primary keys

Use `mcp__keboola__query_data` to:
- Verify column values (e.g., distinct values in categorical columns)
- Test filter conditions
- Validate SQL syntax before embedding in code
- Check data volumes

### Example: validate before coding a filter

```text
1. mcp__keboola__get_table("out.c-analysis.usage_data")
   → Verify the "user_type" column exists
   → Get the fully qualified name: "KBC_USE4_361"."out.c-analysis"."usage_data"
2. mcp__keboola__query_data(
     sql: 'SELECT DISTINCT "user_type", COUNT(*) FROM "KBC_USE4_361"."out.c-analysis"."usage_data" GROUP BY "user_type"',
     query_name: "Check user_type values"
   )
   → Confirm the values: 'External User', 'Keboola User'
   → Validate the filter logic before coding
```

## Build

Keep all SQL and data access in `utils/data_loader.py`. Add one helper per filter (for example `get_user_type_filter_clause()`), cache query functions with `@st.cache_data(ttl=300)`, and resolve fully qualified table names through `get_table_name()`.

Compose WHERE clauses from independent parts so a disabled filter never produces a dangling `AND`:

```python
where_parts = ['"type" = \'success\'', get_agent_filter_clause()]
user_filter = get_user_type_filter_clause()
if user_filter:
    where_parts.append(user_filter)
where_clause = ' AND '.join(where_parts)
```

Import the shared helpers in every page module:

```python
from utils.data_loader import (
    execute_aggregation_query,
    get_table_name,
    get_agent_filter_clause,
    get_user_type_filter_clause,  # Add new filters here
    get_selected_agent_name
)
```

Initialize session state before reading it:

```python
if 'filter_name' not in st.session_state:
    st.session_state.filter_name = 'default_value'
```

Give page-local widgets their own keys (for example `local_user_type_filter`) so they never collide with global keys such as `user_type_filter`.

## Verify

Check whether the app is already running. Use Bash: `lsof -ti:8501`
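This check can be scripted; a minimal sketch, assuming the default Streamlit port 8501 and a `streamlit_app.py` entry point:

```shell
#!/usr/bin/env bash
# Check whether anything is listening on Streamlit's default port (8501)
if lsof -ti:8501 >/dev/null 2>&1; then
  STATUS="already running"
else
  STATUS="not running"
  echo "start it with: streamlit run streamlit_app.py &"
fi
echo "streamlit app: $STATUS"
```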
If nothing is listening, start the app in the background: `streamlit run streamlit_app.py`

Then drive the running app with Playwright:

1. `mcp__playwright__browser_navigate(url: "http://localhost:8501")`
2. `mcp__playwright__browser_wait_for(time: 3)`
3. `mcp__playwright__browser_take_screenshot(filename: "feature-verification.png")`

Exercise the UI:

- Click different filter options
- Navigate to different pages
- Verify data updates correctly
- Check for errors in the console

Navigate through each page section and verify:

- No errors displayed
- Metrics show expected values
- Charts render correctly
- Filters work as expected

## SQL-First Architecture

Push aggregation into the warehouse instead of pulling raw rows into pandas:

```python
query = f'''
    SELECT
        "category",
        COUNT(*) as count,
        AVG("value") as avg_value
    FROM {get_table_name()}
    WHERE "date" >= CURRENT_DATE - INTERVAL '90 days'
      AND {get_filter_clause()}
    GROUP BY "category"
'''
# Note: this form assumes get_filter_clause() returns a non-empty clause;
# otherwise compose the WHERE clause from where_parts as shown above.
```

Avoid loading whole tables and aggregating client-side:

```python
# ❌ Pulls every row into memory, then aggregates in pandas
df = execute_aggregation_query(f"SELECT * FROM {get_table_name()}")
result = df.groupby('category').agg({'value': 'mean'})
```

## Credentials

Never commit `.streamlit/secrets.toml`. Read credentials so the code works both locally and when deployed:

```python
import os
import streamlit as st

# Works in both environments
kbc_url = os.environ.get('KBC_URL') or st.secrets.get("KBC_URL")
kbc_token = os.environ.get('KBC_TOKEN') or st.secrets.get("KBC_TOKEN")
```

## Project Layout

```text
streamlit_app.py      # Entry point, navigation, global filters
utils/data_loader.py  # All SQL queries and data access
page_modules/*.py     # Individual page logic
```

## Adding a Global Filter

Session-state pattern for a sidebar control:

```python
# Initialize with a default that is one of the options,
# otherwise options.index() below raises ValueError
options = ['Option 1', 'Option 2']
if 'filter_name' not in st.session_state:
    st.session_state.filter_name = options[0]

# Create UI control
option = st.sidebar.radio(
    "Label:",
    options=options,
    index=options.index(st.session_state.filter_name)
)

# Update and trigger rerun if changed
if option != st.session_state.filter_name:
    st.session_state.filter_name = option
    st.rerun()
```

In `utils/data_loader.py`, add the matching SQL helper:

```python
def get_filter_clause():
    """Get SQL WHERE clause for current filter selection."""
    if 'filter_name' not in st.session_state:
        st.session_state.filter_name = 'default_value'
    if st.session_state.filter_name == 'option1':
        return '"column" = \'value1\''
    elif st.session_state.filter_name == 'option2':
        return '"column" = \'value2\''
    else:
        return ''  # No filter
```

In `streamlit_dashboard.py`, wire up the sidebar control:

```python
st.sidebar.markdown("**Filter Label**")
# Default must be one of the options or .index() raises ValueError
options = ['Option 1', 'Option 2', 'All']
if 'filter_name' not in st.session_state:
    st.session_state.filter_name = 'All'
option = st.sidebar.radio(
    "Select option:",
    options=options,
    index=options.index(st.session_state.filter_name),
    help="Description of what this filter does"
)
if option != st.session_state.filter_name:
    st.session_state.filter_name = option
    st.rerun()
```

In each page module, import the new helper:

```python
from utils.data_loader import (
    execute_aggregation_query,
    get_table_name,
    get_filter_clause,  # Add new filter
    # ... other imports
)
```

Then apply it when building queries:

```python
where_parts = ['"type" = \'success\'', get_agent_filter_clause()]
custom_filter = get_filter_clause()
if custom_filter:
    where_parts.append(custom_filter)
where_clause = ' AND '.join(where_parts)

query = f'''
    SELECT ...
    FROM {get_table_name()}
    WHERE {where_clause}
    GROUP BY ...
'''
```

## Page Module Template

```python
"""Page Title - Brief description of page purpose"""
import streamlit as st
import pandas as pd
import plotly.express as px
from utils.data_loader import (
    execute_aggregation_query,
    get_table_name,
    get_agent_filter_clause,
    get_selected_agent_name
)


def create_page_name():
    """Main entry point for this page."""
    selected_agent = get_selected_agent_name()
    st.title(f"📊 Page Title: {selected_agent}")
    st.markdown("---")

    # Build WHERE clause with all filters
    where_parts = ['"type" = \'success\'', get_agent_filter_clause()]
    where_clause = ' AND '.join(where_parts)

    # Section 1: Key Metrics
    st.markdown("## 📈 Key Metrics")
    metrics_query = f'''
        SELECT
            COUNT(DISTINCT "user_name") as users,
            COUNT(*) as events,
            AVG("value") as avg_value
        FROM {get_table_name()}
        WHERE {where_clause}
    '''
    metrics = execute_aggregation_query(metrics_query)
    if not metrics.empty:
        row = metrics.iloc[0]
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Users", f"{int(row['users']):,}")
        with col2:
            st.metric("Events", f"{int(row['events']):,}")
        with col3:
            st.metric("Avg Value", f"{row['avg_value']:.2f}")
    st.markdown("---")

    # Section 2: Visualization
    st.markdown("## 📊 Trends")
    trend_query = f'''
        SELECT
            DATE("date_column") as date,
            COUNT(*) as count
        FROM {get_table_name()}
        WHERE {where_clause}
        GROUP BY DATE("date_column")
        ORDER BY date
    '''
    trends = execute_aggregation_query(trend_query)
    if not trends.empty:
        fig = px.line(
            trends,
            x='date',
            y='count',
            title='Daily Trend'
        )
        st.plotly_chart(fig, use_container_width=True)
```

## SQL Dialect Differences

| | Snowflake | BigQuery |
|---|---|---|
| Identifier quoting | `"column_name"` | `` `column_name` `` |
| Timestamp conversion | `TO_TIMESTAMP()` | `TIMESTAMP()` |
| Date truncation | `DATE_TRUNC()` | `DATE_TRUNC()` |

Snowflake also uses `||` for string concatenation.

```python
# ✅ Always use quoted identifiers
query = f'''SELECT "user_name", "event_date" FROM {get_table_name()}'''

# ❌ Unquoted identifiers may fail due to case sensitivity
query = f'''SELECT user_name, event_date FROM {get_table_name()}'''
```

Handle NULLs explicitly:

```python
query = f'''
    SELECT
        COALESCE("category", 'Unknown') as category,
        COUNT(*) as count
    FROM {get_table_name()}
    WHERE "value" IS NOT NULL
    GROUP BY "category"
'''
```

## Common Pitfalls

Validate schemas with `mcp__keboola__get_table` and `mcp__keboola__query_data` before coding, and avoid these patterns:

Variable shadowing:

```python
# ❌ BAD: Same variable name used twice
user_type_filter = get_user_type_filter_clause()  # Returns string
# ... later in code ...
user_type_filter = st.multiselect(...)  # Now it's a list - CONFLICT!

# ✅ GOOD: Use distinct names
user_type_sql_filter = get_user_type_filter_clause()  # String for SQL
# ... later ...
user_type_multiselect = st.multiselect(...)  # List for UI
```

Widget key collisions:

```python
# ❌ BAD: Using a global session state key for a local widget
st.multiselect(..., key="user_type_filter")  # Conflicts with global filter

# ✅ GOOD: Use a unique key for the local widget
st.multiselect(..., key="local_user_type_filter")
```

Unvalidated columns:

```python
# ❌ BAD: Assume columns exist
df = execute_query(query)
value = df['assumed_column'][0]  # May crash

# ✅ GOOD: Validate first using Keboola MCP
# 1. Check the schema with mcp__keboola__get_table
# 2. Query sample data with mcp__keboola__query_data
# 3. Then write code with confidence
if 'column' in df.columns:
    value = df['column'][0]
```

Unverified changes:

```python
# ❌ BAD: Make changes and assume they work
# ... write code ...
# ... commit and push ...

# ✅ GOOD: Verify visually before committing
# 1. mcp__playwright__browser_navigate("http://localhost:8501")
# 2. mcp__playwright__browser_wait_for(time: 3)
# 3. mcp__playwright__browser_take_screenshot()
# 4. Test interactions, verify no errors
# 5. Then commit
```

## Reference

Key tools: `mcp__keboola__get_table`, `mcp__keboola__query_data`. Key files: `utils/data_loader.py`, `streamlit_dashboard.py`. Bundled docs: `best-practices.md`, `workflow-guide.md`, `templates.md`.

## Example Sessions

### Example 1: Add a global user type filter

I'll add a global user type filter following the validate → build → verify workflow.
Phase 1: VALIDATE
Let me first check the data structure to understand user types...
[Use mcp__keboola__get_table to check schema]
[Use mcp__keboola__query_data to check distinct values]
Phase 2: BUILD
Now I'll implement the filter...
[Update data_loader.py with get_user_type_filter_clause()]
[Add UI to streamlit_dashboard.py]
[Update all page modules]
Phase 3: VERIFY
Let me verify the implementation works...
[Use Playwright to open app, test filter, take screenshots]
✅ Implementation complete and verified!

### Example 2: Add a duration metric

Phase 1: VALIDATE
Let me check if we have duration data available...
[Query table schema]
[Check sample data to verify duration column format]
Phase 2: BUILD
[Add metric calculation to overview page query]
[Add display to page layout]
Phase 3: VERIFY
[Open app, navigate to overview, verify metric displays correctly]
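The WHERE-clause composition used throughout the examples can be factored into a small pure helper. This is a sketch: `build_where_clause` is a hypothetical name, not part of the skill's API, and it assumes filter helpers return `''` when disabled, as `get_filter_clause()` does above.

```python
def build_where_clause(*clauses: str) -> str:
    """Join non-empty SQL filter fragments with AND.

    Empty strings (a filter helper returning 'no filter') are skipped,
    so a disabled filter never produces a dangling AND. Falls back to
    the always-true predicate 1=1 when every filter is disabled.
    """
    parts = [c for c in clauses if c]
    return ' AND '.join(parts) if parts else '1=1'


# Example: a global filter that is sometimes disabled
base = '"type" = \'success\''
user_filter = ''  # e.g. get_user_type_filter_clause() returned '' (All)
print(build_where_clause(base, user_filter))  # → "type" = 'success'
```

The `1=1` fallback keeps the generated `WHERE {where_clause}` valid SQL even when no filters are active.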