Migrate an MLflow ResponsesAgent from Databricks Model Serving to Databricks Apps. Use when: (1) User wants to migrate from Model Serving to Apps, (2) User has a ResponsesAgent with predict()/predict_stream() methods, (3) User wants to convert to @invoke/@stream decorators.
Install the skill:

```
npx skill4agent add databricks/app-templates migrate-from-model-serving
```

This skill converts a `ResponsesAgent` with `predict()`/`predict_stream()` methods into `@invoke`/`@stream` decorated functions. The migration works in the following directory layout:

```
<working-directory>/
├── original_mlflow_model/     # Downloaded artifacts from Model Serving
│   ├── MLmodel
│   ├── code/
│   │   └── agent.py
│   ├── input_example.json
│   └── requirements.txt
│
└── <app-name>/                # New Databricks App (ready to deploy)
    ├── agent_server/
    │   ├── agent.py           # Migrated agent code
    │   └── ...
    ├── app.yaml
    ├── databricks.yml         # Bundle config with resources
    ├── pyproject.toml
    ├── requirements.txt
    └── ...
```

`<app-name>` is the name the user provides at the start of the migration. It is used as both the directory name and the Databricks App name at deploy time.
**Before You Begin:** Use AskUserQuestion to collect three values from the user:

1. `<profile>` — the `databricks` auth profile to use (list available profiles with `databricks auth profiles`).
2. `<async>` (yes/no) — whether to migrate the `ResponsesAgent` to async `@invoke`/`@stream` functions (`await`/`async for`) or keep it synchronous.
3. `<app-name>` — the app name, used for the directory and by `databricks bundle deploy`.

Verify authentication with `databricks current-user me --profile <profile>`; if it fails, run `databricks auth login --profile <profile>`.

**Important:** Remember to include `--profile <profile>` on every `databricks` CLI command throughout the migration.
Copy the app template scaffold into `<app-name>/`, excluding files that should not ship with the app: `AGENTS.md`, `CLAUDE.md`, `.claude/`, `.git/`, `original_mlflow_model/`, and `.migration-venv/`.

**Note:** The `<app-name>/` scaffold is intentionally framework-agnostic — `agent_server/agent.py` contains the `@invoke`/`@stream` decorator pattern with TODO placeholders. Step 3 (Migrate the Agent Code) will replace these placeholders with the actual agent logic from the original Model Serving endpoint.
**User tip:** Press `Ctrl+T` to toggle the task list view in your terminal. The display shows up to 10 tasks at a time with status indicators.
Use TaskCreate to create the following tasks:

| Task | Description |
|---|---|
| Authenticate to Databricks | Verify Databricks CLI authentication and validate the selected profile |
| Download original agent artifacts | Download the MLflow model artifacts from Model Serving endpoint |
| Analyze and understand agent code | Examine the original agent code, identify tools, resources, and dependencies |
| Migrate agent code to Apps format | Transform ResponsesAgent class to @invoke/@stream decorated functions |
| Set up and configure the app | Install dependencies, run quickstart, configure environment |
| Test agent locally | Start local server and verify the agent works correctly |
| Deploy to Databricks Apps | Configure databricks.yml resources and deploy with Databricks Asset Bundles |
| Test deployed app | Verify the deployed app responds correctly |
**Task:** Mark "Authenticate to Databricks" as `completed`. Mark "Download original agent artifacts" as `in_progress`.

**Note:** The `<profile>` and `<app-name>` values were collected from the user in the "Before You Begin" section. Use them throughout.
```bash
# Get endpoint info (remember to include --profile if using non-default)
databricks serving-endpoints get <endpoint-name> --profile <profile> --output json
```

From the output, note `served_entities[0].entity_name` and `entity_version` (check `traffic_config.routes` if traffic is split across versions). Then download the model artifacts with `uv run --with "mlflow[databricks]"` (add `boto3` if artifact downloads require it):

```bash
DATABRICKS_CONFIG_PROFILE=<profile> uv run --no-project \
  --with "mlflow[databricks]>=2.15.0" \
  --with "databricks-sdk>=0.30.0" \
  python3 << 'EOF'
import mlflow

mlflow.set_tracking_uri("databricks")

# Replace with actual values from step 1.1
MODEL_NAME = "<model-name>"
VERSION = "<version>"

print(f"Downloading model: models:/{MODEL_NAME}/{VERSION}")
mlflow.artifacts.download_artifacts(
    artifact_uri=f"models:/{MODEL_NAME}/{VERSION}",
    dst_path="./original_mlflow_model",
)
print("Download complete! Artifacts saved to ./original_mlflow_model")
EOF
```

Inspect what was downloaded:

```bash
# List all downloaded files recursively
find ./original_mlflow_model -type f | head -50

# Check for MLmodel file (contains resource requirements)
cat ./original_mlflow_model/MLmodel

# Check for input example (useful for testing)
cat ./original_mlflow_model/input_example.json 2>/dev/null
```

The `/code` folder contains code logged via `code_paths=["..."]`:

```bash
# List all code files
ls -la ./original_mlflow_model/code/

# The main agent is typically agent.py, but there may be additional modules
find ./original_mlflow_model/code -name "*.py" -type f
```

The `/artifacts` folder (if present) contains artifacts logged via `artifacts={...}`:

```bash
# Check for artifacts folder
ls -la ./original_mlflow_model/artifacts/ 2>/dev/null

# List all artifacts
find ./original_mlflow_model/artifacts -type f 2>/dev/null
```

**Important:** Take note of ALL files in `/code` and `/artifacts`. You will need to copy these to the migrated app and ensure imports still work correctly.
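Before migrating, it can help to sanity-check the request shape in the downloaded input example. A stdlib-only sketch (the `summarize_input_example` helper is hypothetical, not part of the skill):

```python
import json
from pathlib import Path

def summarize_input_example(path: str) -> dict:
    """Load an MLflow input_example.json and summarize its shape."""
    data = json.loads(Path(path).read_text())
    messages = data.get("input", [])
    return {
        "n_messages": len(messages),
        "roles": [m.get("role") for m in messages],
        "custom_inputs": sorted(data.get("custom_inputs", {})),
    }

# Example with a payload shaped like the ones this guide tests with
example = {
    "input": [{"role": "user", "content": "What is an LLM agent?"}],
    "custom_inputs": {"thread_id": "example-thread-123"},
}
Path("input_example.json").write_text(json.dumps(example))
summary = summarize_input_example("input_example.json")
```

If `custom_inputs` contains keys like `thread_id` or `user_id`, the agent uses memory and you will need the Lakebase configuration covered later.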
A typical download looks like:

```
./original_mlflow_model/
├── MLmodel                # Model metadata and resource requirements
├── code/                  # Code logged via code_paths=["..."]
│   ├── agent.py           # Main agent implementation
│   ├── utils.py           # (optional) Helper modules
│   ├── tools.py           # (optional) Custom tool definitions
│   └── ...                # Any other code dependencies
├── artifacts/             # (optional) Artifacts logged via artifacts={...}
│   ├── config.yaml        # (optional) Configuration files
│   ├── prompts/           # (optional) Prompt templates
│   └── ...                # Any other artifacts (data files, etc.)
├── input_example.json     # Sample request for testing
├── requirements.txt       # Original dependencies
└── ...
```

Key files to examine: `code/agent.py` (the `ResponsesAgent` class with `predict()` and `predict_stream()`), any other `code/*.py` modules, the `resources` section of `MLmodel`, everything under `artifacts/`, and `input_example.json`.

**Troubleshooting:** If the download fails with a missing `boto3` error, add `--with boto3` next to `mlflow[databricks]` in the `uv run` command. If you hit authentication errors, re-run `databricks auth login --profile <profile>` and confirm the profile points at the workspace that hosts the model:

```bash
# List profiles to see which workspace each points to
databricks auth profiles

# Verify you can access the workspace
databricks current-user me --profile <profile>

# List models in that workspace
databricks registered-models list --profile <profile>
databricks model-versions list --name "<model-name>" --profile <profile>
```

**Task:** Mark "Download original agent artifacts" as `completed`. Mark "Analyze and understand agent code" as `in_progress`.
The original Model Serving pattern is a `ResponsesAgent` subclass:

```python
from mlflow.pyfunc import ResponsesAgent, ResponsesAgentRequest, ResponsesAgentResponse

class MyAgent(ResponsesAgent):
    def predict(self, request: ResponsesAgentRequest, params=None) -> ResponsesAgentResponse:
        # Synchronous implementation
        ...
        return ResponsesAgentResponse(output=outputs)

    def predict_stream(self, request: ResponsesAgentRequest, params=None):
        # Synchronous generator
        for chunk in ...:
            yield ResponsesAgentStreamEvent(...)
```

If the user chose async (`<async>` = yes), the Apps target pattern is:

```python
from collections.abc import AsyncGenerator

from mlflow.genai.agent_server import invoke, stream
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)

@invoke()
async def non_streaming(request: ResponsesAgentRequest) -> ResponsesAgentResponse:
    # Async implementation - typically calls streaming() and collects results
    outputs = [
        event.item
        async for event in streaming(request)
        if event.type == "response.output_item.done"
    ]
    return ResponsesAgentResponse(output=outputs)

@stream()
async def streaming(request: ResponsesAgentRequest) -> AsyncGenerator[ResponsesAgentStreamEvent, None]:
    # Async generator
    async for event in ...:
        yield event
```

If the user chose sync (`<async>` = no), the target pattern keeps the original synchronous logic:

```python
from mlflow.genai.agent_server import invoke, stream
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)

@invoke()
def non_streaming(request: ResponsesAgentRequest) -> ResponsesAgentResponse:
    # Same sync logic from original predict(), extracted from the class
    ...
    return ResponsesAgentResponse(output=outputs)

@stream()
def streaming(request: ResponsesAgentRequest):
    # Same sync generator from original predict_stream(), extracted from the class
    for chunk in ...:
        yield ResponsesAgentStreamEvent(...)
```

| Aspect | Model Serving | Apps (async) | Apps (sync) |
|---|---|---|---|
| Structure | `ResponsesAgent` subclass | Decorated functions | Decorated functions |
| Functions | `predict()` / `predict_stream()` | `non_streaming()` / `streaming()` | `non_streaming()` / `streaming()` |
| Streaming | Sync generator (`yield`) | Async generator (`async for` + `yield`) | Sync generator (`yield`) |
| Server | MLflow Model Server | MLflow GenAI Server (FastAPI) | MLflow GenAI Server (FastAPI) |
| Deployment | Model Serving endpoint | `databricks bundle deploy` | `databricks bundle deploy` |
**Sync-to-async conversion** (skip this section if the user chose synchronous migration — the sync path keeps all original I/O calls as-is). Convert blocking client calls and iteration:

```python
# OLD (sync)
response = client.chat(messages)

# NEW (async)
response = await client.achat(messages)

# OLD (sync iteration)
for chunk in stream:
    yield chunk

# NEW (async iteration)
async for chunk in stream:
    yield chunk
```

**Task:** Mark "Analyze and understand agent code" as `completed`. Mark "Migrate agent code to Apps format" as `in_progress`.
Copy all files from the original `/code` folder into `agent_server/`:

```bash
# Copy all Python files from original code folder
cp ./original_mlflow_model/code/*.py ./<app-name>/agent_server/

# If there are subdirectories with code, copy those too
# cp -r ./original_mlflow_model/code/submodule ./<app-name>/agent_server/
```

If the original model logged artifacts, copy those as well:

```bash
# Create an artifacts directory in the migrated app if needed
mkdir -p ./<app-name>/agent_server/artifacts

# Copy all artifacts
cp -r ./original_mlflow_model/artifacts/* ./<app-name>/agent_server/artifacts/ 2>/dev/null || true
```

Then update imports and file paths to match the new layout:

```python
# BEFORE (if files were in different locations):
from code.utils import helper_function
from artifacts.prompts import SYSTEM_PROMPT

# AFTER (files are now in agent_server/):
from agent_server.utils import helper_function
# Or if in same directory:
from .utils import helper_function

# For artifacts, update file paths:
# BEFORE:
with open("artifacts/config.yaml") as f:
    ...

# AFTER:
import os
config_path = os.path.join(os.path.dirname(__file__), "artifacts", "config.yaml")
with open(config_path) as f:
    ...
```

**Important:** Review each copied file and ensure all imports resolve correctly. The most common issues are:
- Relative imports that assumed a different directory structure
- Hardcoded file paths to artifacts
- Missing `__init__.py` files for package imports
Now rewrite `<app-name>/agent_server/agent.py`, converting the `ResponsesAgent` class into `@invoke`/`@stream` functions. The scaffold defaults to the `databricks-claude-sonnet-4-5` endpoint; replace it with the endpoint the original agent used. For a sync migration (`<async>` = no), the template is:

```python
from mlflow.genai.agent_server import invoke, stream
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)

# Move any class __init__ or class-level setup to module level
# e.g., client initialization, tool setup, etc.

@invoke()
def non_streaming(request: ResponsesAgentRequest) -> ResponsesAgentResponse:
    # Paste the body of the original predict() method here
    # Remove 'self.' references — replace with module-level variables
    # Remove 'params' parameter (not used in Apps)
    ...
    return ResponsesAgentResponse(output=outputs)

@stream()
def streaming(request: ResponsesAgentRequest):
    # Paste the body of the original predict_stream() method here
    # Remove 'self.' references — replace with module-level variables
    # Remove 'params' parameter (not used in Apps)
    for chunk in ...:
        yield ResponsesAgentStreamEvent(...)
```

Migration checklist:

- Remove the `class MyAgent(ResponsesAgent):` wrapper; move `__init__` and class-level setup to module level, and replace `self.some_attribute` references with module-level variables.
- Decorate the extracted functions with `@invoke()` and `@stream()`.
- For async (`<async>` = yes): convert `for` loops over streams to `async for` and `await` all async client calls.
- In `<app-name>/agent_server/agent.py`, set `LLM_ENDPOINT_NAME = "<your-endpoint-from-original>"` and `SYSTEM_PROMPT = """<your-system-prompt-from-original>"""` from the original agent.
- Re-register any custom tools, e.g.:

```python
from langchain_core.tools import tool

@tool
async def my_custom_tool(arg: str) -> str:
    """Tool description."""
    # Your tool logic (make async if needed)
    return result
```

For the async path, apply these conversions:

| Original (sync) | Migrated (async) |
|---|---|
| `def predict()` | `async def non_streaming()` |
| `def predict_stream()` | `async def streaming()` |
| `client.chat()` | `await client.achat()` |
| `for chunk in stream:` | `async for chunk in stream:` |

If the agent uses memory: for short-term (conversation) memory, use `AsyncCheckpointSaver` with `LAKEBASE_INSTANCE_NAME` set in `.env`, reading the thread id from `request.custom_inputs` or `request.context.conversation_id`. For long-term memory, use `AsyncDatabricksStore` with `LAKEBASE_INSTANCE_NAME` in `.env`, reading the user id from `request.custom_inputs` or `request.context.user_id`.

**Task:** Mark "Migrate agent code to Apps format" as `completed`. Mark "Set up and configure the app" as `in_progress`.
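The memory-key precedence just described (explicit `custom_inputs`, falling back to the request context) can be sketched with plain dicts. `resolve_memory_keys` is a hypothetical helper — real requests expose these values as `request.custom_inputs` and `request.context`:

```python
def resolve_memory_keys(custom_inputs, context):
    """Pick thread_id/user_id: custom_inputs wins, request context is the fallback."""
    custom_inputs = custom_inputs or {}
    context = context or {}
    return {
        "thread_id": custom_inputs.get("thread_id") or context.get("conversation_id"),
        "user_id": custom_inputs.get("user_id") or context.get("user_id"),
    }

# custom_inputs takes precedence over context
keys = resolve_memory_keys(
    {"thread_id": "test-123"},
    {"conversation_id": "conv-9", "user_id": "user@example.com"},
)
```

Resolving both keys in one place keeps the `@invoke` and `@stream` functions from duplicating the fallback logic.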
```bash
# Create a minimal README if one doesn't exist
if [ ! -f "README.md" ]; then
  echo "# Migrated Agent App" > README.md
fi

cd <app-name>
uv sync
```

Databricks Apps installs dependencies from `requirements.txt`, while local development uses `pyproject.toml` via `uv`. Ensure `requirements.txt` contains `uv`:

```bash
echo "uv" > requirements.txt
```

Then run the quickstart, which generates the `.env` file:

```bash
uv run quickstart
```

**Important:** The quickstart script creates the MLflow experiment that the app needs for logging traces and models. This experiment will be added as a resource when deploying the app.

Edit `.env` with the values the agent needs:

```bash
# Databricks authentication
DATABRICKS_CONFIG_PROFILE=<your-profile>

# MLflow experiment (created by quickstart, or create manually)
MLFLOW_EXPERIMENT_ID=<experiment-id>

# Example: Lakebase for stateful agents
LAKEBASE_INSTANCE_NAME=<your-lakebase-instance>

# Example: Custom API keys
MY_API_KEY=<value>
```

If quickstart did not create an experiment, create one manually:

```bash
databricks experiments create-experiment "/Users/<your-username>/<app-name>" --profile <profile>
```

**Task:** Mark "Set up and configure the app" as `completed`. Mark "Test agent locally" as `in_progress`.
Test your migrated agent locally before deploying to Databricks Apps. This helps catch configuration issues early and ensures the agent works correctly.
```bash
cd <app-name>
uv run start-app
```

The app serves at `http://localhost:8000`. **Note:** If you only need the API endpoint (without the chat UI), you can run `uv run start-server` instead.

Test with the original `input_example.json`:

```bash
# Check the original input example (from the <app-name> directory)
cat ../original_mlflow_model/input_example.json
```

Example contents:

```json
{"input": [{"role": "user", "content": "What is an LLM agent?"}], "custom_inputs": {"thread_id": "example-thread-123"}}
```

```bash
# Test with the original input example
curl -X POST http://localhost:8000/invocations \
  -H "Content-Type: application/json" \
  -d "$(cat ../original_mlflow_model/input_example.json)"

# Non-streaming
curl -X POST http://localhost:8000/invocations \
  -H "Content-Type: application/json" \
  -d '{"input": [{"role": "user", "content": "Hello!"}]}'

# Streaming
curl -X POST http://localhost:8000/invocations \
  -H "Content-Type: application/json" \
  -d '{"input": [{"role": "user", "content": "Hello!"}], "stream": true}'

# With thread_id for short-term memory
curl -X POST http://localhost:8000/invocations \
  -H "Content-Type: application/json" \
  -d '{"input": [{"role": "user", "content": "Hi"}], "custom_inputs": {"thread_id": "test-123"}}'

# With user_id for long-term memory
curl -X POST http://localhost:8000/invocations \
  -H "Content-Type: application/json" \
  -d '{"input": [{"role": "user", "content": "Hi"}], "custom_inputs": {"user_id": "user@example.com"}}'
```

**Note:** Only proceed to Step 6 (Deploy) after confirming the agent works correctly locally.
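If you prefer scripting these smoke tests over hand-writing curl bodies, the request shapes above can be generated programmatically. `build_invocation_payload` is a hypothetical helper, not part of the scaffold:

```python
import json

def build_invocation_payload(content, *, stream=False, thread_id=None, user_id=None):
    """Build a JSON body matching the /invocations request shapes above."""
    payload = {"input": [{"role": "user", "content": content}]}
    if stream:
        payload["stream"] = True
    custom_inputs = {}
    if thread_id:
        custom_inputs["thread_id"] = thread_id
    if user_id:
        custom_inputs["user_id"] = user_id
    if custom_inputs:
        payload["custom_inputs"] = custom_inputs
    return json.dumps(payload)

body = build_invocation_payload("Hi", thread_id="test-123")
```

Pass the result as the request body (e.g. `curl ... -d "$BODY"` or `urllib.request` with `data=body.encode()`).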
**Task:** Mark "Test agent locally" as `completed`. Mark "Deploy to Databricks Apps" as `in_progress`.
Configure `databricks.yml` resources from the `resources` section of `./original_mlflow_model/MLmodel`. That section lists the Databricks resources the original endpoint depended on, for example:

```yaml
resources:
  api_version: '1'
  databricks:
    lakebase:
    - name: lakebase
    serving_endpoint:
    - name: databricks-claude-sonnet-4-5
```

In `databricks.yml`, set `<app-name>` in both `resources.apps.agent_migration.name` and `targets.prod.resources.apps.agent_migration.name`, and declare each dependency under `resources.apps.agent_migration.resources`. Map each `MLmodel` resource to the corresponding app resource (a representative subset; key fields follow the example `databricks.yml` below):

| MLmodel Resource | App Resource (`databricks.yml`) | Key Fields |
|---|---|---|
| `serving_endpoint` | `serving_endpoint` | `name`, `permission: 'CAN_QUERY'` |
| `lakebase` | `database` | `database_name`, `instance_name`, `permission: 'CAN_CONNECT_AND_CREATE'` |
| UC function | `uc_securable` | `securable_full_name`, `securable_type: 'FUNCTION'`, `permission: 'EXECUTE'` |
| MLflow experiment | `experiment` | `experiment_id`, `permission: 'CAN_MANAGE'` |

**Note:** The `experiment` resource is already configured in the scaffold `databricks.yml` and is automatically created by the bundle. You do not need to add it manually.
Example `databricks.yml`:

```yaml
resources:
  experiments:
    agent_migration_experiment:
      name: /Users/${workspace.current_user.userName}/${bundle.name}-${bundle.target}
  apps:
    agent_migration:
      name: "<app-name>"  # Update to user's app name
      description: "Migrated agent from Model Serving to Databricks Apps"
      source_code_path: ./
      resources:
        - name: 'experiment'
          experiment:
            experiment_id: "${resources.experiments.agent_migration_experiment.id}"
            permission: 'CAN_MANAGE'
        - name: 'serving-endpoint'
          serving_endpoint:
            name: 'databricks-claude-sonnet-4-5'
            permission: 'CAN_QUERY'
        - name: 'python-exec'
          uc_securable:
            securable_full_name: 'system.ai.python_exec'
            securable_type: 'FUNCTION'
            permission: 'EXECUTE'

targets:
  prod:
    resources:
      apps:
        agent_migration:
          name: "<app-name>"  # Same name for production
```

If the agent uses Lakebase, also add a `database` entry under the app's `resources`:

```yaml
        - name: 'database'
          database:
            database_name: 'databricks_postgres'
            instance_name: 'lakebase'
            permission: 'CAN_CONNECT_AND_CREATE'
```

Deploy the app:

```bash
# 1. Validate bundle configuration (catches errors before deploy)
databricks bundle validate --profile <profile>

# 2. Deploy the bundle (creates/updates resources, uploads files)
databricks bundle deploy --profile <profile>

# 3. Run the app (starts/restarts with uploaded source code) - REQUIRED!
databricks bundle run agent_migration --profile <profile>
```

**Important:** `bundle deploy` only uploads files and configures resources. `bundle run` is required to actually start/restart the app with the new code. If you only run `deploy`, the app will continue running old code!
**Task:** Mark "Deploy to Databricks Apps" as `completed`. Mark "Test deployed app" as `in_progress`.
```bash
# Get the app URL
APP_URL=$(databricks apps get <app-name> --profile <profile> --output json | jq -r '.url')

# Get OAuth token
TOKEN=$(databricks auth token --profile <profile> | jq -r .access_token)

# Query the app
curl -X POST ${APP_URL}/invocations \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"input": [{"role": "user", "content": "Hello!"}]}'
```

**Task:** Mark "Test deployed app" as `completed`. Migration complete!
Useful commands for troubleshooting:

```bash
# Validate bundle configuration
databricks bundle validate --profile <profile>

# View app logs
databricks apps logs <app-name> --profile <profile> --follow

# Check app status
databricks apps get <app-name> --profile <profile> --output json | jq '{app_status, compute_status}'

# Get app URL
databricks apps get <app-name> --profile <profile> --output json | jq -r '.url'
```

If the app is running stale code, re-run `databricks bundle deploy` followed by `databricks bundle run`. For reference, the scaffold layout:

```
<app-name>/
├── agent_server/
│   ├── __init__.py
│   ├── agent.py             # Main agent logic - THIS IS WHERE YOU MIGRATE TO
│   ├── start_server.py      # FastAPI server setup
│   ├── utils.py             # Helper utilities
│   └── evaluate_agent.py    # Agent evaluation
├── scripts/
│   ├── __init__.py
│   ├── quickstart.py        # Setup script
│   └── start_app.py         # App startup
├── app.yaml                 # Databricks Apps configuration
├── databricks.yml           # Databricks Asset Bundle configuration (resources, targets)
├── pyproject.toml           # Dependencies (for local dev with uv)
├── requirements.txt         # REQUIRED: Must contain "uv" for Databricks Apps
├── .env.example             # Environment template
└── README.md
```

**IMPORTANT:** The `requirements.txt` file must exist and contain `uv` so that Databricks Apps can install dependencies from `pyproject.toml` using `uv`. Without this file, the app will fail to start.
**Example — before (Model Serving):**

```python
class ChatAgent(ResponsesAgent):
    def predict(self, request, params=None):
        messages = to_chat_completions_input(request.input)
        response = self.llm.invoke(messages)
        return ResponsesAgentResponse(output=[...])
```

**After (sync):**

```python
llm = ...  # Move class-level init to module level

@invoke()
def non_streaming(request: ResponsesAgentRequest) -> ResponsesAgentResponse:
    messages = to_chat_completions_input(request.input)
    response = llm.invoke(messages)
    return ResponsesAgentResponse(output=[...])

@stream()
def streaming(request: ResponsesAgentRequest):
    # Original predict_stream() body, with self. removed
    ...
```

**After (async):**

```python
@invoke()
async def non_streaming(request: ResponsesAgentRequest) -> ResponsesAgentResponse:
    outputs = [e.item async for e in streaming(request) if e.type == "response.output_item.done"]
    return ResponsesAgentResponse(output=outputs)

@stream()
async def streaming(request: ResponsesAgentRequest) -> AsyncGenerator[ResponsesAgentStreamEvent, None]:
    messages = {"messages": to_chat_completions_input([i.model_dump() for i in request.input])}
    agent = await init_agent()
    async for event in process_agent_astream_events(agent.astream(messages, stream_mode=["updates", "messages"])):
        yield event
```

**Example async tool:**

```python
from langchain_core.tools import tool

@tool
async def search_docs(query: str) -> str:
    """Search the documentation."""
    results = await vector_store.asimilarity_search(query)
    return format_results(results)
```

**Example agent initialization (MCP tools are async):**

```python
from langchain.agents import create_agent
from databricks_langchain import ChatDatabricks

async def init_agent():
    tools = await mcp_client.get_tools()  # MCP tools are async
    model = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME)
    return create_agent(model=model, tools=tools, system_prompt=SYSTEM_PROMPT)
```

**Quick fixes:** run `uv sync` to reinstall dependencies; run `databricks auth login` to re-authenticate. If an async agent misbehaves, check that `client.chat()` calls became `await client.achat()` and that `for` loops over streams became `async for`.