durable-execution - v0.32.0

    Type Alias TaskExecutionsStorage

    The storage interface for persisting task execution state. Implementations must ensure all operations are atomic to maintain consistency in distributed environments.

    If the implementation is not atomic, use TaskExecutionsStorageWithMutex to wrap the storage implementation and make all operations atomic.

    If the implementation doesn't support batching methods, use TaskExecutionsStorageWithBatching to wrap the storage implementation and implement the batching methods.

    • Atomicity: All operations must be atomic (use transactions where available)
    • Concurrency: Support multiple parallel transactions without deadlocks
    • Consistency: Ensure data consistency across all operations
    • Durability: Data must persist across process restarts

    For optimal performance, create these indexes in your storage backend:

    • Unique indexes:

      • uniqueIndex(executionId)
      • uniqueIndex(sleepingTaskUniqueId) - sparse/partial index where not null

    • Non-unique indexes (order matters for performance):

      • index(status, startAt) - for processing ready task executions
      • index(status, onChildrenFinishedProcessingStatus, activeChildrenCount, updatedAt) - for processing parent task executions when children are finished processing
      • index(closeStatus, updatedAt) - for task execution closure handling
      • index(status, isSleepingTask, expiresAt) - for task execution timeout handling
      • index(onChildrenFinishedProcessingExpiresAt) - for recovery during processing parent task executions when children are finished processing
      • index(closeExpiresAt) - for task execution closure handling recovery
      • index(executorId, needsPromiseCancellation, updatedAt) - for task execution cancellation
      • index(parent.executionId, isFinished) - for child task execution queries
      • index(isFinished, closeStatus, updatedAt) - for finished task execution cleanup
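
    As a rough illustration, the index list above could be written as MongoDB index descriptions. MongoDB itself and the `$type` partial filter are assumptions here (the interface does not prescribe a backend), and `IndexSpec` and `taskExecutionIndexes` are hypothetical names.

```typescript
// Hypothetical local type mirroring the shape MongoDB's createIndexes accepts.
type IndexSpec = {
  key: Record<string, 1 | -1>
  unique?: boolean
  partialFilterExpression?: Record<string, unknown>
}

const taskExecutionIndexes: IndexSpec[] = [
  // Unique indexes
  { key: { executionId: 1 }, unique: true },
  {
    key: { sleepingTaskUniqueId: 1 },
    unique: true,
    // Assumption: index only documents where the field is a string (not null or missing)
    partialFilterExpression: { sleepingTaskUniqueId: { $type: 'string' } },
  },
  // Non-unique indexes; key order follows the list above
  { key: { status: 1, startAt: 1 } },
  {
    key: {
      status: 1,
      onChildrenFinishedProcessingStatus: 1,
      activeChildrenCount: 1,
      updatedAt: 1,
    },
  },
  { key: { closeStatus: 1, updatedAt: 1 } },
  { key: { status: 1, isSleepingTask: 1, expiresAt: 1 } },
  { key: { onChildrenFinishedProcessingExpiresAt: 1 } },
  { key: { closeExpiresAt: 1 } },
  { key: { executorId: 1, needsPromiseCancellation: 1, updatedAt: 1 } },
  { key: { 'parent.executionId': 1, isFinished: 1 } },
  { key: { isFinished: 1, closeStatus: 1, updatedAt: 1 } },
]

// With the official driver this could be applied as:
//   await collection.createIndexes(taskExecutionIndexes)
```

    Other backends would express the same keys in their own index syntax; only the column order is taken from the list above.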

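    // Custom storage implementation (sketch: assumes the official `mongodb` driver)
    import type { Collection } from 'mongodb'

    class MongoDBStorage implements TaskExecutionsStorage {
      private readonly collection: Collection<TaskExecutionStorageValue>

      constructor(collection: Collection<TaskExecutionStorageValue>) {
        this.collection = collection
      }

      async insertMany(executions: ReadonlyArray<TaskExecutionStorageValue>) {
        // Copy into a mutable array before handing it to the driver
        await this.collection.insertMany([...executions])
      }

      async getManyById(requests: ReadonlyArray<{ executionId: string }>) {
        const ids = requests.map((r) => r.executionId)
        // find() returns a cursor, so materialize it before searching it
        const docs = await this.collection.find({ executionId: { $in: ids } }).toArray()
        // One result per request, in request order; undefined when not found
        return requests.map((req) => docs.find((doc) => doc.executionId === req.executionId))
      }

      // ... implement other methods
    }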
    type TaskExecutionsStorage = {
        insertMany: (
            executions: ReadonlyArray<TaskExecutionStorageValue>,
        ) => void | Promise<void>;
        getManyById: (
            requests: ReadonlyArray<
                { executionId: string; filters?: TaskExecutionStorageGetByIdFilters },
            >,
        ) =>
            | (TaskExecutionStorageValue | undefined)[]
            | Promise<(TaskExecutionStorageValue | undefined)[]>;
        getManyBySleepingTaskUniqueId: (
            requests: ReadonlyArray<{ sleepingTaskUniqueId: string }>,
        ) =>
            | (TaskExecutionStorageValue | undefined)[]
            | Promise<(TaskExecutionStorageValue | undefined)[]>;
        updateManyById: (
            requests: ReadonlyArray<
                {
                    executionId: string;
                    filters?: TaskExecutionStorageGetByIdFilters;
                    update: TaskExecutionStorageUpdate;
                },
            >,
        ) => void
        | Promise<void>;
        updateManyByIdAndInsertChildrenIfUpdated: (
            requests: ReadonlyArray<
                {
                    executionId: string;
                    filters?: TaskExecutionStorageGetByIdFilters;
                    update: TaskExecutionStorageUpdate;
                    childrenTaskExecutionsToInsertIfAnyUpdated: ReadonlyArray<
                        TaskExecutionStorageValue,
                    >;
                },
            >,
        ) => void
        | Promise<void>;
        updateByStatusAndStartAtLessThanAndReturn: (
            request: {
                status: TaskExecutionStatus;
                startAtLessThan: number;
                update: TaskExecutionStorageUpdate;
                updateExpiresAtWithStartedAt: number;
                limit: number;
            },
        ) => TaskExecutionStorageValue[]
        | Promise<TaskExecutionStorageValue[]>;
        updateByStatusAndOnChildrenFinishedProcessingStatusAndActiveChildrenCountZeroAndReturn: (
            request: {
                status: TaskExecutionStatus;
                onChildrenFinishedProcessingStatus: TaskExecutionOnChildrenFinishedProcessingStatus;
                update: TaskExecutionStorageUpdate;
                limit: number;
            },
        ) => TaskExecutionStorageValue[]
        | Promise<TaskExecutionStorageValue[]>;
        updateByCloseStatusAndReturn: (
            request: {
                closeStatus: TaskExecutionCloseStatus;
                update: TaskExecutionStorageUpdate;
                limit: number;
            },
        ) => TaskExecutionStorageValue[]
        | Promise<TaskExecutionStorageValue[]>;
        updateByStatusAndIsSleepingTaskAndExpiresAtLessThan: (
            request: {
                status: TaskExecutionStatus;
                isSleepingTask: boolean;
                expiresAtLessThan: number;
                update: TaskExecutionStorageUpdate;
                limit: number;
            },
        ) => number
        | Promise<number>;
        updateByOnChildrenFinishedProcessingExpiresAtLessThan: (
            request: {
                onChildrenFinishedProcessingExpiresAtLessThan: number;
                update: TaskExecutionStorageUpdate;
                limit: number;
            },
        ) => number
        | Promise<number>;
        updateByCloseExpiresAtLessThan: (
            request: {
                closeExpiresAtLessThan: number;
                update: TaskExecutionStorageUpdate;
                limit: number;
            },
        ) => number
        | Promise<number>;
        updateByExecutorIdAndNeedsPromiseCancellationAndReturn: (
            request: {
                executorId: string;
                needsPromiseCancellation: boolean;
                update: TaskExecutionStorageUpdate;
                limit: number;
            },
        ) => TaskExecutionStorageValue[]
        | Promise<TaskExecutionStorageValue[]>;
        getManyByParentExecutionId: (
            requests: ReadonlyArray<{ parentExecutionId: string }>,
        ) => TaskExecutionStorageValue[][] | Promise<TaskExecutionStorageValue[][]>;
        updateManyByParentExecutionIdAndIsFinished: (
            requests: ReadonlyArray<
                {
                    parentExecutionId: string;
                    isFinished: boolean;
                    update: TaskExecutionStorageUpdate;
                },
            >,
        ) => void
        | Promise<void>;
        updateAndDecrementParentActiveChildrenCountByIsFinishedAndCloseStatus: (
            request: {
                isFinished: boolean;
                closeStatus: TaskExecutionCloseStatus;
                update: TaskExecutionStorageUpdate;
                limit: number;
            },
        ) => number
        | Promise<number>;
        deleteById: (request: { executionId: string }) => void | Promise<void>;
        deleteAll: () => void | Promise<void>;
    }

    Properties

    insertMany: (
        executions: ReadonlyArray<TaskExecutionStorageValue>,
    ) => void | Promise<void>

    Insert many task executions.

    getManyById: (
        requests: ReadonlyArray<
            { executionId: string; filters?: TaskExecutionStorageGetByIdFilters },
        >,
    ) =>
        | (TaskExecutionStorageValue | undefined)[]
        | Promise<(TaskExecutionStorageValue | undefined)[]>

    Get many task executions by id and optionally filter them.

    getManyBySleepingTaskUniqueId: (
        requests: ReadonlyArray<{ sleepingTaskUniqueId: string }>,
    ) =>
        | (TaskExecutionStorageValue | undefined)[]
        | Promise<(TaskExecutionStorageValue | undefined)[]>

    Get many task executions by sleeping task unique id.

    updateManyById: (
        requests: ReadonlyArray<
            {
                executionId: string;
                filters?: TaskExecutionStorageGetByIdFilters;
                update: TaskExecutionStorageUpdate;
            },
        >,
    ) => void
    | Promise<void>

    Update many task executions by id and optionally filter them. Each request should be atomic.

    updateManyByIdAndInsertChildrenIfUpdated: (
        requests: ReadonlyArray<
            {
                executionId: string;
                filters?: TaskExecutionStorageGetByIdFilters;
                update: TaskExecutionStorageUpdate;
                childrenTaskExecutionsToInsertIfAnyUpdated: ReadonlyArray<
                    TaskExecutionStorageValue,
                >;
            },
        >,
    ) => void
    | Promise<void>

    Update many task executions by id and insert children task executions if updated. Each request should be atomic.

    Type Declaration

      • (
            requests: ReadonlyArray<
                {
                    executionId: string;
                    filters?: TaskExecutionStorageGetByIdFilters;
                    update: TaskExecutionStorageUpdate;
                    childrenTaskExecutionsToInsertIfAnyUpdated: ReadonlyArray<
                        TaskExecutionStorageValue,
                    >;
                },
            >,
        ): void
        | Promise<void>
      • Parameters

        Returns void | Promise<void>
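
    A minimal in-memory sketch of the "insert children only if updated" rule described above. `Execution` and `ExecutionUpdate` are simplified, hypothetical stand-ins for TaskExecutionStorageValue and TaskExecutionStorageUpdate, the status strings are made up, and the optional `filters` field is left out because its shape is not shown here.

```typescript
// Hypothetical, simplified stand-ins for the library's storage types.
type Execution = { executionId: string; status: string; updatedAt: number }
type ExecutionUpdate = Partial<Omit<Execution, 'executionId'>>

type UpdateAndInsertRequest = {
  executionId: string
  update: ExecutionUpdate
  childrenTaskExecutionsToInsertIfAnyUpdated: ReadonlyArray<Execution>
}

class InMemoryStorage {
  readonly byId = new Map<string, Execution>()

  updateManyByIdAndInsertChildrenIfUpdated(
    requests: ReadonlyArray<UpdateAndInsertRequest>,
  ): void {
    for (const request of requests) {
      const existing = this.byId.get(request.executionId)
      if (!existing) {
        // Nothing was updated, so the children must not be inserted either
        continue
      }
      // The update and the child inserts run in one synchronous step, so no
      // other operation can observe the parent update without its children
      this.byId.set(existing.executionId, { ...existing, ...request.update })
      for (const child of request.childrenTaskExecutionsToInsertIfAnyUpdated) {
        this.byId.set(child.executionId, child)
      }
    }
  }
}

const storage = new InMemoryStorage()
storage.byId.set('parent', { executionId: 'parent', status: 'running', updatedAt: 1 })
storage.updateManyByIdAndInsertChildrenIfUpdated([
  {
    executionId: 'parent',
    update: { status: 'waiting', updatedAt: 2 },
    childrenTaskExecutionsToInsertIfAnyUpdated: [
      { executionId: 'child-1', status: 'ready', updatedAt: 2 },
    ],
  },
  {
    executionId: 'missing',
    update: { status: 'waiting', updatedAt: 2 },
    childrenTaskExecutionsToInsertIfAnyUpdated: [
      { executionId: 'child-2', status: 'ready', updatedAt: 2 },
    ],
  },
])
```

    In a database-backed implementation, the same per-request guarantee would typically come from a transaction around the update and the inserts.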

    updateByStatusAndStartAtLessThanAndReturn: (
        request: {
            status: TaskExecutionStatus;
            startAtLessThan: number;
            update: TaskExecutionStorageUpdate;
            updateExpiresAtWithStartedAt: number;
            limit: number;
        },
    ) => TaskExecutionStorageValue[]
    | Promise<TaskExecutionStorageValue[]>

    Update task executions that have the given status and a startAt less than startAtLessThan, and return the task executions that were updated. The task executions are ordered by startAt ascending.

    Also set expiresAt = updateExpiresAtWithStartedAt + existingTaskExecution.timeoutMs on each updated task execution.
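
    The selection, ordering, limit, and expiresAt rule can be sketched against an in-memory map. This is an illustration under assumptions, not the library's implementation: `Execution` and `ExecutionUpdate` are hypothetical simplified types, and the status strings are made up.

```typescript
// Hypothetical, simplified stand-ins for the library's storage types.
type Execution = {
  executionId: string
  status: string
  startAt: number
  timeoutMs: number
  expiresAt?: number
}
type ExecutionUpdate = Partial<Omit<Execution, 'executionId'>>

function updateByStatusAndStartAtLessThanAndReturn(
  executions: Map<string, Execution>,
  request: {
    status: string
    startAtLessThan: number
    update: ExecutionUpdate
    updateExpiresAtWithStartedAt: number
    limit: number
  },
): Execution[] {
  // Select matching executions, oldest startAt first, capped at `limit`
  const matches = Array.from(executions.values())
    .filter((e) => e.status === request.status && e.startAt < request.startAtLessThan)
    .sort((a, b) => a.startAt - b.startAt)
    .slice(0, request.limit)

  return matches.map((existing) => {
    const updated: Execution = {
      ...existing,
      ...request.update,
      // expiresAt is derived from each execution's own timeoutMs
      expiresAt: request.updateExpiresAtWithStartedAt + existing.timeoutMs,
    }
    executions.set(updated.executionId, updated)
    return updated
  })
}

const executions = new Map<string, Execution>([
  ['a', { executionId: 'a', status: 'ready', startAt: 30, timeoutMs: 1000 }],
  ['b', { executionId: 'b', status: 'ready', startAt: 10, timeoutMs: 500 }],
  ['c', { executionId: 'c', status: 'ready', startAt: 20, timeoutMs: 100 }],
  ['d', { executionId: 'd', status: 'running', startAt: 5, timeoutMs: 100 }],
])
const started = updateByStatusAndStartAtLessThanAndReturn(executions, {
  status: 'ready',
  startAtLessThan: 50,
  update: { status: 'running' },
  updateExpiresAtWithStartedAt: 1000,
  limit: 2,
})
```

    Here `started` holds `b` and `c` in that order, while `a` stays untouched because of the limit.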

    updateByStatusAndOnChildrenFinishedProcessingStatusAndActiveChildrenCountZeroAndReturn: (
        request: {
            status: TaskExecutionStatus;
            onChildrenFinishedProcessingStatus: TaskExecutionOnChildrenFinishedProcessingStatus;
            update: TaskExecutionStorageUpdate;
            limit: number;
        },
    ) => TaskExecutionStorageValue[]
    | Promise<TaskExecutionStorageValue[]>

    Update task executions that have the given status and onChildrenFinishedProcessingStatus and an activeChildrenCount of zero, and return the task executions that were updated. The task executions are ordered by updatedAt ascending.

    updateByCloseStatusAndReturn: (
        request: {
            closeStatus: TaskExecutionCloseStatus;
            update: TaskExecutionStorageUpdate;
            limit: number;
        },
    ) => TaskExecutionStorageValue[]
    | Promise<TaskExecutionStorageValue[]>

    Update task executions by close status and return the task executions that were updated. The task executions are ordered by updatedAt ascending.

    updateByStatusAndIsSleepingTaskAndExpiresAtLessThan: (
        request: {
            status: TaskExecutionStatus;
            isSleepingTask: boolean;
            expiresAtLessThan: number;
            update: TaskExecutionStorageUpdate;
            limit: number;
        },
    ) => number
    | Promise<number>

    Update task executions by status, is sleeping task, and expires at less than, and return the number of task executions that were updated. The task executions are ordered by expiresAt ascending.

    Type Declaration

      • (
            request: {
                status: TaskExecutionStatus;
                isSleepingTask: boolean;
                expiresAtLessThan: number;
                update: TaskExecutionStorageUpdate;
                limit: number;
            },
        ): number
        | Promise<number>
      • Parameters

        • request: {
              status: TaskExecutionStatus;
              isSleepingTask: boolean;
              expiresAtLessThan: number;
              update: TaskExecutionStorageUpdate;
              limit: number;
          }

          The request to update the task executions.

          • status: TaskExecutionStatus

            The status of the task executions to update.

          • isSleepingTask: boolean

            Whether the task executions to update are sleeping tasks.

          • expiresAtLessThan: number

            Only task executions whose expiresAt is less than this value are updated.

          • update: TaskExecutionStorageUpdate

            The update object.

          • limit: number

            The maximum number of task executions to update.

        Returns number | Promise<number>

        The number of task executions that were updated.

    updateByOnChildrenFinishedProcessingExpiresAtLessThan: (
        request: {
            onChildrenFinishedProcessingExpiresAtLessThan: number;
            update: TaskExecutionStorageUpdate;
            limit: number;
        },
    ) => number
    | Promise<number>

    Update task executions whose onChildrenFinishedProcessingExpiresAt is less than the given value and return the number of task executions that were updated. The task executions are ordered by onChildrenFinishedProcessingExpiresAt ascending.

    Type Declaration

      • (
            request: {
                onChildrenFinishedProcessingExpiresAtLessThan: number;
                update: TaskExecutionStorageUpdate;
                limit: number;
            },
        ): number
        | Promise<number>
      • Parameters

        • request: {
              onChildrenFinishedProcessingExpiresAtLessThan: number;
              update: TaskExecutionStorageUpdate;
              limit: number;
          }

          The request to update the task executions.

          • onChildrenFinishedProcessingExpiresAtLessThan: number

            Only task executions whose onChildrenFinishedProcessingExpiresAt is less than this value are updated.

          • update: TaskExecutionStorageUpdate

            The update object.

          • limit: number

            The maximum number of task executions to update.

        Returns number | Promise<number>

        The number of task executions that were updated.

    updateByCloseExpiresAtLessThan: (
        request: {
            closeExpiresAtLessThan: number;
            update: TaskExecutionStorageUpdate;
            limit: number;
        },
    ) => number
    | Promise<number>

    Update task executions whose closeExpiresAt is less than the given value and return the number of task executions that were updated. The task executions are ordered by closeExpiresAt ascending.

    Type Declaration

      • (
            request: {
                closeExpiresAtLessThan: number;
                update: TaskExecutionStorageUpdate;
                limit: number;
            },
        ): number
        | Promise<number>
      • Parameters

        • request: {
              closeExpiresAtLessThan: number;
              update: TaskExecutionStorageUpdate;
              limit: number;
          }

          The request to update the task executions.

          • closeExpiresAtLessThan: number

            Only task executions whose closeExpiresAt is less than this value are updated.

          • update: TaskExecutionStorageUpdate

            The update object.

          • limit: number

            The maximum number of task executions to update.

        Returns number | Promise<number>

        The number of task executions that were updated.

    updateByExecutorIdAndNeedsPromiseCancellationAndReturn: (
        request: {
            executorId: string;
            needsPromiseCancellation: boolean;
            update: TaskExecutionStorageUpdate;
            limit: number;
        },
    ) => TaskExecutionStorageValue[]
    | Promise<TaskExecutionStorageValue[]>

    Update task executions by executor id and needs promise cancellation and return the task executions that were updated. The task executions are ordered by updatedAt ascending.

    getManyByParentExecutionId: (
        requests: ReadonlyArray<{ parentExecutionId: string }>,
    ) => TaskExecutionStorageValue[][] | Promise<TaskExecutionStorageValue[][]>

    Get many task executions by parent execution id.
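
    The nested return type suggests one inner array per request. The sketch below assumes the results are positional (in request order) and that a parent without children yields an empty array; both are inferences from the signature, not documented behavior. `Execution` is a hypothetical simplified type whose `parent.executionId` field mirrors the index listed earlier.

```typescript
// Hypothetical, simplified stand-in for TaskExecutionStorageValue.
type Execution = { executionId: string; parent?: { executionId: string } }

function getManyByParentExecutionId(
  executions: ReadonlyArray<Execution>,
  requests: ReadonlyArray<{ parentExecutionId: string }>,
): Execution[][] {
  // Group children by parent once so each request becomes a single lookup
  const childrenByParent = new Map<string, Execution[]>()
  for (const execution of executions) {
    const parentId = execution.parent?.executionId
    if (parentId === undefined) continue
    const siblings = childrenByParent.get(parentId) ?? []
    siblings.push(execution)
    childrenByParent.set(parentId, siblings)
  }
  // One inner array per request; copy so callers cannot mutate the grouping
  return requests.map((r) => (childrenByParent.get(r.parentExecutionId) ?? []).slice())
}

const children = getManyByParentExecutionId(
  [
    { executionId: 'p1' },
    { executionId: 'c1', parent: { executionId: 'p1' } },
    { executionId: 'c2', parent: { executionId: 'p1' } },
    { executionId: 'c3', parent: { executionId: 'p2' } },
  ],
  [{ parentExecutionId: 'p1' }, { parentExecutionId: 'missing' }, { parentExecutionId: 'p2' }],
)
```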

    updateManyByParentExecutionIdAndIsFinished: (
        requests: ReadonlyArray<
            {
                parentExecutionId: string;
                isFinished: boolean;
                update: TaskExecutionStorageUpdate;
            },
        >,
    ) => void
    | Promise<void>

    Update many task executions by parent execution id and is finished. Each request should be atomic.

    Type Declaration

      • (
            requests: ReadonlyArray<
                {
                    parentExecutionId: string;
                    isFinished: boolean;
                    update: TaskExecutionStorageUpdate;
                },
            >,
        ): void
        | Promise<void>
      • Parameters

        • requests: ReadonlyArray<
              {
                  parentExecutionId: string;
                  isFinished: boolean;
                  update: TaskExecutionStorageUpdate;
              },
          >

          The requests to update the task executions.

        Returns void | Promise<void>

    updateAndDecrementParentActiveChildrenCountByIsFinishedAndCloseStatus: (
        request: {
            isFinished: boolean;
            closeStatus: TaskExecutionCloseStatus;
            update: TaskExecutionStorageUpdate;
            limit: number;
        },
    ) => number
    | Promise<number>

    Update task executions by is finished and close status. Also, decrement parent active children count for all these task executions atomically along with the update. The task executions are ordered by updatedAt ascending.

    Type Declaration

      • (
            request: {
                isFinished: boolean;
                closeStatus: TaskExecutionCloseStatus;
                update: TaskExecutionStorageUpdate;
                limit: number;
            },
        ): number
        | Promise<number>
      • Parameters

        Returns number | Promise<number>

        The number of task executions that were updated.
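
    A minimal in-memory sketch of the combined update and parent decrement. It is an illustration under assumptions: `Execution` and `ExecutionUpdate` are hypothetical simplified types, the closeStatus strings are made up, and atomicity comes only from the function running synchronously.

```typescript
// Hypothetical, simplified stand-ins for the library's storage types.
type Execution = {
  executionId: string
  parent?: { executionId: string }
  isFinished: boolean
  closeStatus: string
  activeChildrenCount: number
  updatedAt: number
}
type ExecutionUpdate = Partial<Omit<Execution, 'executionId' | 'parent'>>

function updateAndDecrementParentActiveChildrenCount(
  executions: Map<string, Execution>,
  request: { isFinished: boolean; closeStatus: string; update: ExecutionUpdate; limit: number },
): number {
  // Select matching executions, least recently updated first, capped at `limit`
  const matchedIds = Array.from(executions.values())
    .filter((e) => e.isFinished === request.isFinished && e.closeStatus === request.closeStatus)
    .sort((a, b) => a.updatedAt - b.updatedAt)
    .slice(0, request.limit)
    .map((e) => e.executionId)

  for (const executionId of matchedIds) {
    // Re-read the current value so an earlier decrement in this loop is not lost
    const current = executions.get(executionId)
    if (!current) continue
    executions.set(executionId, { ...current, ...request.update })

    const parentId = current.parent?.executionId
    const parent = parentId === undefined ? undefined : executions.get(parentId)
    if (parent) {
      executions.set(parent.executionId, {
        ...parent,
        activeChildrenCount: parent.activeChildrenCount - 1,
      })
    }
  }
  return matchedIds.length
}

const child = (executionId: string, updatedAt: number): Execution => ({
  executionId,
  parent: { executionId: 'p' },
  isFinished: true,
  closeStatus: 'idle',
  activeChildrenCount: 0,
  updatedAt,
})
const executions = new Map<string, Execution>([
  ['p', { executionId: 'p', isFinished: false, closeStatus: 'idle', activeChildrenCount: 3, updatedAt: 0 }],
  ['c1', child('c1', 1)],
  ['c2', child('c2', 2)],
  ['c3', child('c3', 3)],
])
const updatedCount = updateAndDecrementParentActiveChildrenCount(executions, {
  isFinished: true,
  closeStatus: 'idle',
  update: { closeStatus: 'ready' },
  limit: 2,
})
```

    With a limit of 2, only `c1` and `c2` are updated, so the parent's activeChildrenCount drops from 3 to 1 and `c3` is left for a later call.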

    deleteById: (request: { executionId: string }) => void | Promise<void>

    Delete task execution by id. This is used for testing. Ideally the storage implementation should have a test and production mode and this method should be a no-op in production.

    Type Declaration

      • (request: { executionId: string }): void | Promise<void>
      • Parameters

        • request: { executionId: string }

          The request to delete the task execution.

          • executionId: string

            The id of the task execution to delete.

        Returns void | Promise<void>

    deleteAll: () => void | Promise<void>

    Delete all task executions. This is used for testing. Ideally the storage implementation should have a test and production mode and this method should be a no-op in production.
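
    One way to follow the test/production recommendation for deleteById and deleteAll is a mode flag checked by both methods. The sketch below is an assumption about how that could look: the `mode` option, the `size` helper, and the simplified `Execution` type are hypothetical and not part of the interface.

```typescript
// Hypothetical, simplified stand-in for TaskExecutionStorageValue.
type Execution = { executionId: string }

class InMemoryStorage {
  private readonly byId = new Map<string, Execution>()
  private readonly mode: 'test' | 'production'

  constructor(mode: 'test' | 'production') {
    this.mode = mode
  }

  insertMany(executions: ReadonlyArray<Execution>): void {
    for (const execution of executions) {
      this.byId.set(execution.executionId, execution)
    }
  }

  // Helper for the example only
  size(): number {
    return this.byId.size
  }

  deleteById(request: { executionId: string }): void {
    // Deliberately a no-op outside test mode
    if (this.mode !== 'test') return
    this.byId.delete(request.executionId)
  }

  deleteAll(): void {
    // Deliberately a no-op outside test mode
    if (this.mode !== 'test') return
    this.byId.clear()
  }
}

const productionStorage = new InMemoryStorage('production')
productionStorage.insertMany([{ executionId: 'a' }])
productionStorage.deleteAll() // ignored in production mode

const testStorage = new InMemoryStorage('test')
testStorage.insertMany([{ executionId: 'a' }, { executionId: 'b' }])
testStorage.deleteById({ executionId: 'a' })
```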