# Lakebase Provisioned
Patterns and best practices for using Lakebase Provisioned (Databricks managed PostgreSQL) for OLTP workloads.
## When to Use
Use this skill when:
- Building applications that need a PostgreSQL database for transactional workloads
- Adding persistent state to Databricks Apps
- Implementing reverse ETL from Delta Lake to an operational database
- Storing chat/agent memory for LangChain applications
## Overview
Lakebase Provisioned is Databricks' managed PostgreSQL database service for OLTP (Online Transaction Processing) workloads. It provides a fully managed PostgreSQL-compatible database that integrates with Unity Catalog and supports OAuth token-based authentication.
| Feature | Description |
|---|---|
| Managed PostgreSQL | Fully managed instances with automatic provisioning |
| OAuth Authentication | Token-based auth via Databricks SDK (1-hour expiry) |
| Unity Catalog | Register databases for governance |
| Reverse ETL | Sync data from Delta tables to PostgreSQL |
| Apps Integration | First-class support in Databricks Apps |
Available Regions (AWS): us-east-1, us-east-2, us-west-2, eu-central-1, eu-west-1, ap-south-1, ap-southeast-1, ap-southeast-2
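As a lightweight pre-flight check, the region list above can be encoded as a set. This is a convenience sketch (the set below mirrors the list above as of this writing; verify it against current Databricks documentation before relying on it):

```python
# AWS regions where Lakebase Provisioned is available
# (mirrors the list above; verify against current Databricks docs)
LAKEBASE_AWS_REGIONS = {
    "us-east-1", "us-east-2", "us-west-2",
    "eu-central-1", "eu-west-1",
    "ap-south-1", "ap-southeast-1", "ap-southeast-2",
}

def supports_lakebase(region: str) -> bool:
    """Return True if the given AWS region offers Lakebase Provisioned."""
    return region in LAKEBASE_AWS_REGIONS
```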
## Quick Start
Create and connect to a Lakebase Provisioned instance:

```python
from databricks.sdk import WorkspaceClient

# Initialize client
w = WorkspaceClient()

# Create a database instance
instance = w.database.create_database_instance(
    name="my-lakebase-instance",
    capacity="CU_1",  # CU_1, CU_2, CU_4, CU_8
    stopped=False
)
print(f"Instance created: {instance.name}")
print(f"DNS endpoint: {instance.read_write_dns}")
```
## Common Patterns
### Generate OAuth Token

```python
from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

# Generate OAuth token for database connection
cred = w.database.generate_database_credential(
    request_id=str(uuid.uuid4()),
    instance_names=["my-lakebase-instance"]
)
token = cred.token  # Use this as the password in the connection string
```
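Since the token is just the connection password, assembling the libpq-style DSN can be factored into a small helper. This is a convenience sketch, not an SDK API; the keyword/value format matches the connection strings used throughout this document:

```python
def build_dsn(host: str, user: str, token: str, dbname: str = "postgres") -> str:
    """Assemble a libpq keyword/value connection string with SSL enforced."""
    return (
        f"host={host} dbname={dbname} user={user} "
        f"password={token} sslmode=require"
    )
```

Regenerate the token and rebuild the DSN whenever you open a new connection, since tokens expire after one hour.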
### Connect from Notebook
```python
import uuid

import psycopg
from databricks.sdk import WorkspaceClient

# Get instance details
w = WorkspaceClient()
instance = w.database.get_database_instance(name="my-lakebase-instance")

# Generate token
cred = w.database.generate_database_credential(
    request_id=str(uuid.uuid4()),
    instance_names=["my-lakebase-instance"]
)

# Connect using psycopg3
conn_string = (
    f"host={instance.read_write_dns} dbname=postgres "
    f"user={w.current_user.me().user_name} password={cred.token} sslmode=require"
)
with psycopg.connect(conn_string) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT version()")
        print(cur.fetchone())
```
### SQLAlchemy with Token Refresh (Production)

For long-running applications, tokens must be refreshed (they expire after 1 hour):
```python
import asyncio
import uuid

from sqlalchemy import event
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from databricks.sdk import WorkspaceClient

# Token refresh state
_current_token = None
_token_refresh_task = None
TOKEN_REFRESH_INTERVAL = 50 * 60  # 50 minutes (before the 1-hour expiry)

def _generate_token(instance_name: str) -> str:
    """Generate a fresh OAuth token."""
    w = WorkspaceClient()
    cred = w.database.generate_database_credential(
        request_id=str(uuid.uuid4()),
        instance_names=[instance_name]
    )
    return cred.token

async def _token_refresh_loop(instance_name: str):
    """Background task that refreshes the token every 50 minutes."""
    global _current_token
    while True:
        await asyncio.sleep(TOKEN_REFRESH_INTERVAL)
        _current_token = await asyncio.to_thread(_generate_token, instance_name)

def init_database(instance_name: str, database_name: str, username: str) -> AsyncEngine:
    """Initialize the database engine with OAuth token injection."""
    global _current_token
    w = WorkspaceClient()
    instance = w.database.get_database_instance(name=instance_name)

    # Generate initial token
    _current_token = _generate_token(instance_name)

    # Build URL (password injected via do_connect)
    url = f"postgresql+psycopg://{username}@{instance.read_write_dns}:5432/{database_name}"
    engine = create_async_engine(
        url,
        pool_size=5,
        max_overflow=10,
        pool_recycle=3600,
        connect_args={"sslmode": "require"}
    )

    # Inject the current token on each new connection
    @event.listens_for(engine.sync_engine, "do_connect")
    def provide_token(dialect, conn_rec, cargs, cparams):
        cparams["password"] = _current_token

    return engine
```
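If a background refresh task is awkward (for example, in synchronous scripts), the same 50-minute margin can be enforced lazily instead. Below is a minimal sketch of an expiry-aware cache; the generator and clock are injected so it is framework-agnostic, and all names here are illustrative rather than part of any SDK:

```python
import time
from typing import Callable

class TokenCache:
    """Caches a short-lived credential and regenerates it before expiry."""

    def __init__(self, generate: Callable[[], str],
                 ttl_seconds: float = 3600.0,    # Lakebase tokens last 1 hour
                 refresh_margin: float = 600.0,  # regenerate 10 minutes early
                 clock: Callable[[], float] = time.monotonic):
        self._generate = generate
        self._ttl = ttl_seconds
        self._margin = refresh_margin
        self._clock = clock
        self._token: str | None = None
        self._issued_at = 0.0

    def get(self) -> str:
        """Return a token, regenerating it if it is near or past expiry."""
        now = self._clock()
        if self._token is None or now - self._issued_at >= self._ttl - self._margin:
            self._token = self._generate()
            self._issued_at = now
        return self._token
```

To combine this with the engine above, wire `generate=lambda: _generate_token("my-lakebase-instance")` and call `cache.get()` inside the `do_connect` hook instead of reading a module-level global.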
## Databricks Apps Integration
For Databricks Apps, use environment variables for configuration.

Environment variables set by Databricks Apps:
- `LAKEBASE_INSTANCE_NAME`: Instance name
- `LAKEBASE_DATABASE_NAME`: Database name
- `LAKEBASE_USERNAME`: Username (optional; defaults to the service principal)

```python
import os

def is_lakebase_configured() -> bool:
    """Check if Lakebase is configured for this app."""
    return bool(
        os.environ.get("LAKEBASE_PG_URL") or
        (os.environ.get("LAKEBASE_INSTANCE_NAME") and
         os.environ.get("LAKEBASE_DATABASE_NAME"))
    )
```

Add Lakebase as an app resource via the CLI:

```bash
databricks apps add-resource $APP_NAME \
  --resource-type database \
  --resource-name lakebase \
  --database-instance my-lakebase-instance
```

## Register with Unity Catalog
```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Register the database in Unity Catalog
w.database.register_database_instance(
    name="my-lakebase-instance",
    catalog="my_catalog",
    schema="my_schema"
)
```

## MLflow Model Resources
Declare Lakebase as a model resource for automatic credential provisioning:

```python
import mlflow
from mlflow.models.resources import DatabricksLakebase

resources = [
    DatabricksLakebase(database_instance_name="my-lakebase-instance"),
]

# When logging the model
mlflow.langchain.log_model(
    model,
    artifact_path="model",
    resources=resources,
    pip_requirements=["databricks-langchain[memory]"]
)
```

## MCP Tools
The following MCP tools are available for managing Lakebase Provisioned infrastructure.
### Instance Management
| Tool | Description |
|---|---|
|  | Create a managed PostgreSQL instance (`CU_1`, `CU_2`, `CU_4`, `CU_8`) |
|  | Get instance details (state, DNS, capacity) |
|  | List all instances in the workspace |
|  | Resize or start/stop an instance |
|  | Delete an instance |
|  | Generate an OAuth token for PostgreSQL connections (1-hour expiry) |
### Unity Catalog Registration
| Tool | Description |
|---|---|
|  | Register a Lakebase instance as a Unity Catalog catalog |
|  | Get catalog registration details |
|  | Remove catalog registration (does not delete the instance) |
### Reverse ETL (Synced Tables)
| Tool | Description |
|---|---|
|  | Create a synced table from Delta to Lakebase |
|  | Get synced table status |
|  | Delete a synced table |
## Reference Files
- connection-patterns.md - Detailed connection patterns for different use cases
- reverse-etl.md - Syncing data from Delta Lake to Lakebase
## CLI Quick Reference
```bash
# Create instance
databricks database create-database-instance \
  --name my-lakebase-instance \
  --capacity CU_1

# Get instance details
databricks database get-database-instance --name my-lakebase-instance

# Generate credentials
databricks database generate-database-credential \
  --request-id $(uuidgen) \
  --json '{"instance_names": ["my-lakebase-instance"]}'

# List instances
databricks database list-database-instances

# Stop instance (saves cost)
databricks database stop-database-instance --name my-lakebase-instance

# Start instance
databricks database start-database-instance --name my-lakebase-instance
```

## Common Issues
| Issue | Solution |
|---|---|
| Token expired during long query | Implement a token refresh loop (see "SQLAlchemy with Token Refresh"); tokens expire after 1 hour |
| DNS resolution fails on macOS | Use the `hostaddr` parameter (psycopg 3.x) to bypass hostname resolution |
| Connection refused | Ensure the instance is not stopped; check its state with `databricks database get-database-instance` |
| Permission denied | The user must be granted access to the Lakebase instance |
| SSL required error | Always use `sslmode=require` in the connection string |
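Expired tokens (the first row above) can also be handled reactively: retry once with a fresh credential when authentication fails. The sketch below is dependency-injected; `connect`, `fresh_token`, and `AuthExpiredError` are placeholders standing in for `psycopg.connect`, `generate_database_credential`, and the driver's auth-failure exception, not SDK APIs:

```python
from typing import Callable, TypeVar

T = TypeVar("T")

class AuthExpiredError(Exception):
    """Stand-in for the driver's authentication-failure exception."""

def connect_with_refresh(connect: Callable[[str], T],
                         fresh_token: Callable[[], str],
                         max_attempts: int = 2) -> T:
    """Try to connect; on auth failure, fetch a fresh token and retry."""
    last_error: Exception | None = None
    for _ in range(max_attempts):
        try:
            # A fresh token is fetched for every attempt
            return connect(fresh_token())
        except AuthExpiredError as exc:
            last_error = exc
    raise last_error
```

Prefer proactive refresh (the SQLAlchemy pattern above) for connection pools; reactive retry suits short-lived scripts where a single reconnect is acceptable.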
## SDK Version Requirements

- Databricks SDK for Python: >= 0.61.0 (0.81.0+ recommended for full API support)
- psycopg: 3.x (supports the `hostaddr` parameter for the DNS workaround)
- SQLAlchemy: 2.x with the `postgresql+psycopg` driver

```python
%pip install -U "databricks-sdk>=0.81.0" "psycopg[binary]>=3.0" sqlalchemy
```

## Notes
- Capacity values use compute unit sizing: `CU_1`, `CU_2`, `CU_4`, `CU_8`
- Lakebase Autoscaling is a newer offering with automatic scaling but limited regional availability; this skill focuses on Lakebase Provisioned, which is more widely available
- For memory/state in LangChain agents, use `databricks-langchain[memory]`, which includes Lakebase support
- Tokens are short-lived (1 hour); production apps MUST implement token refresh