Loading...
Loading...
Expert guide for QuickBooks Online API integration covering authentication, CRUD operations, batch processing, and best practices for invoicing, payments, and customer management.
npx skill4agent add linehaul-ai/linehaulai-claude-marketplace quickbooks-online-api
Authorization: Bearer {access_token}
const oauthClient = require('intuit-oauth');
// Refresh access token
oauthClient.refresh()
.then(function(authResponse) {
const newAccessToken = authResponse.token.access_token;
const newRefreshToken = authResponse.token.refresh_token;
const expiresIn = authResponse.token.expires_in; // 3600 seconds
// Store new tokens securely (database, encrypted storage)
console.log('Tokens refreshed successfully');
})
.catch(function(e) {
console.error('Token refresh failed:', e.originalMessage);
// Handle re-authentication if refresh token is invalid
});
from intuitlib.client import AuthClient
auth_client = AuthClient(
client_id='YOUR_CLIENT_ID',
client_secret='YOUR_CLIENT_SECRET',
redirect_uri='YOUR_REDIRECT_URI',
environment='sandbox' # or 'production'
)
# Refresh tokens
auth_client.refresh(refresh_token='STORED_REFRESH_TOKEN')
# Get new tokens
new_access_token = auth_client.access_token
new_refresh_token = auth_client.refresh_tokenIdDisplayNameGivenNameFamilyNameCompanyNamePrimaryEmailAddr{ "Address": "email@example.com" }PrimaryPhone{ "FreeFormNumber": "(555) 123-4567" }BillAddrShipAddrBalanceActiveSyncTokenCustomerRef{ "value": "123", "name": "Customer Name" }IdDocNumberTxnDateDueDateCustomerRef{ "value": "customerId" }LineTotalAmtBalanceEmailStatusBillEmailTxnTaxDetailLinkedTxnSyncToken{
"Line": [
{
"Amount": 100.00,
"DetailType": "SalesItemLineDetail",
"SalesItemLineDetail": {
"ItemRef": { "value": "1", "name": "Services" },
"Qty": 1,
"UnitPrice": 100.00,
"TaxCodeRef": { "value": "TAX" }
}
},
{
"Amount": 100.00,
"DetailType": "SubTotalLineDetail",
"SubTotalLineDetail": {}
}
]
}IdTotalAmtCustomerRefPaymentMethodRefPaymentRefNumTxnDateDepositToAccountRefLineUnappliedAmtSyncToken{
"Line": [
{
"Amount": 100.00,
"LinkedTxn": [
{
"TxnId": "123",
"TxnType": "Invoice"
}
]
}
]
}ServiceInventoryNonInventoryCategoryIdNameTypeDescriptionUnitPricePurchaseCostIncomeAccountRefExpenseAccountRefTrackQtyOnHandQtyOnHandActiveIdNameAccountTypeAccountSubTypeCurrentBalanceActiveClassificationBankAccounts ReceivableAccounts PayableIncomeExpenseOther Current AssetFixed AssetPOST /v3/company/{realmId}/{entityName}Authorization: Bearer {access_token}
Accept: application/json
Content-Type: application/jsonimport requests
realm_id = "YOUR_REALM_ID"
access_token = "YOUR_ACCESS_TOKEN"
url = f"https://sandbox-quickbooks.api.intuit.com/v3/company/{realm_id}/invoice"
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json",
"Content-Type": "application/json"
}
invoice_data = {
"Line": [
{
"Amount": 100.00,
"DetailType": "SalesItemLineDetail",
"SalesItemLineDetail": {
"ItemRef": {"value": "1"}
}
}
],
"CustomerRef": {"value": "1"}
}
response = requests.post(url, json=invoice_data, headers=headers)
if response.status_code == 200:
invoice = response.json()['Invoice']
print(f"Invoice created: {invoice['Id']}")
else:
print(f"Error: {response.status_code} - {response.text}")
GET /v3/company/{realmId}/{entityName}/{entityId}
const axios = require('axios');
async function readCustomer(realmId, customerId, accessToken) {
const url = `https://sandbox-quickbooks.api.intuit.com/v3/company/${realmId}/customer/${customerId}`;
try {
const response = await axios.get(url, {
headers: {
'Authorization': `Bearer ${accessToken}`,
'Accept': 'application/json'
}
});
return response.data.Customer;
} catch (error) {
if (error.response && error.response.status === 401) {
// Token expired, refresh and retry
console.error('Authentication failed - refresh token needed');
} else {
console.error('Read failed:', error.response?.data || error.message);
}
throw error;
}
}
"sparse": true
SyncToken
import requests
def sparse_update_customer(realm_id, customer_id, sync_token, new_email, access_token,
                           base_url="https://sandbox-quickbooks.api.intuit.com",
                           timeout=30):
    """Sparse-update a customer's primary email address.

    Only ``PrimaryEmailAddr`` is sent (``"sparse": True``), so the other
    customer fields are not part of the request body.

    Args:
        realm_id: Company (realm) ID used in the request path.
        customer_id: ID of the customer to update.
        sync_token: SyncToken of the customer version being updated.
        new_email: New primary email address.
        access_token: OAuth 2.0 bearer token.
        base_url: API host; defaults to the sandbox host used throughout
            these examples.
        timeout: Seconds to wait for the HTTP call before giving up, so a
            stalled connection cannot block the caller forever.

    Returns:
        The updated ``Customer`` dict (including its new SyncToken) on HTTP
        200, otherwise ``None`` after printing the response body.
    """
    url = f"{base_url}/v3/company/{realm_id}/customer"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json",
        "Content-Type": "application/json"
    }
    # Sparse update - only updating email
    customer_data = {
        "Id": customer_id,
        "SyncToken": sync_token,
        "sparse": True,
        "PrimaryEmailAddr": {
            "Address": new_email
        }
    }
    response = requests.post(url, json=customer_data, headers=headers, timeout=timeout)
    if response.status_code != 200:
        print(f"Update failed: {response.text}")
        return None

    updated_customer = response.json()['Customer']
    print(f"Customer updated, new SyncToken: {updated_customer['SyncToken']}")
    return updated_customer


