Loading...
Loading...
Designs reliable ETL and data synchronization jobs with incremental updates, idempotency guarantees, watermark tracking, error handling, and retry logic. Use for "ETL jobs", "data sync", "incremental sync", or "data pipeline".
npx skill4agent add patricio0312rev/skills etl-sync-job-builder// jobs/sync-users.ts
interface SyncJob {
name: string;
source: "database" | "api" | "file";
destination: "database" | "warehouse" | "s3";
schedule: string;
}
export class ETLJob {
constructor(private name: string, private watermarkKey: string) {}
async run() {
console.log(`🔄 Starting ${this.name}...`);
try {
// 1. Get last watermark
const lastSync = await this.getWatermark();
console.log(` Last sync: ${lastSync}`);
// 2. Extract data
const data = await this.extract(lastSync);
console.log(` Extracted ${data.length} records`);
// 3. Transform data
const transformed = await this.transform(data);
// 4. Load data
await this.load(transformed);
// 5. Update watermark
await this.updateWatermark(new Date());
console.log(`✅ ${this.name} complete`);
} catch (error) {
console.error(`❌ ${this.name} failed:`, error);
throw error;
}
}
private async extract(since: Date) {
// Extract logic
return [];
}
private async transform(data: any[]) {
// Transform logic
return data;
}
private async load(data: any[]) {
// Load logic
}
private async getWatermark(): Promise<Date> {
const watermark = await prisma.syncWatermark.findUnique({
where: { key: this.watermarkKey },
});
return watermark?.lastSync || new Date(0);
}
private async updateWatermark(timestamp: Date) {
await prisma.syncWatermark.upsert({
where: { key: this.watermarkKey },
create: { key: this.watermarkKey, lastSync: timestamp },
update: { lastSync: timestamp },
});
}
}// Track sync progress
model SyncWatermark {
key String @id
lastSync DateTime
metadata Json?
@@index([lastSync])
}// Incremental sync using watermark
async function syncOrdersIncremental() {
// Get last sync time
const watermark = await prisma.syncWatermark.findUnique({
where: { key: "orders_sync" },
});
const lastSync = watermark?.lastSync || new Date(0);
// Fetch only new/updated records
const newOrders = await sourceDb.order.findMany({
where: {
updated_at: { gt: lastSync },
},
orderBy: { updated_at: "asc" },
});
console.log(`📦 Syncing ${newOrders.length} orders...`);
// Process in batches
for (let i = 0; i < newOrders.length; i += 100) {
const batch = newOrders.slice(i, i + 100);
await destinationDb.order.createMany({
data: batch,
skipDuplicates: true, // Idempotency
});
}
// Update watermark to latest record
if (newOrders.length > 0) {
const latestTimestamp = newOrders[newOrders.length - 1].updated_at;
await prisma.syncWatermark.upsert({
where: { key: "orders_sync" },
create: { key: "orders_sync", lastSync: latestTimestamp },
update: { lastSync: latestTimestamp },
});
}
console.log(`✅ Sync complete`);
}// Idempotent sync - safe to run multiple times
async function syncUsersIdempotent(users: User[]) {
for (const user of users) {
await prisma.user.upsert({
where: { id: user.id },
create: user,
update: {
email: user.email,
name: user.name,
updated_at: user.updated_at,
},
});
}
}
// Batch upsert for better performance
async function syncUsersBatch(users: User[]) {
// PostgreSQL: Use ON CONFLICT
await prisma.$executeRaw`
INSERT INTO users (id, email, name, updated_at)
SELECT * FROM UNNEST(
${users.map((u) => u.id)}::bigint[],
${users.map((u) => u.email)}::text[],
${users.map((u) => u.name)}::text[],
${users.map((u) => u.updated_at)}::timestamp[]
)
ON CONFLICT (id) DO UPDATE SET
email = EXCLUDED.email,
name = EXCLUDED.name,
updated_at = EXCLUDED.updated_at
WHERE users.updated_at < EXCLUDED.updated_at
`;
}async function syncWithRetry<T>(
operation: () => Promise<T>,
maxRetries: number = 3,
baseDelay: number = 1000
): Promise<T> {
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
if (attempt === maxRetries) throw error;
const delay = baseDelay * Math.pow(2, attempt);
console.log(` Retry ${attempt + 1}/${maxRetries} after ${delay}ms`);
await sleep(delay);
}
}
throw new Error("Max retries exceeded");
}
// Usage
await syncWithRetry(
async () => {
return await syncOrders();
},
3,
1000
);// Listen to database changes
import { PrismaClient } from "@prisma/client";
const prisma = new PrismaClient();
// PostgreSQL: Listen to logical replication
async function setupCDC() {
await prisma.$executeRaw`
CREATE PUBLICATION orders_publication FOR TABLE orders;
`;
// Subscribe to changes (using pg library)
const client = await pg.connect();
client.query("LISTEN orders_changed;");
client.on("notification", async (msg) => {
const change = JSON.parse(msg.payload);
if (change.operation === "INSERT" || change.operation === "UPDATE") {
await syncOrder(change.data);
}
});
}interface ConflictResolution {
strategy: "source-wins" | "dest-wins" | "latest-wins" | "merge";
}
async function syncWithConflictResolution(
sourceRecord: any,
destRecord: any,
strategy: ConflictResolution["strategy"]
) {
if (strategy === "source-wins") {
return sourceRecord;
}
if (strategy === "dest-wins") {
return destRecord;
}
if (strategy === "latest-wins") {
return sourceRecord.updated_at > destRecord.updated_at
? sourceRecord
: destRecord;
}
if (strategy === "merge") {
// Merge non-null fields
return {
...destRecord,
...Object.fromEntries(
Object.entries(sourceRecord).filter(([_, v]) => v != null)
),
};
}
}// Track sync job metrics
interface SyncMetrics {
jobName: string;
startTime: Date;
endTime: Date;
recordsProcessed: number;
recordsInserted: number;
recordsUpdated: number;
recordsSkipped: number;
errors: number;
durationMs: number;
}
async function logSyncMetrics(metrics: SyncMetrics) {
await prisma.syncMetric.create({
data: metrics,
});
console.log(`
📊 Sync Metrics
Job: ${metrics.jobName}
Records: ${metrics.recordsProcessed}
Inserted: ${metrics.recordsInserted}
Updated: ${metrics.recordsUpdated}
Errors: ${metrics.errors}
Duration: ${metrics.durationMs}ms
`);
}// jobs/sync-orders-to-warehouse.ts
export class OrdersETLJob extends ETLJob {
constructor() {
super("orders-etl", "orders_warehouse_sync");
}
async extract(since: Date): Promise<Order[]> {
return prisma.order.findMany({
where: {
updated_at: { gt: since },
},
include: {
items: true,
user: true,
},
orderBy: { updated_at: "asc" },
});
}
async transform(orders: Order[]): Promise<WarehouseOrder[]> {
return orders.map((order) => ({
order_id: order.id,
user_email: order.user.email,
total_amount: order.total,
item_count: order.items.length,
status: order.status,
order_date: order.created_at,
synced_at: new Date(),
}));
}
async load(data: WarehouseOrder[]): Promise<void> {
const batchSize = 100;
for (let i = 0; i < data.length; i += batchSize) {
const batch = data.slice(i, i + batchSize);
await warehouseDb.$executeRaw`
INSERT INTO orders_fact (
order_id, user_email, total_amount, item_count,
status, order_date, synced_at
)
VALUES ${batch
.map(
(o) => `(
${o.order_id}, '${o.user_email}', ${o.total_amount},
${o.item_count}, '${o.status}', '${o.order_date}',
'${o.synced_at}'
)`
)
.join(",")}
ON CONFLICT (order_id) DO UPDATE SET
total_amount = EXCLUDED.total_amount,
status = EXCLUDED.status,
synced_at = EXCLUDED.synced_at
`;
}
}
}
// Run job
new OrdersETLJob().run();// Schedule ETL jobs
import cron from "node-cron";
// Run every hour
cron.schedule("0 * * * *", async () => {
await new OrdersETLJob().run();
});
// Run every 15 minutes
cron.schedule("*/15 * * * *", async () => {
await syncUsersIncremental();
});
// Run nightly at 2 AM
cron.schedule("0 2 * * *", async () => {
await fullDataSync();
});async function syncWithErrorHandling() {
const checkpoint = await getCheckpoint();
let processedRecords = 0;
try {
const records = await fetchRecords(checkpoint);
for (const record of records) {
try {
await processRecord(record);
processedRecords++;
// Save checkpoint every 100 records
if (processedRecords % 100 === 0) {
await saveCheckpoint(record.id);
}
} catch (error) {
// Log error but continue
console.error(`Failed to process record ${record.id}:`, error);
await logFailedRecord(record.id, error);
}
}
await saveCheckpoint("completed");
} catch (error) {
// Critical failure - job will retry from checkpoint
console.error("Job failed:", error);
throw error;
}
}