QuickBooks Online API Expert Guide
QuickBooks Online API 专家指南
The QuickBooks Online API provides comprehensive access to accounting data and operations for QuickBooks Online companies. This skill enables you to build integrations that handle invoicing, payments, customer management, inventory tracking, and financial reporting. The API uses OAuth 2.0 for authentication and supports operations across all major accounting entities including customers, invoices, payments, items, accounts, and more.
The QuickBooks Online API is REST-based, returns JSON or XML responses, and provides SDKs for Java, Python, PHP, Node.js, and C#. It supports both sandbox (development) and production environments.
When to Use This Skill
Use this skill when:
- Building QuickBooks integrations for accounting automation
- Implementing invoicing workflows or payment processing
- Creating customer or vendor management features
- Working with QuickBooks Online API authentication (OAuth2)
- Troubleshooting API errors or validation failures
- Implementing batch operations for bulk data updates
- Setting up change data capture (CDC) or webhooks for data synchronization
- Designing multi-currency or international accounting integrations
- Building reports or analytics on top of QuickBooks data
- Migrating data to/from QuickBooks Online
QuickBooks Online API 可为QuickBooks Online公司提供会计数据和操作的全面访问能力。本指南可帮助你构建处理发票开具、支付、客户管理、库存追踪和财务报表的集成服务。该API使用OAuth 2.0进行身份验证,支持所有核心会计实体的操作,包括客户、发票、支付、商品、账户等。
QuickBooks Online API 基于REST架构,返回JSON或XML格式的响应,并提供面向Java、Python、PHP、Node.js和C#的SDK,同时支持沙箱(开发)和生产环境。
Authentication & OAuth2 Setup
适用场景
QuickBooks Online API requires OAuth 2.0 authentication. The flow involves:
- Register your app at developer.intuit.com to get Client ID and Client Secret
- Direct users to authorization URL where they grant access to their QuickBooks company
- Exchange authorization code for tokens (access token + refresh token)
- Use access token in API requests (Authorization: Bearer header)
- Refresh tokens before expiration to maintain access
当你有以下需求时可使用本指南:
- 构建QuickBooks集成实现会计自动化
- 实现发票工作流或支付处理功能
- 开发客户或供应商管理特性
- 处理QuickBooks Online API身份验证(OAuth2)
- 排查API错误或验证失败问题
- 实现批量数据更新的批处理操作
- 搭建变更数据捕获(CDC)或webhook实现数据同步
- 设计多币种或国际会计集成方案
- 基于QuickBooks数据构建报表或分析功能
- 向/从QuickBooks Online迁移数据
Token Lifecycle
身份验证与OAuth2配置
Access Tokens:
- Valid for 3600 seconds (1 hour)
- Include in Authorization header:
Authorization: Bearer {access_token}
- Return 401 Unauthorized when expired
Refresh Tokens:
- Valid for 100 days from issuance
- Use to obtain new access token + refresh token pair
- Previous refresh token expires 24 hours after new one is issued
- Always use the most recent refresh token
QuickBooks Online API要求使用OAuth 2.0身份验证,流程包含以下步骤:
- 在developer.intuit.com注册你的应用,获取客户端ID(Client ID)和客户端密钥(Client Secret)
- 引导用户跳转至授权URL,由用户授予你访问其QuickBooks公司数据的权限
- 用授权码交换令牌(访问令牌+刷新令牌)
- 在API请求中使用访问令牌(Authorization: Bearer请求头)
- 在令牌过期前刷新令牌以保持访问权限
Token Refresh Pattern
令牌生命周期
Node.js Example:
javascript
const oauthClient = require('intuit-oauth');
// Refresh access token
oauthClient.refresh()
.then(function(authResponse) {
const newAccessToken = authResponse.token.access_token;
const newRefreshToken = authResponse.token.refresh_token;
const expiresIn = authResponse.token.expires_in; // 3600 seconds
// Store new tokens securely (database, encrypted storage)
console.log('Tokens refreshed successfully');
})
.catch(function(e) {
console.error('Token refresh failed:', e.originalMessage);
// Handle re-authentication if refresh token is invalid
});
Python Example:
python
from intuitlib.client import AuthClient
auth_client = AuthClient(
client_id='YOUR_CLIENT_ID',
client_secret='YOUR_CLIENT_SECRET',
redirect_uri='YOUR_REDIRECT_URI',
environment='sandbox' # or 'production'
)
访问令牌:
- 有效期为 3600秒(1小时)
- 需包含在Authorization请求头中:
Authorization: Bearer {access_token}
- 过期时返回401 Unauthorized错误
刷新令牌:
- 自签发起有效期为 100天
- 用于获取新的访问令牌+刷新令牌对
- 旧刷新令牌会在新令牌签发后24小时过期
- 请始终使用最新的刷新令牌
auth_client.refresh(refresh_token='STORED_REFRESH_TOKEN')
Node.js示例:
javascript
const oauthClient = require('intuit-oauth');
// Refresh access token
oauthClient.refresh()
.then(function(authResponse) {
const newAccessToken = authResponse.token.access_token;
const newRefreshToken = authResponse.token.refresh_token;
const expiresIn = authResponse.token.expires_in; // 3600 seconds
// Store new tokens securely (database, encrypted storage)
console.log('Tokens refreshed successfully');
})
.catch(function(e) {
console.error('Token refresh failed:', e.originalMessage);
// Handle re-authentication if refresh token is invalid
});
Python示例:
python
from intuitlib.client import AuthClient
auth_client = AuthClient(
client_id='YOUR_CLIENT_ID',
client_secret='YOUR_CLIENT_SECRET',
redirect_uri='YOUR_REDIRECT_URI',
environment='sandbox' # or 'production'
)
Get new tokens
Refresh tokens
new_access_token = auth_client.access_token
new_refresh_token = auth_client.refresh_token
auth_client.refresh(refresh_token='STORED_REFRESH_TOKEN')
Best Practices
Get new tokens
- Refresh proactively: Refresh tokens before they expire (e.g., after 50 minutes)
- Store securely: Encrypt tokens in database, never commit to version control
- Handle 401 responses: Automatically attempt token refresh on authentication errors
- Realm ID (Company ID): Store the realmId returned during OAuth - required for all API calls
- Scopes: Request only necessary scopes (accounting, payments, etc.)
new_access_token = auth_client.access_token
new_refresh_token = auth_client.refresh_token
Core Entities Reference
最佳实践
Represents customers and sub-customers (jobs) in QuickBooks.
Key Fields:
- (string, read-only): Unique identifier
- (string, required): Customer display name (must be unique)
- , (string): First and last name
- (string): Company name for business customers
- (object): Email address
{ "Address": "email@example.com" }
- (object): Phone number
{ "FreeFormNumber": "(555) 123-4567" }
- , (object): Billing and shipping addresses
- (decimal, read-only): Current outstanding balance
- (boolean): Whether customer is active
- (string, required for updates): Version number for optimistic locking
Reference Type: Use
in transactions:
{ "value": "123", "name": "Customer Name" }
- 主动刷新:在令牌过期前刷新(例如使用50分钟后就刷新)
- 安全存储:将令牌加密存储在数据库中,绝对不要提交到版本控制系统
- 处理401响应:遇到身份验证错误时自动尝试刷新令牌
- Realm ID(公司ID):存储OAuth流程中返回的realmId,所有API调用都需要该参数
- 权限范围:仅请求必要的权限范围(会计、支付等)
Represents sales invoices sent to customers.
Key Fields:
- (string, read-only): Unique identifier
- (string): Invoice number (auto-generated if not provided)
- (date): Transaction date (YYYY-MM-DD format)
- (date): Payment due date
- (object, required): Reference to customer
{ "value": "customerId" }
- (array, required): Invoice line items (see Line Items section)
- (decimal, read-only): Calculated total amount
- (decimal, read-only): Remaining unpaid balance
- (enum): NotSet, NeedToSend, EmailSent
- (object): Customer email for invoice delivery
- (object): Tax calculation details
- (array): Linked transactions (payments, credit memos)
- (string, required for updates): Version number
Line Items:
json
{
"Line": [
{
"Amount": 100.00,
"DetailType": "SalesItemLineDetail",
"SalesItemLineDetail": {
"ItemRef": { "value": "1", "name": "Services" },
"Qty": 1,
"UnitPrice": 100.00,
"TaxCodeRef": { "value": "TAX" }
}
},
{
"Amount": 100.00,
"DetailType": "SubTotalLineDetail",
"SubTotalLineDetail": {}
}
]
}
代表QuickBooks中的客户和子客户(项目)。
关键字段:
- (字符串,只读):唯一标识符
- (字符串,必填):客户显示名称(必须唯一)
- 、(字符串):名字和姓氏
- (字符串):企业客户的公司名称
- (对象):邮箱地址
{ "Address": "email@example.com" }
- (对象):电话号码
{ "FreeFormNumber": "(555) 123-4567" }
- 、(对象):账单地址和配送地址
- (小数,只读):当前未结清余额
- (布尔值):客户是否处于激活状态
- (字符串,更新时必填):用于乐观锁的版本号
引用类型:在交易中使用
:
{ "value": "123", "name": "Customer Name" }
Represents payments received from customers against invoices.
Key Fields:
- (string, read-only): Unique identifier
- (decimal, required): Total payment amount
- (object, required): Reference to customer
- (object): Payment method (cash, check, credit card, etc.)
- (string): Reference number (check number, transaction ID)
- (date): Payment date
- (object): Bank account for deposit
- (array): Payment application to invoices/credit memos
- (decimal, read-only): Amount not applied to invoices
- (string, required for updates): Version number
Payment Line Item (applies payment to invoice):
json
{
"Line": [
{
"Amount": 100.00,
"LinkedTxn": [
{
"TxnId": "123",
"TxnType": "Invoice"
}
]
}
]
}
代表发送给客户的销售发票。
关键字段:
- (字符串,只读):唯一标识符
- (字符串):发票编号(未提供则自动生成)
- (日期):交易日期(YYYY-MM-DD格式)
- (日期):付款截止日期
- (对象,必填):关联客户的引用
{ "value": "customerId" }
- (数组,必填):发票明细项(见明细项部分)
- (小数,只读):计算得出的总金额
- (小数,只读):剩余未支付余额
- (枚举):NotSet(未设置)、NeedToSend(待发送)、EmailSent(已发送)
- (对象):发票投递的客户邮箱
- (对象):税费计算详情
- (数组):关联交易(支付、贷项通知单)
- (字符串,更新时必填):版本号
明细项示例:
json
{
"Line": [
{
"Amount": 100.00,
"DetailType": "SalesItemLineDetail",
"SalesItemLineDetail": {
"ItemRef": { "value": "1", "name": "Services" },
"Qty": 1,
"UnitPrice": 100.00,
"TaxCodeRef": { "value": "TAX" }
}
},
{
"Amount": 100.00,
"DetailType": "SubTotalLineDetail",
"SubTotalLineDetail": {}
}
]
}
Represents products or services sold.
Types:
- : Services (consulting, labor, etc.)
- : Physical products tracked in inventory
- : Physical products not tracked
- : Grouping for other items
Key Fields:
- (string, read-only): Unique identifier
- (string, required): Item name (must be unique)
- (enum, required): Service, Inventory, NonInventory, Category
- (string): Item description
- (decimal): Sales price
- (decimal): Purchase/cost price
- (object, required): Income account reference
- (object): Expense account for purchases
- (boolean): Whether to track inventory quantity
- (decimal): Current inventory quantity
- (boolean): Whether item is active
代表从客户处收到的对应发票的付款。
关键字段:
- (字符串,只读):唯一标识符
- (小数,必填):总付款金额
- (对象,必填):关联客户的引用
- (对象):支付方式(现金、支票、信用卡等)
- (字符串):参考编号(支票号、交易ID)
- (日期):付款日期
- (对象):存入的银行账户
- (数组):对应发票/贷项通知单的付款分配
- (小数,只读):未分配到发票的金额
- (字符串,更新时必填):版本号
支付明细项(将付款分配到发票):
json
{
"Line": [
{
"Amount": 100.00,
"LinkedTxn": [
{
"TxnId": "123",
"TxnType": "Invoice"
}
]
}
]
}
Represents accounts in the chart of accounts.
Key Fields:
- (string, read-only): Unique identifier
- (string, required): Account name
- (enum, required): Bank, Accounts Receivable, Accounts Payable, Income, Expense, etc.
- (enum): More specific type (CashOnHand, Checking, Savings, etc.)
- (decimal, read-only): Current account balance
- (boolean): Whether account is active
- (enum): Asset, Liability, Equity, Revenue, Expense
Common Account Types:
- : Bank and cash accounts
- : Customer balances
- : Vendor balances
- : Revenue accounts
- : Expense accounts
- : Short-term assets
- : Long-term assets
代表销售的产品或服务。
类型:
- :服务(咨询、人工等)
- :需要跟踪库存的实体产品
- :不跟踪库存的实体产品
- :其他商品的分组分类
关键字段:
- (字符串,只读):唯一标识符
- (字符串,必填):商品名称(必须唯一)
- (枚举,必填):Service、Inventory、NonInventory、Category
- (字符串):商品描述
- (小数):销售价格
- (小数):采购/成本价
- (对象,必填):收入账户引用
- (对象):采购对应的支出账户
- (布尔值):是否跟踪库存数量
- (小数):当前库存数量
- (布尔值):商品是否处于激活状态
CRUD Operations Patterns
账户(Account)
Minimum Required Fields: Each entity has specific required fields (usually a name/reference and amount).
Endpoint Pattern:
POST /v3/company/{realmId}/{entityName}
Request Headers:
Authorization: Bearer {access_token}
Accept: application/json
Content-Type: application/json
Python Example - Create Invoice:
python
import requests
realm_id = "YOUR_REALM_ID"
access_token = "YOUR_ACCESS_TOKEN"
url = f"https://sandbox-quickbooks.api.intuit.com/v3/company/{realm_id}/invoice"
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
"Content-Type": "application/json"
}
invoice_data = {
"Line": [
{
"Amount": 100.00,
"DetailType": "SalesItemLineDetail",
"SalesItemLineDetail": {
"ItemRef": {"value": "1"}
}
}
],
"CustomerRef": {"value": "1"}
}
response = requests.post(url, json=invoice_data, headers=headers)
if response.status_code == 200:
invoice = response.json()['Invoice']
print(f"Invoice created: {invoice['Id']}")
else:
print(f"Error: {response.status_code} - {response.text}")
代表会计科目表中的账户。
关键字段:
- (字符串,只读):唯一标识符
- (字符串,必填):账户名称
- (枚举,必填):银行、应收账款、应付账款、收入、支出等
- (枚举):更具体的类型(库存现金、支票账户、储蓄账户等)
- (小数,只读):当前账户余额
- (布尔值):账户是否处于激活状态
- (枚举):资产、负债、权益、收入、支出
常见账户类型:
- :银行和现金账户
- :客户应收账款
- :供应商应付账款
- :收入账户
- :支出账户
- :流动资产
- :固定资产
Single Entity:
GET /v3/company/{realmId}/{entityName}/{entityId}
Node.js Example - Read Customer:
javascript
const axios = require('axios');
async function readCustomer(realmId, customerId, accessToken) {
const url = `https://sandbox-quickbooks.api.intuit.com/v3/company/${realmId}/customer/${customerId}`;
try {
const response = await axios.get(url, {
headers: {
'Authorization': `Bearer ${accessToken}`,
'Accept': 'application/json'
}
});
return response.data.Customer;
} catch (error) {
if (error.response && error.response.status === 401) {
// Token expired, refresh and retry
console.error('Authentication failed - refresh token needed');
} else {
console.error('Read failed:', error.response?.data || error.message);
}
throw error;
}
}
最低必填字段:每个实体都有特定的必填字段(通常是名称/引用和金额)。
端点模式:
POST /v3/company/{realmId}/{entityName}
请求头:
Authorization: Bearer {access_token}
Accept: application/json
Content-Type: application/json
Python示例 - 创建发票:
python
import requests
realm_id = "YOUR_REALM_ID"
access_token = "YOUR_ACCESS_TOKEN"
url = f"https://sandbox-quickbooks.api.intuit.com/v3/company/{realm_id}/invoice"
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
"Content-Type": "application/json"
}
invoice_data = {
"Line": [
{
"Amount": 100.00,
"DetailType": "SalesItemLineDetail",
"SalesItemLineDetail": {
"ItemRef": {"value": "1"}
}
}
],
"CustomerRef": {"value": "1"}
}
response = requests.post(url, json=invoice_data, headers=headers)
if response.status_code == 200:
invoice = response.json()['Invoice']
print(f"Invoice created: {invoice['Id']}")
else:
print(f"Error: {response.status_code} - {response.text}")
Two types of updates:
1. Full Update: All writable fields must be included. Omitted fields are set to NULL.
2. Sparse Update: Only specified fields are updated. Set
in request body.
Important: Always include
from the latest read response. This prevents concurrent modification conflicts.
Python Example - Sparse Update Customer Email:
python
import requests
def sparse_update_customer(realm_id, customer_id, sync_token, new_email, access_token):
url = f"https://sandbox-quickbooks.api.intuit.com/v3/company/{realm_id}/customer"
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
"Content-Type": "application/json"
}
# Sparse update - only updating email
customer_data = {
"Id": customer_id,
"SyncToken": sync_token,
"sparse": True,
"PrimaryEmailAddr": {
"Address": new_email
}
}
response = requests.post(url, json=customer_data, headers=headers)
if response.status_code == 200:
updated_customer = response.json()['Customer']
print(f"Customer updated, new SyncToken: {updated_customer['SyncToken']}")
return updated_customer
else:
print(f"Update failed: {response.text}")
return None
SyncToken Handling:
单个实体:
GET /v3/company/{realmId}/{entityName}/{entityId}
Node.js示例 - 查询客户:
javascript
const axios = require('axios');
async function readCustomer(realmId, customerId, accessToken) {
const url = `https://sandbox-quickbooks.api.intuit.com/v3/company/${realmId}/customer/${customerId}`;
try {
const response = await axios.get(url, {
headers: {
'Authorization': `Bearer ${accessToken}`,
'Accept': 'application/json'
}
});
return response.data.Customer;
} catch (error) {
if (error.response && error.response.status === 401) {
// Token expired, refresh and retry
console.error('Authentication failed - refresh token needed');
} else {
console.error('Read failed:', error.response?.data || error.message);
}
throw error;
}
}
1. Read entity to get latest SyncToken
更新操作
customer = read_customer(realm_id, customer_id, access_token)
两种更新类型:
1. 全量更新:必须包含所有可写字段,省略的字段会被设为NULL。
2. 稀疏更新:仅更新指定字段,需在请求体中设置
。
重要提示:始终包含最新查询响应中的
,可防止并发修改冲突。
Python示例 - 稀疏更新客户邮箱:
python
import requests
def sparse_update_customer(realm_id, customer_id, sync_token, new_email, access_token):
url = f"https://sandbox-quickbooks.api.intuit.com/v3/company/{realm_id}/customer"
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
"Content-Type": "application/json"
}
# Sparse update - only updating email
customer_data = {
"Id": customer_id,
"SyncToken": sync_token,
"sparse": True,
"PrimaryEmailAddr": {
"Address": new_email
}
}
response = requests.post(url, json=customer_data, headers=headers)
if response.status_code == 200:
updated_customer = response.json()['Customer']
print(f"Customer updated, new SyncToken: {updated_customer['SyncToken']}")
return updated_customer
else:
print(f"Update failed: {response.text}")
return None
SyncToken处理:
2. Update with current SyncToken
1. Read entity to get latest SyncToken
updated = sparse_update_customer(
realm_id,
customer_id,
customer['SyncToken'], # Use current sync token
"newemail@example.com",
access_token
)
customer = read_customer(realm_id, customer_id, access_token)
3. Store new SyncToken for next update
2. Update with current SyncToken
new_sync_token = updated['SyncToken']
updated = sparse_update_customer(
realm_id,
customer_id,
customer['SyncToken'], # Use current sync token
"newemail@example.com",
access_token
)
Delete Operations
3. Store new SyncToken for next update
Most entities use
soft delete (setting
to false) or
void operations.
Soft Delete Pattern:
javascript
// Mark customer as inactive
const deleteCustomer = {
Id: customerId,
SyncToken: currentSyncToken,
sparse: true,
Active: false
};
// POST to update endpoint
axios.post(`${baseUrl}/customer`, deleteCustomer, { headers });
Hard Delete (limited entities):
POST /v3/company/{realmId}/{entityName}?operation=delete
json
{
"Id": "123",
"SyncToken": "2"
}
new_sync_token = updated['SyncToken']
Query Language & Filtering
删除操作
QuickBooks uses SQL-like query syntax with limitations.
大多数实体使用
软删除(将
设为false)或
作废操作。
软删除模式:
javascript
// Mark customer as inactive
const deleteCustomer = {
Id: customerId,
SyncToken: currentSyncToken,
sparse: true,
Active: false
};
// POST to update endpoint
axios.post(`${baseUrl}/customer`, deleteCustomer, { headers });
硬删除(仅部分实体支持):
POST /v3/company/{realmId}/{entityName}?operation=delete
json
{
"Id": "123",
"SyncToken": "2"
}
Basic Pattern:
SELECT * FROM {EntityName} WHERE {field} {operator} '{value}'
Endpoint:
GET /v3/company/{realmId}/query?query={sqlQuery}
QuickBooks使用类SQL的查询语法,但有一定限制。
- : Equals
- , , , : Comparison
- : Match any value in list
- : Pattern matching (only wildcard supported, no )
基础模式:
SELECT * FROM {EntityName} WHERE {field} {operator} '{value}'
端点:
GET /v3/company/{realmId}/query?query={sqlQuery}
Query customers by name:
sql
SELECT * FROM Customer WHERE DisplayName LIKE 'Acme%'
Query invoices by date range:
sql
SELECT * FROM Invoice WHERE TxnDate >= '2024-01-01' AND TxnDate <= '2024-12-31'
Query with ordering:
sql
SELECT * FROM Customer WHERE Active = true ORDERBY DisplayName
Pagination:
sql
SELECT * FROM Invoice STARTPOSITION 1 MAXRESULTS 100
Python Example - Query with Filters:
python
import requests
from urllib.parse import quote
def query_invoices_by_customer(realm_id, customer_id, access_token):
query = f"SELECT * FROM Invoice WHERE CustomerRef = '{customer_id}' ORDERBY TxnDate DESC"
encoded_query = quote(query)
url = f"https://sandbox-quickbooks.api.intuit.com/v3/company/{realm_id}/query?query={encoded_query}"
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json"
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
result = response.json()['QueryResponse']
invoices = result.get('Invoice', [])
print(f"Found {len(invoices)} invoices")
return invoices
else:
print(f"Query failed: {response.text}")
return []
- :等于
- 、、、:比较
- :匹配列表中的任意值
- :模式匹配(仅支持通配符,不支持)
- No wildcards except %: LIKE only supports (not )
- No JOIN operations: Query single entity at a time
- Limited functions: No aggregate functions (SUM, COUNT, etc.)
- Max 1000 results: Use pagination for larger result sets
- All fields returned: Cannot select specific fields (always returns all)
按名称查询客户:
sql
SELECT * FROM Customer WHERE DisplayName LIKE 'Acme%'
按日期范围查询发票:
sql
SELECT * FROM Invoice WHERE TxnDate >= '2024-01-01' AND TxnDate <= '2024-12-31'
排序查询:
sql
SELECT * FROM Customer WHERE Active = true ORDERBY DisplayName
分页:
sql
SELECT * FROM Invoice STARTPOSITION 1 MAXRESULTS 100
Python示例 - 带过滤的查询:
python
import requests
from urllib.parse import quote
def query_invoices_by_customer(realm_id, customer_id, access_token):
query = f"SELECT * FROM Invoice WHERE CustomerRef = '{customer_id}' ORDERBY TxnDate DESC"
encoded_query = quote(query)
url = f"https://sandbox-quickbooks.api.intuit.com/v3/company/{realm_id}/query?query={encoded_query}"
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json"
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
result = response.json()['QueryResponse']
invoices = result.get('Invoice', [])
print(f"Found {len(invoices)} invoices")
return invoices
else:
print(f"Query failed: {response.text}")
return []
python
def query_all_customers(realm_id, access_token):
all_customers = []
start_position = 1
max_results = 1000
while True:
query = f"SELECT * FROM Customer STARTPOSITION {start_position} MAXRESULTS {max_results}"
encoded_query = quote(query)
url = f"{base_url}/company/{realm_id}/query?query={encoded_query}"
response = requests.get(url, headers={"Authorization": f"Bearer {access_token}"})
result = response.json()['QueryResponse']
customers = result.get('Customer', [])
if not customers:
break
all_customers.extend(customers)
# Check if more results exist
if len(customers) < max_results:
break
start_position += max_results
return all_customers
- 仅支持%通配符:LIKE仅支持(不支持)
- 不支持JOIN操作:每次只能查询单个实体
- 函数支持有限:不支持聚合函数(SUM、COUNT等)
- 最多返回1000条结果:更大的结果集需要使用分页
- 返回所有字段:无法选择指定字段(始终返回全部字段)
Batch operations allow multiple API calls in a single HTTP request (up to 30 operations).
python
def query_all_customers(realm_id, access_token):
all_customers = []
start_position = 1
max_results = 1000
while True:
query = f"SELECT * FROM Customer STARTPOSITION {start_position} MAXRESULTS {max_results}"
encoded_query = quote(query)
url = f"{base_url}/company/{realm_id}/query?query={encoded_query}"
response = requests.get(url, headers={"Authorization": f"Bearer {access_token}"})
result = response.json()['QueryResponse']
customers = result.get('Customer', [])
if not customers:
break
all_customers.extend(customers)
# Check if more results exist
if len(customers) < max_results:
break
start_position += max_results
return all_customers
Batch Request Structure
批处理操作
Endpoint:
POST /v3/company/{realmId}/batch
Request Body:
json
{
"BatchItemRequest": [
{
"bId": "bid1",
"operation": "create",
"Customer": {
"DisplayName": "New Customer 1"
}
},
{
"bId": "bid2",
"operation": "update",
"Invoice": {
"Id": "123",
"SyncToken": "1",
"sparse": true,
"EmailStatus": "NeedToSend"
}
},
{
"bId": "bid3",
"operation": "query",
"Query": "SELECT * FROM Customer WHERE Active = true MAXRESULTS 10"
}
]
}
批处理操作允许在单个HTTP请求中执行多个API调用(最多30个操作)。
Each operation has a unique
(batch ID) for tracking results:
Response Structure:
json
{
"BatchItemResponse": [
{
"bId": "bid1",
"Customer": {
"Id": "456",
"DisplayName": "New Customer 1"
}
},
{
"bId": "bid2",
"Invoice": {
"Id": "123",
"SyncToken": "2"
}
},
{
"bId": "bid3",
"QueryResponse": {
"Customer": [...]
}
}
]
}
端点:
POST /v3/company/{realmId}/batch
请求体:
json
{
"BatchItemRequest": [
{
"bId": "bid1",
"operation": "create",
"Customer": {
"DisplayName": "New Customer 1"
}
},
{
"bId": "bid2",
"operation": "update",
"Invoice": {
"Id": "123",
"SyncToken": "1",
"sparse": true,
"EmailStatus": "NeedToSend"
}
},
{
"bId": "bid3",
"operation": "query",
"Query": "SELECT * FROM Customer WHERE Active = true MAXRESULTS 10"
}
]
}
Node.js Example - Batch Update Customers
批处理ID追踪
javascript
async function batchUpdateCustomers(realmId, customers, accessToken) {
const batchItems = customers.map((customer, index) => ({
bId: `customer_${index}`,
operation: 'update',
Customer: {
Id: customer.Id,
SyncToken: customer.SyncToken,
sparse: true,
Active: true // Reactivate all customers
}
}));
const url = `https://sandbox-quickbooks.api.intuit.com/v3/company/${realmId}/batch`;
try {
const response = await axios.post(url, {
BatchItemRequest: batchItems
}, {
headers: {
'Authorization': `Bearer ${accessToken}`,
'Content-Type': 'application/json'
}
});
const results = response.data.BatchItemResponse;
// Process results by batch ID
results.forEach(result => {
if (result.Fault) {
console.error(`Error for ${result.bId}:`, result.Fault);
} else {
console.log(`Success for ${result.bId}: Customer ${result.Customer.Id}`);
}
});
return results;
} catch (error) {
console.error('Batch operation failed:', error.response?.data || error.message);
throw error;
}
}
响应结构:
json
{
"BatchItemResponse": [
{
"bId": "bid1",
"Customer": {
"Id": "456",
"DisplayName": "New Customer 1"
}
},
{
"bId": "bid2",
"Invoice": {
"Id": "123",
"SyncToken": "2"
}
},
{
"bId": "bid3",
"QueryResponse": {
"Customer": [...]
}
}
]
}
Benefits of Batch Operations
Node.js示例 - 批量更新客户
- Reduced API calls: 30 operations in one request vs 30 separate requests
- Lower latency: Single round-trip instead of multiple
- Rate limit friendly: Counts as single API call for rate limiting
- Atomic per operation: Each operation succeeds or fails independently
javascript
async function batchUpdateCustomers(realmId, customers, accessToken) {
const batchItems = customers.map((customer, index) => ({
bId: `customer_${index}`,
operation: 'update',
Customer: {
Id: customer.Id,
SyncToken: customer.SyncToken,
sparse: true,
Active: true // Reactivate all customers
}
}));
const url = `https://sandbox-quickbooks.api.intuit.com/v3/company/${realmId}/batch`;
try {
const response = await axios.post(url, {
BatchItemRequest: batchItems
}, {
headers: {
'Authorization': `Bearer ${accessToken}`,
'Content-Type': 'application/json'
}
});
const results = response.data.BatchItemResponse;
// Process results by batch ID
results.forEach(result => {
if (result.Fault) {
console.error(`Error for ${result.bId}:`, result.Fault);
} else {
console.log(`Success for ${result.bId}: Customer ${result.Customer.Id}`);
}
});
return results;
} catch (error) {
console.error('Batch operation failed:', error.response?.data || error.message);
throw error;
}
}
Batch Operation Types
批处理操作的优势
- : Create new entity
- : Update existing entity
- : Delete entity
- : Execute query
- 减少API调用次数:一次请求完成30个操作,无需发起30次独立请求
- 降低延迟:仅需一次往返,无需多次请求
- 友好的速率限制:仅计为1次API调用,不占用过多速率配额
- 操作级原子性:每个操作独立成功或失败,互不影响
Error Handling & Troubleshooting
批处理操作类型
- 200 OK: Request successful (but may contain element in body)
- 400 Bad Request: Invalid syntax or malformed request
- 401 Unauthorized: Invalid/expired access token
- 403 Forbidden: Insufficient permissions or restricted resource
- 404 Not Found: Resource doesn't exist
- 429 Too Many Requests: Rate limit exceeded
- 500 Internal Server Error: Server-side issue (retry once)
- 503 Service Unavailable: Service temporarily unavailable (retry with backoff)
- :创建新实体
- :更新现有实体
- :删除实体
- :执行查询
Even with 200 OK, response may contain fault element:
json
{
"Fault": {
"Error": [
{
"Message": "Duplicate Name Exists Error",
"Detail": "The name supplied already exists.",
"code": "6240",
"element": "Customer.DisplayName"
}
],
"type": "ValidationFault"
},
"time": "2024-12-09T10:30:00.000-08:00"
}
Fault Types:
-
ValidationFault: Invalid request data or business rule violation
- Fix: Correct request payload, check required fields
-
SystemFault: Server-side error
- Fix: Retry request, contact support if persists
-
AuthenticationFault: Invalid credentials
- Fix: Refresh access token, re-authenticate
-
AuthorizationFault: Insufficient permissions
- Fix: Check OAuth scopes, ensure user has admin access
- 200 OK:请求成功(但响应体中可能包含元素)
- 400 Bad Request:无效语法或请求格式错误
- 401 Unauthorized:访问令牌无效/过期
- 403 Forbidden:权限不足或资源受限
- 404 Not Found:资源不存在
- 429 Too Many Requests:超出速率限制
- 500 Internal Server Error:服务端问题(可重试一次)
- 503 Service Unavailable:服务暂时不可用(使用退避策略重试)
| Code | Error | Solution |
|---|
| 6000 | Business validation error | Check TotalAmt and required fields |
| 3200 | Stale object (SyncToken mismatch) | Re-read entity to get latest SyncToken |
| 3100 | Invalid reference | Verify referenced entity exists (CustomerRef, ItemRef) |
| 6240 | Duplicate name | Use unique DisplayName for Customer/Item |
| 610 | Object not found | Check entity ID exists |
| 4001 | Invalid token | Refresh access token |
即使返回200 OK,响应中也可能包含错误元素:
json
{
"Fault": {
"Error": [
{
"Message": "Duplicate Name Exists Error",
"Detail": "The name supplied already exists.",
"code": "6240",
"element": "Customer.DisplayName"
}
],
"type": "ValidationFault"
},
"time": "2024-12-09T10:30:00.000-08:00"
}
错误类型:
-
ValidationFault:无效请求数据或违反业务规则
-
SystemFault:服务端错误
-
AuthenticationFault:无效凭证
-
AuthorizationFault:权限不足
- 解决方案:检查OAuth权限范围,确认用户拥有管理员权限
Exception Handling by SDK
常见错误码
Java SDK Exceptions:
- : Validation faults
- : Service faults
- : Authentication faults
- : 400 status
- : 401 status
- : 500 status
Python Exception Handling:
python
from intuitlib.exceptions import AuthClientError
try:
response = requests.post(url, json=data, headers=headers)
response.raise_for_status()
# Check for fault in response body
result = response.json()
if 'Fault' in result:
fault = result['Fault']
print(f"Fault Type: {fault['type']}")
for error in fault['Error']:
print(f" Code {error['code']}: {error['Message']}")
print(f" Element: {error.get('element', 'N/A')}")
return None
return result
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
# Token expired, refresh
print("Token expired, refreshing...")
# Implement token refresh logic
elif e.response.status_code == 429:
# Rate limited, implement backoff
print("Rate limited, backing off...")
else:
print(f"HTTP Error: {e.response.status_code}")
print(f"Response: {e.response.text}")
except AuthClientError as e:
print(f"Auth error: {str(e)}")
| 代码 | 错误描述 | 解决方案 |
|---|
| 6000 | 业务验证错误 | 检查TotalAmt和必填字段 |
| 3200 | 对象过期(SyncToken不匹配) | 重新查询实体获取最新的SyncToken |
| 3100 | 无效引用 | 验证引用的实体是否存在(CustomerRef、ItemRef) |
| 6240 | 名称重复 | 为客户/商品使用唯一的DisplayName |
| 610 | 对象不存在 | 检查实体ID是否存在 |
| 4001 | 令牌无效 | 刷新访问令牌 |
Debugging Strategies
SDK异常处理
- Check response body even with 200: Fault elements can appear in successful responses
- Log intuit_tid: Include in support requests for faster resolution
- Validate SyncToken: Always use latest version from read operations
- Test in sandbox first: Use sandbox companies for development
- Implement retry logic: Exponential backoff for 500/503 errors
- Parse error details: Check , , fields
Java SDK异常:
- :验证错误
- :服务错误
- :身份验证错误
- :400状态码
- :401状态码
- :500状态码
Python异常处理:
python
from intuitlib.exceptions import AuthClientError
try:
response = requests.post(url, json=data, headers=headers)
response.raise_for_status()
# Check for fault in response body
result = response.json()
if 'Fault' in result:
fault = result['Fault']
print(f"Fault Type: {fault['type']}")
for error in fault['Error']:
print(f" Code {error['code']}: {error['Message']}")
print(f" Element: {error.get('element', 'N/A')}")
return None
return result
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
# Token expired, refresh
print("Token expired, refreshing...")
# Implement token refresh logic
elif e.response.status_code == 429:
# Rate limited, implement backoff
print("Rate limited, backing off...")
else:
print(f"HTTP Error: {e.response.status_code}")
print(f"Response: {e.response.text}")
except AuthClientError as e:
print(f"Auth error: {str(e)}")
Retry Pattern with Exponential Backoff
调试策略
javascript
async function apiCallWithRetry(apiFunction, maxRetries = 3) {
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
return await apiFunction();
} catch (error) {
const status = error.response?.status;
// Retry on server errors
if (status >= 500 && status < 600 && attempt < maxRetries - 1) {
const delay = Math.pow(2, attempt) * 1000; // 1s, 2s, 4s
console.log(`Attempt ${attempt + 1} failed, retrying in ${delay}ms...`);
await new Promise(resolve => setTimeout(resolve, delay));
continue;
}
// Don't retry on client errors
throw error;
}
}
}
- 即使返回200也要检查响应体:成功响应中也可能包含错误元素
- 记录intuit_tid:在提交支持请求时包含该ID可加快解决速度
- 验证SyncToken:始终使用查询操作返回的最新版本
- 优先在沙箱中测试:使用沙箱公司进行开发
- 实现重试逻辑:对500/503错误使用指数退避策略
- 解析错误详情:检查、、字段
Change Detection & Webhooks
指数退避重试模式
Change Data Capture (CDC)
CDC returns entities that changed within a specified timeframe (up to 30 days).
Endpoint:
GET /v3/company/{realmId}/cdc?entities={entityList}&changedSince={dateTime}
Parameters:
- : Comma-separated list (e.g., "Invoice,Customer,Payment")
- : ISO 8601 timestamp (e.g., "2024-12-01T09:00:00-07:00")
Python Example:
python
from datetime import datetime, timedelta
from urllib.parse import urlencode
def get_changed_entities(realm_id, entity_types, since_datetime, access_token):
# Format: 2024-12-01T09:00:00-07:00
changed_since = since_datetime.strftime('%Y-%m-%dT%H:%M:%S-07:00')
params = {
'entities': ','.join(entity_types),
'changedSince': changed_since
}
url = f"https://sandbox-quickbooks.api.intuit.com/v3/company/{realm_id}/cdc"
response = requests.get(
url,
params=params,
headers={"Authorization": f"Bearer {access_token}"}
)
if response.status_code == 200:
cdc_response = response.json()['CDCResponse']
# Process changed entities
for query_response in cdc_response:
entity_type = query_response.get('QueryResponse', [{}])[0]
for entity_name, entities in entity_type.items():
if entities:
for entity in entities:
status = entity.get('status', 'Updated')
if status == 'Deleted':
print(f"Deleted {entity_name}: {entity['Id']}")
else:
print(f"Changed {entity_name}: {entity['Id']}")
return cdc_response
else:
print(f"CDC request failed: {response.text}")
return None
javascript
async function apiCallWithRetry(apiFunction, maxRetries = 3) {
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
return await apiFunction();
} catch (error) {
const status = error.response?.status;
// Retry on server errors
if (status >= 500 && status < 600 && attempt < maxRetries - 1) {
const delay = Math.pow(2, attempt) * 1000; // 1s, 2s, 4s
console.log(`Attempt ${attempt + 1} failed, retrying in ${delay}ms...`);
await new Promise(resolve => setTimeout(resolve, delay));
continue;
}
// Don't retry on client errors
throw error;
}
}
}
Usage: Get all invoices and customers changed in last 24 hours
变更检测与Webhooks
since = datetime.now() - timedelta(hours=24)
changes = get_changed_entities(realm_id, ['Invoice', 'Customer'], since, access_token)
**Response Structure**:
```json
{
"CDCResponse": [
{
"QueryResponse": [
{
"Invoice": [
{
"Id": "123",
"MetaData": {
"LastUpdatedTime": "2024-12-09T10:30:00-08:00"
},
"TotalAmt": 100.00,
"Balance": 50.00
// ... full invoice object
}
]
}
]
},
{
"QueryResponse": [
{
"Customer": [
{
"Id": "456",
"status": "Deleted"
}
]
}
]
}
],
"time": "2024-12-09T11:00:00.000-08:00"
}
CDC返回指定时间范围内(最多30天)发生变更的实体。
端点:
GET /v3/company/{realmId}/cdc?entities={entityList}&changedSince={dateTime}
参数:
- :逗号分隔的实体列表(例如:"Invoice,Customer,Payment")
- :ISO 8601格式的时间戳(例如:"2024-12-01T09:00:00-07:00")
Python示例:
python
from datetime import datetime, timedelta
from urllib.parse import urlencode
def get_changed_entities(realm_id, entity_types, since_datetime, access_token):
# Format: 2024-12-01T09:00:00-07:00
changed_since = since_datetime.strftime('%Y-%m-%dT%H:%M:%S-07:00')
params = {
'entities': ','.join(entity_types),
'changedSince': changed_since
}
url = f"https://sandbox-quickbooks.api.intuit.com/v3/company/{realm_id}/cdc"
response = requests.get(
url,
params=params,
headers={"Authorization": f"Bearer {access_token}"}
)
if response.status_code == 200:
cdc_response = response.json()['CDCResponse']
# Process changed entities
for query_response in cdc_response:
entity_type = query_response.get('QueryResponse', [{}])[0]
for entity_name, entities in entity_type.items():
if entities:
for entity in entities:
status = entity.get('status', 'Updated')
if status == 'Deleted':
print(f"Deleted {entity_name}: {entity['Id']}")
else:
print(f"Changed {entity_name}: {entity['Id']}")
return cdc_response
else:
print(f"CDC request failed: {response.text}")
return None
CDC Best Practices
Usage: Get all invoices and customers changed in last 24 hours
- Query shorter periods: Max 1000 entities per response, use hourly/daily checks
- Store last sync time: Track to set parameter
- Handle deletes: Entities with only contain ID
- Fetch full entity: CDC returns full payload (not just changes)
- Combine with webhooks: Use webhooks for real-time, CDC as backup
since = datetime.now() - timedelta(hours=24)
changes = get_changed_entities(realm_id, ['Invoice', 'Customer'], since, access_token)
**响应结构**:
```json
{
"CDCResponse": [
{
"QueryResponse": [
{
"Invoice": [
{
"Id": "123",
"MetaData": {
"LastUpdatedTime": "2024-12-09T10:30:00-08:00"
},
"TotalAmt": 100.00,
"Balance": 50.00
// ... full invoice object
}
]
}
]
},
{
"QueryResponse": [
{
"Customer": [
{
"Id": "456",
"status": "Deleted"
}
]
}
]
}
],
"time": "2024-12-09T11:00:00.000-08:00"
}
Webhooks (Real-time Notifications)
CDC最佳实践
Webhooks send HTTP POST notifications when data changes.
Setup:
- Configure webhook URL in developer dashboard
- Implement POST endpoint to receive notifications
- Return 200 OK within 1 second
- Process notification asynchronously
Notification Payload:
json
{
"eventNotifications": [
{
"realmId": "123456789",
"dataChangeEvent": {
"entities": [
{
"name": "Invoice",
"id": "145",
"operation": "Create",
"lastUpdated": "2024-12-09T10:30:00.000Z"
},
{
"name": "Payment",
"id": "456",
"operation": "Update",
"lastUpdated": "2024-12-09T10:31:00.000Z"
},
{
"name": "Customer",
"id": "789",
"operation": "Merge",
"lastUpdated": "2024-12-09T10:32:00.000Z",
"deletedId": "788"
}
]
}
}
]
}
Node.js Webhook Handler:
javascript
const express = require('express');
const crypto = require('crypto');
const app = express();
app.use(express.json());
// Webhook endpoint
app.post('/webhooks/quickbooks', async (req, res) => {
// Verify webhook signature (recommended)
const signature = req.headers['intuit-signature'];
const payload = JSON.stringify(req.body);
// Return 200 immediately (process async)
res.status(200).send('OK');
// Process notifications asynchronously
processWebhook(req.body).catch(console.error);
});
async function processWebhook(notification) {
for (const event of notification.eventNotifications) {
const realmId = event.realmId;
for (const entity of event.dataChangeEvent.entities) {
console.log(`${entity.operation} on ${entity.name} ID ${entity.id}`);
// Fetch full entity data
if (entity.operation !== 'Delete') {
await fetchAndProcessEntity(realmId, entity.name, entity.id);
} else {
await handleEntityDeletion(realmId, entity.name, entity.id);
}
}
}
}
async function fetchAndProcessEntity(realmId, entityType, entityId) {
// Fetch full entity using read endpoint
const url = `https://quickbooks.api.intuit.com/v3/company/${realmId}/${entityType.toLowerCase()}/${entityId}`;
// ... implement fetch and processing logic
}
- 查询较短的时间范围:每次响应最多返回1000个实体,使用每小时/每日检查
- 存储上次同步时间:记录来设置参数
- 处理删除操作:标记为的实体仅包含ID
- 获取完整实体:CDC返回完整payload(不仅是变更部分)
- 与webhook结合使用:用webhook实现实时通知,CDC作为备份
Webhook vs CDC Decision Matrix
Webhooks(实时通知)
| Use Case | Recommendation |
|---|
| Real-time sync | Webhooks |
| Periodic sync (hourly/daily) | CDC |
| Initial data load | CDC |
| Reconnection after downtime | CDC |
| High-volume changes | CDC (reduces notification overhead) |
| Low-latency requirements | Webhooks |
| Backup/redundancy | Both (webhooks primary, CDC backup) |
Webhooks会在数据变更时发送HTTP POST通知。
配置步骤:
- 在开发者控制台配置webhook URL
- 实现POST端点接收通知
- 1秒内返回200 OK
- 异步处理通知
通知Payload:
json
{
"eventNotifications": [
{
"realmId": "123456789",
"dataChangeEvent": {
"entities": [
{
"name": "Invoice",
"id": "145",
"operation": "Create",
"lastUpdated": "2024-12-09T10:30:00.000Z"
},
{
"name": "Payment",
"id": "456",
"operation": "Update",
"lastUpdated": "2024-12-09T10:31:00.000Z"
},
{
"name": "Customer",
"id": "789",
"operation": "Merge",
"lastUpdated": "2024-12-09T10:32:00.000Z",
"deletedId": "788"
}
]
}
}
]
}
Node.js Webhook处理器:
javascript
const express = require('express');
const crypto = require('crypto');
const app = express();
app.use(express.json());
// Webhook endpoint
app.post('/webhooks/quickbooks', async (req, res) => {
// Verify webhook signature (recommended)
const signature = req.headers['intuit-signature'];
const payload = JSON.stringify(req.body);
// Return 200 immediately (process async)
res.status(200).send('OK');
// Process notifications asynchronously
processWebhook(req.body).catch(console.error);
});
async function processWebhook(notification) {
for (const event of notification.eventNotifications) {
const realmId = event.realmId;
for (const entity of event.dataChangeEvent.entities) {
console.log(`${entity.operation} on ${entity.name} ID ${entity.id}`);
// Fetch full entity data
if (entity.operation !== 'Delete') {
await fetchAndProcessEntity(realmId, entity.name, entity.id);
} else {
await handleEntityDeletion(realmId, entity.name, entity.id);
}
}
}
}
async function fetchAndProcessEntity(realmId, entityType, entityId) {
// Fetch full entity using read endpoint
const url = `https://quickbooks.api.intuit.com/v3/company/${realmId}/${entityType.toLowerCase()}/${entityId}`;
// ... implement fetch and processing logic
}
Combined Approach Pattern
Webhook与CDC选择矩阵
python
class QuickBooksSync:
def __init__(self):
self.last_cdc_sync = self.load_last_sync_time()
def handle_webhook(self, notification):
"""Process real-time webhook"""
for entity in notification['dataChangeEvent']['entities']:
self.process_entity_change(entity)
# Update last known change time
self.last_cdc_sync = datetime.now()
self.save_last_sync_time()
def periodic_cdc_sync(self):
"""Catch any missed changes"""
changes = get_changed_entities(
self.realm_id,
['Invoice', 'Customer', 'Payment'],
self.last_cdc_sync,
self.access_token
)
for entity in self.extract_entities(changes):
if not self.entity_exists_locally(entity):
# Missed by webhook, process now
self.process_entity_change(entity)
self.last_cdc_sync = datetime.now()
self.save_last_sync_time()
| 场景 | 推荐方案 |
|---|
| 实时同步 | Webhooks |
| 定期同步(每小时/每日) | CDC |
| 初始数据加载 | CDC |
| 停机后重连 | CDC |
| 高频率变更 | CDC(减少通知开销) |
| 低延迟要求 | Webhooks |
| 备份/冗余 | 两者结合(Webhook为主,CDC为备份) |
-
Use batch operations for bulk changes
- Combine up to 30 operations in single request
- Reduces API calls and improves throughput
- Example: Batch update 30 customers vs 30 individual updates
-
Implement CDC or webhooks for syncing
- Avoid polling all entities repeatedly
- CDC returns only changed entities
- Webhooks provide real-time notifications without polling
-
Sparse updates minimize payload
- Only send fields being changed
- Reduces data transfer and processing time
- Prevents accidental field overwrites
-
Cache reference data locally
- Payment methods, tax codes, accounts rarely change
- Query once and cache with TTL
- Reduces redundant API calls
-
Paginate large result sets
- Use MAXRESULTS to limit query results
- Process in batches to avoid memory issues
- Example: Query 100 customers at a time
python
class QuickBooksSync:
def __init__(self):
self.last_cdc_sync = self.load_last_sync_time()
def handle_webhook(self, notification):
"""Process real-time webhook"""
for entity in notification['dataChangeEvent']['entities']:
self.process_entity_change(entity)
# Update last known change time
self.last_cdc_sync = datetime.now()
self.save_last_sync_time()
def periodic_cdc_sync(self):
"""Catch any missed changes"""
changes = get_changed_entities(
self.realm_id,
['Invoice', 'Customer', 'Payment'],
self.last_cdc_sync,
self.access_token
)
for entity in self.extract_entities(changes):
if not self.entity_exists_locally(entity):
# Missed by webhook, process now
self.process_entity_change(entity)
self.last_cdc_sync = datetime.now()
self.save_last_sync_time()
-
Always use SyncToken for updates
- Prevents concurrent modification conflicts
- Read entity before update to get latest token
- Handle 3200 errors by re-reading and retrying
-
Handle concurrent modifications gracefully
python
def safe_update(realm_id, customer_id, changes, access_token):
max_attempts = 3
for attempt in range(max_attempts):
# Read latest version
customer = read_customer(realm_id, customer_id, access_token)
# Apply changes
customer.update(changes)
customer['sparse'] = True
# Attempt update
try:
return update_customer(realm_id, customer, access_token)
except SyncTokenError:
if attempt == max_attempts - 1:
raise
continue # Retry with fresh SyncToken
-
Validate required fields before API calls
- Check business rules locally first
- Reduces validation errors from API
- Example: Verify customer exists before creating invoice
-
Use webhooks + CDC for reliable tracking
- Webhooks for real-time updates
- Periodic CDC as backup for missed changes
- Store last sync timestamp
-
批量变更使用批处理操作
- 单次请求最多合并30个操作
- 减少API调用次数,提升吞吐量
- 示例:批量更新30个客户,而不是发起30次独立更新
-
使用CDC或webhook进行同步
- 避免反复轮询所有实体
- CDC仅返回变更的实体
- Webhooks无需轮询即可提供实时通知
-
稀疏更新最小化payload
- 仅发送需要变更的字段
- 减少数据传输和处理时间
- 防止意外覆盖字段
-
本地缓存参考数据
- 支付方式、税码、账户很少变更
- 查询一次后设置TTL缓存
- 减少冗余API调用
-
大结果集分页处理
- 使用MAXRESULTS限制查询返回数量
- 分批处理避免内存问题
- 示例:每次查询100个客户
-
Access tokens expire after 3600 seconds
- Set up automatic refresh before expiration
- Refresh at 50-minute mark to be safe
-
Refresh tokens proactively
javascript
class TokenManager {
constructor() {
this.refreshTimer = null;
}
scheduleRefresh(expiresIn) {
// Refresh 5 minutes before expiration
const refreshTime = (expiresIn - 300) * 1000;
this.refreshTimer = setTimeout(() => {
this.refreshAccessToken();
}, refreshTime);
}
async refreshAccessToken() {
try {
const newTokens = await oauthClient.refresh();
this.storeTokens(newTokens);
this.scheduleRefresh(newTokens.expires_in);
} catch (error) {
// Refresh failed, need re-authentication
this.handleReauthentication();
}
}
}
-
Always use latest refresh token
- Previous refresh tokens expire 24 hours after new one issued
- Store refresh token immediately after refresh
- Never use old refresh tokens
-
Store tokens securely
- Encrypt in database
- Never commit to version control
- Use environment variables for development
-
Handle 401 responses automatically
python
def api_call_with_auto_refresh(api_function):
try:
return api_function()
except Unauthorized401Error:
# Attempt token refresh
refresh_tokens()
# Retry with new token
return api_function()
-
更新操作始终使用SyncToken
- 防止并发修改冲突
- 更新前先查询实体获取最新令牌
- 遇到3200错误时重新查询并重试
-
优雅处理并发修改
python
def safe_update(realm_id, customer_id, changes, access_token):
max_attempts = 3
for attempt in range(max_attempts):
# Read latest version
customer = read_customer(realm_id, customer_id, access_token)
# Apply changes
customer.update(changes)
customer['sparse'] = True
# Attempt update
try:
return update_customer(realm_id, customer, access_token)
except SyncTokenError:
if attempt == max_attempts - 1:
raise
continue # Retry with fresh SyncToken
-
API调用前验证必填字段
- 优先在本地检查业务规则
- 减少API返回的验证错误
- 示例:创建发票前验证客户是否存在
-
使用webhook + CDC实现可靠追踪
- Webhook用于实时更新
- 定期CDC作为遗漏变更的备份
- 存储上次同步时间戳
-
Implement exponential backoff for 429
python
def call_with_rate_limit_handling(api_function):
max_retries = 5
base_delay = 1
for attempt in range(max_retries):
try:
return api_function()
except RateLimitError as e:
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt) # 1s, 2s, 4s, 8s, 16s
time.sleep(delay)
continue
-
Use batch operations to reduce call count
- 1 batch request vs 30 individual = 30x reduction
- Batch counts as single API call for rate limits
-
Monitor rate limit headers (if provided)
- Some endpoints return rate limit info in headers
- Track usage to stay within limits
-
访问令牌3600秒后过期
- 设置自动刷新机制,在过期前刷新
- 安全起见可在使用50分钟后刷新
-
主动刷新令牌
javascript
class TokenManager {
constructor() {
this.refreshTimer = null;
}
scheduleRefresh(expiresIn) {
// Refresh 5 minutes before expiration
const refreshTime = (expiresIn - 300) * 1000;
this.refreshTimer = setTimeout(() => {
this.refreshAccessToken();
}, refreshTime);
}
async refreshAccessToken() {
try {
const newTokens = await oauthClient.refresh();
this.storeTokens(newTokens);
this.scheduleRefresh(newTokens.expires_in);
} catch (error) {
// Refresh failed, need re-authentication
this.handleReauthentication();
}
}
}
-
始终使用最新的刷新令牌
- 旧刷新令牌会在新令牌签发后24小时过期
- 刷新后立即存储新的刷新令牌
- 不要使用旧的刷新令牌
-
安全存储令牌
- 加密存储在数据库中
- 绝对不要提交到版本控制系统
- 开发环境使用环境变量
-
自动处理401响应
python
def api_call_with_auto_refresh(api_function):
try:
return api_function()
except Unauthorized401Error:
# Attempt token refresh
refresh_tokens()
# Retry with new token
return api_function()
Multi-currency Considerations
API速率限制
-
CurrencyRef required when multicurrency enabled
json
{
"Invoice": {
"CurrencyRef": {
"value": "USD",
"name": "United States Dollar"
}
}
}
-
Exchange rate handling
- API automatically applies exchange rates
- ExchangeRate field shows conversion rate used
- Home currency amounts calculated automatically
-
Locale-specific required fields
- France: DocNumber required if custom transaction numbers enabled
- UK: Different tax handling (VAT)
- Check locale-specific documentation
-
对429错误实现指数退避
python
def call_with_rate_limit_handling(api_function):
max_retries = 5
base_delay = 1
for attempt in range(max_retries):
try:
return api_function()
except RateLimitError as e:
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt) # 1s, 2s, 4s, 8s, 16s
time.sleep(delay)
continue
-
使用批处理操作减少调用次数
- 1次批处理请求相当于30次独立请求,减少30倍调用量
- 批处理仅计为1次API速率调用
-
监控速率限制头(如有提供)
- 部分端点会在响应头中返回速率限制信息
- 跟踪使用情况,避免超出限制
Testing & Development
多币种注意事项
-
Use sandbox companies (free with developer account)
- Create at developer.intuit.com
- Separate from production data
- Full API feature parity
-
Test OAuth flow end-to-end
- Authorization URL → code exchange → token refresh
- Test token expiration handling
- Verify refresh token rotation
-
Validate webhook endpoint
- Test with sample payloads
- Ensure < 1 second response time
- Handle webhook signature verification
-
Handle all fault types in production
- ValidationFault, SystemFault, AuthenticationFault, AuthorizationFault
- Log error details (code, message, element)
- Implement appropriate retry logic
-
Monitor API calls and errors
- Track success/failure rates
- Alert on elevated error rates
- Log intuit_tid for support requests
-
启用多币种时必须提供CurrencyRef
json
{
"Invoice": {
"CurrencyRef": {
"value": "USD",
"name": "United States Dollar"
}
}
}
-
汇率处理
- API会自动应用汇率
- ExchangeRate字段显示使用的转换汇率
- 本位币金额会自动计算
-
特定区域的必填字段
- 法国:如果启用自定义交易编号,则DocNumber为必填
- 英国:不同的税务处理(VAT)
- 查阅对应区域的文档
Workflow 1: Create and Send Invoice
Scenario: Create an invoice for a customer and send via email.
Steps:
- Query or create customer
-
使用沙箱公司(开发者账号免费提供)
- 在developer.intuit.com创建
- 与生产数据隔离
- 完整的API功能一致性
-
端到端测试OAuth流程
- 授权URL → 码交换 → 令牌刷新
- 测试令牌过期处理
- 验证刷新令牌轮换机制
-
验证webhook端点
- 使用示例payload测试
- 确保响应时间小于1秒
- 处理webhook签名验证
-
生产环境处理所有错误类型
- ValidationFault、SystemFault、AuthenticationFault、AuthorizationFault
- 记录错误详情(代码、消息、元素)
- 实现合适的重试逻辑
-
监控API调用和错误
- 跟踪成功/失败率
- 错误率升高时发出告警
- 记录intuit_tid用于支持请求
Check if customer exists
常见工作流
customers = query_customers_by_email(realm_id, "customer@example.com", access_token)
if not customers:
# Create new customer
customer = create_customer(realm_id, {
"DisplayName": "Acme Corp",
"PrimaryEmailAddr": {"Address": "customer@example.com"},
"BillAddr": {
"Line1": "123 Main St",
"City": "San Francisco",
"CountrySubDivisionCode": "CA",
"PostalCode": "94105"
}
}, access_token)
else:
customer = customers[0]
customer_id = customer['Id']
2. **Query items for line items**
```python
场景:为客户创建发票并通过邮件发送。
步骤:
- 查询或创建客户
Get service item
Check if customer exists
query = "SELECT * FROM Item WHERE Type = 'Service' AND Name = 'Consulting'"
items = query_entity(realm_id, query, access_token)
service_item = items[0]
3. **Create invoice with line items**
```python
invoice_data = {
"TxnDate": "2024-12-09",
"DueDate": "2024-12-23",
"CustomerRef": {"value": customer_id},
"BillEmail": {"Address": "customer@example.com"},
"EmailStatus": "NeedToSend", # Mark for email sending
"Line": [
{
"Amount": 1500.00,
"DetailType": "SalesItemLineDetail",
"SalesItemLineDetail": {
"ItemRef": {"value": service_item['Id']},
"Qty": 10,
"UnitPrice": 150.00,
"TaxCodeRef": {"value": "NON"} # Non-taxable
},
"Description": "Consulting services - December 2024"
},
{
"Amount": 1500.00,
"DetailType": "SubTotalLineDetail",
"SubTotalLineDetail": {}
}
]
}
invoice = create_invoice(realm_id, invoice_data, access_token)
print(f"Invoice {invoice['DocNumber']} created: ${invoice['TotalAmt']}")
- Send invoice email (automatic if EmailStatus = "NeedToSend")
customers = query_customers_by_email(realm_id, "customer@example.com", access_token)
if not customers:
# Create new customer
customer = create_customer(realm_id, {
"DisplayName": "Acme Corp",
"PrimaryEmailAddr": {"Address": "customer@example.com"},
"BillAddr": {
"Line1": "123 Main St",
"City": "San Francisco",
"CountrySubDivisionCode": "CA",
"PostalCode": "94105"
}
}, access_token)
else:
customer = customers[0]
customer_id = customer['Id']
2. **查询明细项对应的商品**
```python
QuickBooks automatically sends email when EmailStatus is NeedToSend
Get service item
Alternatively, use send endpoint:
send_url = f"{base_url}/company/{realm_id}/invoice/{invoice['Id']}/send"
params = {"sendTo": "customer@example.com"}
response = requests.post(send_url, params=params, headers=headers)
if response.status_code == 200:
print(f"Invoice sent to {customer['PrimaryEmailAddr']['Address']}")
5. **Handle response and linked transactions**
```python
query = "SELECT * FROM Item WHERE Type = 'Service' AND Name = 'Consulting'"
items = query_entity(realm_id, query, access_token)
service_item = items[0]
3. **创建带明细项的发票**
```python
invoice_data = {
"TxnDate": "2024-12-09",
"DueDate": "2024-12-23",
"CustomerRef": {"value": customer_id},
"BillEmail": {"Address": "customer@example.com"},
"EmailStatus": "NeedToSend", # Mark for email sending
"Line": [
{
"Amount": 1500.00,
"DetailType": "SalesItemLineDetail",
"SalesItemLineDetail": {
"ItemRef": {"value": service_item['Id']},
"Qty": 10,
"UnitPrice": 150.00,
"TaxCodeRef": {"value": "NON"} # Non-taxable
},
"Description": "Consulting services - December 2024"
},
{
"Amount": 1500.00,
"DetailType": "SubTotalLineDetail",
"SubTotalLineDetail": {}
}
]
}
invoice = create_invoice(realm_id, invoice_data, access_token)
print(f"Invoice {invoice['DocNumber']} created: ${invoice['TotalAmt']}")
- 发送发票邮件(如果EmailStatus = "NeedToSend"则自动发送)
Check invoice status
QuickBooks automatically sends email when EmailStatus is NeedToSend
Alternatively, use send endpoint:
print(f"Invoice ID: {invoice['Id']}")
print(f"Balance: ${invoice['Balance']}")
print(f"Email Status: {invoice['EmailStatus']}")
send_url = f"{base_url}/company/{realm_id}/invoice/{invoice['Id']}/send"
params = {"sendTo": "customer@example.com"}
response = requests.post(send_url, params=params, headers=headers)
if response.status_code == 200:
print(f"Invoice sent to {customer['PrimaryEmailAddr']['Address']}")
5. **处理响应和关联交易**
```python
Track linked transactions
Check invoice status
if 'LinkedTxn' in invoice:
for linked in invoice['LinkedTxn']:
print(f"Linked {linked['TxnType']}: {linked['TxnId']}")
print(f"Invoice ID: {invoice['Id']}")
print(f"Balance: ${invoice['Balance']}")
print(f"Email Status: {invoice['EmailStatus']}")
Workflow 2: Record Payment Against Invoice
Track linked transactions
Scenario: Customer pays an invoice via check.
Steps:
- Query invoice by DocNumber
python
def find_invoice_by_number(realm_id, doc_number, access_token):
query = f"SELECT * FROM Invoice WHERE DocNumber = '{doc_number}'"
invoices = query_entity(realm_id, query, access_token)
if not invoices:
raise ValueError(f"Invoice {doc_number} not found")
return invoices[0]
invoice = find_invoice_by_number(realm_id, "1045", access_token)
customer_id = invoice['CustomerRef']['value']
balance = invoice['Balance']
- Create payment entity
if 'LinkedTxn' in invoice:
for linked in invoice['LinkedTxn']:
print(f"Linked {linked['TxnType']}: {linked['TxnId']}")
Get payment method (Check)
工作流2:记录发票对应的付款
payment_methods = query_entity(realm_id, "SELECT * FROM PaymentMethod WHERE Name = 'Check'", access_token)
payment_method_id = payment_methods[0]['Id']
payment_data = {
"TotalAmt": balance, # Pay full amount
"CustomerRef": {"value": customer_id},
"PaymentMethodRef": {"value": payment_method_id},
"PaymentRefNum": "1234", # Check number
"TxnDate": "2024-12-09",
"Line": [
{
"Amount": balance,
"LinkedTxn": [
{
"TxnId": invoice['Id'],
"TxnType": "Invoice"
}
]
}
]
}
payment = create_payment(realm_id, payment_data, access_token)
3. **Verify balance updates**
```python
场景:客户通过支票支付发票。
步骤:
- 按DocNumber查询发票
python
def find_invoice_by_number(realm_id, doc_number, access_token):
query = f"SELECT * FROM Invoice WHERE DocNumber = '{doc_number}'"
invoices = query_entity(realm_id, query, access_token)
if not invoices:
raise ValueError(f"Invoice {doc_number} not found")
return invoices[0]
invoice = find_invoice_by_number(realm_id, "1045", access_token)
customer_id = invoice['CustomerRef']['value']
balance = invoice['Balance']
- 创建支付实体
Re-read invoice to see updated balance
Get payment method (Check)
updated_invoice = read_invoice(realm_id, invoice['Id'], access_token)
print(f"Original balance: ${balance}")
print(f"Payment amount: ${payment['TotalAmt']}")
print(f"New balance: ${updated_invoice['Balance']}")
print(f"Unapplied payment amount: ${payment.get('UnappliedAmt', 0)}")
4. **Handle partial payments**
```python
def apply_partial_payment(realm_id, invoice_id, payment_amount, customer_id, access_token):
payment_data = {
"TotalAmt": payment_amount, # Less than invoice balance
"CustomerRef": {"value": customer_id},
"Line": [
{
"Amount": payment_amount,
"LinkedTxn": [
{
"TxnId": invoice_id,
"TxnType": "Invoice"
}
]
}
]
}
payment = create_payment(realm_id, payment_data, access_token)
# Check unapplied amount
if payment['UnappliedAmt'] > 0:
print(f"Warning: ${payment['UnappliedAmt']} unapplied (overpayment or error)")
return payment
- Apply payment to multiple invoices
python
def pay_multiple_invoices(realm_id, invoice_ids, amounts, customer_id, total_paid, access_token):
lines = []
for invoice_id, amount in zip(invoice_ids, amounts):
lines.append({
"Amount": amount,
"LinkedTxn": [{
"TxnId": invoice_id,
"TxnType": "Invoice"
}]
})
payment_data = {
"TotalAmt": total_paid,
"CustomerRef": {"value": customer_id},
"Line": lines
}
return create_payment(realm_id, payment_data, access_token)
payment_methods = query_entity(realm_id, "SELECT * FROM PaymentMethod WHERE Name = 'Check'", access_token)
payment_method_id = payment_methods[0]['Id']
payment_data = {
"TotalAmt": balance, # Pay full amount
"CustomerRef": {"value": customer_id},
"PaymentMethodRef": {"value": payment_method_id},
"PaymentRefNum": "1234", # Check number
"TxnDate": "2024-12-09",
"Line": [
{
"Amount": balance,
"LinkedTxn": [
{
"TxnId": invoice['Id'],
"TxnType": "Invoice"
}
]
}
]
}
payment = create_payment(realm_id, payment_data, access_token)
Example: Pay two invoices with single check
Re-read invoice to see updated balance
payment = pay_multiple_invoices(
realm_id,
["145", "146"], # Invoice IDs
[100.00, 50.00], # Amounts applied to each
customer_id,
150.00, # Total check amount
access_token
)
updated_invoice = read_invoice(realm_id, invoice['Id'], access_token)
print(f"Original balance: ${balance}")
print(f"Payment amount: ${payment['TotalAmt']}")
print(f"New balance: ${updated_invoice['Balance']}")
print(f"Unapplied payment amount: ${payment.get('UnappliedAmt', 0)}")
4. **处理部分付款**
```python
def apply_partial_payment(realm_id, invoice_id, payment_amount, customer_id, access_token):
payment_data = {
"TotalAmt": payment_amount, # Less than invoice balance
"CustomerRef": {"value": customer_id},
"Line": [
{
"Amount": payment_amount,
"LinkedTxn": [
{
"TxnId": invoice_id,
"TxnType": "Invoice"
}
]
}
]
}
payment = create_payment(realm_id, payment_data, access_token)
# Check unapplied amount
if payment['UnappliedAmt'] > 0:
print(f"Warning: ${payment['UnappliedAmt']} unapplied (overpayment or error)")
return payment
- 将付款分配到多张发票
python
def pay_multiple_invoices(realm_id, invoice_ids, amounts, customer_id, total_paid, access_token):
lines = []
for invoice_id, amount in zip(invoice_ids, amounts):
lines.append({
"Amount": amount,
"LinkedTxn": [{
"TxnId": invoice_id,
"TxnType": "Invoice"
}]
})
payment_data = {
"TotalAmt": total_paid,
"CustomerRef": {"value": customer_id},
"Line": lines
}
return create_payment(realm_id, payment_data, access_token)
Workflow 3: Customer Management
Example: Pay two invoices with single check
Scenario: Complete customer lifecycle management.
1. Create customer with address
python
def create_customer_complete(realm_id, customer_info, access_token):
customer_data = {
"DisplayName": customer_info['display_name'],
"GivenName": customer_info.get('first_name'),
"FamilyName": customer_info.get('last_name'),
"CompanyName": customer_info.get('company_name'),
"PrimaryEmailAddr": {
"Address": customer_info['email']
},
"PrimaryPhone": {
"FreeFormNumber": customer_info.get('phone')
},
"BillAddr": {
"Line1": customer_info['address_line1'],
"City": customer_info['city'],
"CountrySubDivisionCode": customer_info['state'],
"PostalCode": customer_info['zip']
},
"ShipAddr": {
"Line1": customer_info.get('ship_line1', customer_info['address_line1']),
"City": customer_info.get('ship_city', customer_info['city']),
"CountrySubDivisionCode": customer_info.get('ship_state', customer_info['state']),
"PostalCode": customer_info.get('ship_zip', customer_info['zip'])
}
}
return create_customer(realm_id, customer_data, access_token)
2. Sparse update to modify email
python
def update_customer_email(realm_id, customer_id, new_email, access_token):
# Read current customer
customer = read_customer(realm_id, customer_id, access_token)
# Sparse update - only email
update_data = {
"Id": customer_id,
"SyncToken": customer['SyncToken'],
"sparse": True,
"PrimaryEmailAddr": {
"Address": new_email
}
}
return update_customer(realm_id, update_data, access_token)
3. Query customer transactions
python
def get_customer_transactions(realm_id, customer_id, access_token):
transactions = {}
# Query invoices
invoice_query = f"SELECT * FROM Invoice WHERE CustomerRef = '{customer_id}'"
transactions['invoices'] = query_entity(realm_id, invoice_query, access_token)
# Query payments
payment_query = f"SELECT * FROM Payment WHERE CustomerRef = '{customer_id}'"
transactions['payments'] = query_entity(realm_id, payment_query, access_token)
# Query estimates
estimate_query = f"SELECT * FROM Estimate WHERE CustomerRef = '{customer_id}'"
transactions['estimates'] = query_entity(realm_id, estimate_query, access_token)
# Calculate totals
total_invoiced = sum(inv['TotalAmt'] for inv in transactions['invoices'])
total_paid = sum(pmt['TotalAmt'] for pmt in transactions['payments'])
transactions['summary'] = {
'total_invoiced': total_invoiced,
'total_paid': total_paid,
'balance': total_invoiced - total_paid
}
return transactions
4. Update AR account reference
payment = pay_multiple_invoices(
realm_id,
["145", "146"], # Invoice IDs
[100.00, 50.00], # Amounts applied to each
customer_id,
150.00, # Total check amount
access_token
)
Change default AR account for customer
工作流3:客户管理
def update_customer_ar_account(realm_id, customer_id, new_ar_account_id, access_token):
customer = read_customer(realm_id, customer_id, access_token)
update_data = {
"Id": customer_id,
"SyncToken": customer['SyncToken'],
"sparse": True,
"ARAccountRef": {
"value": new_ar_account_id
}
}
return update_customer(realm_id, update_data, access_token)
场景:完整的客户生命周期管理。
1. 创建带地址的客户
python
def create_customer_complete(realm_id, customer_info, access_token):
customer_data = {
"DisplayName": customer_info['display_name'],
"GivenName": customer_info.get('first_name'),
"FamilyName": customer_info.get('last_name'),
"CompanyName": customer_info.get('company_name'),
"PrimaryEmailAddr": {
"Address": customer_info['email']
},
"PrimaryPhone": {
"FreeFormNumber": customer_info.get('phone')
},
"BillAddr": {
"Line1": customer_info['address_line1'],
"City": customer_info['city'],
"CountrySubDivisionCode": customer_info['state'],
"PostalCode": customer_info['zip']
},
"ShipAddr": {
"Line1": customer_info.get('ship_line1', customer_info['address_line1']),
"City": customer_info.get('ship_city', customer_info['city']),
"CountrySubDivisionCode": customer_info.get('ship_state', customer_info['state']),
"PostalCode": customer_info.get('ship_zip', customer_info['zip'])
}
}
return create_customer(realm_id, customer_data, access_token)
2. 稀疏更新修改邮箱
python
def update_customer_email(realm_id, customer_id, new_email, access_token):
# Read current customer
customer = read_customer(realm_id, customer_id, access_token)
# Sparse update - only email
update_data = {
"Id": customer_id,
"SyncToken": customer['SyncToken'],
"sparse": True,
"PrimaryEmailAddr": {
"Address": new_email
}
}
return update_customer(realm_id, update_data, access_token)
3. 查询客户交易
python
def get_customer_transactions(realm_id, customer_id, access_token):
transactions = {}
# Query invoices
invoice_query = f"SELECT * FROM Invoice WHERE CustomerRef = '{customer_id}'"
transactions['invoices'] = query_entity(realm_id, invoice_query, access_token)
# Query payments
payment_query = f"SELECT * FROM Payment WHERE CustomerRef = '{customer_id}'"
transactions['payments'] = query_entity(realm_id, payment_query, access_token)
# Query estimates
estimate_query = f"SELECT * FROM Estimate WHERE CustomerRef = '{customer_id}'"
transactions['estimates'] = query_entity(realm_id, estimate_query, access_token)
# Calculate totals
total_invoiced = sum(inv['TotalAmt'] for inv in transactions['invoices'])
total_paid = sum(pmt['TotalAmt'] for pmt in transactions['payments'])
transactions['summary'] = {
'total_invoiced': total_invoiced,
'total_paid': total_paid,
'balance': total_invoiced - total_paid
}
return transactions
4. 更新应收账款账户引用
Workflow 4: Batch Sync Operation
Change default AR account for customer
Scenario: Sync changed entities using CDC and batch updates.
1. Use CDC to get changed entities
python
from datetime import datetime, timedelta
def sync_changed_entities(realm_id, last_sync_time, access_token):
# Get changes since last sync
entity_types = ['Invoice', 'Customer', 'Payment', 'Item']
changes = get_changed_entities(realm_id, entity_types, last_sync_time, access_token)
return changes
2. Build batch operation with updates
python
def build_batch_updates(changes):
batch_items = []
bid_counter = 0
# Process each entity type
for entity_type in ['Customer', 'Invoice', 'Payment']:
entities = extract_entities_by_type(changes, entity_type)
for entity in entities:
if entity.get('status') == 'Deleted':
# Skip deleted entities or handle separately
continue
# Example: Mark all invoices as reviewed
if entity_type == 'Invoice':
batch_items.append({
"bId": f"invoice_{bid_counter}",
"operation": "update",
"Invoice": {
"Id": entity['Id'],
"SyncToken": entity['SyncToken'],
"sparse": True,
"PrivateNote": f"Synced at {datetime.now().isoformat()}"
}
})
bid_counter += 1
return batch_items
3. Execute batch asynchronously
python
async def execute_batch_sync(realm_id, batch_items, access_token):
# Split into batches of 30 (API limit)
batch_size = 30
results = []
for i in range(0, len(batch_items), batch_size):
batch_chunk = batch_items[i:i+batch_size]
batch_request = {
"BatchItemRequest": batch_chunk
}
url = f"https://sandbox-quickbooks.api.intuit.com/v3/company/{realm_id}/batch"
response = await async_post(url, batch_request, {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
})
if response.status == 200:
batch_response = response.json()
results.extend(batch_response['BatchItemResponse'])
else:
print(f"Batch {i//batch_size + 1} failed: {response.text}")
return results
4. Process batch responses by batch ID
python
def process_batch_results(batch_results):
success_count = 0
error_count = 0
errors = []
for result in batch_results:
bid = result['bId']
if 'Fault' in result:
error_count += 1
fault = result['Fault']
errors.append({
'batch_id': bid,
'error_code': fault['Error'][0]['code'],
'message': fault['Error'][0]['Message']
})
print(f"Error in {bid}: {fault['Error'][0]['Message']}")
else:
success_count += 1
# Extract updated entity
entity_type = list(result.keys())[0]
if entity_type != 'bId':
entity = result[entity_type]
print(f"Success {bid}: {entity_type} {entity['Id']} updated")
summary = {
'total': len(batch_results),
'success': success_count,
'errors': error_count,
'error_details': errors
}
return summary
def update_customer_ar_account(realm_id, customer_id, new_ar_account_id, access_token):
customer = read_customer(realm_id, customer_id, access_token)
update_data = {
"Id": customer_id,
"SyncToken": customer['SyncToken'],
"sparse": True,
"ARAccountRef": {
"value": new_ar_account_id
}
}
return update_customer(realm_id, update_data, access_token)
Complete workflow
工作流4:批量同步操作
async def sync_workflow(realm_id, last_sync_time, access_token):
# 1. Get changes via CDC
changes = sync_changed_entities(realm_id, last_sync_time, access_token)
# 2. Build batch updates
batch_items = build_batch_updates(changes)
if not batch_items:
print("No changes to sync")
return
# 3. Execute batch
results = await execute_batch_sync(realm_id, batch_items, access_token)
# 4. Process results
summary = process_batch_results(results)
print(f"Sync complete: {summary['success']}/{summary['total']} successful")
if summary['errors'] > 0:
print(f"Errors encountered: {summary['errors']}")
for error in summary['error_details']:
print(f" {error['batch_id']}: {error['message']}")
return summary
场景:使用CDC和批量更新同步变更的实体。
1. 使用CDC获取变更的实体
python
from datetime import datetime, timedelta
def sync_changed_entities(realm_id, last_sync_time, access_token):
# Get changes since last sync
entity_types = ['Invoice', 'Customer', 'Payment', 'Item']
changes = get_changed_entities(realm_id, entity_types, last_sync_time, access_token)
return changes
2. 构建批量更新操作
python
def build_batch_updates(changes):
batch_items = []
bid_counter = 0
# Process each entity type
for entity_type in ['Customer', 'Invoice', 'Payment']:
entities = extract_entities_by_type(changes, entity_type)
for entity in entities:
if entity.get('status') == 'Deleted':
# Skip deleted entities or handle separately
continue
# Example: Mark all invoices as reviewed
if entity_type == 'Invoice':
batch_items.append({
"bId": f"invoice_{bid_counter}",
"operation": "update",
"Invoice": {
"Id": entity['Id'],
"SyncToken": entity['SyncToken'],
"sparse": True,
"PrivateNote": f"Synced at {datetime.now().isoformat()}"
}
})
bid_counter += 1
return batch_items
3. 异步执行批处理
python
async def execute_batch_sync(realm_id, batch_items, access_token):
# Split into batches of 30 (API limit)
batch_size = 30
results = []
for i in range(0, len(batch_items), batch_size):
batch_chunk = batch_items[i:i+batch_size]
batch_request = {
"BatchItemRequest": batch_chunk
}
url = f"https://sandbox-quickbooks.api.intuit.com/v3/company/{realm_id}/batch"
response = await async_post(url, batch_request, {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
})
if response.status == 200:
batch_response = response.json()
results.extend(batch_response['BatchItemResponse'])
else:
print(f"Batch {i//batch_size + 1} failed: {response.text}")
return results
4. 按批处理ID处理批量响应
python
def process_batch_results(batch_results):
success_count = 0
error_count = 0
errors = []
for result in batch_results:
bid = result['bId']
if 'Fault' in result:
error_count += 1
fault = result['Fault']
errors.append({
'batch_id': bid,
'error_code': fault['Error'][0]['code'],
'message': fault['Error'][0]['Message']
})
print(f"Error in {bid}: {fault['Error'][0]['Message']}")
else:
success_count += 1
# Extract updated entity
entity_type = list(result.keys())[0]
if entity_type != 'bId':
entity = result[entity_type]
print(f"Success {bid}: {entity_type} {entity['Id']} updated")
summary = {
'total': len(batch_results),
'success': success_count,
'errors': error_count,
'error_details': errors
}
return summary
Code Examples
Complete workflow
Example 1: OAuth2 Token Refresh (Node.js)
Complete token management with automatic refresh:
javascript
const OAuthClient = require('intuit-oauth');
class QuickBooksAuth {
constructor(clientId, clientSecret, redirectUri, environment = 'sandbox') {
this.oauthClient = new OAuthClient({
clientId: clientId,
clientSecret: clientSecret,
environment: environment,
redirectUri: redirectUri
});
this.accessToken = null;
this.refreshToken = null;
this.tokenExpiry = null;
this.refreshTimer = null;
}
// Store tokens after authorization
async storeTokens(authResponse) {
this.accessToken = authResponse.token.access_token;
this.refreshToken = authResponse.token.refresh_token;
// Calculate expiry time
const expiresIn = authResponse.token.expires_in; // 3600 seconds
this.tokenExpiry = Date.now() + (expiresIn * 1000);
// Schedule automatic refresh (5 minutes before expiry)
this.scheduleRefresh(expiresIn - 300);
// Persist tokens to secure storage
await this.saveToDatabase({
access_token: this.accessToken,
refresh_token: this.refreshToken,
expiry: this.tokenExpiry
});
}
// Schedule automatic token refresh
scheduleRefresh(delaySeconds) {
if (this.refreshTimer) {
clearTimeout(this.refreshTimer);
}
this.refreshTimer = setTimeout(async () => {
try {
await this.refreshAccessToken();
} catch (error) {
console.error('Scheduled token refresh failed:', error);
// Notify admin that re-authentication needed
this.notifyReauthenticationNeeded();
}
}, delaySeconds * 1000);
}
// Refresh access token
async refreshAccessToken() {
try {
// Set refresh token in client
this.oauthClient.setToken({
refresh_token: this.refreshToken
});
// Refresh
const authResponse = await this.oauthClient.refresh();
console.log('Token refreshed successfully');
// Store new tokens
await this.storeTokens(authResponse);
return authResponse;
} catch (error) {
console.error('Token refresh failed:', error.originalMessage);
// Check if refresh token is invalid
if (error.error === 'invalid_grant') {
console.error('Refresh token invalid - re-authentication required');
this.accessToken = null;
this.refreshToken = null;
throw new Error('Re-authentication required');
}
throw error;
}
}
// Get valid access token (refresh if needed)
async getAccessToken() {
// Check if token is about to expire (within 5 minutes)
const bufferTime = 5 * 60 * 1000; // 5 minutes
if (!this.accessToken || Date.now() >= (this.tokenExpiry - bufferTime)) {
console.log('Token expired or expiring soon, refreshing...');
await this.refreshAccessToken();
}
return this.accessToken;
}
// Make API call with automatic token refresh
async apiCall(url, options = {}) {
try {
const token = await this.getAccessToken();
const response = await fetch(url, {
...options,
headers: {
...options.headers,
'Authorization': `Bearer ${token}`,
'Accept': 'application/json'
}
});
// Handle 401 - token might have expired
if (response.status === 401) {
console.log('401 Unauthorized - refreshing token and retrying...');
await this.refreshAccessToken();
// Retry with new token
const newToken = await this.getAccessToken();
return fetch(url, {
...options,
headers: {
...options.headers,
'Authorization': `Bearer ${newToken}`,
'Accept': 'application/json'
}
});
}
return response;
} catch (error) {
console.error('API call failed:', error);
throw error;
}
}
// Save tokens to database (implement based on your storage)
async saveToDatabase(tokens) {
// Example: Save to database
// await db.tokens.update({ realmId }, tokens);
}
// Notify admin about re-auth requirement
notifyReauthenticationNeeded() {
// Example: Send email or notification
console.error('Re-authentication required for QuickBooks integration');
}
}
// Usage
const auth = new QuickBooksAuth(
'YOUR_CLIENT_ID',
'YOUR_CLIENT_SECRET',
'https://yourapp.com/callback',
'sandbox'
);
// After OAuth authorization
auth.storeTokens(authResponse);
// Make API calls - automatic token refresh
const response = await auth.apiCall(
'https://sandbox-quickbooks.api.intuit.com/v3/company/123/customer/456',
{ method: 'GET' }
);
async def sync_workflow(realm_id, last_sync_time, access_token):
# 1. Get changes via CDC
changes = sync_changed_entities(realm_id, last_sync_time, access_token)
# 2. Build batch updates
batch_items = build_batch_updates(changes)
if not batch_items:
print("No changes to sync")
return
# 3. Execute batch
results = await execute_batch_sync(realm_id, batch_items, access_token)
# 4. Process results
summary = process_batch_results(results)
print(f"Sync complete: {summary['success']}/{summary['total']} successful")
if summary['errors'] > 0:
print(f"Errors encountered: {summary['errors']}")
for error in summary['error_details']:
print(f" {error['batch_id']}: {error['message']}")
return summary
Example 2: Create Invoice (Python)
代码示例
Complete invoice creation with line items and tax:
python
import requests
from datetime import datetime, timedelta
class QuickBooksInvoice:
def __init__(self, realm_id, access_token, base_url='https://sandbox-quickbooks.api.intuit.com'):
self.realm_id = realm_id
self.access_token = access_token
self.base_url = base_url
def create_invoice(self, customer_id, line_items, due_days=30, tax_code='TAX', memo=None):
"""
Create an invoice with multiple line items
Args:
customer_id: QuickBooks customer ID
line_items: List of dicts with 'item_id', 'quantity', 'unit_price', 'description'
due_days: Days until due date
tax_code: Tax code ('TAX' for taxable, 'NON' for non-taxable)
memo: Customer memo
Returns:
Created invoice dict or None if error
"""
# Calculate dates
txn_date = datetime.now().strftime('%Y-%m-%d')
due_date = (datetime.now() + timedelta(days=due_days)).strftime('%Y-%m-%d')
# Build line items
lines = []
subtotal = 0
for idx, item in enumerate(line_items, start=1):
amount = item['quantity'] * item['unit_price']
subtotal += amount
lines.append({
"LineNum": idx,
"Amount": amount,
"DetailType": "SalesItemLineDetail",
"Description": item.get('description', ''),
"SalesItemLineDetail": {
"ItemRef": {
"value": item['item_id']
},
"Qty": item['quantity'],
"UnitPrice": item['unit_price'],
"TaxCodeRef": {
"value": tax_code
}
}
})
# Add subtotal line
lines.append({
"Amount": subtotal,
"DetailType": "SubTotalLineDetail",
"SubTotalLineDetail": {}
})
# Build invoice payload
invoice_data = {
"TxnDate": txn_date,
"DueDate": due_date,
"CustomerRef": {
"value": customer_id
},
"Line": lines,
"BillEmail": {}, # Will be populated from customer
"EmailStatus": "NotSet"
}
# Add memo if provided
if memo:
invoice_data["CustomerMemo"] = {
"value": memo
}
# Make API request
url = f"{self.base_url}/v3/company/{self.realm_id}/invoice"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json",
"Content-Type": "application/json"
}
try:
response = requests.post(url, json=invoice_data, headers=headers)
response.raise_for_status()
# Check for fault in response
result = response.json()
if 'Fault' in result:
self._handle_fault(result['Fault'])
return None
invoice = result['Invoice']
print(f"✓ Invoice {invoice['DocNumber']} created")
print(f" Customer: {invoice['CustomerRef']['value']}")
print(f" Total: ${invoice['TotalAmt']:.2f}")
print(f" Due: {invoice['DueDate']}")
print(f" Balance: ${invoice['Balance']:.2f}")
return invoice
except requests.exceptions.HTTPError as e:
print(f"✗ HTTP Error: {e.response.status_code}")
print(f" Response: {e.response.text}")
return None
except Exception as e:
print(f"✗ Error creating invoice: {str(e)}")
return None
def send_invoice(self, invoice_id, email_address):
"""Send invoice via email"""
url = f"{self.base_url}/v3/company/{self.realm_id}/invoice/{invoice_id}/send"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
params = {"sendTo": email_address}
try:
response = requests.post(url, params=params, headers=headers)
response.raise_for_status()
result = response.json()
if 'Fault' in result:
self._handle_fault(result['Fault'])
return False
invoice = result['Invoice']
print(f"✓ Invoice {invoice['DocNumber']} sent to {email_address}")
print(f" Email Status: {invoice['EmailStatus']}")
return True
except Exception as e:
print(f"✗ Error sending invoice: {str(e)}")
return False
def _handle_fault(self, fault):
"""Handle fault responses"""
print(f"✗ Fault Type: {fault['type']}")
for error in fault['Error']:
print(f" Error {error['code']}: {error['Message']}")
if 'element' in error:
print(f" Element: {error['element']}")
带自动刷新的完整令牌管理:
javascript
const OAuthClient = require('intuit-oauth');
class QuickBooksAuth {
constructor(clientId, clientSecret, redirectUri, environment = 'sandbox') {
this.oauthClient = new OAuthClient({
clientId: clientId,
clientSecret: clientSecret,
environment: environment,
redirectUri: redirectUri
});
this.accessToken = null;
this.refreshToken = null;
this.tokenExpiry = null;
this.refreshTimer = null;
}
// Store tokens after authorization
async storeTokens(authResponse) {
this.accessToken = authResponse.token.access_token;
this.refreshToken = authResponse.token.refresh_token;
// Calculate expiry time
const expiresIn = authResponse.token.expires_in; // 3600 seconds
this.tokenExpiry = Date.now() + (expiresIn * 1000);
// Schedule automatic refresh (5 minutes before expiry)
this.scheduleRefresh(expiresIn - 300);
// Persist tokens to secure storage
await this.saveToDatabase({
access_token: this.accessToken,
refresh_token: this.refreshToken,
expiry: this.tokenExpiry
});
}
// Schedule automatic token refresh
scheduleRefresh(delaySeconds) {
if (this.refreshTimer) {
clearTimeout(this.refreshTimer);
}
this.refreshTimer = setTimeout(async () => {
try {
await this.refreshAccessToken();
} catch (error) {
console.error('Scheduled token refresh failed:', error);
// Notify admin that re-authentication needed
this.notifyReauthenticationNeeded();
}
}, delaySeconds * 1000);
}
// Refresh access token
async refreshAccessToken() {
try {
// Set refresh token in client
this.oauthClient.setToken({
refresh_token: this.refreshToken
});
// Refresh
const authResponse = await this.oauthClient.refresh();
console.log('Token refreshed successfully');
// Store new tokens
await this.storeTokens(authResponse);
return authResponse;
} catch (error) {
console.error('Token refresh failed:', error.originalMessage);
// Check if refresh token is invalid
if (error.error === 'invalid_grant') {
console.error('Refresh token invalid - re-authentication required');
this.accessToken = null;
this.refreshToken = null;
throw new Error('Re-authentication required');
}
throw error;
}
}
// Get valid access token (refresh if needed)
async getAccessToken() {
// Check if token is about to expire (within 5 minutes)
const bufferTime = 5 * 60 * 1000; // 5 minutes
if (!this.accessToken || Date.now() >= (this.tokenExpiry - bufferTime)) {
console.log('Token expired or expiring soon, refreshing...');
await this.refreshAccessToken();
}
return this.accessToken;
}
// Make API call with automatic token refresh
async apiCall(url, options = {}) {
try {
const token = await this.getAccessToken();
const response = await fetch(url, {
...options,
headers: {
...options.headers,
'Authorization': `Bearer ${token}`,
'Accept': 'application/json'
}
});
// Handle 401 - token might have expired
if (response.status === 401) {
console.log('401 Unauthorized - refreshing token and retrying...');
await this.refreshAccessToken();
// Retry with new token
const newToken = await this.getAccessToken();
return fetch(url, {
...options,
headers: {
...options.headers,
'Authorization': `Bearer ${newToken}`,
'Accept': 'application/json'
}
});
}
return response;
} catch (error) {
console.error('API call failed:', error);
throw error;
}
}
// Save tokens to database (implement based on your storage)
async saveToDatabase(tokens) {
// Example: Save to database
// await db.tokens.update({ realmId }, tokens);
}
// Notify admin about re-auth requirement
notifyReauthenticationNeeded() {
// Example: Send email or notification
console.error('Re-authentication required for QuickBooks integration');
}
}
// Usage
const auth = new QuickBooksAuth(
'YOUR_CLIENT_ID',
'YOUR_CLIENT_SECRET',
'https://yourapp.com/callback',
'sandbox'
);
// After OAuth authorization
auth.storeTokens(authResponse);
// Make API calls - automatic token refresh
const response = await auth.apiCall(
'https://sandbox-quickbooks.api.intuit.com/v3/company/123/customer/456',
{ method: 'GET' }
);
Usage example
示例2:创建发票(Python)
invoice_manager = QuickBooksInvoice(realm_id='123456789', access_token='your_token')
带明细项和税费的完整发票创建:
python
import requests
from datetime import datetime, timedelta
class QuickBooksInvoice:
def __init__(self, realm_id, access_token, base_url='https://sandbox-quickbooks.api.intuit.com'):
self.realm_id = realm_id
self.access_token = access_token
self.base_url = base_url
def create_invoice(self, customer_id, line_items, due_days=30, tax_code='TAX', memo=None):
"""
Create an invoice with multiple line items
Args:
customer_id: QuickBooks customer ID
line_items: List of dicts with 'item_id', 'quantity', 'unit_price', 'description'
due_days: Days until due date
tax_code: Tax code ('TAX' for taxable, 'NON' for non-taxable)
memo: Customer memo
Returns:
Created invoice dict or None if error
"""
# Calculate dates
txn_date = datetime.now().strftime('%Y-%m-%d')
due_date = (datetime.now() + timedelta(days=due_days)).strftime('%Y-%m-%d')
# Build line items
lines = []
subtotal = 0
for idx, item in enumerate(line_items, start=1):
amount = item['quantity'] * item['unit_price']
subtotal += amount
lines.append({
"LineNum": idx,
"Amount": amount,
"DetailType": "SalesItemLineDetail",
"Description": item.get('description', ''),
"SalesItemLineDetail": {
"ItemRef": {
"value": item['item_id']
},
"Qty": item['quantity'],
"UnitPrice": item['unit_price'],
"TaxCodeRef": {
"value": tax_code
}
}
})
# Add subtotal line
lines.append({
"Amount": subtotal,
"DetailType": "SubTotalLineDetail",
"SubTotalLineDetail": {}
})
# Build invoice payload
invoice_data = {
"TxnDate": txn_date,
"DueDate": due_date,
"CustomerRef": {
"value": customer_id
},
"Line": lines,
"BillEmail": {}, # Will be populated from customer
"EmailStatus": "NotSet"
}
# Add memo if provided
if memo:
invoice_data["CustomerMemo"] = {
"value": memo
}
# Make API request
url = f"{self.base_url}/v3/company/{self.realm_id}/invoice"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json",
"Content-Type": "application/json"
}
try:
response = requests.post(url, json=invoice_data, headers=headers)
response.raise_for_status()
# Check for fault in response
result = response.json()
if 'Fault' in result:
self._handle_fault(result['Fault'])
return None
invoice = result['Invoice']
print(f"✓ Invoice {invoice['DocNumber']} created")
print(f" Customer: {invoice['CustomerRef']['value']}")
print(f" Total: ${invoice['TotalAmt']:.2f}")
print(f" Due: {invoice['DueDate']}")
print(f" Balance: ${invoice['Balance']:.2f}")
return invoice
except requests.exceptions.HTTPError as e:
print(f"✗ HTTP Error: {e.response.status_code}")
print(f" Response: {e.response.text}")
return None
except Exception as e:
print(f"✗ Error creating invoice: {str(e)}")
return None
def send_invoice(self, invoice_id, email_address):
"""Send invoice via email"""
url = f"{self.base_url}/v3/company/{self.realm_id}/invoice/{invoice_id}/send"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
params = {"sendTo": email_address}
try:
response = requests.post(url, params=params, headers=headers)
response.raise_for_status()
result = response.json()
if 'Fault' in result:
self._handle_fault(result['Fault'])
return False
invoice = result['Invoice']
print(f"✓ Invoice {invoice['DocNumber']} sent to {email_address}")
print(f" Email Status: {invoice['EmailStatus']}")
return True
except Exception as e:
print(f"✗ Error sending invoice: {str(e)}")
return False
def _handle_fault(self, fault):
"""Handle fault responses"""
print(f"✗ Fault Type: {fault['type']}")
for error in fault['Error']:
print(f" Error {error['code']}: {error['Message']}")
if 'element' in error:
print(f" Element: {error['element']}")
Create invoice with multiple items
Usage example
invoice = invoice_manager.create_invoice(
customer_id='42',
line_items=[
{
'item_id': '1',
'quantity': 10,
'unit_price': 150.00,
'description': 'Consulting services - December 2024'
},
{
'item_id': '5',
'quantity': 1,
'unit_price': 500.00,
'description': 'Project management - December 2024'
}
],
due_days=30,
tax_code='TAX', # or 'NON' for non-taxable
memo='Thank you for your business!'
)
if invoice:
# Send invoice via email
invoice_manager.send_invoice(invoice['Id'], 'customer@example.com')
invoice_manager = QuickBooksInvoice(realm_id='123456789', access_token='your_token')
Example 3: Sparse Update Customer (Node.js)
Create invoice with multiple items
Demonstrate sparse update pattern with error handling:
javascript
const axios = require('axios');
class QuickBooksCustomer {
constructor(realmId, accessToken, baseUrl = 'https://sandbox-quickbooks.api.intuit.com') {
this.realmId = realmId;
this.accessToken = accessToken;
this.baseUrl = baseUrl;
}
async readCustomer(customerId) {
const url = `${this.baseUrl}/v3/company/${this.realmId}/customer/${customerId}`;
try {
const response = await axios.get(url, {
headers: {
'Authorization': `Bearer ${this.accessToken}`,
'Accept': 'application/json'
}
});
return response.data.Customer;
} catch (error) {
console.error('Read customer failed:', error.response?.data || error.message);
throw error;
}
}
async sparseUpdate(customerId, updates) {
// First, read customer to get current SyncToken
const customer = await this.readCustomer(customerId);
// Build sparse update payload
const updateData = {
Id: customerId,
SyncToken: customer.SyncToken,
sparse: true,
...updates
};
const url = `${this.baseUrl}/v3/company/${this.realmId}/customer`;
try {
const response = await axios.post(url, updateData, {
headers: {
'Authorization': `Bearer ${this.accessToken}`,
'Accept': 'application/json',
'Content-Type': 'application/json'
}
});
// Check for fault
if (response.data.Fault) {
this.handleFault(response.data.Fault);
return null;
}
const updatedCustomer = response.data.Customer;
console.log(`✓ Customer ${updatedCustomer.DisplayName} updated`);
console.log(` New SyncToken: ${updatedCustomer.SyncToken}`);
return updatedCustomer;
} catch (error) {
if (error.response?.status === 400) {
const fault = error.response.data.Fault;
// Handle SyncToken mismatch
if (fault.Error[0].code === '3200') {
console.log('SyncToken mismatch - retrying with fresh token...');
// Recursive retry with new token
return this.sparseUpdate(customerId, updates);
}
}
console.error('Update failed:', error.response?.data || error.message);
throw error;
}
}
// Example: Update email
async updateEmail(customerId, newEmail) {
return this.sparseUpdate(customerId, {
PrimaryEmailAddr: {
Address: newEmail
}
});
}
// Example: Update phone
async updatePhone(customerId, newPhone) {
return this.sparseUpdate(customerId, {
PrimaryPhone: {
FreeFormNumber: newPhone
}
});
}
// Example: Update billing address
async updateBillingAddress(customerId, address) {
return this.sparseUpdate(customerId, {
BillAddr: {
Line1: address.line1,
City: address.city,
CountrySubDivisionCode: address.state,
PostalCode: address.zip
}
});
}
// Example: Deactivate customer
async deactivateCustomer(customerId) {
return this.sparseUpdate(customerId, {
Active: false
});
}
// Example: Update multiple fields at once
async updateMultipleFields(customerId, updates) {
const sparseUpdates = {};
if (updates.email) {
sparseUpdates.PrimaryEmailAddr = { Address: updates.email };
}
if (updates.phone) {
sparseUpdates.PrimaryPhone = { FreeFormNumber: updates.phone };
}
if (updates.displayName) {
sparseUpdates.DisplayName = updates.displayName;
}
if (updates.notes) {
sparseUpdates.Notes = updates.notes;
}
return this.sparseUpdate(customerId, sparseUpdates);
}
handleFault(fault) {
console.error(`✗ Fault Type: ${fault.type}`);
fault.Error.forEach(error => {
console.error(` Error ${error.code}: ${error.Message}`);
if (error.element) {
console.error(` Element: ${error.element}`);
}
});
}
}
// Usage
const customerManager = new QuickBooksCustomer('123456789', 'your_access_token');
// Update email
await customerManager.updateEmail('42', 'newemail@example.com');
// Update phone
await customerManager.updatePhone('42', '(555) 987-6543');
// Update address
await customerManager.updateBillingAddress('42', {
line1: '456 New Street',
city: 'San Francisco',
state: 'CA',
zip: '94105'
});
// Update multiple fields
await customerManager.updateMultipleFields('42', {
email: 'updated@example.com',
phone: '(555) 111-2222',
displayName: 'Updated Customer Name',
notes: 'VIP customer - priority support'
});
// Deactivate customer
await customerManager.deactivateCustomer('42');
invoice = invoice_manager.create_invoice(
customer_id='42',
line_items=[
{
'item_id': '1',
'quantity': 10,
'unit_price': 150.00,
'description': 'Consulting services - December 2024'
},
{
'item_id': '5',
'quantity': 1,
'unit_price': 500.00,
'description': 'Project management - December 2024'
}
],
due_days=30,
tax_code='TAX', # or 'NON' for non-taxable
memo='Thank you for your business!'
)
if invoice:
# Send invoice via email
invoice_manager.send_invoice(invoice['Id'], 'customer@example.com')
Example 4: Query with Filters (Python)
示例3:稀疏更新客户(Node.js)
Complex query with date range and sorting:
python
import requests
from urllib.parse import quote
from datetime import datetime, timedelta
class QuickBooksQuery:
def __init__(self, realm_id, access_token, base_url='https://sandbox-quickbooks.api.intuit.com'):
self.realm_id = realm_id
self.access_token = access_token
self.base_url = base_url
def query(self, sql_query):
"""Execute SQL-like query"""
encoded_query = quote(sql_query)
url = f"{self.base_url}/v3/company/{self.realm_id}/query?query={encoded_query}"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
result = response.json()
if 'Fault' in result:
print(f"Query error: {result['Fault']}")
return []
query_response = result.get('QueryResponse', {})
# Extract entities (keys vary by entity type)
for key, value in query_response.items():
if key not in ['startPosition', 'maxResults', 'totalCount']:
return value if isinstance(value, list) else []
return []
except Exception as e:
print(f"Query failed: {str(e)}")
return []
def query_invoices_by_date_range(self, start_date, end_date, customer_id=None):
"""Query invoices within date range, optionally filtered by customer"""
query = f"SELECT * FROM Invoice WHERE TxnDate >= '{start_date}' AND TxnDate <= '{end_date}'"
if customer_id:
query += f" AND CustomerRef = '{customer_id}'"
query += " ORDERBY TxnDate DESC"
invoices = self.query(query)
print(f"Found {len(invoices)} invoices between {start_date} and {end_date}")
# Calculate totals
total_amount = sum(inv['TotalAmt'] for inv in invoices)
total_balance = sum(inv['Balance'] for inv in invoices)
print(f"Total invoiced: ${total_amount:.2f}")
print(f"Outstanding balance: ${total_balance:.2f}")
return invoices
def query_overdue_invoices(self, as_of_date=None):
"""Query invoices past due date"""
if not as_of_date:
as_of_date = datetime.now().strftime('%Y-%m-%d')
query = f"SELECT * FROM Invoice WHERE Balance > '0' AND DueDate < '{as_of_date}' ORDERBY DueDate"
invoices = self.query(query)
print(f"Found {len(invoices)} overdue invoices as of {as_of_date}")
# Group by customer
by_customer = {}
for inv in invoices:
customer_id = inv['CustomerRef']['value']
if customer_id not in by_customer:
by_customer[customer_id] = {
'customer_name': inv['CustomerRef'].get('name', 'Unknown'),
'invoices': [],
'total_overdue': 0
}
by_customer[customer_id]['invoices'].append(inv)
by_customer[customer_id]['total_overdue'] += inv['Balance']
# Print summary
for customer_id, data in by_customer.items():
print(f"\nCustomer: {data['customer_name']}")
print(f" Overdue invoices: {len(data['invoices'])}")
print(f" Total overdue: ${data['total_overdue']:.2f}")
return invoices
def query_customers_by_balance(self, min_balance=0):
"""Query customers with balance greater than minimum"""
query = f"SELECT * FROM Customer WHERE Balance > '{min_balance}' ORDERBY Balance DESC"
customers = self.query(query)
print(f"Found {len(customers)} customers with balance > ${min_balance}")
total_ar = sum(cust['Balance'] for cust in customers)
print(f"Total accounts receivable: ${total_ar:.2f}")
return customers
def query_items_by_type(self, item_type='Service'):
"""Query items by type (Service, Inventory, NonInventory, Category)"""
query = f"SELECT * FROM Item WHERE Type = '{item_type}' AND Active = true ORDERBY Name"
items = self.query(query)
print(f"Found {len(items)} active {item_type} items")
return items
def search_customers_by_name(self, search_term):
"""Search customers by display name"""
query = f"SELECT * FROM Customer WHERE DisplayName LIKE '%{search_term}%' ORDERBY DisplayName"
customers = self.query(query)
print(f"Found {len(customers)} customers matching '{search_term}'")
for cust in customers:
print(f" {cust['DisplayName']} - Balance: ${cust['Balance']:.2f}")
return customers
def query_recent_payments(self, days=30):
"""Query payments from last N days"""
start_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
query = f"SELECT * FROM Payment WHERE TxnDate >= '{start_date}' ORDERBY TxnDate DESC"
payments = self.query(query)
print(f"Found {len(payments)} payments in last {days} days")
total_received = sum(pmt['TotalAmt'] for pmt in payments)
print(f"Total payments received: ${total_received:.2f}")
return payments
展示带错误处理的稀疏更新模式:
javascript
const axios = require('axios');
class QuickBooksCustomer {
constructor(realmId, accessToken, baseUrl = 'https://sandbox-quickbooks.api.intuit.com') {
this.realmId = realmId;
this.accessToken = accessToken;
this.baseUrl = baseUrl;
}
async readCustomer(customerId) {
const url = `${this.baseUrl}/v3/company/${this.realmId}/customer/${customerId}`;
try {
const response = await axios.get(url, {
headers: {
'Authorization': `Bearer ${this.accessToken}`,
'Accept': 'application/json'
}
});
return response.data.Customer;
} catch (error) {
console.error('Read customer failed:', error.response?.data || error.message);
throw error;
}
}
async sparseUpdate(customerId, updates) {
// First, read customer to get current SyncToken
const customer = await this.readCustomer(customerId);
// Build sparse update payload
const updateData = {
Id: customerId,
SyncToken: customer.SyncToken,
sparse: true,
...updates
};
const url = `${this.baseUrl}/v3/company/${this.realmId}/customer`;
try {
const response = await axios.post(url, updateData, {
headers: {
'Authorization': `Bearer ${this.accessToken}`,
'Accept': 'application/json',
'Content-Type': 'application/json'
}
});
// Check for fault
if (response.data.Fault) {
this.handleFault(response.data.Fault);
return null;
}
const updatedCustomer = response.data.Customer;
console.log(`✓ Customer ${updatedCustomer.DisplayName} updated`);
console.log(` New SyncToken: ${updatedCustomer.SyncToken}`);
return updatedCustomer;
} catch (error) {
if (error.response?.status === 400) {
const fault = error.response.data.Fault;
// Handle SyncToken mismatch
if (fault.Error[0].code === '3200') {
console.log('SyncToken mismatch - retrying with fresh token...');
// Recursive retry with new token
return this.sparseUpdate(customerId, updates);
}
}
console.error('Update failed:', error.response?.data || error.message);
throw error;
}
}
// Example: Update email
async updateEmail(customerId, newEmail) {
return this.sparseUpdate(customerId, {
PrimaryEmailAddr: {
Address: newEmail
}
});
}
// Example: Update phone
async updatePhone(customerId, newPhone) {
return this.sparseUpdate(customerId, {
PrimaryPhone: {
FreeFormNumber: newPhone
}
});
}
// Example: Update billing address
async updateBillingAddress(customerId, address) {
return this.sparseUpdate(customerId, {
BillAddr: {
Line1: address.line1,
City: address.city,
CountrySubDivisionCode: address.state,
PostalCode: address.zip
}
});
}
// Example: Deactivate customer
async deactivateCustomer(customerId) {
return this.sparseUpdate(customerId, {
Active: false
});
}
// Example: Update multiple fields at once
async updateMultipleFields(customerId, updates) {
const sparseUpdates = {};
if (updates.email) {
sparseUpdates.PrimaryEmailAddr = { Address: updates.email };
}
if (updates.phone) {
sparseUpdates.PrimaryPhone = { FreeFormNumber: updates.phone };
}
if (updates.displayName) {
sparseUpdates.DisplayName = updates.displayName;
}
if (updates.notes) {
sparseUpdates.Notes = updates.notes;
}
return this.sparseUpdate(customerId, sparseUpdates);
}
handleFault(fault) {
console.error(`✗ Fault Type: ${fault.type}`);
fault.Error.forEach(error => {
console.error(` Error ${error.code}: ${error.Message}`);
if (error.element) {
console.error(` Element: ${error.element}`);
}
});
}
}
// Usage
const customerManager = new QuickBooksCustomer('123456789', 'your_access_token');
// Update email
await customerManager.updateEmail('42', 'newemail@example.com');
// Update phone
await customerManager.updatePhone('42', '(555) 987-6543');
// Update address
await customerManager.updateBillingAddress('42', {
line1: '456 New Street',
city: 'San Francisco',
state: 'CA',
zip: '94105'
});
// Update multiple fields
await customerManager.updateMultipleFields('42', {
email: 'updated@example.com',
phone: '(555) 111-2222',
displayName: 'Updated Customer Name',
notes: 'VIP customer - priority support'
});
// Deactivate customer
await customerManager.deactivateCustomer('42');
query_service = QuickBooksQuery(realm_id='123456789', access_token='your_token')
带日期范围和排序的复杂查询:
python
import requests
from urllib.parse import quote
from datetime import datetime, timedelta
class QuickBooksQuery:
def __init__(self, realm_id, access_token, base_url='https://sandbox-quickbooks.api.intuit.com'):
self.realm_id = realm_id
self.access_token = access_token
self.base_url = base_url
def query(self, sql_query):
"""Execute SQL-like query"""
encoded_query = quote(sql_query)
url = f"{self.base_url}/v3/company/{self.realm_id}/query?query={encoded_query}"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
result = response.json()
if 'Fault' in result:
print(f"Query error: {result['Fault']}")
return []
query_response = result.get('QueryResponse', {})
# Extract entities (keys vary by entity type)
for key, value in query_response.items():
if key not in ['startPosition', 'maxResults', 'totalCount']:
return value if isinstance(value, list) else []
return []
except Exception as e:
print(f"Query failed: {str(e)}")
return []
def query_invoices_by_date_range(self, start_date, end_date, customer_id=None):
"""Query invoices within date range, optionally filtered by customer"""
query = f"SELECT * FROM Invoice WHERE TxnDate >= '{start_date}' AND TxnDate <= '{end_date}'"
if customer_id:
query += f" AND CustomerRef = '{customer_id}'"
query += " ORDERBY TxnDate DESC"
invoices = self.query(query)
print(f"Found {len(invoices)} invoices between {start_date} and {end_date}")
# Calculate totals
total_amount = sum(inv['TotalAmt'] for inv in invoices)
total_balance = sum(inv['Balance'] for inv in invoices)
print(f"Total invoiced: ${total_amount:.2f}")
print(f"Outstanding balance: ${total_balance:.2f}")
return invoices
def query_overdue_invoices(self, as_of_date=None):
"""Query invoices past due date"""
if not as_of_date:
as_of_date = datetime.now().strftime('%Y-%m-%d')
query = f"SELECT * FROM Invoice WHERE Balance > '0' AND DueDate < '{as_of_date}' ORDERBY DueDate"
invoices = self.query(query)
print(f"Found {len(invoices)} overdue invoices as of {as_of_date}")
# Group by customer
by_customer = {}
for inv in invoices:
customer_id = inv['CustomerRef']['value']
if customer_id not in by_customer:
by_customer[customer_id] = {
'customer_name': inv['CustomerRef'].get('name', 'Unknown'),
'invoices': [],
'total_overdue': 0
}
by_customer[customer_id]['invoices'].append(inv)
by_customer[customer_id]['total_overdue'] += inv['Balance']
# Print summary
for customer_id, data in by_customer.items():
print(f"\nCustomer: {data['customer_name']}")
print(f" Overdue invoices: {len(data['invoices'])}")
print(f" Total overdue: ${data['total_overdue']:.2f}")
return invoices
def query_customers_by_balance(self, min_balance=0):
"""Query customers with balance greater than minimum"""
query = f"SELECT * FROM Customer WHERE Balance > '{min_balance}' ORDERBY Balance DESC"
customers = self.query(query)
print(f"Found {len(customers)} customers with balance > ${min_balance}")
total_ar = sum(cust['Balance'] for cust in customers)
print(f"Total accounts receivable: ${total_ar:.2f}")
return customers
def query_items_by_type(self, item_type='Service'):
"""Query items by type (Service, Inventory, NonInventory, Category)"""
query = f"SELECT * FROM Item WHERE Type = '{item_type}' AND Active = true ORDERBY Name"
items = self.query(query)
print(f"Found {len(items)} active {item_type} items")
return items
def search_customers_by_name(self, search_term):
"""Search customers by display name"""
query = f"SELECT * FROM Customer WHERE DisplayName LIKE '%{search_term}%' ORDERBY DisplayName"
customers = self.query(query)
print(f"Found {len(customers)} customers matching '{search_term}'")
for cust in customers:
print(f" {cust['DisplayName']} - Balance: ${cust['Balance']:.2f}")
return customers
def query_recent_payments(self, days=30):
"""Query payments from last N days"""
start_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
query = f"SELECT * FROM Payment WHERE TxnDate >= '{start_date}' ORDERBY TxnDate DESC"
payments = self.query(query)
print(f"Found {len(payments)} payments in last {days} days")
total_received = sum(pmt['TotalAmt'] for pmt in payments)
print(f"Total payments received: ${total_received:.2f}")
return payments
Query invoices for date range
Usage
invoices = query_service.query_invoices_by_date_range(
start_date='2024-01-01',
end_date='2024-12-31'
)
query_service = QuickBooksQuery(realm_id='123456789', access_token='your_token')
Query invoices for specific customer
Query invoices for date range
customer_invoices = query_service.query_invoices_by_date_range(
start_date='2024-01-01',
end_date='2024-12-31',
customer_id='42'
)
invoices = query_service.query_invoices_by_date_range(
start_date='2024-01-01',
end_date='2024-12-31'
)
Find overdue invoices
Query invoices for specific customer
overdue = query_service.query_overdue_invoices()
customer_invoices = query_service.query_invoices_by_date_range(
start_date='2024-01-01',
end_date='2024-12-31',
customer_id='42'
)
Find customers with high balances
Find overdue invoices
high_balance_customers = query_service.query_customers_by_balance(min_balance=1000.00)
overdue = query_service.query_overdue_invoices()
Search for customer
Find customers with high balances
customers = query_service.search_customers_by_name('Acme')
high_balance_customers = query_service.query_customers_by_balance(min_balance=1000.00)
Get recent payments
Search for customer
recent_payments = query_service.query_recent_payments(days=30)
customers = query_service.search_customers_by_name('Acme')
Example 5: Batch Operations (Node.js)
Get recent payments
Batch create/update multiple entities:
javascript
const axios = require('axios');
class QuickBooksBatch {
constructor(realmId, accessToken, baseUrl = 'https://sandbox-quickbooks.api.intuit.com') {
this.realmId = realmId;
this.accessToken = accessToken;
this.baseUrl = baseUrl;
}
async executeBatch(batchItems) {
const url = `${this.baseUrl}/v3/company/${this.realmId}/batch`;
try {
const response = await axios.post(url, {
BatchItemRequest: batchItems
}, {
headers: {
'Authorization': `Bearer ${this.accessToken}`,
'Content-Type': 'application/json',
'Accept': 'application/json'
}
});
const results = response.data.BatchItemResponse;
// Process results
const summary = {
total: results.length,
success: 0,
errors: 0,
results: []
};
results.forEach(result => {
if (result.Fault) {
summary.errors++;
console.error(`✗ Error for ${result.bId}:`);
result.Fault.Error.forEach(err => {
console.error(` ${err.code}: ${err.Message}`);
});
summary.results.push({
bId: result.bId,
status: 'error',
error: result.Fault
});
} else {
summary.success++;
// Extract entity from result
const entityType = Object.keys(result).find(k => k !== 'bId');
const entity = result[entityType];
console.log(`✓ Success for ${result.bId}: ${entityType} ${entity.Id}`);
summary.results.push({
bId: result.bId,
status: 'success',
entityType: entityType,
entity: entity
});
}
});
console.log(`\nBatch complete: ${summary.success}/${summary.total} successful`);
return summary;
} catch (error) {
console.error('Batch operation failed:', error.response?.data || error.message);
throw error;
}
}
// Batch create customers
async batchCreateCustomers(customers) {
const batchItems = customers.map((customer, index) => ({
bId: `customer_create_${index}`,
operation: 'create',
Customer: {
DisplayName: customer.displayName,
PrimaryEmailAddr: { Address: customer.email },
PrimaryPhone: { FreeFormNumber: customer.phone },
BillAddr: {
Line1: customer.address,
City: customer.city,
CountrySubDivisionCode: customer.state,
PostalCode: customer.zip
}
}
}));
return this.executeBatch(batchItems);
}
// Batch update invoices
async batchUpdateInvoices(updates) {
const batchItems = updates.map((update, index) => ({
bId: `invoice_update_${index}`,
operation: 'update',
Invoice: {
Id: update.id,
SyncToken: update.syncToken,
sparse: true,
...update.changes
}
}));
return this.executeBatch(batchItems);
}
// Batch query multiple entities
async batchQuery(queries) {
const batchItems = queries.map((query, index) => ({
bId: `query_${index}`,
operation: 'query',
Query: query.sql
}));
const results = await this.executeBatch(batchItems);
// Extract query results
const queryResults = {};
results.results.forEach(result => {
if (result.status === 'success' && result.entity.QueryResponse) {
queryResults[result.bId] = result.entity.QueryResponse;
}
});
return queryResults;
}
// Mixed batch operations
async mixedBatch(operations) {
const batchItems = operations.map((op, index) => {
const item = {
bId: `op_${index}_${op.type}`,
operation: op.operation
};
// Add entity or query data
if (op.operation === 'query') {
item.Query = op.data;
} else {
item[op.entityType] = op.data;
}
return item;
});
return this.executeBatch(batchItems);
}
}
// Usage Examples
const batchService = new QuickBooksBatch('123456789', 'your_access_token');
// Example 1: Batch create customers
const newCustomers = [
{
displayName: 'Acme Corp',
email: 'contact@acme.com',
phone: '(555) 111-1111',
address: '123 Main St',
city: 'San Francisco',
state: 'CA',
zip: '94105'
},
{
displayName: 'TechStart Inc',
email: 'hello@techstart.com',
phone: '(555) 222-2222',
address: '456 Market St',
city: 'San Francisco',
state: 'CA',
zip: '94103'
}
];
const createResults = await batchService.batchCreateCustomers(newCustomers);
// Example 2: Batch update invoices (mark as sent)
const invoiceUpdates = [
{ id: '145', syncToken: '0', changes: { EmailStatus: 'NeedToSend' } },
{ id: '146', syncToken: '0', changes: { EmailStatus: 'NeedToSend' } },
{ id: '147', syncToken: '0', changes: { EmailStatus: 'NeedToSend' } }
];
const updateResults = await batchService.batchUpdateInvoices(invoiceUpdates);
// Example 3: Batch queries
const queries = [
{ sql: 'SELECT * FROM Customer WHERE Active = true MAXRESULTS 10' },
{ sql: 'SELECT * FROM Invoice WHERE Balance > 0 MAXRESULTS 10' },
{ sql: 'SELECT * FROM Payment MAXRESULTS 10' }
];
const queryResults = await batchService.batchQuery(queries);
// Example 4: Mixed batch operations
const mixedOps = [
{
type: 'create_customer',
operation: 'create',
entityType: 'Customer',
data: {
DisplayName: 'New Customer',
PrimaryEmailAddr: { Address: 'new@example.com' }
}
},
{
type: 'update_invoice',
operation: 'update',
entityType: 'Invoice',
data: {
Id: '145',
SyncToken: '1',
sparse: true,
CustomerMemo: { value: 'Thank you!' }
}
},
{
type: 'query_items',
operation: 'query',
data: 'SELECT * FROM Item WHERE Type = \'Service\' MAXRESULTS 5'
}
];
const mixedResults = await batchService.mixedBatch(mixedOps);
console.log(`Mixed batch: ${mixedResults.success}/${mixedResults.total} successful`);
recent_payments = query_service.query_recent_payments(days=30)
Example 6: Payment Application (Python)
示例5:批处理操作(Node.js)
Apply payment to multiple invoices:
python
import requests
class QuickBooksPayment:
def __init__(self, realm_id, access_token, base_url='https://sandbox-quickbooks.api.intuit.com'):
self.realm_id = realm_id
self.access_token = access_token
self.base_url = base_url
def create_payment(self, customer_id, total_amount, payment_method_id,
payment_ref_num, txn_date, invoice_applications):
"""
Create payment and apply to one or more invoices
Args:
customer_id: QuickBooks customer ID
total_amount: Total payment amount
payment_method_id: Payment method ID
payment_ref_num: Check number or transaction reference
txn_date: Payment date (YYYY-MM-DD)
invoice_applications: List of {'invoice_id': str, 'amount': float}
Returns:
Created payment dict or None if error
"""
# Build line items for invoice applications
lines = []
total_applied = 0
for application in invoice_applications:
lines.append({
"Amount": application['amount'],
"LinkedTxn": [
{
"TxnId": application['invoice_id'],
"TxnType": "Invoice"
}
]
})
total_applied += application['amount']
# Check for unapplied amount
unapplied = total_amount - total_applied
if unapplied < 0:
print(f"Warning: Applied amount (${total_applied}) exceeds payment (${total_amount})")
return None
# Build payment payload
payment_data = {
"TotalAmt": total_amount,
"CustomerRef": {
"value": customer_id
},
"TxnDate": txn_date,
"PaymentMethodRef": {
"value": payment_method_id
},
"PaymentRefNum": payment_ref_num,
"Line": lines
}
# Make API request
url = f"{self.base_url}/v3/company/{self.realm_id}/payment"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json",
"Content-Type": "application/json"
}
try:
response = requests.post(url, json=payment_data, headers=headers)
response.raise_for_status()
result = response.json()
if 'Fault' in result:
self._handle_fault(result['Fault'])
return None
payment = result['Payment']
print(f"✓ Payment created: ID {payment['Id']}")
print(f" Customer: {payment['CustomerRef']['value']}")
print(f" Total Amount: ${payment['TotalAmt']:.2f}")
print(f" Applied Amount: ${total_applied:.2f}")
print(f" Unapplied Amount: ${payment.get('UnappliedAmt', 0):.2f}")
print(f" Reference: {payment.get('PaymentRefNum', 'N/A')}")
# Show invoice applications
for line in payment['Line']:
if 'LinkedTxn' in line:
for linked in line['LinkedTxn']:
print(f" Applied ${line['Amount']:.2f} to {linked['TxnType']} {linked['TxnId']}")
return payment
except requests.exceptions.HTTPError as e:
print(f"✗ HTTP Error: {e.response.status_code}")
print(f" Response: {e.response.text}")
return None
except Exception as e:
print(f"✗ Error creating payment: {str(e)}")
return None
def apply_payment_to_invoices(self, customer_id, check_number, check_amount,
check_date, invoices):
"""
Apply a single check payment to multiple invoices
Args:
customer_id: Customer ID
check_number: Check number
check_amount: Total check amount
check_date: Check date
invoices: List of {'id': str, 'amount_to_apply': float, 'balance': float}
Returns:
Payment dict or None
"""
# Get check payment method ID
payment_methods = self.query_payment_methods()
check_method = next((pm for pm in payment_methods if pm['Name'].lower() == 'check'), None)
if not check_method:
print("Check payment method not found")
return None
# Build invoice applications
applications = []
total_to_apply = 0
for invoice in invoices:
amount = min(invoice['amount_to_apply'], invoice['balance'])
applications.append({
'invoice_id': invoice['id'],
'amount': amount
})
total_to_apply += amount
print(f"Will apply ${amount:.2f} to Invoice {invoice['id']}")
# Check if payment covers all applications
if total_to_apply > check_amount:
print(f"Warning: Total applications (${total_to_apply}) exceeds check amount (${check_amount})")
return None
# Create payment
payment = self.create_payment(
customer_id=customer_id,
total_amount=check_amount,
payment_method_id=check_method['Id'],
payment_ref_num=check_number,
txn_date=check_date,
invoice_applications=applications
)
return payment
def apply_partial_payment(self, customer_id, payment_amount, payment_method_id,
invoice_id, partial_amount):
"""Apply partial payment to invoice"""
if partial_amount > payment_amount:
print("Partial amount cannot exceed total payment")
return None
applications = [{
'invoice_id': invoice_id,
'amount': partial_amount
}]
payment = self.create_payment(
customer_id=customer_id,
total_amount=payment_amount,
payment_method_id=payment_method_id,
payment_ref_num='',
txn_date=datetime.now().strftime('%Y-%m-%d'),
invoice_applications=applications
)
if payment and payment.get('UnappliedAmt', 0) > 0:
print(f"\nNote: ${payment['UnappliedAmt']:.2f} remains unapplied")
print("This amount can be applied to future invoices or refunded")
return payment
def query_payment_methods(self):
"""Get available payment methods"""
from urllib.parse import quote
query = "SELECT * FROM PaymentMethod"
encoded_query = quote(query)
url = f"{self.base_url}/v3/company/{self.realm_id}/query?query={encoded_query}"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
response = requests.get(url, headers=headers)
result = response.json()
return result.get('QueryResponse', {}).get('PaymentMethod', [])
def _handle_fault(self, fault):
"""Handle fault responses"""
print(f"✗ Fault Type: {fault['type']}")
for error in fault['Error']:
print(f" Error {error['code']}: {error['Message']}")
if 'element' in error:
print(f" Element: {error['element']}")
批量创建/更新多个实体:
javascript
const axios = require('axios');
class QuickBooksBatch {
constructor(realmId, accessToken, baseUrl = 'https://sandbox-quickbooks.api.intuit.com') {
this.realmId = realmId;
this.accessToken = accessToken;
this.baseUrl = baseUrl;
}
async executeBatch(batchItems) {
const url = `${this.baseUrl}/v3/company/${this.realmId}/batch`;
try {
const response = await axios.post(url, {
BatchItemRequest: batchItems
}, {
headers: {
'Authorization': `Bearer ${this.accessToken}`,
'Content-Type': 'application/json',
'Accept': 'application/json'
}
});
const results = response.data.BatchItemResponse;
// Process results
const summary = {
total: results.length,
success: 0,
errors: 0,
results: []
};
results.forEach(result => {
if (result.Fault) {
summary.errors++;
console.error(`✗ Error for ${result.bId}:`);
result.Fault.Error.forEach(err => {
console.error(` ${err.code}: ${err.Message}`);
});
summary.results.push({
bId: result.bId,
status: 'error',
error: result.Fault
});
} else {
summary.success++;
// Extract entity from result
const entityType = Object.keys(result).find(k => k !== 'bId');
const entity = result[entityType];
console.log(`✓ Success for ${result.bId}: ${entityType} ${entity.Id}`);
summary.results.push({
bId: result.bId,
status: 'success',
entityType: entityType,
entity: entity
});
}
});
console.log(`\nBatch complete: ${summary.success}/${summary.total} successful`);
return summary;
} catch (error) {
console.error('Batch operation failed:', error.response?.data || error.message);
throw error;
}
}
// Batch create customers
async batchCreateCustomers(customers) {
const batchItems = customers.map((customer, index) => ({
bId: `customer_create_${index}`,
operation: 'create',
Customer: {
DisplayName: customer.displayName,
PrimaryEmailAddr: { Address: customer.email },
PrimaryPhone: { FreeFormNumber: customer.phone },
BillAddr: {
Line1: customer.address,
City: customer.city,
CountrySubDivisionCode: customer.state,
PostalCode: customer.zip
}
}
}));
return this.executeBatch(batchItems);
}
// Batch update invoices
async batchUpdateInvoices(updates) {
const batchItems = updates.map((update, index) => ({
bId: `invoice_update_${index}`,
operation: 'update',
Invoice: {
Id: update.id,
SyncToken: update.syncToken,
sparse: true,
...update.changes
}
}));
return this.executeBatch(batchItems);
}
// Batch query multiple entities
async batchQuery(queries) {
const batchItems = queries.map((query, index) => ({
bId: `query_${index}`,
operation: 'query',
Query: query.sql
}));
const results = await this.executeBatch(batchItems);
// Extract query results
const queryResults = {};
results.results.forEach(result => {
if (result.status === 'success' && result.entity.QueryResponse) {
queryResults[result.bId] = result.entity.QueryResponse;
}
});
return queryResults;
}
// Mixed batch operations
async mixedBatch(operations) {
const batchItems = operations.map((op, index) => {
const item = {
bId: `op_${index}_${op.type}`,
operation: op.operation
};
// Add entity or query data
if (op.operation === 'query') {
item.Query = op.data;
} else {
item[op.entityType] = op.data;
}
return item;
});
return this.executeBatch(batchItems);
}
}
// Usage Examples
const batchService = new QuickBooksBatch('123456789', 'your_access_token');
// Example 1: Batch create customers
const newCustomers = [
{
displayName: 'Acme Corp',
email: 'contact@acme.com',
phone: '(555) 111-1111',
address: '123 Main St',
city: 'San Francisco',
state: 'CA',
zip: '94105'
},
{
displayName: 'TechStart Inc',
email: 'hello@techstart.com',
phone: '(555) 222-2222',
address: '456 Market St',
city: 'San Francisco',
state: 'CA',
zip: '94103'
}
];
const createResults = await batchService.batchCreateCustomers(newCustomers);
// Example 2: Batch update invoices (mark as sent)
const invoiceUpdates = [
{ id: '145', syncToken: '0', changes: { EmailStatus: 'NeedToSend' } },
{ id: '146', syncToken: '0', changes: { EmailStatus: 'NeedToSend' } },
{ id: '147', syncToken: '0', changes: { EmailStatus: 'NeedToSend' } }
];
const updateResults = await batchService.batchUpdateInvoices(invoiceUpdates);
// Example 3: Batch queries
const queries = [
{ sql: 'SELECT * FROM Customer WHERE Active = true MAXRESULTS 10' },
{ sql: 'SELECT * FROM Invoice WHERE Balance > 0 MAXRESULTS 10' },
{ sql: 'SELECT * FROM Payment MAXRESULTS 10' }
];
const queryResults = await batchService.batchQuery(queries);
// Example 4: Mixed batch operations
const mixedOps = [
{
type: 'create_customer',
operation: 'create',
entityType: 'Customer',
data: {
DisplayName: 'New Customer',
PrimaryEmailAddr: { Address: 'new@example.com' }
}
},
{
type: 'update_invoice',
operation: 'update',
entityType: 'Invoice',
data: {
Id: '145',
SyncToken: '1',
sparse: true,
CustomerMemo: { value: 'Thank you!' }
}
},
{
type: 'query_items',
operation: 'query',
data: 'SELECT * FROM Item WHERE Type = \'Service\' MAXRESULTS 5'
}
];
const mixedResults = await batchService.mixedBatch(mixedOps);
console.log(`Mixed batch: ${mixedResults.success}/${mixedResults.total} successful`);
Usage Examples
示例6:付款分配(Python)
payment_service = QuickBooksPayment(realm_id='123456789', access_token='your_token')
将付款分配到多张发票:
python
import requests
class QuickBooksPayment:
def __init__(self, realm_id, access_token, base_url='https://sandbox-quickbooks.api.intuit.com'):
self.realm_id = realm_id
self.access_token = access_token
self.base_url = base_url
def create_payment(self, customer_id, total_amount, payment_method_id,
payment_ref_num, txn_date, invoice_applications):
"""
Create payment and apply to one or more invoices
Args:
customer_id: QuickBooks customer ID
total_amount: Total payment amount
payment_method_id: Payment method ID
payment_ref_num: Check number or transaction reference
txn_date: Payment date (YYYY-MM-DD)
invoice_applications: List of {'invoice_id': str, 'amount': float}
Returns:
Created payment dict or None if error
"""
# Build line items for invoice applications
lines = []
total_applied = 0
for application in invoice_applications:
lines.append({
"Amount": application['amount'],
"LinkedTxn": [
{
"TxnId": application['invoice_id'],
"TxnType": "Invoice"
}
]
})
total_applied += application['amount']
# Check for unapplied amount
unapplied = total_amount - total_applied
if unapplied < 0:
print(f"Warning: Applied amount (${total_applied}) exceeds payment (${total_amount})")
return None
# Build payment payload
payment_data = {
"TotalAmt": total_amount,
"CustomerRef": {
"value": customer_id
},
"TxnDate": txn_date,
"PaymentMethodRef": {
"value": payment_method_id
},
"PaymentRefNum": payment_ref_num,
"Line": lines
}
# Make API request
url = f"{self.base_url}/v3/company/{self.realm_id}/payment"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json",
"Content-Type": "application/json"
}
try:
response = requests.post(url, json=payment_data, headers=headers)
response.raise_for_status()
result = response.json()
if 'Fault' in result:
self._handle_fault(result['Fault'])
return None
payment = result['Payment']
print(f"✓ Payment created: ID {payment['Id']}")
print(f" Customer: {payment['CustomerRef']['value']}")
print(f" Total Amount: ${payment['TotalAmt']:.2f}")
print(f" Applied Amount: ${total_applied:.2f}")
print(f" Unapplied Amount: ${payment.get('UnappliedAmt', 0):.2f}")
print(f" Reference: {payment.get('PaymentRefNum', 'N/A')}")
# Show invoice applications
for line in payment['Line']:
if 'LinkedTxn' in line:
for linked in line['LinkedTxn']:
print(f" Applied ${line['Amount']:.2f} to {linked['TxnType']} {linked['TxnId']}")
return payment
except requests.exceptions.HTTPError as e:
print(f"✗ HTTP Error: {e.response.status_code}")
print(f" Response: {e.response.text}")
return None
except Exception as e:
print(f"✗ Error creating payment: {str(e)}")
return None
def apply_payment_to_invoices(self, customer_id, check_number, check_amount,
check_date, invoices):
"""
Apply a single check payment to multiple invoices
Args:
customer_id: Customer ID
check_number: Check number
check_amount: Total check amount
check_date: Check date
invoices: List of {'id': str, 'amount_to_apply': float, 'balance': float}
Returns:
Payment dict or None
"""
# Get check payment method ID
payment_methods = self.query_payment_methods()
check_method = next((pm for pm in payment_methods if pm['Name'].lower() == 'check'), None)
if not check_method:
print("Check payment method not found")
return None
# Build invoice applications
applications = []
total_to_apply = 0
for invoice in invoices:
amount = min(invoice['amount_to_apply'], invoice['balance'])
applications.append({
'invoice_id': invoice['id'],
'amount': amount
})
total_to_apply += amount
print(f"Will apply ${amount:.2f} to Invoice {invoice['id']}")
# Check if payment covers all applications
if total_to_apply > check_amount:
print(f"Warning: Total applications (${total_to_apply}) exceeds check amount (${check_amount})")
return None
# Create payment
payment = self.create_payment(
customer_id=customer_id,
total_amount=check_amount,
payment_method_id=check_method['Id'],
payment_ref_num=check_number,
txn_date=check_date,
invoice_applications=applications
)
return payment
def apply_partial_payment(self, customer_id, payment_amount, payment_method_id,
invoice_id, partial_amount):
"""Apply partial payment to invoice"""
if partial_amount > payment_amount:
print("Partial amount cannot exceed total payment")
return None
applications = [{
'invoice_id': invoice_id,
'amount': partial_amount
}]
payment = self.create_payment(
customer_id=customer_id,
total_amount=payment_amount,
payment_method_id=payment_method_id,
payment_ref_num='',
txn_date=datetime.now().strftime('%Y-%m-%d'),
invoice_applications=applications
)
if payment and payment.get('UnappliedAmt', 0) > 0:
print(f"\nNote: ${payment['UnappliedAmt']:.2f} remains unapplied")
print("This amount can be applied to future invoices or refunded")
return payment
def query_payment_methods(self):
"""Get available payment methods"""
from urllib.parse import quote
query = "SELECT * FROM PaymentMethod"
encoded_query = quote(query)
url = f"{self.base_url}/v3/company/{self.realm_id}/query?query={encoded_query}"
headers = {
"Authorization": f"Bearer {self.access_token}",
"Accept": "application/json"
}
response = requests.get(url, headers=headers)
result = response.json()
return result.get('QueryResponse', {}).get('PaymentMethod', [])
def _handle_fault(self, fault):
"""Handle fault responses"""
print(f"✗ Fault Type: {fault['type']}")
for error in fault['Error']:
print(f" Error {error['code']}: {error['Message']}")
if 'element' in error:
print(f" Element: {error['element']}")
Example 1: Apply single check to multiple invoices
Usage Examples
payment = payment_service.apply_payment_to_invoices(
customer_id='42',
check_number='1234',
check_amount=1500.00,
check_date='2024-12-09',
invoices=[
{'id': '145', 'amount_to_apply': 1000.00, 'balance': 1000.00},
{'id': '146', 'amount_to_apply': 500.00, 'balance': 750.00}
]
)
payment_service = QuickBooksPayment(realm_id='123456789', access_token='your_token')
Example 2: Partial payment on single invoice
Example 1: Apply single check to multiple invoices
partial_payment = payment_service.apply_partial_payment(
customer_id='42',
payment_amount=500.00,
payment_method_id='1', # Cash
invoice_id='147',
partial_amount=500.00 # Invoice balance is $1000, paying $500
)
payment = payment_service.apply_payment_to_invoices(
customer_id='42',
check_number='1234',
check_amount=1500.00,
check_date='2024-12-09',
invoices=[
{'id': '145', 'amount_to_apply': 1000.00, 'balance': 1000.00},
{'id': '146', 'amount_to_apply': 500.00, 'balance': 750.00}
]
)
Example 3: Payment with unapplied amount (credit for future invoices)
Example 2: Partial payment on single invoice
credit_payment = payment_service.create_payment(
customer_id='42',
total_amount=2000.00, # Customer pays $2000
payment_method_id='1',
payment_ref_num='',
txn_date='2024-12-09',
invoice_applications=[
{'invoice_id': '145', 'amount': 1000.00} # Only $1000 applied
]
# $1000 remains unapplied as credit
)
partial_payment = payment_service.apply_partial_payment(
customer_id='42',
payment_amount=500.00,
payment_method_id='1', # Cash
invoice_id='147',
partial_amount=500.00 # Invoice balance is $1000, paying $500
)
API Reference Quick Links
Example 3: Payment with unapplied amount (credit for future invoices)
Context7 Library: Use Context7 MCP to fetch latest documentation:
- Library ID:
/websites/developer_intuit_app_developer_qbo
- Use for up-to-date code examples and API changes
Official Resources:
-
- Interactive API reference with sandbox testing
- Entity-specific documentation and sample requests
-
- Manage apps, keys, webhooks
- Create sandbox companies
SDKs:
Common Endpoints:
- Base URL (Sandbox):
https://sandbox-quickbooks.api.intuit.com/v3/company/{realmId}
- Base URL (Production):
https://quickbooks.api.intuit.com/v3/company/{realmId}
- Token endpoint:
https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer
- OAuth authorization:
https://appcenter.intuit.com/connect/oauth2
credit_payment = payment_service.create_payment(
customer_id='42',
total_amount=2000.00, # Customer pays $2000
payment_method_id='1',
payment_ref_num='',
txn_date='2024-12-09',
invoice_applications=[
{'invoice_id': '145', 'amount': 1000.00} # Only $1000 applied
]
# $1000 remains unapplied as credit
)
Troubleshooting Common Issues
API参考快速链接
SyncToken Mismatch (Error 3200)
Symptom: "stale object error" when updating entities
Cause: SyncToken in request doesn't match current version (concurrent modification)
Solution:
python
def safe_update_with_retry(entity_id, updates, max_attempts=3):
for attempt in range(max_attempts):
try:
# Read latest version
entity = read_entity(entity_id)
# Apply changes
entity.update(updates)
entity['sparse'] = True
# Attempt update
return update_entity(entity)
except SyncTokenError as e:
if attempt == max_attempts - 1:
raise
print(f"SyncToken mismatch, retrying... (attempt {attempt + 1})")
continue
Context7库:使用Context7 MCP获取最新文档:
- 库ID:
/websites/developer_intuit_app_developer_qbo
- 用于获取最新的代码示例和API变更
官方资源:
-
- 交互式API参考,支持沙箱测试
- 实体专属文档和示例请求
-
SDK:
常用端点:
- 沙箱基础URL:
https://sandbox-quickbooks.api.intuit.com/v3/company/{realmId}
- 生产基础URL:
https://quickbooks.api.intuit.com/v3/company/{realmId}
- 令牌端点:
https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer
- OAuth授权:
https://appcenter.intuit.com/connect/oauth2
Required Field Missing (Error 6000)
常见问题排查
Symptom: "business validation error" or "required field missing"
Cause: Missing required fields like TotalAmt, CustomerRef, or entity-specific requirements
Solution:
- Check API documentation for entity-specific required fields
- For Payment: TotalAmt and CustomerRef are required
- For Invoice: CustomerRef and Line array are required
- Validate data locally before API call
Common Required Fields:
- Customer: DisplayName (must be unique)
- Invoice: CustomerRef, Line (at least one)
- Payment: TotalAmt, CustomerRef
- Item: Name, Type, IncomeAccountRef (for Service)
症状:更新实体时出现"stale object error"
原因:请求中的SyncToken与当前版本不匹配(并发修改)
解决方案:
python
def safe_update_with_retry(entity_id, updates, max_attempts=3):
for attempt in range(max_attempts):
try:
# Read latest version
entity = read_entity(entity_id)
# Apply changes
entity.update(updates)
entity['sparse'] = True
# Attempt update
return update_entity(entity)
except SyncTokenError as e:
if attempt == max_attempts - 1:
raise
print(f"SyncToken mismatch, retrying... (attempt {attempt + 1})")
continue
OAuth Token Expiration (401 Unauthorized)
缺少必填字段(错误6000)
Symptom: "invalid_token" or "token_expired" errors
Cause: Access token expired (after 3600 seconds)
Solution:
javascript
async function apiCallWithAutoRefresh(apiFunction) {
try {
return await apiFunction();
} catch (error) {
if (error.response?.status === 401) {
// Token expired, refresh
await refreshAccessToken();
// Retry with new token
return await apiFunction();
}
throw error;
}
}
Prevention:
- Implement proactive token refresh (every 50 minutes)
- Store token expiry time and check before requests
- Handle 401 responses automatically
症状:出现"business validation error"或"required field missing"
原因:缺少必填字段,例如TotalAmt、CustomerRef或实体特定的要求字段
解决方案:
- 查阅API文档获取实体专属的必填字段
- 支付:TotalAmt和CustomerRef为必填
- 发票:CustomerRef和Line数组为必填
- API调用前先在本地验证数据
常见必填字段:
- 客户:DisplayName(必须唯一)
- 发票:CustomerRef、Line(至少一个)
- 支付:TotalAmt、CustomerRef
- 商品:Name、Type、IncomeAccountRef(服务类型)
Invalid Reference (Error 3100)
OAuth令牌过期(401 Unauthorized)
Symptom: "object not found" when referencing CustomerRef, ItemRef, etc.
Cause: Referenced entity doesn't exist or was deleted
Solution:
python
def validate_reference(entity_type, entity_id):
"""Verify entity exists before creating reference"""
try:
entity = read_entity(entity_type, entity_id)
return True
except NotFoundError:
print(f"{entity_type} {entity_id} not found")
return False
症状:出现"invalid_token"或"token_expired"错误
原因:访问令牌过期(3600秒后)
解决方案:
javascript
async function apiCallWithAutoRefresh(apiFunction) {
try {
return await apiFunction();
} catch (error) {
if (error.response?.status === 401) {
// Token expired, refresh
await refreshAccessToken();
// Retry with new token
return await apiFunction();
}
throw error;
}
}
预防措施:
- 实现主动令牌刷新(每50分钟刷新一次)
- 存储令牌过期时间,请求前先检查
- 自动处理401响应
Before creating invoice
无效引用(错误3100)
if validate_reference('Customer', customer_id):
if validate_reference('Item', item_id):
create_invoice(customer_id, item_id)
症状:引用CustomerRef、ItemRef等时出现"object not found"
原因:引用的实体不存在或已被删除
解决方案:
python
def validate_reference(entity_type, entity_id):
"""Verify entity exists before creating reference"""
try:
entity = read_entity(entity_type, entity_id)
return True
except NotFoundError:
print(f"{entity_type} {entity_id} not found")
return False
Rate Limiting (429 Too Many Requests)
Before creating invoice
Symptom: "throttle_limit_exceeded" or 429 status code
Cause: Exceeded API rate limits
Solution - Exponential backoff with jitter:
python
import time
import random
def api_call_with_backoff(api_function, max_retries=5):
for attempt in range(max_retries):
try:
return api_function()
except RateLimitError:
if attempt == max_retries - 1:
raise
# Exponential backoff with jitter
delay = (2 ** attempt) + random.uniform(0, 1)
print(f"Rate limited, waiting {delay:.1f}s...")
time.sleep(delay)
Prevention:
- Use batch operations to reduce call count
- Implement request queuing with rate limiting
- Cache frequently accessed reference data
if validate_reference('Customer', customer_id):
if validate_reference('Item', item_id):
create_invoice(customer_id, item_id)
Batch Operation Failures
速率限制(429 Too Many Requests)
Symptom: Some operations in batch fail while others succeed
Cause: Each batch operation is independent; one failure doesn't affect others
Solution:
javascript
function processBatchResults(results) {
const failed = results.filter(r => r.Fault);
const succeeded = results.filter(r => !r.Fault);
console.log(`Batch: ${succeeded.length} success, ${failed.length} failed`);
// Retry failed operations individually
for (const failure of failed) {
console.log(`Retrying ${failure.bId}...`);
// Implement individual retry logic
}
return { succeeded, failed };
}
症状:出现"throttle_limit_exceeded"或429状态码
原因:超出API速率限制
解决方案 - 带抖动的指数退避:
python
import time
import random
def api_call_with_backoff(api_function, max_retries=5):
for attempt in range(max_retries):
try:
return api_function()
except RateLimitError:
if attempt == max_retries - 1:
raise
# Exponential backoff with jitter
delay = (2 ** attempt) + random.uniform(0, 1)
print(f"Rate limited, waiting {delay:.1f}s...")
time.sleep(delay)
预防措施:
- 使用批处理操作减少调用次数
- 实现带速率限制的请求队列
- 缓存频繁访问的参考数据
Multi-currency Validation Errors
批处理操作失败
Symptom: "currency not enabled" or "exchange rate required"
Cause: Multi-currency features not enabled or missing CurrencyRef
Solution:
json
{
"Invoice": {
"CurrencyRef": {
"value": "USD",
"name": "United States Dollar"
},
"ExchangeRate": 1.0
}
}
Check:
- Verify multi-currency enabled in QuickBooks company preferences
- Always include CurrencyRef when multi-currency is enabled
- For foreign currency, API calculates exchange rate automatically
症状:批处理中的部分操作失败,其他操作成功
原因:每个批处理操作都是独立的,单个失败不会影响其他操作
解决方案:
javascript
function processBatchResults(results) {
const failed = results.filter(r => r.Fault);
const succeeded = results.filter(r => !r.Fault);
console.log(`Batch: ${succeeded.length} success, ${failed.length} failed`);
// Retry failed operations individually
for (const failure of failed) {
console.log(`Retrying ${failure.bId}...`);
// Implement individual retry logic
}
return { succeeded, failed };
}
Webhook Not Receiving Notifications
多币种验证错误
Symptom: Webhook endpoint configured but not receiving POST requests
Cause: Endpoint issues, SSL problems, or slow response time
Solution:
- Verify endpoint is publicly accessible (not localhost)
- Use HTTPS (required for webhooks)
- Respond within 1 second (return 200 OK immediately, process async)
- Test with sample payload:
bash
curl -X POST https://yourapp.com/webhooks/quickbooks \
-H "Content-Type: application/json" \
-d '{"eventNotifications":[]}'
- Check webhook logs in developer dashboard
症状:出现"currency not enabled"或"exchange rate required"
原因:未启用多币种功能或缺少CurrencyRef
解决方案:
json
{
"Invoice": {
"CurrencyRef": {
"value": "USD",
"name": "United States Dollar"
},
"ExchangeRate": 1.0
}
}
检查项:
- 确认QuickBooks公司偏好设置中已启用多币种
- 启用多币种时始终包含CurrencyRef
- 对于外币,API会自动计算汇率
Deleted Entities in CDC Response
Webhook收不到通知
Symptom: Entities with status="Deleted" only contain ID
Cause: CDC returns minimal data for deleted entities
Solution:
python
def process_cdc_changes(changes):
for entity in changes:
if entity.get('status') == 'Deleted':
# Only ID available
handle_deletion(entity['Id'])
else:
# Full entity data available
process_entity_update(entity)
This skill provides comprehensive guidance for QuickBooks Online API integration. For the most current API documentation and changes, use Context7 with library ID
/websites/developer_intuit_app_developer_qbo
.
症状:已配置webhook端点,但收不到POST请求
原因:端点问题、SSL问题或响应时间过长
解决方案:
- 验证端点可公开访问(不是localhost)
- 使用HTTPS(webhook要求)
- 1秒内响应(立即返回200 OK,异步处理)
- 使用示例payload测试:
bash
curl -X POST https://yourapp.com/webhooks/quickbooks \
-H "Content-Type: application/json" \
-d '{"eventNotifications":[]}'
- 查看开发者控制台中的webhook日志
症状:标记为status="Deleted"的实体仅包含ID
原因:CDC对已删除实体仅返回最小数据
解决方案:
python
def process_cdc_changes(changes):
for entity in changes:
if entity.get('status') == 'Deleted':
# Only ID available
handle_deletion(entity['Id'])
else:
# Full entity data available
process_entity_update(entity)
本指南提供了QuickBooks Online API集成的全面指导。如需获取最新的API文档和变更,请使用库ID为
/websites/developer_intuit_app_developer_qbo
的Context7。