Commands
Commands represent user intentions to change the system state. They encapsulate business operations and coordinate the execution of domain logic through aggregates, ensuring that business rules are enforced before events are emitted.
What Are Commands?
Commands:
-
Represent Intentions: Express what the user wants to do
-
Validate Input: Ensure data is correct before processing
-
Enforce Business Rules: Check constraints and invariants
-
Coordinate Operations: Orchestrate aggregate interactions
-
Handle Errors: Provide meaningful error responses
-
Return Results: Give feedback on the operation outcome
Command Structure
Commands in Eventicle follow a consistent pattern:
import { dispatchDirectCommand, aggregates } from "@eventicle/eventiclejs";
import { BankAccount } from "../aggregates/bank-account";
export class AccountCommands {
static async openAccount(request: {
accountId: string;
customerId: string;
initialDeposit: number;
accountType: "checking" | "savings";
}) {
return dispatchDirectCommand(async () => {
// 1. Validate input
if (request.initialDeposit < 0) {
throw new Error("Initial deposit cannot be negative");
}
if (request.initialDeposit < 25) {
throw new Error("Minimum initial deposit is $25");
}
// 2. Check business rules
const existingAccount = await aggregates.load(BankAccount, request.accountId);
if (existingAccount && existingAccount.id) {
throw new Error("Account already exists");
}
// 3. Execute domain logic
const account = BankAccount.open(
request.accountId,
request.customerId,
request.initialDeposit,
request.accountType
);
// 4. Persist changes and emit events
const events = await aggregates.persist(account);
// 5. Return result
return {
response: {
accountId: account.id,
balance: account.balance,
status: account.status
},
events
};
}, "accounts"); // Stream name
}
static async deposit(request: {
accountId: string;
amount: number;
description?: string;
}) {
return dispatchDirectCommand(async () => {
// Validate input
if (request.amount <= 0) {
throw new Error("Deposit amount must be positive");
}
if (request.amount > 10000) {
throw new Error("Deposit amount exceeds daily limit");
}
// Load aggregate
const account = await aggregates.load(BankAccount, request.accountId);
if (!account || !account.id) {
throw new Error("Account not found");
}
// Execute business logic
account.deposit(request.amount, request.description);
// Persist and return
const events = await aggregates.persist(account);
return {
response: {
accountId: account.id,
newBalance: account.balance,
transactionId: events[events.length - 1]?.id
},
events
};
}, "accounts");
}
static async transfer(request: {
fromAccountId: string;
toAccountId: string;
amount: number;
description?: string;
}) {
return dispatchDirectCommand(async () => {
// Validate input
if (request.amount <= 0) {
throw new Error("Transfer amount must be positive");
}
if (request.fromAccountId === request.toAccountId) {
throw new Error("Cannot transfer to the same account");
}
// Load both accounts
const [fromAccount, toAccount] = await Promise.all([
aggregates.load(BankAccount, request.fromAccountId),
aggregates.load(BankAccount, request.toAccountId)
]);
if (!fromAccount?.id) {
throw new Error("Source account not found");
}
if (!toAccount?.id) {
throw new Error("Destination account not found");
}
// Execute transfer
fromAccount.withdraw(request.amount, `Transfer to ${request.toAccountId}`);
toAccount.deposit(request.amount, `Transfer from ${request.fromAccountId}`);
// Persist both accounts
const allEvents = await aggregates.persistAll([fromAccount, toAccount]);
return {
response: {
transferId: allEvents[0]?.id,
fromAccountBalance: fromAccount.balance,
toAccountBalance: toAccount.balance
},
events: allEvents
};
}, "accounts");
}
}
Command Patterns
Simple Command Pattern
For straightforward operations on a single aggregate:
export class OrderCommands {
static async createOrder(customerId: string, items: OrderItem[]) {
return dispatchDirectCommand(async () => {
// Validation
if (!items || items.length === 0) {
throw new Error("Order must contain at least one item");
}
// Create aggregate
const order = Order.create(customerId, items);
// Persist
const events = await aggregates.persist(order);
return {
response: { orderId: order.id },
events
};
}, "orders");
}
}
Multi-Aggregate Command Pattern
For operations that span multiple aggregates:
export class InventoryCommands {
static async reserveInventory(orderId: string, items: OrderItem[]) {
return dispatchDirectCommand(async () => {
const inventoryUpdates: Product[] = [];
// Load all required products
for (const item of items) {
const product = await aggregates.load(Product, item.productId);
if (!product?.id) {
throw new Error(`Product ${item.productId} not found`);
}
// Check availability
if (product.availableStock < item.quantity) {
throw new Error(`Insufficient stock for ${product.name}`);
}
// Reserve inventory
product.reserveStock(item.quantity, orderId);
inventoryUpdates.push(product);
}
// Persist all changes
const events = await aggregates.persistAll(inventoryUpdates);
return {
response: {
reservationId: `reservation-${orderId}`,
reservedItems: items.length
},
events
};
}, "inventory");
}
}
Saga-Triggered Command Pattern
For commands initiated by sagas:
export class PaymentCommands {
static async processPayment(request: {
orderId: string;
amount: number;
paymentMethod: string;
customerId: string;
}) {
return dispatchDirectCommand(async () => {
// Create payment aggregate
const payment = Payment.create(
request.orderId,
request.amount,
request.paymentMethod,
request.customerId
);
try {
// Integrate with payment gateway
const paymentResult = await processWithGateway({
amount: request.amount,
paymentMethod: request.paymentMethod
});
if (paymentResult.success) {
payment.markSuccessful(paymentResult.transactionId);
} else {
payment.markFailed(paymentResult.errorCode, paymentResult.errorMessage);
}
} catch (error) {
payment.markFailed("GATEWAY_ERROR", error.message);
}
const events = await aggregates.persist(payment);
return {
response: {
paymentId: payment.id,
status: payment.status,
transactionId: payment.transactionId
},
events
};
}, "payments");
}
}
async function processWithGateway(request: any) {
// Mock payment gateway integration
// In real implementation, integrate with Stripe, PayPal, etc.
return {
success: Math.random() > 0.1, // 90% success rate
transactionId: `txn_${Date.now()}`,
errorCode: "CARD_DECLINED",
errorMessage: "Insufficient funds"
};
}
Input Validation
Basic Validation
function validateCreateUserRequest(request: any) {
const errors: string[] = [];
if (!request.email || typeof request.email !== 'string') {
errors.push("Email is required and must be a string");
}
if (!request.email.includes('@')) {
errors.push("Email must be a valid email address");
}
if (!request.password || request.password.length < 8) {
errors.push("Password must be at least 8 characters long");
}
if (errors.length > 0) {
throw new Error(`Validation failed: ${errors.join(', ')}`);
}
}
Schema-Based Validation
import Joi from 'joi';
const createOrderSchema = Joi.object({
customerId: Joi.string().uuid().required(),
items: Joi.array().items(
Joi.object({
productId: Joi.string().uuid().required(),
quantity: Joi.number().integer().min(1).required(),
price: Joi.number().positive().required()
})
).min(1).required(),
shippingAddress: Joi.object({
street: Joi.string().required(),
city: Joi.string().required(),
zipCode: Joi.string().required(),
country: Joi.string().required()
}).required()
});
export class OrderCommands {
static async createOrder(request: any) {
return dispatchDirectCommand(async () => {
// Validate with schema
const { error, value } = createOrderSchema.validate(request);
if (error) {
throw new Error(`Validation error: ${error.details[0].message}`);
}
// Proceed with validated data
const order = Order.create(value.customerId, value.items, value.shippingAddress);
const events = await aggregates.persist(order);
return {
response: { orderId: order.id },
events
};
}, "orders");
}
}
Error Handling
Business Logic Errors
export class BusinessError extends Error {
constructor(
message: string,
public readonly code: string,
public readonly details?: any
) {
super(message);
this.name = 'BusinessError';
}
}
export class ProductCommands {
static async updatePrice(productId: string, newPrice: number) {
return dispatchDirectCommand(async () => {
const product = await aggregates.load(Product, productId);
if (!product?.id) {
throw new BusinessError(
"Product not found",
"PRODUCT_NOT_FOUND",
{ productId }
);
}
if (product.status === "discontinued") {
throw new BusinessError(
"Cannot update price of discontinued product",
"PRODUCT_DISCONTINUED",
{ productId, status: product.status }
);
}
if (newPrice <= 0) {
throw new BusinessError(
"Price must be positive",
"INVALID_PRICE",
{ newPrice }
);
}
product.updatePrice(newPrice);
const events = await aggregates.persist(product);
return {
response: {
productId: product.id,
oldPrice: product.previousPrice,
newPrice: product.price
},
events
};
}, "products");
}
}
Retry Logic
export class RobustCommands {
static async processWithRetry<T>(
operation: () => Promise<T>,
maxRetries: number = 3
): Promise<T> {
let lastError: Error;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error;
// Don't retry business logic errors
if (error instanceof BusinessError) {
throw error;
}
if (attempt === maxRetries) {
throw lastError;
}
// Exponential backoff
const delay = Math.pow(2, attempt) * 1000;
await new Promise(resolve => setTimeout(resolve, delay));
}
}
throw lastError!;
}
static async reliableOperation(data: any) {
return this.processWithRetry(async () => {
return dispatchDirectCommand(async () => {
// Operation that might fail due to infrastructure issues
const result = await someUnreliableOperation(data);
return {
response: result,
events: []
};
}, "operations");
});
}
}
Command Authorization
interface User {
id: string;
roles: string[];
permissions: string[];
}
class AuthorizationError extends Error {
constructor(message: string, public readonly requiredPermission: string) {
super(message);
this.name = 'AuthorizationError';
}
}
function requirePermission(permission: string) {
return function(target: any, propertyKey: string, descriptor: PropertyDescriptor) {
const originalMethod = descriptor.value;
descriptor.value = async function(user: User, ...args: any[]) {
if (!user.permissions.includes(permission)) {
throw new AuthorizationError(
`Permission ${permission} required`,
permission
);
}
return originalMethod.apply(this, args);
};
};
}
export class SecureCommands {
@requirePermission('admin:delete-user')
static async deleteUser(user: User, userId: string) {
return dispatchDirectCommand(async () => {
const userToDelete = await aggregates.load(UserAggregate, userId);
if (!userToDelete?.id) {
throw new BusinessError("User not found", "USER_NOT_FOUND");
}
userToDelete.markDeleted(user.id);
const events = await aggregates.persist(userToDelete);
return {
response: { deletedUserId: userId },
events
};
}, "users");
}
@requirePermission('finance:process-refund')
static async processRefund(user: User, orderId: string, amount: number) {
return dispatchDirectCommand(async () => {
// Refund logic here
const refund = Refund.create(orderId, amount, user.id);
const events = await aggregates.persist(refund);
return {
response: { refundId: refund.id },
events
};
}, "refunds");
}
}
Testing Commands
import {
eventClientInMemory,
setEventClient,
InMemoryDatastore,
setDataStore,
consumeFullEventLog
} from "@eventicle/eventiclejs";
describe("AccountCommands", () => {
beforeEach(() => {
setEventClient(eventClientInMemory());
setDataStore(new InMemoryDatastore());
});
describe("openAccount", () => {
it("should create a new account with valid input", async () => {
const result = await AccountCommands.openAccount({
accountId: "acc-123",
customerId: "customer-456",
initialDeposit: 1000,
accountType: "checking"
});
expect(result.response.accountId).toBe("acc-123");
expect(result.response.balance).toBe(1000);
expect(result.events).toHaveLength(1);
expect(result.events[0].type).toBe("AccountOpened");
});
it("should reject negative initial deposit", async () => {
await expect(AccountCommands.openAccount({
accountId: "acc-123",
customerId: "customer-456",
initialDeposit: -100,
accountType: "checking"
})).rejects.toThrow("Initial deposit cannot be negative");
});
it("should reject duplicate account", async () => {
// Create first account
await AccountCommands.openAccount({
accountId: "acc-123",
customerId: "customer-456",
initialDeposit: 1000,
accountType: "checking"
});
// Wait for events to be processed
await consumeFullEventLog();
// Try to create duplicate
await expect(AccountCommands.openAccount({
accountId: "acc-123",
customerId: "customer-789",
initialDeposit: 500,
accountType: "savings"
})).rejects.toThrow("Account already exists");
});
});
describe("transfer", () => {
beforeEach(async () => {
// Set up test accounts
await AccountCommands.openAccount({
accountId: "acc-source",
customerId: "customer-1",
initialDeposit: 1000,
accountType: "checking"
});
await AccountCommands.openAccount({
accountId: "acc-dest",
customerId: "customer-2",
initialDeposit: 500,
accountType: "savings"
});
await consumeFullEventLog();
});
it("should transfer money between accounts", async () => {
const result = await AccountCommands.transfer({
fromAccountId: "acc-source",
toAccountId: "acc-dest",
amount: 200
});
expect(result.response.fromAccountBalance).toBe(800);
expect(result.response.toAccountBalance).toBe(700);
expect(result.events).toHaveLength(2); // One for each account
});
});
});
Command Composition
export class CompositeCommands {
static async createUserAndAccount(request: {
email: string;
password: string;
initialDeposit: number;
}) {
return dispatchDirectCommand(async () => {
// Step 1: Create user
const user = User.create(request.email, request.password);
const userEvents = await aggregates.persist(user);
// Step 2: Create account for user
const account = BankAccount.open(
`acc-${user.id}`,
user.id,
request.initialDeposit,
"checking"
);
const accountEvents = await aggregates.persist(account);
return {
response: {
userId: user.id,
accountId: account.id,
initialBalance: account.balance
},
events: [...userEvents, ...accountEvents]
};
}, "user-registration");
}
}
Best Practices
-
Validate Early: Check input parameters before loading aggregates
-
Fail Fast: Throw meaningful errors for invalid operations
-
Single Responsibility: Each command should do one thing well
-
Idempotency: Design commands to be safely retryable
-
Return Useful Data: Provide feedback about what was accomplished
-
Handle Errors Gracefully: Distinguish between different error types
-
Use Proper Abstractions: Separate validation, authorization, and business logic
-
Test Thoroughly: Cover both success and failure scenarios
Next Steps
-
Learn about Implementing Commands in detail
-
Explore Aggregate Roots for domain logic
-
Understand Testing Commands patterns
-
See Error Handling strategies