Scheduling and Timers

The Eventicle scheduler system provides robust timing capabilities for sagas, enabling delayed operations, timeouts, and complex time-based workflows. The scheduler ensures reliable execution of timed operations even in distributed environments.

Overview

The scheduler system provides:

  • Timer Management: Schedule and cancel timers for saga operations

  • Serialized Execution: Ensure saga events and timers are processed in order

  • Reliability: Persistent timers that survive application restarts

  • Scalability: Distributed timer processing across multiple instances

  • Flexibility: Multiple scheduler implementations for different needs

Scheduler Types

Eventicle supports multiple scheduler implementations:

  • Local Scheduler: In-memory scheduling for development and testing

  • BullMQ Scheduler: Redis-backed distributed scheduling for production

  • BullMQ Pro Scheduler: Enhanced version with advanced serialization guarantees

Local Scheduler

The local scheduler uses in-memory timers and is suitable for development:

import {
  LocalScheduleJobRunner,
  setScheduler
} from "@eventicle/eventiclejs";

// Configure local scheduler
const localScheduler = new LocalScheduleJobRunner();
setScheduler(localScheduler);

// Sagas can now use timers
export function orderTimeoutSaga() {
  return saga<"orderTimeout", OrderData>("OrderTimeoutSaga")
    .startOn("OrderCreated", {}, async (instance, event) => {
      instance.data = {
        orderId: event.payload.orderId,
        customerId: event.payload.customerId,
        createdAt: event.payload.createdAt
      };

      // Schedule timeout in 30 minutes
      instance.scheduleTimer("orderTimeout", 30 * 60 * 1000);
    })
    .on("OrderCompleted", {
      matchInstance: (event) => ({
        instanceProperty: "orderId",
        value: event.payload.orderId
      })
    }, async (instance, event) => {
      // Cancel timeout since order completed
      instance.cancelTimer("orderTimeout");
      instance.complete();
    })
    .onTimer("orderTimeout", async (instance) => {
      // Handle timeout - perhaps cancel the order
      await cancelOrder(instance.data.orderId, "Order timeout");
      instance.complete();
    });
}

Local Scheduler Limitations

  • No Persistence: Timers are lost on application restart

  • Single Instance: Cannot distribute across multiple processes

  • Memory Bound: Limited by available memory

BullMQ Scheduler

For production environments, use the BullMQ scheduler with Redis:

import {
  BullMQScheduleJobRunner,
  setScheduler
} from "@eventicle/eventiclejs";
import Redis from "ioredis";

// Configure Redis connection
const redis = new Redis({
  host: "localhost",
  port: 6379,
  maxRetriesPerRequest: 3
});

// Configure BullMQ scheduler
const bullMQScheduler = new BullMQScheduleJobRunner({
  connection: redis,
  queueName: "saga-timers",
  concurrency: 10
});

setScheduler(bullMQScheduler);

// Start the scheduler
await bullMQScheduler.startup();

BullMQ Features

  • Persistence: Timers stored in Redis

  • Distribution: Multiple workers can process timers

  • Reliability: Automatic retry and failure handling

  • Monitoring: Built-in job monitoring and metrics

Saga Scheduler (Advanced)

The Saga Scheduler provides enhanced serialization guarantees for complex workflows:

interface SagaScheduler {
  // Handle saga events with guaranteed serialization
  sagaHandleEvent(
    saga: Saga<any, any>,
    event: EventicleEvent,
    instanceId: string
  ): Promise<void>;

  // Handle timer events with serialization
  handleTimer(
    saga: Saga<any, any>,
    name: string,
    data: { instanceId: string }
  ): Promise<void>;
}

Why Serialization Matters

Without proper serialization, saga instances can have race conditions:

// Problematic scenario without serialization:
// 1. Timer fires: "processPayment"
// 2. Event arrives: "PaymentReceived"
// 3. Both execute simultaneously on same saga instance
// 4. Race condition: timer might cancel payment that already succeeded

// With saga scheduler:
// 1. Timer queued with instance ID
// 2. Event queued with same instance ID
// 3. Queue processes one at a time per instance
// 4. No race conditions

Custom Saga Scheduler Implementation

import { SagaScheduler } from "@eventicle/eventiclejs";
import { QueuePro, WorkerPro } from "bullmq-pro";

export class CustomSagaScheduler implements SagaScheduler {
  private queue: QueuePro;
  private worker: WorkerPro;

  constructor(private redisConfig: any) {}

  async startup() {
    this.queue = new QueuePro("saga-scheduler", {
      connection: this.redisConfig,
      defaultJobOptions: {
        removeOnComplete: 100,
        removeOnFail: 50
      }
    });

    this.worker = new WorkerPro(
      "saga-scheduler",
      async (job) => {
        const saga = findSaga(job.data.sagaName);

        if (job.data.type === "timer") {
          await this.executeTimer(saga, job.data);
        } else {
          await this.executeEvent(saga, job.data);
        }
      },
      {
        // Critical: Force serialization per saga instance
        group: {
          concurrency: 1  // One job per group (instance) at a time
        },
        concurrency: 200,  // But allow many different instances
        connection: this.redisConfig
      }
    );
  }