# 1. Read entity to get latest SyncToken
customer = read_customer(realm_id, customer_id, access_token)
# 2. Update with current SyncToken
updated = sparse_update_customer(
realm_id,
customer_id,
customer['SyncToken'], # Use current sync token
"newemail@example.com",
access_token
)
# 3. Store new SyncToken for next update
new_sync_token = updated['SyncToken']Active// Mark customer as inactive
const deleteCustomer = {
Id: customerId,
SyncToken: currentSyncToken,
sparse: true,
Active: false
};
// POST to update endpoint
axios.post(`${baseUrl}/customer`, deleteCustomer, { headers });POST /v3/company/{realmId}/{entityName}?operation=delete{
"Id": "123",
"SyncToken": "2"
}SELECT * FROM {EntityName} WHERE {field} {operator} '{value}'GET /v3/company/{realmId}/query?query={sqlQuery}=<><=>=INLIKE%_SELECT * FROM Customer WHERE DisplayName LIKE 'Acme%'SELECT * FROM Invoice WHERE TxnDate >= '2024-01-01' AND TxnDate <= '2024-12-31'SELECT * FROM Customer WHERE Active = true ORDERBY DisplayNameSELECT * FROM Invoice STARTPOSITION 1 MAXRESULTS 100import requests
from urllib.parse import quote
def query_invoices_by_customer(realm_id, customer_id, access_token):
query = f"SELECT * FROM Invoice WHERE CustomerRef = '{customer_id}' ORDERBY TxnDate DESC"
encoded_query = quote(query)
url = f"https://sandbox-quickbooks.api.intuit.com/v3/company/{realm_id}/query?query={encoded_query}"
headers = {
"Authorization": f"Bearer {access_token}",
"Accept": "application/json"
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
result = response.json()['QueryResponse']
invoices = result.get('Invoice', [])
print(f"Found {len(invoices)} invoices")
return invoices
else:
print(f"Query failed: {response.text}")
return []
%_
def query_all_customers(realm_id, access_token):
all_customers = []
start_position = 1
max_results = 1000
while True:
query = f"SELECT * FROM Customer STARTPOSITION {start_position} MAXRESULTS {max_results}"
encoded_query = quote(query)
url = f"{base_url}/company/{realm_id}/query?query={encoded_query}"
response = requests.get(url, headers={"Authorization": f"Bearer {access_token}"})
result = response.json()['QueryResponse']
customers = result.get('Customer', [])
if not customers:
break
all_customers.extend(customers)
# Check if more results exist
if len(customers) < max_results:
break
start_position += max_results
return all_customers
POST /v3/company/{realmId}/batch
{
"BatchItemRequest": [
{
"bId": "bid1",
"operation": "create",
"Customer": {
"DisplayName": "New Customer 1"
}
},
{
"bId": "bid2",
"operation": "update",
"Invoice": {
"Id": "123",
"SyncToken": "1",
"sparse": true,
"EmailStatus": "NeedToSend"
}
},
{
"bId": "bid3",
"operation": "query",
"Query": "SELECT * FROM Customer WHERE Active = true MAXRESULTS 10"
}
]
}bId{
"BatchItemResponse": [
{
"bId": "bid1",
"Customer": {
"Id": "456",
"DisplayName": "New Customer 1"
}
},
{
"bId": "bid2",
"Invoice": {
"Id": "123",
"SyncToken": "2"
}
},
{
"bId": "bid3",
"QueryResponse": {
"Customer": [...]
}
}
]
}
async function batchUpdateCustomers(realmId, customers, accessToken) {
const batchItems = customers.map((customer, index) => ({
bId: `customer_${index}`,
operation: 'update',
Customer: {
Id: customer.Id,
SyncToken: customer.SyncToken,
sparse: true,
Active: true // Reactivate all customers
}
}));
const url = `https://sandbox-quickbooks.api.intuit.com/v3/company/${realmId}/batch`;
try {
const response = await axios.post(url, {
BatchItemRequest: batchItems
}, {
headers: {
'Authorization': `Bearer ${accessToken}`,
'Content-Type': 'application/json'
}
});
const results = response.data.BatchItemResponse;
// Process results by batch ID
results.forEach(result => {
if (result.Fault) {
console.error(`Error for ${result.bId}:`, result.Fault);
} else {
console.log(`Success for ${result.bId}: Customer ${result.Customer.Id}`);
}
});
return results;
} catch (error) {
console.error('Batch operation failed:', error.response?.data || error.message);
throw error;
}
}
create, update, delete, query
<Fault>
{
"Fault": {
"Error": [
{
"Message": "Duplicate Name Exists Error",
"Detail": "The name supplied already exists.",
"code": "6240",
"element": "Customer.DisplayName"
}
],
"type": "ValidationFault"
},
"time": "2024-12-09T10:30:00.000-08:00"
}| Code | Error | Solution |
|---|---|---|
| 6000 | Business validation error | Check TotalAmt and required fields |
| 5010 | Stale object (SyncToken mismatch) | Re-read entity to get latest SyncToken |
| 2500 | Invalid reference | Verify referenced entity exists (CustomerRef, ItemRef) |
| 6240 | Duplicate name | Use unique DisplayName for Customer/Item |
| 610 | Object not found | Check entity ID exists |
| 3200 | Authentication failed (invalid or expired token) | Refresh access token |
ValidationExceptionServiceExceptionAuthenticationExceptionBadRequestExceptionInvalidTokenExceptionInternalServiceExceptionfrom intuitlib.exceptions import AuthClientError
try:
response = requests.post(url, json=data, headers=headers)
response.raise_for_status()
# Check for fault in response body
result = response.json()
if 'Fault' in result:
fault = result['Fault']
print(f"Fault Type: {fault['type']}")
for error in fault['Error']:
print(f" Code {error['code']}: {error['Message']}")
print(f" Element: {error.get('element', 'N/A')}")
return None
return result
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
# Token expired, refresh
print("Token expired, refreshing...")
# Implement token refresh logic
elif e.response.status_code == 429:
# Rate limited, implement backoff
print("Rate limited, backing off...")
else:
print(f"HTTP Error: {e.response.status_code}")
print(f"Response: {e.response.text}")
except AuthClientError as e:
print(f"Auth error: {str(e)}")
error.code
element
message
async function apiCallWithRetry(apiFunction, maxRetries = 3) {
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
return await apiFunction();
} catch (error) {
const status = error.response?.status;
// Retry on server errors
if (status >= 500 && status < 600 && attempt < maxRetries - 1) {
const delay = Math.pow(2, attempt) * 1000; // 1s, 2s, 4s
console.log(`Attempt ${attempt + 1} failed, retrying in ${delay}ms...`);
await new Promise(resolve => setTimeout(resolve, delay));
continue;
}
// Don't retry on client errors
throw error;
}
}
}
GET /v3/company/{realmId}/cdc?entities={entityList}&changedSince={dateTime}
entities
changedSince
from datetime import datetime, timedelta
from urllib.parse import urlencode
def get_changed_entities(realm_id, entity_types, since_datetime, access_token):
    """Fetch entities changed since a point in time via the CDC endpoint.

    Args:
        realm_id: Company (realm) ID used in the request path.
        entity_types: Iterable of entity names, e.g. ``['Invoice', 'Customer']``.
        since_datetime: ``datetime`` lower bound. A timezone-aware value is
            sent with its own UTC offset; a naive value is interpreted as
            local time.
        access_token: OAuth 2.0 bearer token.

    Returns:
        The ``CDCResponse`` list from the response body on HTTP 200,
        otherwise ``None`` after printing the response body.
    """
    # Format: 2024-12-01T09:00:00-07:00. The offset must come from the
    # datetime itself; a hard-coded offset shifts the query window for every
    # caller that is not in that zone.
    if since_datetime.tzinfo is None:
        since_datetime = since_datetime.astimezone()
    changed_since = since_datetime.isoformat(timespec='seconds')

    params = {
        'entities': ','.join(entity_types),
        'changedSince': changed_since
    }
    url = f"https://sandbox-quickbooks.api.intuit.com/v3/company/{realm_id}/cdc"
    response = requests.get(
        url,
        params=params,
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=30
    )
    if response.status_code != 200:
        print(f"CDC request failed: {response.text}")
        return None

    cdc_response = response.json()['CDCResponse']
    # Process changed entities. Every QueryResponse entry is visited (not
    # just the first), and an empty list is simply skipped.
    for query_response in cdc_response:
        for entity_group in query_response.get('QueryResponse', []):
            for entity_name, entities in entity_group.items():
                # Non-list values (presumably paging metadata such as
                # startPosition/maxResults) are not entity collections.
                if not isinstance(entities, list):
                    continue
                for entity in entities:
                    status = entity.get('status', 'Updated')
                    if status == 'Deleted':
                        print(f"Deleted {entity_name}: {entity['Id']}")
                    else:
                        print(f"Changed {entity_name}: {entity['Id']}")
    return cdc_response
# Usage: Get all invoices and customers changed in last 24 hours
since = datetime.now() - timedelta(hours=24)
changes = get_changed_entities(realm_id, ['Invoice', 'Customer'], since, access_token){
"CDCResponse": [
{
"QueryResponse": [
{
"Invoice": [
{
"Id": "123",
"MetaData": {
"LastUpdatedTime": "2024-12-09T10:30:00-08:00"
},
"TotalAmt": 100.00,
"Balance": 50.00
// ... full invoice object
}
]
}
]
},
{
"QueryResponse": [
{
"Customer": [
{
"Id": "456",
"status": "Deleted"
}
]
}
]
}
],
"time": "2024-12-09T11:00:00.000-08:00"
}LastUpdatedTimechangedSincestatus: "Deleted"{
"eventNotifications": [
{
"realmId": "123456789",
"dataChangeEvent": {
"entities": [
{
"name": "Invoice",
"id": "145",
"operation": "Create",
"lastUpdated": "2024-12-09T10:30:00.000Z"
},
{
"name": "Payment",
"id": "456",
"operation": "Update",
"lastUpdated": "2024-12-09T10:31:00.000Z"
},
{
"name": "Customer",
"id": "789",
"operation": "Merge",
"lastUpdated": "2024-12-09T10:32:00.000Z",
"deletedId": "788"
}
]
}
}
]
}const express = require('express');
const crypto = require('crypto');
const app = express();
// Keep the exact bytes of the request body next to the parsed JSON. The
// signature is computed over the raw payload, and re-serialising req.body
// with JSON.stringify is not guaranteed to reproduce those bytes.
app.use(express.json({
  verify: (req, _res, buf) => {
    req.rawBody = buf;
  },
}));
// Webhook endpoint
app.post('/webhooks/quickbooks', (req, res) => {
  // Verify webhook signature: base64(HMAC-SHA256(raw body, verifier token))
  // must equal the intuit-signature header. The verifier token is read from
  // the QBO_WEBHOOK_VERIFIER_TOKEN environment variable; with no token
  // configured every request is rejected rather than trusted.
  const verifierToken = process.env.QBO_WEBHOOK_VERIFIER_TOKEN;
  const signature = req.headers['intuit-signature'];
  if (!verifierToken || typeof signature !== 'string' || !req.rawBody) {
    res.status(401).send('Invalid signature');
    return;
  }
  const expected = crypto.createHmac('sha256', verifierToken).update(req.rawBody).digest();
  const received = Buffer.from(signature, 'base64');
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  if (received.length !== expected.length || !crypto.timingSafeEqual(received, expected)) {
    res.status(401).send('Invalid signature');
    return;
  }
  // Return 200 immediately (process async)
  res.status(200).send('OK');
  // Process notifications asynchronously
  processWebhook(req.body).catch(console.error);
});
/**
 * Walk a webhook notification and dispatch every changed entity, one at a
 * time and in payload order.
 *
 * Entities whose operation is 'Delete' go to handleEntityDeletion; every
 * other operation goes to fetchAndProcessEntity. A notification without
 * `eventNotifications`, or an event without `dataChangeEvent.entities`, is
 * treated as containing nothing to process instead of throwing.
 *
 * @param {object} [notification] - Parsed webhook request body.
 * @returns {Promise<void>} Resolves once every entity has been handled;
 *   rejects with the first error raised by a handler.
 */
async function processWebhook(notification) {
  const events = notification?.eventNotifications ?? [];
  for (const event of events) {
    const { realmId } = event;
    const entities = event.dataChangeEvent?.entities ?? [];
    for (const entity of entities) {
      console.log(`${entity.operation} on ${entity.name} ID ${entity.id}`);
      if (entity.operation === 'Delete') {
        await handleEntityDeletion(realmId, entity.name, entity.id);
      } else {
        // Fetch full entity data
        await fetchAndProcessEntity(realmId, entity.name, entity.id);
      }
    }
  }
}
async function fetchAndProcessEntity(realmId, entityType, entityId) {
// Fetch full entity using read endpoint
const url = `https://quickbooks.api.intuit.com/v3/company/${realmId}/${entityType.toLowerCase()}/${entityId}`;
// ... implement fetch and processing logic
}| Use Case | Recommendation |
|---|---|
| Real-time sync | Webhooks |
| Periodic sync (hourly/daily) | CDC |
| Initial data load | CDC |
| Reconnection after downtime | CDC |
| High-volume changes | CDC (reduces notification overhead) |
| Low-latency requirements | Webhooks |
| Backup/redundancy | Both (webhooks primary, CDC backup) |
class QuickBooksSync:
    """Combine real-time webhooks with a periodic CDC catch-up sync.

    The class relies on helpers it does not define itself:
    ``load_last_sync_time``, ``save_last_sync_time``,
    ``process_entity_change``, ``extract_entities`` and
    ``entity_exists_locally`` must be supplied (for example by a subclass).
    """

    def __init__(self, realm_id=None, access_token=None):
        # periodic_cdc_sync reads both attributes, so they are set here.
        self.realm_id = realm_id
        self.access_token = access_token
        self.last_cdc_sync = self.load_last_sync_time()

    def handle_webhook(self, notification):
        """Process real-time webhook.

        Accepts either a single event (a dict with ``dataChangeEvent``) or a
        full payload carrying an ``eventNotifications`` list.

        The CDC watermark is deliberately left alone here. Moving it forward
        on every webhook would hide events that were dropped before this one,
        which are exactly what ``periodic_cdc_sync`` exists to recover.
        """
        events = notification.get('eventNotifications', [notification])
        for event in events:
            for entity in event['dataChangeEvent']['entities']:
                self.process_entity_change(entity)

    def periodic_cdc_sync(self):
        """Catch any missed changes.

        The watermark advances only after a successful CDC call, and it
        advances to the moment the call started, so changes made while the
        request was in flight fall inside the next window.
        """
        sync_started = datetime.now()
        changes = get_changed_entities(
            self.realm_id,
            ['Invoice', 'Customer', 'Payment'],
            self.last_cdc_sync,
            self.access_token
        )
        if changes is None:
            # CDC request failed; keep the old watermark so nothing is skipped.
            return
        for entity in self.extract_entities(changes):
            if not self.entity_exists_locally(entity):
                # Missed by webhook, process now
                self.process_entity_change(entity)
        self.last_cdc_sync = sync_started
        self.save_last_sync_time()


