Lakebase Provisioned

Patterns and best practices for using Lakebase Provisioned (Databricks managed PostgreSQL) for OLTP workloads.

When to Use

Use this skill when:
  • Building applications that need a PostgreSQL database for transactional workloads
  • Adding persistent state to Databricks Apps
  • Implementing reverse ETL from Delta Lake to an operational database
  • Storing chat/agent memory for LangChain applications

Overview

Lakebase Provisioned is Databricks' managed PostgreSQL database service for OLTP (Online Transaction Processing) workloads. It provides a fully managed PostgreSQL-compatible database that integrates with Unity Catalog and supports OAuth token-based authentication.
| Feature | Description |
|---------|-------------|
| Managed PostgreSQL | Fully managed instances with automatic provisioning |
| OAuth Authentication | Token-based auth via the Databricks SDK (1-hour expiry) |
| Unity Catalog | Register databases for governance |
| Reverse ETL | Sync data from Delta tables to PostgreSQL |
| Apps Integration | First-class support in Databricks Apps |

Available Regions (AWS): us-east-1, us-east-2, us-west-2, eu-central-1, eu-west-1, ap-south-1, ap-southeast-1, ap-southeast-2

Quick Start

Create and connect to a Lakebase Provisioned instance:

```python
from databricks.sdk import WorkspaceClient

# Initialize client
w = WorkspaceClient()

# Create a database instance
instance = w.database.create_database_instance(
    name="my-lakebase-instance",
    capacity="CU_1",  # CU_1, CU_2, CU_4, or CU_8
    stopped=False
)
print(f"Instance created: {instance.name}")
print(f"DNS endpoint: {instance.read_write_dns}")
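
Provisioning is asynchronous, so the new instance may not accept connections immediately. A minimal polling sketch; the "AVAILABLE" state value checked here is an assumption, so inspect `instance.state` for the exact enum in your SDK version:

```python
import time

# Poll until the instance reports an available state before connecting.
# The state string compared against is an assumption.
while "AVAILABLE" not in str(
    w.database.get_database_instance(name="my-lakebase-instance").state
):
    time.sleep(15)
```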

Common Patterns

Generate OAuth Token

```python
from databricks.sdk import WorkspaceClient
import uuid

w = WorkspaceClient()

# Generate OAuth token for database connection
cred = w.database.generate_database_credential(
    request_id=str(uuid.uuid4()),
    instance_names=["my-lakebase-instance"]
)
token = cred.token  # Use this as the password in the connection string
```

Connect from Notebook

```python
import psycopg
from databricks.sdk import WorkspaceClient
import uuid

# Get instance details
w = WorkspaceClient()
instance = w.database.get_database_instance(name="my-lakebase-instance")

# Generate token
cred = w.database.generate_database_credential(
    request_id=str(uuid.uuid4()),
    instance_names=["my-lakebase-instance"]
)

# Connect using psycopg3
conn_string = (
    f"host={instance.read_write_dns} dbname=postgres "
    f"user={w.current_user.me().user_name} "
    f"password={cred.token} sslmode=require"
)
with psycopg.connect(conn_string) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT version()")
        print(cur.fetchone())
```
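
If DNS resolution for the instance hostname fails (the macOS issue noted under Common Issues), resolve the address yourself and pass `hostaddr` alongside `host`. A sketch continuing the example above, using `socket.gethostbyname` in place of `dig`:

```python
import socket

# Resolve the hostname manually, then pass both host (kept for TLS
# verification) and hostaddr (to skip DNS resolution) to psycopg.
ip = socket.gethostbyname(instance.read_write_dns)
conn_string = (
    f"host={instance.read_write_dns} hostaddr={ip} dbname=postgres "
    f"user={w.current_user.me().user_name} "
    f"password={cred.token} sslmode=require"
)
```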

SQLAlchemy with Token Refresh (Production)

For long-running applications, tokens must be refreshed (they expire after 1 hour):

```python
import asyncio
import uuid

from databricks.sdk import WorkspaceClient
from sqlalchemy import event
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

# Token refresh state
_current_token = None
_token_refresh_task = None
TOKEN_REFRESH_INTERVAL = 50 * 60  # 50 minutes (before 1-hour expiry)

def _generate_token(instance_name: str) -> str:
    """Generate a fresh OAuth token."""
    w = WorkspaceClient()
    cred = w.database.generate_database_credential(
        request_id=str(uuid.uuid4()),
        instance_names=[instance_name]
    )
    return cred.token

async def _token_refresh_loop(instance_name: str):
    """Background task to refresh the token every 50 minutes."""
    global _current_token
    while True:
        await asyncio.sleep(TOKEN_REFRESH_INTERVAL)
        _current_token = await asyncio.to_thread(_generate_token, instance_name)

def init_database(instance_name: str, database_name: str, username: str) -> AsyncEngine:
    """Initialize the database engine with OAuth token injection."""
    global _current_token
    w = WorkspaceClient()
    instance = w.database.get_database_instance(name=instance_name)

    # Generate initial token
    _current_token = _generate_token(instance_name)

    # Build URL (password injected via do_connect)
    url = f"postgresql+psycopg://{username}@{instance.read_write_dns}:5432/{database_name}"

    engine = create_async_engine(
        url,
        pool_size=5,
        max_overflow=10,
        pool_recycle=3600,
        connect_args={"sslmode": "require"}
    )

    # Inject the current token as the password on each new connection
    @event.listens_for(engine.sync_engine, "do_connect")
    def provide_token(dialect, conn_rec, cargs, cparams):
        cparams["password"] = _current_token

    return engine
```
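
A minimal usage sketch for the block above: create the engine and start the refresh loop from an async app's startup hook (the database name and username here are example values):

```python
async def startup() -> AsyncEngine:
    """Create the engine and kick off background token refresh."""
    global _token_refresh_task
    engine = init_database(
        instance_name="my-lakebase-instance",
        database_name="postgres",
        username="me@example.com",  # hypothetical user
    )
    _token_refresh_task = asyncio.create_task(
        _token_refresh_loop("my-lakebase-instance")
    )
    return engine
```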

Databricks Apps Integration

For Databricks Apps, use environment variables for configuration:

```python
import os

# Environment variables set by Databricks Apps:
# - LAKEBASE_INSTANCE_NAME: Instance name
# - LAKEBASE_DATABASE_NAME: Database name
# - LAKEBASE_USERNAME: Username (optional, defaults to the service principal)

def is_lakebase_configured() -> bool:
    """Check if Lakebase is configured for this app."""
    return bool(
        os.environ.get("LAKEBASE_PG_URL")
        or (os.environ.get("LAKEBASE_INSTANCE_NAME")
            and os.environ.get("LAKEBASE_DATABASE_NAME"))
    )
```

Add Lakebase as an app resource via CLI:

```bash
databricks apps add-resource $APP_NAME \
    --resource-type database \
    --resource-name lakebase \
    --database-instance my-lakebase-instance
```
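
A sketch tying the pieces together, reusing `init_database` from the SQLAlchemy section (the fallback username is a placeholder, not a documented default):

```python
import os

if is_lakebase_configured():
    engine = init_database(
        instance_name=os.environ["LAKEBASE_INSTANCE_NAME"],
        database_name=os.environ["LAKEBASE_DATABASE_NAME"],
        # Placeholder fallback; in practice the app's service principal
        # identity is typically used as the Postgres username.
        username=os.environ.get("LAKEBASE_USERNAME", "app-service-principal"),
    )
```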

Register with Unity Catalog

```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Register database in Unity Catalog
w.database.register_database_instance(
    name="my-lakebase-instance",
    catalog="my_catalog",
    schema="my_schema"
)
```
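
Once registered, the Postgres tables become queryable (read-only) through Unity Catalog. A sketch assuming a notebook context and a hypothetical table name:

```python
# Hypothetical table; catalog/schema names match the registration above.
df = spark.sql("SELECT * FROM my_catalog.my_schema.orders LIMIT 10")
display(df)
```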

MLflow Model Resources

Declare Lakebase as a model resource for automatic credential provisioning:

```python
import mlflow
from mlflow.models.resources import DatabricksLakebase

resources = [
    DatabricksLakebase(database_instance_name="my-lakebase-instance"),
]

# When logging the model
mlflow.langchain.log_model(
    model,
    artifact_path="model",
    resources=resources,
    pip_requirements=["databricks-langchain[memory]"]
)
```

MCP Tools

The following MCP tools are available for managing Lakebase Provisioned infrastructure.

Instance Management

| Tool | Description |
|------|-------------|
| `create_lakebase_instance` | Create a managed PostgreSQL instance (CU_1, CU_2, CU_4, CU_8) |
| `get_lakebase_instance` | Get instance details (state, DNS, capacity) |
| `list_lakebase_instances` | List all instances in the workspace |
| `update_lakebase_instance` | Resize or start/stop an instance |
| `delete_lakebase_instance` | Delete an instance |
| `generate_lakebase_credential` | Generate an OAuth token for PostgreSQL connections (1-hour expiry) |
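
For scripting outside MCP, the Databricks SDK exposes similar operations. A sketch; `get_database_instance` is used throughout this skill, while `list_database_instances` is an assumption mirroring the CLI's `list-database-instances`:

```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Inspect a single instance.
inst = w.database.get_database_instance(name="my-lakebase-instance")
print(inst.state, inst.read_write_dns)

# Enumerate all instances in the workspace (method name assumed).
for i in w.database.list_database_instances():
    print(i.name)
```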

Unity Catalog Registration

| Tool | Description |
|------|-------------|
| `create_lakebase_catalog` | Register a Lakebase instance as a Unity Catalog catalog. Params: `name`, `instance_name`, `database_name` (default: "databricks_postgres"), `create_database_if_not_exists` (default: False). The catalog is read-only. |
| `get_lakebase_catalog` | Get catalog registration details |
| `delete_lakebase_catalog` | Remove catalog registration (does not delete the instance) |

Reverse ETL (Synced Tables)

| Tool | Description |
|------|-------------|
| `create_synced_table` | Create a synced table from Delta to Lakebase. Params: `instance_name`, `source_table_name`, `target_table_name`, `primary_key_columns` (optional), `scheduling_policy` ("TRIGGERED"/"SNAPSHOT"/"CONTINUOUS", default: "TRIGGERED") |
| `get_synced_table` | Get synced table status |
| `delete_synced_table` | Delete a synced table |
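
The parameter shapes above, expressed as a tool-call payload. A sketch: the parameter names come from the table, but `mcp_client.call_tool` and the table names are hypothetical and depend on your MCP client:

```python
# Hypothetical MCP client invocation; parameters mirror the table above.
result = mcp_client.call_tool(
    "create_synced_table",
    {
        "instance_name": "my-lakebase-instance",
        "source_table_name": "main.sales.orders",  # hypothetical Delta table
        "target_table_name": "public.orders",      # hypothetical Postgres target
        "primary_key_columns": ["order_id"],
        "scheduling_policy": "TRIGGERED",
    },
)
```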

Reference Files

  • connection-patterns.md - Detailed connection patterns for different use cases
  • reverse-etl.md - Syncing data from Delta Lake to Lakebase

CLI Quick Reference

```bash
# Create instance
databricks database create-database-instance \
    --name my-lakebase-instance \
    --capacity CU_1

# Get instance details
databricks database get-database-instance --name my-lakebase-instance

# Generate credentials
databricks database generate-database-credential \
    --request-id $(uuidgen) \
    --json '{"instance_names": ["my-lakebase-instance"]}'

# List instances
databricks database list-database-instances

# Stop instance (saves cost)
databricks database stop-database-instance --name my-lakebase-instance

# Start instance
databricks database start-database-instance --name my-lakebase-instance
```

Common Issues

| Issue | Solution |
|-------|----------|
| Token expired during long query | Implement a token refresh loop (see "SQLAlchemy with Token Refresh"); tokens expire after 1 hour |
| DNS resolution fails on macOS | Use the `dig` command to resolve the hostname, then pass `hostaddr` to psycopg |
| Connection refused | Ensure the instance is not stopped; check `instance.state` |
| Permission denied | The user must be granted access to the Lakebase instance |
| SSL required error | Always use `sslmode=require` in the connection string |

SDK Version Requirements

  • Databricks SDK for Python: >= 0.61.0 (0.81.0+ recommended for full API support)
  • psycopg: 3.x (supports the `hostaddr` parameter for the DNS workaround)
  • SQLAlchemy: 2.x with the `postgresql+psycopg` driver

```python
%pip install -U "databricks-sdk>=0.81.0" "psycopg[binary]>=3.0" sqlalchemy
```

Notes

  • Capacity values use compute unit sizing: `CU_1`, `CU_2`, `CU_4`, `CU_8`.
  • Lakebase Autoscaling is a newer offering with automatic scaling but limited regional availability. This skill focuses on Lakebase Provisioned, which is more widely available.
  • For memory/state in LangChain agents, use `databricks-langchain[memory]`, which includes Lakebase support.
  • Tokens are short-lived (1 hour) - production apps MUST implement token refresh.