  async sagaHandleEvent(
    saga: Saga<any, any>,
    event: EventicleEvent,
    instanceId: string
  ): Promise<void> {
    await this.queue.add(
      "saga-event",
      {
        type: "event",
        sagaName: saga.name,
        event,
        instanceId
      },
      {
        // Group by instance ID for serialization
        group: { id: instanceId },
        // Add delay if needed for ordering
        delay: 0
      }
    );
  }

  async handleTimer(
    saga: Saga<any, any>,
    name: string,
    data: { instanceId: string }
  ): Promise<void> {
    await this.queue.add(
      "saga-timer",
      {
        type: "timer",
        sagaName: saga.name,
        timerName: name,
        instanceId: data.instanceId
      },
      {
        // Same group ensures serialization with events
        group: { id: data.instanceId }
      }
    );
  }

  private async executeEvent(saga: Saga<any, any>, jobData: any) {
    // Execute saga event handler
    await sagaHandleEvent(saga, jobData.event, jobData.instanceId);
  }

  private async executeTimer(saga: Saga<any, any>, jobData: any) {
    // Execute saga timer handler
    await handleTimerEvent(saga, jobData.timerName, {
      instanceId: jobData.instanceId
    });
  }
}

Timer Patterns

Simple Timeout Pattern

export function simpleTimeoutSaga() {
  return saga<"timeout", { orderId: string }>("SimpleTimeoutSaga")
    .startOn("ProcessStarted", {}, async (instance, event) => {
      instance.data = { orderId: event.payload.orderId };

      // Set 5-minute timeout
      instance.scheduleTimer("timeout", 5 * 60 * 1000);
    })
    .on("ProcessCompleted", {
      matchInstance: (event) => ({
        instanceProperty: "orderId",
        value: event.payload.orderId
      })
    }, async (instance, event) => {
      instance.cancelTimer("timeout");
      instance.complete();
    })
    .onTimer("timeout", async (instance) => {
      await handleTimeout(instance.data.orderId);
      instance.complete();
    });
}

Recurring Timer Pattern

export function healthCheckSaga() {
  return saga<"healthCheck", { serviceId: string; checks: number }>(
    "HealthCheckSaga"
  )
    .startOn("ServiceRegistered", {}, async (instance, event) => {
      instance.data = {
        serviceId: event.payload.serviceId,
        checks: 0
      };

      // Start health checks every 30 seconds
      instance.scheduleTimer("healthCheck", 30 * 1000);
    })
    .on("ServiceUnregistered", {
      matchInstance: (event) => ({
        instanceProperty: "serviceId",
        value: event.payload.serviceId
      })
    }, async (instance) => {
      instance.cancelTimer("healthCheck");
      instance.complete();
    })
    .onTimer("healthCheck", async (instance) => {
      const isHealthy = await checkServiceHealth(instance.data.serviceId);
      instance.data.checks++;

      if (!isHealthy) {
        await handleUnhealthyService(instance.data.serviceId);
        instance.complete();
      } else {
        // Schedule next check
        instance.scheduleTimer("healthCheck", 30 * 1000);
      }
    });
}

Multi-Timer Pattern

export function paymentProcessingSaga() {
  return saga<
    "paymentTimeout" | "retryDelay" | "statusCheck",
    PaymentData
  >("PaymentProcessingSaga")
    .startOn("PaymentInitiated", {}, async (instance, event) => {
      instance.data = {
        paymentId: event.payload.paymentId,
        amount: event.payload.amount,
        attempts: 0,
        maxAttempts: 3
      };

      // Set overall timeout
      instance.scheduleTimer("paymentTimeout", 10 * 60 * 1000); // 10 minutes

      // Start status checking
      instance.scheduleTimer("statusCheck", 5 * 1000); // 5 seconds

      await initiatePayment(instance.data);
    })
    .on("PaymentCompleted", {
      matchInstance: (event) => ({
        instanceProperty: "paymentId",
        value: event.payload.paymentId
      })
    }, async (instance) => {
      // Cancel all timers
      instance.cancelTimer("paymentTimeout");
      instance.cancelTimer("retryDelay");
      instance.cancelTimer("statusCheck");
      instance.complete();
    })
    .on("PaymentFailed", {
      matchInstance: (event) => ({
        instanceProperty: "paymentId",
        value: event.payload.paymentId
      })
    }, async (instance, event) => {
      instance.data.attempts++;

      if (instance.data.attempts < instance.data.maxAttempts) {
        // Schedule retry with exponential backoff
        const delay = Math.pow(2, instance.data.attempts) * 1000;
        instance.scheduleTimer("retryDelay", delay);
      } else {
        // Max attempts reached
        await handlePaymentFailure(instance.data.paymentId);
        instance.complete();
      }
    })
    .onTimer("paymentTimeout", async (instance) => {
      await handlePaymentTimeout(instance.data.paymentId);
      instance.complete();
    })
    .onTimer("retryDelay", async (instance) => {
      await retryPayment(instance.data);
      // Restart status checking
      instance.scheduleTimer("statusCheck", 5 * 1000);
    })
    .onTimer("statusCheck", async (instance) => {
      const status = await checkPaymentStatus(instance.data.paymentId);

      if (status === "pending") {
        // Continue checking
        instance.scheduleTimer("statusCheck", 5 * 1000);
      }
      // If completed or failed, events will be emitted
    });
}