def safe_update(realm_id, customer_id, changes, access_token):
    """Update a customer, re-reading it and retrying on SyncToken conflicts.

    Each attempt reads the latest customer, merges ``changes`` into it, marks
    the payload as sparse and submits it. A ``SyncTokenError`` triggers
    another attempt; the third consecutive one is re-raised.

    Returns:
        Whatever ``update_customer`` returns for the first successful attempt.
    """
    max_attempts = 3
    for attempt in range(max_attempts):
        # Read latest version
        customer = read_customer(realm_id, customer_id, access_token)
        # Apply changes
        customer.update(changes)
        customer['sparse'] = True
        # Attempt update
        try:
            return update_customer(realm_id, customer, access_token)
        except SyncTokenError:
            if attempt == max_attempts - 1:
                raise
            continue # Retry with fresh SyncTokenclass TokenManager {
constructor() {
this.refreshTimer = null;
}
scheduleRefresh(expiresIn) {
// Refresh 5 minutes before expiration
const refreshTime = (expiresIn - 300) * 1000;
this.refreshTimer = setTimeout(() => {
this.refreshAccessToken();
}, refreshTime);
}
async refreshAccessToken() {
try {
const newTokens = await oauthClient.refresh();
this.storeTokens(newTokens);
this.scheduleRefresh(newTokens.expires_in);
} catch (error) {
// Refresh failed, need re-authentication
this.handleReauthentication();
}
}
}def api_call_with_auto_refresh(api_function):
try:
return api_function()
except Unauthorized401Error:
# Attempt token refresh
refresh_tokens()
# Retry with new token
return api_function()def call_with_rate_limit_handling(api_function):
max_retries = 5
base_delay = 1
for attempt in range(max_retries):
try:
return api_function()
except RateLimitError as e:
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt) # 1s, 2s, 4s, 8s, 16s
time.sleep(delay)
continue{
"Invoice": {
"CurrencyRef": {
"value": "USD",
"name": "United States Dollar"
}
}
}# Check if customer exists
customers = query_customers_by_email(realm_id, "customer@example.com", access_token)
if not customers:
# Create new customer
customer = create_customer(realm_id, {
"DisplayName": "Acme Corp",
"PrimaryEmailAddr": {"Address": "customer@example.com"},
"BillAddr": {
"Line1": "123 Main St",
"City": "San Francisco",
"CountrySubDivisionCode": "CA",
"PostalCode": "94105"
}
}, access_token)
else:
customer = customers[0]
customer_id = customer['Id']# Get service item
query = "SELECT * FROM Item WHERE Type = 'Service' AND Name = 'Consulting'"
items = query_entity(realm_id, query, access_token)
service_item = items[0]invoice_data = {
"TxnDate": "2024-12-09",
"DueDate": "2024-12-23",
"CustomerRef": {"value": customer_id},
"BillEmail": {"Address": "customer@example.com"},
"EmailStatus": "NeedToSend", # Mark for email sending
"Line": [
{
"Amount": 1500.00,
"DetailType": "SalesItemLineDetail",
"SalesItemLineDetail": {
"ItemRef": {"value": service_item['Id']},
"Qty": 10,
"UnitPrice": 150.00,
"TaxCodeRef": {"value": "NON"} # Non-taxable
},
"Description": "Consulting services - December 2024"
},
{
"Amount": 1500.00,
"DetailType": "SubTotalLineDetail",
"SubTotalLineDetail": {}
}
]
}
invoice = create_invoice(realm_id, invoice_data, access_token)
print(f"Invoice {invoice['DocNumber']} created: ${invoice['TotalAmt']}")# QuickBooks automatically sends email when EmailStatus is NeedToSend
# Alternatively, use send endpoint:
send_url = f"{base_url}/company/{realm_id}/invoice/{invoice['Id']}/send"
params = {"sendTo": "customer@example.com"}
response = requests.post(send_url, params=params, headers=headers)
if response.status_code == 200:
print(f"Invoice sent to {customer['PrimaryEmailAddr']['Address']}")# Check invoice status
print(f"Invoice ID: {invoice['Id']}")
print(f"Balance: ${invoice['Balance']}")
print(f"Email Status: {invoice['EmailStatus']}")
# Track linked transactions
if 'LinkedTxn' in invoice:
for linked in invoice['LinkedTxn']:
print(f"Linked {linked['TxnType']}: {linked['TxnId']}")def find_invoice_by_number(realm_id, doc_number, access_token):
query = f"SELECT * FROM Invoice WHERE DocNumber = '{doc_number}'"
invoices = query_entity(realm_id, query, access_token)
if not invoices:
raise ValueError(f"Invoice {doc_number} not found")
return invoices[0]
invoice = find_invoice_by_number(realm_id, "1045", access_token)
customer_id = invoice['CustomerRef']['value']
balance = invoice['Balance']# Get payment method (Check)
payment_methods = query_entity(realm_id, "SELECT * FROM PaymentMethod WHERE Name = 'Check'", access_token)
payment_method_id = payment_methods[0]['Id']
payment_data = {
"TotalAmt": balance, # Pay full amount
"CustomerRef": {"value": customer_id},
"PaymentMethodRef": {"value": payment_method_id},
"PaymentRefNum": "1234", # Check number
"TxnDate": "2024-12-09",
"Line": [
{
"Amount": balance,
"LinkedTxn": [
{
"TxnId": invoice['Id'],
"TxnType": "Invoice"
}
]
}
]
}
payment = create_payment(realm_id, payment_data, access_token)# Re-read invoice to see updated balance
updated_invoice = read_invoice(realm_id, invoice['Id'], access_token)
print(f"Original balance: ${balance}")
print(f"Payment amount: ${payment['TotalAmt']}")
print(f"New balance: ${updated_invoice['Balance']}")
print(f"Unapplied payment amount: ${payment.get('UnappliedAmt', 0)}")def apply_partial_payment(realm_id, invoice_id, payment_amount, customer_id, access_token):
payment_data = {
"TotalAmt": payment_amount, # Less than invoice balance
"CustomerRef": {"value": customer_id},
"Line": [
{
"Amount": payment_amount,
"LinkedTxn": [
{
"TxnId": invoice_id,
"TxnType": "Invoice"
}
]
}
]
}
payment = create_payment(realm_id, payment_data, access_token)
# Check unapplied amount
if payment['UnappliedAmt'] > 0:
print(f"Warning: ${payment['UnappliedAmt']} unapplied (overpayment or error)")
return paymentdef pay_multiple_invoices(realm_id, invoice_ids, amounts, customer_id, total_paid, access_token):
lines = []
for invoice_id, amount in zip(invoice_ids, amounts):
lines.append({
"Amount": amount,
"LinkedTxn": [{
"TxnId": invoice_id,
"TxnType": "Invoice"
}]
})
payment_data = {
"TotalAmt": total_paid,
"CustomerRef": {"value": customer_id},
"Line": lines
}
return create_payment(realm_id, payment_data, access_token)
# Example: Pay two invoices with single check
payment = pay_multiple_invoices(
realm_id,
["145", "146"], # Invoice IDs
[100.00, 50.00], # Amounts applied to each
customer_id,
150.00, # Total check amount
access_token
)def create_customer_complete(realm_id, customer_info, access_token):
customer_data = {
"DisplayName": customer_info['display_name'],
"GivenName": customer_info.get('first_name'),
"FamilyName": customer_info.get('last_name'),
"CompanyName": customer_info.get('company_name'),
"PrimaryEmailAddr": {
"Address": customer_info['email']
},
"PrimaryPhone": {
"FreeFormNumber": customer_info.get('phone')
},
"BillAddr": {
"Line1": customer_info['address_line1'],
"City": customer_info['city'],
"CountrySubDivisionCode": customer_info['state'],
"PostalCode": customer_info['zip']
},
"ShipAddr": {
"Line1": customer_info.get('ship_line1', customer_info['address_line1']),
"City": customer_info.get('ship_city', customer_info['city']),
"CountrySubDivisionCode": customer_info.get('ship_state', customer_info['state']),
"PostalCode": customer_info.get('ship_zip', customer_info['zip'])
}
}
return create_customer(realm_id, customer_data, access_token)def update_customer_email(realm_id, customer_id, new_email, access_token):
# Read current customer
customer = read_customer(realm_id, customer_id, access_token)
# Sparse update - only email
update_data = {
"Id": customer_id,
"SyncToken": customer['SyncToken'],
"sparse": True,
"PrimaryEmailAddr": {
"Address": new_email
}
}
return update_customer(realm_id, update_data, access_token)def get_customer_transactions(realm_id, customer_id, access_token):
transactions = {}
# Query invoices
invoice_query = f"SELECT * FROM Invoice WHERE CustomerRef = '{customer_id}'"
transactions['invoices'] = query_entity(realm_id, invoice_query, access_token)
# Query payments
payment_query = f"SELECT * FROM Payment WHERE CustomerRef = '{customer_id}'"
transactions['payments'] = query_entity(realm_id, payment_query, access_token)
# Query estimates
estimate_query = f"SELECT * FROM Estimate WHERE CustomerRef = '{customer_id}'"
transactions['estimates'] = query_entity(realm_id, estimate_query, access_token)
# Calculate totals
total_invoiced = sum(inv['TotalAmt'] for inv in transactions['invoices'])
total_paid = sum(pmt['TotalAmt'] for pmt in transactions['payments'])
transactions['summary'] = {
'total_invoiced': total_invoiced,
'total_paid': total_paid,
'balance': total_invoiced - total_paid
}
return transactions# Change default AR account for customer
def update_customer_ar_account(realm_id, customer_id, new_ar_account_id, access_token):
    """Point a customer at a different accounts-receivable account.

    Reads the customer first to obtain its current SyncToken, then submits a
    sparse update containing only ``ARAccountRef``.

    Args:
        realm_id: Company (realm) ID.
        customer_id: ID of the customer to update.
        new_ar_account_id: ID of the account to set as ``ARAccountRef``.
        access_token: OAuth 2.0 bearer token.

    Returns:
        Whatever ``update_customer`` returns.
    """
    customer = read_customer(realm_id, customer_id, access_token)
    update_data = {
        "Id": customer_id,
        "SyncToken": customer['SyncToken'],
        "sparse": True,
        "ARAccountRef": {
            "value": new_ar_account_id
        }
    }
    return update_customer(realm_id, update_data, access_token)


