Loading...
Loading...
Create and configure Inngest durable functions. Covers triggers (events, cron, invoke), step execution and memoization, idempotency, cancellation, error handling, retries, logging, and observability.
npx skill4agent add inngest/inngest-skills inngest-durable-functions
These skills are focused on TypeScript. For Python or Go, refer to the Inngest documentation for language-specific guidance. Core concepts apply across all languages.
// ❌ BAD: Non-deterministic logic outside steps
// Anti-pattern, kept on purpose: Date.now() sits in the handler body rather
// than inside a step.run callback, so it is evaluated again every time the
// handler body executes and the value is not stable across executions.
async ({ event, step }) => {
const timestamp = Date.now(); // This runs multiple times!
// Only the work inside this callback belongs to the "process-data" step.
const result = await step.run("process-data", () => {
return processData(event.data);
});
};
// ✅ GOOD: All non-deterministic logic in steps
// The clock is read inside the step callback, so the timestamp is produced
// together with the step result instead of in the re-executed handler body.
async ({ event, step }) => {
  const result = await step.run("process-with-timestamp", () => {
    const timestamp = Date.now(); // Only runs once
    return processData(event.data, timestamp);
  });
};

// NOTE(review): the line below is inline-code residue from prose that was lost
// during extraction. It was fused between two snippets and made both
// unparsable, so it is preserved here as a comment.
// step.invoke() step.sendEvent() step.run() step.run()

// Basic function definition: configuration object, trigger, then the handler.
const processOrder = inngest.createFunction(
  {
    id: "process-order", // Unique, never change this
    retries: 4, // Default: 4 retries per step
    concurrency: 10 // Max concurrent executions
  },
  { event: "order/created" }, // Trigger
  async ({ event, step }) => {
    // Your durable workflow
  }
);

// Step IDs can be reused - Inngest handles counters automatically
// Two steps sharing the ID "fetch-data"; each call is its own execution.
const data = await step.run("fetch-data", () => {
  return fetchUserData();
});
const more = await step.run("fetch-data", () => {
  return fetchOrderData();
}); // Different execution

// Use descriptive IDs for clarity
await step.run("validate-payment", () => {
  return validatePayment(event.data.paymentId);
});
await step.run("charge-customer", () => {
  return chargeCustomer(event.data);
});
await step.run("send-confirmation", () => {
  return sendEmail(event.data.email);
});

// Single event trigger
// Trigger shapes. These are standalone fragments, not a runnable script; each
// one is a value for the trigger argument of inngest.createFunction.
{ event: "user/signup" }
// Event with conditional filter
// The `if` value is an expression string that reads fields from `event.data`.
{
event: "user/action",
if: 'event.data.action == "purchase" && event.data.amount > 100'
}
// Multiple triggers (up to 10)
// An array mixes event triggers (optionally filtered) with cron triggers.
[
{ event: "user/signup" },
{ event: "user/login", if: 'event.data.firstLogin == true' },
{ cron: "0 9 * * *" } // Daily at 9 AM
]
// Basic cron
// Cron trigger fragments, written as plain object literals so they can be
// pasted directly as the trigger argument of inngest.createFunction.
// (The previous `{ cron: "..."; }` form had a stray semicolon inside the
// braces, which is not a valid object literal.)
{ cron: "0 */6 * * *" } // Every 6 hours
// With timezone
// A `TZ=<zone>` prefix is placed in front of the cron expression.
{ cron: "TZ=Europe/Paris 0 12 * * 5" } // Fridays at noon Paris time
// Combine with events
[
  { event: "manual/report.requested" },
  { cron: "0 0 * * 0" } // Weekly on Sunday
]
// Invoke another function as a step
// Call `generateReportFunction` as the "generate-report" step, passing the
// triggering event's userId as its input data.
const result = await step.invoke("generate-report", {
  function: generateReportFunction,
  data: { userId: event.data.userId },
});

// Use returned data
await step.run("process-report", () => processReport(result));

// Prevent duplicate events with custom ID
// Producer side: the event carries an explicit `id` derived from the cart.
await inngest.send({
  id: `checkout-completed-${cartId}`, // 24-hour deduplication
  name: "cart/checkout.completed",
  data: { cartId, email: "user@example.com" },
});

