package @eventicle/eventiclejs

Classes

Name Summary

AggregateRoot

InMemoryDatastore

In memory Datastore implementation.

This is suitable for testing and single instance demonstration purposes.

LocalScheduleJobRunner

A Scheduled Job Runner that uses node-cron, setTimeout and runs as a single node.

Persists and loads schedules on process stop/start in the datastore, using the types lock-manager-timer and lock-manager-cron.

Saga

A saga!

SagaInstance

The data for a single execution of a Saga

Sagas are stateful concepts, and this type contains the state.

Interfaces

Name Summary

Command

A Command.

dispatchDirectCommand is generally preferred, as the command definition is implicit and more fully type checked.

This, along with dispatchCommand, is available if you wish to separate your code more fully, or introduce a remote-capable, message-based command bus.

CommandIntent

A CommandIntent is a message instructing Eventicle to perform an action that may emit events that should be sent externally using the EventClient

CommandReturn

The global return type for Command, whether the command is explicit, as in dispatchCommand, or implicit, as in dispatchDirectCommand.

This return type is passed to the caller, but before that happens, it will be processed by the dispatcher to capture any events that need to be emitted.

EncodedEvent

EventAdapter

An adapter is an observer on an event stream.

It only operates on hot event data, and will never attempt to replay everything

EventClient

EventClientCodec

Convert EventicleEvent to/from EncodedEvent.

EncodedEvent is suitable for the EventClient implementations to send on the wire, as it is a Buffer and a set of message headers.

EventicleEvent

EventView

RawEventView

Functions

aggregateObservationAdapter()

Signature
aggregateObservationAdapter: () => AggregateObservationAdapter

aggregateObserver(aggregateType, id, timeout, exec)

Block for an asynchronous, event based, workflow to occur.

Reads streams and allows short lived observers to be notified if a particular AggregateRoot instance has been modified. This is picked out based on the domainId of the event.

This allows a synchronous API to block and wait for asynchronous changes to finish occurring before continuing processing

For example, given a User AggregateRoot, you can wait for a Saga (or other operational component) to approve a user account, by blocking until the given User has raised the user.approved event.

export class UserAggregate extends AggregateRoot {
  approved: boolean = false

  constructor() {
    super("User")

    // reducers is keyed by event type, so start from an empty object
    this.reducers = {}
    // Persist the approval in the current aggregate state.
    // This avoids any potential race conditions between performing the action that requires
    // approval and the observer starting to monitor. See below.
    this.reducers["user.approved"] = (ev) => {
      this.approved = true
    }
  }

  approve() {
    this.raiseEvent({
      type: "user.approved",
      data: {}
      // other event fields
    })
  }
}

First register the observer. This will open a new event subscription, with a dynamically generated consumer group. All events on the topic will be received by this instance, so be mindful of doing this on topics with large volumes of data.

  await registerAdapter(createAggregateObservationAdapter([UserAggregate]));

Then, in your synchronous API (e.g. HTTP/REST, GraphQL):

// The userId is the domainId of the UserAggregate instance we want to observe
export async function doActionThatRequiresApproval(userId: string) {

  // perform some action that triggers async workflows

  // now wait for the User to be approved
  const user = await aggregateObserver(
    UserAggregate,
    userId,  // the UserAggregate domainId
    10000,   // max time to wait for the User instance to come into the required state
    (ar, ev) => {
      // This is called once straight away with just the current state of the User aggregate (ev is null),
      // then every time an event is observed for the UserAggregate with domainId == userId.
      // Check that the current state of the aggregate is what you want to see, or that the event
      // being received is the one you are waiting for. Ideally check both, which avoids race conditions.
      return (ev != null && ev.type === "user.approved") || ar.approved
    });

  // user is now approved, or an Error has been thrown
  // perform any ops on the user as required.

}
Parameters
Name Type Description

aggregateType

{ new (...params: any[]): AR; }

id

string

timeout

number

exec

(ar: AR, event?: EventicleEvent) => boolean

Returns

Promise<AR>

Signature
export declare function aggregateObserver<AR extends AggregateRoot>(aggregateType: {
    new (...params: any[]): AR;
}, id: string, timeout: number, exec: (ar: AR, event?: EventicleEvent) => boolean): Promise<AR>;
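The observe-then-wait behaviour described above can be modelled in a few lines. This is a minimal, self-contained sketch and not the library's implementation: SketchUser, SketchEvent, publish and observeUntil are hypothetical names, and an in-memory listener set stands in for the event subscription.

```typescript
// Hypothetical stand-ins; none of these names come from @eventicle/eventiclejs.
type SketchEvent = { type: string; domainId: string };

class SketchUser {
  id: string;
  approved = false;
  constructor(id: string) {
    this.id = id;
  }
  // Fold an event into the aggregate state, as a reducer would.
  apply(ev: SketchEvent): void {
    if (ev.type === "user.approved") this.approved = true;
  }
}

type Listener = (ev: SketchEvent) => void;
const listeners = new Set<Listener>();

// Deliver an event to every currently registered listener.
function publish(ev: SketchEvent): void {
  for (const l of Array.from(listeners)) l(ev);
}

function observeUntil(
  user: SketchUser,
  timeoutMs: number,
  exec: (ar: SketchUser, ev?: SketchEvent) => boolean
): Promise<SketchUser> {
  return new Promise((resolve, reject) => {
    // First call: current state only, no event.
    if (exec(user)) {
      resolve(user);
      return;
    }
    const timer = setTimeout(() => {
      listeners.delete(listener);
      reject(new Error(`Timed out after ${timeoutMs}ms`));
    }, timeoutMs);
    const listener: Listener = (ev) => {
      if (ev.domainId !== user.id) return; // event for another instance
      user.apply(ev);
      if (exec(user, ev)) {
        clearTimeout(timer);
        listeners.delete(listener);
        resolve(user);
      }
    };
    listeners.add(listener);
  });
}
```

Checking the current state before subscribing is what makes the predicate safe to write as "event seen, or state already reached": an approval that lands before the observer starts is still detected.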

allSagaInstances(workspaceId)

Parameters
Name Type Description

workspaceId

string

Returns

Promise<SagaInstance<any, any>[]>

Signature
export declare function allSagaInstances(workspaceId?: string): Promise<SagaInstance<any, any>[]>;

allSagas()

Signature
export declare function allSagas(): Promise<Saga<any, any>[]>;

apm.apmJoinEvent(event, name, type, subtype)

Will join the APM transaction that is stamped in the event header - apmTrace

This will generate a new span within the APM transaction.

Parameters
Name Type Description

event

EventicleEvent

the event to join the APM trace on

name

string

The name of the APM transaction

type

string

the APM type

subtype

string

the APM subtype

Returns

void

Signature
export declare function apmJoinEvent(event: EventicleEvent, name: string, type: string, subtype: string): void;

connectBroker(config)

Parameters
Name Type Description

config

KafkaConfig

Returns

Promise<void>

Signature
export declare function connectBroker(config: KafkaConfig): Promise<void>;

consumeFullEventLog(stream)

Will cold replay the entire event stream into a list.

Mostly useful for testing. Using this against long-running event streams (for example, on Kafka) will exhaust your local memory!

Parameters
Name Type Description

stream

string

The event stream to cold replay

Returns

Promise<EventicleEvent[]>

Signature
export declare function consumeFullEventLog(stream: string): Promise<EventicleEvent[]>;
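As a rough illustration of why a cold replay suits tests but not large streams, here is a self-contained sketch. The in-memory streams map, append and replayAll are hypothetical stand-ins, not the library's event client.

```typescript
// Hypothetical in-memory stand-in for an event stream store.
type SketchEvent = { type: string; data: unknown };
const streams = new Map<string, SketchEvent[]>();

// Append an event to the named stream, creating the stream if needed.
function append(stream: string, ev: SketchEvent): void {
  const log = streams.get(stream) ?? [];
  log.push(ev);
  streams.set(stream, log);
}

// Cold replay: copy every event ever written to the stream into a new array.
// Memory use grows with the full length of the stream.
async function replayAll(stream: string): Promise<SketchEvent[]> {
  return [...(streams.get(stream) ?? [])];
}

append("users", { type: "user.created", data: { id: "u1" } });
append("users", { type: "user.approved", data: { id: "u1" } });
```

In a test, the replayed list can then be asserted on directly, for example by comparing the sequence of event types against the expected workflow.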

createAggregateObservationAdapter(aggregates)

Create a new EventAdapter with a dynamic Consumer group id that will subscribe to the topics for the given AggregateRoots

Parameters
Name Type Description

aggregates

{ new (): AR; }[]

the list of aggregate roots to observe. Their topic names will be extracted.

Returns

AggregateObservationAdapter

Signature
export declare function createAggregateObservationAdapter<AR extends AggregateRoot>(aggregates: {
    new (): AR;
}[]): AggregateObservationAdapter;

dispatchCommand(commandIntent)

This will look up a pre-registered Command matching the given CommandIntent, then execute the command with the given payload.

dispatchDirectCommand is generally preferred, as the command definition is implicit and more fully type checked.

dispatchCommand is available if you wish to separate your code more fully, or introduce a remote-capable, message-based command bus.

Parameters
Name Type Description

commandIntent

CommandIntent<T>

Returns

Promise<CommandReturn<T>>

Signature
export declare function dispatchCommand<T>(commandIntent: CommandIntent<T>): Promise<CommandReturn<T>>;

dispatchDirectCommand(command, streamToEmit, transactionControl)

Dispatch a command directly, without a CommandIntent message in between.

Cannot be distributed or load balanced, but requires less boilerplate.

Parameters
Name Type Description

command

() => Promise<CommandReturn<T>>

streamToEmit

string

transactionControl

TransactionOptions

Returns

Promise<T>

Signature
export declare function dispatchDirectCommand<T>(command: () => Promise<CommandReturn<T>>, streamToEmit: string, transactionControl?: TransactionOptions): Promise<T>;
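The flow can be sketched as follows: run the command closure, capture the events it returns for the target stream, then hand only the response back to the caller. This is a simplified model under assumptions. SketchCommandReturn with its response and events fields, and sketchDispatchDirect, are hypothetical; the real CommandReturn may carry more fields, and the sketch ignores transactionControl.

```typescript
// Hypothetical stand-in types; the real CommandReturn may carry more fields.
type SketchEvent = { type: string; data: unknown };
interface SketchCommandReturn<T> {
  response: T;
  events: SketchEvent[];
}

// Records what would have been sent through the event client, keyed by stream.
const emittedByStream = new Map<string, SketchEvent[]>();

async function sketchDispatchDirect<T>(
  command: () => Promise<SketchCommandReturn<T>>,
  streamToEmit: string
): Promise<T> {
  const result = await command();
  // Capture the events before handing the response back to the caller.
  const stream = emittedByStream.get(streamToEmit) ?? [];
  stream.push(...result.events);
  emittedByStream.set(streamToEmit, stream);
  return result.response;
}

// The command is an inline closure, so its output type is inferred.
const approval = sketchDispatchDirect(async () => {
  return {
    response: { approved: true },
    events: [{ type: "user.approved", data: {} }],
  };
}, "users");
```

Because the command is a plain closure, there is nothing to serialise and send to another node, which is why this style cannot be distributed or load balanced.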

eventClient()

Signature
export declare function eventClient(): EventClient;

eventClientCodec()

Signature
export declare function eventClientCodec(): EventClientCodec;

eventClientOnDatastore()

This is a test capable event client.

It fully implements the event client semantics, and persists its events into the given data store.

Good to pair with the InMemoryDatastore for testing and local dev usage.

Not recommended for production (really!), as you disable any possibility of distribution.

Signature
export declare function eventClientOnDatastore(): EventClient;

eventClientOnKafka(config, consumerConfig, onTopicFailureConfig)

Parameters
Name Type Description

config

KafkaConfig

as per kafkajs

consumerConfig

ConsumerConfigFactory

as per kafkajs

onTopicFailureConfig

(topicName: any) => Promise<TopicFailureConfiguration>

If a consumer fails because the topic doesn’t exist, configure this to request that the topic be created automatically with the given config.

Returns

Promise<EventClient>

Signature
export declare function eventClientOnKafka(config: KafkaConfig, consumerConfig?: ConsumerConfigFactory, onTopicFailureConfig?: (topicName: any) => Promise<TopicFailureConfiguration>): Promise<EventClient>;

eventSourceName()

Signature
export declare function eventSourceName(): string;

metrics()

Signature
export declare function metrics(): {
    "view-latency": any;
    "adapter-latency": any;
    "saga-latency": any;
};

registerAdapter(adapter)

This will connect the given EventAdapter to event streams.

An EventAdapter is a hot subscription, and will receive events emitted after it first connects.

If it is offline for a period, the backing event store (e.g. Kafka) will allow the adapter to reconnect and pick up from where it had previously processed up to.

Parameters
Name Type Description

adapter

EventAdapter

Returns

Promise<EventSubscriptionControl>

Signature
export declare function registerAdapter(adapter: EventAdapter): Promise<EventSubscriptionControl>;
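The difference between a hot subscription and a cold replay comes down to where a consumer group starts reading and how its position is remembered. The sketch below models that with a per-group offset. SketchLog is a hypothetical in-memory model, not how the library or Kafka is implemented, and real broker behaviour depends on consumer configuration.

```typescript
type SketchEvent = { type: string };

class SketchLog {
  private events: SketchEvent[] = [];
  // consumer group -> index of the next event to read
  private offsets = new Map<string, number>();

  emit(ev: SketchEvent): void {
    this.events.push(ev);
  }

  // Return the events this consumer group has not yet seen.
  poll(group: string): SketchEvent[] {
    // A group seen for the first time starts at the end of the log (hot),
    // so it never receives history emitted before it connected.
    const from = this.offsets.get(group) ?? this.events.length;
    const batch = this.events.slice(from);
    this.offsets.set(group, this.events.length);
    return batch;
  }
}

const log = new SketchLog();
log.emit({ type: "before-connect" });
const first = log.poll("adapter-group");      // connects: no history delivered
log.emit({ type: "while-offline-1" });
log.emit({ type: "while-offline-2" });
const afterReconnect = log.poll("adapter-group"); // resumes from stored offset
```

Here first is empty, while afterReconnect contains the two events emitted while the group was not polling.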

registerCommand(command)

Parameters
Name Type Description

command

Command<I, O>

Returns

void

Signature
export declare function registerCommand<I, O>(command: Command<I, O>): void;
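Registration and dispatch together amount to a lookup table keyed by command type. The following sketch shows that idea under assumptions. The type, streamToEmit, execute and data field names on SketchCommand and SketchCommandIntent are assumed shapes for illustration, and sketchRegister and sketchDispatch are hypothetical functions, not the library's own.

```typescript
// Hypothetical stand-in types; field names are assumptions for illustration.
type SketchEvent = { type: string; data: unknown };
interface SketchCommandReturn<O> { response: O; events: SketchEvent[] }
interface SketchCommand<I, O> {
  type: string;
  streamToEmit: string;
  execute: (data: I) => Promise<SketchCommandReturn<O>>;
}
interface SketchCommandIntent<I> { type: string; data: I }

const registry = new Map<string, SketchCommand<any, any>>();
const emitted: { stream: string; event: SketchEvent }[] = [];

function sketchRegister<I, O>(command: SketchCommand<I, O>): void {
  registry.set(command.type, command);
}

async function sketchDispatch<I, O>(
  intent: SketchCommandIntent<I>
): Promise<SketchCommandReturn<O>> {
  // Look up the command by the intent's type, then run it with the payload.
  const command = registry.get(intent.type);
  if (!command) throw new Error(`No command registered for type ${intent.type}`);
  const result: SketchCommandReturn<O> = await command.execute(intent.data);
  // Capture the returned events against the command's stream.
  for (const event of result.events) {
    emitted.push({ stream: command.streamToEmit, event });
  }
  return result;
}

sketchRegister<{ userId: string }, string>({
  type: "approve-user",
  streamToEmit: "users",
  execute: async (data) => ({
    response: `approved ${data.userId}`,
    events: [{ type: "user.approved", data }],
  }),
});
```

Since the intent is plain data (a type name and a payload), it could in principle be carried over a message bus to whichever node holds the registered command, which is the separation the explicit style buys.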

registerRawView(view)

Will register a raw event view.

This subscribes it to the appropriate event streams. For every event received, handleEvent will be called.

Events are not processed through the EventClientCodec, and so are observed encoded as an EncodedEvent.

This can be useful if you want to persist the event in a raw form, as a binary encoded stream.

Parameters
Name Type Description

view

RawEventView

The View to subscribe to event streams

Returns

Promise<EventSubscriptionControl>

Signature
export declare function registerRawView(view: RawEventView): Promise<EventSubscriptionControl>;
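A raw view that archives events in their encoded form might look roughly like this. The shapes are assumptions: SketchEncodedEvent and SketchRawView, including the consumerGroup and streamsToSubscribe fields, are hypothetical stand-ins, and a Uint8Array is used in place of a Node Buffer.

```typescript
// Hypothetical stand-in shapes; the real EncodedEvent and RawEventView may differ.
interface SketchEncodedEvent {
  buffer: Uint8Array;
  headers: Record<string, string>;
}
interface SketchRawView {
  consumerGroup: string;
  streamsToSubscribe: string[];
  handleEvent: (event: SketchEncodedEvent) => Promise<void>;
}

// Stores the encoded bytes untouched, without decoding them.
const archive: Uint8Array[] = [];

const archiveView: SketchRawView = {
  consumerGroup: "raw-archive",
  streamsToSubscribe: ["users"],
  handleEvent: async (event) => {
    archive.push(event.buffer);
  },
};
```

Because the view never decodes the payload, it keeps working even if the codec in use changes, at the cost of not being able to inspect the event data.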

registerSaga(saga)

Parameters
Name Type Description

saga

Saga<TimeoutNames, Y>

Returns

Promise<EventSubscriptionControl>

Signature
export declare function registerSaga<TimeoutNames, Y>(saga: Saga<TimeoutNames, Y>): Promise<EventSubscriptionControl>;

registerView(view)

Parameters
Name Type Description

view

EventView

Returns

Promise<EventSubscriptionControl>

Signature
export declare function registerView(view: EventView): Promise<EventSubscriptionControl>;

removeAllSagas()

Signature
export declare function removeAllSagas(): Promise<void>;

saga(name)

Parameters
Name Type Description

name

string

Returns

Saga<TimeoutNames, SagaInstanceData>

Signature
export declare function saga<TimeoutNames, SagaInstanceData>(name: string): Saga<TimeoutNames, SagaInstanceData>;

scheduler()

Signature
export declare function scheduler(): ScheduleJobRunner;

setEventClient(cl)

Parameters
Name Type Description

cl

EventClient

Returns

void

Signature
export declare function setEventClient(cl: EventClient): void;

setEventClientCodec(cl)

Parameters
Name Type Description

cl

EventClientCodec

Returns

void

Signature
export declare function setEventClientCodec(cl: EventClientCodec): void;
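To make the codec's role concrete, here is a minimal sketch of a JSON codec that turns an event into bytes plus headers and back. The shapes are assumptions: SketchEvent, SketchEncodedEvent and jsonCodec are hypothetical, the real EventClientCodec interface may differ (for example by being asynchronous), and a Uint8Array stands in for a Node Buffer.

```typescript
// Hypothetical stand-in shapes, simplified for illustration.
interface SketchEvent {
  id: string;
  type: string;
  domainId: string;
  data: unknown;
}
interface SketchEncodedEvent {
  buffer: Uint8Array;
  headers: Record<string, string>;
}

const jsonCodec = {
  // Serialise the whole event as JSON bytes; copy routing fields into headers
  // so a consumer can filter without decoding the payload.
  encode(event: SketchEvent): SketchEncodedEvent {
    return {
      buffer: new TextEncoder().encode(JSON.stringify(event)),
      headers: { id: event.id, type: event.type, domainId: event.domainId },
    };
  },
  // Reverse of encode: parse the JSON bytes back into an event.
  decode(encoded: SketchEncodedEvent): SketchEvent {
    return JSON.parse(new TextDecoder().decode(encoded.buffer)) as SketchEvent;
  },
};

const original: SketchEvent = {
  id: "e1",
  type: "user.approved",
  domainId: "u1",
  data: { by: "admin" },
};
const roundTripped = jsonCodec.decode(jsonCodec.encode(original));
```

Swapping the codec is then a matter of providing a different encode/decode pair, for instance one that encrypts or compresses the buffer.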

setEventSourceName(name)

Parameters
Name Type Description

name

string

Returns

void

Signature
export declare function setEventSourceName(name: string): void;

setScheduler(scheduler)

Parameters
Name Type Description

scheduler

ScheduleJobRunner

Returns

void

Signature
export declare function setScheduler(scheduler: ScheduleJobRunner): void;