from datetime import datetime, timedelta
def sync_changed_entities(realm_id, last_sync_time, access_token):
    """Return the CDC changes for invoices, customers, payments and items.

    Thin wrapper that passes a fixed entity list to ``get_changed_entities``
    and returns its result unchanged.
    """
    # Get changes since last sync
    entity_types = ['Invoice', 'Customer', 'Payment', 'Item']
    return get_changed_entities(realm_id, entity_types, last_sync_time, access_token)


def build_batch_updates(changes):
    """Build batch ``update`` items that stamp changed invoices with a note.

    Entities are pulled per type through ``extract_entities_by_type``.
    Deleted entities are skipped, and only invoices produce a batch item
    (customers and payments are walked but generate nothing).

    Returns:
        A list of batch item dicts with ``bId`` values ``invoice_0``,
        ``invoice_1``, and so on.
    """
    from datetime import datetime  # local import keeps this function self-contained

    batch_items = []
    # One timestamp for the whole run so every note in a sync agrees.
    synced_at = datetime.now().isoformat()
    # Process each entity type
    for entity_type in ['Customer', 'Invoice', 'Payment']:
        for entity in extract_entities_by_type(changes, entity_type):
            if entity.get('status') == 'Deleted':
                # Skip deleted entities or handle separately
                continue
            # Example: Mark all invoices as reviewed
            if entity_type != 'Invoice':
                continue
            batch_items.append({
                "bId": f"invoice_{len(batch_items)}",
                "operation": "update",
                "Invoice": {
                    "Id": entity['Id'],
                    "SyncToken": entity['SyncToken'],
                    "sparse": True,
                    "PrivateNote": f"Synced at {synced_at}"
                }
            })
    return batch_items


async def execute_batch_sync(realm_id, batch_items, access_token):
    """Send batch items to the batch endpoint in chunks of 30.

    A chunk that does not come back with status 200 is reported with
    ``print`` and contributes nothing; the remaining chunks are still sent.

    NOTE(review): ``async_post`` is defined elsewhere; this code assumes its
    result exposes ``status``, ``text`` and a synchronous ``json()`` — confirm.

    Returns:
        The concatenated ``BatchItemResponse`` entries of all successful chunks.
    """
    # Split into batches of 30 (API limit)
    batch_size = 30
    url = f"https://sandbox-quickbooks.api.intuit.com/v3/company/{realm_id}/batch"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }
    results = []
    for i in range(0, len(batch_items), batch_size):
        batch_request = {
            "BatchItemRequest": batch_items[i:i + batch_size]
        }
        response = await async_post(url, batch_request, headers)
        if response.status == 200:
            results.extend(response.json()['BatchItemResponse'])
        else:
            print(f"Batch {i//batch_size + 1} failed: {response.text}")
    return results


def process_batch_results(batch_results):
    """Summarise batch responses into success and error counts.

    Args:
        batch_results: List of ``BatchItemResponse`` dicts, each with a
            ``bId`` and either a ``Fault`` or a result payload.

    Returns:
        Dict with ``total``, ``success``, ``errors`` and ``error_details``
        (one ``batch_id`` / ``error_code`` / ``message`` entry per fault).
    """
    success_count = 0
    errors = []
    for result in batch_results:
        bid = result['bId']
        if 'Fault' in result:
            # Tolerate a fault without an Error list instead of raising.
            first_error = (result['Fault'].get('Error') or [{}])[0]
            errors.append({
                'batch_id': bid,
                'error_code': first_error.get('code'),
                'message': first_error.get('Message')
            })
            print(f"Error in {bid}: {first_error.get('Message')}")
            continue

        success_count += 1
        # Extract updated entity. 'bId' is normally the first key of the
        # response, so look for the first key that is not 'bId'.
        entity_type = next((key for key in result if key != 'bId'), None)
        entity = result.get(entity_type) if entity_type is not None else None
        # Only entity payloads carry an Id; query results do not.
        if isinstance(entity, dict) and 'Id' in entity:
            print(f"Success {bid}: {entity_type} {entity['Id']} updated")

    summary = {
        'total': len(batch_results),
        'success': success_count,
        'errors': len(errors),
        'error_details': errors
    }
    return summary
# Complete workflow
async def sync_workflow(realm_id, last_sync_time, access_token):
# 1. Get changes via CDC
changes = sync_changed_entities(realm_id, last_sync_time, access_token)
# 2. Build batch updates
batch_items = build_batch_updates(changes)
if not batch_items:
print("No changes to sync")
return
# 3. Execute batch
results = await execute_batch_sync(realm_id, batch_items, access_token)
# 4. Process results
summary = process_batch_results(results)
print(f"Sync complete: {summary['success']}/{summary['total']} successful")
if summary['errors'] > 0:
print(f"Errors encountered: {summary['errors']}")
for error in summary['error_details']:
print(f" {error['batch_id']}: {error['message']}")
return summaryconst OAuthClient = require('intuit-oauth');
/**
 * Holds the OAuth tokens for one QuickBooks connection, keeps the access
 * token fresh (proactively on a timer and lazily on demand) and wraps
 * `fetch` with a single refresh-and-retry on HTTP 401.
 */
class QuickBooksAuth {
  /**
   * @param {string} clientId - OAuth client ID.
   * @param {string} clientSecret - OAuth client secret.
   * @param {string} redirectUri - OAuth redirect URI.
   * @param {string} [environment='sandbox'] - Value passed to the OAuth client.
   */
  constructor(clientId, clientSecret, redirectUri, environment = 'sandbox') {
    this.oauthClient = new OAuthClient({
      clientId,
      clientSecret,
      environment,
      redirectUri,
    });
    this.accessToken = null;
    this.refreshToken = null;
    this.tokenExpiry = null;
    this.refreshTimer = null;
    // Refresh currently in flight, shared by every concurrent caller.
    this.refreshPromise = null;
  }

  /**
   * Store tokens after authorization or refresh, schedule the next
   * automatic refresh and persist the tokens.
   *
   * @param {{token: {access_token: string, refresh_token: string, expires_in: number}}} authResponse
   * @returns {Promise<void>}
   */
  async storeTokens(authResponse) {
    const {
      access_token: accessToken,
      refresh_token: refreshToken,
      expires_in: expiresIn, // 3600 seconds
    } = authResponse.token;

    this.accessToken = accessToken;
    this.refreshToken = refreshToken;
    // Calculate expiry time
    this.tokenExpiry = Date.now() + expiresIn * 1000;
    // Schedule automatic refresh (5 minutes before expiry)
    this.scheduleRefresh(expiresIn - 300);
    // Persist tokens to secure storage
    await this.saveToDatabase({
      access_token: this.accessToken,
      refresh_token: this.refreshToken,
      expiry: this.tokenExpiry,
    });
  }

  /**
   * Schedule automatic token refresh, replacing any pending timer.
   *
   * @param {number} delaySeconds - Seconds from now. Negative values are
   *   clamped to 0; a non-finite value cancels scheduling, because
   *   setTimeout would otherwise fire it immediately.
   */
  scheduleRefresh(delaySeconds) {
    if (this.refreshTimer) {
      clearTimeout(this.refreshTimer);
      this.refreshTimer = null;
    }
    if (!Number.isFinite(delaySeconds)) {
      return;
    }
    this.refreshTimer = setTimeout(async () => {
      try {
        await this.refreshAccessToken();
      } catch (error) {
        console.error('Scheduled token refresh failed:', error);
        // Notify admin that re-authentication needed
        this.notifyReauthenticationNeeded();
      }
    }, Math.max(0, delaySeconds) * 1000);
  }

  /**
   * Refresh the access token.
   *
   * Concurrent callers share one in-flight refresh. Each refresh stores a
   * new refresh token, so parallel refreshes would race on which token is
   * current.
   *
   * @returns {Promise<object>} The auth response returned by the OAuth client.
   * @throws {Error} 'Re-authentication required' when no refresh token is
   *   held or the server answers `invalid_grant`; any other refresh error is
   *   rethrown unchanged.
   */
  async refreshAccessToken() {
    if (this.refreshPromise) {
      return this.refreshPromise;
    }

    const runRefresh = async () => {
      if (!this.refreshToken) {
        throw new Error('Re-authentication required');
      }
      try {
        // Set refresh token in client
        this.oauthClient.setToken({
          refresh_token: this.refreshToken,
        });
        const authResponse = await this.oauthClient.refresh();
        console.log('Token refreshed successfully');
        // Store new tokens
        await this.storeTokens(authResponse);
        return authResponse;
      } catch (error) {
        console.error('Token refresh failed:', error.originalMessage);
        // Check if refresh token is invalid
        if (error.error === 'invalid_grant') {
          console.error('Refresh token invalid - re-authentication required');
          this.accessToken = null;
          this.refreshToken = null;
          // A pending timer would only retry with a token known to be dead.
          if (this.refreshTimer) {
            clearTimeout(this.refreshTimer);
            this.refreshTimer = null;
          }
          throw new Error('Re-authentication required', { cause: error });
        }
        throw error;
      }
    };

    this.refreshPromise = runRefresh().finally(() => {
      this.refreshPromise = null;
    });
    return this.refreshPromise;
  }

  /**
   * Get a valid access token, refreshing first when none is held or the
   * current one expires within 5 minutes.
   *
   * @returns {Promise<string>} The current access token.
   */
  async getAccessToken() {
    const bufferTime = 5 * 60 * 1000; // 5 minutes
    const isExpiring = Date.now() >= this.tokenExpiry - bufferTime;
    if (!this.accessToken || isExpiring) {
      console.log('Token expired or expiring soon, refreshing...');
      await this.refreshAccessToken();
    }
    return this.accessToken;
  }

