Commands

Commands represent user intentions to change the system state. They encapsulate business operations and coordinate the execution of domain logic through aggregates, ensuring that business rules are enforced before events are emitted.

What Are Commands?

Commands:

  • Represent Intentions: Express what the user wants to do

  • Validate Input: Ensure data is correct before processing

  • Enforce Business Rules: Check constraints and invariants

  • Coordinate Operations: Orchestrate aggregate interactions

  • Handle Errors: Provide meaningful error responses

  • Return Results: Give feedback on the operation outcome

Command Structure

Commands in Eventicle follow a consistent pattern:

import { dispatchDirectCommand, aggregates } from "@eventicle/eventiclejs";
import { BankAccount } from "../aggregates/bank-account";

export class AccountCommands {
  static async openAccount(request: {
    accountId: string;
    customerId: string;
    initialDeposit: number;
    accountType: "checking" | "savings";
  }) {
    return dispatchDirectCommand(async () => {
      // 1. Validate input
      if (request.initialDeposit < 0) {
        throw new Error("Initial deposit cannot be negative");
      }

      if (request.initialDeposit < 25) {
        throw new Error("Minimum initial deposit is $25");
      }

      // 2. Check business rules
      const existingAccount = await aggregates.load(BankAccount, request.accountId);
      if (existingAccount && existingAccount.id) {
        throw new Error("Account already exists");
      }

      // 3. Execute domain logic
      const account = BankAccount.open(
        request.accountId,
        request.customerId,
        request.initialDeposit,
        request.accountType
      );

      // 4. Persist changes and emit events
      const events = await aggregates.persist(account);

      // 5. Return result
      return {
        response: {
          accountId: account.id,
          balance: account.balance,
          status: account.status
        },
        events
      };
    }, "accounts"); // Stream name
  }

  static async deposit(request: {
    accountId: string;
    amount: number;
    description?: string;
  }) {
    return dispatchDirectCommand(async () => {
      // Validate input
      if (request.amount <= 0) {
        throw new Error("Deposit amount must be positive");
      }

      if (request.amount > 10000) {
        throw new Error("Deposit amount exceeds daily limit");
      }

      // Load aggregate
      const account = await aggregates.load(BankAccount, request.accountId);
      if (!account || !account.id) {
        throw new Error("Account not found");
      }

      // Execute business logic
      account.deposit(request.amount, request.description);

      // Persist and return
      const events = await aggregates.persist(account);

      return {
        response: {
          accountId: account.id,
          newBalance: account.balance,
          transactionId: events[events.length - 1]?.id
        },
        events
      };
    }, "accounts");
  }

  static async transfer(request: {
    fromAccountId: string;
    toAccountId: string;
    amount: number;
    description?: string;
  }) {
    return dispatchDirectCommand(async () => {
      // Validate input
      if (request.amount <= 0) {
        throw new Error("Transfer amount must be positive");
      }

      if (request.fromAccountId === request.toAccountId) {
        throw new Error("Cannot transfer to the same account");
      }

      // Load both accounts
      const [fromAccount, toAccount] = await Promise.all([
        aggregates.load(BankAccount, request.fromAccountId),
        aggregates.load(BankAccount, request.toAccountId)
      ]);

      if (!fromAccount?.id) {
        throw new Error("Source account not found");
      }

      if (!toAccount?.id) {
        throw new Error("Destination account not found");
      }

      // Execute transfer
      fromAccount.withdraw(request.amount, `Transfer to ${request.toAccountId}`);
      toAccount.deposit(request.amount, `Transfer from ${request.fromAccountId}`);

      // Persist both accounts
      const allEvents = await aggregates.persistAll([fromAccount, toAccount]);

      return {
        response: {
          transferId: allEvents[0]?.id,
          fromAccountBalance: fromAccount.balance,
          toAccountBalance: toAccount.balance
        },
        events: allEvents
      };
    }, "accounts");
  }
}

Command Patterns

