Async Workflow Execution
Type: Topic
Status: Published
Created: Feb 24, 2026
Updated: May 26, 2026
Created by: Dosu Bot
Updated by: Dosu Bot


Async Workflow Execution in DBOS Python enables developers to write durable, resilient workflows using Python's native async/await syntax. This feature allows workflows to perform non-blocking I/O-bound operations while maintaining DBOS's guarantees of durability, idempotency, and automatic recovery.

DBOS Python supports both async and sync workflows through a single decorator. @DBOS.workflow() uses Python's introspection (inspect.iscoroutinefunction()) to detect whether the decorated function is a coroutine function, and then chooses the matching execution path: async workflows run on an asyncio event loop and are protected from caller cancellation with asyncio.shield(), while sync workflows run in a ThreadPoolExecutor.

Async workflows are particularly useful for I/O-bound operations, concurrent API calls, and scheduled tasks. The framework maintains all durability and recovery guarantees regardless of whether workflows are synchronous or asynchronous.

Architecture and Implementation

Runtime Detection and Function Introspection

The critical distinction between sync and async execution happens at line 1164 in _core.py:

return _mark_coroutine(wrapper) if inspect.iscoroutinefunction(func) else wrapper

The framework calls inspect.iscoroutinefunction() in several places to decide whether a function is async, including workflow invocation in _get_wf_invoke_func(), step execution in run_step_async(), and step decorator wrapping.
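A minimal, standard-library-only sketch of the same dispatch pattern follows. The decorator name traced, the call_log list, and run_demo() are hypothetical illustrations, not DBOS code: the decorator checks inspect.iscoroutinefunction() once, at decoration time, and returns either an async def wrapper or a plain wrapper.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable, List

call_log: List[str] = []  # hypothetical: records which wrapper ran


def traced(func: Callable[..., Any]) -> Callable[..., Any]:
    """Pick an async or a sync wrapper once, at decoration time."""
    if inspect.iscoroutinefunction(func):

        @functools.wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
            call_log.append(f"async:{func.__name__}")
            return await func(*args, **kwargs)

        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
        call_log.append(f"sync:{func.__name__}")
        return func(*args, **kwargs)

    return sync_wrapper


@traced
async def fetch(x: int) -> int:
    return x * 2


@traced
def compute(x: int) -> int:
    return x + 1


def run_demo() -> List[Any]:
    """Call each decorated function once and collect the results."""
    return [asyncio.run(fetch(21)), compute(1)]
```

Because the async wrapper is itself declared with async def, introspection on the decorated function still reports a coroutine function, which is the property the _mark_coroutine(wrapper) call above appears intended to guarantee for DBOS's own wrapper.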

Event Loop Management

The BackgroundEventLoop class manages async execution for workflows not started within an existing event loop:

def submit_coroutine(self, coro: Coroutine[Any, Any, T]) -> T:
    """Submit a coroutine to the background event loop"""
    if self._main_loop is not None and self._main_loop.is_running():
        return asyncio.run_coroutine_threadsafe(coro, self._main_loop).result()
    if self._loop is None:
        raise RuntimeError("Event loop not started")
    return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

This method is used in _get_wf_invoke_func() (lines 492-494) to execute async workflows from synchronous contexts.
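The same technique can be reproduced with the standard library: run an event loop forever in a daemon thread and hand it coroutines with asyncio.run_coroutine_threadsafe(), whose concurrent.futures.Future.result() blocks the synchronous caller until the coroutine finishes. MiniBackgroundLoop and add_later() are hypothetical, simplified stand-ins; the sketch omits the _main_loop check that BackgroundEventLoop performs.

```python
import asyncio
import threading
from typing import Any, Coroutine, Optional, TypeVar

T = TypeVar("T")


class MiniBackgroundLoop:
    """Hypothetical, simplified background event loop running in its own thread."""

    def __init__(self) -> None:
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def submit_coroutine(self, coro: Coroutine[Any, Any, T]) -> T:
        if self._loop is None:
            raise RuntimeError("Event loop not started")
        # Schedule on the background loop and block this thread until it finishes.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def stop(self) -> None:
        if self._loop is None or self._thread is None:
            return
        self._loop.call_soon_threadsafe(self._loop.stop)  # ends run_forever()
        self._thread.join()
        self._loop.close()
        self._loop = None
        self._thread = None


async def add_later(a: int, b: int) -> int:
    await asyncio.sleep(0.01)
    return a + b
```

Usage: after bg.start(), a synchronous caller can run bg.submit_coroutine(add_later(2, 3)) and receive 5 without owning an event loop itself.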

Execution Paths

The framework provides parallel execution paths:

Sync Path: start_workflow() returns WorkflowHandle[R], executes workflows in a thread pool via dbos._executor.submit(), and uses _execute_workflow_wthread() for execution.

Async Path: start_workflow_async() returns WorkflowHandleAsync[R], executes workflows using asyncio.create_task(), and uses _execute_workflow_async() for execution. Tasks are shielded from cancellation: asyncio.shield(asyncio.create_task(coro)).
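To see what asyncio.shield() buys here, the sketch below cancels a caller that is awaiting a shielded task: the caller receives CancelledError, but the inner task runs to completion. durable_body() and demo() are hypothetical names; only standard asyncio is used.

```python
import asyncio
from typing import List

events: List[str] = []


async def durable_body() -> str:
    # Stands in for a workflow body that should not be interrupted.
    await asyncio.sleep(0.05)
    events.append("body finished")
    return "done"


async def demo() -> str:
    inner = asyncio.create_task(durable_body())

    async def caller() -> str:
        # Same shape as asyncio.shield(asyncio.create_task(coro)).
        return await asyncio.shield(inner)

    outer = asyncio.create_task(caller())
    await asyncio.sleep(0.01)  # let the caller start awaiting
    outer.cancel()
    try:
        await outer
    except asyncio.CancelledError:
        events.append("caller cancelled")
    # The shielded task was not cancelled and still produces its result.
    return await inner
```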

Database Operation Handling

Async workflows wrap database calls in asyncio.to_thread() to avoid blocking the event loop:

status, should_execute = await asyncio.to_thread(
    _init_workflow,
    dbos,
    new_wf_ctx,
    inputs=inputs,
    # ... other parameters
)

Child workflow recording also uses asyncio.to_thread() to prevent blocking:

await asyncio.to_thread(
    dbos._sys_db.record_child_workflow,
    new_wf_ctx.parent_workflow_id,
    new_child_workflow_id,
    new_wf_ctx.parent_workflow_fid,
    get_dbos_func_name(func),
)
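The effect of asyncio.to_thread() can be observed directly: while a blocking call runs in a worker thread, other coroutines on the loop keep running. In this hypothetical sketch, blocking_db_call() stands in for a synchronous database call such as _init_workflow.

```python
import asyncio
import time
from typing import List

order: List[str] = []


def blocking_db_call() -> str:
    time.sleep(0.2)  # simulates a synchronous driver call
    order.append("db call finished")
    return "row"


async def heartbeat() -> None:
    await asyncio.sleep(0.02)
    order.append("heartbeat")


async def main() -> str:
    # The heartbeat runs on the loop while the blocking call occupies a worker thread.
    result, _ = await asyncio.gather(asyncio.to_thread(blocking_db_call), heartbeat())
    return result
```

If main() called blocking_db_call() directly instead, the loop would stall for the full 0.2 seconds and the heartbeat could only run after the call returned.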

Defining Async Workflows

Basic Syntax

Async (coroutine) workflows are defined with async def and decorated with @DBOS.workflow():

import asyncio

import aiohttp
from dbos import DBOS

@DBOS.step()
async def example_step():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as response:
            return await response.text()

@DBOS.workflow()
async def example_workflow(friend: str):
    await DBOS.sleep_async(10)
    body = await example_step()
    # example_transaction is a sync @DBOS.transaction() function (not shown);
    # asyncio.to_thread keeps it from blocking the event loop.
    result = await asyncio.to_thread(example_transaction, body)
    return result

Decorator Parameters

The @DBOS.workflow() decorator accepts several parameters that work identically for both sync and async workflows:

  • name: Optional workflow name (defaults to the function's qualified name)
  • max_recovery_attempts: Maximum number of recovery attempts (for example, after a crash or interruption)
  • serialization_type: Controls serialization format (WorkflowSerializationFormat.DEFAULT or .PORTABLE)
  • validate_args: Optional argument validator function

Starting Async Workflows

from dbos import DBOS, Queue, SetEnqueueOptions, WorkflowHandleAsync

queue = Queue("example_queue")

# Start async workflow (runs in same event loop as caller)
handle: WorkflowHandleAsync = await DBOS.start_workflow_async(example_workflow, "var1")

# Enqueue async workflow (runs in a different event loop)
queued_handle = await queue.enqueue_async(example_workflow, "input_data")

# Enqueue with delay (workflow enters DELAYED status)
with SetEnqueueOptions(delay_seconds=60.0):
    delayed_handle = await queue.enqueue_async(example_workflow, "delayed_data")

Calling a coroutine workflow or starting it with DBOS.start_workflow_async runs it in the same event loop as the caller, while enqueueing it with queue.enqueue_async starts the workflow in a different event loop.

Async Steps

Step Definition

Async steps are fully supported through the @DBOS.step() decorator:

@DBOS.step(retries_allowed=True, max_attempts=10)
async def example_step():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as response:
            return await response.text()

@DBOS.step(preemptible=True)
async def long_running_operation():
    # This step can be immediately cancelled if its workflow is cancelled
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com/long-operation") as response:
            return await response.text()

Step Options

Step options are defined in the StepOptions TypedDict:

| Option | Type | Default | Description |
|---|---|---|---|
| name | Optional[str] | None | Optional name for the step |
| retries_allowed | bool | False | Whether the step should be retried on failure |
| interval_seconds | float | 1.0 | Initial delay between retry attempts, in seconds |
| max_attempts | int | 3 | Maximum number of attempts |
| backoff_rate | float | 2.0 | Exponential backoff multiplier |
| should_retry | Optional[Callable[[BaseException], bool]] | None | Optional predicate called with a raised exception to decide whether the step should be retried. If it returns False, the exception is re-raised immediately without further retries. |
| preemptible | bool | False | If True, cancel the async step immediately if its workflow is cancelled (details below) |

About preemptible: by default, when a workflow is cancelled, step execution is not preempted. The workflow continues running until the current step completes, then is preempted at the start of the next step. Setting preemptible=True changes this so the step (and workflow) is cancelled immediately (or after a short polling interval). It is only supported for async steps; using it with a sync step raises an exception.

The retry logic applies exponential backoff with a maximum retry interval of 1 hour (3600 seconds).
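The options in the table can be combined into a simple retry loop. The sketch below is plain asyncio, not DBOS's implementation, and it assumes the delay before retry k is min(interval_seconds * backoff_rate**(k-1), 3600); call_with_retries(), retry_delays(), and the demo functions are hypothetical. In this model, retries_allowed=False corresponds to max_attempts=1, and after the final attempt the last exception is simply re-raised.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple, TypeVar

T = TypeVar("T")

MAX_RETRY_INTERVAL_SECONDS = 3600.0  # the 1-hour cap described above


def retry_delays(
    interval_seconds: float = 1.0, backoff_rate: float = 2.0, max_attempts: int = 3
) -> List[float]:
    """Delays slept between consecutive attempts (max_attempts - 1 of them)."""
    delays: List[float] = []
    interval = interval_seconds
    for _ in range(max_attempts - 1):
        delays.append(min(interval, MAX_RETRY_INTERVAL_SECONDS))
        interval *= backoff_rate
    return delays


async def call_with_retries(
    fn: Callable[[], Awaitable[T]],
    *,
    interval_seconds: float = 1.0,
    backoff_rate: float = 2.0,
    max_attempts: int = 3,
    should_retry: Optional[Callable[[BaseException], bool]] = None,
) -> T:
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delays = retry_delays(interval_seconds, backoff_rate, max_attempts)
    for attempt in range(max_attempts):
        try:
            return await fn()
        except Exception as exc:
            rejected = should_retry is not None and not should_retry(exc)
            if rejected or attempt == max_attempts - 1:
                raise  # predicate said no, or attempts are exhausted
            await asyncio.sleep(delays[attempt])
    raise AssertionError("unreachable")


async def demo_flaky() -> Tuple[str, int]:
    """Fails twice with a transient error, then succeeds."""
    calls = 0

    async def flaky() -> str:
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("transient")
        return "ok"

    result = await call_with_retries(flaky, interval_seconds=0.0)
    return result, calls


async def demo_not_retryable() -> int:
    """should_retry rejects ValueError, so only one attempt is made."""
    calls = 0

    async def bad() -> str:
        nonlocal calls
        calls += 1
        raise ValueError("permanent")

    try:
        await call_with_retries(
            bad, interval_seconds=0.0, should_retry=lambda e: not isinstance(e, ValueError)
        )
    except ValueError:
        pass
    return calls
```

With the defaults (interval_seconds=1.0, backoff_rate=2.0, max_attempts=3) the schedule is [1.0, 2.0]; with max_attempts=15, the 13th delay (2**12 = 4096 seconds) is the first one capped at 3600 seconds.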

run_step_async

The DBOS.run_step_async() method provides dynamic step execution:

@DBOS.workflow()
async def my_async_workflow() -> str:
    # Call async function
    result1 = await DBOS.run_step_async(
        {"name": "async_step", "retries_allowed": True, "max_attempts": 3},
        my_async_function,
        "param1"
    )

    # Call sync function in thread pool
    result2 = await DBOS.run_step_async(
        {"name": "blocking_step"},
        my_blocking_function,
        "param2"
    )

    return result1 + result2

The implementation handles both coroutine and regular functions, with sync functions executed via asyncio.to_thread() to avoid blocking the event loop.
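A simplified sketch of that dispatch, using only asyncio: coroutine functions are awaited on the current loop, and plain functions are offloaded with asyncio.to_thread(). run_step_sketch(), demo(), and the two example function bodies are hypothetical; the real run_step_async() additionally handles the step options and checkpointing.

```python
import asyncio
import inspect
import threading
from typing import Any, Callable, Tuple


async def run_step_sketch(func: Callable[..., Any], *args: Any) -> Any:
    """Hypothetical dispatcher: await coroutines, offload plain functions."""
    if inspect.iscoroutinefunction(func):
        # Coroutine functions run directly on the current event loop.
        return await func(*args)
    # Plain functions run in a worker thread so they cannot block the loop.
    return await asyncio.to_thread(func, *args)


async def my_async_function(x: str) -> Tuple[str, int]:
    return f"async:{x}", threading.get_ident()


def my_blocking_function(x: str) -> Tuple[str, int]:
    return f"sync:{x}", threading.get_ident()


async def demo() -> Tuple[Tuple[str, int], Tuple[str, int], int]:
    first = await run_step_sketch(my_async_function, "param1")
    second = await run_step_sketch(my_blocking_function, "param2")
    return first, second, threading.get_ident()
```

The thread identifiers make the design choice visible: the coroutine reports the loop's own thread, while the blocking function reports a different worker thread.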

Concurrency Patterns

Parallel Step Execution

Running async steps in parallel requires starting them in deterministic order:

# Start steps in deterministic order, then await together
tasks = [
    asyncio.create_task(step1("arg1")),
    asyncio.create_task(step2("arg2")),
    asyncio.create_task(step3("arg3")),
    asyncio.create_task(step4("arg4")),
]

# Use return_exceptions=True for proper error handling
results = await asyncio.gather(*tasks, return_exceptions=True)

Without return_exceptions=True, gather() propagates the first exception immediately; the remaining tasks keep running but are no longer awaited, so their results and any later exceptions can be lost.
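With return_exceptions=True, gather() returns one entry per awaitable in the order the awaitables were passed, placing any exception in the list instead of raising it. The hypothetical step() coroutine below is plain asyncio, not a DBOS step.

```python
import asyncio
from typing import Any, List


async def step(i: int) -> int:
    await asyncio.sleep(0.01 * i)
    if i == 2:
        raise ValueError(f"step {i} failed")
    return i * 10


async def run_all() -> List[Any]:
    # Create all tasks first, in a fixed order, then await them together.
    tasks = [asyncio.create_task(step(i)) for i in range(1, 5)]
    return await asyncio.gather(*tasks, return_exceptions=True)
```

Callers can then separate failures from results with isinstance(r, BaseException).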

Durable asyncio.wait

DBOS.asyncio_wait() is a durable implementation of asyncio.wait() that checkpoints which tasks are done versus pending, ensuring deterministic recovery when workflows are interrupted. It supports all the same parameters as the standard library function:

Signature:

DBOS.asyncio_wait(
    fs: List[Awaitable[Any]],
    *,
    timeout: Optional[float] = None,
    return_when: str = asyncio.ALL_COMPLETED,
) -> tuple[set[asyncio.Task[Any]], set[asyncio.Task[Any]]]

Parameters:

  • fs: List of awaitables (tasks, coroutines, or futures) to wait on
  • timeout: Optional timeout in seconds. If specified, returns after the timeout even if not all tasks are complete.
  • return_when: Controls when the function returns. Options include:
    • asyncio.ALL_COMPLETED (default): Wait for all tasks to complete
    • asyncio.FIRST_COMPLETED: Return as soon as any task completes
    • asyncio.FIRST_EXCEPTION: Return as soon as any task raises an exception

Returns:
A tuple of two sets: (done_tasks, pending_tasks) where done_tasks contains completed tasks and pending_tasks contains tasks that are still running.

Common Use Cases:

Waiting for the first step to complete:

@DBOS.step()
async def fast_step(val: str) -> str:
    return val + "_done"

@DBOS.step()
async def slow_step(val: str) -> str:
    await asyncio.sleep(10)
    return val + "_done"

@DBOS.workflow()
async def race_workflow() -> str:
    done, pending = await DBOS.asyncio_wait(
        [fast_step("fast"), slow_step("slow")],
        return_when=asyncio.FIRST_COMPLETED,
    )

    # Use the first result
    first_result = next(iter(done)).result()

    # Wait for remaining tasks to complete
    if pending:
        await DBOS.asyncio_wait(list(pending))

    return first_result

Waiting with a timeout:

@DBOS.workflow()
async def timeout_workflow() -> tuple[list[str], int]:
    done, pending = await DBOS.asyncio_wait(
        [step1(), step2(), step3()],
        timeout=5.0,
    )

    completed_results = [t.result() for t in done]
    pending_count = len(pending)

    # Wait for remaining tasks after timeout
    if pending:
        await DBOS.asyncio_wait(list(pending))

    return completed_results, pending_count

Waiting for the first exception:

@DBOS.workflow()
async def exception_handling_workflow() -> None:
    done, pending = await DBOS.asyncio_wait(
        [risky_step1(), risky_step2(), risky_step3()],
        return_when=asyncio.FIRST_EXCEPTION,
    )

    # Check for exceptions in completed tasks
    for task in done:
        if task.exception():
            # Handle the exception
            pass

    # Wait for remaining tasks
    if pending:
        await DBOS.asyncio_wait(list(pending))

Key Differences from asyncio.gather:

Unlike asyncio.gather(), which waits for all tasks to complete and returns their results as a list, DBOS.asyncio_wait() provides more granular control:

  • Returns a tuple of (done, pending) sets instead of a list of results
  • Supports timeouts and selective completion via return_when parameter
  • Does not automatically raise exceptions—you must check tasks for exceptions explicitly
  • Better suited for scenarios where you need to handle partial completion or timeouts

When called outside a workflow, DBOS.asyncio_wait() falls back to the standard asyncio.wait() function.
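Because DBOS.asyncio_wait() mirrors the asyncio.wait() contract, the standard function shows the done/pending semantics in isolation. This sketch wraps coroutines in tasks first, since asyncio.wait() rejects bare coroutines on Python 3.11 and later; sleeper() and race() are hypothetical names, and nothing here is checkpointed.

```python
import asyncio
from typing import List, Tuple


async def sleeper(name: str, delay: float, fail: bool = False) -> str:
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} failed")
    return name


async def race() -> Tuple[str, int, List[str]]:
    tasks = [
        asyncio.create_task(sleeper("fast", 0.01)),
        asyncio.create_task(sleeper("broken", 0.1, fail=True)),
        asyncio.create_task(sleeper("slow", 0.2)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = next(iter(done)).result()
    still_pending = len(pending)

    # Drain the rest; asyncio.wait() never raises task exceptions itself,
    # so each finished task must be inspected explicitly.
    done_rest, _ = await asyncio.wait(pending)
    errors = sorted(str(t.exception()) for t in done_rest if t.exception() is not None)
    return first, still_pending, errors
```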

Child Workflows

Child workflows can be called directly or started asynchronously:

@DBOS.workflow()
async def parent_workflow() -> tuple[str, str, str]:
    parent_id = DBOS.workflow_id
    # Direct child call
    result = await child_workflow("test")
    # Start child workflow asynchronously
    child_handle = await DBOS.start_workflow_async(child_workflow, "async")
    child_result = await child_handle.get_result()
    return parent_id, child_handle.workflow_id, result + child_result

Communication Patterns

Async workflows should use async versions of DBOS context methods:

# Send/receive messages
await DBOS.send_async(destination_id, message, topic)
message = await DBOS.recv_async(topic, timeout_seconds=60)

# Set/get events
await DBOS.set_event_async(key, value)
value = await DBOS.get_event_async(workflow_id, key)

# Sleep
await DBOS.sleep_async(10)

Idempotent Message Delivery

Both send() and send_async() support an optional idempotency_key parameter for idempotent message delivery. This parameter ensures that duplicate sends with the same key are ignored:

# With idempotency key - prevents duplicate delivery
DBOS.send(
    destination_id, 
    message, 
    topic, 
    idempotency_key="unique-key-123"
)

# Async version with idempotency key
await DBOS.send_async(
    destination_id, 
    message, 
    topic, 
    idempotency_key="unique-key-456"
)

The idempotency_key parameter can be used in any context: from workflows, from steps, or from outside workflow contexts. When called from within a workflow, the workflow's own idempotency mechanism already handles deduplication, but an explicit idempotency_key can still be provided for additional control.
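The deduplication rule can be modeled in a few lines. ToyMailbox below is a hypothetical in-memory illustration, not how DBOS stores messages: a send carrying an idempotency_key that was already seen is dropped, while sends without a key are always delivered. Treating keys as global rather than per destination is an assumption of the sketch.

```python
from collections import defaultdict
from typing import Any, DefaultDict, List, Optional, Set, Tuple


class ToyMailbox:
    """Hypothetical in-memory model of send() with an idempotency key."""

    def __init__(self) -> None:
        self._queues: DefaultDict[Tuple[str, Optional[str]], List[Any]] = defaultdict(list)
        self._seen_keys: Set[str] = set()

    def send(
        self,
        destination_id: str,
        message: Any,
        topic: Optional[str] = None,
        idempotency_key: Optional[str] = None,
    ) -> bool:
        """Return True if the message was delivered, False if it was deduplicated."""
        if idempotency_key is not None:
            if idempotency_key in self._seen_keys:
                return False  # duplicate send: ignore it
            self._seen_keys.add(idempotency_key)
        self._queues[(destination_id, topic)].append(message)
        return True

    def messages(self, destination_id: str, topic: Optional[str] = None) -> List[Any]:
        return list(self._queues[(destination_id, topic)])
```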

Waiting for Multiple Workflows

DBOS.wait_first() and DBOS.wait_first_async() enable waiting for any one of multiple workflows to complete. These methods poll the database until at least one workflow's status is no longer PENDING, ENQUEUED, or DELAYED, then return the corresponding handle. This is useful for patterns like racing workflows, implementing timeouts, or processing results as they become available.

Basic Usage

@DBOS.workflow()
async def coordinator_workflow() -> str:
    # Start multiple child workflows
    handles = [
        await DBOS.start_workflow_async(task_workflow, i) 
        for i in range(3)
    ]

    # Wait for first to complete
    first_completed = await DBOS.wait_first_async(handles)
    return await first_completed.get_result()

Processing Tasks as They Complete

wait_first() can be used in a loop to process workflow results in completion order rather than start order:

@DBOS.workflow()
def process_tasks() -> List[str]:
    # Start multiple concurrent tasks
    handles = [queue.enqueue(process_task, i) for i in range(5)]

    # Process results as tasks complete (not in start order)
    results = []
    remaining = list(handles)
    while remaining:
        completed = DBOS.wait_first(remaining)
        results.append(completed.get_result())
        remaining = [h for h in remaining if h.workflow_id != completed.workflow_id]

    return results

Parameters and Error Handling

Both methods accept the following parameters:

  • handles: List of workflow handles to wait on. Must not be empty and must not contain duplicate workflow IDs.
  • polling_interval_sec: Optional polling interval for checking workflow completion status (defaults to DEFAULT_POLLING_INTERVAL).

Error Handling:

  • Raises ValueError if the handles list is empty
  • Raises ValueError if the handles list contains duplicate workflow IDs
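The polling behavior described above can be sketched with plain asyncio. wait_first_sketch() and demo() are hypothetical: the sketch takes workflow IDs and a status lookup function instead of handles and the system database, but applies the same rules (reject empty or duplicate inputs, then poll until some status is no longer PENDING, ENQUEUED, or DELAYED).

```python
import asyncio
from typing import Callable, Dict, List

ACTIVE_STATUSES = {"PENDING", "ENQUEUED", "DELAYED"}


async def wait_first_sketch(
    workflow_ids: List[str],
    get_status: Callable[[str], str],
    polling_interval_sec: float = 0.01,
) -> str:
    """Return the first ID whose status is no longer active."""
    if not workflow_ids:
        raise ValueError("workflow_ids must not be empty")
    if len(set(workflow_ids)) != len(workflow_ids):
        raise ValueError("workflow_ids must not contain duplicates")
    while True:
        for wf_id in workflow_ids:
            if get_status(wf_id) not in ACTIVE_STATUSES:
                return wf_id
        await asyncio.sleep(polling_interval_sec)


statuses: Dict[str, str] = {"a": "PENDING", "b": "DELAYED", "c": "ENQUEUED"}


async def demo() -> str:
    async def finish_b_later() -> None:
        await asyncio.sleep(0.05)
        statuses["b"] = "SUCCESS"  # simulates workflow "b" completing

    finisher = asyncio.create_task(finish_b_later())
    winner = await wait_first_sketch(["a", "b", "c"], statuses.__getitem__)
    await finisher
    return winner
```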

Transactions and Async Limitations

Transaction Limitation

Transactions explicitly do not support async functions. The code raises a DBOSException when a coroutine function is decorated with @DBOS.transaction():

if inspect.iscoroutinefunction(func):
    raise DBOSException(
        f"Function {transaction_name} is a coroutine function, but DBOS.transaction does not support coroutine functions"
    )

Calling Transactions from Async Contexts

When calling synchronous transactions from async contexts (such as async workflows), you should use asyncio.to_thread() to avoid blocking the event loop. If a transaction is called directly while an event loop is running, DBOS will log a warning to help identify this pattern:

import asyncio

import sqlalchemy as sa
from dbos import DBOS

@DBOS.transaction()
def my_db_transaction() -> str:
    rows = DBOS.sql_session.execute(sa.text("SELECT 1")).fetchall()
    return str(rows[0][0])

@DBOS.workflow()
async def async_workflow() -> str:
    # Recommended: Use asyncio.to_thread() to avoid blocking the event loop
    result = await asyncio.to_thread(my_db_transaction)
    return result

Warning Behavior:

If you call a transaction directly from an async context without using asyncio.to_thread(), DBOS will log a warning message:

Transaction {transaction_name} was called while an event loop is running. Invoke transactions from an async context using asyncio.to_thread to avoid blocking the event loop.

This warning helps developers identify potentially problematic patterns where synchronous transactions may block the event loop. While DBOS can handle the sync/async boundary internally, using asyncio.to_thread() is the recommended approach for proper async execution.
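The warning condition can be detected with asyncio.get_running_loop(), which raises RuntimeError when the current thread has no running loop. This hypothetical sketch, not DBOS's code, also shows why asyncio.to_thread() avoids the warning: the worker thread it uses has no running event loop. The log text paraphrases the message above.

```python
import asyncio
import logging
from typing import Tuple

logger = logging.getLogger("transaction_sketch")


def loop_is_running() -> bool:
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def my_db_transaction_sketch() -> bool:
    """Stand-in for a sync transaction; returns whether it warned."""
    if loop_is_running():
        logger.warning(
            "Transaction was called while an event loop is running. "
            "Invoke it with asyncio.to_thread to avoid blocking the event loop."
        )
        return True
    return False


async def async_workflow_sketch() -> Tuple[bool, bool]:
    direct = my_db_transaction_sketch()  # runs on the loop's thread: warns
    offloaded = await asyncio.to_thread(my_db_transaction_sketch)  # worker thread: no loop
    return direct, offloaded
```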

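Integration Examples

FastAPI Integration

This FastAPI integration example shows how to combine async workflows with HTTP endpoints:

import asyncio
from contextlib import asynccontextmanager
from typing import Any, Optional

from dbos import DBOS, Queue
from fastapi import FastAPI

resource: Optional[int] = None

@asynccontextmanager
async def lifespan(app: FastAPI) -> Any:
    # Module-level state needs global; nonlocal is only valid inside a nested function
    global resource
    resource = 1
    yield
    resource = None

app = FastAPI(lifespan=lifespan)
DBOS(fastapi=app, config=config)  # config: your DBOSConfig, defined elsewhere

queue = Queue("queue")

@DBOS.workflow()
async def queue_workflow() -> int:
    # Illustrative: report which event loop the enqueued workflow ran on
    return id(asyncio.get_running_loop())

@app.get("/")
@DBOS.workflow()
async def resource_workflow() -> Any:
    handle = await queue.enqueue_async(queue_workflow)
    return {
        "resource": resource,
        "loop": id(asyncio.get_running_loop()),
        "queue_loop": await handle.get_result(),
    }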

Queue Integration

Queue operations with async workflows enable background processing:

@DBOS.workflow()
async def test_workflow(var1: str, var2: str) -> str:
    var1 = await test_step(var1)
    return var1 + var2

@DBOS.step()
async def test_step(var: str) -> str:
    return var + "d"

queue = await DBOS.register_queue_async("test_queue")

# Immediate enqueueing
with SetWorkflowID(wfid):
    handle = await queue.enqueue_async(test_workflow, "abc", "123")
assert (await handle.get_result()) == "abcd123"

# Delayed enqueueing - workflow enters DELAYED status
with SetEnqueueOptions(delay_seconds=10.0):
    delayed_handle = await queue.enqueue_async(test_workflow, "xyz", "789")
# Workflow transitions to ENQUEUED after 10 seconds, then executes
assert (await delayed_handle.get_result()) == "xyzd789"

Async Queue Management

DBOS provides async versions of queue management methods for use in async contexts:

Queue Registration, Retrieval, Deletion, and Listing:

  • DBOS.register_queue_async() - async version of register_queue()
  • DBOS.retrieve_queue_async() - async version of retrieve_queue()
  • DBOS.delete_queue_async() - async version of delete_queue()
  • DBOS.list_queues() - List all database-backed queues registered in the system database. Returns a list of Queue objects.
  • DBOS.list_queues_async() - async version of list_queues()
  • DBOSClient.register_queue_async(), retrieve_queue_async(), delete_queue_async(), list_queues(), list_queues_async() - corresponding client methods

Queue Property Getters and Setters:

Async getters:

  • queue.get_concurrency_async()
  • queue.get_worker_concurrency_async()
  • queue.get_limiter_async()
  • queue.get_priority_enabled_async()
  • queue.get_partition_queue_async()
  • queue.get_polling_interval_sec_async()

Async setters:

  • queue.set_concurrency_async()
  • queue.set_worker_concurrency_async()
  • queue.set_limiter_async()
  • queue.set_priority_enabled_async()
  • queue.set_partition_queue_async()
  • queue.set_polling_interval_sec_async()

Usage from Async Contexts:

When calling queue management methods from async code, use the async variants to avoid blocking the event loop. The synchronous methods issue warnings (or may raise errors) when called from a running asyncio event loop:

# In async code, use async methods
queue = await DBOS.register_queue_async(
    "my_queue",
    concurrency=10,
    polling_interval_sec=0.5
)

# Get and set queue properties asynchronously
current_concurrency = await queue.get_concurrency_async()
await queue.set_concurrency_async(15)

# Clean up when done
await DBOS.delete_queue_async("my_queue")

To enqueue workflows from async code, continue to use queue.enqueue_async(); its behavior is unchanged.

Delayed Workflow Enqueueing

Workflows can be enqueued with a delay using the delay_seconds parameter in SetEnqueueOptions. When a workflow is enqueued with a delay:

  1. DELAYED Status: The workflow enters the DELAYED status rather than ENQUEUED
  2. Not Immediately Available: The workflow is not available for dequeue until the delay expires
  3. Automatic Transition: After the delay period, the workflow automatically transitions to ENQUEUED status
  4. Queue Processing: Once ENQUEUED, the workflow can be dequeued and executed normally
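The four steps can be modeled as a tiny in-memory queue. ToyWorkflow and promote_and_dequeue() are hypothetical; the sketch only mirrors the status rules above, using the delay_until_epoch_ms field shown in this section's status example.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ToyWorkflow:
    """Hypothetical queue entry used only for this illustration."""

    workflow_id: str
    status: str  # "DELAYED" or "ENQUEUED"
    delay_until_epoch_ms: Optional[int] = None


def promote_and_dequeue(entries: List[ToyWorkflow], now_ms: int) -> List[str]:
    """Promote expired DELAYED entries to ENQUEUED, then return dequeueable IDs."""
    for entry in entries:
        if (
            entry.status == "DELAYED"
            and entry.delay_until_epoch_ms is not None
            and entry.delay_until_epoch_ms <= now_ms
        ):
            entry.status = "ENQUEUED"  # step 3: automatic transition
    # Steps 2 and 4: only ENQUEUED entries are eligible for dequeue.
    return [e.workflow_id for e in entries if e.status == "ENQUEUED"]
```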

Example:

queue = Queue("my_queue")

@DBOS.workflow()
async def delayed_task(data: str) -> str:
    return f"Processed: {data}"

# Enqueue workflow to execute after 60 seconds
with SetEnqueueOptions(delay_seconds=60.0):
    handle = await queue.enqueue_async(delayed_task, "important_data")

# Workflow is in DELAYED status
status = await handle.get_status()
assert status.status == "DELAYED"
assert status.delay_until_epoch_ms is not None

# After 60 seconds, workflow transitions to ENQUEUED and executes
result = await handle.get_result()
assert result == "Processed: important_data"

Delayed Workflow Behavior:

  • Visibility: Delayed workflows are visible in list_workflows() and list_queued_workflows() results
  • Cancellation: Delayed workflows can be cancelled before execution using cancel_workflow() or cancel_workflow_async()
  • Resume: If a delayed workflow is cancelled, it can be resumed with resume_workflow(), which transitions it immediately to ENQUEUED (bypassing the original delay)
  • wait_first(): Delayed workflows are treated as active workflows and will unblock wait_first() when they complete
  • Deduplication: Delayed workflows participate in deduplication—attempting to enqueue a duplicate workflow with the same deduplication ID while one is in DELAYED status will fail

Modifying Workflow Delays

You can modify the delay on a DELAYED or ENQUEUED workflow using DBOS.set_workflow_delay() (sync) or DBOS.set_workflow_delay_async() (async). The method accepts the workflow_id and either delay_seconds (relative delay from now) or delay_until_epoch_ms (absolute timestamp in milliseconds). You must provide exactly one of these parameters, not both. The method only affects workflows in DELAYED or ENQUEUED status.

# Example: Enqueue with a long delay, then shorten it
with SetEnqueueOptions(delay_seconds=600.0):
    handle = queue.enqueue(my_workflow)

# Shorten the delay to 5 seconds
DBOS.set_workflow_delay(handle.workflow_id, delay_seconds=5.0)
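The exactly-one-of rule and the relative-to-absolute conversion can be sketched as follows. resolve_delay_until_ms() and its now_ms parameter are hypothetical, and raising ValueError for invalid combinations is an assumption; the text above does not say which exception DBOS uses.

```python
import time
from typing import Optional


def resolve_delay_until_ms(
    delay_seconds: Optional[float] = None,
    delay_until_epoch_ms: Optional[int] = None,
    now_ms: Optional[int] = None,
) -> int:
    """Turn exactly one of the two delay arguments into an absolute epoch-ms timestamp."""
    if (delay_seconds is None) == (delay_until_epoch_ms is None):
        raise ValueError("Provide exactly one of delay_seconds or delay_until_epoch_ms")
    if delay_until_epoch_ms is not None:
        return delay_until_epoch_ms  # already absolute
    assert delay_seconds is not None
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms + int(delay_seconds * 1000)  # relative delay measured from now
```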

Background Workflow Execution

The following excerpt from the Hacker News Research Agent example starts durable background agents and queries their status:

@app.post("/agents")
def start_agent(request: AgentStartRequest):
    # Start a durable agent in the background
    DBOS.start_workflow(agentic_research_workflow, request.topic)
    return {"ok": True}

@app.get("/agents", response_model=list[AgentStatus])
async def list_agents():
    # List all active agents
    agent_workflows = await DBOS.list_workflows_async(
        name=agentic_research_workflow.__qualname__,
        sort_desc=True,
    )
    # Retrieve their statuses concurrently
    statuses = await asyncio.gather(
        *[DBOS.get_event_async(w.workflow_id, AGENT_STATUS) for w in agent_workflows]
    )
    return statuses

Scheduled Workflows

Async Scheduled Workflows

Async scheduled workflows are supported as of PR #493 (October 20, 2025), which enables non-blocking scheduled operations:

@DBOS.workflow()
async def my_async_workflow(scheduled_time: datetime, context: Any) -> None:
    # Workflow logic here
    pass

# Schedule at runtime
DBOS.create_schedule(
    schedule_name="my-schedule",
    workflow_fn=my_async_workflow,
    schedule="*/5 * * * * *", # Cron expression with seconds support
    context={"env": "production"},
    automatic_backfill=True, # Backfill missed executions on startup
    cron_timezone="America/New_York", # Evaluate cron in specific timezone
)

Scheduled vs. Delayed Workflows

DBOS Python provides two distinct mechanisms for deferred workflow execution:

Delayed Enqueueing (via delay_seconds):

  • Use Case: One-time delayed execution of a workflow
  • Configuration: Specify delay_seconds in SetEnqueueOptions or EnqueueOptions
  • Status: Workflows enter DELAYED status and automatically transition to ENQUEUED after the delay
  • Persistence: The delay is recorded with the workflow instance; once executed, it's complete
  • Example: Retry a failed operation after 5 minutes, send a reminder email 1 hour from now

Scheduled Workflows (via cron expressions):

  • Use Case: Recurring execution on a schedule
  • Configuration: Use create_schedule() with a cron expression
  • Status: Each scheduled invocation creates a new workflow instance
  • Persistence: The schedule persists in the database and continues firing until explicitly deleted
  • Example: Run a nightly cleanup job, send daily reports, poll an API every 15 minutes

When to Use Each:

  • Use delayed enqueueing when you need to execute a specific workflow instance after a delay (e.g., implementing exponential backoff, scheduling a one-time reminder)
  • Use scheduled workflows when you need recurring, time-based execution (e.g., periodic maintenance, regular data synchronization, daily batch processing)

Schedule Configuration

The create_schedule() and create_schedule_async() methods accept several parameters:

  • schedule_name: Unique name for the schedule
  • workflow_fn: The workflow function to invoke (must accept (datetime, context))
  • schedule: Cron expression with optional seconds support; this page uses both a 5-field expression ("0 0 * * *") and a 6-field expression with seconds ("*/5 * * * * *")
  • context: Optional context object passed to every invocation (defaults to None)
  • automatic_backfill: Optional boolean (defaults to False). When enabled, the scheduler automatically backfills missed executions on startup based on when the schedule last fired.
  • cron_timezone: Optional IANA timezone name (e.g., "America/New_York", "Europe/London", "Asia/Tokyo"). When specified, the cron expression is evaluated in that timezone instead of UTC (the default).

Last Fired Tracking

The scheduler tracks when each schedule last executed successfully via the last_fired_at field. This timestamp enables the automatic backfill feature and helps with debugging and monitoring scheduled workflows. After each successful execution, last_fired_at is updated to the scheduled execution time.

Timezone-Aware Scheduling

While UTC remains the default timezone for cron evaluation, the cron_timezone parameter allows async workflows to be scheduled according to local business hours in specific regions. This is especially useful for workflows that interact with time-sensitive external systems or services in specific geographical locations.

For example, a workflow that needs to run at midnight Eastern Time every day can be configured as:

DBOS.create_schedule(
    schedule_name="daily-midnight-eastern",
    workflow_fn=daily_task,
    schedule="0 0 * * *", # Midnight
    cron_timezone="America/New_York",
)

The scheduler will automatically handle daylight saving time transitions for the specified timezone.
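To see what handling daylight saving time means in practice, the standard library's zoneinfo module (Python 3.9+, which needs an IANA time zone database such as the system one or the tzdata package) can convert the same local cron time to UTC in winter and in summer. midnight_eastern_in_utc() is a hypothetical helper that illustrates the offset shift, not the scheduler's code.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

NEW_YORK = ZoneInfo("America/New_York")


def midnight_eastern_in_utc(year: int, month: int, day: int) -> datetime:
    """Return local midnight in New York on the given date, expressed in UTC."""
    local_midnight = datetime(year, month, day, 0, 0, tzinfo=NEW_YORK)
    return local_midnight.astimezone(timezone.utc)
```

Midnight Eastern falls at 05:00 UTC in January (EST, UTC-5) but at 04:00 UTC in July (EDT, UTC-4), so a schedule evaluated in UTC would drift by an hour across the year while one evaluated in America/New_York does not.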

Automatic Backfill

When automatic_backfill is enabled, the scheduler will automatically enqueue any schedule instances that were missed while the service was down. On startup, the scheduler checks the last_fired_at timestamp for each schedule with automatic backfill enabled. If last_fired_at is set and is in the past, the scheduler backfills all missed executions between that time and the current time before resuming normal scheduled operation.

This ensures that scheduled workflows don't miss important executions due to service downtime or restarts.
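The backfill window can be sketched as follows. Because parsing cron expressions needs a third-party library, this hypothetical helper swaps in a fixed interval in place of a cron schedule, and it treats a fire time equal to the current time as due; both are assumptions of the sketch.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def missed_fire_times(
    last_fired_at: datetime, now: datetime, interval: timedelta
) -> List[datetime]:
    """Fire times strictly after last_fired_at and no later than now."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    missed: List[datetime] = []
    next_fire = last_fired_at + interval
    while next_fire <= now:
        missed.append(next_fire)
        next_fire += interval
    return missed
```

For example, with last_fired_at at 00:00 UTC, now at 01:00 UTC, and a 15-minute interval, the missed executions are 00:15, 00:30, 00:45, and 01:00.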

Management APIs

Dynamic scheduling provides extensive management capabilities:

# List all schedules
schedules = DBOS.list_schedules()

# Pause/resume schedules
DBOS.pause_schedule("my-schedule")
DBOS.resume_schedule("my-schedule")

# Trigger immediate execution
DBOS.trigger_schedule("my-schedule")

# Delete a schedule
DBOS.delete_schedule("my-schedule")

# Backfill missed executions
DBOS.backfill_schedule(
    schedule_name="my-schedule",
    start_time=datetime(2026, 1, 1),
    end_time=datetime(2026, 1, 31),
)

Best Practices

Use Async Context Methods

Coroutine workflows should always use async versions of DBOS context methods:

  • DBOS.sleep_async() instead of DBOS.sleep()
  • DBOS.send_async() instead of DBOS.send()
  • DBOS.recv_async() instead of DBOS.recv()
  • DBOS.start_workflow_async() instead of DBOS.start_workflow()

Maintain Determinism

Even async workflows must be deterministic. All non-deterministic operations (database access, API calls, random numbers, time) should be performed in steps, not directly in workflow functions.

Error Handling with asyncio.gather

Always specify return_exceptions=True when using asyncio.gather():

results = await asyncio.gather(*tasks, return_exceptions=True)

Handle Synchronous Code

Use asyncio.to_thread() to run synchronous functions from async workflows:

result = await asyncio.to_thread(sync_function, arg1, arg2)

Deterministic Ordering for Parallel Steps

When running async steps in parallel, they must be started in a well-defined sequence before awaiting. Non-deterministic ordering (where execution depends on timing of previous steps) is not allowed.
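The reason for this rule can be shown with a toy model that assumes, purely for illustration, that each step is identified by the order in which it starts. Creating every task up front in a fixed order yields the same step IDs regardless of how long each step takes, because asyncio starts newly created tasks in creation order; starting a step based on which earlier step finished first does not. ToyWorkflowContext and both runner functions are hypothetical.

```python
import asyncio
import itertools
from typing import Dict


class ToyWorkflowContext:
    """Hypothetical model: a step gets the next ID at the moment it starts."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self.assigned: Dict[str, int] = {}

    async def step(self, name: str, duration: float) -> str:
        self.assigned[name] = next(self._ids)  # assigned before the first await
        await asyncio.sleep(duration)
        return name


async def run_in_fixed_order(durations: Dict[str, float]) -> Dict[str, int]:
    ctx = ToyWorkflowContext()
    # Good: create every task up front in a fixed order, then await them together.
    tasks = [asyncio.create_task(ctx.step(n, durations[n])) for n in ("a", "b", "c")]
    await asyncio.gather(*tasks)
    return ctx.assigned


async def run_timing_dependent(durations: Dict[str, float]) -> Dict[str, int]:
    ctx = ToyWorkflowContext()
    tasks = [asyncio.create_task(ctx.step(n, durations[n])) for n in ("a", "b")]
    done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    winner = next(iter(done)).result()
    # Anti-pattern: which step receives ID 3 depends on which step finished first.
    await ctx.step(f"after_{winner}", 0.0)
    await asyncio.gather(*tasks)
    return ctx.assigned
```

In the timing-dependent version, a replay in which "b" happens to finish first records a different step under ID 3 than the original run did, which is exactly what recovery cannot reconcile.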

Recovery and Error Handling

Automatic Recovery

If an async workflow is interrupted, it automatically resumes from the last completed step upon restart, maintaining the same recovery guarantees as synchronous workflows.

Workflow recovery is demonstrated in tests:

@DBOS.workflow()
async def test_workflow(var: str, var2: str) -> str:
    await test_step(var2)  # the step's output is checkpointed
    return var

# Execute workflow
with SetWorkflowID(workflow_id):
    assert (await test_workflow(value, value)) == value

# Simulate crash
# ... (set status to PENDING in database)

# Recover workflow: it re-runs, resuming after the last completed step
handles = DBOS._recover_pending_workflows()
assert len(handles) == 1
assert handles[0].get_result() == value

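Timeout Handling

SetWorkflowTimeout(seconds) cancels a workflow that runs longer than the timeout; awaiting the cancelled workflow raises DBOSAwaitedWorkflowCancelledError: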

@DBOS.workflow()
async def blocked_workflow() -> None:
    while True:
        await DBOS.sleep_async(0.1)

with SetWorkflowTimeout(0.1):
    with pytest.raises(DBOSAwaitedWorkflowCancelledError):
        await blocked_workflow()

Task Cancellation Protection

Task cancellation protection ensures that a workflow runs to completion even if the caller's task is cancelled:

@DBOS.workflow()
async def test_workflow(duration: float) -> str:
    await DBOS.sleep_async(duration)
    return "completed"

async def run_workflow_task() -> str:
    with SetWorkflowID(wfid):
        handle = await DBOS.start_workflow_async(test_workflow, 1.0)
    return await handle.get_result()

task = asyncio.create_task(run_workflow_task())
# Yield so the task actually starts the workflow; a task cancelled before
# it first runs never executes its body at all.
await asyncio.sleep(0.5)
task.cancel()  # Cancel the caller's task

# Workflow completes despite task cancellation
handle = await DBOS.retrieve_workflow_async(wfid)
assert await handle.get_result() == "completed"

Workflow Handle Resilience After Cancellation

When a workflow is cancelled and later resumed, the original workflow handle automatically reflects the resumed workflow's execution. Both WorkflowHandle.get_result() and WorkflowHandleAsync.get_result() catch cancellation exceptions (DBOSWorkflowCancelledError and DBOSAwaitedWorkflowCancelledError) and automatically poll the system database to retrieve the workflow's final result. If the workflow was resumed after cancellation, the handle returns the result of the resumed execution instead of raising a cancellation exception.

Implementation Details:

When get_result() encounters a cancellation exception from the underlying future or task, it catches the exception and invokes await_workflow_result() (for sync handles) or await_workflow_result_async() (for async handles) to check the database for the workflow's current status. This allows the handle to transparently recover when workflows are cancelled and then resumed.
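A toy model of this fallback follows; every class is hypothetical, and an in-memory store stands in for the system database. get_result() first awaits its task; if that raises the stand-in cancellation error, it consults the store, returning the recorded output once the workflow reports SUCCESS and re-raising while it still reads CANCELLED (raising immediately rather than waiting is a simplification of this sketch).

```python
import asyncio
from typing import Dict, Tuple


class ToyCancelledError(Exception):
    """Hypothetical stand-in for DBOSAwaitedWorkflowCancelledError."""


class ToyStatusStore:
    """Hypothetical stand-in for workflow status rows in the system database."""

    def __init__(self) -> None:
        self.status: Dict[str, str] = {}
        self.output: Dict[str, object] = {}


class ToyHandle:
    def __init__(self, workflow_id: str, task: "asyncio.Task[object]", store: ToyStatusStore) -> None:
        self.workflow_id = workflow_id
        self._task = task
        self._store = store

    async def get_result(self, polling_interval_sec: float = 0.01) -> object:
        try:
            return await self._task
        except ToyCancelledError:
            # Fall back to the store: the workflow may have been resumed since.
            while True:
                status = self._store.status.get(self.workflow_id)
                if status == "SUCCESS":
                    return self._store.output[self.workflow_id]
                if status == "CANCELLED":
                    raise ToyCancelledError(self.workflow_id)
                await asyncio.sleep(polling_interval_sec)


async def demo() -> Tuple[str, object]:
    store = ToyStatusStore()

    async def first_execution() -> object:
        store.status["wf"] = "CANCELLED"
        raise ToyCancelledError("wf")

    handle = ToyHandle("wf", asyncio.create_task(first_execution()), store)
    try:
        await handle.get_result()
        first = "returned"
    except ToyCancelledError:
        first = "cancelled"

    # Simulate resume_workflow(): a new execution finishes and records its output.
    store.status["wf"] = "SUCCESS"
    store.output["wf"] = 42
    return first, await handle.get_result()
```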

This behavior makes workflow handles more resilient, because they reflect the eventual state of the workflow, including any resumptions, rather than only the initial execution attempt:

@DBOS.workflow()
async def simple_workflow(x: int) -> int:
    await step_one(x)
    await step_two(x)
    return x

# Start workflow and cancel it
wfid = str(uuid.uuid4())
with SetWorkflowID(wfid):
    cancelled_handle = await DBOS.start_workflow_async(simple_workflow, 42)

await DBOS.cancel_workflow_async(wfid)

# First get_result() call will raise DBOSAwaitedWorkflowCancelledError
with pytest.raises(DBOSAwaitedWorkflowCancelledError):
    await cancelled_handle.get_result()

# Resume the workflow
resume_handle = await DBOS.resume_workflow_async(wfid)
assert (await resume_handle.get_result()) == 42

# The original cancelled_handle now returns the resumed workflow's result
# by catching the cancellation error and checking the database
assert (await cancelled_handle.get_result()) == 42

Note: A cancellation exception (DBOSWorkflowCancelledError or DBOSAwaitedWorkflowCancelledError) is only raised if the workflow remains cancelled and is not subsequently resumed.
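The fallback described under Implementation Details can be modeled in a few lines of plain Python. In this sketch, the `statuses` dict stands in for the system database, and `WorkflowCancelled`, `Record`, `poll_result`, and `ToyHandle` are hypothetical names rather than DBOS APIs. The handle first consults the future for the original attempt; on a cancellation error it polls the stored status, returning the output if a resumed run succeeded and re-raising if the workflow is still cancelled.

```python
import time
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Any, Dict, Tuple


class WorkflowCancelled(Exception):
    """Hypothetical stand-in for DBOS's cancellation exceptions."""


@dataclass
class Record:
    status: str  # e.g. "PENDING", "SUCCESS", "ERROR", "CANCELLED"
    output: Any = None


# Hypothetical in-memory stand-in for the system database
statuses: Dict[str, Record] = {}


def poll_result(workflow_id: str, interval_sec: float = 0.01) -> Any:
    """Poll the stored status until the workflow reaches a final state."""
    while True:
        record = statuses[workflow_id]
        if record.status == "SUCCESS":
            return record.output
        if record.status == "ERROR":
            raise RuntimeError(record.output)
        if record.status == "CANCELLED":
            raise WorkflowCancelled(workflow_id)
        time.sleep(interval_sec)  # still pending or enqueued


class ToyHandle:
    def __init__(self, workflow_id: str, future: "Future[Any]") -> None:
        self.workflow_id = workflow_id
        self._future = future  # tracks the original execution attempt

    def get_result(self) -> Any:
        try:
            return self._future.result()
        except WorkflowCancelled:
            # The original attempt was cancelled; a resumed run may have finished
            return poll_result(self.workflow_id)


def demo() -> Tuple[bool, Any]:
    original: "Future[Any]" = Future()
    original.set_exception(WorkflowCancelled("wf-1"))
    statuses["wf-1"] = Record("CANCELLED")
    handle = ToyHandle("wf-1", original)
    try:
        handle.get_result()
        first_raised = False
    except WorkflowCancelled:
        first_raised = True  # still cancelled, so the error propagates
    statuses["wf-1"] = Record("SUCCESS", 42)  # simulate a resume that finished
    return first_raised, handle.get_result()
```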

Limitations and Considerations#

  1. Transactions Don't Support Async: At this time, DBOS does not support coroutine transactions. Decorating an async def function with @DBOS.transaction will raise an error at runtime.

  2. Event Loop Management: Calling a coroutine workflow or starting it with DBOS.start_workflow_async runs it in the same event loop as the caller. Enqueueing with enqueue_async starts the workflow in a different event loop.

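  3. Deterministic Ordering Required: When running async steps in parallel, start them in a well-defined sequence before awaiting them (for example, by passing them to asyncio.gather in a fixed order). Non-deterministic ordering, where which step starts next depends on when earlier steps finish, is not allowed, because recovery relies on steps starting in the same order on every execution.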

  4. Serialization Requirements: Like synchronous workflows, async workflow inputs/outputs and step outputs must be serializable (default is pickle, customizable).

  5. Schedule Persistence: Removing decorators from your code does not stop dynamic schedules, which persist in the database. You must explicitly call DBOS.delete_schedule() to remove a schedule.
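The ordering requirement in item 3 can be illustrated with plain asyncio. The sketch below assumes, for illustration only, that each step is numbered when it starts running (before its first await), using a hypothetical `StepCounter`; it models the constraint and is not DBOS's implementation. Because asyncio.gather schedules its coroutines in argument order, the numbering is the same on every run even though the steps finish in a different order.

```python
import asyncio
from typing import List, Tuple


class StepCounter:
    """Hypothetical stand-in for per-workflow step numbering."""

    def __init__(self) -> None:
        self._next = 0
        self.started: List[Tuple[int, str]] = []
        self.finished: List[str] = []

    def begin(self, name: str) -> int:
        self._next += 1
        self.started.append((self._next, name))
        return self._next


async def step(ctr: StepCounter, name: str, delay: float) -> str:
    ctr.begin(name)  # numbered before the first await
    await asyncio.sleep(delay)
    ctr.finished.append(name)
    return name


async def workflow() -> Tuple[List[str], StepCounter]:
    ctr = StepCounter()
    # All steps are created in a fixed order, then awaited together
    results = await asyncio.gather(
        step(ctr, "slow", 0.05),
        step(ctr, "medium", 0.03),
        step(ctr, "fast", 0.01),
    )
    return list(results), ctr
```

By contrast, choosing which step to start next based on which earlier step finished first (for example, after asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)) would make the numbering depend on timing.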

Async Context Methods Reference#

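DBOS provides async versions of most workflow management methods, including:

Messaging: send_async, send_bulk_async, recv_async

Workflow Management: start_workflow_async, retrieve_workflow_async, wait_first_async, cancel_workflow_async, cancel_workflows_async, resume_workflow_async, resume_workflows_async, delete_workflow_async, delete_workflows_async, fork_workflow_async, set_workflow_delay_async

Events and Streams: set_event_async, get_event_async

Timing: sleep_async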

All async context methods return coroutines that must be awaited in async functions.

send / send_async#

Signature (async):

DBOS.send_async(
    destination_id: str,
    message: Any,
    topic: Optional[str] = None,
    *,
    idempotency_key: Optional[str] = None,
    serialization_type: Optional[WorkflowSerializationFormat] = WorkflowSerializationFormat.DEFAULT,
    send_to_forks: bool = False
) -> None

Signature (sync):

DBOS.send(
    destination_id: str,
    message: Any,
    topic: Optional[str] = None,
    *,
    idempotency_key: Optional[str] = None,
    serialization_type: Optional[WorkflowSerializationFormat] = WorkflowSerializationFormat.DEFAULT,
    send_to_forks: bool = False
) -> None

Send a message to a workflow identified by destination_id. The message can optionally be associated with a topic for filtered receiving. Messages can be sent from workflows, steps, or outside workflow contexts.

Parameters:

  • destination_id: The workflow ID to send the message to.
  • message: The message payload to send.
  • topic: Optional topic string for message filtering. If not specified, uses a default topic.
  • idempotency_key: Optional unique identifier for idempotent message delivery. If provided, duplicate sends with the same key are ignored. Messages are deduplicated based on the message_uuid (which is set to idempotency_key if provided, or auto-generated otherwise).
  • serialization_type: Optional serialization format for the message.
  • send_to_forks: If True, the message is also delivered to all workflows recursively forked from destination_id. Defaults to False.

Implementation Details:

When called from within a workflow context, send() records the message as a workflow step, leveraging the workflow's built-in idempotency guarantees. When called outside a workflow context (e.g., from a non-workflow function or external code), send() directly inserts the message into the database with an idempotent constraint on message_uuid.

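Earlier versions handled send() calls made outside a workflow context by creating a temporary workflow; that approach has been replaced by direct database insertion. In addition, received messages are now marked as consumed instead of being deleted. Because a received message's row, and therefore its message_uuid, stays in the table, a later send that reuses the same idempotency key still conflicts with it, which is what allows message_uuid to serve as an idempotency key.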

send_to_forks with idempotency_key:

When using send_to_forks=True with an idempotency_key, the system derives a distinct, deterministic message_uuid per destination (format: {idempotency_key}::{destination_id}) to ensure each fork receives the message while maintaining idempotency on replay.
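The deduplication scheme can be sketched with SQLite from the standard library. The table layout and the names `message_uuid_for`, `send`, and `fork_ids` are hypothetical and only mirror the behavior described above. A primary key on message_uuid makes a repeated send with the same key a no-op. With send_to_forks, each destination gets its own derived key in the {idempotency_key}::{destination_id} format; the sketch assumes this also applies to the original destination, and it takes the fork IDs as an explicit list instead of discovering them.

```python
import sqlite3
import uuid
from typing import List, Optional, Sequence, Tuple

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE notifications (
        message_uuid TEXT PRIMARY KEY,       -- deduplication key
        destination_id TEXT NOT NULL,
        topic TEXT,
        message TEXT NOT NULL,
        consumed INTEGER NOT NULL DEFAULT 0  -- received messages are marked, not deleted
    )"""
)


def message_uuid_for(destination_id: str, idempotency_key: Optional[str], to_forks: bool) -> str:
    if idempotency_key is None:
        return str(uuid.uuid4())  # no key: every send is distinct
    if to_forks:
        # One distinct, deterministic key per destination
        return f"{idempotency_key}::{destination_id}"
    return idempotency_key


def send(destination_id: str, message: str, topic: Optional[str] = None,
         idempotency_key: Optional[str] = None, fork_ids: Sequence[str] = ()) -> None:
    # fork_ids is a hypothetical stand-in for send_to_forks=True
    targets = [destination_id, *fork_ids]
    with conn:  # a single transaction for all targets
        for target in targets:
            key = message_uuid_for(target, idempotency_key, to_forks=bool(fork_ids))
            # A repeated message_uuid is silently skipped
            conn.execute(
                "INSERT OR IGNORE INTO notifications "
                "(message_uuid, destination_id, topic, message) VALUES (?, ?, ?, ?)",
                (key, target, topic, message),
            )


def demo() -> Tuple[int, List[str]]:
    send("wf-a", "hello", idempotency_key="k1")
    send("wf-a", "hello", idempotency_key="k1")  # replayed send: ignored
    send("wf-b", "hi", idempotency_key="k2", fork_ids=["wf-b-fork"])
    count_a = conn.execute(
        "SELECT COUNT(*) FROM notifications WHERE destination_id = 'wf-a'"
    ).fetchone()[0]
    keys = sorted(row[0] for row in conn.execute(
        "SELECT message_uuid FROM notifications WHERE message_uuid LIKE 'k2::%'"))
    return count_a, keys
```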

DBOSClient methods:

The same methods are available on DBOSClient with identical signatures:

  • DBOSClient.send()
  • DBOSClient.send_async()

send_bulk / send_bulk_async#

Signature (async):

DBOS.send_bulk_async(
    messages: List[SendMessage],
    *,
    serialization_type: Optional[WorkflowSerializationFormat] = WorkflowSerializationFormat.DEFAULT,
    send_to_forks: bool = False
) -> None

Signature (sync):

DBOS.send_bulk(
    messages: List[SendMessage],
    *,
    serialization_type: Optional[WorkflowSerializationFormat] = WorkflowSerializationFormat.DEFAULT,
    send_to_forks: bool = False
) -> None

Send multiple messages to workflow executions in a single transaction. This is more efficient than calling send() repeatedly when you need to deliver multiple messages at once.

Parameters:

  • messages: List of SendMessage objects. Each SendMessage has fields:
    • destination_id: str - The workflow ID to send the message to
    • message: Any - The message payload
    • topic: Optional[str] = None - Optional topic for message filtering
    • idempotency_key: Optional[str] = None - Optional unique identifier for idempotent delivery
  • serialization_type: Optional serialization format for all messages.
  • send_to_forks: If True, every message is also delivered to all workflows recursively forked from its destination. Defaults to False.

Behavior:

All messages in a send_bulk() call are delivered atomically in a single database transaction. If any destination is invalid or if the operation fails, the entire batch is rolled back and no messages are delivered.

Idempotency:

Each message can have its own idempotency_key for independent deduplication. However, duplicate idempotency_key values within the same bulk send operation will raise a DBOSException before the transaction starts.

send_to_forks with idempotency_key:

When using send_to_forks=True with an idempotency_key, the system derives a distinct, deterministic message_uuid per destination (format: {idempotency_key}::{destination_id}) to ensure each fork receives the message while maintaining idempotency on replay.
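The batch rules above (reject duplicate keys up front, then insert every message in one transaction that rolls back as a unit) can be sketched with sqlite3. The schema, the `workflows` table used to validate destinations, and the names `Msg` and `send_bulk_model` are hypothetical. The code mirrors the documented behavior rather than DBOS's internals, and ValueError stands in for DBOSException.

```python
import sqlite3
import uuid
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple


@dataclass
class Msg:
    """Hypothetical stand-in with the same fields as SendMessage."""
    destination_id: str
    message: Any
    topic: Optional[str] = None
    idempotency_key: Optional[str] = None


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # unknown destinations are rejected
    conn.executescript(
        """
        CREATE TABLE workflows (workflow_id TEXT PRIMARY KEY);
        CREATE TABLE notifications (
            message_uuid TEXT PRIMARY KEY,
            destination_id TEXT NOT NULL REFERENCES workflows (workflow_id),
            topic TEXT,
            message TEXT
        );
        INSERT INTO workflows VALUES ('wf-1'), ('wf-2');
        """
    )
    return conn


def send_bulk_model(conn: sqlite3.Connection, messages: List[Msg]) -> None:
    keys = [m.idempotency_key for m in messages if m.idempotency_key is not None]
    if len(keys) != len(set(keys)):
        # Rejected up front, before any transaction is opened
        raise ValueError("duplicate idempotency_key in one batch")
    with conn:  # commit every message, or roll back all of them on error
        for m in messages:
            conn.execute(
                "INSERT OR IGNORE INTO notifications VALUES (?, ?, ?, ?)",
                (m.idempotency_key or str(uuid.uuid4()), m.destination_id, m.topic, str(m.message)),
            )


def count(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT COUNT(*) FROM notifications").fetchone()[0]


def demo() -> Tuple[int, int, bool]:
    conn = make_db()
    send_bulk_model(conn, [Msg("wf-1", "a"), Msg("wf-2", "b")])
    after_ok = count(conn)
    try:
        # 'wf-missing' does not exist, so the whole batch is rolled back
        send_bulk_model(conn, [Msg("wf-1", "c"), Msg("wf-missing", "d")])
    except sqlite3.IntegrityError:
        pass
    after_bad = count(conn)
    try:
        send_bulk_model(conn, [Msg("wf-1", "e", idempotency_key="k"),
                               Msg("wf-2", "f", idempotency_key="k")])
        dup_rejected = False
    except ValueError:
        dup_rejected = True
    return after_ok, after_bad, dup_rejected
```

SQLite's INSERT OR IGNORE skips duplicate primary keys but does not suppress foreign key violations, which is what lets the invalid destination abort the whole batch here.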

Example:

from dbos import DBOS, SendMessage

# Send multiple messages to different workflows atomically
DBOS.send_bulk([
    SendMessage("workflow-1", {"status": "ready"}, topic="status"),
    SendMessage("workflow-2", {"status": "ready"}, topic="status"),
    SendMessage("workflow-3", {"action": "start"}),
])

# Async version
await DBOS.send_bulk_async([
    SendMessage("workflow-1", {"status": "ready"}),
    SendMessage("workflow-2", {"status": "ready"}),
])

# Send to a workflow and all its forks
await DBOS.send_bulk_async(
    [SendMessage("parent-workflow", {"broadcast": "message"})],
    send_to_forks=True
)

DBOSClient methods:

The same methods are available on DBOSClient with identical signatures:

  • DBOSClient.send_bulk()
  • DBOSClient.send_bulk_async()

wait_first / wait_first_async#

Signature (async):

DBOS.wait_first_async(
    handles: List[WorkflowHandleAsync[Any]], 
    *, 
    polling_interval_sec: float = DEFAULT_POLLING_INTERVAL
) -> WorkflowHandleAsync[Any]

Signature (sync):

DBOS.wait_first(
    handles: List[WorkflowHandle[Any]], 
    *, 
    polling_interval_sec: float = DEFAULT_POLLING_INTERVAL
) -> WorkflowHandle[Any]

Wait for any one of the given workflow handles to complete and return the first handle that completes. These methods poll the database until at least one workflow's status is no longer PENDING, ENQUEUED, or DELAYED, then return the corresponding handle.

Parameters:

  • handles: List of workflow handles to wait on. Must not be empty and must not contain duplicate workflow IDs.
  • polling_interval_sec: Optional polling interval for checking workflow completion status. Defaults to DEFAULT_POLLING_INTERVAL.

Returns:
The first workflow handle that completes.

Raises:
ValueError if the handles list is empty or contains duplicate workflow IDs.
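The polling behavior described above can be sketched without a database. Here `statuses` is a hypothetical dict standing in for the system database, and `wait_first_model` (also hypothetical) returns the first workflow ID whose status has left the not-finished set. The validation mirrors the documented ValueError cases, and a background timer simulates one workflow finishing.

```python
import threading
import time
from typing import Dict, List

# Statuses treated as "not finished", per the description above
NOT_FINISHED = {"PENDING", "ENQUEUED", "DELAYED"}


def wait_first_model(workflow_ids: List[str], statuses: Dict[str, str],
                     polling_interval_sec: float = 0.01) -> str:
    if not workflow_ids:
        raise ValueError("handles must not be empty")
    if len(set(workflow_ids)) != len(workflow_ids):
        raise ValueError("handles must not contain duplicate workflow IDs")
    while True:
        # One polling pass over all candidates
        for wf_id in workflow_ids:
            if statuses[wf_id] not in NOT_FINISHED:
                return wf_id
        time.sleep(polling_interval_sec)


def demo() -> str:
    statuses = {"a": "PENDING", "b": "ENQUEUED"}

    def finish_b() -> None:
        statuses["b"] = "SUCCESS"

    # Simulate workflow "b" finishing shortly after waiting begins
    timer = threading.Timer(0.05, finish_b)
    timer.start()
    try:
        return wait_first_model(["a", "b"], statuses)
    finally:
        timer.join()
```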

Example - Racing workflows:

@DBOS.workflow()
async def fetch_from_api_a() -> str:
    # Fetch data from API A
    return await some_async_call()

@DBOS.workflow()
async def fetch_from_api_b() -> str:
    # Fetch data from API B
    return await another_async_call()

# Start both workflows concurrently
handle_a = await DBOS.start_workflow_async(fetch_from_api_a)
handle_b = await DBOS.start_workflow_async(fetch_from_api_b)

# Use whichever completes first
first_completed = await DBOS.wait_first_async([handle_a, handle_b])
result = await first_completed.get_result()

Example - Processing tasks as they complete:

from typing import List

from dbos import DBOS, Queue

queue = Queue("task_queue")

# Hypothetical task workflow for illustration
@DBOS.workflow()
def process_task(i: int) -> str:
    return f"task-{i} done"

@DBOS.workflow()
def process_tasks() -> List[str]:
    # Start multiple concurrent tasks
    handles = [queue.enqueue(process_task, i) for i in range(5)]

    # Process results as tasks complete (not in start order)
    results = []
    remaining = list(handles)
    while remaining:
        completed = DBOS.wait_first(remaining)
        results.append(completed.get_result())
        remaining = [h for h in remaining if h.workflow_id != completed.workflow_id]

    return results

cancel_workflow / cancel_workflow_async#

Signature (async):

DBOS.cancel_workflow_async(workflow_id: str) -> None

Signature (sync):

DBOS.cancel_workflow(workflow_id: str) -> None

Cancel a workflow by ID. The workflow's status is set to CANCELLED and it is removed from any queue, but only if the workflow is not already complete.

Parameters:

  • workflow_id: The workflow ID to cancel.

Example:

# Synchronous cancellation
DBOS.cancel_workflow(workflow_id)

# Asynchronous cancellation
await DBOS.cancel_workflow_async(workflow_id)

Cancellation Behavior:

By default, when a workflow is cancelled, step execution is not preempted. The workflow continues running until its current step completes, then is preempted at the start of the next step. For async steps that may run for an extended duration, you can use the preemptible option to change this behavior:

@DBOS.step(preemptible=True)
async def long_running_step():
    # This step will be cancelled immediately (or after a short polling interval)
    # if its workflow is cancelled
    await asyncio.sleep(60)
    return "done"

When preemptible=True, the step and workflow are cancelled immediately (or after a short polling interval) when the workflow is cancelled, rather than waiting for the step to complete.
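The difference between the default behavior and preemptible=True can be modeled with plain asyncio. In this sketch, which uses hypothetical names and is not DBOS's implementation, an asyncio.Event stands in for the workflow's CANCELLED status. A non-preemptible step is allowed to finish and the cancellation is only noticed at the start of the next step; a preemptible step is watched by a polling loop that cancels it as soon as the flag is set.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

StepFn = Callable[[], Awaitable[str]]


class WorkflowCancelled(Exception):
    """Hypothetical stand-in for a workflow being preempted."""


async def run_workflow(steps: List[StepFn], cancelled: asyncio.Event, preemptible: bool,
                       finished: List[str], poll_sec: float = 0.01) -> None:
    for step in steps:
        # Cancellation is always checked at the start of each step
        if cancelled.is_set():
            raise WorkflowCancelled()
        task = asyncio.create_task(step())
        if preemptible:
            # Watch the flag while the step runs and cancel the step early
            while not task.done():
                if cancelled.is_set():
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        pass
                    raise WorkflowCancelled()
                await asyncio.sleep(poll_sec)
        finished.append(await task)


async def long_step() -> str:
    await asyncio.sleep(0.2)  # short stand-in for a long-running step
    return "long"


async def next_step() -> str:
    return "next"


async def demo(preemptible: bool) -> Tuple[str, List[str], float]:
    loop = asyncio.get_running_loop()
    cancelled = asyncio.Event()
    loop.call_later(0.05, cancelled.set)  # cancel the workflow mid-step
    finished: List[str] = []
    start = loop.time()
    try:
        await run_workflow([long_step, next_step], cancelled, preemptible, finished)
        outcome = "completed"
    except WorkflowCancelled:
        outcome = "cancelled"
    return outcome, finished, loop.time() - start
```

In both modes the workflow ends up cancelled, but only the non-preemptible run waits for the long step to finish first.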

cancel_workflows / cancel_workflows_async#

Signature (async):

DBOS.cancel_workflows_async(workflow_ids: List[str]) -> None

Signature (sync):

DBOS.cancel_workflows(workflow_ids: List[str]) -> None

Cancel multiple workflows at once by providing a list of workflow IDs. This is more efficient than calling cancel_workflow() repeatedly for each workflow. All specified workflows will have their status set to CANCELLED and be removed from any queues, but only if they are not already complete.

Parameters:

  • workflow_ids: List of workflow IDs to cancel.

Example:

# Cancel multiple workflows efficiently
workflow_ids = [wf1_id, wf2_id, wf3_id]
DBOS.cancel_workflows(workflow_ids)

# Async version
await DBOS.cancel_workflows_async(workflow_ids)
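The "only if not already complete" rule can be expressed as one conditional UPDATE covering all of the IDs. The sqlite3 sketch below uses a hypothetical workflow_status table, treats SUCCESS and ERROR as the completed states, and models removal from a queue by clearing queue_name. It illustrates the documented rule and is not DBOS's actual schema or query.

```python
import sqlite3
from typing import Dict, List, Optional, Tuple

COMPLETED = ("SUCCESS", "ERROR")  # terminal states assumed for this sketch


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE workflow_status ("
        "workflow_id TEXT PRIMARY KEY, status TEXT NOT NULL, queue_name TEXT)"
    )
    conn.executemany(
        "INSERT INTO workflow_status VALUES (?, ?, ?)",
        [("wf1", "ENQUEUED", "my_queue"), ("wf2", "PENDING", None), ("wf3", "SUCCESS", None)],
    )
    conn.commit()
    return conn


def cancel_workflows_model(conn: sqlite3.Connection, workflow_ids: List[str]) -> None:
    if not workflow_ids:
        return
    placeholders = ", ".join("?" for _ in workflow_ids)
    with conn:
        # Cancel and dequeue every listed workflow that has not completed
        conn.execute(
            "UPDATE workflow_status SET status = 'CANCELLED', queue_name = NULL "
            f"WHERE workflow_id IN ({placeholders}) AND status NOT IN (?, ?)",
            (*workflow_ids, *COMPLETED),
        )


def demo() -> Dict[str, Tuple[str, Optional[str]]]:
    conn = make_db()
    cancel_workflows_model(conn, ["wf1", "wf2", "wf3"])
    rows = conn.execute("SELECT workflow_id, status, queue_name FROM workflow_status")
    return {wf_id: (status, queue) for wf_id, status, queue in rows}
```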

resume_workflow / resume_workflow_async#

Signature (async):

DBOS.resume_workflow_async(
    workflow_id: str,
    *,
    queue_name: Optional[str] = None,
) -> WorkflowHandleAsync[Any]

Signature (sync):

DBOS.resume_workflow(
    workflow_id: str,
    *,
    queue_name: Optional[str] = None,
) -> WorkflowHandle[Any]

Resume a cancelled workflow by ID. The workflow's status is set to ENQUEUED and its recovery attempts and deadline are cleared, but only if the workflow is not already complete. Returns a handle to the resumed workflow.

Parameters:

  • workflow_id: The workflow ID to resume.
  • queue_name: Optional queue name to enqueue the resumed workflow to. If not specified, the workflow is enqueued to the internal queue.

Returns:
A workflow handle for the resumed workflow.

Example:

# Resume a cancelled workflow to the internal queue
handle = DBOS.resume_workflow(workflow_id)
result = handle.get_result()

# Resume to a specific queue
handle = DBOS.resume_workflow(workflow_id, queue_name="my_queue")
result = handle.get_result()

# Async version
handle = await DBOS.resume_workflow_async(workflow_id, queue_name="my_queue")
result = await handle.get_result()
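The state change that resume performs can be summarized as a pure function over a workflow record. The dataclass fields and the internal queue name below are hypothetical placeholders, and SUCCESS and ERROR are assumed to be the completed states. The sketch only encodes the documented rules: a completed workflow is left alone; otherwise its status becomes ENQUEUED, its recovery attempts and deadline are cleared, and it goes to queue_name or to the internal queue.

```python
from dataclasses import dataclass, replace
from typing import Optional

INTERNAL_QUEUE = "internal"  # hypothetical name for the internal queue
COMPLETED = {"SUCCESS", "ERROR"}  # terminal states assumed for this sketch


@dataclass(frozen=True)
class WorkflowRecord:
    workflow_id: str
    status: str
    recovery_attempts: int = 0
    deadline_epoch_ms: Optional[int] = None
    queue_name: Optional[str] = None


def resume_model(rec: WorkflowRecord, queue_name: Optional[str] = None) -> WorkflowRecord:
    if rec.status in COMPLETED:
        return rec  # already complete: nothing changes
    return replace(
        rec,
        status="ENQUEUED",
        recovery_attempts=0,     # cleared
        deadline_epoch_ms=None,  # cleared
        queue_name=queue_name if queue_name is not None else INTERNAL_QUEUE,
    )
```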

resume_workflows / resume_workflows_async#

Signature (async):

DBOS.resume_workflows_async(
    workflow_ids: List[str],
    *,
    queue_name: Optional[str] = None,
) -> List[WorkflowHandleAsync[Any]]

Signature (sync):

DBOS.resume_workflows(
    workflow_ids: List[str],
    *,
    queue_name: Optional[str] = None,
) -> List[WorkflowHandle[Any]]

Resume multiple cancelled workflows at once by providing a list of workflow IDs. This is more efficient than calling resume_workflow() repeatedly for each workflow. All specified workflows will have their status set to ENQUEUED and their recovery attempts and deadline cleared, but only if they are not already complete. Returns a list of handles corresponding to the resumed workflows.

Parameters:

  • workflow_ids: List of workflow IDs to resume.
  • queue_name: Optional queue name to enqueue the resumed workflows to. If not specified, workflows are enqueued to the internal queue.

Returns:
A list of workflow handles for the resumed workflows, in the same order as the input workflow IDs.

Example:

# Resume multiple workflows to the internal queue
workflow_ids = [wf1_id, wf2_id, wf3_id]
handles = DBOS.resume_workflows(workflow_ids)
results = [h.get_result() for h in handles]

# Resume to a specific queue
handles = DBOS.resume_workflows(workflow_ids, queue_name="my_queue")
results = [h.get_result() for h in handles]

# Async version
handles = await DBOS.resume_workflows_async(workflow_ids, queue_name="my_queue")
results = [await h.get_result() for h in handles]

delete_workflow / delete_workflow_async#

Signature (async):

DBOS.delete_workflow_async(
    workflow_id: str, 
    *, 
    delete_children: bool = False
) -> None

Signature (sync):

DBOS.delete_workflow(
    workflow_id: str, 
    *, 
    delete_children: bool = False
) -> None

Delete a workflow and all its associated data by ID. If delete_children is True, also recursively deletes all child workflows.

Parameters:

  • workflow_id: The workflow ID to delete.
  • delete_children: If True, also delete all child workflows recursively. Defaults to False.

Example:

# Delete a workflow and all its children
DBOS.delete_workflow(workflow_id, delete_children=True)

# Async version
await DBOS.delete_workflow_async(workflow_id, delete_children=True)
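Recursive deletion amounts to walking parent-to-child links. In this sketch, `children` is a hypothetical in-memory map from a workflow ID to the IDs of the workflows it started, and `store` stands in for all per-workflow data; both are illustrative, not DBOS structures. The walk is iterative, so deep trees do not hit Python's recursion limit.

```python
from typing import Any, Dict, List, Set, Tuple


def ids_to_delete(root: str, children: Dict[str, List[str]], delete_children: bool) -> Set[str]:
    if not delete_children:
        return {root}
    found: Set[str] = set()
    stack = [root]
    while stack:  # iterative depth-first walk over parent -> child links
        wf_id = stack.pop()
        if wf_id in found:
            continue
        found.add(wf_id)
        stack.extend(children.get(wf_id, []))
    return found


def delete_workflow_model(root: str, store: Dict[str, Any], children: Dict[str, List[str]],
                          delete_children: bool = False) -> Set[str]:
    doomed = ids_to_delete(root, children, delete_children)
    for wf_id in doomed:
        store.pop(wf_id, None)     # the workflow and its associated data
        children.pop(wf_id, None)  # its outgoing child links
    return doomed


def demo(delete_children: bool) -> Tuple[List[str], List[str]]:
    children = {"parent": ["child-1", "child-2"], "child-1": ["grandchild"]}
    store = {wf: {"status": "SUCCESS"} for wf in
             ["parent", "child-1", "child-2", "grandchild", "other"]}
    deleted = delete_workflow_model("parent", store, children, delete_children)
    return sorted(deleted), sorted(store)
```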

delete_workflows / delete_workflows_async#

Signature (async):

DBOS.delete_workflows_async(
    workflow_ids: List[str], 
    *, 
    delete_children: bool = False
) -> None

Signature (sync):

DBOS.delete_workflows(
    workflow_ids: List[str], 
    *, 
    delete_children: bool = False
) -> None

Delete multiple workflows and all their associated data at once by providing a list of workflow IDs. This is more efficient than calling delete_workflow() repeatedly for each workflow. If delete_children is True, also recursively deletes all child workflows of each specified workflow.

Parameters:

  • workflow_ids: List of workflow IDs to delete.
  • delete_children: If True, also delete all child workflows of each specified workflow recursively. Defaults to False.

Example:

# Delete multiple workflows efficiently
workflow_ids = [wf1_id, wf2_id, wf3_id]
DBOS.delete_workflows(workflow_ids)

# Delete workflows and all their children
DBOS.delete_workflows(workflow_ids, delete_children=True)

# Async version
await DBOS.delete_workflows_async(workflow_ids, delete_children=True)

fork_workflow / fork_workflow_async#

Signature (async):

DBOS.fork_workflow_async(
    workflow_id: str,
    start_step: int,
    *,
    application_version: Optional[str] = None,
    queue_name: Optional[str] = None,
    queue_partition_key: Optional[str] = None,
) -> WorkflowHandleAsync[Any]

Signature (sync):

DBOS.fork_workflow(
    workflow_id: str,
    start_step: int,
    *,
    application_version: Optional[str] = None,
    queue_name: Optional[str] = None,
    queue_partition_key: Optional[str] = None,
) -> WorkflowHandle[Any]

Start a new execution of a workflow, under a new workflow ID, from a specific step. The forked workflow runs with the original workflow's inputs, reuses the recorded results of the steps before start_step, and executes from start_step onward.

Parameters:

  • workflow_id: The original workflow ID to fork from.
  • start_step: The step number from which to start the forked workflow.
  • application_version: Optional application version for the forked workflow. If not specified, uses the current application version.
  • queue_name: Optional queue name to enqueue the forked workflow to. If not specified, the workflow is enqueued to the internal queue.
  • queue_partition_key: Optional partition key for the forked workflow within the specified queue. Only used if queue_name is also specified.

Returns:
A workflow handle for the forked workflow.

Example:

# Fork a workflow from step 2 to the internal queue
forked_handle = DBOS.fork_workflow(original_workflow_id, 2)
result = forked_handle.get_result()

# Fork to a specific queue with a partition key
forked_handle = DBOS.fork_workflow(
    original_workflow_id,
    2,
    queue_name="my_queue",
    queue_partition_key="partition_1"
)
result = forked_handle.get_result()

# Async version
forked_handle = await DBOS.fork_workflow_async(
    original_workflow_id,
    2,
    queue_name="my_queue",
    queue_partition_key="partition_1"
)
result = await forked_handle.get_result()
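The sketch below models forking under one assumption about what the original workflow's "state" means: the recorded outputs of the steps before start_step, which are copied to a new workflow ID so that only start_step onward is executed again. The 1-based step numbering, the dict layout, and all function names are hypothetical. The example forks after a hypothetical bug fix in step 2, so step 1's stored output is reused while steps 2 and 3 run with the new code.

```python
import uuid
from typing import Callable, Dict, List, Tuple

Step = Callable[[int], int]


def execute(steps: List[Step], x: int, recorded: Dict[int, int], executed: List[int]) -> int:
    """Run steps in order (numbered from 1); a step already in `recorded`
    is replayed from its stored output instead of being executed."""
    value = x
    for number, step in enumerate(steps, start=1):
        if number in recorded:
            value = recorded[number]
        else:
            value = step(value)
            recorded[number] = value
            executed.append(number)
    return value


def fork_model(original: Dict, start_step: int, steps: List[Step]) -> Dict:
    # Copy only the outputs recorded before start_step into the new workflow
    recorded = {n: out for n, out in original["recorded"].items() if n < start_step}
    executed: List[int] = []
    result = execute(steps, original["input"], recorded, executed)
    return {"workflow_id": str(uuid.uuid4()), "recorded": recorded,
            "executed": executed, "result": result}


def add_one(v: int) -> int:
    return v + 1


def times_ten(v: int) -> int:  # hypothetical buggy step: should multiply by 100
    return v * 10


def times_hundred(v: int) -> int:  # the fixed version
    return v * 100


def minus_three(v: int) -> int:
    return v - 3


def demo() -> Tuple[Dict, Dict]:
    recorded: Dict[int, int] = {}
    executed: List[int] = []
    result = execute([add_one, times_ten, minus_three], 2, recorded, executed)
    original = {"workflow_id": "wf-original", "input": 2,
                "recorded": recorded, "executed": executed, "result": result}
    # Fork from step 2 with the fixed code: step 1's output (3) is reused
    forked = fork_model(original, 2, [add_one, times_hundred, minus_three])
    return original, forked
```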

set_workflow_delay / set_workflow_delay_async#

Signature (async):

DBOS.set_workflow_delay_async(
    workflow_id: str,
    *,
    delay_seconds: Optional[float] = None,
    delay_until_epoch_ms: Optional[int] = None,
) -> None

Signature (sync):

DBOS.set_workflow_delay(
    workflow_id: str,
    *,
    delay_seconds: Optional[float] = None,
    delay_until_epoch_ms: Optional[int] = None,
) -> None

Set or update the delay on a workflow. Provide exactly one of delay_seconds (relative) or delay_until_epoch_ms (absolute). Only affects DELAYED or ENQUEUED workflows.

Parameters:

  • workflow_id: The workflow ID whose delay should be modified.
  • delay_seconds: Optional relative delay in seconds from now. If specified, the workflow will be delayed until current_time + delay_seconds.
  • delay_until_epoch_ms: Optional absolute timestamp in milliseconds since epoch. If specified, the workflow will be delayed until this exact time.

Returns:
None

Example:

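import time

from dbos import DBOS, SetEnqueueOptions

# Enqueue with a long delay, then shorten it
# (queue and my_workflow are assumed to be defined elsewhere)
with SetEnqueueOptions(delay_seconds=600.0):
    handle = queue.enqueue(my_workflow)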

# Shorten the delay to 5 seconds using relative time
DBOS.set_workflow_delay(handle.workflow_id, delay_seconds=5.0)

# Or set an absolute timestamp (500ms from now)
soon = int(time.time() * 1000) + 500
DBOS.set_workflow_delay(handle.workflow_id, delay_until_epoch_ms=soon)

# Async version
await DBOS.set_workflow_delay_async(handle.workflow_id, delay_seconds=5.0)
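The two delay parameters resolve to a single absolute deadline. This sketch shows that arithmetic and the "exactly one" validation. `resolve_delay_until_ms` is a hypothetical helper; truncating to whole milliseconds and raising ValueError are assumptions, not DBOS's code.

```python
import time
from typing import Optional


def resolve_delay_until_ms(delay_seconds: Optional[float] = None,
                           delay_until_epoch_ms: Optional[int] = None,
                           now_ms: Optional[int] = None) -> int:
    if (delay_seconds is None) == (delay_until_epoch_ms is None):
        raise ValueError("provide exactly one of delay_seconds or delay_until_epoch_ms")
    if delay_until_epoch_ms is not None:
        return delay_until_epoch_ms  # absolute: used as-is
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    # Relative: current time plus the delay, converted to milliseconds
    return now_ms + int(delay_seconds * 1000)
```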

Relevant Code Files#

  • dbos/_core.py: Core workflow and step execution logic, including async/sync detection and execution paths
  • dbos/_dbos.py: Main DBOS class with workflow, step, and transaction decorators
  • dbos/_event_loop.py: BackgroundEventLoop implementation for managing async execution
  • dbos/_scheduler.py: Dynamic scheduler implementation for scheduled workflows
  • tests/test_async.py: Comprehensive test suite for async workflow patterns
  • tests/test_concurrency.py: Tests for concurrent async operations and patterns

Related topics:

  • Workflow Durability and Recovery: How DBOS maintains workflow state and recovers from failures
  • Workflow Scheduling: Cron-based scheduling of workflows with async support
  • Queues: Enqueuing workflows for background execution
  • Steps: Reusable operations within workflows with retry logic
  • Transactions: Database transactions for ACID guarantees (sync-only)