Event Clients
Event clients are the core abstraction in Eventicle for storing and retrieving events. This guide covers the different types of event clients and how to configure them for your needs.
Overview
An EventClient provides the interface for:
- Emitting events to streams
- Subscribing to event streams
- Replaying historical events
- Managing consumer groups
Eventicle provides several implementations:
- In-Memory: For development and testing
- Kafka: For production event streaming
- Datastore: For PostgreSQL-backed event storage
- Transactional: For coordinating events with database transactions
The EventClient Interface
All event clients implement this core interface:
interface EventClient {
  // Emit events
  emit(events: EventicleEvent[]): Promise<void>;

  // Subscribe to new events
  subscribe(options: {
    stream: string;
    consumerGroup: string;
    handler: (event: EventicleEvent) => Promise<void>;
    parallelEventCount?: number;
  }): void;

  // Replay events from beginning
  coldReplay(options: {
    stream: string;
    handler: (event: EventicleEvent) => Promise<void>;
  }): Promise<void>;

  // Replay then continue subscribing
  hotReplay(options: {
    stream: string;
    consumerGroup: string;
    handler: (event: EventicleEvent) => Promise<void>;
  }): Promise<void>;
}
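To make the four operations concrete, here is a minimal in-memory sketch with the same method shapes as the interface above. It is not the library's implementation: `SketchEvent` and `SketchInMemoryClient` are hypothetical names, the event shape is reduced to a few fields, and `consumerGroup` is accepted but ignored, so every subscriber receives every event.

```typescript
// Hypothetical, reduced event shape; the real EventicleEvent may carry more fields.
interface SketchEvent {
  id: string;
  type: string;
  stream: string;
  domainId: string;
  payload?: unknown;
}

type SketchHandler = (event: SketchEvent) => Promise<void>;

class SketchInMemoryClient {
  private log = new Map<string, SketchEvent[]>();
  private subscribers = new Map<string, SketchHandler[]>();

  // Append each event to its stream, then deliver it to live subscribers.
  async emit(events: SketchEvent[]): Promise<void> {
    for (const event of events) {
      const stored = this.log.get(event.stream) ?? [];
      stored.push(event);
      this.log.set(event.stream, stored);
      for (const handler of this.subscribers.get(event.stream) ?? []) {
        await handler(event);
      }
    }
  }

  // Register a handler for events emitted from now on.
  subscribe(options: { stream: string; consumerGroup: string; handler: SketchHandler }): void {
    const handlers = this.subscribers.get(options.stream) ?? [];
    handlers.push(options.handler);
    this.subscribers.set(options.stream, handlers);
  }

  // Deliver a snapshot of everything stored so far, oldest first.
  async coldReplay(options: { stream: string; handler: SketchHandler }): Promise<void> {
    for (const event of [...(this.log.get(options.stream) ?? [])]) {
      await options.handler(event);
    }
  }

  // Replay history, then keep receiving new events.
  async hotReplay(options: { stream: string; consumerGroup: string; handler: SketchHandler }): Promise<void> {
    await this.coldReplay(options);
    this.subscribe(options);
  }
}
```

In this sketch, the difference between the two replay modes is only whether the handler stays registered after the stored events have been delivered.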
In-Memory Event Client
Perfect for development and testing:
import { setEventClient, eventClientInMemory } from "@eventicle/eventiclejs";

// Basic setup
setEventClient(eventClientInMemory());

// With custom configuration
setEventClient(eventClientInMemory({
  maxEventsPerStream: 10000, // Limit stream size
  logger: customLogger       // Custom logging
}));
Kafka Event Client
The recommended choice for production:
import { setEventClient, eventClientOnKafka } from "@eventicle/eventiclejs";
// CompressionTypes is exported by kafkajs
import { CompressionTypes } from "kafkajs";

setEventClient(eventClientOnKafka({
  brokers: ["kafka1:9092", "kafka2:9092"],
  clientId: "my-service",

  // Consumer configuration
  consumerConfig: {
    groupId: "my-service-group",
    sessionTimeout: 30000,
    heartbeatInterval: 3000,
    maxBytesPerPartition: 1048576,
    maxWaitTimeInMs: 100
  },

  // Producer configuration
  producerConfig: {
    allowAutoTopicCreation: true,
    transactionTimeout: 60000,
    compression: CompressionTypes.GZIP
  },

  // Batching configuration
  batching: {
    size: 100,   // Batch size
    lingerMs: 10 // Max wait time
  }
}));
Advanced Kafka Configuration
Topic Management
// Automatic topic creation with configuration
setEventClient(eventClientOnKafka({
  brokers: ["localhost:9092"],
  topicConfig: {
    numPartitions: 6,
    replicationFactor: 3,
    configEntries: [
      { name: "retention.ms", value: "604800000" }, // 7 days
      { name: "compression.type", value: "gzip" },
      { name: "min.insync.replicas", value: "2" }
    ]
  }
}));
Consumer Group Management
// Multiple consumer groups for same stream
const client = eventClientOnKafka({
  brokers: ["localhost:9092"],
  groupId: "default-group"
});

// Analytics consumer
client.subscribe({
  stream: "orders",
  consumerGroup: "analytics",
  handler: analyticsHandler
});

// Notification consumer
client.subscribe({
  stream: "orders",
  consumerGroup: "notifications",
  handler: notificationHandler
});
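In Kafka, each consumer group tracks its own offsets, so both subscriptions above receive every event on the orders stream independently. The toy model below illustrates that behaviour only; `GroupedStream` is a hypothetical class and does not reflect Kafka's protocol or Eventicle's internals.

```typescript
// Toy model: one shared log, one read position per consumer group.
class GroupedStream<T> {
  private events: T[] = [];
  private positions = new Map<string, number>();

  append(event: T): void {
    this.events.push(event);
  }

  // Return the events this group has not consumed yet and advance its position.
  poll(consumerGroup: string): T[] {
    const from = this.positions.get(consumerGroup) ?? 0;
    const batch = this.events.slice(from);
    this.positions.set(consumerGroup, this.events.length);
    return batch;
  }
}

const orders = new GroupedStream<string>();
orders.append("OrderPlaced#1");
orders.append("OrderPlaced#2");

// "analytics" reads the first two events.
const analyticsFirst = orders.poll("analytics");

orders.append("OrderPlaced#3");

// "notifications" has its own position, so it still sees all three events.
const notificationsFirst = orders.poll("notifications");

// "analytics" only sees what arrived since its last poll.
const analyticsSecond = orders.poll("analytics");
```

Consumers that share a group name would instead share one position, which is how work is divided between instances of the same service.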
Throttling and Backpressure
import { KafkaThrottle } from "@eventicle/eventiclejs";

// Configure throttling
const throttle = new KafkaThrottle({
  maxConcurrent: 10, // Max concurrent messages
  maxPerSecond: 1000 // Rate limit
});

// Apply to subscriptions
client.subscribe({
  stream: "high-volume",
  consumerGroup: "processor",
  parallelEventCount: 10,
  throttle: throttle,
  handler: async (event) => {
    await processEvent(event);
  }
});
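The idea behind a concurrency cap such as maxConcurrent can be sketched with a small limiter that queues work once the cap is reached. This is a generic illustration, not how KafkaThrottle is implemented; `ConcurrencyLimiter` is a hypothetical helper and it does not cover the per-second rate limit.

```typescript
// Runs at most `limit` tasks at once; extra callers wait in a FIFO queue.
class ConcurrencyLimiter {
  private active = 0;
  private waiting: Array<() => void> = [];

  constructor(private readonly limit: number) {}

  async run<T>(task: () => Promise<T>): Promise<T> {
    if (this.active >= this.limit) {
      // Park until a finishing task hands over its slot.
      await new Promise<void>((resolve) => this.waiting.push(resolve));
    } else {
      this.active++;
    }
    try {
      return await task();
    } finally {
      const next = this.waiting.shift();
      if (next) {
        next(); // the slot is transferred, so `active` stays unchanged
      } else {
        this.active--;
      }
    }
  }
}
```

A handler could then be wrapped as `(event) => limiter.run(() => processEvent(event))`, which applies backpressure by making further handler calls wait until a slot is free.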
PostgreSQL Datastore Event Client
For SQL-based event storage:
import {
  setEventClient,
  eventClientOnDatastore,
  setDataStore
} from "@eventicle/eventiclejs";

// Configure PostgreSQL connection
const pgDatastore = new PostgresDatastore({
  host: process.env.DB_HOST,
  port: 5432,
  database: process.env.DB_NAME,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,

  // Connection pool settings
  max: 20,
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 2000
});

setDataStore(pgDatastore);
setEventClient(eventClientOnDatastore());
Schema Setup
The datastore client requires these tables:
-- Events table
CREATE TABLE events (
  id UUID PRIMARY KEY,
  stream VARCHAR(255) NOT NULL,
  domain_id VARCHAR(255) NOT NULL,
  type VARCHAR(255) NOT NULL,
  sequence BIGSERIAL,
  timestamp BIGINT NOT NULL,
  payload JSONB,
  metadata JSONB,
  caused_by_id UUID,
  caused_by_type VARCHAR(255)
);

-- Indexes (PostgreSQL creates these with separate statements)
CREATE INDEX idx_stream_sequence ON events (stream, sequence);
CREATE INDEX idx_domain_id ON events (domain_id);
CREATE INDEX idx_timestamp ON events (timestamp);

-- Consumer positions
CREATE TABLE consumer_positions (
  consumer_group VARCHAR(255),
  stream VARCHAR(255),
  position BIGINT,
  updated_at TIMESTAMP DEFAULT NOW(),
  PRIMARY KEY (consumer_group, stream)
);
Transactional Event Client
Coordinate events with database transactions:
import { TransactionalEventClient } from "@eventicle/eventiclejs";

const txClient = new TransactionalEventClient({
  baseClient: eventClientOnDatastore(),
  datastore: pgDatastore
});

// Use within a transaction
await pgDatastore.transaction(async (tx) => {
  // Database operations
  await tx.query("INSERT INTO users ...");

  // Events are only emitted if transaction succeeds
  await txClient.emit([{
    type: "UserCreated",
    stream: "users",
    domainId: userId
  }]);
});
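The "emit only if the transaction succeeds" behaviour can be pictured as buffering events while the work runs and forwarding them once it completes. The sketch below shows that idea in isolation; `emitOnCommit` and `BufferedEvent` are hypothetical names, and it does not make the database commit and the forward step atomic, which a production implementation would need to address (for example by storing the buffer in the same transaction).

```typescript
type BufferedEvent = { type: string; stream: string; domainId: string };

// Collects events during `work` and forwards them only if `work` resolves.
async function emitOnCommit<T>(
  forward: (events: BufferedEvent[]) => Promise<void>,
  work: (emit: (event: BufferedEvent) => void) => Promise<T>
): Promise<T> {
  const buffer: BufferedEvent[] = [];

  // If `work` throws, control leaves here and the buffer is discarded.
  const result = await work((event) => {
    buffer.push(event);
  });

  if (buffer.length > 0) {
    await forward(buffer);
  }
  return result;
}
```

Here `forward` stands in for the underlying client's emit, and `work` stands in for the transaction body.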
Idempotent Event Client
Prevent duplicate event processing:
import { IdempotentEventClient } from "@eventicle/eventiclejs";

const idempotentClient = new IdempotentEventClient({
  client: eventClientOnKafka({ /* config */ }),

  // Deduplication window
  deduplicationWindow: 3600000, // 1 hour

  // Storage for processed events
  storage: new RedisStorage({
    host: "localhost",
    port: 6379
  })
});

setEventClient(idempotentClient);
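A deduplication window can be understood as a record of recently seen event ids that expires after a fixed time. The following is a minimal in-memory sketch of that idea, assuming events carry a unique id; `DeduplicationWindow` is a hypothetical class, and a shared store would be needed when several instances process the same stream.

```typescript
// Remembers event ids for `windowMs`; older ids are forgotten.
class DeduplicationWindow {
  private seenAt = new Map<string, number>();

  constructor(private readonly windowMs: number) {}

  // Returns true the first time an id is seen within the window, false for a duplicate.
  firstSeen(eventId: string, now: number = Date.now()): boolean {
    this.evictOlderThan(now - this.windowMs);
    if (this.seenAt.has(eventId)) {
      return false;
    }
    this.seenAt.set(eventId, now);
    return true;
  }

  // Drop ids whose first sighting is at or before the cutoff.
  private evictOlderThan(cutoff: number): void {
    this.seenAt.forEach((at, id) => {
      if (at <= cutoff) {
        this.seenAt.delete(id);
      }
    });
  }
}

const dedupe = new DeduplicationWindow(1000);
const first = dedupe.firstSeen("evt-1", 0);       // true: not seen before
const repeat = dedupe.firstSeen("evt-1", 500);    // false: still inside the window
const afterExpiry = dedupe.firstSeen("evt-1", 1500); // true: the earlier record has expired
```

The window length is a trade-off: a longer window catches later redeliveries but keeps more ids in storage.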
Clean Starting Proxy Client
Ensure clean startup by removing incomplete events:
import { cleanStartingProxyEventClient } from "@eventicle/eventiclejs";

const cleanClient = cleanStartingProxyEventClient({
  eventClient: eventClientOnKafka({ /* config */ }),
  eventSourceName: "my-service",

  // Remove events from last 5 minutes on startup
  cleanupWindow: 300000
});

setEventClient(cleanClient);
Choosing an Event Client
Event Client Middleware
Create custom event client behavior:
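class LoggingEventClient implements EventClient {
  constructor(private wrapped: EventClient) {}

  async emit(events: EventicleEvent[]): Promise<void> {
    console.log(`Emitting ${events.length} events`);
    await this.wrapped.emit(events);
  }

  subscribe(options: Parameters<EventClient["subscribe"]>[0]): void {
    // Log each event before passing it to the original handler
    const wrappedHandler = async (event: EventicleEvent) => {
      console.log(`Processing: ${event.type}`);
      await options.handler(event);
    };
    this.wrapped.subscribe({
      ...options,
      handler: wrappedHandler
    });
  }

  // The replay methods are delegated unchanged
  coldReplay(options: Parameters<EventClient["coldReplay"]>[0]): Promise<void> {
    return this.wrapped.coldReplay(options);
  }

  hotReplay(options: Parameters<EventClient["hotReplay"]>[0]): Promise<void> {
    return this.wrapped.hotReplay(options);
  }
}

// Use the middleware
const baseClient = eventClientOnKafka({ /* config */ });
const loggingClient = new LoggingEventClient(baseClient);
setEventClient(loggingClient);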
Monitoring and Observability
Metrics
import { metrics } from "@eventicle/eventiclejs";

// Enable metrics collection
metrics.enable({
  eventEmitted: true,
  eventProcessed: true,
  processingDuration: true,
  consumerLag: true
});

// Access metrics
const stats = metrics.getStats();
console.log("Events processed:", stats.eventsProcessed);
console.log("Average duration:", stats.avgProcessingTime);
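To show what a processing-duration metric measures, the sketch below wraps a handler and records how long each call took and whether it failed. It is independent of the metrics module above; `HandlerStats` and `withTiming` are hypothetical names, and the clock is injectable only so the behaviour can be checked deterministically.

```typescript
type TimedHandler<E> = (event: E) => Promise<void>;

// Accumulates counts and total duration for one handler.
class HandlerStats {
  processed = 0;
  failed = 0;
  private totalMs = 0;

  record(durationMs: number, ok: boolean): void {
    this.processed++;
    if (!ok) {
      this.failed++;
    }
    this.totalMs += durationMs;
  }

  get avgProcessingTimeMs(): number {
    return this.processed === 0 ? 0 : this.totalMs / this.processed;
  }
}

// Wraps `handler` so every call is timed; errors are recorded and rethrown.
function withTiming<E>(
  handler: TimedHandler<E>,
  stats: HandlerStats,
  now: () => number = Date.now
): TimedHandler<E> {
  return async (event) => {
    const start = now();
    let ok = false;
    try {
      await handler(event);
      ok = true;
    } finally {
      stats.record(now() - start, ok);
    }
  };
}
```

Because the wrapper rethrows, it can sit in front of any handler without changing how failures are reported to the event client.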
Health Checks
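// Kafka health check (`kafka` is assumed to be an existing kafkajs Kafka instance)
async function checkKafkaHealth(): Promise<boolean> {
  const admin = kafka.admin();
  try {
    // The admin client must connect before it can list topics
    await admin.connect();
    await admin.listTopics();
    return true;
  } catch (error) {
    console.error("Kafka health check failed:", error);
    return false;
  } finally {
    // Always release the connection, even when the check fails
    await admin.disconnect().catch(() => undefined);
  }
}

// Datastore health check (`datastore` is assumed to be the configured datastore instance)
async function checkDatastoreHealth(): Promise<boolean> {
  try {
    await datastore.query("SELECT 1");
    return true;
  } catch (error) {
    console.error("Datastore health check failed:", error);
    return false;
  }
}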
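Health checks like these can hang when a broker or database is unreachable, so it may help to bound each one with a timeout and combine the results. The sketch below is one way to do that; `checkWithTimeout` and `healthReport` are hypothetical helpers, and the 2000 ms default is an arbitrary assumption.

```typescript
type HealthCheck = () => Promise<boolean>;

// Resolves to false if `check` rejects or takes longer than `timeoutMs`.
function checkWithTimeout(check: HealthCheck, timeoutMs: number): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    const timer = setTimeout(() => resolve(false), timeoutMs);
    const finish = (ok: boolean) => {
      clearTimeout(timer);
      resolve(ok);
    };
    // Starting from a resolved promise also captures synchronous throws.
    Promise.resolve().then(check).then(finish, () => finish(false));
  });
}

// Runs all named checks in parallel and reports each result.
async function healthReport(
  checks: Record<string, HealthCheck>,
  timeoutMs: number = 2000
): Promise<{ healthy: boolean; details: Record<string, boolean> }> {
  const names = Object.keys(checks);
  const results = await Promise.all(
    names.map((name) => checkWithTimeout(checks[name], timeoutMs))
  );
  const details: Record<string, boolean> = {};
  names.forEach((name, i) => {
    details[name] = results[i];
  });
  return { healthy: results.every(Boolean), details };
}
```

A call such as `healthReport({ kafka: checkKafkaHealth, datastore: checkDatastoreHealth })` would then return a single object suitable for a readiness endpoint.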
Best Practices
- Choose the Right Client: Match the client to your requirements
- Configure Appropriately: Tune settings for your workload
- Monitor Performance: Track metrics and lag
- Handle Errors: Implement proper error handling
- Test Thoroughly: Use in-memory client for tests
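As one illustration of the error-handling point, a handler can be wrapped so that transient failures are retried a bounded number of times before the error is surfaced. This is a generic sketch rather than an Eventicle feature; `withRetry` is a hypothetical helper and the linear backoff is an arbitrary choice.

```typescript
// Retries `handler` up to `maxAttempts` times, waiting `delayMs * attempt` between tries.
function withRetry<E>(
  handler: (event: E) => Promise<void>,
  maxAttempts: number,
  delayMs: number
): (event: E) => Promise<void> {
  return async (event) => {
    for (let attempt = 1; ; attempt++) {
      try {
        await handler(event);
        return;
      } catch (error) {
        if (attempt >= maxAttempts) {
          throw error; // give up and surface the last error
        }
        await new Promise<void>((resolve) => setTimeout(resolve, delayMs * attempt));
      }
    }
  };
}
```

Retried handlers should be safe to run more than once for the same event, since a failure may occur after part of the work has already been done.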
Next Steps
- Learn about Aggregate Roots for event generation
- Explore Event Views for consuming events
- Understand Transaction Handling