Simple Command Pattern

For straightforward operations on a single aggregate:

export class OrderCommands {
  static async createOrder(customerId: string, items: OrderItem[]) {
    return dispatchDirectCommand(async () => {
      // Validation
      if (!items || items.length === 0) {
        throw new Error("Order must contain at least one item");
      }

      // Create aggregate
      const order = Order.create(customerId, items);

      // Persist
      const events = await aggregates.persist(order);

      return {
        response: { orderId: order.id },
        events
      };
    }, "orders");
  }
}

Multi-Aggregate Command Pattern

For operations that span multiple aggregates:

export class InventoryCommands {
  static async reserveInventory(orderId: string, items: OrderItem[]) {
    return dispatchDirectCommand(async () => {
      const inventoryUpdates: Product[] = [];

      // Load all required products
      for (const item of items) {
        const product = await aggregates.load(Product, item.productId);
        if (!product?.id) {
          throw new Error(`Product ${item.productId} not found`);
        }

        // Check availability
        if (product.availableStock < item.quantity) {
          throw new Error(`Insufficient stock for ${product.name}`);
        }

        // Reserve inventory
        product.reserveStock(item.quantity, orderId);
        inventoryUpdates.push(product);
      }

      // Persist all changes
      const events = await aggregates.persistAll(inventoryUpdates);

      return {
        response: {
          reservationId: `reservation-${orderId}`,
          reservedItems: items.length
        },
        events
      };
    }, "inventory");
  }
}

Saga-Triggered Command Pattern

For commands initiated by sagas:

export class PaymentCommands {
  static async processPayment(request: {
    orderId: string;
    amount: number;
    paymentMethod: string;
    customerId: string;
  }) {
    return dispatchDirectCommand(async () => {
      // Create payment aggregate
      const payment = Payment.create(
        request.orderId,
        request.amount,
        request.paymentMethod,
        request.customerId
      );

      try {
        // Integrate with payment gateway
        const paymentResult = await processWithGateway({
          amount: request.amount,
          paymentMethod: request.paymentMethod
        });

        if (paymentResult.success) {
          payment.markSuccessful(paymentResult.transactionId);
        } else {
          payment.markFailed(paymentResult.errorCode, paymentResult.errorMessage);
        }

      } catch (error) {
        payment.markFailed("GATEWAY_ERROR", error.message);
      }

      const events = await aggregates.persist(payment);

      return {
        response: {
          paymentId: payment.id,
          status: payment.status,
          transactionId: payment.transactionId
        },
        events
      };
    }, "payments");
  }
}

async function processWithGateway(request: any) {
  // Mock payment gateway integration
  // In real implementation, integrate with Stripe, PayPal, etc.
  return {
    success: Math.random() > 0.1, // 90% success rate
    transactionId: `txn_${Date.now()}`,
    errorCode: "CARD_DECLINED",
    errorMessage: "Insufficient funds"
  };
}

Input Validation

Basic Validation

function validateCreateUserRequest(request: any) {
  const errors: string[] = [];

  if (!request.email || typeof request.email !== 'string') {
    errors.push("Email is required and must be a string");
  }

  if (!request.email.includes('@')) {
    errors.push("Email must be a valid email address");
  }

  if (!request.password || request.password.length < 8) {
    errors.push("Password must be at least 8 characters long");
  }

  if (errors.length > 0) {
    throw new Error(`Validation failed: ${errors.join(', ')}`);
  }
}

Schema-Based Validation

import Joi from 'joi';

const createOrderSchema = Joi.object({
  customerId: Joi.string().uuid().required(),
  items: Joi.array().items(
    Joi.object({
      productId: Joi.string().uuid().required(),
      quantity: Joi.number().integer().min(1).required(),
      price: Joi.number().positive().required()
    })
  ).min(1).required(),
  shippingAddress: Joi.object({
    street: Joi.string().required(),
    city: Joi.string().required(),
    zipCode: Joi.string().required(),
    country: Joi.string().required()
  }).required()
});

