Scheduling and Timers
The Eventicle scheduler gives sagas timers for delayed operations, timeouts, and other time-based workflows. With a Redis-backed scheduler, timed operations keep running reliably when the application is spread across multiple instances.
Overview
The scheduler system provides:
- Timer Management: Schedule and cancel timers for saga operations
- Serialized Execution: Ensure saga events and timers are processed in order
- Reliability: Persistent timers that survive application restarts (with the Redis-backed schedulers; the local scheduler keeps timers in memory only)
- Scalability: Distributed timer processing across multiple instances
- Flexibility: Multiple scheduler implementations for different needs
Scheduler Types
Eventicle supports multiple scheduler implementations:
- Local Scheduler: In-memory scheduling for development and testing
- BullMQ Scheduler: Redis-backed distributed scheduling for production
- BullMQ Pro Scheduler: Enhanced version with advanced serialization guarantees
Local Scheduler
The local scheduler uses in-memory timers and is suitable for development:
import {
  LocalScheduleJobRunner,
  saga,
  setScheduler
} from "@eventicle/eventiclejs";

// Configure local scheduler
const localScheduler = new LocalScheduleJobRunner();
setScheduler(localScheduler);

// Sagas can now use timers.
// OrderData and cancelOrder are application-defined and not shown here.
export function orderTimeoutSaga() {
  return saga<"orderTimeout", OrderData>("OrderTimeoutSaga")
    .startOn("OrderCreated", {}, async (instance, event) => {
      instance.data = {
        orderId: event.payload.orderId,
        customerId: event.payload.customerId,
        createdAt: event.payload.createdAt
      };
      // Schedule timeout in 30 minutes
      instance.scheduleTimer("orderTimeout", 30 * 60 * 1000);
    })
    .on("OrderCompleted", {
      matchInstance: (event) => ({
        instanceProperty: "orderId",
        value: event.payload.orderId
      })
    }, async (instance, event) => {
      // Cancel timeout since order completed
      instance.cancelTimer("orderTimeout");
      instance.complete();
    })
    .onTimer("orderTimeout", async (instance) => {
      // Handle timeout - perhaps cancel the order
      await cancelOrder(instance.data.orderId, "Order timeout");
      instance.complete();
    });
}
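The schedule-then-cancel flow above can be explored without any scheduler at all. What follows is a minimal sketch, not Eventicle's implementation: `ManualTimerTable` and its `advance` method are hypothetical names, and the clock is moved by hand so the behaviour is deterministic.

```typescript
// Hypothetical stand-in for an in-memory timer store; not part of Eventicle.
type TimerCallback = () => void;

interface PendingTimer {
  dueAt: number;
  callback: TimerCallback;
}

class ManualTimerTable {
  private now = 0;
  private timers = new Map<string, PendingTimer>();

  // Scheduling under an existing name replaces the earlier timer
  // (an assumption of this sketch).
  schedule(name: string, delayMs: number, callback: TimerCallback): void {
    this.timers.set(name, { dueAt: this.now + delayMs, callback });
  }

  // Returns true if a pending timer was removed.
  cancel(name: string): boolean {
    return this.timers.delete(name);
  }

  // Moves the clock forward and fires every timer that has become due,
  // earliest first. Returns the names that fired.
  advance(ms: number): string[] {
    this.now += ms;
    const due = Array.from(this.timers.entries())
      .filter(([, timer]) => timer.dueAt <= this.now)
      .sort((a, b) => a[1].dueAt - b[1].dueAt);
    const fired: string[] = [];
    for (const [name, timer] of due) {
      // Skip timers that an earlier callback cancelled or replaced.
      if (this.timers.get(name) !== timer) continue;
      this.timers.delete(name);
      timer.callback();
      fired.push(name);
    }
    return fired;
  }
}
```

Scheduling `orderTimeout` for 30 minutes, advancing the clock by 10 minutes, cancelling, and then advancing again fires nothing, which is the outcome the `OrderCompleted` handler relies on.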
BullMQ Scheduler
For production environments, use the BullMQ scheduler with Redis:
import {
  BullMQScheduleJobRunner,
  setScheduler
} from "@eventicle/eventiclejs";
import Redis from "ioredis";

// Configure Redis connection
const redis = new Redis({
  host: "localhost",
  port: 6379,
  maxRetriesPerRequest: 3
});

// Configure BullMQ scheduler
const bullMQScheduler = new BullMQScheduleJobRunner({
  connection: redis,
  queueName: "saga-timers",
  concurrency: 10
});
setScheduler(bullMQScheduler);

// Start the scheduler
await bullMQScheduler.startup();
Saga Scheduler (Advanced)
The Saga Scheduler provides enhanced serialization guarantees for complex workflows:
interface SagaScheduler {
  // Handle saga events with guaranteed serialization
  sagaHandleEvent(
    saga: Saga<any, any>,
    event: EventicleEvent,
    instanceId: string
  ): Promise<void>;

  // Handle timer events with serialization
  handleTimer(
    saga: Saga<any, any>,
    name: string,
    data: { instanceId: string }
  ): Promise<void>;
}
Why Serialization Matters
Without serialization, a timer and an event that target the same saga instance can run concurrently and race against each other:
// Problematic scenario without serialization:
// 1. Timer fires: "processPayment"
// 2. Event arrives: "PaymentReceived"
// 3. Both execute simultaneously on same saga instance
// 4. Race condition: timer might cancel payment that already succeeded
// With saga scheduler:
// 1. Timer queued with instance ID
// 2. Event queued with same instance ID
// 3. Queue processes one at a time per instance
// 4. No race conditions
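The "one at a time per instance" rule can be illustrated in-process with promise chaining. This is a minimal sketch of the idea only: `KeyedSerialQueue` is a hypothetical name, it is not how Eventicle or BullMQ implement grouping, and it gives no ordering guarantee across processes.

```typescript
// A minimal sketch of per-key serialization using promise chaining.
// KeyedSerialQueue is a hypothetical name, not an Eventicle or BullMQ class.
class KeyedSerialQueue {
  // For each key, a promise that settles when its latest task has finished.
  private tails = new Map<string, Promise<void>>();

  enqueue<T>(key: string, task: () => Promise<T>): Promise<T> {
    const previous = this.tails.get(key) ?? Promise.resolve();
    // Start this task only after the previous task for the same key settles.
    const run = previous.then(() => task());
    // The stored tail swallows errors so one failed task does not block the key.
    const tail: Promise<void> = run.then(() => {}, () => {});
    this.tails.set(key, tail);
    // Drop the entry once the key is idle so the map does not grow forever.
    tail.then(() => {
      if (this.tails.get(key) === tail) this.tails.delete(key);
    });
    return run;
  }
}
```

Tasks enqueued under the same key (for example a saga instance ID) run strictly one after another, while tasks under different keys are free to overlap.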
Custom Saga Scheduler Implementation
import { EventicleEvent, Saga, SagaScheduler } from "@eventicle/eventiclejs";
import { QueuePro, WorkerPro } from "bullmq-pro";

export class CustomSagaScheduler implements SagaScheduler {
  // Both are assigned in startup(), hence the definite-assignment assertions
  private queue!: QueuePro;
  private worker!: WorkerPro;

  constructor(private redisConfig: any) {}

  async startup() {
    this.queue = new QueuePro("saga-scheduler", {
      connection: this.redisConfig,
      defaultJobOptions: {
        removeOnComplete: 100,
        removeOnFail: 50
      }
    });

    this.worker = new WorkerPro(
      "saga-scheduler",
      async (job) => {
        // findSaga is assumed to look up a registered saga by name (not shown)
        const saga = findSaga(job.data.sagaName);
        if (job.data.type === "timer") {
          await this.executeTimer(saga, job.data);
        } else {
          await this.executeEvent(saga, job.data);
        }
      },
      {
        // Critical: Force serialization per saga instance
        group: {
          concurrency: 1 // One job per group (instance) at a time
        },
        concurrency: 200, // But allow many different instances
        connection: this.redisConfig
      }
    );
  }

  async sagaHandleEvent(
    saga: Saga<any, any>,
    event: EventicleEvent,
    instanceId: string
  ): Promise<void> {
    await this.queue.add(
      "saga-event",
      {
        type: "event",
        sagaName: saga.name,
        event,
        instanceId
      },
      {
        // Group by instance ID for serialization
        group: { id: instanceId },
        // Add delay if needed for ordering
        delay: 0
      }
    );
  }

  async handleTimer(
    saga: Saga<any, any>,
    name: string,
    data: { instanceId: string }
  ): Promise<void> {
    await this.queue.add(
      "saga-timer",
      {
        type: "timer",
        sagaName: saga.name,
        timerName: name,
        instanceId: data.instanceId
      },
      {
        // Same group ensures serialization with events
        group: { id: data.instanceId }
      }
    );
  }

  // sagaHandleEvent and handleTimerEvent below are the functions that actually
  // run the saga handlers; they are assumed to be available and are not shown.
  private async executeEvent(saga: Saga<any, any>, jobData: any) {
    // Execute saga event handler
    await sagaHandleEvent(saga, jobData.event, jobData.instanceId);
  }

  private async executeTimer(saga: Saga<any, any>, jobData: any) {
    // Execute saga timer handler
    await handleTimerEvent(saga, jobData.timerName, {
      instanceId: jobData.instanceId
    });
  }
}
Timer Patterns
Simple Timeout Pattern
export function simpleTimeoutSaga() {
  return saga<"timeout", { orderId: string }>("SimpleTimeoutSaga")
    .startOn("ProcessStarted", {}, async (instance, event) => {
      instance.data = { orderId: event.payload.orderId };
      // Set 5-minute timeout
      instance.scheduleTimer("timeout", 5 * 60 * 1000);
    })
    .on("ProcessCompleted", {
      matchInstance: (event) => ({
        instanceProperty: "orderId",
        value: event.payload.orderId
      })
    }, async (instance, event) => {
      instance.cancelTimer("timeout");
      instance.complete();
    })
    .onTimer("timeout", async (instance) => {
      await handleTimeout(instance.data.orderId);
      instance.complete();
    });
}
Recurring Timer Pattern
export function healthCheckSaga() {
  return saga<"healthCheck", { serviceId: string; checks: number }>(
    "HealthCheckSaga"
  )
    .startOn("ServiceRegistered", {}, async (instance, event) => {
      instance.data = {
        serviceId: event.payload.serviceId,
        checks: 0
      };
      // Start health checks every 30 seconds
      instance.scheduleTimer("healthCheck", 30 * 1000);
    })
    .on("ServiceUnregistered", {
      matchInstance: (event) => ({
        instanceProperty: "serviceId",
        value: event.payload.serviceId
      })
    }, async (instance) => {
      instance.cancelTimer("healthCheck");
      instance.complete();
    })
    .onTimer("healthCheck", async (instance) => {
      const isHealthy = await checkServiceHealth(instance.data.serviceId);
      instance.data.checks++;
      if (!isHealthy) {
        await handleUnhealthyService(instance.data.serviceId);
        instance.complete();
      } else {
        // Schedule next check
        instance.scheduleTimer("healthCheck", 30 * 1000);
      }
    });
}
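Because the handler above re-arms the timer only after `checkServiceHealth` has returned, the gap between checks is the interval plus the time the check itself takes. The arithmetic can be sketched as follows, assuming the delay is measured from the moment the timer is scheduled, that timers fire exactly on time, and that the scheduler adds no latency; `selfReschedulingFireTimes` is a hypothetical helper for illustration.

```typescript
// Sketch: firing times when a handler re-arms its own timer after finishing.
// Assumes timers fire exactly on time and the scheduler adds no latency.
function selfReschedulingFireTimes(
  intervalMs: number,
  handlerDurationMs: number,
  count: number
): number[] {
  const fireTimes: number[] = [];
  // The first timer is set when the saga starts at t = 0.
  let nextFireAt = intervalMs;
  for (let i = 0; i < count; i++) {
    fireTimes.push(nextFireAt);
    // The handler runs, then schedules the next timer relative to "now".
    nextFireAt += handlerDurationMs + intervalMs;
  }
  return fireTimes;
}

// With a 30 s interval and a 2 s health check:
// selfReschedulingFireTimes(30_000, 2_000, 3) returns [30000, 62000, 94000]
```

If checks must land on a fixed cadence regardless of how long each one takes, the next delay has to be shortened by the handler's running time.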
Multi-Timer Pattern
export function paymentProcessingSaga() {
  return saga<
    "paymentTimeout" | "retryDelay" | "statusCheck",
    PaymentData
  >("PaymentProcessingSaga")
    .startOn("PaymentInitiated", {}, async (instance, event) => {
      instance.data = {
        paymentId: event.payload.paymentId,
        amount: event.payload.amount,
        attempts: 0,
        maxAttempts: 3
      };
      // Set overall timeout
      instance.scheduleTimer("paymentTimeout", 10 * 60 * 1000); // 10 minutes
      // Start status checking
      instance.scheduleTimer("statusCheck", 5 * 1000); // 5 seconds
      await initiatePayment(instance.data);
    })
    .on("PaymentCompleted", {
      matchInstance: (event) => ({
        instanceProperty: "paymentId",
        value: event.payload.paymentId
      })
    }, async (instance) => {
      // Cancel all timers
      instance.cancelTimer("paymentTimeout");
      instance.cancelTimer("retryDelay");
      instance.cancelTimer("statusCheck");
      instance.complete();
    })
    .on("PaymentFailed", {
      matchInstance: (event) => ({
        instanceProperty: "paymentId",
        value: event.payload.paymentId
      })
    }, async (instance, event) => {
      instance.data.attempts++;
      if (instance.data.attempts < instance.data.maxAttempts) {
        // Schedule retry with exponential backoff
        const delay = Math.pow(2, instance.data.attempts) * 1000;
        instance.scheduleTimer("retryDelay", delay);
      } else {
        // Max attempts reached
        await handlePaymentFailure(instance.data.paymentId);
        instance.complete();
      }
    })
    .onTimer("paymentTimeout", async (instance) => {
      await handlePaymentTimeout(instance.data.paymentId);
      instance.complete();
    })
    .onTimer("retryDelay", async (instance) => {
      await retryPayment(instance.data);
      // Restart status checking
      instance.scheduleTimer("statusCheck", 5 * 1000);
    })
    .onTimer("statusCheck", async (instance) => {
      const status = await checkPaymentStatus(instance.data.paymentId);
      if (status === "pending") {
        // Continue checking
        instance.scheduleTimer("statusCheck", 5 * 1000);
      }
      // If completed or failed, events will be emitted
    });
}
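The backoff arithmetic in the `PaymentFailed` handler is easy to misread, because `attempts` is incremented before it is compared with `maxAttempts`. The sketch below reproduces only that arithmetic; `retryDelaysMs` is a hypothetical helper, not part of the saga.

```typescript
// Mirrors the arithmetic in the PaymentFailed handler above.
function retryDelaysMs(maxAttempts: number, baseMs: number = 1000): number[] {
  const delays: number[] = [];
  // attempts is incremented before the check, so the first failure sees 1.
  for (let attempts = 1; attempts < maxAttempts; attempts++) {
    delays.push(Math.pow(2, attempts) * baseMs);
  }
  return delays;
}

// With maxAttempts = 3, as in the saga:
// retryDelaysMs(3) returns [2000, 4000]
```

So `maxAttempts: 3` allows the initial payment attempt plus two retries, after 2 and 4 seconds; the third failure ends the saga. Both delays fit comfortably inside the 10-minute `paymentTimeout`.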
Configuration and Best Practices
Scheduler Configuration
// Production configuration
const productionScheduler = new CustomSagaScheduler({
  host: process.env.REDIS_HOST,
  // Fall back to the default Redis port when REDIS_PORT is unset,
  // and always pass an explicit radix to parseInt
  port: parseInt(process.env.REDIS_PORT ?? "6379", 10),
  password: process.env.REDIS_PASSWORD,
  maxRetriesPerRequest: 3,
  lazyConnect: true,
  keepAlive: 30000
});

// Configure job options
const schedulerConfig = {
  defaultJobOptions: {
    removeOnComplete: 1000, // Keep last 1000 completed jobs
    removeOnFail: 500, // Keep last 500 failed jobs
    attempts: 3, // Retry failed jobs 3 times
    backoff: {
      type: 'exponential',
      delay: 2000
    }
  },
  settings: {
    stalledInterval: 30 * 1000, // Check for stalled jobs every 30s
    maxStalledCount: 1 // Max times a job can be stalled
  }
};
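Reading connection settings straight from environment variables fails quietly when a value is missing or malformed. As one possible safeguard, the settings can be validated before the scheduler is constructed. `redisSettingsFromEnv` and `RedisSettings` are hypothetical names, and the `localhost` and `6379` defaults are assumptions of this sketch.

```typescript
interface RedisSettings {
  host: string;
  port: number;
  password?: string;
}

// Hypothetical helper: reads the same variables as the configuration above
// and fails fast on a malformed port instead of passing NaN to the client.
function redisSettingsFromEnv(
  env: Record<string, string | undefined>
): RedisSettings {
  const host = env["REDIS_HOST"] ?? "localhost";
  const rawPort = env["REDIS_PORT"] ?? "6379";
  const port = Number(rawPort);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${rawPort}`);
  }
  const settings: RedisSettings = { host, port };
  // Only include a password when one is actually configured.
  const password = env["REDIS_PASSWORD"];
  if (password) settings.password = password;
  return settings;
}
```

In an application this would be called as `redisSettingsFromEnv(process.env)` at startup, so a bad value stops the process before any timers are scheduled.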
Timer Best Practices
- Use Appropriate Timeouts: Balance responsiveness with system load

  // Good: Reasonable timeouts
  instance.scheduleTimer("userResponse", 5 * 60 * 1000); // 5 minutes
  instance.scheduleTimer("paymentTimeout", 30 * 60 * 1000); // 30 minutes

  // Avoid: Too short (creates load) or too long (poor UX)
  instance.scheduleTimer("check", 100); // Too frequent
  instance.scheduleTimer("timeout", 24 * 60 * 60 * 1000); // Too long

- Cancel Timers When Appropriate:

  .on("ProcessCompleted", {}, async (instance) => {
    // Always cancel timers when process completes
    instance.cancelTimer("timeout");
    instance.cancelTimer("reminder");
    instance.complete();
  })

- Handle Timer Failures:

  .onTimer("criticalTimer", async (instance) => {
    try {
      await performCriticalOperation();
    } catch (error) {
      console.error("Critical timer failed:", error);
      // Decide: retry, escalate, or fail
      if (shouldRetry(error)) {
        instance.scheduleTimer("criticalTimer", 60 * 1000); // Retry in 1 minute
      } else {
        await escalateFailure(instance.data);
        instance.complete();
      }
    }
  })
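The `shouldRetry` helper used in the failure-handling example is left to the application. One possible shape is sketched below, assuming errors carry a `retryable` flag set by the code that raises them. Both the flag and the attempt limit are assumptions of this sketch, and unlike the call above it takes the attempt count as an extra argument so that retries cannot continue forever.

```typescript
// Hypothetical convention: transient failures are tagged with retryable = true.
function isRetryable(error: unknown): boolean {
  return (
    typeof error === "object" &&
    error !== null &&
    (error as { retryable?: unknown }).retryable === true
  );
}

// One possible shouldRetry: retry tagged failures up to a fixed limit.
function shouldRetry(
  error: unknown,
  attemptsSoFar: number,
  maxAttempts: number = 3
): boolean {
  return isRetryable(error) && attemptsSoFar < maxAttempts;
}
```

The attempt count would typically live in `instance.data`, so it survives between timer firings.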
Monitoring and Observability
Timer Metrics
import { metrics } from "@eventicle/eventiclejs";

// Enable timer metrics
metrics.enable({
  timerScheduled: true,
  timerExecuted: true,
  timerCancelled: true,
  timerFailed: true
});

// Custom timer monitoring
.onTimer("monitoredTimer", async (instance) => {
  const startTime = Date.now();
  try {
    await performOperation();
    // Record success metric
    metrics.recordTimer("timer.execution.duration", Date.now() - startTime, {
      saga: "MySaga",
      timer: "monitoredTimer",
      status: "success"
    });
  } catch (error) {
    metrics.recordTimer("timer.execution.duration", Date.now() - startTime, {
      saga: "MySaga",
      timer: "monitoredTimer",
      status: "error"
    });
    throw error;
  }
})
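The try/catch timing code above tends to get copied into every monitored handler. A small wrapper can factor it out. This is a sketch only: `withDuration` and `DurationRecorder` are hypothetical names, and the recorder callback stands in for whatever metrics client the application uses.

```typescript
type DurationRecorder = (
  durationMs: number,
  status: "success" | "error"
) => void;

// Hypothetical helper: runs an operation and reports how long it took,
// whether it succeeded or threw. The error is re-thrown unchanged.
async function withDuration<T>(
  operation: () => Promise<T>,
  record: DurationRecorder,
  now: () => number = Date.now
): Promise<T> {
  const startedAt = now();
  let status: "success" | "error" = "success";
  try {
    return await operation();
  } catch (error) {
    status = "error";
    throw error;
  } finally {
    // Runs exactly once on either path.
    record(now() - startedAt, status);
  }
}
```

The injectable `now` parameter exists so the wrapper can be exercised with a fake clock in tests; in production the default `Date.now` is used.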
Health Checks
export class SchedulerHealthCheck {
  constructor(private scheduler: CustomSagaScheduler) {}

  async checkHealth(): Promise<boolean> {
    try {
      // Test scheduler responsiveness
      const testTimer = Date.now();
      await this.scheduler.handleTimer(
        testSaga,
        "healthCheck",
        { instanceId: `health-${testTimer}` }
      );
      return true;
    } catch (error) {
      console.error("Scheduler health check failed:", error);
      return false;
    }
  }
}
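A health check like the one above only reports failure when the call rejects. If the backing store hangs, the check hangs with it. One way to bound the wait is to race the probe against a timeout, as in this sketch; `probeWithTimeout` is a hypothetical helper and the timeout value is up to the caller.

```typescript
// Hypothetical helper: resolves to true if the probe succeeds in time,
// and to false if it rejects or does not settle within timeoutMs.
// The probe is expected to return a promise rather than throw synchronously.
function probeWithTimeout(
  probe: () => Promise<void>,
  timeoutMs: number
): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    const timer = setTimeout(() => resolve(false), timeoutMs);
    probe().then(
      () => {
        clearTimeout(timer);
        resolve(true);
      },
      () => {
        clearTimeout(timer);
        resolve(false);
      }
    );
  });
}
```

A caller could wrap the scheduler call from `checkHealth` as the probe, so a stalled connection surfaces as an unhealthy result instead of a request that never returns.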
Testing Schedulers
describe("Timer-based Saga", () => {
  let mockScheduler: jest.Mocked<SagaScheduler>;

  beforeEach(() => {
    mockScheduler = {
      sagaHandleEvent: jest.fn(),
      handleTimer: jest.fn()
    };
    setScheduler(mockScheduler);
  });

  // startEvent and completeEvent are test fixtures assumed to be defined elsewhere
  it("should schedule timer on saga start", async () => {
    const saga = simpleTimeoutSaga();
    const instance = new SagaInstance(saga, "instance-1");
    await saga.handlers.ProcessStarted(instance, startEvent);
    expect(mockScheduler.handleTimer).toHaveBeenCalledWith(
      saga,
      "timeout",
      expect.objectContaining({ instanceId: "instance-1" })
    );
  });

  it("should cancel timer on completion", async () => {
    const saga = simpleTimeoutSaga();
    const instance = new SagaInstance(saga, "instance-1");
    // Start process
    await saga.handlers.ProcessStarted(instance, startEvent);
    // Complete process
    await saga.handlers.ProcessCompleted(instance, completeEvent);
    // Timer should be cancelled (implementation specific verification)
    expect(instance.scheduledTimers).toHaveLength(0);
  });
});
Next Steps
- Explore Event Sagas for workflow patterns
- Learn about Error Handling in timed operations
- See Observability for monitoring schedulers
- Understand Deployment considerations for distributed schedulers