// Consumer side: the function declares an idempotency key expression.
const sendEmail = inngest.createFunction(
  {
    id: "send-checkout-email",
    // Only run once per cartId per 24 hours
    idempotency: "event.data.cartId",
  },
  { event: "cart/checkout.completed" },
  async ({ event, step }) => {
    // This function won't run twice for same cartId
  },
);
// Complex idempotency keys
// The key is an expression string that concatenates two event fields.
const processUserAction = inngest.createFunction(
  {
    id: "process-user-action",
    // Unique per user + organization combination
    idempotency: 'event.data.userId + "-" + event.data.organizationId'
  },
  { event: "user/action.performed" },
  async ({ event, step }) => {
    /* ... */
  }
);

// NOTE(review): "event" and "async" were inline-code residue from lost prose,
// fused onto the code. They are the two variables used in the cancelOn `if`
// expression below.
const processOrder = inngest.createFunction(
  {
    id: "process-order",
    cancelOn: [
      {
        event: "order/cancelled",
        // `event` is the order/created event that started the run; `async` is
        // the incoming order/cancelled event. Cancel only when both refer to
        // the same order.
        if: "event.data.orderId == async.data.orderId"
      }
    ]
  },
  { event: "order/created" },
  async ({ event, step }) => {
    await step.sleepUntil("wait-for-payment", event.data.paymentDue);
    // Will be cancelled if order/cancelled event received
    await step.run("charge-payment", () => processPayment(event.data));
  }
);

const processWithTimeout = inngest.createFunction(
  {
    id: "process-with-timeout",
    timeouts: {
      start: "5m", // Cancel if not started within 5 minutes
      // The run-duration limit is configured with the `finish` key; `run` is
      // not a recognized timeout option and would be ignored.
      finish: "30m" // Cancel if running longer than 30 minutes
    }
  },
  { event: "long/process.requested" },
  async ({ event, step }) => {
    /* ... */
  }
);

// Listen for cancellation events
// Reacts to the system event emitted when a run is cancelled and cleans up
// after cancelled "process-order" runs only.
const cleanupCancelled = inngest.createFunction(
  { id: "cleanup-cancelled-process" },
  { event: "inngest/function.cancelled" },
  async ({ event, step }) => {
    // function_id on system events is the app ID and the function ID joined
    // with a hyphen, so the bare "process-order" never matches. "my-app" is
    // the client id used in this guide; replace it with your own app ID.
    if (event.data.function_id !== "my-app-process-order") {
      return;
    }
    await step.run("cleanup-resources", () => {
      return cleanupOrderResources(event.data.run_id);
    });
  }
);

// Raises the retry budget and branches on the current attempt number.
const reliableFunction = inngest.createFunction(
  {
    id: "reliable-function",
    retries: 10 // Up to 10 retries per step
  },
  { event: "critical/task" },
  async ({ event, step, attempt }) => {
    // Access attempt number (0-indexed)
    if (attempt > 5) {
      // Different logic for later attempts
    }
  }
);
import { NonRetriableError } from "inngest";
// Looks the user up inside a step and aborts without retrying when the
// lookup comes back empty.
const processUser = inngest.createFunction(
  { id: "process-user" },
  { event: "user/process.requested" },
  async ({ event, step }) => {
    const user = await step.run("fetch-user", async () => {
      const record = await db.users.findOne(event.data.userId);
      if (record) {
        return record;
      }
      // Don't retry - user doesn't exist
      throw new NonRetriableError("User not found, stopping execution");
    });
    // Continue processing...
  }
);
import { RetryAfterError } from "inngest";
/**
 * Converts a Retry-After header value into a delay RetryAfterError accepts.
 *
 * Retry-After may be a number of seconds or an HTTP-date, and it may be
 * absent altogether; blindly appending "s" yields "undefineds" or an invalid
 * duration string in the latter two cases.
 *
 * @param {string | number | null | undefined} headerValue - Raw header value.
 * @param {number} [fallbackMs=60000] - Delay in milliseconds used when the
 *   header is missing or unparsable (arbitrary default; tune per API).
 * @returns {number | Date} Milliseconds to wait, or the moment to retry at.
 */