export class OrderCommands {
  static async createOrder(request: any) {
    return dispatchDirectCommand(async () => {
      // Validate with schema
      const { error, value } = createOrderSchema.validate(request);
      if (error) {
        throw new Error(`Validation error: ${error.details[0].message}`);
      }

      // Proceed with validated data
      const order = Order.create(value.customerId, value.items, value.shippingAddress);

      const events = await aggregates.persist(order);

      return {
        response: { orderId: order.id },
        events
      };
    }, "orders");
  }
}

Error Handling

Business Logic Errors

export class BusinessError extends Error {
  constructor(
    message: string,
    public readonly code: string,
    public readonly details?: any
  ) {
    super(message);
    this.name = 'BusinessError';
  }
}

export class ProductCommands {
  static async updatePrice(productId: string, newPrice: number) {
    return dispatchDirectCommand(async () => {
      const product = await aggregates.load(Product, productId);

      if (!product?.id) {
        throw new BusinessError(
          "Product not found",
          "PRODUCT_NOT_FOUND",
          { productId }
        );
      }

      if (product.status === "discontinued") {
        throw new BusinessError(
          "Cannot update price of discontinued product",
          "PRODUCT_DISCONTINUED",
          { productId, status: product.status }
        );
      }

      if (newPrice <= 0) {
        throw new BusinessError(
          "Price must be positive",
          "INVALID_PRICE",
          { newPrice }
        );
      }

      product.updatePrice(newPrice);

      const events = await aggregates.persist(product);

      return {
        response: {
          productId: product.id,
          oldPrice: product.previousPrice,
          newPrice: product.price
        },
        events
      };
    }, "products");
  }
}

Retry Logic

export class RobustCommands {
  static async processWithRetry<T>(
    operation: () => Promise<T>,
    maxRetries: number = 3
  ): Promise<T> {
    let lastError: Error;

    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        return await operation();
      } catch (error) {
        lastError = error;

        // Don't retry business logic errors
        if (error instanceof BusinessError) {
          throw error;
        }

        if (attempt === maxRetries) {
          throw lastError;
        }

        // Exponential backoff
        const delay = Math.pow(2, attempt) * 1000;
        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }

    throw lastError!;
  }

  static async reliableOperation(data: any) {
    return this.processWithRetry(async () => {
      return dispatchDirectCommand(async () => {
        // Operation that might fail due to infrastructure issues
        const result = await someUnreliableOperation(data);
        return {
          response: result,
          events: []
        };
      }, "operations");
    });
  }
}

Command Authorization

interface User {
  id: string;
  roles: string[];
  permissions: string[];
}

class AuthorizationError extends Error {
  constructor(message: string, public readonly requiredPermission: string) {
    super(message);
    this.name = 'AuthorizationError';
  }
}

function requirePermission(permission: string) {
  return function(target: any, propertyKey: string, descriptor: PropertyDescriptor) {
    const originalMethod = descriptor.value;

    descriptor.value = async function(user: User, ...args: any[]) {
      if (!user.permissions.includes(permission)) {
        throw new AuthorizationError(
          `Permission ${permission} required`,
          permission
        );
      }

      return originalMethod.apply(this, args);
    };
  };
}

export class SecureCommands {
  @requirePermission('admin:delete-user')
  static async deleteUser(user: User, userId: string) {
    return dispatchDirectCommand(async () => {
      const userToDelete = await aggregates.load(UserAggregate, userId);

      if (!userToDelete?.id) {
        throw new BusinessError("User not found", "USER_NOT_FOUND");
      }

      userToDelete.markDeleted(user.id);

      const events = await aggregates.persist(userToDelete);

      return {
        response: { deletedUserId: userId },
        events
      };
    }, "users");
  }

