package @eventicle/eventiclejs
Classes
Name | Summary |
---|---|
InMemDataStore | In memory Datastore implementation. This is suitable for testing and single instance demonstration purposes. |
 | A Scheduled Job Runner that uses node-cron and setTimeout, and runs as a single node. Persists and loads schedules on process stop/start in the datastore with the types |
Saga | A saga! |
SagaInstance | The data for a single execution of a Saga. Sagas are stateful concepts, and this type contains the state. |
Interfaces
Name | Summary |
---|---|
Command | A Command. It is generally preferred to use dispatchDirectCommand, where the command definition is implicit and more fully type checked. This, along with dispatchCommand, is available if you wish to separate your code more fully, or introduce a remote-capable, message-based command bus. |
CommandIntent | A CommandIntent is a message instructing Eventicle to perform an action that may emit events that should be sent externally using the EventClient. |
CommandReturn | The global return type for Command, whether the command is explicit, as in dispatchCommand, or implicit, as in dispatchDirectCommand. This return type is passed to the caller, but before that happens, it will be processed by the dispatcher to capture any events that need to be emitted. |
EventAdapter | An adapter is an observer on an event stream. It only operates on hot event data, and will never attempt to replay everything. |
EventClientCodec | Converts EventicleEvent to/from EncodedEvent. EncodedEvent is suitable for the EventClient implementations to send on the wire, as it is a Buffer and a set of message headers. |
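To make the EventClientCodec contract concrete, here is a minimal JSON codec sketch. The `EventicleEvent` and `EncodedEvent` shapes below are simplified stand-ins for illustration only (the real interfaces carry more fields), and `jsonCodec` is a hypothetical example, not the library's built-in codec.

```typescript
// Hypothetical minimal shapes, for illustration only; see the real
// EventicleEvent and EncodedEvent definitions in @eventicle/eventiclejs.
interface EventicleEvent { id: string; type: string; data: any }
interface EncodedEvent { buffer: Buffer; headers: Record<string, string> }

// A sketch of a JSON codec in the EventClientCodec style: the event is
// serialised into a wire-ready Buffer, with routing metadata as headers.
const jsonCodec = {
  encode: async (event: EventicleEvent): Promise<EncodedEvent> => ({
    buffer: Buffer.from(JSON.stringify(event), "utf8"),
    headers: { id: event.id, type: event.type },
  }),
  decode: async (encoded: EncodedEvent): Promise<EventicleEvent> =>
    JSON.parse(encoded.buffer.toString("utf8")),
};
```

A custom codec like this would be installed via setEventClientCodec (see below), for example to add encryption or a binary format on the wire.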
Functions
aggregateObservationAdapter()
```typescript
aggregateObservationAdapter: () => AggregateObservationAdapter
```
aggregateObserver(aggregateType, id, timeout, exec)
Block for an asynchronous, event based, workflow to occur.
Reads streams and allows short lived observers to be notified if a particular AggregateRoot instance has been modified. This is picked out based on the domainId
of the event.
This allows a synchronous API to block and wait for asynchronous changes to finish occurring before continuing processing.
For example, given a User AggregateRoot, you can wait for a Saga (or other operational component) to approve a user account by blocking until the given User has raised the user.approved event.
```typescript
export class UserAggregate extends AggregateRoot {
  approved: boolean = false

  constructor() {
    super("User")
    this.reducers = {}
    // Persist the approval in the current aggregate state.
    // This avoids any potential race conditions between performing the action
    // that requires approval and the observer starting to monitor. See below.
    this.reducers["user.approved"] = (ev) => {
      this.approved = true
    }
  }

  approve() {
    this.raiseEvent({
      type: "user.approved",
      data: {}
      // other event fields
    })
  }
}
```
First, register the observer. This will open a new event subscription with a dynamically generated consumer group. All events on the topic will be received by this instance, so be mindful of doing this on topics with large volumes of data.
```typescript
await registerAdapter(createAggregateObservationAdapter([AnAggregate]));
```
In your synchronous API (eg, HTTP/REST, GQL):

```typescript
// The userId is the domainId of the UserAggregate instance we want to observe
export async function doActionThatRequiresApproval(userId: string) {
  // perform some action that triggers async workflows

  // now wait for the User to be approved
  const user = await aggregateObserver(
    UserAggregate,
    userId,  // the UserAggregate domainId
    10000,   // max time to wait for the User instance to come into the required state
    (ar, ev) => {
      // This is called once straight away with just the current state of the
      // User aggregate (ev is null), then again every time an event is observed
      // for the UserAggregate with domainId == userId.
      // Check that the current state of the aggregate is what you want to see,
      // or that the event being received is the one you are waiting for.
      // Ideally, check both: doing both will avoid any race conditions.
      return (ev && ev.type === "user.approved") || ar.approved
    });

  // user is now approved, or an Error has been thrown
  // perform any ops on the user as required.
}
```
Name | Type | Description |
---|---|---|
aggregateType | { new (...params: any[]): AR; } | the AggregateRoot type to observe |
id | string | the domainId of the aggregate instance to observe |
timeout | number | the maximum time, in milliseconds, to wait for the aggregate to come into the required state |
exec | (ar: AR, event?: EventicleEvent) => boolean | predicate invoked with the current aggregate state and each observed event; return true to finish observing |

Returns |
---|
Promise&lt;AR&gt; |

```typescript
export declare function aggregateObserver<AR extends AggregateRoot>(aggregateType: {
    new (...params: any[]): AR;
}, id: string, timeout: number, exec: (ar: AR, event?: EventicleEvent) => boolean): Promise<AR>;
```
allSagaInstances(workspaceId)
Name | Type | Description |
---|---|---|
workspaceId | string | _(Optional)_ |

Returns |
---|
Promise&lt;SagaInstance&lt;any, any&gt;[]&gt; |

```typescript
export declare function allSagaInstances(workspaceId?: string): Promise<SagaInstance<any, any>[]>;
```
apm.apmJoinEvent(event, name, type, subtype)
Will join the APM transaction that is stamped in the event header - apmTrace. This will generate a new span within the APM transaction.
Name | Type | Description |
---|---|---|
event | EventicleEvent | the event to join the APM trace on |
name | string | The name of the APM transaction |
type | string | the APM type |
subtype | string | the APM subtype |

Returns |
---|
void |

```typescript
export declare function apmJoinEvent(event: EventicleEvent, name: string, type: string, subtype: string): void;
```
connectBroker(config)
Name | Type | Description |
---|---|---|
config | KafkaConfig | |

Returns |
---|
Promise&lt;void&gt; |

```typescript
export declare function connectBroker(config: KafkaConfig): Promise<void>;
```
consumeFullEventLog(stream)
Will cold replay the entire event stream into a list.

Mostly useful for testing; used against long running event streams/Kafka it will blow your local memory!

Name | Type | Description |
---|---|---|
stream | string | The event stream to cold replay |

Returns |
---|
Promise&lt;EventicleEvent[]&gt; |

```typescript
export declare function consumeFullEventLog(stream: string): Promise<EventicleEvent[]>;
```
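The cold-replay semantics can be illustrated with a minimal in-memory stand-in. This is a hypothetical sketch, not the library implementation: `streams` plays the role of the backing event store, and `consumeFullEventLogSketch` resolves with every event in the stream, in order, once the end of the log is reached.

```typescript
interface EventicleEvent { id: string; type: string; data: any }

// Hypothetical in-memory event store, keyed by stream name, standing in
// for the real backing event store (eg, Kafka).
const streams: Record<string, EventicleEvent[]> = {
  user: [
    { id: "1", type: "user.created", data: {} },
    { id: "2", type: "user.approved", data: {} },
  ],
};

// Sketch of consumeFullEventLog semantics: read the whole stream, cold,
// from the first offset, and resolve with every event in order.
async function consumeFullEventLogSketch(stream: string): Promise<EventicleEvent[]> {
  return [...(streams[stream] ?? [])];
}
```

In a test, the returned list can then be asserted against to verify exactly which events a workflow emitted.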
createAggregateObservationAdapter(aggregates)
Create a new EventAdapter with a dynamically generated consumer group id that will subscribe to the topics for the given AggregateRoots.

Name | Type | Description |
---|---|---|
aggregates | { new (): AR; }[] | the list of aggregate roots to observe. Their topic names will be extracted. |

Returns |
---|
AggregateObservationAdapter |

```typescript
export declare function createAggregateObservationAdapter<AR extends AggregateRoot>(aggregates: {
    new (): AR;
}[]): AggregateObservationAdapter;
```
dispatchCommand(commandIntent)
This will look up a pre-registered Command, then execute it with the given payload.

It is generally preferred to use dispatchDirectCommand, where the command definition is implicit and more fully type checked.

This, along with dispatchCommand, is available if you wish to separate your code more fully, or introduce a remote-capable, message-based command bus.

Name | Type | Description |
---|---|---|
commandIntent | CommandIntent&lt;T&gt; | |

Returns |
---|
Promise&lt;CommandReturn&lt;T&gt;&gt; |

```typescript
export declare function dispatchCommand<T>(commandIntent: CommandIntent<T>): Promise<CommandReturn<T>>;
```
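The register/dispatch split can be sketched as a name-keyed registry that a CommandIntent is resolved against. The shapes below are hypothetical minimal stand-ins for illustration (the real Command, CommandIntent and CommandReturn interfaces carry more fields), and the `*Sketch` functions are not the library's implementations.

```typescript
// Hypothetical minimal shapes, for illustration only.
interface EventicleEvent { type: string; data: any }
interface Command<I, O> {
  type: string;
  execute: (data: I) => Promise<{ response?: O; events: EventicleEvent[] }>;
}
interface CommandIntent<I> { type: string; data: I }

// Sketch of the register/dispatch split: commands are registered by name,
// and a CommandIntent is later resolved against that registry and executed.
const registry = new Map<string, Command<any, any>>();

function registerCommandSketch<I, O>(command: Command<I, O>): void {
  registry.set(command.type, command);
}

async function dispatchCommandSketch<I>(intent: CommandIntent<I>) {
  const command = registry.get(intent.type);
  if (!command) throw new Error(`No command registered for ${intent.type}`);
  return command.execute(intent.data);
}
```

Because the intent is a plain message, this indirection is what makes a remote, message-based command bus possible: the intent can travel over a broker before being dispatched.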
dispatchDirectCommand(command, streamToEmit, transactionControl)
Dispatch a command directly, without a CommandIntent message in between.
Cannot be distributed or load balanced, but requires less boilerplate.
Name | Type | Description |
---|---|---|
command | () =&gt; Promise&lt;CommandReturn&lt;T&gt;&gt; | |
streamToEmit | string | |
transactionControl | TransactionOptions | _(Optional)_ |

Returns |
---|
Promise&lt;T&gt; |

```typescript
export declare function dispatchDirectCommand<T>(command: () => Promise<CommandReturn<T>>, streamToEmit: string, transactionControl?: TransactionOptions): Promise<T>;
```
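A minimal sketch of the direct-dispatch flow, under the assumption (from the CommandReturn description above) that the command's result carries the events to emit and a response for the caller. This is illustrative only: `emitted` stands in for the EventClient, and `dispatchDirectCommandSketch` is not the library's implementation.

```typescript
interface EventicleEvent { type: string; data: any }

// Hypothetical stand-in for the EventClient emit side.
const emitted: { stream: string; events: EventicleEvent[] }[] = [];

// Sketch of the dispatchDirectCommand flow: run the command inline, capture
// any events from its CommandReturn-shaped result, emit them onto the given
// stream, and hand only the response back to the caller.
async function dispatchDirectCommandSketch<T>(
  command: () => Promise<{ response?: T; events: EventicleEvent[] }>,
  streamToEmit: string
): Promise<T> {
  const ret = await command();
  if (ret.events.length > 0) {
    emitted.push({ stream: streamToEmit, events: ret.events });
  }
  return ret.response as T;
}
```

Note how the caller never sees the events themselves; they are captured by the dispatcher and emitted on the named stream, which is the behaviour the CommandReturn interface describes.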
eventClientOnDatastore()
This is a test capable event client.

It fully implements the event client semantics, and persists its events into the given data store.

Good to pair with the InMemDataStore for testing and local dev usage.

Not recommended for production (really!), as you disable any possibility of distribution.

```typescript
export declare function eventClientOnDatastore(): EventClient;
```
eventClientOnKafka(config, consumerConfig, onTopicFailureConfig)
Name | Type | Description |
---|---|---|
config | KafkaConfig | as per kafkajs |
consumerConfig | ConsumerConfigFactory | _(Optional)_ as per kafkajs |
onTopicFailureConfig | (topicName: any) =&gt; Promise&lt;TopicFailureConfiguration&gt; | _(Optional)_ If a consumer fails because the topic doesn't exist, configure this to request the topic is auto generated with the given config |

Returns |
---|
Promise&lt;EventClient&gt; |

```typescript
export declare function eventClientOnKafka(config: KafkaConfig, consumerConfig?: ConsumerConfigFactory, onTopicFailureConfig?: (topicName: any) => Promise<TopicFailureConfiguration>): Promise<EventClient>;
```
metrics()
```typescript
export declare function metrics(): {
    "view-latency": any;
    "adapter-latency": any;
    "saga-latency": any;
};
```
registerAdapter(adapter)
This will connect the given EventAdapter to event streams.

An EventAdapter is a hot subscription, and will receive events emitted after it first connects.

If it is offline for a period, the backing event store (eg, Kafka) will allow the adapter to reconnect and pick up from where it had previously processed up to.

Name | Type | Description |
---|---|---|
adapter | EventAdapter | |

Returns |
---|
Promise&lt;EventSubscriptionControl&gt; |

```typescript
export declare function registerAdapter(adapter: EventAdapter): Promise<EventSubscriptionControl>;
```
registerCommand(command)
Name | Type | Description |
---|---|---|
command | Command&lt;I, O&gt; | |

Returns |
---|
void |

```typescript
export declare function registerCommand<I, O>(command: Command<I, O>): void;
```
registerRawView(view)
Will register a raw event view.

This subscribes it to the appropriate event streams. For every event received, handleEvent will be called.

Events are not processed through the EventClientCodec, and so are observed encoded as an EncodedEvent. This can be useful if you want to persist the event in a raw form, as a binary encoded stream.

Name | Type | Description |
---|---|---|
view | RawEventView | The View to subscribe to event streams |

Returns |
---|
Promise&lt;EventSubscriptionControl&gt; |

```typescript
export declare function registerRawView(view: RawEventView): Promise<EventSubscriptionControl>;
```
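As a sketch of the raw-view idea: because events arrive still encoded, a view can persist the wire bytes untouched. The `EncodedEvent` shape and the view's field names below are illustrative assumptions, not the exact RawEventView contract.

```typescript
// Hypothetical minimal shape of an EncodedEvent: a wire-format Buffer
// plus a set of message headers.
interface EncodedEvent { buffer: Buffer; headers: Record<string, string> }

// Sketch of a raw event view: events are observed still encoded, so they
// can be archived byte-for-byte without passing through the EventClientCodec.
const rawArchive: Buffer[] = [];

const binaryArchiveView = {
  consumerGroup: "binary-archive", // illustrative field name
  handleEvent: async (event: EncodedEvent): Promise<void> => {
    rawArchive.push(event.buffer);
  },
};
```

A view shaped like this could then be handed to registerRawView to subscribe it to the event streams.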
registerSaga(saga)
Name | Type | Description |
---|---|---|
saga | Saga&lt;TimeoutNames, Y&gt; | |

Returns |
---|
Promise&lt;EventSubscriptionControl&gt; |

```typescript
export declare function registerSaga<TimeoutNames, Y>(saga: Saga<TimeoutNames, Y>): Promise<EventSubscriptionControl>;
```
registerView(view)
Name | Type | Description |
---|---|---|
view | EventView | |

Returns |
---|
Promise&lt;EventSubscriptionControl&gt; |

```typescript
export declare function registerView(view: EventView): Promise<EventSubscriptionControl>;
```
saga(name)
Name | Type | Description |
---|---|---|
name | string | |

Returns |
---|
Saga&lt;TimeoutNames, SagaInstanceData&gt; |

```typescript
export declare function saga<TimeoutNames, SagaInstanceData>(name: string): Saga<TimeoutNames, SagaInstanceData>;
```
setEventClient(cl)
Name | Type | Description |
---|---|---|
cl | EventClient | |

Returns |
---|
void |

```typescript
export declare function setEventClient(cl: EventClient): void;
```
setEventClientCodec(cl)
Name | Type | Description |
---|---|---|
cl | EventClientCodec | |

Returns |
---|
void |

```typescript
export declare function setEventClientCodec(cl: EventClientCodec): void;
```