Event Sagas
Sagas are long-running business processes that coordinate activities across multiple aggregates, external services, and time-based operations. They implement the Saga pattern for managing distributed transactions and complex workflows in event-driven systems.
What Are Sagas?
Sagas provide:
- Workflow Orchestration: Coordinate complex business processes
- Cross-Aggregate Operations: Handle operations spanning multiple aggregates
- Time-Based Processing: Handle timeouts, delays, and scheduled operations
- Error Handling: Implement compensation and retry logic
- State Management: Maintain workflow state across multiple events
Saga Types
Eventicle supports two types of sagas:
- Stateless Sagas: Simple event handlers without persistent state
- Stateful Sagas: Complex workflows with persistent state and timers
Stateless Sagas
Stateless sagas react to events without maintaining state:
import { saga } from "@eventicle/eventiclejs";

export function notificationSaga() {
  return saga("NotificationSaga")
    .subscribeStreams(["orders", "users"])
    .on(
      "OrderCreated",
      async (instance, event) => {
        await sendEmail(event.payload.customerEmail, {
          subject: "Order Confirmation",
          template: "order-confirmation",
          data: { orderId: event.payload.orderId }
        });
      }
    )
    .on(
      "UserRegistered",
      async (instance, event) => {
        await sendWelcomeEmail(event.payload.email);
        await addToMailingList(event.payload.email);
      }
    );
}

async function sendEmail(email: string, options: any) {
  console.log(`📧 Sending email to ${email}: ${options.subject}`);
  // Integrate with email service
}

async function sendWelcomeEmail(email: string) {
  await sendEmail(email, {
    subject: "Welcome to our platform!",
    template: "welcome"
  });
}

async function addToMailingList(email: string) {
  console.log(`📮 Adding ${email} to mailing list`);
  // Integrate with mailing service
}
Stateful Sagas
Stateful sagas maintain state and can handle complex workflows:
import { saga, SagaInstance } from "@eventicle/eventiclejs";

interface PaymentProcessingData {
  orderId: string;
  amount: number;
  customerId: string;
  paymentMethod: string;
  attempts: number;
  status: "pending" | "processing" | "completed" | "failed";
}

type PaymentTimers = "paymentTimeout" | "retryPayment";

export function paymentProcessingSaga() {
  return saga<PaymentTimers, PaymentProcessingData>("PaymentProcessingSaga")
    .subscribeStreams(["orders", "payments"])
    .parallelEvents(5)
    .startOn(
      "OrderCreated",
      {
        withLock: (instance, event) => `payment-${event.payload.orderId}`
      },
      async (instance, event) => {
        // Initialize saga state
        instance.data = {
          orderId: event.payload.orderId,
          amount: event.payload.totalAmount,
          customerId: event.payload.customerId,
          paymentMethod: event.payload.paymentMethod,
          attempts: 0,
          status: "pending"
        };
        // Start payment processing
        await processPayment(instance);
        // Set timeout for payment processing
        instance.scheduleTimer("paymentTimeout", 5 * 60 * 1000); // 5 minutes
      }
    )
    .on(
      "PaymentSucceeded",
      {
        matchInstance: (event) => ({
          instanceProperty: "orderId",
          value: event.payload.orderId
        })
      },
      async (instance, event) => {
        instance.data.status = "completed";
        // Cancel timeout since payment succeeded
        instance.cancelTimer("paymentTimeout");
        // Complete the order
        await completeOrder(instance.data.orderId);
        // End the saga
        instance.complete();
      }
    )
    .on(
      "PaymentFailed",
      {
        matchInstance: (event) => ({
          instanceProperty: "orderId",
          value: event.payload.orderId
        })
      },
      async (instance, event) => {
        instance.data.attempts++;
        if (instance.data.attempts < 3) {
          // Retry payment after delay
          instance.scheduleTimer("retryPayment", 30 * 1000); // 30 seconds
        } else {
          // Max attempts reached, fail the order
          instance.data.status = "failed";
          await failOrder(instance.data.orderId, "Payment processing failed");
          instance.complete();
        }
      }
    )
    .onTimer("paymentTimeout", async (instance) => {
      console.log(`⏰ Payment timeout for order ${instance.data.orderId}`);
      instance.data.status = "failed";
      await failOrder(instance.data.orderId, "Payment processing timeout");
      instance.complete();
    })
    .onTimer("retryPayment", async (instance) => {
      console.log(`🔄 Retrying payment for order ${instance.data.orderId}`);
      await processPayment(instance);
    });
}

async function processPayment(instance: SagaInstance<any, PaymentProcessingData>) {
  instance.data.status = "processing";
  // Simulate payment processing
  console.log(`💳 Processing payment for order ${instance.data.orderId}`);
  // In a real system, this would integrate with payment gateway
  // await paymentGateway.processPayment({
  //   amount: instance.data.amount,
  //   paymentMethod: instance.data.paymentMethod,
  //   orderId: instance.data.orderId
  // });
}

async function completeOrder(orderId: string) {
  console.log(`✅ Completing order ${orderId}`);
  // Emit OrderCompleted event
}

async function failOrder(orderId: string, reason: string) {
  console.log(`❌ Failing order ${orderId}: ${reason}`);
  // Emit OrderFailed event
}
Saga Configuration
Event Subscriptions
// Subscribe to multiple streams
saga("MultiStreamSaga")
  .subscribeStreams(["orders", "inventory", "shipping"])

// Configure parallel processing
saga("HighVolumeSaga")
  .subscribeStreams(["transactions"])
  .parallelEvents(20) // Process up to 20 events in parallel
Instance Matching
// Match by aggregate ID
.on("OrderUpdated", {
  matchInstance: (event) => ({
    instanceProperty: "orderId",
    value: event.payload.orderId
  })
}, handler)

// Match by correlation ID
.on("PaymentProcessed", {
  matchInstance: (event) => ({
    instanceProperty: "correlationId",
    value: event.payload.correlationId
  })
}, handler)

// Custom matching logic
.on("CustomerAction", {
  matchInstance: (event) => {
    // Complex matching logic
    if (event.payload.actionType === "purchase") {
      return {
        instanceProperty: "customerId",
        value: event.payload.customerId
      };
    }
    return null; // Don't match
  }
}, handler)
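To make the matching rule concrete, here is a minimal sketch of how a `matchInstance` result could be resolved against stored saga state. This is not Eventicle's implementation: `MatchDescriptor`, `StoredInstance`, and `findMatchingInstance` are hypothetical names used only for illustration.

```typescript
// Hypothetical shapes for illustration; not part of the Eventicle API.
interface MatchDescriptor {
  instanceProperty: string;
  value: unknown;
}

interface StoredInstance {
  id: string;
  data: { [property: string]: unknown };
}

// Returns the first instance whose data[instanceProperty] equals value.
// A null descriptor means "this event does not belong to any instance".
function findMatchingInstance(
  instances: StoredInstance[],
  match: MatchDescriptor | null
): StoredInstance | undefined {
  if (match === null) return undefined;
  for (const candidate of instances) {
    if (candidate.data[match.instanceProperty] === match.value) {
      return candidate;
    }
  }
  return undefined;
}

const stored: StoredInstance[] = [
  { id: "saga-1", data: { orderId: "order-1", customerId: "c-9" } },
  { id: "saga-2", data: { orderId: "order-2", customerId: "c-9" } },
];

// Matches the second instance by its orderId
const hit = findMatchingInstance(stored, { instanceProperty: "orderId", value: "order-2" });
// A null descriptor never matches
const miss = findMatchingInstance(stored, null);
```

Note that matching on a non-unique property (here `customerId`) would pick whichever instance comes first, so a property that identifies exactly one workflow is usually the safer choice.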
Locking
Prevent concurrent processing of related events:
.startOn("OrderCreated", {
  withLock: (instance, event) => `order-processing-${event.payload.orderId}`
}, handler)

.on("PaymentReceived", {
  matchInstance: (event) => ({ instanceProperty: "orderId", value: event.payload.orderId }),
  withLock: (instance, event) => `payment-${event.payload.orderId}`
}, handler)
Timer Operations
Scheduling Timers
// Schedule a one-time timer
instance.scheduleTimer("reminderTimer", 24 * 60 * 60 * 1000); // 24 hours

// For a recurring check, schedule the timer again from inside its onTimer handler
instance.scheduleTimer("recurringCheck", 60 * 1000); // 1 minute
Handling Timers
.onTimer("reminderTimer", async (instance) => {
  if (instance.data.status === "pending") {
    await sendReminder(instance.data.customerId);
    // Schedule another reminder
    instance.scheduleTimer("reminderTimer", 24 * 60 * 60 * 1000);
  }
})

.onTimer("healthCheck", async (instance) => {
  const isHealthy = await checkExternalService();
  if (!isHealthy) {
    await handleServiceFailure(instance);
  }
  // Continue health checks
  instance.scheduleTimer("healthCheck", 5 * 60 * 1000); // 5 minutes
})
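The re-arming behaviour in the handlers above can be explored without real clocks. The following is a minimal sketch, assuming timers fire once and are identified by name; `TimerRegistry` is a hypothetical helper for illustration and is not part of Eventicle.

```typescript
// Hypothetical in-memory timer registry driven by a manual clock.
class TimerRegistry<Name extends string> {
  private now = 0;
  private dueAt: { [name: string]: number } = {};

  // Scheduling an existing name replaces its due time.
  schedule(name: Name, delayMs: number): void {
    this.dueAt[name] = this.now + delayMs;
  }

  cancel(name: Name): void {
    delete this.dueAt[name];
  }

  // Moves the clock forward and returns the timers that fired, removing
  // each one: a fired timer stays gone unless it is scheduled again.
  advance(ms: number): Name[] {
    this.now += ms;
    const fired = (Object.keys(this.dueAt) as Name[]).filter(
      (name) => this.dueAt[name] <= this.now
    );
    for (const name of fired) delete this.dueAt[name];
    return fired;
  }
}

type DemoTimers = "reminderTimer" | "healthCheck";
const timers = new TimerRegistry<DemoTimers>();
timers.schedule("reminderTimer", 24 * 60 * 60 * 1000);
timers.schedule("healthCheck", 5 * 60 * 1000);

// After five minutes only the health check is due
const firstTick = timers.advance(5 * 60 * 1000);
// The handler re-arms its own timer, as in the healthCheck example above
timers.schedule("healthCheck", 5 * 60 * 1000);
// The reminder is cancelled before it ever fires
timers.cancel("reminderTimer");
const secondTick = timers.advance(24 * 60 * 60 * 1000);
// Nothing was re-armed after the second tick, so nothing is left to fire
const thirdTick = timers.advance(24 * 60 * 60 * 1000);
```

A manual clock like this can also be useful in tests, where waiting for a real 24-hour timer is not an option.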
Advanced Saga Patterns
Process Manager Pattern
// Shape inferred from how items are used below
interface OrderItem {
  productId: string;
  quantity: number;
}

interface OrderFulfillmentData {
  orderId: string;
  items: OrderItem[];
  reservationIds: string[];
  shipmentId?: string;
  status: "started" | "reserved" | "shipped" | "delivered" | "failed";
}

export function orderFulfillmentSaga() {
  return saga<any, OrderFulfillmentData>("OrderFulfillmentSaga")
    .subscribeStreams(["orders", "inventory", "shipping"])
    .startOn("OrderPlaced", { /* config */ }, async (instance, event) => {
      instance.data = {
        orderId: event.payload.orderId,
        items: event.payload.items,
        reservationIds: [],
        status: "started"
      };
      // Start inventory reservation for each item
      for (const item of instance.data.items) {
        await reserveInventory(item.productId, item.quantity, instance.data.orderId);
      }
    })
    .on("InventoryReserved", { /* matching */ }, async (instance, event) => {
      instance.data.reservationIds.push(event.payload.reservationId);
      // Check if all items are reserved
      if (instance.data.reservationIds.length === instance.data.items.length) {
        instance.data.status = "reserved";
        await createShipment(instance.data.orderId, instance.data.items);
      }
    })
    .on("InventoryReservationFailed", { /* matching */ }, async (instance, event) => {
      // Compensate: release any existing reservations
      for (const reservationId of instance.data.reservationIds) {
        await releaseReservation(reservationId);
      }
      instance.data.status = "failed";
      await failOrder(instance.data.orderId, "Insufficient inventory");
      instance.complete();
    })
    .on("ShipmentCreated", { /* matching */ }, async (instance, event) => {
      instance.data.shipmentId = event.payload.shipmentId;
      instance.data.status = "shipped";
      // Start tracking delivery
      instance.scheduleTimer("deliveryCheck", 24 * 60 * 60 * 1000); // Check daily
    })
    .onTimer("deliveryCheck", async (instance) => {
      const deliveryStatus = await checkDeliveryStatus(instance.data.shipmentId!);
      if (deliveryStatus === "delivered") {
        instance.data.status = "delivered";
        await completeOrder(instance.data.orderId);
        instance.complete();
      } else {
        // Continue checking
        instance.scheduleTimer("deliveryCheck", 24 * 60 * 60 * 1000);
      }
    });
}
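The status field in this process manager moves through a fixed sequence, and it can help to write that sequence down explicitly so that out-of-order events are rejected. The sketch below is one possible way to do that; `allowedTransitions`, `canTransition`, and `transition` are hypothetical helpers, and which states may move to "failed" is an assumption rather than something the saga above defines.

```typescript
type FulfillmentStatus = "started" | "reserved" | "shipped" | "delivered" | "failed";

// Assumed transition table: the happy path follows the saga above;
// allowing "failed" from every non-terminal state is an assumption.
const allowedTransitions: Record<FulfillmentStatus, FulfillmentStatus[]> = {
  started: ["reserved", "failed"],
  reserved: ["shipped", "failed"],
  shipped: ["delivered", "failed"],
  delivered: [],
  failed: [],
};

function canTransition(from: FulfillmentStatus, to: FulfillmentStatus): boolean {
  return allowedTransitions[from].indexOf(to) !== -1;
}

// Returns the new status, or throws if the move is not in the table.
function transition(current: FulfillmentStatus, next: FulfillmentStatus): FulfillmentStatus {
  if (!canTransition(current, next)) {
    throw new Error(`Illegal status transition: ${current} -> ${next}`);
  }
  return next;
}

let status: FulfillmentStatus = "started";
status = transition(status, "reserved");
status = transition(status, "shipped");
```

A handler could then write `instance.data.status = transition(instance.data.status, "shipped")` so that a late or duplicated event fails loudly instead of silently rewinding the workflow.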
Compensation Pattern
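// Assumed shape of the saga state, inferred from the fields used below
interface PaymentData {
  accountId: string;
  amount: number;
  transactionId: string;
  fundsReserved: boolean;
  paymentProcessed: boolean;
}

export function paymentWithCompensationSaga() {
  return saga<any, PaymentData>("PaymentCompensationSaga")
    .startOn("PaymentRequested", {}, async (instance, event) => {
      // Record everything compensation will need before running any step
      instance.data = {
        accountId: event.payload.accountId,
        amount: event.payload.amount,
        transactionId: event.payload.transactionId,
        fundsReserved: false,
        paymentProcessed: false
      };
      try {
        // Step 1: Reserve funds
        await reserveFunds(event.payload.accountId, event.payload.amount);
        instance.data.fundsReserved = true;
        // Step 2: Process payment
        await processPayment(event.payload);
        instance.data.paymentProcessed = true;
        // Step 3: Confirm transaction
        await confirmTransaction(event.payload.transactionId);
        instance.complete();
      } catch (error) {
        // Compensation: undo completed steps
        await compensatePayment(instance);
        throw error;
      }
    });
}

// Undo completed steps in reverse order: payment first, then the reservation
async function compensatePayment(instance: SagaInstance<any, PaymentData>) {
  if (instance.data.paymentProcessed) {
    await reversePayment(instance.data.transactionId);
  }
  if (instance.data.fundsReserved) {
    await releaseFunds(instance.data.accountId, instance.data.amount);
  }
}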
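The same idea can be expressed generically: run steps in order, remember which ones completed, and undo them most recent first when a later step fails. The following is a simplified sketch with synchronous steps for brevity (real steps would usually be async and awaited); `Step` and `runWithCompensation` are hypothetical names, not Eventicle APIs.

```typescript
// Hypothetical step description: an action plus the action that undoes it.
interface Step {
  name: string;
  run: () => void;
  undo: () => void;
}

// Runs steps in order. If one throws, undoes the steps that already
// completed, most recent first, then rethrows the original error.
function runWithCompensation(steps: Step[]): void {
  const completed: Step[] = [];
  try {
    for (const step of steps) {
      step.run();
      completed.push(step);
    }
  } catch (error) {
    for (const step of completed.reverse()) {
      step.undo();
    }
    throw error;
  }
}

// Records what happened so the order of operations is visible
const log: string[] = [];
const paymentSteps: Step[] = [
  { name: "reserveFunds", run: () => { log.push("reserve"); }, undo: () => { log.push("release"); } },
  { name: "processPayment", run: () => { log.push("process"); }, undo: () => { log.push("reverse"); } },
  {
    name: "confirmTransaction",
    run: () => { throw new Error("confirm failed"); },
    undo: () => { log.push("unconfirm"); },
  },
];

let failure = "";
try {
  runWithCompensation(paymentSteps);
} catch (error) {
  failure = (error as Error).message;
}
```

The failing step itself is never undone, because it never completed; only the two earlier steps are compensated, in reverse order.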
Saga Registration and Management
Registering Sagas
import { registerSaga } from "@eventicle/eventiclejs";
// Register sagas at application startup
registerSaga(notificationSaga());
registerSaga(paymentProcessingSaga());
registerSaga(orderFulfillmentSaga());
Saga Introspection
import { allSagas, allSagaInstances, removeAllSagas } from "@eventicle/eventiclejs";
// Get all registered sagas
const sagas = allSagas();
console.log("Registered sagas:", sagas.map(s => s.name));
// Get all running saga instances
const instances = await allSagaInstances();
console.log(`${instances.length} saga instances running`);
// Remove all sagas (useful for testing)
await removeAllSagas();
Error Handling in Sagas
export function robustSaga() {
  return saga<any, any>("RobustSaga")
    .on("SomeEvent", {}, async (instance, event) => {
      // Keep the payload so the timer handler can retry with the same input
      instance.data.originalPayload = event.payload;
      await attemptOperation(instance);
    })
    .onTimer("retryOperation", async (instance) => {
      // Retry the operation; a failed retry goes through the same accounting
      await attemptOperation(instance);
    });
}

// Runs the operation once; on failure, schedules a retry or gives up
async function attemptOperation(instance: SagaInstance<any, any>) {
  try {
    await riskyOperation(instance.data.originalPayload);
  } catch (error) {
    console.error("Saga error:", error);
    // Increment retry count
    instance.data.retryCount = (instance.data.retryCount || 0) + 1;
    if (instance.data.retryCount < 3) {
      // Retry with exponential backoff
      const delay = Math.pow(2, instance.data.retryCount) * 1000;
      instance.scheduleTimer("retryOperation", delay);
    } else {
      // Max retries reached, handle failure
      await handleFailure(instance, error);
      instance.complete();
    }
  }
}
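The backoff formula used above doubles the delay on every retry. A small helper makes the resulting schedule easy to inspect; the cap (`maxDelayMs`) is an added assumption that the example above does not have, included because uncapped doubling grows quickly.

```typescript
// Delay before retry number `retryCount` (1-based): baseMs * 2^retryCount,
// capped at maxDelayMs. The cap is an assumption, not part of the example above.
function backoffDelayMs(retryCount: number, baseMs = 1000, maxDelayMs = 60000): number {
  return Math.min(baseMs * Math.pow(2, retryCount), maxDelayMs);
}

// Delays for the 1st, 2nd, 3rd and 10th retry
const delays = [1, 2, 3, 10].map((n) => backoffDelayMs(n));
// → [2000, 4000, 8000, 60000]
```

With the three-attempt limit used in the saga, only the 2-second and 4-second delays would ever be scheduled.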
Testing Sagas
import {
  mockSagasExceptFor,
  allSagaInstances,
  consumeFullEventLog,
  eventClient
} from "@eventicle/eventiclejs";

describe("PaymentProcessingSaga", () => {
  beforeEach(async () => {
    await mockSagasExceptFor(["PaymentProcessingSaga"]);
  });

  it("should start processing the payment when an order is created", async () => {
    // Emit events
    await eventClient().emit([{
      type: "OrderCreated",
      stream: "orders",
      domainId: "order-123",
      payload: {
        orderId: "order-123",
        totalAmount: 100,
        customerId: "customer-456",
        paymentMethod: "credit-card"
      }
    }]);
    // Wait for saga processing
    await consumeFullEventLog();
    // Check saga state
    const instances = await allSagaInstances();
    const paymentSaga = instances.find(i => i.sagaName === "PaymentProcessingSaga");
    // Fail with a clear message if the saga never started
    expect(paymentSaga).toBeDefined();
    expect(paymentSaga!.data.status).toBe("processing");
    expect(paymentSaga!.data.orderId).toBe("order-123");
  });
});
Best Practices
- Keep Sagas Focused: Each saga should handle one business process
- Handle Failures: Always implement error handling and compensation
- Use Timeouts: Set reasonable timeouts for external operations
- Minimize State: Keep saga state as small as possible
- Idempotency: Make saga operations idempotent
- Testing: Test both happy path and failure scenarios
- Monitoring: Add logging and metrics for saga operations
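For the idempotency point in particular, one common approach is to remember which events have already been handled and skip duplicates. The sketch below keeps that record in memory purely for illustration; `ProcessedEvents` is a hypothetical helper, and in a real saga the record would need to live somewhere durable, such as the saga's own state.

```typescript
// Hypothetical guard that remembers which event IDs were already handled.
class ProcessedEvents {
  private seen: { [eventId: string]: true } = {};

  // Runs `effect` only the first time an event ID is seen.
  // Returns true if the effect ran, false if the event was a duplicate.
  runOnce(eventId: string, effect: () => void): boolean {
    // hasOwnProperty avoids false positives for IDs such as "constructor"
    if (Object.prototype.hasOwnProperty.call(this.seen, eventId)) return false;
    effect();
    // Mark only after the effect succeeded, so a failed attempt can be retried
    this.seen[eventId] = true;
    return true;
  }
}

let emailsSent = 0;
const processed = new ProcessedEvents();

// The same event delivered twice triggers the side effect once
const firstDelivery = processed.runOnce("evt-1", () => { emailsSent++; });
const duplicateDelivery = processed.runOnce("evt-1", () => { emailsSent++; });
```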
Next Steps
- Learn about Building Sagas for Workflows
- Explore Testing Sagas thoroughly
- Understand Scheduling and Timers