Event Views and Querying
Event Views provide the query side of CQRS by consuming events and projecting them into optimized data structures for reading. They enable efficient querying of event-sourced data without impacting the write side performance.
What Are Event Views?
Event Views:
-
Transform Events: Convert events into query-optimized structures
-
Provide Read Models: Create dedicated models for different query patterns
-
Enable CQRS: Separate read and write concerns
-
Support Multiple Projections: Multiple views can process the same events differently
-
Handle Eventually Consistent Data: Process events asynchronously
Event View Types
Eventicle provides two types of event views:
-
Standard Event Views: Process events exactly once with ordering guarantees
-
Raw Event Views: Lightweight views that receive all events without guarantees
Standard Event Views
Standard Event Views provide reliable, ordered event processing:
import { EventView, EventicleEvent, dataStore } from "@eventicle/eventiclejs";
interface ProductSummary {
productId: string;
name: string;
price: number;
category: string;
stock: number;
status: "active" | "discontinued";
createdAt: Date;
updatedAt: Date;
}
export class ProductCatalogView implements EventView {
// View configuration
consumerGroup = "ProductCatalogView";
streamsToSubscribe = ["products", "inventory"];
parallelEventCount = 15;
async handleEvent(event: EventicleEvent): Promise<void> {
const store = await dataStore();
try {
switch (event.type) {
case "ProductCreated":
await store.save("product-catalog", event.payload.productId, {
productId: event.payload.productId,
name: event.payload.name,
price: event.payload.price,
category: event.payload.category,
stock: 0,
status: "active",
createdAt: event.payload.createdAt,
updatedAt: event.payload.createdAt
});
break;
case "ProductPriceUpdated":
const product = await store.load("product-catalog", event.payload.productId);
if (product) {
product.price = event.payload.newPrice;
product.updatedAt = event.payload.updatedAt;
await store.save("product-catalog", event.payload.productId, product);
}
break;
case "InventoryAdjusted":
const productForInventory = await store.load("product-catalog", event.payload.productId);
if (productForInventory) {
productForInventory.stock = event.payload.newStock;
productForInventory.updatedAt = event.payload.adjustedAt;
await store.save("product-catalog", event.payload.productId, productForInventory);
}
break;
case "ProductDiscontinued":
const discontinuedProduct = await store.load("product-catalog", event.payload.productId);
if (discontinuedProduct) {
discontinuedProduct.status = "discontinued";
discontinuedProduct.updatedAt = event.payload.discontinuedAt;
await store.save("product-catalog", event.payload.productId, discontinuedProduct);
}
break;
}
} catch (error) {
console.error("Error processing event in ProductCatalogView:", error);
throw error; // Re-throw to trigger retry
}
}
// Query methods
async getProduct(productId: string): Promise<ProductSummary | null> {
const store = await dataStore();
return await store.load("product-catalog", productId);
}
async getProductsByCategory(category: string): Promise<ProductSummary[]> {
const store = await dataStore();
const allProducts = await store.scan("product-catalog");
return allProducts.filter(p => p.category === category && p.status === "active");
}
async getActiveProducts(): Promise<ProductSummary[]> {
const store = await dataStore();
const allProducts = await store.scan("product-catalog");
return allProducts.filter(p => p.status === "active");
}
async getLowStockProducts(threshold: number = 10): Promise<ProductSummary[]> {
const store = await dataStore();
const allProducts = await store.scan("product-catalog");
return allProducts.filter(p => p.stock <= threshold && p.status === "active");
}
async searchProducts(query: string): Promise<ProductSummary[]> {
const store = await dataStore();
const allProducts = await store.scan("product-catalog");
const lowerQuery = query.toLowerCase();
return allProducts.filter(p =>
p.status === "active" && (
p.name.toLowerCase().includes(lowerQuery) ||
p.category.toLowerCase().includes(lowerQuery)
)
);
}
}
Key Features of Standard Views
-
Exactly-Once Processing: Each event is processed only once
-
Ordered Processing: Events from each stream are processed in order
-
Position Tracking: Eventicle tracks which events have been processed
-
Error Handling: Failed events can be retried
-
Consumer Groups: Multiple instances can share the load
Complex Event Views
Views can handle complex projections across multiple event types:
interface OrderAnalytics {
customerId: string;
totalOrders: number;
totalSpent: number;
averageOrderValue: number;
lastOrderDate: Date;
favoriteCategory: string;
ordersByMonth: Record<string, number>;
}
export class CustomerAnalyticsView implements EventView {
consumerGroup = "CustomerAnalyticsView";
streamsToSubscribe = ["orders", "payments"];
async handleEvent(event: EventicleEvent): Promise<void> {
const store = await dataStore();
switch (event.type) {
case "OrderCreated":
await this.updateCustomerAnalytics(
event.payload.customerId,
{
newOrder: true,
orderDate: event.payload.createdAt,
items: event.payload.items
}
);
break;
case "PaymentCompleted":
await this.updateCustomerAnalytics(
event.payload.customerId,
{
payment: true,
amount: event.payload.amount,
paymentDate: event.payload.completedAt
}
);
break;
}
}
private async updateCustomerAnalytics(
customerId: string,
update: any
): Promise<void> {
const store = await dataStore();
let analytics = await store.load("customer-analytics", customerId) || {
customerId,
totalOrders: 0,
totalSpent: 0,
averageOrderValue: 0,
lastOrderDate: new Date(0),
favoriteCategory: "",
ordersByMonth: {}
};
if (update.newOrder) {
analytics.totalOrders++;
analytics.lastOrderDate = update.orderDate;
// Update monthly statistics
const monthKey = update.orderDate.toISOString().slice(0, 7); // YYYY-MM
analytics.ordersByMonth[monthKey] = (analytics.ordersByMonth[monthKey] || 0) + 1;
// Update favorite category
await this.updateFavoriteCategory(analytics, update.items);
}
if (update.payment) {
analytics.totalSpent += update.amount;
analytics.averageOrderValue = analytics.totalOrders > 0
? analytics.totalSpent / analytics.totalOrders
: 0;
}
await store.save("customer-analytics", customerId, analytics);
}
private async updateFavoriteCategory(analytics: any, items: any[]) {
const categoryCount: Record<string, number> = {};
items.forEach(item => {
categoryCount[item.category] = (categoryCount[item.category] || 0) + 1;
});
let maxCount = 0;
let favoriteCategory = "";
Object.entries(categoryCount).forEach(([category, count]) => {
if (count > maxCount) {
maxCount = count;
favoriteCategory = category;
}
});
analytics.favoriteCategory = favoriteCategory;
}
// Query methods
async getCustomerAnalytics(customerId: string): Promise<OrderAnalytics | null> {
const store = await dataStore();
return await store.load("customer-analytics", customerId);
}
async getTopCustomers(limit: number = 10): Promise<OrderAnalytics[]> {
const store = await dataStore();
const allAnalytics = await store.scan("customer-analytics");
return allAnalytics
.sort((a, b) => b.totalSpent - a.totalSpent)
.slice(0, limit);
}
}
Raw Event Views
Raw Event Views receive all events without processing guarantees:
import { RawEventView, EventicleEvent } from "@eventicle/eventiclejs";
export class AuditLogView implements RawEventView {
streamsToSubscribe = ["*"]; // Subscribe to all streams
async handleEvent(event: EventicleEvent): Promise<void> {
// Log all events for auditing
console.log("AUDIT:", {
timestamp: new Date(),
eventId: event.id,
eventType: event.type,
stream: event.stream,
domainId: event.domainId,
payload: event.payload
});
// Could also send to external audit system
await this.sendToAuditSystem(event);
}
private async sendToAuditSystem(event: EventicleEvent) {
// Integration with external audit/logging system
// await auditService.logEvent(event);
}
}
export class MetricsView implements RawEventView {
streamsToSubscribe = ["orders", "payments", "users"];
async handleEvent(event: EventicleEvent): Promise<void> {
// Send metrics to monitoring system
await this.recordMetric(`event.${event.type}`, 1, {
stream: event.stream,
timestamp: event.timestamp
});
// Record business metrics
if (event.type === "OrderCreated") {
await this.recordMetric("business.order.created", 1);
await this.recordMetric("business.order.value", event.payload.totalAmount);
}
}
private async recordMetric(name: string, value: number, tags?: any) {
// Integration with metrics system (Prometheus, StatsD, etc.)
console.log(`METRIC: ${name} = ${value}`, tags);
}
}
View Registration and Hydration
Registering Views
import { registerView, registerRawView } from "@eventicle/eventiclejs";
// Register standard views
registerView(new ProductCatalogView());
registerView(new CustomerAnalyticsView());
// Register raw views
registerRawView(new AuditLogView());
registerRawView(new MetricsView());
View Hydration
Hydrate views with historical data:
import { viewHydrator, eventClient } from "@eventicle/eventiclejs";
// Hydrate a single view
const productView = new ProductCatalogView();
await viewHydrator(productView, "products").replay();
// Hydrate multiple streams
await viewHydrator(productView, ["products", "inventory"]).replay();
// Hydrate with custom range
await viewHydrator(productView, "products").replayFromPosition(1000);
// Hydrate and continue processing
await viewHydrator(productView, "products").replayAndContinue();
View Performance Optimization
Parallel Processing
export class HighVolumeView implements EventView {
consumerGroup = "HighVolumeView";
streamsToSubscribe = ["transactions"];
parallelEventCount = 50; // Process up to 50 events concurrently
async handleEvent(event: EventicleEvent): Promise<void> {
// High-throughput event processing
await this.processEventEfficiently(event);
}
private async processEventEfficiently(event: EventicleEvent) {
// Batch operations, use connection pooling, etc.
}
}
Batch Processing
export class BatchProcessingView implements EventView {
consumerGroup = "BatchProcessingView";
streamsToSubscribe = ["orders"];
private eventBatch: EventicleEvent[] = [];
private batchSize = 100;
private batchTimeout = 5000; // 5 seconds
async handleEvent(event: EventicleEvent): Promise<void> {
this.eventBatch.push(event);
if (this.eventBatch.length >= this.batchSize) {
await this.processBatch();
} else {
// Set timeout to process partial batch
setTimeout(() => this.processBatch(), this.batchTimeout);
}
}
private async processBatch() {
if (this.eventBatch.length === 0) return;
const batch = [...this.eventBatch];
this.eventBatch = [];
// Process batch efficiently
await this.bulkInsert(batch);
}
private async bulkInsert(events: EventicleEvent[]) {
// Efficient bulk operations
const store = await dataStore();
// await store.bulkSave(events.map(e => ({ key: e.domainId, value: e })));
}
}
Error Handling in Views
export class RobustView implements EventView {
consumerGroup = "RobustView";
streamsToSubscribe = ["orders"];
async handleEvent(event: EventicleEvent): Promise<void> {
const maxRetries = 3;
let attempt = 0;
while (attempt < maxRetries) {
try {
await this.processEvent(event);
return; // Success
} catch (error) {
attempt++;
console.error(`View processing error (attempt ${attempt}):`, error);
if (attempt >= maxRetries) {
// Send to dead letter queue or alert
await this.handleFailedEvent(event, error);
throw error;
}
// Exponential backoff
await this.delay(Math.pow(2, attempt) * 1000);
}
}
}
private async processEvent(event: EventicleEvent) {
// Main processing logic
}
private async handleFailedEvent(event: EventicleEvent, error: any) {
// Log to monitoring system
console.error("Failed to process event after retries:", {
eventId: event.id,
eventType: event.type,
error: error.message
});
// Could send to dead letter queue
// await deadLetterQueue.send(event);
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
View Testing
import {
eventClientInMemory,
setEventClient,
InMemoryDatastore,
setDataStore
} from "@eventicle/eventiclejs";
describe("ProductCatalogView", () => {
let view: ProductCatalogView;
beforeEach(() => {
// Set up test environment
setEventClient(eventClientInMemory());
setDataStore(new InMemoryDatastore());
view = new ProductCatalogView();
});
it("should create product on ProductCreated event", async () => {
const event = {
id: "event-1",
type: "ProductCreated",
stream: "products",
domainId: "product-123",
timestamp: Date.now(),
payload: {
productId: "product-123",
name: "Test Product",
price: 29.99,
category: "Electronics",
createdAt: new Date()
}
};
await view.handleEvent(event);
const product = await view.getProduct("product-123");
expect(product).toBeDefined();
expect(product.name).toBe("Test Product");
expect(product.price).toBe(29.99);
});
it("should update product price on ProductPriceUpdated event", async () => {
// First create a product
await view.handleEvent({
id: "event-1",
type: "ProductCreated",
stream: "products",
domainId: "product-123",
timestamp: Date.now(),
payload: {
productId: "product-123",
name: "Test Product",
price: 29.99,
category: "Electronics",
createdAt: new Date()
}
});
// Then update the price
await view.handleEvent({
id: "event-2",
type: "ProductPriceUpdated",
stream: "products",
domainId: "product-123",
timestamp: Date.now(),
payload: {
productId: "product-123",
newPrice: 24.99,
updatedAt: new Date()
}
});
const product = await view.getProduct("product-123");
expect(product.price).toBe(24.99);
});
});
Best Practices
-
Design for Queries: Structure views based on how data will be queried
-
Handle Missing Data: Always check if related data exists before updating
-
Implement Idempotency: Views should handle duplicate events gracefully
-
Use Appropriate View Type: Standard views for consistency, raw views for metrics
-
Optimize for Read Performance: Denormalize data for efficient queries
-
Handle Errors Gracefully: Implement retry logic and error reporting
-
Monitor View Lag: Track how far behind views are from the event stream
-
Version Views: Plan for view schema evolution
Next Steps
-
Learn about Creating Event Views in detail
-
Explore Performance Optimization for views
-
Understand Raw Event Views use cases
-
See Testing Event Views patterns