function resolveRetryAfter(headerValue, fallbackMs = 60000) {
  const raw = headerValue == null ? "" : String(headerValue).trim();
  if (raw === "") {
    return fallbackMs;
  }
  // Numeric form first: delta-seconds, converted to milliseconds.
  const seconds = Number(raw);
  if (Number.isFinite(seconds)) {
    return Math.max(0, seconds) * 1000;
  }
  // Otherwise try the HTTP-date form.
  const retryAt = new Date(raw);
  return Number.isNaN(retryAt.getTime()) ? fallbackMs : retryAt;
}

// Calls the external API inside a step and, on HTTP 429, asks for the retry
// to be scheduled according to the API's Retry-After header.
const respectRateLimit = inngest.createFunction(
  { id: "api-call" },
  { event: "api/call.requested" },
  async ({ event, step }) => {
    await step.run("call-api", async () => {
      const response = await externalAPI.call(event.data);
      if (response.status === 429) {
        // Retry after specific time from API
        const retryAfter = resolveRetryAfter(response.headers["retry-after"]);
        throw new RetryAfterError("Rate limited", retryAfter);
      }
      return response.data;
    });
  }
);
import winston from "winston";
// Configure logger
// JSON-formatted winston logger writing to the console at "info" level.
const logger = winston.createLogger({
  level: "info",
  format: winston.format.json(),
  transports: [new winston.transports.Console()],
});

const inngest = new Inngest({
  id: "my-app",
  logger, // Pass logger to client
});

// The handler receives a `logger` argument and uses it from inside steps.
const processData = inngest.createFunction(
  { id: "process-data" },
  { event: "data/process.requested" },
  async ({ event, step, logger }) => {
    // ✅ GOOD: Log inside steps to avoid duplicates
    const result = await step.run("fetch-data", async () => {
      logger.info("Fetching data for user", { userId: event.data.userId });
      const fetched = await fetchUserData(event.data.userId);
      return fetched;
    });

    // ❌ AVOID: Logging outside steps can duplicate
    // logger.info("Processing complete"); // This could run multiple times!

    await step.run("log-completion", async () => {
      const resultCount = result.length;
      logger.info("Processing complete", { resultCount });
    });
  },
);

// Enable checkpointing for lower latency
// Two sequential steps with the `checkpointing` option set on the function.
const realTimeFunction = inngest.createFunction(
  {
    id: "real-time-function",
    checkpointing: {
      maxRuntime: "300s", // Max continuous execution time
      bufferedSteps: 2, // Buffer 2 steps before checkpointing
      maxInterval: "10s" // Max wait before checkpoint
    }
  },
  { event: "realtime/process" },
  async ({ event, step }) => {
    // Steps execute immediately with periodic checkpointing
    const first = await step.run("step-1", () => process1(event.data));
    const second = await step.run("step-2", () => process2(first));
    return { result2: second };
  }
);

// Branches on step output: the premium step runs only for premium users,
// the standard step runs for everyone.
const conditionalProcess = inngest.createFunction(
  { id: "conditional-process" },
  { event: "process/conditional" },
  async ({ event, step }) => {
    const userData = await step.run("fetch-user", () => getUserData(event.data.userId));

    // Conditional step execution
    if (userData.isPremium) {
      await step.run("premium-processing", () => processPremiumFeatures(userData));
    }

    // Always runs
    await step.run("standard-processing", () => processStandardFeatures(userData));
  }
);

// Tries the primary service in one step and, if that step throws, runs a
// second step against the secondary service instead.
const robustProcess = inngest.createFunction(
  { id: "robust-process" },
  { event: "process/robust" },
  async ({ event, step }) => {
    let primaryResult;
    try {
      primaryResult = await step.run("primary-service", () => callPrimaryService(event.data));
    } catch (error) {
      // Fallback to secondary service
      primaryResult = await step.run("fallback-service", () => callSecondaryService(event.data));
    }
    return { result: primaryResult };
  }
);

// NOTE(review): "inngest-events" was page residue fused directly after the
// closing ");" above, where it would be evaluated as the expression
// `inngest - events`. It is preserved here as a comment.
// inngest-events