Event Sagas

Sagas are long-running business processes that coordinate activities across multiple aggregates, external services, and time-based operations. They implement the Saga pattern for managing distributed transactions and complex workflows in event-driven systems.

What Are Sagas?

Sagas provide:

  • Workflow Orchestration: Coordinate complex business processes

  • Cross-Aggregate Operations: Handle operations spanning multiple aggregates

  • Time-Based Processing: Handle timeouts, delays, and scheduled operations

  • Error Handling: Implement compensation and retry logic

  • State Management: Maintain workflow state across multiple events

Saga Types

Eventicle supports two types of sagas:

  • Stateless Sagas: Simple event handlers without persistent state

  • Stateful Sagas: Complex workflows with persistent state and timers

Stateless Sagas

Stateless sagas react to events without maintaining state; the handlers below never read or write the saga instance they are given:

import { saga } from "@eventicle/eventiclejs";

/**
 * Builds the stateless "NotificationSaga".
 *
 * The saga listens on the "orders" and "users" streams and turns events into
 * outbound messages:
 *  - "OrderCreated"   -> an order-confirmation email to the customer
 *  - "UserRegistered" -> a welcome email, then a mailing-list signup
 *
 * Neither handler touches the saga instance it receives.
 */
export function notificationSaga() {
  const subscribed = saga("NotificationSaga").subscribeStreams(["orders", "users"]);

  const withOrderConfirmation = subscribed.on(
    "OrderCreated",
    async (_instance, { payload }) => {
      const { customerEmail, orderId } = payload;
      await sendEmail(customerEmail, {
        subject: "Order Confirmation",
        template: "order-confirmation",
        data: { orderId }
      });
    }
  );

  return withOrderConfirmation.on(
    "UserRegistered",
    async (_instance, { payload }) => {
      const { email } = payload;
      // Welcome first, then subscribe: same order as the two awaited calls imply.
      await sendWelcomeEmail(email);
      await addToMailingList(email);
    }
  );
}

/** Content of an outgoing email, as passed to `sendEmail`. */
interface EmailOptions {
  /** Subject line; this is the only field the example implementation logs. */
  subject: string;
  /** Template identifier, e.g. "order-confirmation" or "welcome". */
  template: string;
  /** Optional values for the template, e.g. `{ orderId }`. */
  data?: Record<string, unknown>;
}

/**
 * Sends an email to `email`.
 *
 * In this example the function only logs the recipient and subject; replace
 * the body with a call to a real email service.
 *
 * @param email   Recipient address.
 * @param options Subject, template and optional template data.
 */
async function sendEmail(email: string, options: EmailOptions): Promise<void> {
  console.log(`📧 Sending email to ${email}: ${options.subject}`);
  // Integrate with email service
}

/**
 * Sends the fixed welcome message to a newly registered address.
 *
 * Thin wrapper over `sendEmail` that supplies the welcome subject and the
 * "welcome" template.
 *
 * @param email Recipient address.
 */
async function sendWelcomeEmail(email: string): Promise<void> {
  const welcomeMessage = {
    subject: "Welcome to our platform!",
    template: "welcome"
  };

  await sendEmail(email, welcomeMessage);
}

/**
 * Adds an address to the mailing list.
 *
 * Example stub: it only logs the address. Swap the body for a call to a real
 * mailing service.
 *
 * @param email Address to subscribe.
 */
async function addToMailingList(email: string): Promise<void> {
  const message = `📮 Adding ${email} to mailing list`;
  console.log(message);
  // Integrate with mailing service
}

Stateful Sagas

Stateful sagas maintain state and can handle complex workflows:

import { saga, SagaInstance } from "@eventicle/eventiclejs";

/** State held by one PaymentProcessingSaga instance; it is initialised from the OrderCreated event. */
interface PaymentProcessingData {
  /** Order being paid for; PaymentSucceeded / PaymentFailed events are matched to the instance on this property. */
  orderId: string;
  /** Copied from the `totalAmount` field of the OrderCreated payload. */
  amount: number;
  customerId: string;
  paymentMethod: string;
  /** Starts at 0 and is incremented once per PaymentFailed event. */
  attempts: number;
  /** "pending" on creation, "processing" once a payment attempt starts, then "completed" or "failed". */
  status: "pending" | "processing" | "completed" | "failed";
}

