package @eventicle/eventiclejs
Classes
| Name | Summary |
|---|---|
| AggregateRoot | Base class for implementing event-sourced aggregate roots. Aggregate roots are the core building blocks of domain-driven design and event sourcing. They encapsulate business logic, maintain consistency boundaries, and generate events that represent state changes. The aggregate's current state is rebuilt by replaying its historical events through reducer functions. Key Features: - **Event Sourcing**: State is derived from events, not stored directly - **Business Logic**: Encapsulates domain rules and invariants - **Event Generation**: Emits events when state changes occur - **Immutable History**: Complete audit trail of all changes - **Checkpointing**: Optional performance optimization for large event histories |
| | In-memory Datastore implementation. This is suitable for testing and single-instance demonstration purposes. |
| | A scheduled job runner that uses node-cron and setTimeout, and runs as a single node. Persists and loads schedules in the datastore on process stop/start. |
| Saga | Defines a long-running business process that coordinates across events and time. Sagas implement the Saga pattern for managing complex workflows that span multiple aggregates, external services, and time-based operations. They provide stateful event processing with support for timers, error handling, and process coordination. Type parameters: TimeoutNames - union type of timer names this saga can schedule; InstanceData - type of the saga's persistent state data. Key Features: - **Stateful Processing**: Maintains state across multiple events - **Timer Support**: Schedule delays and recurring operations - **Error Handling**: Custom error handling and compensation logic - **Event Correlation**: Match events to specific saga instances - **Parallel Processing**: Configurable concurrency for high throughput |
| SagaInstance | Represents a single running instance of a saga workflow. SagaInstance contains the runtime state and execution context for a specific saga execution. Each instance tracks its data, manages timers, and provides methods for saga lifecycle management. Type parameters: TimeoutNames - union type of timer names this saga can schedule; T - type of the saga's persistent data |
Interfaces
| Name | Summary |
|---|---|
| Command | Defines a command handler in Eventicle's CQRS architecture. Commands encapsulate business operations that change system state and emit domain events. They provide the "C" in CQRS, handling write operations while maintaining strong consistency and business rule enforcement. Type parameters: I - input data type for the command; O - output/response type from the command. Key Features: - **Business Logic**: Encapsulates domain operations and invariants - **Event Generation**: Produces events representing state changes - **Type Safety**: Strongly-typed input/output contracts - **Transactional**: Atomic execution with event emission - **Scalable**: Can be distributed across multiple instances |
| CommandIntent | Represents a request to execute a command with typed payload data. CommandIntent provides a standardized way to request command execution in Eventicle's CQRS architecture. It decouples command requests from their implementations, enabling features like remote command execution, load balancing, and message-based architectures. Type parameters: T - the type of the command payload data |
| CommandReturn | Standard return type for all command executions in Eventicle. CommandReturn provides a consistent interface for command results, separating the response data (returned to the caller) from domain events (published to streams). This enables clean separation between synchronous API responses and asynchronous event-driven side effects. Type parameters: T - type of the response data returned to the caller |
| EncodedEvent | Represents an event in its encoded, wire-ready format. EncodedEvent is the transport representation of an EventicleEvent, containing the serialized event data as a buffer along with metadata headers. This format is used by EventClient implementations for efficient network transmission and storage. |
| EventAdapter | Defines an event adapter for real-time event processing and integration. EventAdapter provides a hot-subscription pattern for processing live events as they occur, without replaying historical data. Unlike EventView, which processes both historical and live events, adapters focus on real-time integration scenarios like external system synchronization, notifications, and live data feeds. Key Characteristics: - **Hot Subscription Only**: Processes new events, never replays history - **External Integration**: Designed for pushing data to external systems - **Transactional**: Each event is processed within a database transaction - **Error Handling**: Built-in error recovery and monitoring - **Consumer Groups**: Supports load balancing across multiple instances |
| EventClient | Core interface for event streaming clients in Eventicle. EventClient provides the fundamental operations for working with event streams: publishing events, subscribing to streams, and replaying historical events. Different implementations support various backends like Kafka, Redis, or in-memory storage for development and testing. Stream Types: - **Hot Streams**: Only new events (live subscription) - **Cold Streams**: Historical events from the beginning - **Cold-Hot Streams**: Historical events followed by live subscription |
| EventClientCodec | Codec interface for converting between domain events and wire format. EventClientCodec provides the serialization/deserialization layer between EventicleEvent (domain representation) and EncodedEvent (wire representation). Different implementations can provide JSON, Avro, Protocol Buffers, or custom encoding schemes. The codec is responsible for: serializing event payloads and metadata; handling distributed tracing headers; version compatibility and schema evolution; compression and encryption (if needed). |
| EventicleEvent | Core event interface representing something that happened in the system. EventicleEvent is the fundamental building block of event-driven architectures in Eventicle. Events are immutable facts about state changes that occurred, providing the foundation for event sourcing, CQRS, and saga coordination. Type parameters: T - the type of the event payload data |
| EventView | Defines an event view for building read-side projections in CQRS architectures. EventView implements the "Q" (Query) side of CQRS, processing domain events to build optimized read models, projections, and materialized views. Views enable efficient querying by maintaining denormalized data structures tailored for specific read patterns. Key Features: - **Event Processing**: Handles events from multiple streams - **Consumer Groups**: Enables load balancing across multiple instances - **Parallel Processing**: Configurable concurrency for high throughput - **Cold/Hot Replay**: Processes historical events then continues with live events - **Error Handling**: Built-in error recovery and monitoring |
| RawEventView | Defines a view for processing events in their raw, encoded format. RawEventView enables processing events without decoding them into domain objects, which is useful for scenarios requiring high performance, binary storage, or custom encoding schemes. Events are processed as EncodedEvent objects containing the raw buffer and metadata headers. Use Cases: - **High Performance**: Avoid decoding overhead for throughput-sensitive scenarios - **Binary Storage**: Store events in their original encoded format - **Custom Processing**: Implement custom decoding or transformation logic - **Event Forwarding**: Route events to external systems without modification - **Audit Trails**: Preserve exact event format for compliance requirements |
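The EventClientCodec/EncodedEvent relationship described above can be sketched with a minimal JSON codec. This is a self-contained illustration under stated assumptions: `MiniEvent` and `MiniEncoded` are local stand-in shapes, not Eventicle's exact interfaces (the real EventicleEvent carries additional metadata fields), and `jsonCodec` is a hypothetical implementation, not the library's built-in codec.

```typescript
// Minimal sketch of an EventClientCodec-style JSON codec.
// Field names here are illustrative assumptions, not Eventicle's exact shapes.
interface MiniEvent { type: string; data: any }
interface MiniEncoded { buffer: Buffer; headers: Record<string, string> }

const jsonCodec = {
  // Domain representation -> wire representation
  encode: async (ev: MiniEvent): Promise<MiniEncoded> => ({
    buffer: Buffer.from(JSON.stringify(ev), "utf8"),
    // Headers carry metadata (eg, tracing info) without touching the payload
    headers: { type: ev.type },
  }),
  // Wire representation -> domain representation
  decode: async (enc: MiniEncoded): Promise<MiniEvent> =>
    JSON.parse(enc.buffer.toString("utf8")),
};

(async () => {
  const encoded = await jsonCodec.encode({ type: "user.approved", data: { userId: "u1" } });
  const decoded = await jsonCodec.decode(encoded);
  console.log(decoded.type); // user.approved
})();
```

A production codec would additionally handle schema evolution and tracing headers, as the interface summary notes; the round-trip property (decode(encode(ev)) deep-equals ev) is the invariant any implementation must preserve.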
Functions
aggregateObservationAdapter()
```typescript
aggregateObservationAdapter: () => AggregateObservationAdapter
```
aggregateObserver(aggregateType, id, timeout, exec)
Block for an asynchronous, event based, workflow to occur.
Reads streams and allows short lived observers to be notified if a particular AggregateRoot instance has been modified. This is picked out based on the domainId of the event.
This allows a synchronous API to block and wait for asynchronous changes to finish occurring before continuing processing.
For example, given a User AggregateRoot, you can wait for a Saga (or other operational component) to approve a user account, by blocking until the given User has raised the user.approved event.
```typescript
export class UserAggregate extends AggregateRoot {
  approved: boolean = false

  constructor() {
    super("User")
    this.reducers = {}
    // persist the approval in the current aggregate state.
    // This avoids any potential race conditions between performing the action that requires
    // approval and the observer starting to monitor. See below.
    this.reducers["user.approved"] = (ev) => {
      this.approved = true
    }
  }

  approve() {
    this.raiseEvent({
      type: "user.approved",
      data: {}
      // other event fields
    })
  }
}
```
First, register the observer. This will open a new event subscription with a dynamically generated consumer group. All events on the topic will be received by this instance, so be mindful of doing this on topics with large volumes of data.
```typescript
await registerAdapter(createAggregateObservationAdapter([AnAggregate]));
```
Then, in your synchronous API (eg, HTTP/REST, GraphQL):
```typescript
// The userId is the domainId of the UserAggregate instance we want to observe
export async function doActionThatRequiresApproval(userId: string) {
  // perform some action that triggers async workflows

  // now wait for the User to be approved
  const user = await aggregateObserver(
    UserAggregate,
    userId,  // the UserAggregate domainId
    10000,   // max time to wait for the User instance to come into the required state
    (ar, ev) => {
      // called once straight away with just the current state of the User aggregate (ev is null),
      // then every time an event is observed for the UserAggregate with domainId == userId.
      // Check that the current state of the aggregate is what you want to see, or that the event
      // being received is the one you are waiting for. Ideally, check both of those things:
      // doing both will avoid any race conditions.
      return (ev && ev.type == "user.approved") || ar.approved
    });

  // user is now approved, or an Error has been thrown
  // perform any ops on the user as required.
}
```
| Name | Type | Description |
|---|---|---|
| aggregateType | `{ new (...params: any[]): AR }` | The AggregateRoot class to observe |
| id | `string` | The domainId of the aggregate instance to observe |
| timeout | `number` | Maximum time to wait for the instance to reach the required state |
| exec | `(ar: AR, event?: EventicleEvent) => boolean` | Observation predicate; return true when the aggregate state or the received event matches what you are waiting for |

Returns

`Promise<AR>`

```typescript
export declare function aggregateObserver<AR extends AggregateRoot>(aggregateType: {
    new (...params: any[]): AR;
}, id: string, timeout: number, exec: (ar: AR, event?: EventicleEvent) => boolean): Promise<AR>;
```
allSagaInstances(workspaceId)
| Name | Type | Description |
|---|---|---|
| workspaceId | `string` (optional) | |

Returns

`Promise<SagaInstance<any, any>[]>`

```typescript
export declare function allSagaInstances(workspaceId?: string): Promise<SagaInstance<any, any>[]>;
```
apm.apmJoinEvent(event, name, type, subtype)
Will join the APM transaction that is stamped in the event header - apmTrace
This will generate a new span within the APM transaction.
| Name | Type | Description |
|---|---|---|
| event | `EventicleEvent` | the event to join the APM trace on |
| name | `string` | The name of the APM transaction |
| type | `string` | the APM type |
| subtype | `string` | the APM subtype |

Returns

`void`

```typescript
export declare function apmJoinEvent(event: EventicleEvent, name: string, type: string, subtype: string): void;
```
connectBroker(config)
| Name | Type | Description |
|---|---|---|
| config | `KafkaConfig` | |

Returns

`Promise<void>`

```typescript
export declare function connectBroker(config: KafkaConfig): Promise<void>;
```
consumeFullEventLog(stream)
Will cold replay the entire event stream into a list.
Mostly useful for testing. Using it against long-running event streams (eg, in Kafka) will exhaust your local memory!
| Name | Type | Description |
|---|---|---|
| stream | `string` | The event stream to cold replay |

Returns

`Promise<EventicleEvent[]>`

```typescript
export declare function consumeFullEventLog(stream: string): Promise<EventicleEvent[]>;
```
createAggregateObservationAdapter(aggregates)
Create a new EventAdapter with a dynamic Consumer group id that will subscribe to the topics for the given AggregateRoots
| Name | Type | Description |
|---|---|---|
| aggregates | `{ new (): AR }[]` | the list of aggregate roots to observe. Their topic names will be extracted. |

Returns

`AggregateObservationAdapter`

```typescript
export declare function createAggregateObservationAdapter<AR extends AggregateRoot>(aggregates: {
    new (): AR;
}[]): AggregateObservationAdapter;
```
dispatchCommand(commandIntent)
This will look up a pre-registered Command, then execute the command with the given payload.
It is generally preferable to use dispatchDirectCommand, where the command definition is implicit and more fully type checked.
This, along with registerCommand, is available if you wish to separate your code more fully, or introduce a remote-capable, message-based command bus.
| Name | Type | Description |
|---|---|---|
| commandIntent | `CommandIntent<T>` | |

Returns

`Promise<CommandReturn<T>>`

```typescript
export declare function dispatchCommand<T>(commandIntent: CommandIntent<T>): Promise<CommandReturn<T>>;
```
dispatchDirectCommand(command, streamToEmit, transactionControl)
Dispatch a command directly, without a CommandIntent message in between.
Cannot be distributed or load balanced, but requires less boilerplate.
| Name | Type | Description |
|---|---|---|
| command | `() => Promise<CommandReturn<T>>` | |
| streamToEmit | `string` | |
| transactionControl | `TransactionOptions` (optional) | |

Returns

`Promise<T>`

```typescript
export declare function dispatchDirectCommand<T>(command: () => Promise<CommandReturn<T>>, streamToEmit: string, transactionControl?: TransactionOptions): Promise<T>;
```
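The `CommandReturn` separation that `dispatchDirectCommand` relies on (the `response` goes back to the caller, while `events` are published to `streamToEmit`) can be sketched as follows. `MiniCommandReturn` and `approveUser` are hypothetical local stand-ins based on the CommandReturn description above, not the library's exact types.

```typescript
// Sketch of the CommandReturn shape produced by a command body:
// `response` is returned synchronously, `events` become async side effects.
interface MiniCommandReturn<T> {
  response?: T;
  events: { type: string; data: any }[];
}

// A command body of the kind passed to dispatchDirectCommand (illustrative)
const approveUser = async (userId: string): Promise<MiniCommandReturn<{ ok: boolean }>> => ({
  response: { ok: true },                                // synchronous API response
  events: [{ type: "user.approved", data: { userId } }], // published to the stream
});

(async () => {
  const ret = await approveUser("u1");
  console.log(ret.response?.ok, ret.events[0].type); // true user.approved
})();
```

With the real API, `dispatchDirectCommand` would resolve to the `response` value (`Promise<T>`) after emitting the events to the named stream.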
eventClientOnDatastore()
This is a test capable event client.
It fully implements the event client semantics, and persists its events into the given data store.
Good to pair with the InMemDataStore for testing and local dev usage.
Not recommended for production (really!), as you disable any possibility of distribution.

```typescript
export declare function eventClientOnDatastore(): EventClient;
```
eventClientOnKafka(config, consumerConfig, onTopicFailureConfig)
| Name | Type | Description |
|---|---|---|
| config | `KafkaConfig` | as per kafkajs |
| consumerConfig | `ConsumerConfigFactory` (optional) | as per kafkajs |
| onTopicFailureConfig | `(topicName) => Promise<TopicFailureConfiguration>` (optional) | If a consumer fails because the topic doesn't exist, configure this to request that the topic be auto-created with the given config |

Returns

`Promise<EventClient>`

```typescript
export declare function eventClientOnKafka(config: KafkaConfig, consumerConfig?: ConsumerConfigFactory, onTopicFailureConfig?: (topicName: any) => Promise<TopicFailureConfiguration>): Promise<EventClient>;
```
metrics()
```typescript
export declare function metrics(): {
    "view-latency": any;
    "adapter-latency": any;
    "saga-latency": any;
};
```
registerAdapter(adapter)
This will connect the given EventAdapter to event streams.
An EventAdapter is a hot subscription, and will receive events emitted after it first connects.
If it is offline for a period, the backing event store (eg, Kafka) will allow the adapter to reconnect and pick up from where it had previously processed to.
| Name | Type | Description |
|---|---|---|
| adapter | `EventAdapter` | |

Returns

`Promise<EventSubscriptionControl>`

```typescript
export declare function registerAdapter(adapter: EventAdapter): Promise<EventSubscriptionControl>;
```
registerCommand(command)
| Name | Type | Description |
|---|---|---|
| command | `Command<I, O>` | |

Returns

`void`

```typescript
export declare function registerCommand<I, O>(command: Command<I, O>): void;
```
registerRawView(view)
Will register a raw event view
This subscribes it to the appropriate event streams. For every event received, handleEvent will be called.
Events are not processed through the EventClientCodec, and so are observed encoded, as an EncodedEvent.
This can be useful if you want to persist the event in a raw form, as a binary encoded stream.
| Name | Type | Description |
|---|---|---|
| view | `RawEventView` | The View to subscribe to event streams |

Returns

`Promise<EventSubscriptionControl>`

```typescript
export declare function registerRawView(view: RawEventView): Promise<EventSubscriptionControl>;
```
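The "persist the event in a raw form" use case above can be sketched with a view-shaped object that stores the encoded bytes untouched. This is a self-contained illustration: `MiniEncodedEvent` is a local stand-in for EncodedEvent, and the `consumerGroup`/`streamsToSubscribe`/`handleEvent` field names are assumptions about the RawEventView shape, inferred from the surrounding text.

```typescript
// Sketch of a RawEventView-style handler that keeps events in their
// encoded binary form, bypassing any codec.
interface MiniEncodedEvent { buffer: Buffer; headers: Record<string, string> }

const binaryAuditLog: MiniEncodedEvent[] = [];

const rawAuditView = {
  consumerGroup: "binary-audit",  // illustrative field name
  streamsToSubscribe: ["user"],   // illustrative field name
  handleEvent: async (ev: MiniEncodedEvent) => {
    // No decoding: persist the exact bytes received, headers and all
    binaryAuditLog.push(ev);
  },
};

(async () => {
  await rawAuditView.handleEvent({
    buffer: Buffer.from(JSON.stringify({ type: "user.approved", data: {} })),
    headers: { type: "user.approved" },
  });
  console.log("stored raw events:", binaryAuditLog.length);
})();
```

Because the buffer is stored verbatim, the original wire format is preserved exactly, which is what makes this pattern suitable for compliance-grade audit trails.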
registerSaga(saga)
| Name | Type | Description |
|---|---|---|
| saga | `Saga<TimeoutNames, Y>` | |

Returns

`Promise<EventSubscriptionControl>`

```typescript
export declare function registerSaga<TimeoutNames, Y>(saga: Saga<TimeoutNames, Y>): Promise<EventSubscriptionControl>;
```
registerView(view)
| Name | Type | Description |
|---|---|---|
| view | `EventView` | |

Returns

`Promise<EventSubscriptionControl>`

```typescript
export declare function registerView(view: EventView): Promise<EventSubscriptionControl>;
```
saga(name)
| Name | Type | Description |
|---|---|---|
| name | `string` | |

Returns

`Saga<TimeoutNames, SagaInstanceData>`

```typescript
export declare function saga<TimeoutNames, SagaInstanceData>(name: string): Saga<TimeoutNames, SagaInstanceData>;
```
setEventClient(cl)
| Name | Type | Description |
|---|---|---|
| cl | `EventClient` | |

Returns

`void`

```typescript
export declare function setEventClient(cl: EventClient): void;
```
setEventClientCodec(cl)
| Name | Type | Description |
|---|---|---|
| cl | `EventClientCodec` | |

Returns

`void`

```typescript
export declare function setEventClientCodec(cl: EventClientCodec): void;
```
setEventSourceName(name)
| Name | Type | Description |
|---|---|---|
| name | `string` | |

Returns

`void`

```typescript
export declare function setEventSourceName(name: string): void;
```