durable-execution - v0.32.0
    Preparing search index...

    Class DurableExecutor

    A durable executor. It is used to execute tasks durably, reliably and resiliently.

    Multiple durable executors can share the same storage. In such a case, all the tasks should be present for all the durable executors. The work is distributed among the durable executors. See the usage and task examples sections for more details on creating and enqueuing tasks.

    If using effect, use the makeEffectDurableExecutor function to create the executor with first class support for effect. See makeEffectDurableExecutor for more details.

    import { childTask, DurableExecutor } from 'durable-execution'
    import { Schema } from 'effect'

    const executor = await DurableExecutor.make(storage)

    // Create tasks
    const extractFileTitle = executor
    .inputSchema(Schema.Struct({ filePath: Schema.String }))
    .task({
    id: 'extractFileTitle',
    timeoutMs: 30_000, // 30 seconds
    run: async (ctx, input) => {
    // ... extract the file title
    return {
    title: 'File Title',
    }
    },
    })

    const summarizeFile = executor
    .validateInput(async (input: { filePath: string }) => {
    // Example validation function - implement your own validation logic
    if (!isValidFilePath(input.filePath)) {
    throw new Error('Invalid file path')
    }
    return {
    filePath: input.filePath,
    }
    })
    .task({
    id: 'summarizeFile',
    timeoutMs: 30_000, // 30 seconds
    run: async (ctx, input) => {
    // ... summarize the file
    return {
    summary: 'File summary',
    }
    },
    })

    const uploadFile = executor
    .inputSchema(Schema.Struct({ filePath: Schema.String, uploadUrl: Schema.String }))
    .parentTask({
    id: 'uploadFile',
    timeoutMs: 60_000, // 1 minute
    runParent: async (ctx, input) => {
    // ... upload file to the given uploadUrl
    // Extract the file title and summarize the file in parallel
    return {
    output: {
    filePath: input.filePath,
    uploadUrl: input.uploadUrl,
    fileSize: 100,
    },
    children: [
    childTask(extractFileTitle, { filePath: input.filePath }),
    childTask(summarizeFile, { filePath: input.filePath }),
    ],
    }
    },
    finalize: {
    id: 'uploadFileFinalize',
    timeoutMs: 60_000, // 1 minute
    run: async (ctx, { output, children }) => {
    // ... combine the output of the run function and children tasks
    return {
    filePath: output.filePath,
    uploadUrl: output.uploadUrl,
    fileSize: 100,
    title: 'File Title',
    summary: 'File summary',
    }
    }
    },
    })

    async function app() {
    // Enqueue task and manage its execution lifecycle
    const uploadFileHandle = await executor.enqueueTask(uploadFile, {
    filePath: 'file.txt',
    uploadUrl: 'https://example.com/upload',
    })
    const uploadFileExecution = await uploadFileHandle.getExecution()
    const uploadFileFinishedExecution = await uploadFileHandle.waitAndGetFinishedExecution()
    await uploadFileHandle.cancel()

    console.log(uploadFileExecution)
    }

    // Start the durable executor
    await executor.start()

    // Run the app
    await app()

    // Shutdown the durable executor when the app is done
    await executor.shutdown()
    Index

    Properties

    id: string

    Methods

    • Make a durable executor.

      Parameters

      Returns Promise<DurableExecutor>

      Options available:

      • serializer - The serializer to use for the durable executor. If not provided, a default serializer using superjson will be used.
      • logLevel - The log level to use for the durable executor. If not provided, defaults to info.
      • expireLeewayMs - The expiration leeway duration after which a task execution is considered expired. If not provided, defaults to 300_000 (5 minutes).
      • backgroundProcessIntraBatchSleepMs - The duration to sleep between batches of background processes. If not provided, defaults to 500 (500ms).
      • maxConcurrentTaskExecutions - The maximum number of tasks that can run concurrently. If not provided, defaults to 5000.
      • maxTaskExecutionsPerBatch - The maximum number of tasks to process in each batch. If not provided, defaults to 100.
      • processOnChildrenFinishedTaskExecutionsBatchSize - The maximum number of on children finished task executions to process in each batch. If not provided, defaults to 100.
      • markFinishedTaskExecutionsAsCloseStatusReadyBatchSize - The maximum number of finished task executions to mark as close status ready in each batch. If not provided, defaults to 100.
      • closeFinishedTaskExecutionsBatchSize - The maximum number of finished task executions to close in each batch. If not provided, defaults to 100.
      • cancelNeedsPromiseCancellationTaskExecutionsBatchSize - The maximum number of needs promise cancellation task executions to cancel in each batch. If not provided, defaults to 100.
      • retryExpiredTaskExecutionsBatchSize - The maximum number of expired task executions to retry in each batch. If not provided, defaults to 100.
      • maxChildrenPerTaskExecution - The maximum number of children tasks per parent task. If not provided, defaults to 1000.
      • maxSerializedInputDataSize - The maximum size of serialized input data in bytes. If not provided, defaults to 1MB.
      • maxSerializedOutputDataSize - The maximum size of serialized output data in bytes. If not provided, defaults to 1MB.
      • enableStorageBatching - Whether to enable storage batching. If not provided, defaults to false.
      • enableStorageStats - Whether to enable storage stats. If not provided, defaults to false.
      • storageBackgroundBatchingProcessIntraBatchSleepMs - The sleep duration between batches of storage operations. Only applicable if storage batching is enabled. If not provided, defaults to 10ms.
      • storageMaxRetryAttempts - The maximum number of times to retry a storage operation. If not provided, defaults to 1.
    • Create a new task that runs a task until it returns { isDone: true, output: TOutput }. If the task doesn't return { isDone: true, output: TOutput } within the max attempts, the looping task will return { isSuccess: false }. Any error from the task will be considered a failure of the looping task.

      Type Parameters

      • TInput = undefined
      • TOutput = unknown

      Parameters

      • id: string

        The id of the looping task.

      • iterationTask: Task<TInput, { isDone: false } | { isDone: true; output: TOutput }>

        The iteration task to run.

      • maxAttempts: number

        The maximum number of attempts to run the task.

      • OptionalsleepMsBeforeRun: number | ((attempt: number) => number)

        The sleep time before running the task. If a function is provided, it will be called with the attempt number. The initial attempt is 0, then 1, then 2, etc.

      Returns Task<
          TInput,
          { isSuccess: false }
          | { isSuccess: true; output: TOutput },
          "parentTask",
      >

      The looping task.

    • Add a validate input function to the durable executor.

      Type Parameters

      • TRunInput
      • TInput

      Parameters

      Returns {
          task: <TOutput = unknown>(
              taskOptions: TaskOptions<TRunInput, TOutput>,
          ) => Task<TInput, TOutput, "task">;
          parentTask: <
              TRunOutput = unknown,
              TOutput = DefaultParentTaskOutput<TRunOutput>,
              TFinalizeTaskRunOutput = unknown,
          >(
              parentTaskOptions: ParentTaskOptions<
                  TRunInput,
                  TRunOutput,
                  TOutput,
                  TFinalizeTaskRunOutput,
              >,
          ) => Task<TInput, TOutput, "parentTask">;
          sequentialTasks: <TSequentialTasks extends readonly AnyTask[]>(
              id: string,
              tasks: SequentialTasks<TSequentialTasks>,
          ) => Task<
              TInput,
              InferTaskOutput<LastTaskElementInArray<TSequentialTasks>>,
          >;
          loopingTask: <TOutput = unknown>(
              id: string,
              iterationTask: Task<
                  TRunInput,
                  { isDone: false }
                  | { isDone: true; output: TOutput },
              >,
              maxAttempts: number,
              sleepMsBeforeRun?: number | ((attempt: number) => number),
          ) => Task<
              TInput,
              { isSuccess: false }
              | { isSuccess: true; output: TOutput },
              "parentTask",
          >;
      }

      The validate input function.

    • Enqueue a task for execution.

      Type Parameters

      Parameters

      • ...rest: undefined extends InferTaskInput<TTask>
            ? [
                task: TTask,
                input?: InferTaskInput<TTask>,
                options?: TaskEnqueueOptions<TTask> & {
                    taskExecutionsStorageTransaction?: Pick<
                        TaskExecutionsStorage,
                        "insertMany",
                    >;
                },
            ]
            : [
                task: TTask,
                input: InferTaskInput<TTask>,
                options?: TaskEnqueueOptions<TTask> & {
                    taskExecutionsStorageTransaction?: Pick<
                        TaskExecutionsStorage,
                        "insertMany",
                    >;
                },
            ]

        The task to enqueue, input, and options.

      Returns Promise<
          {
              taskId: string;
              executionId: string;
              getExecution: () => Promise<TaskExecution<InferTaskOutput<TTask>>>;
              waitAndGetFinishedExecution: (
                  options?: { pollingIntervalMs?: number; signal?: AbortSignal },
              ) => Promise<FinishedTaskExecution<InferTaskOutput<TTask>>>;
              cancel: () => Promise<void>;
          },
      >

      A handle to the task execution.

    • Get a handle to a task execution.

      Type Parameters

      Parameters

      • task: TTask

        The task to get the handle for.

      • executionId: string

        The id of the execution to get the handle for.

      Returns Promise<
          {
              taskId: string;
              executionId: string;
              getExecution: () => Promise<TaskExecution<InferTaskOutput<TTask>>>;
              waitAndGetFinishedExecution: (
                  options?: { pollingIntervalMs?: number; signal?: AbortSignal },
              ) => Promise<FinishedTaskExecution<InferTaskOutput<TTask>>>;
              cancel: () => Promise<void>;
          },
      >

      The handle to the task execution.

    • Shutdown the durable executor. Cancels all active executions and stops the background processes and promises.

      On shutdown, these happen in this order:

      • Stop enqueuing new tasks
      • Stop background processes and promises after the current iteration
      • Wait for active task executions to finish. Task execution context contains a shutdown signal that can be used to gracefully shutdown the task when executor is shutting down.

      Returns Promise<void>

    • Get the running task execution ids.

      Returns ReadonlySet<string>

      The running task execution ids.

    • Get storage metrics for monitoring and debugging.

      Returns Promise<
          {
              processName: string;
              count: number;
              min: number;
              max: number;
              quantiles: readonly (readonly [number, Option<number>])[];
          }[],
      >

      The storage metrics.