/**
 * Names of the timers PaymentProcessingSaga schedules:
 * "paymentTimeout" is armed for 5 minutes when the saga starts;
 * "retryPayment" is armed for 30 seconds after a failed attempt.
 */
type PaymentTimers = "paymentTimeout" | "retryPayment";

/**
 * Builds the stateful saga that drives payment for a single order.
 *
 * Workflow:
 *  - "OrderCreated" starts an instance, records the order details, kicks off
 *    the first payment attempt and arms the "paymentTimeout" timer.
 *  - "PaymentSucceeded" completes the order and ends the saga.
 *  - "PaymentFailed" arms "retryPayment" until the third failure, at which
 *    point the order is failed and the saga ends.
 *  - "paymentTimeout" fails the order and ends the saga.
 *  - "retryPayment" starts another payment attempt.
 *
 * Every path that ends the saga cancels the timer that could still be pending
 * before calling `complete()`, so that no timer is left armed for a finished
 * instance. This assumes `cancelTimer` is harmless when the named timer is not
 * currently scheduled (TODO confirm against the SagaInstance API).
 */
export function paymentProcessingSaga() {
  // Workflow policy, named so the handlers below read as intent.
  const MAX_PAYMENT_ATTEMPTS = 3;
  const PAYMENT_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes
  const RETRY_DELAY_MS = 30 * 1000; // 30 seconds

  return saga<PaymentTimers, PaymentProcessingData>("PaymentProcessingSaga")
    .subscribeStreams(["orders", "payments"])
    .parallelEvents(5)
    .startOn(
      "OrderCreated",
      {
        withLock: (instance, event) => `payment-${event.payload.orderId}`
      },
      async (instance, event) => {
        // Initialize saga state
        instance.data = {
          orderId: event.payload.orderId,
          amount: event.payload.totalAmount,
          customerId: event.payload.customerId,
          paymentMethod: event.payload.paymentMethod,
          attempts: 0,
          status: "pending"
        };

        // Start payment processing
        await processPayment(instance);

        // Overall deadline for the whole payment, retries included
        instance.scheduleTimer("paymentTimeout", PAYMENT_TIMEOUT_MS);
      }
    )
    .on(
      "PaymentSucceeded",
      {
        matchInstance: (event) => ({
          instanceProperty: "orderId",
          value: event.payload.orderId
        })
      },
      async (instance, event) => {
        instance.data.status = "completed";

        // Payment is done: neither the deadline nor a queued retry may fire.
        instance.cancelTimer("paymentTimeout");
        instance.cancelTimer("retryPayment");

        // Complete the order
        await completeOrder(instance.data.orderId);

        // End the saga
        instance.complete();
      }
    )
    .on(
      "PaymentFailed",
      {
        matchInstance: (event) => ({
          instanceProperty: "orderId",
          value: event.payload.orderId
        })
      },
      async (instance, event) => {
        instance.data.attempts++;

        if (instance.data.attempts < MAX_PAYMENT_ATTEMPTS) {
          // Retry payment after delay
          instance.scheduleTimer("retryPayment", RETRY_DELAY_MS);
          return;
        }

        // Max attempts reached, fail the order
        instance.data.status = "failed";

        // The deadline timer is still armed from startOn; cancel it so it
        // cannot fire for an order that has already been failed.
        instance.cancelTimer("paymentTimeout");

        await failOrder(instance.data.orderId, "Payment processing failed");
        instance.complete();
      }
    )
    .onTimer("paymentTimeout", async (instance) => {
      console.log(`⏰ Payment timeout for order ${instance.data.orderId}`);

      instance.data.status = "failed";

      // A retry may have been queued by an earlier PaymentFailed; drop it.
      instance.cancelTimer("retryPayment");

      await failOrder(instance.data.orderId, "Payment processing timeout");
      instance.complete();
    })
    .onTimer("retryPayment", async (instance) => {
      console.log(`🔄 Retrying payment for order ${instance.data.orderId}`);
      await processPayment(instance);
    });
}

