Event Views and Querying

Event Views provide the query side of CQRS: they consume events and project them into data structures optimized for reading. This lets you query event-sourced data efficiently without affecting write-side performance.

What Are Event Views?

Event Views:

  • Transform Events: Convert events into query-optimized structures

  • Provide Read Models: Create dedicated models for different query patterns

  • Enable CQRS: Separate read and write concerns

  • Support Multiple Projections: Multiple views can process the same events differently

  • Handle Eventually Consistent Data: Process events asynchronously

Event View Types

Eventicle provides two types of event views:

  • Standard Event Views: Process events exactly once with ordering guarantees

  • Raw Event Views: Lightweight views that receive all events, without the exactly-once and ordering guarantees of standard views

Standard Event Views

Standard Event Views provide reliable, ordered event processing:

import { EventView, EventicleEvent, dataStore } from "@eventicle/eventiclejs";

/**
 * Read-model record for a single product, as written to the
 * "product-catalog" collection by ProductCatalogView.
 */
interface ProductSummary {
  /** Product identifier; also used as the record key in the store. */
  productId: string;
  name: string;
  /** Set on ProductCreated and overwritten by ProductPriceUpdated. */
  price: number;
  category: string;
  /** Starts at 0 on ProductCreated; replaced by InventoryAdjusted. */
  stock: number;
  /** "active" on creation; switched to "discontinued" by ProductDiscontinued. */
  status: "active" | "discontinued";
  createdAt: Date;
  /** Timestamp carried by the most recent event applied to this record. */
  updatedAt: Date;
}

/**
 * Read model for the product catalog.
 *
 * Consumes events from the "products" and "inventory" streams, keeps one
 * record per product in the "product-catalog" collection, and exposes query
 * methods over that collection.
 */
export class ProductCatalogView implements EventView {
  // View configuration
  consumerGroup = "ProductCatalogView";
  streamsToSubscribe = ["products", "inventory"];
  parallelEventCount = 15;

  /**
   * Applies one event to the catalog. Event types this view does not handle
   * are ignored.
   *
   * @param event - Event whose payload carries at least `productId`.
   * @throws Re-throws any error raised while applying the event (after
   *   logging it) so the caller can retry.
   */
  async handleEvent(event: EventicleEvent): Promise<void> {
    const store = await dataStore();
    const { payload } = event;

    // Shared load -> mutate -> save step for events that modify an existing
    // record. Events for a product the view has never seen are skipped.
    const patchProduct = async (
      apply: (product: ProductSummary) => void
    ): Promise<void> => {
      const product = await store.load("product-catalog", payload.productId);
      if (!product) return;
      apply(product);
      await store.save("product-catalog", payload.productId, product);
    };

    try {
      // Each case has its own block so declarations cannot leak between cases.
      switch (event.type) {
        case "ProductCreated": {
          await store.save("product-catalog", payload.productId, {
            productId: payload.productId,
            name: payload.name,
            price: payload.price,
            category: payload.category,
            stock: 0, // stock only arrives later via InventoryAdjusted
            status: "active",
            createdAt: payload.createdAt,
            updatedAt: payload.createdAt
          });
          break;
        }

        case "ProductPriceUpdated": {
          await patchProduct(product => {
            product.price = payload.newPrice;
            product.updatedAt = payload.updatedAt;
          });
          break;
        }

        case "InventoryAdjusted": {
          await patchProduct(product => {
            product.stock = payload.newStock;
            product.updatedAt = payload.adjustedAt;
          });
          break;
        }

        case "ProductDiscontinued": {
          await patchProduct(product => {
            product.status = "discontinued";
            product.updatedAt = payload.discontinuedAt;
          });
          break;
        }
      }
    } catch (error) {
      console.error("Error processing event in ProductCatalogView:", error);
      throw error; // Re-throw to trigger retry
    }
  }

  // Query methods

  /** Returns the stored record for `productId`, whatever its status. */
  async getProduct(productId: string): Promise<ProductSummary | null> {
    const store = await dataStore();
    return await store.load("product-catalog", productId);
  }

  /** Active products whose category equals `category` exactly. */
  async getProductsByCategory(category: string): Promise<ProductSummary[]> {
    return this.findActive(product => product.category === category);
  }

  /** Every product whose status is "active". */
  async getActiveProducts(): Promise<ProductSummary[]> {
    return this.findActive(() => true);
  }

  /** Active products whose stock is at or below `threshold` (default 10). */
  async getLowStockProducts(threshold: number = 10): Promise<ProductSummary[]> {
    return this.findActive(product => product.stock <= threshold);
  }

  /**
   * Active products whose name or category contains `query`,
   * compared case-insensitively.
   */
  async searchProducts(query: string): Promise<ProductSummary[]> {
    const lowerQuery = query.toLowerCase();
    return this.findActive(
      product =>
        product.name.toLowerCase().includes(lowerQuery) ||
        product.category.toLowerCase().includes(lowerQuery)
    );
  }

  /**
   * Scans the whole collection and keeps the active products accepted by
   * `matches`. The status check runs first, so `matches` only ever sees
   * active products.
   */
  private async findActive(
    matches: (product: ProductSummary) => boolean
  ): Promise<ProductSummary[]> {
    const store = await dataStore();
    const allProducts: ProductSummary[] = await store.scan("product-catalog");
    return allProducts.filter(
      product => product.status === "active" && matches(product)
    );
  }
}

Key Features of Standard Views

  • Exactly-Once Processing: Each event is processed only once

  • Ordered Processing: Events from each stream are processed in order

  • Position Tracking: Eventicle tracks which events have been processed

  • Error Handling: Failed events can be retried

  • Consumer Groups: Multiple instances can share the load

Complex Event Views

Views can handle complex projections across multiple event types:

/**
 * Per-customer aggregate stored in the "customer-analytics" collection by
 * CustomerAnalyticsView, keyed by customerId.
 */
interface OrderAnalytics {
  customerId: string;
  /** Number of OrderCreated events applied for this customer. */
  totalOrders: number;
  /** Sum of the amounts of the PaymentCompleted events applied. */
  totalSpent: number;
  /** totalSpent divided by totalOrders; 0 while there are no orders. */
  averageOrderValue: number;
  /** Creation date of the latest order applied; epoch 0 until the first one. */
  lastOrderDate: Date;
  /** Derived from the item categories of applied orders; "" until one is seen. */
  favoriteCategory: string;
  /** Order count per calendar month, keyed as "YYYY-MM". */
  ordersByMonth: Record<string, number>;
}

/** Change extracted from one event, applied by updateCustomerAnalytics. */
type CustomerAnalyticsUpdate =
  | {
      kind: "order";
      /** Accepted as a Date, ISO string or epoch milliseconds. */
      orderDate: Date | string | number;
      items?: ReadonlyArray<{ category: string }>;
    }
  | { kind: "payment"; amount: number };

/**
 * Maintains one OrderAnalytics record per customer from the "orders" and
 * "payments" streams and exposes queries over those records.
 */
export class CustomerAnalyticsView implements EventView {
  consumerGroup = "CustomerAnalyticsView";
  streamsToSubscribe = ["orders", "payments"];

  /**
   * Translates an event into an analytics update. Only OrderCreated and
   * PaymentCompleted are handled; other event types are ignored.
   */
  async handleEvent(event: EventicleEvent): Promise<void> {
    switch (event.type) {
      case "OrderCreated":
        await this.updateCustomerAnalytics(event.payload.customerId, {
          kind: "order",
          orderDate: event.payload.createdAt,
          items: event.payload.items
        });
        break;

      case "PaymentCompleted":
        await this.updateCustomerAnalytics(event.payload.customerId, {
          kind: "payment",
          amount: event.payload.amount
        });
        break;
    }
  }

  /**
   * Loads the customer's record (or starts an empty one), applies `update`,
   * refreshes the derived average, and saves the record.
   *
   * @throws RangeError if an order update carries a date that cannot be parsed.
   */
  private async updateCustomerAnalytics(
    customerId: string,
    update: CustomerAnalyticsUpdate
  ): Promise<void> {
    const store = await dataStore();

    const analytics: OrderAnalytics =
      (await store.load("customer-analytics", customerId)) ?? {
        customerId,
        totalOrders: 0,
        totalSpent: 0,
        averageOrderValue: 0,
        lastOrderDate: new Date(0),
        favoriteCategory: "",
        ordersByMonth: {}
      };

    if (update.kind === "order") {
      // Event payloads may have been serialised, so the date can arrive as a
      // string or number; normalise it before calling Date methods on it.
      const orderDate = new Date(update.orderDate);
      const monthKey = orderDate.toISOString().slice(0, 7); // YYYY-MM

      analytics.totalOrders += 1;
      analytics.lastOrderDate = orderDate;
      analytics.ordersByMonth[monthKey] =
        (analytics.ordersByMonth[monthKey] ?? 0) + 1;

      this.updateFavoriteCategory(analytics, update.items ?? []);
    } else {
      analytics.totalSpent += update.amount;
    }

    // Recomputed after every update so the stored average never lags behind
    // the totalOrders / totalSpent values saved alongside it.
    analytics.averageOrderValue =
      analytics.totalOrders > 0
        ? analytics.totalSpent / analytics.totalOrders
        : 0;

    await store.save("customer-analytics", customerId, analytics);
  }

  /**
   * Sets `favoriteCategory` to the most frequent category among `items`
   * (first seen wins a tie). The previous value is kept when `items`
   * contains nothing to count.
   *
   * NOTE(review): only the items of the order being applied are counted, so
   * this tracks the latest order rather than the customer's whole history.
   */
  private updateFavoriteCategory(
    analytics: OrderAnalytics,
    items: ReadonlyArray<{ category: string }>
  ): void {
    // A Map keeps arbitrary category names (e.g. "constructor") safe as keys.
    const categoryCount = new Map<string, number>();
    for (const { category } of items) {
      categoryCount.set(category, (categoryCount.get(category) ?? 0) + 1);
    }

    let favoriteCategory: string | undefined;
    let maxCount = 0;
    for (const [category, count] of categoryCount) {
      if (count > maxCount) {
        maxCount = count;
        favoriteCategory = category;
      }
    }

    if (favoriteCategory !== undefined) {
      analytics.favoriteCategory = favoriteCategory;
    }
  }

  // Query methods

  /** Returns the analytics record stored for `customerId`. */
  async getCustomerAnalytics(customerId: string): Promise<OrderAnalytics | null> {
    const store = await dataStore();
    return await store.load("customer-analytics", customerId);
  }

  /** The `limit` customers with the highest totalSpent, highest first. */
  async getTopCustomers(limit: number = 10): Promise<OrderAnalytics[]> {
    const store = await dataStore();
    const allAnalytics: OrderAnalytics[] = await store.scan("customer-analytics");
    // Sort a copy: the array handed back by the store is not ours to reorder.
    return [...allAnalytics]
      .sort((a, b) => b.totalSpent - a.totalSpent)
      .slice(0, limit);
  }
}

Raw Event Views

Raw Event Views receive all events without processing guarantees:

import { RawEventView, EventicleEvent } from "@eventicle/eventiclejs";

/**
 * Raw view that writes an audit record for every event it receives to the
 * console, then passes the event on to the external-audit hook.
 */
export class AuditLogView implements RawEventView {
  streamsToSubscribe = ["*"]; // Subscribe to all streams

  /** Logs one audit record for `event` and forwards it to sendToAuditSystem. */
  async handleEvent(event: EventicleEvent): Promise<void> {
    const { id: eventId, type: eventType, stream, domainId, payload } = event;

    // `timestamp` is the time of logging, not the event's own timestamp.
    const auditRecord = {
      timestamp: new Date(),
      eventId,
      eventType,
      stream,
      domainId,
      payload
    };
    console.log("AUDIT:", auditRecord);

    // Could also send to external audit system
    await this.sendToAuditSystem(event);
  }

  /** Extension point for an external audit/logging system; currently a no-op. */
  private async sendToAuditSystem(event: EventicleEvent) {
    // Integration with external audit/logging system
    // await auditService.logEvent(event);
  }
}

/**
 * Raw view that turns events from the "orders", "payments" and "users"
 * streams into metric samples.
 */
export class MetricsView implements RawEventView {
  streamsToSubscribe = ["orders", "payments", "users"];

  /**
   * Records a per-event-type counter for every event, plus two business
   * metrics when the event is an OrderCreated.
   */
  async handleEvent(event: EventicleEvent): Promise<void> {
    const { type, stream, timestamp } = event;

    // Send metrics to monitoring system
    await this.recordMetric(`event.${type}`, 1, { stream, timestamp });

    // Only order creation contributes business metrics
    if (type !== "OrderCreated") {
      return;
    }
    await this.recordMetric("business.order.created", 1);
    await this.recordMetric("business.order.value", event.payload.totalAmount);
  }

  /** Emits one metric sample; this sample implementation prints it to the console. */
  private async recordMetric(name: string, value: number, tags?: any) {
    // Integration with metrics system (Prometheus, StatsD, etc.)
    console.log(`METRIC: ${name} = ${value}`, tags);
  }
}

View Registration and Hydration

Registering Views

import { registerView, registerRawView } from "@eventicle/eventiclejs";

// Standard views are registered with registerView
const catalogView = new ProductCatalogView();
registerView(catalogView);
const analyticsView = new CustomerAnalyticsView();
registerView(analyticsView);

// Raw views are registered with registerRawView
const auditLogView = new AuditLogView();
registerRawView(auditLogView);
const metricsView = new MetricsView();
registerRawView(metricsView);

View Hydration

Hydrate views with historical data:

import { viewHydrator, eventClient } from "@eventicle/eventiclejs";

// Hydrate from a single stream
// NOTE(review): the calls below read as alternative ways to hydrate the same
// view, presumably not a sequence to run back to back — confirm.
const productView = new ProductCatalogView();
await viewHydrator(productView, "products").replay();

// Hydrate from multiple streams (pass an array of stream names)
await viewHydrator(productView, ["products", "inventory"]).replay();

// Hydrate from a given position (1000 here)
await viewHydrator(productView, "products").replayFromPosition(1000);

// Hydrate and continue processing
await viewHydrator(productView, "products").replayAndContinue();

View Performance Optimization

Parallel Processing

/**
 * Skeleton of a view for the "transactions" stream that raises the
 * concurrency limit and delegates all work to a single processing hook.
 */
export class HighVolumeView implements EventView {
  consumerGroup = "HighVolumeView";
  streamsToSubscribe = ["transactions"];
  // Process up to 50 events concurrently
  parallelEventCount = 50;

  /** Hands the event straight to the processing hook and waits for it. */
  async handleEvent(event: EventicleEvent): Promise<void> {
    // High-throughput event processing
    await this.processEventEfficiently(event);
  }

  /** Placeholder for the actual high-throughput processing logic. */
  private async processEventEfficiently(event: EventicleEvent): Promise<void> {
    // Batch operations, use connection pooling, etc.
  }
}

Batch Processing

/**
 * Buffers events from the "orders" stream and writes them in bulk, either
 * when `batchSize` events have accumulated or `batchTimeout` milliseconds
 * after the first event of a partial batch.
 *
 * NOTE(review): handleEvent returns before a buffered event has been written,
 * so events still in the buffer are lost if the process stops — confirm that
 * this trade-off is acceptable for the data being projected.
 */
export class BatchProcessingView implements EventView {
  consumerGroup = "BatchProcessingView";
  streamsToSubscribe = ["orders"];

  private eventBatch: EventicleEvent[] = [];
  private batchSize = 100;
  private batchTimeout = 5000; // 5 seconds
  // Handle of the pending partial-batch flush, if one is scheduled.
  private flushTimer: ReturnType<typeof setTimeout> | undefined;

  /**
   * Adds `event` to the buffer. A full buffer is flushed before returning;
   * otherwise a single flush timer is armed for the partial batch.
   *
   * @throws Whatever bulkInsert throws when this call triggers a flush.
   */
  async handleEvent(event: EventicleEvent): Promise<void> {
    this.eventBatch.push(event);

    if (this.eventBatch.length >= this.batchSize) {
      await this.processBatch();
      return;
    }

    // One timer per pending batch: scheduling a new one for every event would
    // pile up timers that each fire a redundant flush.
    if (this.flushTimer === undefined) {
      this.flushTimer = setTimeout(() => {
        // Nobody awaits a timer callback, so report failures here instead of
        // leaving an unhandled rejection.
        this.processBatch().catch((error: unknown) => {
          console.error("Error flushing partial batch in BatchProcessingView:", error);
        });
      }, this.batchTimeout);
    }
  }

  /**
   * Writes everything currently buffered. If the write fails, the events are
   * put back at the front of the buffer and the error is re-thrown, so a
   * later flush retries them. bulkInsert should therefore tolerate seeing
   * the same event more than once.
   */
  private async processBatch() {
    if (this.flushTimer !== undefined) {
      clearTimeout(this.flushTimer);
      this.flushTimer = undefined;
    }
    if (this.eventBatch.length === 0) return;

    // Swap the buffer out before awaiting so events that arrive during the
    // write start a fresh batch.
    const batch = this.eventBatch;
    this.eventBatch = [];

    try {
      // Process batch efficiently
      await this.bulkInsert(batch);
    } catch (error) {
      this.eventBatch = [...batch, ...this.eventBatch];
      throw error;
    }
  }

  /** Placeholder for the bulk write of `events`. */
  private async bulkInsert(events: EventicleEvent[]) {
    // Efficient bulk operations
    const store = await dataStore();
    // await store.bulkSave(events.map(e => ({ key: e.domainId, value: e })));
  }
}

Error Handling in Views

/**
 * View for the "orders" stream that retries a failing event inside the
 * handler, backing off between attempts, before giving up on it.
 */
export class RobustView implements EventView {
  consumerGroup = "RobustView";
  streamsToSubscribe = ["orders"];

  /**
   * Tries to process `event` up to three times, waiting 2s and then 4s
   * between attempts.
   *
   * @throws The error of the last attempt, after handleFailedEvent has run.
   */
  async handleEvent(event: EventicleEvent): Promise<void> {
    const maxRetries = 3;

    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        await this.processEvent(event);
        return; // Success
      } catch (error) {
        console.error(`View processing error (attempt ${attempt}):`, error);

        const outOfAttempts = attempt >= maxRetries;
        if (outOfAttempts) {
          // Send to dead letter queue or alert
          await this.handleFailedEvent(event, error);
          throw error;
        }

        // Exponential backoff
        const backoffMs = 2 ** attempt * 1000;
        await this.delay(backoffMs);
      }
    }
  }

  /** Placeholder for the view's actual projection logic. */
  private async processEvent(event: EventicleEvent) {
    // Main processing logic
  }

  /** Reports an event that still failed after the final attempt. */
  private async handleFailedEvent(event: EventicleEvent, error: any) {
    // Log to monitoring system
    const failure = {
      eventId: event.id,
      eventType: event.type,
      error: error.message
    };
    console.error("Failed to process event after retries:", failure);

    // Could send to dead letter queue
    // await deadLetterQueue.send(event);
  }

  /** Resolves after `ms` milliseconds. */
  private delay(ms: number): Promise<void> {
    return new Promise(wake => setTimeout(wake, ms));
  }
}

View Testing

import {
  eventClientInMemory,
  setEventClient,
  InMemoryDatastore,
  setDataStore
} from "@eventicle/eventiclejs";

describe("ProductCatalogView", () => {
  let view: ProductCatalogView;

  // Fresh ProductCreated event for product-123; both tests start from it.
  const productCreated = () => ({
    id: "event-1",
    type: "ProductCreated",
    stream: "products",
    domainId: "product-123",
    timestamp: Date.now(),
    payload: {
      productId: "product-123",
      name: "Test Product",
      price: 29.99,
      category: "Electronics",
      createdAt: new Date()
    }
  });

  beforeEach(() => {
    // Set up test environment
    setEventClient(eventClientInMemory());
    setDataStore(new InMemoryDatastore());
    view = new ProductCatalogView();
  });

  it("should create product on ProductCreated event", async () => {
    await view.handleEvent(productCreated());

    const product = await view.getProduct("product-123");
    // toMatchObject fails when the product is missing; toBeDefined() would
    // pass for the null that getProduct is typed to return on a miss.
    expect(product).toMatchObject({
      productId: "product-123",
      name: "Test Product",
      price: 29.99,
      category: "Electronics",
      stock: 0,
      status: "active"
    });
  });

  it("should update product price on ProductPriceUpdated event", async () => {
    // First create a product
    await view.handleEvent(productCreated());

    // Then update the price
    await view.handleEvent({
      id: "event-2",
      type: "ProductPriceUpdated",
      stream: "products",
      domainId: "product-123",
      timestamp: Date.now(),
      payload: {
        productId: "product-123",
        newPrice: 24.99,
        updatedAt: new Date()
      }
    });

    const product = await view.getProduct("product-123");
    // Optional chaining: the result may be null, and a missing product then
    // fails the assertion instead of throwing a TypeError.
    expect(product?.price).toBe(24.99);
  });
});

Best Practices

  1. Design for Queries: Structure views based on how data will be queried

  2. Handle Missing Data: Always check if related data exists before updating

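  3. Implement Idempotency: Views should handle duplicate events gracefully, because a failed event is retried and replaying a stream during hydration re-delivers events the view may already have applied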

  4. Use Appropriate View Type: Standard views for consistency, raw views for metrics

  5. Optimize for Read Performance: Denormalize data for efficient queries

  6. Handle Errors Gracefully: Implement retry logic and error reporting

  7. Monitor View Lag: Track how far behind views are from the event stream

  8. Version Views: Plan for view schema evolution

Next Steps