  /**
   * Make an API call with automatic token refresh. On HTTP 401 the token is
   * refreshed and the request is sent exactly once more.
   *
   * @param {string} url - Request URL.
   * @param {object} [options={}] - `fetch` options; `Authorization` and
   *   `Accept` headers are always overridden.
   * @returns {Promise<Response>} The response of the first non-401 attempt,
   *   or of the retry.
   */
  async apiCall(url, options = {}) {
    const send = (token) => fetch(url, {
      ...options,
      headers: {
        ...options.headers,
        'Authorization': `Bearer ${token}`,
        'Accept': 'application/json',
      },
    });

    try {
      const response = await send(await this.getAccessToken());
      if (response.status !== 401) {
        return response;
      }
      // Handle 401 - token might have expired
      console.log('401 Unauthorized - refreshing token and retrying...');
      await this.refreshAccessToken();
      // Awaited so a failing retry is logged by the catch below as well.
      return await send(await this.getAccessToken());
    } catch (error) {
      console.error('API call failed:', error);
      throw error;
    }
  }

  /**
   * Save tokens to database (implement based on your storage).
   *
   * @param {{access_token: string, refresh_token: string, expiry: number}} tokens
   */
  async saveToDatabase(tokens) {
    // Example: Save to database
    // await db.tokens.update({ realmId }, tokens);
  }