/**
 * Starts one payment attempt for the order held by `instance`.
 *
 * Marks the saga state as "processing" and logs the attempt. The gateway call
 * itself is left as a commented-out placeholder in this example.
 *
 * @param instance Saga instance whose `data` describes the order to charge.
 */
async function processPayment(instance: SagaInstance<any, PaymentProcessingData>) {
  const { data } = instance;
  data.status = "processing";

  // Simulate payment processing
  const message = `💳 Processing payment for order ${data.orderId}`;
  console.log(message);

  // In a real system, this would integrate with payment gateway
  // await paymentGateway.processPayment({
  //   amount: instance.data.amount,
  //   paymentMethod: instance.data.paymentMethod,
  //   orderId: instance.data.orderId
  // });
}

/**
 * Marks an order as completed.
 *
 * Example stub: it only logs the order id; a real implementation would emit
 * an OrderCompleted event here.
 *
 * @param orderId Identifier of the order to complete.
 */
async function completeOrder(orderId: string): Promise<void> {
  const message = `✅ Completing order ${orderId}`;
  console.log(message);
  // Emit OrderCompleted event
}

/**
 * Marks an order as failed.
 *
 * Example stub: it only logs the order id and reason; a real implementation
 * would emit an OrderFailed event here.
 *
 * @param orderId Identifier of the order to fail.
 * @param reason  Human-readable explanation, included in the log line.
 */
async function failOrder(orderId: string, reason: string): Promise<void> {
  const message = `❌ Failing order ${orderId}: ${reason}`;
  console.log(message);
  // Emit OrderFailed event
}

Saga Configuration

Event Subscriptions

// A single saga can subscribe to several streams in one call
saga("MultiStreamSaga")
  .subscribeStreams(["orders", "inventory", "shipping"])

// parallelEvents sets how many events the saga may process at the same time
saga("HighVolumeSaga")
  .subscribeStreams(["transactions"])
  .parallelEvents(20) // Process up to 20 events in parallel

Instance Matching

// Match by aggregate ID: route the event to the instance whose `orderId`
// property equals the orderId carried in the event payload
.on("OrderUpdated", {
  matchInstance: (event) => ({
    instanceProperty: "orderId",
    value: event.payload.orderId
  })
}, handler)

// Match by correlation ID: same mechanism, keyed on `correlationId` instead
.on("PaymentProcessed", {
  matchInstance: (event) => ({
    instanceProperty: "correlationId",
    value: event.payload.correlationId
  })
}, handler)

// Custom matching logic: matchInstance is an ordinary function, so it can
// inspect the event before deciding which property to match on
.on("CustomerAction", {
  matchInstance: (event) => {
    // Only "purchase" actions are routed to an instance (by customerId)
    if (event.payload.actionType === "purchase") {
      return {
        instanceProperty: "customerId",
        value: event.payload.customerId
      };
    }
    return null; // Don't match
  }
}, handler)

Locking

Prevent concurrent processing of related events by giving a handler a `withLock` function, which returns the lock key to use for the event:

// The lock key is built from the event's orderId, so each order gets its own key
.startOn("OrderCreated", {
  withLock: (instance, event) => `order-processing-${event.payload.orderId}`
}, handler)

// matchInstance and withLock can be given in the same config object.
// NOTE(review): this key ("payment-…") differs from the "order-processing-…"
// key above, so the two handlers presumably do not exclude each other for the
// same order — confirm that is the intent.
.on("PaymentReceived", {
  matchInstance: (event) => ({ instanceProperty: "orderId", value: event.payload.orderId }),
  withLock: (instance, event) => `payment-${event.payload.orderId}`
}, handler)

Timer Operations

Scheduling Timers

// Schedule a one-time timer; the second argument is the delay in milliseconds
instance.scheduleTimer("reminderTimer", 24 * 60 * 60 * 1000); // 24 hours

