durable-execution - v0.7.0

    Class DurableExecutor

    A durable executor, used to execute durable tasks.

    Multiple durable executors can share the same storage. In that case, every task must be registered on every durable executor, because the work is distributed among them and any executor may pick up any task. See the usage and task examples sections for more details on creating and enqueuing tasks.

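    // Assumption: `v` is the valibot namespace; any Standard Schema library should work
    import * as v from 'valibot'
    import { DurableExecutor } from 'durable-execution'

    // `storage` is a DurableStorage implementation created elsewhere
    const executor = new DurableExecutor(storage)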

    // Create tasks
    const extractFileTitle = executor
      .inputSchema(v.object({ filePath: v.string() }))
      .task({
        id: 'extractFileTitle',
        timeoutMs: 30_000, // 30 seconds
        run: async (ctx, input) => {
          // ... extract the file title
          return {
            title: 'File Title',
          }
        },
      })

    const summarizeFile = executor
      .validateInput(async (input: { filePath: string }) => {
        if (!isValidFilePath(input.filePath)) {
          throw new Error('Invalid file path')
        }
        return {
          filePath: input.filePath,
        }
      })
      .task({
        id: 'summarizeFile',
        timeoutMs: 30_000, // 30 seconds
        run: async (ctx, input) => {
          // ... summarize the file
          return {
            summary: 'File summary',
          }
        },
      })

    const uploadFile = executor
      .inputSchema(v.object({ filePath: v.string(), uploadUrl: v.string() }))
      .parentTask({
        id: 'uploadFile',
        timeoutMs: 60_000, // 1 minute
        runParent: async (ctx, input) => {
          // ... upload file to the given uploadUrl
          // Extract the file title and summarize the file in parallel
          return {
            output: {
              filePath: input.filePath,
              uploadUrl: input.uploadUrl,
              fileSize: 100,
            },
            children: [
              {
                task: extractFileTitle,
                input: { filePath: input.filePath },
              },
              {
                task: summarizeFile,
                input: { filePath: input.filePath },
              },
            ],
          }
        },
        onRunAndChildrenComplete: {
          id: 'onUploadFileAndChildrenComplete',
          timeoutMs: 60_000, // 1 minute
          run: async (ctx, { input, output, childrenOutputs }) => {
            // ... combine the output of the run function and children tasks
            return {
              filePath: input.filePath,
              uploadUrl: input.uploadUrl,
              fileSize: 100,
              title: 'File Title',
              summary: 'File summary',
            }
          },
        },
      })

    async function app() {
      // Enqueue task and manage its execution lifecycle
      const uploadFileHandle = await executor.enqueueTask(uploadFile, {
        filePath: 'file.txt',
        uploadUrl: 'https://example.com/upload', // placeholder; the input schema requires uploadUrl
      })
      const uploadFileExecution = await uploadFileHandle.getExecution()
      const uploadFileFinishedExecution = await uploadFileHandle.waitAndGetExecution()
      await uploadFileHandle.cancel()

      console.log(uploadFileExecution)
    }

    // Start the durable executor and run the app
    await Promise.all([
      executor.start(), // Start the durable executor in the background
      app(), // Run the app
    ])

    // Shutdown the durable executor when the app is done
    await executor.shutdown()

    Constructors

    • Create a durable executor.

      Parameters

      • storage: DurableStorage

        The storage to use for the durable executor.

      • options: {
            serializer?: Serializer;
            logger?: Logger;
            enableDebug?: boolean;
            expireMs?: number;
        } = {}

        The options for the durable executor.

        • Optional serializer?: Serializer

          The serializer to use for the durable executor. If not provided, a default serializer using superjson will be used.

        • Optional logger?: Logger

          The logger to use for the durable executor. If not provided, a console logger will be used.

        • Optional enableDebug?: boolean

          Whether to enable debug logging.

        • Optional expireMs?: number

      Returns DurableExecutor

    Methods

    • Shut down the durable executor. Cancels all active executions and stops the background process.

      On shutdown, the following steps happen in order:

      • Stop enqueuing new tasks.
      • Stop background processes after the current iteration.
      • Wait for active task executions to finish. The task execution context contains a shutdown signal that a task can use to stop gracefully when the executor is shutting down.

      Returns Promise<void>
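
      The shutdown signal mentioned above lets a long-running task stop at a safe boundary. The following is a minimal sketch of that idea, not the library's own code: it assumes the signal is a standard AbortSignal exposed as `shutdownSignal`, and the names `ShutdownAwareContext`, `processChunks`, and `handleChunk` are hypothetical.

```typescript
// Hypothetical, simplified slice of a task context: only the shutdown signal.
// Assumption: the real context type and property name may differ.
interface ShutdownAwareContext {
  shutdownSignal: AbortSignal
}

interface ChunkResult {
  processed: number
  interrupted: boolean
}

// Process chunks one at a time and check the signal between chunks, so the
// work stops at a chunk boundary instead of being cut off mid-chunk.
async function processChunks(
  ctx: ShutdownAwareContext,
  chunks: readonly string[],
  handleChunk: (chunk: string) => Promise<void>,
): Promise<ChunkResult> {
  let processed = 0
  for (const chunk of chunks) {
    if (ctx.shutdownSignal.aborted) {
      // The executor is shutting down: report how far we got and stop.
      return { processed, interrupted: true }
    }
    await handleChunk(chunk)
    processed += 1
  }
  return { processed, interrupted: false }
}
```

      Inside a task's run function, a helper like this would be called with the task context, and the returned count could be used to decide what to retry later.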

    • Type Parameters

      • TInputSchema extends StandardSchemaV1<unknown, unknown>

      Parameters

      Returns {
          task: <TOutput = unknown>(
              taskOptions: DurableTaskOptions<InferOutput<TInputSchema>, TOutput>,
          ) => DurableTask<InferInput<TInputSchema>, TOutput>;
          parentTask: <
              TRunOutput = unknown,
              TOutput = {
                  output: TRunOutput;
                  childrenOutputs: DurableTaskChildExecutionOutput[];
              },
              TOnRunAndChildrenCompleteRunOutput = unknown,
          >(
              parentTaskOptions: DurableParentTaskOptions<
                  InferOutput<TInputSchema>,
                  TRunOutput,
                  TOutput,
                  TOnRunAndChildrenCompleteRunOutput,
              >,
          ) => DurableTask<InferInput<TInputSchema>, TOutput>;
      }
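
      The type parameter above is constrained to a Standard Schema, so any object implementing that interface can act as an input schema. As a rough sketch of what validation against such a schema looks like, the block below defines a hand-written schema and a helper that validates unknown input. The names `MinimalStandardSchema`, `filePathSchema`, and `parseTaskInput` are hypothetical, and the interface is a simplified local copy covering only the fields used here.

```typescript
// Simplified local copy of the Standard Schema result shape.
// Assumption: only the fields used below; the full specification has more.
type SchemaResult<T> =
  | { value: T; issues?: undefined }
  | { issues: ReadonlyArray<{ message: string }> }

interface MinimalStandardSchema<TOutput> {
  readonly '~standard': {
    readonly version: 1
    readonly vendor: string
    readonly validate: (
      value: unknown,
    ) => SchemaResult<TOutput> | Promise<SchemaResult<TOutput>>
  }
}

// Hand-written schema equivalent in spirit to an object schema with a
// single string field named filePath.
const filePathSchema: MinimalStandardSchema<{ filePath: string }> = {
  '~standard': {
    version: 1,
    vendor: 'example',
    validate: (value) => {
      const filePath = (value as { filePath?: unknown } | null | undefined)?.filePath
      if (typeof filePath === 'string') {
        return { value: { filePath } }
      }
      return { issues: [{ message: 'filePath must be a string' }] }
    },
  },
}

// Validate unknown input and return the typed value, or throw with the
// collected issue messages. `await` handles both sync and async validators.
async function parseTaskInput<T>(
  schema: MinimalStandardSchema<T>,
  input: unknown,
): Promise<T> {
  const result = await schema['~standard'].validate(input)
  if (result.issues) {
    throw new Error(`Invalid input: ${result.issues.map((i) => i.message).join(', ')}`)
  }
  return result.value
}
```

      A task created through an input schema would presumably run a step like `parseTaskInput` before invoking its run function, so the run function only ever sees validated input.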

    • Create a new task that runs a list of tasks sequentially.

      The tasks in the list must be compatible with each other: the input of each task must match the output of the previous task. The output of the last task is the output of the sequential task.

      The tasks list cannot be empty.

      Type Parameters

      Parameters

      Returns DurableTask<
          ExtractDurableTaskInput<T[0]>,
          ExtractDurableTaskOutput<LastElement<T>>,
      >

      The sequential task.
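
      The compatibility rule above (each task's input matches the previous task's output) can be illustrated without the library. The sketch below uses plain async functions as stand-ins for tasks. `Step` and `sequence` are hypothetical names, and real durable tasks also carry ids, timeouts, and persisted state that this sketch ignores.

```typescript
// Stand-in for a task: an async function from input to output.
type Step<TIn, TOut> = (input: TIn) => Promise<TOut>

// Overloads make the compiler check that adjacent steps line up:
// the output type of one step is the input type of the next.
function sequence<A, B>(s1: Step<A, B>): Step<A, B>
function sequence<A, B, C>(s1: Step<A, B>, s2: Step<B, C>): Step<A, C>
function sequence<A, B, C, D>(
  s1: Step<A, B>,
  s2: Step<B, C>,
  s3: Step<C, D>,
): Step<A, D>
function sequence(...steps: Array<Step<any, any>>): Step<any, any> {
  if (steps.length === 0) {
    throw new Error('The steps list cannot be empty')
  }
  return async (input) => {
    // Feed each step's output into the next step.
    let current = input
    for (const step of steps) {
      current = await step(current)
    }
    return current
  }
}

// Usage: the first step's input is the sequence's input, and the last
// step's output is the sequence's output.
const extractTitle: Step<{ filePath: string }, { title: string }> = async ({ filePath }) => ({
  title: filePath.toUpperCase(),
})
const countTitleChars: Step<{ title: string }, number> = async ({ title }) => title.length

const titleLength = sequence(extractTitle, countTitleChars)
```

      Swapping the two steps in the call to `sequence` would fail to compile, which mirrors the requirement that the tasks in the list be compatible with each other.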

    • Get the running task execution ids.

      Returns ReadonlySet<string>

      The running task execution ids.