Event Clients

Event clients are the core abstraction in Eventicle for storing and retrieving events. This guide covers the different types of event clients and how to configure them for your needs.

Overview

An EventClient provides the interface for:

  • Emitting events to streams

  • Subscribing to event streams

  • Replaying historical events

  • Managing consumer groups

Eventicle provides several implementations:

  • In-Memory: For development and testing

  • Kafka: For production event streaming

  • Datastore: For PostgreSQL-backed event storage

  • Transactional: For coordinating events with database transactions

The EventClient Interface

All event clients implement this core interface:

interface EventClient {
  // Emit events
  emit(events: EventicleEvent[]): Promise<void>;

  // Subscribe to new events
  subscribe(options: {
    stream: string;
    consumerGroup: string;
    handler: (event: EventicleEvent) => Promise<void>;
    parallelEventCount?: number;
  }): void;

  // Replay events from beginning
  coldReplay(options: {
    stream: string;
    handler: (event: EventicleEvent) => Promise<void>;
  }): Promise<void>;

  // Replay then continue subscribing
  hotReplay(options: {
    stream: string;
    consumerGroup: string;
    handler: (event: EventicleEvent) => Promise<void>;
  }): Promise<void>;
}
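
Before choosing an implementation, here is the interface in use. This is a minimal sketch; the event fields shown are illustrative, not the complete EventicleEvent shape:

// Given any configured client (see the implementations below)
declare const client: EventClient;

// Emit an event to the "users" stream
await client.emit([{
  type: "UserCreated",
  stream: "users",
  domainId: "user-123"
}]);

// Consume it in a named consumer group
client.subscribe({
  stream: "users",
  consumerGroup: "user-projector",
  handler: async (event) => {
    console.log(`Handling ${event.type}`);
  }
});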

In-Memory Event Client

Perfect for development and testing:

import { setEventClient, eventClientInMemory } from "@eventicle/eventiclejs";

// Basic setup
setEventClient(eventClientInMemory());

// With custom configuration
setEventClient(eventClientInMemory({
  maxEventsPerStream: 10000,  // Limit stream size
  logger: customLogger         // Custom logging
}));
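
In tests this makes event flows easy to assert on. A minimal sketch, assuming a Jest-style test runner and that the in-memory client has delivered events by the time emit resolves:

import { setEventClient, eventClientInMemory } from "@eventicle/eventiclejs";

it("delivers emitted events to subscribers", async () => {
  const client = eventClientInMemory();
  setEventClient(client);

  const seen: string[] = [];
  client.subscribe({
    stream: "orders",
    consumerGroup: "test-group",
    handler: async (event) => { seen.push(event.type); }
  });

  await client.emit([{ type: "OrderPlaced", stream: "orders", domainId: "order-1" }]);

  expect(seen).toEqual(["OrderPlaced"]);
});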

Features

  • Zero external dependencies

  • Fast test execution

  • Full API compatibility

  • Automatic event ordering

Limitations

  • No persistence: all data is lost on restart

  • Single process only

  • Limited by available memory

Kafka Event Client

The recommended choice for production:

import { setEventClient, eventClientOnKafka } from "@eventicle/eventiclejs";
import { CompressionTypes } from "kafkajs";

setEventClient(eventClientOnKafka({
  brokers: ["kafka1:9092", "kafka2:9092"],
  clientId: "my-service",

  // Consumer configuration
  consumerConfig: {
    groupId: "my-service-group",
    sessionTimeout: 30000,
    heartbeatInterval: 3000,
    maxBytesPerPartition: 1048576,
    maxWaitTimeInMs: 100
  },

  // Producer configuration
  producerConfig: {
    allowAutoTopicCreation: true,
    transactionTimeout: 60000,
    compression: CompressionTypes.GZIP
  },

  // Batching configuration
  batching: {
    size: 100,              // Max events per batch
    lingerMs: 10            // Max wait before flushing, in ms
  }
}));
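
Batching trades latency for throughput: with the settings above, the producer waits up to 10 ms (lingerMs) to fill a batch of up to 100 events before sending. Lower lingerMs if per-event latency matters more than request efficiency.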

Advanced Kafka Configuration

Topic Management

// Automatic topic creation with configuration
setEventClient(eventClientOnKafka({
  brokers: ["localhost:9092"],
  topicConfig: {
    numPartitions: 6,
    replicationFactor: 3,
    configEntries: [
      { name: "retention.ms", value: "604800000" },      // 7 days
      { name: "compression.type", value: "gzip" },
      { name: "min.insync.replicas", value: "2" }
    ]
  }
}));
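
A retention.ms of 604800000 is seven days. Pairing replicationFactor: 3 with min.insync.replicas: 2 lets producers that require full acknowledgement keep writing while a single broker is down.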

Consumer Group Management

// Multiple consumer groups for same stream
const client = eventClientOnKafka({
  brokers: ["localhost:9092"],
  groupId: "default-group"
});

// Analytics consumer
client.subscribe({
  stream: "orders",
  consumerGroup: "analytics",
  handler: analyticsHandler
});

// Notification consumer
client.subscribe({
  stream: "orders",
  consumerGroup: "notifications",
  handler: notificationHandler
});
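
Each consumer group tracks its own position, so the analytics and notifications groups both receive every order event. To scale a single group instead, run more instances with the same consumerGroup and the stream's partitions are divided between them.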

Throttling and Backpressure

import { KafkaThrottle } from "@eventicle/eventiclejs";

// Configure throttling
const throttle = new KafkaThrottle({
  maxConcurrent: 10,      // Max concurrent messages
  maxPerSecond: 1000      // Rate limit
});

// Apply to subscriptions
client.subscribe({
  stream: "high-volume",
  consumerGroup: "processor",
  parallelEventCount: 10,
  throttle: throttle,
  handler: async (event) => {
    await processEvent(event);
  }
});

PostgreSQL Datastore Event Client

For SQL-based event storage:

import {
  setEventClient,
  eventClientOnDatastore,
  setDataStore
} from "@eventicle/eventiclejs";

// Configure the PostgreSQL connection. PostgresDatastore is a stand-in for
// your concrete PostgreSQL DataStore implementation.
const pgDatastore = new PostgresDatastore({
  host: process.env.DB_HOST,
  port: 5432,
  database: process.env.DB_NAME,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,

  // Connection pool settings
  max: 20,
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 2000
});

setDataStore(pgDatastore);
setEventClient(eventClientOnDatastore());

Schema Setup

The datastore client requires these tables:

-- Events table
CREATE TABLE events (
  id UUID PRIMARY KEY,
  stream VARCHAR(255) NOT NULL,
  domain_id VARCHAR(255) NOT NULL,
  type VARCHAR(255) NOT NULL,
  sequence BIGSERIAL,
  timestamp BIGINT NOT NULL,
  payload JSONB,
  metadata JSONB,
  caused_by_id UUID,
  caused_by_type VARCHAR(255)
);

-- Indexes (PostgreSQL does not support inline INDEX clauses in CREATE TABLE)
CREATE INDEX idx_stream_sequence ON events (stream, sequence);
CREATE INDEX idx_domain_id ON events (domain_id);
CREATE INDEX idx_timestamp ON events (timestamp);

-- Consumer positions
CREATE TABLE consumer_positions (
  consumer_group VARCHAR(255),
  stream VARCHAR(255),
  position BIGINT,
  updated_at TIMESTAMP DEFAULT NOW(),
  PRIMARY KEY (consumer_group, stream)
);
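
The composite index on (stream, sequence) is what keeps ordered replays efficient, and consumer_positions stores one row per (consumer_group, stream) pair so each group resumes from its own offset.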

Features

  • ACID compliance

  • SQL query capabilities

  • Integrated with existing databases

  • Transaction support

Transactional Event Client

Coordinate events with database transactions:

import { TransactionalEventClient } from "@eventicle/eventiclejs";

const txClient = new TransactionalEventClient({
  baseClient: eventClientOnDatastore(),
  datastore: pgDatastore
});

// Use within a transaction
await pgDatastore.transaction(async (tx) => {
  // Database operations
  await tx.query("INSERT INTO users ...");

  // Events are only emitted if transaction succeeds
  await txClient.emit([{
    type: "UserCreated",
    stream: "users",
    domainId: userId
  }]);
});
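
The important property is the failure path: if the transaction throws, the row and the buffered events are discarded together. A sketch of that, with a deliberate error:

try {
  await pgDatastore.transaction(async (tx) => {
    await tx.query("INSERT INTO users ...");
    await txClient.emit([{ type: "UserCreated", stream: "users", domainId: userId }]);
    throw new Error("force rollback");
  });
} catch (e) {
  // Neither the users row nor the UserCreated event is visible:
  // the emit was held until commit and rolled back with the transaction.
}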

Idempotent Event Client

Prevent duplicate event processing:

import { IdempotentEventClient } from "@eventicle/eventiclejs";

const idempotentClient = new IdempotentEventClient({
  client: eventClientOnKafka({ /* config */ }),

  // Deduplication window
  deduplicationWindow: 3600000, // 1 hour

  // Storage for processed event ids; RedisStorage is a stand-in for
  // your own storage implementation
  storage: new RedisStorage({
    host: "localhost",
    port: 6379
  })
});

setEventClient(idempotentClient);
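
Assuming deduplication is keyed on the event id (an assumption here, not confirmed API behavior), re-emitting the same event inside the window is suppressed:

const event = {
  id: "a1b2c3d4-0000-4000-8000-000000000042", // hypothetical stable event id
  type: "PaymentTaken",
  stream: "payments",
  domainId: "payment-42"
};

await idempotentClient.emit([event]);
await idempotentClient.emit([event]); // second emit is dropped within the 1 hour window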

Clean Starting Proxy Client

Ensure a clean start by discarding recent, potentially incomplete events left behind by a previous run:

import { cleanStartingProxyEventClient } from "@eventicle/eventiclejs";

const cleanClient = cleanStartingProxyEventClient({
  eventClient: eventClientOnKafka({ /* config */ }),
  eventSourceName: "my-service",

  // Remove events from last 5 minutes on startup
  cleanupWindow: 300000
});

setEventClient(cleanClient);

Choosing an Event Client

Development and Testing

Use the in-memory client:

  • Fast test execution

  • No infrastructure needed

  • Deterministic behavior

  • Easy debugging

Production - High Throughput

Use Kafka for:

  • High message volumes

  • Multiple consumers

  • Stream processing

  • Horizontal scaling

Production - Transactional

Use PostgreSQL for:

  • ACID requirements

  • Smaller event volumes

  • Existing PostgreSQL infrastructure

  • SQL query needs

Event Client Middleware

Create custom event client behavior:

// Options type matching subscribe() on the EventClient interface above
type SubscribeOptions = Parameters<EventClient["subscribe"]>[0];

class LoggingEventClient implements EventClient {
  constructor(private wrapped: EventClient) {}

  async emit(events: EventicleEvent[]): Promise<void> {
    console.log(`Emitting ${events.length} events`);
    await this.wrapped.emit(events);
  }

  subscribe(options: SubscribeOptions): void {
    const wrappedHandler = async (event: EventicleEvent) => {
      console.log(`Processing: ${event.type}`);
      await options.handler(event);
    };

    this.wrapped.subscribe({
      ...options,
      handler: wrappedHandler
    });
  }

  // Implement other methods...
}

// Use the middleware
const baseClient = eventClientOnKafka({ /* config */ });
const loggingClient = new LoggingEventClient(baseClient);
setEventClient(loggingClient);
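
Wrapper clients compose. A sketch combining the examples above, where the outermost client runs first on every emit:

const composed = new LoggingEventClient(
  new IdempotentEventClient({
    client: eventClientOnKafka({ /* config */ }),
    deduplicationWindow: 3600000,
    storage: new RedisStorage({ host: "localhost", port: 6379 })
  })
);
setEventClient(composed);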

Monitoring and Observability

Metrics

import { metrics } from "@eventicle/eventiclejs";

// Enable metrics collection
metrics.enable({
  eventEmitted: true,
  eventProcessed: true,
  processingDuration: true,
  consumerLag: true
});

// Access metrics
const stats = metrics.getStats();
console.log("Events processed:", stats.eventsProcessed);
console.log("Average duration:", stats.avgProcessingTime);

Health Checks

// Kafka health check, using a kafkajs Kafka instance
import { Kafka } from "kafkajs";

const kafka = new Kafka({ clientId: "health-check", brokers: ["localhost:9092"] });

async function checkKafkaHealth(): Promise<boolean> {
  try {
    const admin = kafka.admin();
    await admin.connect();
    await admin.listTopics();
    await admin.disconnect();
    return true;
  } catch (error) {
    console.error("Kafka health check failed:", error);
    return false;
  }
}

// Datastore health check (pgDatastore comes from the PostgreSQL setup above)
async function checkDatastoreHealth(): Promise<boolean> {
  try {
    await pgDatastore.query("SELECT 1");
    return true;
  } catch (error) {
    console.error("Datastore health check failed:", error);
    return false;
  }
}
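
Both checks can feed a single readiness probe for whatever health endpoint your platform expects. A minimal sketch:

async function readiness(): Promise<boolean> {
  const [kafkaOk, datastoreOk] = await Promise.all([
    checkKafkaHealth(),
    checkDatastoreHealth()
  ]);
  return kafkaOk && datastoreOk;
}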

Best Practices

  1. Choose the Right Client: in-memory for tests, Kafka for throughput, PostgreSQL for transactional needs

  2. Configure Appropriately: tune batching, partitions, and pool sizes for your workload

  3. Monitor Performance: track emit rates, processing duration, and consumer lag

  4. Handle Errors: decide per handler whether to retry, skip, or dead-letter failed events

  5. Test Thoroughly: run the same code against the in-memory client in tests

Next Steps