// For a recurring timer, schedule it again from inside its own onTimer
// handler each time it fires
instance.scheduleTimer("recurringCheck", 60 * 1000); // 1 minute

Handling Timers

// Conditional repeat: the reminder re-arms itself only while the saga is
// still "pending", so it stops once the status changes
.onTimer("reminderTimer", async (instance) => {
  if (instance.data.status === "pending") {
    await sendReminder(instance.data.customerId);

    // Schedule another reminder
    instance.scheduleTimer("reminderTimer", 24 * 60 * 60 * 1000);
  }
})

// Unconditional repeat: the health check re-arms itself on every run,
// whether or not the service was healthy
.onTimer("healthCheck", async (instance) => {
  const isHealthy = await checkExternalService();

  if (!isHealthy) {
    await handleServiceFailure(instance);
  }

  // Continue health checks
  instance.scheduleTimer("healthCheck", 5 * 60 * 1000); // 5 minutes
})

Canceling Timers

// When the process finishes, cancel the timers by name and then end the saga
.on("ProcessCompleted", { /* matching */ }, async (instance, event) => {
  // Cancel any pending timers
  instance.cancelTimer("timeoutTimer");
  instance.cancelTimer("reminderTimer");

  instance.complete();
})

Advanced Saga Patterns

Process Manager Pattern

/** State held by one OrderFulfillmentSaga instance. */
interface OrderFulfillmentData {
  orderId: string;
  /** Items from the OrderPlaced payload. NOTE(review): OrderItem is not declared in this example; the saga reads `productId` and `quantity` from each item. */
  items: OrderItem[];
  /** Grows by one id per InventoryReserved event; the order counts as reserved when its length equals `items.length`. */
  reservationIds: string[];
  /** Set when ShipmentCreated is handled; absent before that. */
  shipmentId?: string;
  /** Progresses "started" -> "reserved" -> "shipped" -> "delivered", or ends in "failed". */
  status: "started" | "reserved" | "shipped" | "delivered" | "failed";
}

/**
 * Builds the process-manager saga that takes an order from placement to delivery.
 *
 * Steps:
 *  1. "OrderPlaced" starts an instance and requests an inventory reservation
 *     for every item.
 *  2. Each "InventoryReserved" records a reservation id; once there is one
 *     per item, a shipment is created.
 *  3. "InventoryReservationFailed" releases the reservations recorded so far,
 *     fails the order and ends the saga.
 *  4. "ShipmentCreated" records the shipment and arms a daily "deliveryCheck".
 *  5. "deliveryCheck" polls the delivery status; it completes the order when
 *     delivered and otherwise re-arms itself.
 */
export function orderFulfillmentSaga() {
  // Delivery status is polled once a day.
  const DELIVERY_CHECK_INTERVAL_MS = 24 * 60 * 60 * 1000;

  return saga<"deliveryCheck", OrderFulfillmentData>("OrderFulfillmentSaga")
    .subscribeStreams(["orders", "inventory", "shipping"])
    .startOn("OrderPlaced", { /* config */ }, async (instance, event) => {
      instance.data = {
        orderId: event.payload.orderId,
        items: event.payload.items,
        reservationIds: [],
        status: "started"
      };

      // Start inventory reservation for each item
      for (const item of instance.data.items) {
        await reserveInventory(item.productId, item.quantity, instance.data.orderId);
      }
    })
    .on("InventoryReserved", { /* matching */ }, async (instance, event) => {
      instance.data.reservationIds.push(event.payload.reservationId);

      // Check if all items are reserved
      if (instance.data.reservationIds.length === instance.data.items.length) {
        instance.data.status = "reserved";
        await createShipment(instance.data.orderId, instance.data.items);
      }
    })
    .on("InventoryReservationFailed", { /* matching */ }, async (instance, event) => {
      // Compensate: release any existing reservations
      for (const reservationId of instance.data.reservationIds) {
        await releaseReservation(reservationId);
      }

      instance.data.status = "failed";
      await failOrder(instance.data.orderId, "Insufficient inventory");
      instance.complete();
    })
    .on("ShipmentCreated", { /* matching */ }, async (instance, event) => {
      instance.data.shipmentId = event.payload.shipmentId;
      instance.data.status = "shipped";

      // Start tracking delivery
      instance.scheduleTimer("deliveryCheck", DELIVERY_CHECK_INTERVAL_MS);
    })
    .onTimer("deliveryCheck", async (instance) => {
      const { orderId, shipmentId } = instance.data;

      // The timer is only armed after ShipmentCreated stored the id, so a
      // missing id means the saga state is inconsistent. Fail loudly rather
      // than asking the carrier about an undefined shipment.
      if (shipmentId === undefined) {
        throw new Error(`deliveryCheck fired for order ${orderId} before a shipment was recorded`);
      }

      const deliveryStatus = await checkDeliveryStatus(shipmentId);

      if (deliveryStatus === "delivered") {
        instance.data.status = "delivered";
        await completeOrder(orderId);
        instance.complete();
      } else {
        // Continue checking
        instance.scheduleTimer("deliveryCheck", DELIVERY_CHECK_INTERVAL_MS);
      }
    });
}