  /** Notify admin about re-auth requirement. */
  notifyReauthenticationNeeded() {
    // Example: Send email or notification
    console.error('Re-authentication required for QuickBooks integration');
  }
}
// Usage
const auth = new QuickBooksAuth(
'YOUR_CLIENT_ID',
'YOUR_CLIENT_SECRET',
'https://yourapp.com/callback',
'sandbox'
);
// After OAuth authorization
auth.storeTokens(authResponse);
// Make API calls - automatic token refresh
const response = await auth.apiCall(
'https://sandbox-quickbooks.api.intuit.com/v3/company/123/customer/456',
{ method: 'GET' }
);import requests
from datetime import datetime, timedelta
class QuickBooksInvoice:
    """Create and email QuickBooks Online invoices through the v3 REST API.

    HTTP errors and API faults are reported by printing a diagnostic and
    returning ``None`` / ``False`` rather than raising.
    """

    # Seconds a single HTTP call may block before requests gives up.
    REQUEST_TIMEOUT = 30

    def __init__(self, realm_id, access_token, base_url='https://sandbox-quickbooks.api.intuit.com'):
        """Store connection settings.

        Args:
            realm_id: Company (realm) ID placed in every request URL.
            access_token: OAuth 2.0 bearer token sent in the Authorization header.
            base_url: API host; defaults to the sandbox environment.
        """
        self.realm_id = realm_id
        self.access_token = access_token
        self.base_url = base_url

    def create_invoice(self, customer_id, line_items, due_days=30, tax_code='TAX', memo=None):
        """
        Create an invoice with multiple line items

        Args:
            customer_id: QuickBooks customer ID
            line_items: List of dicts with 'item_id', 'quantity', 'unit_price', 'description'
            due_days: Days until due date
            tax_code: Tax code ('TAX' for taxable, 'NON' for non-taxable)
            memo: Customer memo

        Returns:
            Created invoice dict or None if error

        Raises:
            KeyError: If a line item lacks 'item_id', 'quantity' or 'unit_price'.
        """
        if not line_items:
            print("✗ Error creating invoice: at least one line item is required")
            return None

        # Take a single timestamp so TxnDate and DueDate cannot straddle midnight.
        today = datetime.now()
        txn_date = today.strftime('%Y-%m-%d')
        due_date = (today + timedelta(days=due_days)).strftime('%Y-%m-%d')

        # Build line items
        lines = []
        subtotal = 0
        for idx, item in enumerate(line_items, start=1):
            # Round to cents so binary float noise (3 * 0.1 -> 0.30000000000000004)
            # never reaches the API.
            amount = round(item['quantity'] * item['unit_price'], 2)
            subtotal += amount
            lines.append({
                "LineNum": idx,
                "Amount": amount,
                "DetailType": "SalesItemLineDetail",
                "Description": item.get('description', ''),
                "SalesItemLineDetail": {
                    "ItemRef": {
                        "value": item['item_id']
                    },
                    "Qty": item['quantity'],
                    "UnitPrice": item['unit_price'],
                    "TaxCodeRef": {
                        "value": tax_code
                    }
                }
            })

        # Add subtotal line
        lines.append({
            "Amount": round(subtotal, 2),
            "DetailType": "SubTotalLineDetail",
            "SubTotalLineDetail": {}
        })

        # Build invoice payload
        invoice_data = {
            "TxnDate": txn_date,
            "DueDate": due_date,
            "CustomerRef": {
                "value": customer_id
            },
            "Line": lines,
            "BillEmail": {},  # Will be populated from customer
            "EmailStatus": "NotSet"
        }

        # Add memo if provided
        if memo:
            invoice_data["CustomerMemo"] = {
                "value": memo
            }

        # Make API request
        url = f"{self.base_url}/v3/company/{self.realm_id}/invoice"
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Accept": "application/json",
            "Content-Type": "application/json"
        }

        try:
            response = requests.post(url, json=invoice_data, headers=headers,
                                     timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()

            # Check for fault in response
            result = response.json()
            if 'Fault' in result:
                self._handle_fault(result['Fault'])
                return None

            invoice = result['Invoice']
            # Log with .get(): once the invoice exists, a missing optional field
            # must not turn success into a None return (callers might retry and
            # create a duplicate).
            print(f"✓ Invoice {invoice.get('DocNumber', invoice.get('Id', 'N/A'))} created")
            print(f" Customer: {invoice.get('CustomerRef', {}).get('value', customer_id)}")
            print(f" Total: ${invoice.get('TotalAmt', 0):.2f}")
            print(f" Due: {invoice.get('DueDate', due_date)}")
            print(f" Balance: ${invoice.get('Balance', 0):.2f}")
            return invoice
        except requests.exceptions.HTTPError as e:
            print(f"✗ HTTP Error: {e.response.status_code}")
            print(f" Response: {e.response.text}")
            return None
        except Exception as e:
            print(f"✗ Error creating invoice: {str(e)}")
            return None

    def send_invoice(self, invoice_id, email_address):
        """Send invoice via email

        Args:
            invoice_id: ID of the invoice to send.
            email_address: Recipient, passed as the ``sendTo`` query parameter.

        Returns:
            True if the API accepted the request, False otherwise.
        """
        url = f"{self.base_url}/v3/company/{self.realm_id}/invoice/{invoice_id}/send"
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Accept": "application/json",
            # The send operation carries no JSON body; the API documents
            # application/octet-stream as its content type.
            "Content-Type": "application/octet-stream"
        }
        params = {"sendTo": email_address}

        try:
            response = requests.post(url, params=params, headers=headers,
                                     timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            result = response.json()
            if 'Fault' in result:
                self._handle_fault(result['Fault'])
                return False

            invoice = result['Invoice']
            print(f"✓ Invoice {invoice.get('DocNumber', invoice_id)} sent to {email_address}")
            print(f" Email Status: {invoice.get('EmailStatus', 'N/A')}")
            return True
        except Exception as e:
            print(f"✗ Error sending invoice: {str(e)}")
            return False

    def _handle_fault(self, fault):
        """Handle fault responses by printing the fault type and each error."""
        print(f"✗ Fault Type: {fault.get('type', 'Unknown')}")
        for error in fault.get('Error', []):
            print(f" Error {error.get('code', '?')}: {error.get('Message', '')}")
            if 'element' in error:
                print(f" Element: {error['element']}")
# Usage example
invoice_manager = QuickBooksInvoice(realm_id='123456789', access_token='your_token')
# Create invoice with multiple items
invoice = invoice_manager.create_invoice(
customer_id='42',
line_items=[
{
'item_id': '1',
'quantity': 10,
'unit_price': 150.00,
'description': 'Consulting services - December 2024'
},
{
'item_id': '5',
'quantity': 1,
'unit_price': 500.00,
'description': 'Project management - December 2024'
}
],
due_days=30,
tax_code='TAX', # or 'NON' for non-taxable
memo='Thank you for your business!'
)
if invoice:
# Send invoice via email
invoice_manager.send_invoice(invoice['Id'], 'customer@example.com')
const axios = require('axios');
/**
 * Helper for reading and sparse-updating QuickBooks Online customers.
 *
 * Uses the module-level `axios` binding for HTTP.
 */
class QuickBooksCustomer {
  /**
   * @param {string} realmId - Company (realm) ID used in request URLs.
   * @param {string} accessToken - OAuth 2.0 bearer token.
   * @param {string} [baseUrl] - API host; defaults to the sandbox environment.
   */
  constructor(realmId, accessToken, baseUrl = 'https://sandbox-quickbooks.api.intuit.com') {
    this.realmId = realmId;
    this.accessToken = accessToken;
    this.baseUrl = baseUrl;
  }

  /**
   * Fetch a single customer.
   *
   * @param {string} customerId - Customer ID.
   * @returns {Promise<object>} The `Customer` object from the response body.
   * @throws Rethrows the request error after logging it.
   */
  async readCustomer(customerId) {
    const url = `${this.baseUrl}/v3/company/${this.realmId}/customer/${encodeURIComponent(customerId)}`;
    try {
      const response = await axios.get(url, {
        headers: {
          'Authorization': `Bearer ${this.accessToken}`,
          'Accept': 'application/json'
        }
      });
      return response.data.Customer;
    } catch (error) {
      console.error('Read customer failed:', error.response?.data || error.message);
      throw error;
    }
  }

  /**
   * Apply a sparse update, re-reading the customer for a fresh SyncToken first.
   *
   * If the API reports a stale object (another writer bumped the SyncToken
   * between our read and write), the read/write cycle is repeated up to
   * `maxRetries` additional times.
   *
   * @param {string} customerId - Customer ID.
   * @param {object} updates - Fields to change. `Id`, `SyncToken` and `sparse`
   *   are always set by this method and cannot be overridden here.
   * @param {number} [maxRetries=3] - Extra attempts allowed after a stale-object error.
   * @returns {Promise<object|null>} The updated customer, or `null` when the
   *   response body carries a `Fault`.
   * @throws Rethrows the request error once it is not retryable or retries are exhausted.
   */
  async sparseUpdate(customerId, updates, maxRetries = 3) {
    const url = `${this.baseUrl}/v3/company/${this.realmId}/customer`;

    for (let attempt = 0; ; attempt += 1) {
      // First, read customer to get current SyncToken
      const customer = await this.readCustomer(customerId);

      // Identity fields go last so caller-supplied keys cannot clobber them.
      const updateData = {
        ...updates,
        Id: customerId,
        SyncToken: customer.SyncToken,
        sparse: true
      };

      try {
        const response = await axios.post(url, updateData, {
          headers: {
            'Authorization': `Bearer ${this.accessToken}`,
            'Accept': 'application/json',
            'Content-Type': 'application/json'
          }
        });

        // Check for fault
        if (response.data.Fault) {
          this.handleFault(response.data.Fault);
          return null;
        }

        const updatedCustomer = response.data.Customer;
        console.log(`✓ Customer ${updatedCustomer.DisplayName} updated`);
        console.log(` New SyncToken: ${updatedCustomer.SyncToken}`);
        return updatedCustomer;
      } catch (error) {
        // Optional chaining: a 400 without a Fault body must surface the
        // original error, not a TypeError raised inside this handler.
        const faultCode = error.response?.data?.Fault?.Error?.[0]?.code;
        // 5010 is QuickBooks' "Stale Object Error" (SyncToken mismatch).
        const isStale = error.response?.status === 400 && String(faultCode) === '5010';

        if (isStale && attempt < maxRetries) {
          console.log('SyncToken mismatch - retrying with fresh token...');
          continue;
        }

        console.error('Update failed:', error.response?.data || error.message);
        throw error;
      }
    }
  }

  // Example: Update email
  async updateEmail(customerId, newEmail) {
    return this.sparseUpdate(customerId, {
      PrimaryEmailAddr: {
        Address: newEmail
      }
    });
  }

  // Example: Update phone
  async updatePhone(customerId, newPhone) {
    return this.sparseUpdate(customerId, {
      PrimaryPhone: {
        FreeFormNumber: newPhone
      }
    });
  }

  // Example: Update billing address
  async updateBillingAddress(customerId, address) {
    return this.sparseUpdate(customerId, {
      BillAddr: {
        Line1: address.line1,
        City: address.city,
        CountrySubDivisionCode: address.state,
        PostalCode: address.zip
      }
    });
  }

  // Example: Deactivate customer
  async deactivateCustomer(customerId) {
    return this.sparseUpdate(customerId, {
      Active: false
    });
  }

  /**
   * Example: Update multiple fields at once.
   *
   * Only truthy entries of `updates` (`email`, `phone`, `displayName`, `notes`)
   * are translated into the sparse payload.
   */
  async updateMultipleFields(customerId, updates) {
    const sparseUpdates = {};
    if (updates.email) {
      sparseUpdates.PrimaryEmailAddr = { Address: updates.email };
    }
    if (updates.phone) {
      sparseUpdates.PrimaryPhone = { FreeFormNumber: updates.phone };
    }
    if (updates.displayName) {
      sparseUpdates.DisplayName = updates.displayName;
    }
    if (updates.notes) {
      sparseUpdates.Notes = updates.notes;
    }
    return this.sparseUpdate(customerId, sparseUpdates);
  }

  /** Log the fault type and every error it contains. */
  handleFault(fault) {
    console.error(`✗ Fault Type: ${fault.type}`);
    (fault.Error ?? []).forEach((error) => {
      console.error(` Error ${error.code}: ${error.Message}`);
      if (error.element) {
        console.error(` Element: ${error.element}`);
      }
    });
  }
}
// Usage
const customerManager = new QuickBooksCustomer('123456789', 'your_access_token');
// Update email
await customerManager.updateEmail('42', 'newemail@example.com');
// Update phone
await customerManager.updatePhone('42', '(555) 987-6543');
// Update address
await customerManager.updateBillingAddress('42', {
line1: '456 New Street',
city: 'San Francisco',
state: 'CA',
zip: '94105'
});
// Update multiple fields
await customerManager.updateMultipleFields('42', {
email: 'updated@example.com',
phone: '(555) 111-2222',
displayName: 'Updated Customer Name',
notes: 'VIP customer - priority support'
});
// Deactivate customer
await customerManager.deactivateCustomer('42');
import requests
from urllib.parse import quote
from datetime import datetime, timedelta
class QuickBooksQuery:
    """Run QuickBooks Online query-language statements and print simple summaries.

    All query helpers return a list of entity dicts; any failure is printed and
    reported as an empty list.
    """

    # Seconds a single HTTP call may block before requests gives up.
    REQUEST_TIMEOUT = 30
    # Largest page the query endpoint returns in one response.
    MAX_PAGE_SIZE = 1000
    # QueryResponse keys that describe the page rather than hold entities.
    _METADATA_KEYS = ('startPosition', 'maxResults', 'totalCount')

    def __init__(self, realm_id, access_token, base_url='https://sandbox-quickbooks.api.intuit.com'):
        """Store connection settings.

        Args:
            realm_id: Company (realm) ID placed in every request URL.
            access_token: OAuth 2.0 bearer token sent in the Authorization header.
            base_url: API host; defaults to the sandbox environment.
        """
        self.realm_id = realm_id
        self.access_token = access_token
        self.base_url = base_url

    @staticmethod
    def _escape(value):
        """Escape a value for embedding inside a single-quoted query literal.

        Apostrophes are backslash-escaped so caller-supplied text cannot end the
        literal early; backslashes are doubled first so an existing backslash
        cannot neutralize that escape.
        """
        return str(value).replace('\\', '\\\\').replace("'", "\\'")

    @classmethod
    def _extract_entities(cls, query_response):
        """Return the entity list from a QueryResponse dict (the key varies by entity type)."""
        for key, value in query_response.items():
            if key not in cls._METADATA_KEYS:
                return value if isinstance(value, list) else []
        return []

    def query(self, sql_query, page_size=1000):
        """Execute SQL-like query

        Unless the statement already contains STARTPOSITION or MAXRESULTS, the
        query is paged with those clauses until a short page is returned, so
        the result is not silently truncated to the API's default page.

        Args:
            sql_query: Query-language statement.
            page_size: Rows requested per page (clamped to 1..MAX_PAGE_SIZE).

        Returns:
            List of entity dicts; empty on fault or any request error.
        """
        upper = sql_query.upper()
        caller_paginates = 'STARTPOSITION' in upper or 'MAXRESULTS' in upper
        page_size = max(1, min(int(page_size), self.MAX_PAGE_SIZE))
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Accept": "application/json"
        }

        entities = []
        start = 1  # STARTPOSITION is 1-based
        try:
            while True:
                if caller_paginates:
                    statement = sql_query
                else:
                    statement = f"{sql_query} STARTPOSITION {start} MAXRESULTS {page_size}"
                encoded_query = quote(statement, safe='')
                url = f"{self.base_url}/v3/company/{self.realm_id}/query?query={encoded_query}"

                response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
                response.raise_for_status()
                result = response.json()
                if 'Fault' in result:
                    print(f"Query error: {result['Fault']}")
                    return []

                page = self._extract_entities(result.get('QueryResponse', {}))
                entities.extend(page)
                # A short page means there is nothing further to fetch.
                if caller_paginates or len(page) < page_size:
                    return entities
                start += page_size
        except Exception as e:
            print(f"Query failed: {str(e)}")
            return []

    def query_invoices_by_date_range(self, start_date, end_date, customer_id=None):
        """Query invoices within date range, optionally filtered by customer"""
        query = (
            f"SELECT * FROM Invoice WHERE TxnDate >= '{self._escape(start_date)}' "
            f"AND TxnDate <= '{self._escape(end_date)}'"
        )
        if customer_id:
            query += f" AND CustomerRef = '{self._escape(customer_id)}'"
        query += " ORDERBY TxnDate DESC"

        invoices = self.query(query)
        print(f"Found {len(invoices)} invoices between {start_date} and {end_date}")

        # Calculate totals
        total_amount = sum(inv.get('TotalAmt', 0) for inv in invoices)
        total_balance = sum(inv.get('Balance', 0) for inv in invoices)
        print(f"Total invoiced: ${total_amount:.2f}")
        print(f"Outstanding balance: ${total_balance:.2f}")
        return invoices

    def query_overdue_invoices(self, as_of_date=None):
        """Query invoices past due date (defaults to today) and print a per-customer summary"""
        if not as_of_date:
            as_of_date = datetime.now().strftime('%Y-%m-%d')
        query = (
            f"SELECT * FROM Invoice WHERE Balance > '0' "
            f"AND DueDate < '{self._escape(as_of_date)}' ORDERBY DueDate"
        )
        invoices = self.query(query)
        print(f"Found {len(invoices)} overdue invoices as of {as_of_date}")

        # Group by customer
        by_customer = {}
        for inv in invoices:
            customer_ref = inv.get('CustomerRef', {})
            customer_id = customer_ref.get('value')
            bucket = by_customer.setdefault(customer_id, {
                'customer_name': customer_ref.get('name', 'Unknown'),
                'invoices': [],
                'total_overdue': 0
            })
            bucket['invoices'].append(inv)
            bucket['total_overdue'] += inv.get('Balance', 0)

        # Print summary
        for customer_id, data in by_customer.items():
            print(f"\nCustomer: {data['customer_name']}")
            print(f" Overdue invoices: {len(data['invoices'])}")
            print(f" Total overdue: ${data['total_overdue']:.2f}")
        return invoices

    def query_customers_by_balance(self, min_balance=0):
        """Query customers with balance greater than minimum"""
        query = (
            f"SELECT * FROM Customer WHERE Balance > '{self._escape(min_balance)}' "
            f"ORDERBY Balance DESC"
        )
        customers = self.query(query)
        print(f"Found {len(customers)} customers with balance > ${min_balance}")
        total_ar = sum(cust.get('Balance', 0) for cust in customers)
        print(f"Total accounts receivable: ${total_ar:.2f}")
        return customers

    def query_items_by_type(self, item_type='Service'):
        """Query items by type (Service, Inventory, NonInventory, Category)"""
        query = (
            f"SELECT * FROM Item WHERE Type = '{self._escape(item_type)}' "
            f"AND Active = true ORDERBY Name"
        )
        items = self.query(query)
        print(f"Found {len(items)} active {item_type} items")
        return items

    def search_customers_by_name(self, search_term):
        """Search customers by display name (substring match)"""
        query = (
            f"SELECT * FROM Customer WHERE DisplayName LIKE '%{self._escape(search_term)}%' "
            f"ORDERBY DisplayName"
        )
        customers = self.query(query)
        print(f"Found {len(customers)} customers matching '{search_term}'")
        for cust in customers:
            print(f" {cust.get('DisplayName', 'Unknown')} - Balance: ${cust.get('Balance', 0):.2f}")
        return customers

    def query_recent_payments(self, days=30):
        """Query payments from last N days"""
        start_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
        query = f"SELECT * FROM Payment WHERE TxnDate >= '{start_date}' ORDERBY TxnDate DESC"
        payments = self.query(query)
        print(f"Found {len(payments)} payments in last {days} days")
        total_received = sum(pmt.get('TotalAmt', 0) for pmt in payments)
        print(f"Total payments received: ${total_received:.2f}")
        return payments