  @requirePermission('finance:process-refund')
  static async processRefund(user: User, orderId: string, amount: number) {
    return dispatchDirectCommand(async () => {
      // Refund logic here
      const refund = Refund.create(orderId, amount, user.id);

      const events = await aggregates.persist(refund);

      return {
        response: { refundId: refund.id },
        events
      };
    }, "refunds");
  }
}

Testing Commands

import {
  eventClientInMemory,
  setEventClient,
  InMemoryDatastore,
  setDataStore,
  consumeFullEventLog
} from "@eventicle/eventiclejs";

describe("AccountCommands", () => {
  beforeEach(() => {
    setEventClient(eventClientInMemory());
    setDataStore(new InMemoryDatastore());
  });

  describe("openAccount", () => {
    it("should create a new account with valid input", async () => {
      const result = await AccountCommands.openAccount({
        accountId: "acc-123",
        customerId: "customer-456",
        initialDeposit: 1000,
        accountType: "checking"
      });

      expect(result.response.accountId).toBe("acc-123");
      expect(result.response.balance).toBe(1000);
      expect(result.events).toHaveLength(1);
      expect(result.events[0].type).toBe("AccountOpened");
    });

    it("should reject negative initial deposit", async () => {
      await expect(AccountCommands.openAccount({
        accountId: "acc-123",
        customerId: "customer-456",
        initialDeposit: -100,
        accountType: "checking"
      })).rejects.toThrow("Initial deposit cannot be negative");
    });

    it("should reject duplicate account", async () => {
      // Create first account
      await AccountCommands.openAccount({
        accountId: "acc-123",
        customerId: "customer-456",
        initialDeposit: 1000,
        accountType: "checking"
      });

      // Wait for events to be processed
      await consumeFullEventLog();

      // Try to create duplicate
      await expect(AccountCommands.openAccount({
        accountId: "acc-123",
        customerId: "customer-789",
        initialDeposit: 500,
        accountType: "savings"
      })).rejects.toThrow("Account already exists");
    });
  });

  describe("transfer", () => {
    beforeEach(async () => {
      // Set up test accounts
      await AccountCommands.openAccount({
        accountId: "acc-source",
        customerId: "customer-1",
        initialDeposit: 1000,
        accountType: "checking"
      });

      await AccountCommands.openAccount({
        accountId: "acc-dest",
        customerId: "customer-2",
        initialDeposit: 500,
        accountType: "savings"
      });

      await consumeFullEventLog();
    });

    it("should transfer money between accounts", async () => {
      const result = await AccountCommands.transfer({
        fromAccountId: "acc-source",
        toAccountId: "acc-dest",
        amount: 200
      });

      expect(result.response.fromAccountBalance).toBe(800);
      expect(result.response.toAccountBalance).toBe(700);
      expect(result.events).toHaveLength(2); // One for each account
    });
  });
});

Command Composition

export class CompositeCommands {
  static async createUserAndAccount(request: {
    email: string;
    password: string;
    initialDeposit: number;
  }) {
    return dispatchDirectCommand(async () => {
      // Step 1: Create user
      const user = User.create(request.email, request.password);
      const userEvents = await aggregates.persist(user);

      // Step 2: Create account for user
      const account = BankAccount.open(
        `acc-${user.id}`,
        user.id,
        request.initialDeposit,
        "checking"
      );
      const accountEvents = await aggregates.persist(account);

      return {
        response: {
          userId: user.id,
          accountId: account.id,
          initialBalance: account.balance
        },
        events: [...userEvents, ...accountEvents]
      };
    }, "user-registration");
  }
}

Best Practices

  1. Validate Early: Check input parameters before loading aggregates

  2. Fail Fast: Throw meaningful errors for invalid operations

  3. Single Responsibility: Each command should do one thing well

  4. Idempotency: Design commands to be safely retryable

  5. Return Useful Data: Provide feedback about what was accomplished

  6. Handle Errors Gracefully: Distinguish between different error types

  7. Use Proper Abstractions: Separate validation, authorization, and business logic

  8. Test Thoroughly: Cover both success and failure scenarios

Next Steps