Compensation Pattern

/**
 * Builds a saga that performs a three-step payment and undoes the completed
 * steps if a later step fails.
 *
 * On "PaymentRequested" the handler first records in `instance.data`
 * everything compensation will need (account, amount, transaction id and two
 * progress flags, both false), and only then runs the steps:
 * reserve funds -> process payment -> confirm transaction.
 * Each flag is set right after its step succeeds, so when any step throws,
 * `compensatePayment` can tell exactly which steps must be reversed. The
 * original error is re-thrown after compensation.
 */
export function paymentWithCompensationSaga() {
  return saga<any, PaymentData>("PaymentCompensationSaga")
    .startOn("PaymentRequested", {}, async (instance, event) => {
      const { accountId, amount, transactionId } = event.payload;

      // Capture the compensation inputs before the first side effect, so the
      // catch block never has to work from missing state.
      instance.data = {
        accountId,
        amount,
        transactionId,
        fundsReserved: false,
        paymentProcessed: false
      };

      try {
        // Step 1: Reserve funds
        await reserveFunds(accountId, amount);
        instance.data.fundsReserved = true;

        // Step 2: Process payment
        await processPayment(event.payload);
        instance.data.paymentProcessed = true;

        // Step 3: Confirm transaction
        await confirmTransaction(transactionId);

        instance.complete();

      } catch (error) {
        // Compensation: undo completed steps
        await compensatePayment(instance);
        throw error;
      }
    });
}

/**
 * Undoes the payment steps recorded as completed on `instance.data`, in
 * reverse order: the payment is reversed first (if it was processed), then
 * the reserved funds are released (if they were reserved).
 *
 * @param instance Saga instance whose data holds the progress flags and the
 *                 account, amount and transaction id to compensate with.
 */
async function compensatePayment(instance: SagaInstance<any, PaymentData>) {
  const { paymentProcessed, fundsReserved, transactionId, accountId, amount } = instance.data;

  if (paymentProcessed) {
    await reversePayment(transactionId);
  }

  if (fundsReserved) {
    await releaseFunds(accountId, amount);
  }
}

Saga Registration and Management

Registering Sagas

import { registerSaga } from "@eventicle/eventiclejs";

// Register sagas at application startup.
// Each factory is called once and the saga definition it returns is what gets registered.
registerSaga(notificationSaga());
registerSaga(paymentProcessingSaga());
registerSaga(orderFulfillmentSaga());

Saga Introspection

import { allSagas, allSagaInstances, removeAllSagas } from "@eventicle/eventiclejs";

// Get all registered sagas (synchronous) and list them by name
const sagas = allSagas();
console.log("Registered sagas:", sagas.map(s => s.name));

// Get all running saga instances (asynchronous, so it must be awaited)
const instances = await allSagaInstances();
console.log(`${instances.length} saga instances running`);

// Remove all sagas (useful for testing)
await removeAllSagas();

Error Handling in Sagas