# Usage
query_service = QuickBooksQuery(realm_id='123456789', access_token='your_token')
# Query invoices for date range
invoices = query_service.query_invoices_by_date_range(
start_date='2024-01-01',
end_date='2024-12-31'
)
# Query invoices for specific customer
customer_invoices = query_service.query_invoices_by_date_range(
start_date='2024-01-01',
end_date='2024-12-31',
customer_id='42'
)
# Find overdue invoices
overdue = query_service.query_overdue_invoices()
# Find customers with high balances
high_balance_customers = query_service.query_customers_by_balance(min_balance=1000.00)
# Search for customer
customers = query_service.search_customers_by_name('Acme')
# Get recent payments
recent_payments = query_service.query_recent_payments(days=30)
const axios = require('axios');
/**
 * Helper for the QuickBooks Online batch endpoint.
 *
 * Uses the module-level `axios` binding for HTTP.
 */
class QuickBooksBatch {
  // A single batch request may carry at most this many operations.
  static MAX_BATCH_SIZE = 30;

  /**
   * @param {string} realmId - Company (realm) ID used in request URLs.
   * @param {string} accessToken - OAuth 2.0 bearer token.
   * @param {string} [baseUrl] - API host; defaults to the sandbox environment.
   */
  constructor(realmId, accessToken, baseUrl = 'https://sandbox-quickbooks.api.intuit.com') {
    this.realmId = realmId;
    this.accessToken = accessToken;
    this.baseUrl = baseUrl;
  }

  /**
   * Submit batch items and summarize the per-item outcomes.
   *
   * Items are sent sequentially in chunks of at most `MAX_BATCH_SIZE`; the
   * summaries of all chunks are merged. If a chunk's request fails, the error
   * is rethrown even though earlier chunks may already have been applied.
   *
   * @param {object[]} batchItems - `BatchItemRequest` entries (each with a `bId`).
   * @returns {Promise<{total: number, success: number, errors: number, results: object[]}>}
   *   `total` counts the responses received; `results` holds one entry per
   *   response with `status` of `'success'` or `'error'`.
   * @throws Rethrows the request error after logging it.
   */
  async executeBatch(batchItems) {
    const url = `${this.baseUrl}/v3/company/${this.realmId}/batch`;
    const summary = {
      total: 0,
      success: 0,
      errors: 0,
      results: []
    };

    try {
      for (let offset = 0; offset < batchItems.length; offset += QuickBooksBatch.MAX_BATCH_SIZE) {
        const chunk = batchItems.slice(offset, offset + QuickBooksBatch.MAX_BATCH_SIZE);
        const response = await axios.post(url, {
          BatchItemRequest: chunk
        }, {
          headers: {
            'Authorization': `Bearer ${this.accessToken}`,
            'Content-Type': 'application/json',
            'Accept': 'application/json'
          }
        });

        // Tolerate a body without BatchItemResponse instead of crashing on .length.
        const results = response.data?.BatchItemResponse ?? [];
        summary.total += results.length;

        // Process results
        for (const result of results) {
          if (result.Fault) {
            summary.errors += 1;
            console.error(`✗ Error for ${result.bId}:`);
            (result.Fault.Error ?? []).forEach((err) => {
              console.error(` ${err.code}: ${err.Message}`);
            });
            summary.results.push({
              bId: result.bId,
              status: 'error',
              error: result.Fault
            });
            continue;
          }

          summary.success += 1;
          // The payload lives under the only key that is not `bId`
          // (e.g. `Customer`, `Invoice`, or `QueryResponse` for queries).
          const entityType = Object.keys(result).find((k) => k !== 'bId');
          const entity = result[entityType];
          const idSuffix = entity?.Id === undefined ? '' : ` ${entity.Id}`;
          console.log(`✓ Success for ${result.bId}: ${entityType}${idSuffix}`);
          summary.results.push({
            bId: result.bId,
            status: 'success',
            entityType: entityType,
            entity: entity
          });
        }
      }

      console.log(`\nBatch complete: ${summary.success}/${summary.total} successful`);
      return summary;
    } catch (error) {
      console.error('Batch operation failed:', error.response?.data || error.message);
      throw error;
    }
  }

  // Batch create customers
  async batchCreateCustomers(customers) {
    const batchItems = customers.map((customer, index) => ({
      bId: `customer_create_${index}`,
      operation: 'create',
      Customer: {
        DisplayName: customer.displayName,
        PrimaryEmailAddr: { Address: customer.email },
        PrimaryPhone: { FreeFormNumber: customer.phone },
        BillAddr: {
          Line1: customer.address,
          City: customer.city,
          CountrySubDivisionCode: customer.state,
          PostalCode: customer.zip
        }
      }
    }));
    return this.executeBatch(batchItems);
  }

  // Batch update invoices
  async batchUpdateInvoices(updates) {
    const batchItems = updates.map((update, index) => ({
      bId: `invoice_update_${index}`,
      operation: 'update',
      Invoice: {
        Id: update.id,
        SyncToken: update.syncToken,
        sparse: true,
        ...update.changes
      }
    }));
    return this.executeBatch(batchItems);
  }

  /**
   * Batch query multiple entities.
   *
   * @param {{sql: string}[]} queries - Statements to run; item `i` gets bId `query_<i>`.
   * @returns {Promise<Object<string, object>>} Map from bId to that query's
   *   `QueryResponse`; failed queries are omitted.
   */
  async batchQuery(queries) {
    const batchItems = queries.map((query, index) => ({
      bId: `query_${index}`,
      operation: 'query',
      Query: query.sql
    }));
    const results = await this.executeBatch(batchItems);

    // Extract query results
    const queryResults = {};
    results.results.forEach((result) => {
      if (result.status !== 'success') {
        return;
      }
      // executeBatch already unwrapped the payload: for a query item,
      // `entity` IS the QueryResponse rather than an object containing one.
      const payload = result.entityType === 'QueryResponse'
        ? result.entity
        : result.entity?.QueryResponse;
      if (payload) {
        queryResults[result.bId] = payload;
      }
    });
    return queryResults;
  }