Configuration and Best Practices

Scheduler Configuration

// Production configuration
const productionScheduler = new CustomSagaScheduler({
  host: process.env.REDIS_HOST,
  port: parseInt(process.env.REDIS_PORT),
  password: process.env.REDIS_PASSWORD,
  maxRetriesPerRequest: 3,
  lazyConnect: true,
  keepAlive: 30000
});

// Configure job options
const schedulerConfig = {
  defaultJobOptions: {
    removeOnComplete: 1000,    // Keep last 1000 completed jobs
    removeOnFail: 500,         // Keep last 500 failed jobs
    attempts: 3,               // Retry failed jobs 3 times
    backoff: {
      type: 'exponential',
      delay: 2000
    }
  },
  settings: {
    stalledInterval: 30 * 1000,    // Check for stalled jobs every 30s
    maxStalledCount: 1             // Max times a job can be stalled
  }
};

Timer Best Practices

  1. Use Appropriate Timeouts: Balance responsiveness with system load

// Good: Reasonable timeouts
instance.scheduleTimer("userResponse", 5 * 60 * 1000);    // 5 minutes
instance.scheduleTimer("paymentTimeout", 30 * 60 * 1000); // 30 minutes

// Avoid: Too short (creates load) or too long (poor UX)
instance.scheduleTimer("check", 100);              // Too frequent
instance.scheduleTimer("timeout", 24 * 60 * 60 * 1000); // Too long
  1. Cancel Timers When Appropriate:

.on("ProcessCompleted", {}, async (instance) => {
  // Always cancel timers when process completes
  instance.cancelTimer("timeout");
  instance.cancelTimer("reminder");
  instance.complete();
})
  1. Handle Timer Failures:

.onTimer("criticalTimer", async (instance) => {
  try {
    await performCriticalOperation();
  } catch (error) {
    console.error("Critical timer failed:", error);

    // Decide: retry, escalate, or fail
    if (shouldRetry(error)) {
      instance.scheduleTimer("criticalTimer", 60 * 1000); // Retry in 1 minute
    } else {
      await escalateFailure(instance.data);
      instance.complete();
    }
  }
})

Monitoring and Observability

Timer Metrics

import { metrics } from "@eventicle/eventiclejs";

// Enable timer metrics
metrics.enable({
  timerScheduled: true,
  timerExecuted: true,
  timerCancelled: true,
  timerFailed: true
});

// Custom timer monitoring
.onTimer("monitoredTimer", async (instance) => {
  const startTime = Date.now();

  try {
    await performOperation();

    // Record success metric
    metrics.recordTimer("timer.execution.duration", Date.now() - startTime, {
      saga: "MySaga",
      timer: "monitoredTimer",
      status: "success"
    });

  } catch (error) {
    metrics.recordTimer("timer.execution.duration", Date.now() - startTime, {
      saga: "MySaga",
      timer: "monitoredTimer",
      status: "error"
    });

    throw error;
  }
})

Health Checks

export class SchedulerHealthCheck {
  constructor(private scheduler: CustomSagaScheduler) {}

  async checkHealth(): Promise<boolean> {
    try {
      // Test scheduler responsiveness
      const testTimer = Date.now();
      await this.scheduler.handleTimer(
        testSaga,
        "healthCheck",
        { instanceId: `health-${testTimer}` }
      );

      return true;
    } catch (error) {
      console.error("Scheduler health check failed:", error);
      return false;
    }
  }
}

Testing Schedulers

describe("Timer-based Saga", () => {
  let mockScheduler: jest.Mocked<SagaScheduler>;

  beforeEach(() => {
    mockScheduler = {
      sagaHandleEvent: jest.fn(),
      handleTimer: jest.fn()
    };

    setScheduler(mockScheduler);
  });

  it("should schedule timer on saga start", async () => {
    const saga = timeoutSaga();
    const instance = new SagaInstance(saga, "instance-1");

    await saga.handlers.ProcessStarted(instance, startEvent);

    expect(mockScheduler.handleTimer).toHaveBeenCalledWith(
      saga,
      "timeout",
      expect.objectContaining({ instanceId: "instance-1" })
    );
  });

  it("should cancel timer on completion", async () => {
    const saga = timeoutSaga();
    const instance = new SagaInstance(saga, "instance-1");

    // Start process
    await saga.handlers.ProcessStarted(instance, startEvent);

    // Complete process
    await saga.handlers.ProcessCompleted(instance, completeEvent);

    // Timer should be cancelled (implementation specific verification)
    expect(instance.scheduledTimers).toHaveLength(0);
  });
});

Next Steps