/** The parts of a saga instance that the retry logic reads and writes. */
interface RetryableSagaInstance {
  data: {
    /** Payload of the triggering event, kept so a retry can replay it. Its shape is event-specific. */
    originalPayload?: any;
    /** Number of failed attempts so far; absent until the first failure. */
    retryCount?: number;
  };
  scheduleTimer(name: "retryOperation", delayMs: number): void;
  complete(): void;
}

/**
 * Runs `riskyOperation` once against the payload stored on the instance.
 *
 * On failure the error is logged and the failure count is incremented. Below
 * `maxAttempts` failures, a "retryOperation" timer is armed with exponential
 * backoff (2s after the first failure, 4s after the second). At `maxAttempts`
 * the failure is handed to `handleFailure` and the saga is completed.
 */
async function attemptRiskyOperation(
  instance: RetryableSagaInstance,
  maxAttempts = 3
): Promise<void> {
  try {
    await riskyOperation(instance.data.originalPayload);
  } catch (error) {
    console.error("Saga error:", error);

    // Increment retry count
    const failures = (instance.data.retryCount || 0) + 1;
    instance.data.retryCount = failures;

    if (failures < maxAttempts) {
      // Retry with exponential backoff
      instance.scheduleTimer("retryOperation", 2 ** failures * 1000);
    } else {
      // Max retries reached, handle failure
      await handleFailure(instance, error);
      instance.complete();
    }
  }
}

/**
 * Builds a saga that retries a failing operation with exponential backoff.
 *
 * "SomeEvent" stores the event payload on the instance and makes the first
 * attempt; the "retryOperation" timer replays the stored payload. Both go
 * through the same attempt routine, so a retry that fails is counted, backed
 * off and eventually given up on exactly like the first attempt.
 */
export function robustSaga() {
  return saga<any, any>("RobustSaga")
    .on("SomeEvent", {}, async (instance, event) => {
      // Keep the payload: the timer handler receives no event to read it from.
      instance.data.originalPayload = event.payload;
      await attemptRiskyOperation(instance);
    })
    .onTimer("retryOperation", async (instance) => {
      // Retry the operation
      await attemptRiskyOperation(instance);
    });
}

Testing Sagas

import {
  allSagaInstances,
  consumeFullEventLog,
  eventClient,
  mockSagasExceptFor
} from "@eventicle/eventiclejs";

// Exercises PaymentProcessingSaga end to end: emit the event that starts it,
// let the event log drain, then inspect the resulting saga instance.
describe("PaymentProcessingSaga", () => {
  beforeEach(async () => {
    // Presumably leaves only the named saga live and mocks every other one,
    // so no unrelated saga reacts to the events below.
    await mockSagasExceptFor(["PaymentProcessingSaga"]);
  });

  it("should complete payment successfully", async () => {
    // Emit events
    await eventClient().emit([{
      type: "OrderCreated",
      stream: "orders",
      domainId: "order-123",
      payload: {
        orderId: "order-123",
        totalAmount: 100,
        customerId: "customer-456",
        paymentMethod: "credit-card"
      }
    }]);

    // Wait for saga processing
    await consumeFullEventLog();

    // Check saga state
    const instances = await allSagaInstances();
    const paymentSaga = instances.find(i => i.sagaName === "PaymentProcessingSaga");

    // `find` returns undefined when nothing matches; fail with a clear message
    // instead of a TypeError on the property access below.
    if (!paymentSaga) {
      throw new Error("No PaymentProcessingSaga instance was created for OrderCreated");
    }

    // Only OrderCreated has been emitted, so the saga has started a payment
    // attempt but has seen no PaymentSucceeded / PaymentFailed yet.
    expect(paymentSaga.data.status).toBe("processing");
    expect(paymentSaga.data.orderId).toBe("order-123");
  });
});

Best Practices

  1. Keep Sagas Focused: Each saga should handle one business process

  2. Handle Failures: Always implement error handling and compensation

  3. Use Timeouts: Set reasonable timeouts for external operations

  4. Minimize State: Keep saga state as small as possible

  5. Idempotency: Make saga operations idempotent

  6. Testing: Test both happy path and failure scenarios

  7. Monitoring: Add logging and metrics for saga operations

Next Steps