  /**
   * Mixed batch operations.
   *
   * @param {object[]} operations - Each has `type` (label used in the bId),
   *   `operation`, `data`, and - for non-query operations - `entityType`.
   */
  async mixedBatch(operations) {
    const batchItems = operations.map((op, index) => {
      const item = {
        bId: `op_${index}_${op.type}`,
        operation: op.operation
      };
      // Add entity or query data
      if (op.operation === 'query') {
        item.Query = op.data;
      } else {
        item[op.entityType] = op.data;
      }
      return item;
    });
    return this.executeBatch(batchItems);
  }
}
// Usage Examples
const batchService = new QuickBooksBatch('123456789', 'your_access_token');
// Example 1: Batch create customers
const newCustomers = [
{
displayName: 'Acme Corp',
email: 'contact@acme.com',
phone: '(555) 111-1111',
address: '123 Main St',
city: 'San Francisco',
state: 'CA',
zip: '94105'
},
{
displayName: 'TechStart Inc',
email: 'hello@techstart.com',
phone: '(555) 222-2222',
address: '456 Market St',
city: 'San Francisco',
state: 'CA',
zip: '94103'
}
];
const createResults = await batchService.batchCreateCustomers(newCustomers);
// Example 2: Batch update invoices (flag as needing to be emailed)
const invoiceUpdates = [
{ id: '145', syncToken: '0', changes: { EmailStatus: 'NeedToSend' } },
{ id: '146', syncToken: '0', changes: { EmailStatus: 'NeedToSend' } },
{ id: '147', syncToken: '0', changes: { EmailStatus: 'NeedToSend' } }
];
const updateResults = await batchService.batchUpdateInvoices(invoiceUpdates);
// Example 3: Batch queries
const queries = [
{ sql: 'SELECT * FROM Customer WHERE Active = true MAXRESULTS 10' },
{ sql: 'SELECT * FROM Invoice WHERE Balance > 0 MAXRESULTS 10' },
{ sql: 'SELECT * FROM Payment MAXRESULTS 10' }
];
const queryResults = await batchService.batchQuery(queries);
// Example 4: Mixed batch operations
const mixedOps = [
{
type: 'create_customer',
operation: 'create',
entityType: 'Customer',
data: {
DisplayName: 'New Customer',
PrimaryEmailAddr: { Address: 'new@example.com' }
}
},
{
type: 'update_invoice',
operation: 'update',
entityType: 'Invoice',
data: {
Id: '145',
SyncToken: '1',
sparse: true,
CustomerMemo: { value: 'Thank you!' }
}
},
{
type: 'query_items',
operation: 'query',
data: 'SELECT * FROM Item WHERE Type = \'Service\' MAXRESULTS 5'
}
];
const mixedResults = await batchService.mixedBatch(mixedOps);
console.log(`Mixed batch: ${mixedResults.success}/${mixedResults.total} successful`);
import requests
class QuickBooksPayment:
    """Create QuickBooks Online payments and apply them to invoices.

    HTTP errors and API faults are reported by printing a diagnostic and
    returning ``None`` (or an empty list) rather than raising.
    """

    # Seconds a single HTTP call may block before requests gives up.
    REQUEST_TIMEOUT = 30

    def __init__(self, realm_id, access_token, base_url='https://sandbox-quickbooks.api.intuit.com'):
        """Store connection settings.

        Args:
            realm_id: Company (realm) ID placed in every request URL.
            access_token: OAuth 2.0 bearer token sent in the Authorization header.
            base_url: API host; defaults to the sandbox environment.
        """
        self.realm_id = realm_id
        self.access_token = access_token
        self.base_url = base_url

    def create_payment(self, customer_id, total_amount, payment_method_id,
                       payment_ref_num, txn_date, invoice_applications):
        """
        Create payment and apply to one or more invoices

        Args:
            customer_id: QuickBooks customer ID
            total_amount: Total payment amount
            payment_method_id: Payment method ID
            payment_ref_num: Check number or transaction reference
            txn_date: Payment date (YYYY-MM-DD)
            invoice_applications: List of {'invoice_id': str, 'amount': float}

        Returns:
            Created payment dict or None if error
        """
        # Build line items for invoice applications
        lines = []
        total_applied = 0
        for application in invoice_applications:
            lines.append({
                "Amount": application['amount'],
                "LinkedTxn": [
                    {
                        "TxnId": application['invoice_id'],
                        "TxnType": "Invoice"
                    }
                ]
            })
            total_applied += application['amount']

        # Compare at cent precision: summing floats such as 0.1 + 0.2 yields
        # 0.30000000000000004, which would otherwise "exceed" a 0.3 payment.
        total_applied = round(total_applied, 2)
        unapplied = round(total_amount - total_applied, 2)
        if unapplied < 0:
            print(f"Warning: Applied amount (${total_applied}) exceeds payment (${total_amount})")
            return None

        # Build payment payload
        payment_data = {
            "TotalAmt": total_amount,
            "CustomerRef": {
                "value": customer_id
            },
            "TxnDate": txn_date,
            "PaymentMethodRef": {
                "value": payment_method_id
            },
            "PaymentRefNum": payment_ref_num,
            "Line": lines
        }

        # Make API request
        url = f"{self.base_url}/v3/company/{self.realm_id}/payment"
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Accept": "application/json",
            "Content-Type": "application/json"
        }

        try:
            response = requests.post(url, json=payment_data, headers=headers,
                                     timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            result = response.json()
            if 'Fault' in result:
                self._handle_fault(result['Fault'])
                return None

            payment = result['Payment']
            # Log with .get(): once the payment exists, a missing optional field
            # must not turn success into a None return (callers might retry and
            # record the payment twice).
            print(f"✓ Payment created: ID {payment.get('Id', 'N/A')}")
            print(f" Customer: {payment.get('CustomerRef', {}).get('value', customer_id)}")
            print(f" Total Amount: ${payment.get('TotalAmt', total_amount):.2f}")
            print(f" Applied Amount: ${total_applied:.2f}")
            print(f" Unapplied Amount: ${payment.get('UnappliedAmt', 0):.2f}")
            print(f" Reference: {payment.get('PaymentRefNum', 'N/A')}")

            # Show invoice applications
            for line in payment.get('Line', []):
                for linked in line.get('LinkedTxn', []):
                    print(f" Applied ${line.get('Amount', 0):.2f} to "
                          f"{linked.get('TxnType')} {linked.get('TxnId')}")
            return payment
        except requests.exceptions.HTTPError as e:
            print(f"✗ HTTP Error: {e.response.status_code}")
            print(f" Response: {e.response.text}")
            return None
        except Exception as e:
            print(f"✗ Error creating payment: {str(e)}")
            return None

    def apply_payment_to_invoices(self, customer_id, check_number, check_amount,
                                  check_date, invoices):
        """
        Apply a single check payment to multiple invoices

        Each invoice receives ``min(amount_to_apply, balance)``.

        Args:
            customer_id: Customer ID
            check_number: Check number
            check_amount: Total check amount
            check_date: Check date
            invoices: List of {'id': str, 'amount_to_apply': float, 'balance': float}

        Returns:
            Payment dict or None
        """
        # Get check payment method ID
        payment_methods = self.query_payment_methods()
        check_method = next(
            (pm for pm in payment_methods if str(pm.get('Name', '')).lower() == 'check'),
            None
        )
        if not check_method:
            print("Check payment method not found")
            return None

        # Build invoice applications
        applications = []
        total_to_apply = 0
        for invoice in invoices:
            amount = min(invoice['amount_to_apply'], invoice['balance'])
            applications.append({
                'invoice_id': invoice['id'],
                'amount': amount
            })
            total_to_apply += amount
            print(f"Will apply ${amount:.2f} to Invoice {invoice['id']}")

        # Check if payment covers all applications (cent precision, see create_payment)
        total_to_apply = round(total_to_apply, 2)
        if total_to_apply > round(check_amount, 2):
            print(f"Warning: Total applications (${total_to_apply}) exceeds check amount (${check_amount})")
            return None

        # Create payment
        payment = self.create_payment(
            customer_id=customer_id,
            total_amount=check_amount,
            payment_method_id=check_method['Id'],
            payment_ref_num=check_number,
            txn_date=check_date,
            invoice_applications=applications
        )
        return payment

    def apply_partial_payment(self, customer_id, payment_amount, payment_method_id,
                              invoice_id, partial_amount):
        """Apply partial payment to invoice, dated today

        Returns:
            Payment dict or None
        """
        # Imported here because this snippet's module header only imports requests.
        from datetime import datetime

        if partial_amount > payment_amount:
            print("Partial amount cannot exceed total payment")
            return None

        applications = [{
            'invoice_id': invoice_id,
            'amount': partial_amount
        }]
        payment = self.create_payment(
            customer_id=customer_id,
            total_amount=payment_amount,
            payment_method_id=payment_method_id,
            payment_ref_num='',
            txn_date=datetime.now().strftime('%Y-%m-%d'),
            invoice_applications=applications
        )
        if payment and payment.get('UnappliedAmt', 0) > 0:
            print(f"\nNote: ${payment['UnappliedAmt']:.2f} remains unapplied")
            print("This amount can be applied to future invoices or refunded")
        return payment

    def query_payment_methods(self):
        """Get available payment methods

        Returns:
            List of PaymentMethod dicts; empty if the request fails.
        """
        from urllib.parse import quote

        query = "SELECT * FROM PaymentMethod"
        encoded_query = quote(query)
        url = f"{self.base_url}/v3/company/{self.realm_id}/query?query={encoded_query}"
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Accept": "application/json"
        }
        try:
            response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            result = response.json()
        except Exception as e:
            # Match the rest of the class: report and return an empty result.
            print(f"✗ Error querying payment methods: {str(e)}")
            return []
        return result.get('QueryResponse', {}).get('PaymentMethod', [])

    def _handle_fault(self, fault):
        """Handle fault responses by printing the fault type and each error."""
        print(f"✗ Fault Type: {fault.get('type', 'Unknown')}")
        for error in fault.get('Error', []):
            print(f" Error {error.get('code', '?')}: {error.get('Message', '')}")
            if 'element' in error:
                print(f" Element: {error['element']}")
# Usage Examples
payment_service = QuickBooksPayment(realm_id='123456789', access_token='your_token')
# Example 1: Apply single check to multiple invoices
payment = payment_service.apply_payment_to_invoices(
customer_id='42',
check_number='1234',
check_amount=1500.00,
check_date='2024-12-09',
invoices=[
{'id': '145', 'amount_to_apply': 1000.00, 'balance': 1000.00},
{'id': '146', 'amount_to_apply': 500.00, 'balance': 750.00}
]
)
# Example 2: Partial payment on single invoice
partial_payment = payment_service.apply_partial_payment(
customer_id='42',
payment_amount=500.00,
payment_method_id='1', # Cash
invoice_id='147',
partial_amount=500.00 # Invoice balance is $1000, paying $500
)
# Example 3: Payment with unapplied amount (credit for future invoices)
credit_payment = payment_service.create_payment(
customer_id='42',
total_amount=2000.00, # Customer pays $2000
payment_method_id='1',
payment_ref_num='',
txn_date='2024-12-09',
invoice_applications=[
{'invoice_id': '145', 'amount': 1000.00} # Only $1000 applied
]
# $1000 remains unapplied as credit
)
/websites/developer_intuit_app_developer_qbo
https://sandbox-quickbooks.api.intuit.com/v3/company/{realmId}
https://quickbooks.api.intuit.com/v3/company/{realmId}
https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer
https://appcenter.intuit.com/connect/oauth2
def safe_update_with_retry(entity_id, updates, max_attempts=3):
for attempt in range(max_attempts):
try:
# Read latest version
entity = read_entity(entity_id)
# Apply changes
entity.update(updates)
entity['sparse'] = True
# Attempt update
return update_entity(entity)
except SyncTokenError as e:
if attempt == max_attempts - 1:
raise
print(f"SyncToken mismatch, retrying... (attempt {attempt + 1})")
continue
async function apiCallWithAutoRefresh(apiFunction) {
try {
return await apiFunction();
} catch (error) {
if (error.response?.status === 401) {
// Token expired, refresh
await refreshAccessToken();
// Retry with new token
return await apiFunction();
}
throw error;
}
}
def validate_reference(entity_type, entity_id):
"""Verify entity exists before creating reference"""
try:
entity = read_entity(entity_type, entity_id)
return True
except NotFoundError:
print(f"{entity_type} {entity_id} not found")
return False
# Before creating invoice
if validate_reference('Customer', customer_id):
if validate_reference('Item', item_id):
create_invoice(customer_id, item_id)
import time
import random
def api_call_with_backoff(api_function, max_retries=5):
for attempt in range(max_retries):
try:
return api_function()
except RateLimitError:
if attempt == max_retries - 1:
raise
# Exponential backoff with jitter
delay = (2 ** attempt) + random.uniform(0, 1)
print(f"Rate limited, waiting {delay:.1f}s...")
time.sleep(delay)
function processBatchResults(results) {
const failed = results.filter(r => r.Fault);
const succeeded = results.filter(r => !r.Fault);
console.log(`Batch: ${succeeded.length} success, ${failed.length} failed`);
// Retry failed operations individually
for (const failure of failed) {
console.log(`Retrying ${failure.bId}...`);
// Implement individual retry logic
}
return { succeeded, failed };
}
{
"Invoice": {
"CurrencyRef": {
"value": "USD",
"name": "United States Dollar"
},
"ExchangeRate": 1.0
}
}
curl -X POST https://yourapp.com/webhooks/quickbooks \
-H "Content-Type: application/json" \
-d '{"eventNotifications":[]}'
def process_cdc_changes(changes):
for entity in changes:
if entity.get('status') == 'Deleted':
# Only ID available
handle_deletion(entity['Id'])
else:
# Full entity data available
process_entity_update(entity)
/websites/developer_intuit_app_developer_qbo